You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2014/02/20 02:17:07 UTC
git commit: Making minor changes to YarnProvisioner to maintain a
fixed number of containers
Repository: helix
Updated Branches:
refs/heads/helix-provisioning d1e7ca604 -> cb6aa4fa0
Making minor changes to YarnProvisioner to maintain a fixed number of containers
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/cb6aa4fa
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/cb6aa4fa
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/cb6aa4fa
Branch: refs/heads/helix-provisioning
Commit: cb6aa4fa0e82436f1d6714c3cdcf1435c510024a
Parents: d1e7ca6
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Wed Feb 19 17:16:55 2014 -0800
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Wed Feb 19 17:16:55 2014 -0800
----------------------------------------------------------------------
.../controller/provisioner/ContainerSpec.java | 18 ++++--
.../stages/ContainerProvisioningStage.java | 2 +-
.../provisioning/yarn/YarnProvisioner.java | 63 ++++++++++++++------
3 files changed, 58 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/cb6aa4fa/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java
index b393a64..4d3a521 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java
@@ -23,23 +23,31 @@ public class ContainerSpec {
/**
* Some unique id representing the container.
*/
- ContainerId containerId;
+ ContainerId _containerId;
- String memory;
+ int _memory;
public ContainerSpec(ContainerId containerId) {
- this.containerId = containerId;
+ this._containerId = containerId;
}
public ContainerId getContainerId() {
- return containerId;
+ return _containerId;
}
@Override
public String toString() {
- return containerId.toString();
+ return _containerId.toString();
+ }
+
+ public void setMemory(int memory){
+ _memory = memory;
}
+ public int getMemory(){
+ return _memory;
+ }
+
public static ContainerSpec from(String serialized) {
return new ContainerSpec(ContainerId.from(serialized));
}
http://git-wip-us.apache.org/repos/asf/helix/blob/cb6aa4fa/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
index 2f97c5a..48166bf 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
@@ -123,7 +123,7 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
// allocate new containers
for (final ContainerSpec spec : response.getContainersToAcquire()) {
// random participant id
- final ParticipantId participantId = ParticipantId.from(UUID.randomUUID().toString());
+ final ParticipantId participantId = ParticipantId.from(spec.getContainerId().stringify());
// create a new Participant, attach the container spec
InstanceConfig instanceConfig = new InstanceConfig(participantId);
instanceConfig.setContainerSpec(spec);
http://git-wip-us.apache.org/repos/asf/helix/blob/cb6aa4fa/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
index 477023b..4fcc219 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
@@ -8,8 +8,10 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.Vector;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -44,7 +46,9 @@ import org.apache.helix.HelixManager;
import org.apache.helix.api.Cluster;
import org.apache.helix.api.Participant;
import org.apache.helix.api.config.ContainerConfig;
+import org.apache.helix.api.config.ParticipantConfig;
import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.controller.provisioner.ContainerId;
import org.apache.helix.controller.provisioner.ContainerProvider;
@@ -54,6 +58,7 @@ import org.apache.helix.controller.provisioner.Provisioner;
import org.apache.helix.controller.provisioner.ProvisionerConfig;
import org.apache.helix.controller.provisioner.TargetProvider;
import org.apache.helix.controller.provisioner.TargetProviderResponse;
+import org.apache.helix.model.InstanceConfig;
import com.google.common.collect.Lists;
import com.google.common.base.Function;
@@ -73,8 +78,9 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr
Map<ContainerId, Container> allocatedContainersMap = new HashMap<ContainerId, Container>();
private HelixManager _helixManager;
private ResourceConfig _resourceConfig;
- public YarnProvisioner(){
-
+
+ public YarnProvisioner() {
+
}
@Override
@@ -109,7 +115,8 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr
}
@Override
- public ListenableFuture<Boolean> startContainer(final ContainerId containerId, Participant participant) {
+ public ListenableFuture<Boolean> startContainer(final ContainerId containerId,
+ Participant participant) {
Container container = allocatedContainersMap.get(containerId);
ContainerLaunchContext launchContext;
try {
@@ -128,11 +135,12 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr
}, service);
}
- private ContainerLaunchContext createLaunchContext(ContainerId containerId, Container container, Participant participant) throws Exception {
+ private ContainerLaunchContext createLaunchContext(ContainerId containerId, Container container,
+ Participant participant) throws Exception {
ContainerLaunchContext participantContainer = Records.newRecord(ContainerLaunchContext.class);
-// Map<String, String> envs = System.getenv();
+ // Map<String, String> envs = System.getenv();
String appName = applicationMasterConfig.getAppName();
int appId = applicationMasterConfig.getAppId();
String serviceName = _resourceConfig.getId().stringify();
@@ -166,7 +174,7 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr
// resource the client intended to use with the application
servicePackageResource.setTimestamp(destStatus.getModificationTime());
servicePackageResource.setSize(destStatus.getLen());
- LOG.info("Setting local resource:" + servicePackageResource + " for service" + serviceName );
+ LOG.info("Setting local resource:" + servicePackageResource + " for service" + serviceName);
localResources.put(serviceName, servicePackageResource);
// Set local resource info into app master container launch context
@@ -195,7 +203,7 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr
classPathEnv.append(c.trim());
}
classPathEnv.append(File.pathSeparatorChar).append("./log4j.properties");
- LOG.info("Setting classpath for service:\n"+ classPathEnv.toString());
+ LOG.info("Setting classpath for service:\n" + classPathEnv.toString());
env.put("CLASSPATH", classPathEnv.toString());
participantContainer.setEnvironment(env);
@@ -214,8 +222,8 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr
vargs.add("--zkAddress " + zkAddress);
vargs.add("--cluster " + appName);
vargs.add("--participantId " + participant.getId().stringify());
- vargs.add("--participantClass " + mainClass);;
-
+ vargs.add("--participantClass " + mainClass);
+ ;
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/ContainerParticipant.stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/ContainerParticipant.stderr");
@@ -226,7 +234,8 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr
command.append(str).append(" ");
}
- LOG.info("Completed setting up container launch command " + command.toString() + " with arguments \n" + vargs);
+ LOG.info("Completed setting up container launch command " + command.toString()
+ + " with arguments \n" + vargs);
List<String> commands = new ArrayList<String>();
commands.add(command.toString());
participantContainer.setCommands(commands);
@@ -260,13 +269,13 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr
List<Participant> containersToStart = Lists.newArrayList();
List<Participant> containersToRelease = Lists.newArrayList();
List<Participant> containersToStop = Lists.newArrayList();
- YarnProvisionerConfig provisionerConfig = (YarnProvisionerConfig) cluster.getConfig().getResourceMap().get(resourceId).getProvisionerConfig();
+ YarnProvisionerConfig provisionerConfig =
+ (YarnProvisionerConfig) cluster.getConfig().getResourceMap().get(resourceId)
+ .getProvisionerConfig();
int targetNumContainers = provisionerConfig.getNumContainers();
- for (int i = 0; i < targetNumContainers - participants.size(); i++) {
- containersToAcquire.add(new ContainerSpec(ContainerId.from("container"
- + (targetNumContainers - i))));
- }
- response.setContainersToAcquire(containersToAcquire);
+
+ Set<ContainerId> existingContainersIdSet = new HashSet<ContainerId>();
+
for (Participant participant : participants) {
ContainerConfig containerConfig = participant.getContainerConfig();
@@ -278,17 +287,20 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr
containersToStart.add(participant);
break;
case ACTIVE:
-
+ existingContainersIdSet.add(containerConfig.getId());
break;
case HALTED:
// halted containers can be released
- // containersToRelease.add(participant);
+ containersToRelease.add(participant);
break;
case ACQUIRING:
+ existingContainersIdSet.add(containerConfig.getId());
break;
case CONNECTING:
break;
case FAILED:
+ //remove the failed instance
+ _helixManager.getClusterManagmentTool().dropInstance(cluster.getId().toString(), new InstanceConfig(participant.getId()));
break;
case FINALIZED:
break;
@@ -306,6 +318,19 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr
}
}
}
+
+ for (int i = 0; i < targetNumContainers; i++) {
+ ContainerId containerId = ContainerId.from(resourceId + "_container_" + (i));
+ if(!existingContainersIdSet.contains(containerId)){
+ ContainerSpec containerSpec = new ContainerSpec(containerId);
+ ParticipantId participantId = ParticipantId.from(containerId.stringify());
+ ParticipantConfig participantConfig = applicationSpec.getParticipantConfig(resourceId.stringify(), participantId);
+ containerSpec.setMemory(participantConfig.getUserConfig().getIntField("memory", 1024));
+ containersToAcquire.add(containerSpec);
+ }
+ }
+
+ response.setContainersToAcquire(containersToAcquire);
response.setContainersToStart(containersToStart);
response.setContainersToRelease(containersToRelease);
response.setContainersToStop(containersToStop);
@@ -326,7 +351,7 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr
// Set up resource type requirements
// For now, only memory is supported so we set memory requirements
Resource capability = Records.newRecord(Resource.class);
- int memory = 1024;
+ int memory = spec.getMemory();
capability.setMemory(memory);
ContainerRequest request = new ContainerRequest(capability, null, null, pri);