You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2014/02/20 02:17:07 UTC

git commit: Making minor changes to YarnProvisioner to maintain a fixed number of containers

Repository: helix
Updated Branches:
  refs/heads/helix-provisioning d1e7ca604 -> cb6aa4fa0


Making minor changes to YarnProvisioner to maintain a fixed number of containers


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/cb6aa4fa
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/cb6aa4fa
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/cb6aa4fa

Branch: refs/heads/helix-provisioning
Commit: cb6aa4fa0e82436f1d6714c3cdcf1435c510024a
Parents: d1e7ca6
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Wed Feb 19 17:16:55 2014 -0800
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Wed Feb 19 17:16:55 2014 -0800

----------------------------------------------------------------------
 .../controller/provisioner/ContainerSpec.java   | 18 ++++--
 .../stages/ContainerProvisioningStage.java      |  2 +-
 .../provisioning/yarn/YarnProvisioner.java      | 63 ++++++++++++++------
 3 files changed, 58 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/cb6aa4fa/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java
index b393a64..4d3a521 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java
@@ -23,23 +23,31 @@ public class ContainerSpec {
   /**
    * Some unique id representing the container.
    */
-  ContainerId containerId;
+  ContainerId _containerId;
   
-  String memory;
+  int _memory;
 
   public ContainerSpec(ContainerId containerId) {
-    this.containerId = containerId;
+    this._containerId = containerId;
   }
 
   public ContainerId getContainerId() {
-    return containerId;
+    return _containerId;
   }
 
   @Override
   public String toString() {
-    return containerId.toString();
+    return _containerId.toString();
+  }
+  
+  public void setMemory(int memory){
+    _memory = memory;
   }
 
+  public int getMemory(){
+    return _memory;
+  }
+  
   public static ContainerSpec from(String serialized) {
     return new ContainerSpec(ContainerId.from(serialized));
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/cb6aa4fa/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
index 2f97c5a..48166bf 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
@@ -123,7 +123,7 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
         // allocate new containers
         for (final ContainerSpec spec : response.getContainersToAcquire()) {
           // random participant id
-          final ParticipantId participantId = ParticipantId.from(UUID.randomUUID().toString());
+          final ParticipantId participantId = ParticipantId.from(spec.getContainerId().stringify());
           // create a new Participant, attach the container spec
           InstanceConfig instanceConfig = new InstanceConfig(participantId);
           instanceConfig.setContainerSpec(spec);

http://git-wip-us.apache.org/repos/asf/helix/blob/cb6aa4fa/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
index 477023b..4fcc219 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
@@ -8,8 +8,10 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.Vector;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -44,7 +46,9 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.api.Cluster;
 import org.apache.helix.api.Participant;
 import org.apache.helix.api.config.ContainerConfig;
+import org.apache.helix.api.config.ParticipantConfig;
 import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.controller.provisioner.ContainerId;
 import org.apache.helix.controller.provisioner.ContainerProvider;
@@ -54,6 +58,7 @@ import org.apache.helix.controller.provisioner.Provisioner;
 import org.apache.helix.controller.provisioner.ProvisionerConfig;
 import org.apache.helix.controller.provisioner.TargetProvider;
 import org.apache.helix.controller.provisioner.TargetProviderResponse;
+import org.apache.helix.model.InstanceConfig;
 
 import com.google.common.collect.Lists;
 import com.google.common.base.Function;
@@ -73,8 +78,9 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr
   Map<ContainerId, Container> allocatedContainersMap = new HashMap<ContainerId, Container>();
   private HelixManager _helixManager;
   private ResourceConfig _resourceConfig;
-  public YarnProvisioner(){
-    
+
+  public YarnProvisioner() {
+
   }
 
   @Override
@@ -109,7 +115,8 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr
   }
 
   @Override
-  public ListenableFuture<Boolean> startContainer(final ContainerId containerId, Participant participant) {
+  public ListenableFuture<Boolean> startContainer(final ContainerId containerId,
+      Participant participant) {
     Container container = allocatedContainersMap.get(containerId);
     ContainerLaunchContext launchContext;
     try {
@@ -128,11 +135,12 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr
     }, service);
   }
 
-  private ContainerLaunchContext createLaunchContext(ContainerId containerId, Container container, Participant participant) throws Exception {
+  private ContainerLaunchContext createLaunchContext(ContainerId containerId, Container container,
+      Participant participant) throws Exception {
 
     ContainerLaunchContext participantContainer = Records.newRecord(ContainerLaunchContext.class);
 
-//    Map<String, String> envs = System.getenv();
+    // Map<String, String> envs = System.getenv();
     String appName = applicationMasterConfig.getAppName();
     int appId = applicationMasterConfig.getAppId();
     String serviceName = _resourceConfig.getId().stringify();
@@ -166,7 +174,7 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr
     // resource the client intended to use with the application
     servicePackageResource.setTimestamp(destStatus.getModificationTime());
     servicePackageResource.setSize(destStatus.getLen());
-    LOG.info("Setting local resource:" + servicePackageResource + " for service" + serviceName );
+    LOG.info("Setting local resource:" + servicePackageResource + " for service" + serviceName);
     localResources.put(serviceName, servicePackageResource);
 
     // Set local resource info into app master container launch context
@@ -195,7 +203,7 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr
       classPathEnv.append(c.trim());
     }
     classPathEnv.append(File.pathSeparatorChar).append("./log4j.properties");
-    LOG.info("Setting classpath for service:\n"+ classPathEnv.toString());
+    LOG.info("Setting classpath for service:\n" + classPathEnv.toString());
     env.put("CLASSPATH", classPathEnv.toString());
 
     participantContainer.setEnvironment(env);
@@ -214,8 +222,8 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr
     vargs.add("--zkAddress " + zkAddress);
     vargs.add("--cluster " + appName);
     vargs.add("--participantId " + participant.getId().stringify());
-    vargs.add("--participantClass " + mainClass);;
-    
+    vargs.add("--participantClass " + mainClass);
+    ;
 
     vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/ContainerParticipant.stdout");
     vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/ContainerParticipant.stderr");
@@ -226,7 +234,8 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr
       command.append(str).append(" ");
     }
 
-    LOG.info("Completed setting up  container launch command " + command.toString() + " with arguments \n" + vargs);
+    LOG.info("Completed setting up  container launch command " + command.toString()
+        + " with arguments \n" + vargs);
     List<String> commands = new ArrayList<String>();
     commands.add(command.toString());
     participantContainer.setCommands(commands);
@@ -260,13 +269,13 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr
     List<Participant> containersToStart = Lists.newArrayList();
     List<Participant> containersToRelease = Lists.newArrayList();
     List<Participant> containersToStop = Lists.newArrayList();
-    YarnProvisionerConfig  provisionerConfig = (YarnProvisionerConfig) cluster.getConfig().getResourceMap().get(resourceId).getProvisionerConfig();
+    YarnProvisionerConfig provisionerConfig =
+        (YarnProvisionerConfig) cluster.getConfig().getResourceMap().get(resourceId)
+            .getProvisionerConfig();
     int targetNumContainers = provisionerConfig.getNumContainers();
-    for (int i = 0; i < targetNumContainers - participants.size(); i++) {
-      containersToAcquire.add(new ContainerSpec(ContainerId.from("container"
-          + (targetNumContainers - i))));
-    }
-    response.setContainersToAcquire(containersToAcquire);
+
+    Set<ContainerId> existingContainersIdSet = new HashSet<ContainerId>();
+    
 
     for (Participant participant : participants) {
       ContainerConfig containerConfig = participant.getContainerConfig();
@@ -278,17 +287,20 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr
           containersToStart.add(participant);
           break;
         case ACTIVE:
-
+          existingContainersIdSet.add(containerConfig.getId());
           break;
         case HALTED:
           // halted containers can be released
-          // containersToRelease.add(participant);
+          containersToRelease.add(participant);
           break;
         case ACQUIRING:
+          existingContainersIdSet.add(containerConfig.getId());
           break;
         case CONNECTING:
           break;
         case FAILED:
+          //remove the failed instance
+          _helixManager.getClusterManagmentTool().dropInstance(cluster.getId().toString(), new InstanceConfig(participant.getId()));
           break;
         case FINALIZED:
           break;
@@ -306,6 +318,19 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr
         }
       }
     }
+    
+    for (int i = 0; i < targetNumContainers; i++) {
+      ContainerId containerId = ContainerId.from(resourceId + "_container_" + (i));
+      if(!existingContainersIdSet.contains(containerId)){
+        ContainerSpec containerSpec = new ContainerSpec(containerId);
+        ParticipantId participantId = ParticipantId.from(containerId.stringify());
+        ParticipantConfig participantConfig = applicationSpec.getParticipantConfig(resourceId.stringify(), participantId);
+        containerSpec.setMemory(participantConfig.getUserConfig().getIntField("memory", 1024));
+        containersToAcquire.add(containerSpec);
+      }
+    }
+    
+    response.setContainersToAcquire(containersToAcquire);
     response.setContainersToStart(containersToStart);
     response.setContainersToRelease(containersToRelease);
     response.setContainersToStop(containersToStop);
@@ -326,7 +351,7 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr
     // Set up resource type requirements
     // For now, only memory is supported so we set memory requirements
     Resource capability = Records.newRecord(Resource.class);
-    int memory = 1024;
+    int memory = spec.getMemory();
     capability.setMemory(memory);
 
     ContainerRequest request = new ContainerRequest(capability, null, null, pri);