You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/07/10 19:04:48 UTC

[05/50] [abbrv] git commit: Made container states more consistent, changed yarn target provider logic

Made container states more consistent, changed yarn target provider logic


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/57b4b180
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/57b4b180
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/57b4b180

Branch: refs/heads/master
Commit: 57b4b180e0c0b7f3ae0c21191af1f72bca61732f
Parents: cb6aa4f
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Wed Feb 19 18:58:00 2014 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed Feb 19 18:58:00 2014 -0800

----------------------------------------------------------------------
 .../controller/provisioner/ContainerState.java  |  5 +-
 .../stages/ContainerProvisioningStage.java      |  9 ++-
 .../integration/TestLocalContainerProvider.java |  4 +-
 .../provisioning/yarn/YarnProvisioner.java      | 83 ++++++++++++--------
 4 files changed, 61 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/57b4b180/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java
index cf4b736..449f636 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java
@@ -23,8 +23,9 @@ public enum ContainerState {
   ACQUIRING,
   ACQUIRED,
   CONNECTING,
-  ACTIVE,
-  TEARDOWN,
+  CONNECTED,
+  DISCONNECTED,
+  HALTING,
   HALTED,
   FINALIZING,
   FINALIZED,

http://git-wip-us.apache.org/repos/asf/helix/blob/57b4b180/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
index 48166bf..42c8218 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
@@ -22,7 +22,6 @@ package org.apache.helix.controller.stages;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.UUID;
 
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
@@ -166,12 +165,13 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
           accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()),
               existingInstance);
           // create the helix participant and add it to cluster
-          ListenableFuture<Boolean> future = containerProvider.startContainer(containerId, participant);
+          ListenableFuture<Boolean> future =
+              containerProvider.startContainer(containerId, participant);
           FutureCallback<Boolean> callback = new FutureCallback<Boolean>() {
             @Override
             public void onSuccess(Boolean result) {
               updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(),
-                  ContainerState.ACTIVE);
+                  ContainerState.CONNECTED);
             }
 
             @Override
@@ -225,7 +225,7 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
                   .toString());
           final ContainerId containerId = existingInstance.getContainerId();
           existingInstance.setInstanceEnabled(false);
-          existingInstance.setContainerState(ContainerState.TEARDOWN);
+          existingInstance.setContainerState(ContainerState.HALTING);
           accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()),
               existingInstance);
           // stop the container
@@ -267,6 +267,7 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
     InstanceConfig existingInstance =
         helixAdmin.getInstanceConfig(cluster.getId().toString(), participantId.toString());
     existingInstance.setContainerState(state);
+    existingInstance.setInstanceEnabled(state.equals(ContainerState.CONNECTED));
     accessor.updateProperty(keyBuilder.instanceConfig(participantId.toString()), existingInstance);
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/57b4b180/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java b/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
index 0e4c803..0f7be64 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
@@ -250,7 +250,7 @@ public class TestLocalContainerProvider extends ZkUnitTestBase {
       participantService.startAsync();
       participantService.awaitRunning();
       _participants.put(containerId, participantService);
-      _states.put(containerId, ContainerState.ACTIVE);
+      _states.put(containerId, ContainerState.CONNECTED);
       started++;
       SettableFuture<Boolean> future = SettableFuture.create();
       future.set(true);
@@ -294,7 +294,7 @@ public class TestLocalContainerProvider extends ZkUnitTestBase {
             // acquired containers are ready to start
             containersToStart.add(participant);
             break;
-          case ACTIVE:
+          case CONNECTED:
             // stop at most two active at a time, wait for everything to be up first
             if (stopCount < 2 && _askCount >= MAX_PARTICIPANTS) {
               containersToStop.add(participant);

http://git-wip-us.apache.org/repos/asf/helix/blob/57b4b180/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
index 4fcc219..daac87b 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
@@ -1,10 +1,6 @@
 package org.apache.helix.provisioning.yarn;
 
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -13,22 +9,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.Vector;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 import java.util.concurrent.Executors;
 
-import org.apache.commons.compress.archivers.ArchiveStreamFactory;
-import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
-import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -55,13 +42,13 @@ import org.apache.helix.controller.provisioner.ContainerProvider;
 import org.apache.helix.controller.provisioner.ContainerSpec;
 import org.apache.helix.controller.provisioner.ContainerState;
 import org.apache.helix.controller.provisioner.Provisioner;
-import org.apache.helix.controller.provisioner.ProvisionerConfig;
 import org.apache.helix.controller.provisioner.TargetProvider;
 import org.apache.helix.controller.provisioner.TargetProviderResponse;
 import org.apache.helix.model.InstanceConfig;
 
-import com.google.common.collect.Lists;
 import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
@@ -274,39 +261,57 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr
             .getProvisionerConfig();
     int targetNumContainers = provisionerConfig.getNumContainers();
 
+    // Ids of containers in every state except FINALIZED and FAILED (these are not re-acquired)
     Set<ContainerId> existingContainersIdSet = new HashSet<ContainerId>();
-    
+
+    // Cache halted containers to determine which to restart and which to release
+    Map<ContainerId, Participant> excessHaltedContainers = Maps.newHashMap();
+
+    // Cache connected participants so that those outside the target range can be stopped
+    Map<ContainerId, Participant> excessActiveContainers = Maps.newHashMap();
 
     for (Participant participant : participants) {
       ContainerConfig containerConfig = participant.getContainerConfig();
       if (containerConfig != null && containerConfig.getState() != null) {
         ContainerState state = containerConfig.getState();
         switch (state) {
+        case ACQUIRING:
+          existingContainersIdSet.add(containerConfig.getId());
+          break;
         case ACQUIRED:
           // acquired containers are ready to start
+          existingContainersIdSet.add(containerConfig.getId());
           containersToStart.add(participant);
           break;
-        case ACTIVE:
+        case CONNECTING:
           existingContainersIdSet.add(containerConfig.getId());
           break;
-        case HALTED:
-          // halted containers can be released
-          containersToRelease.add(participant);
+        case CONNECTED:
+          // active containers can be stopped or kept active
+          existingContainersIdSet.add(containerConfig.getId());
+          excessActiveContainers.put(containerConfig.getId(), participant);
           break;
-        case ACQUIRING:
+        case DISCONNECTED:
+          // disconnected containers must be stopped (no break: falls through to HALTING)
+          existingContainersIdSet.add(containerConfig.getId());
+          containersToStop.add(participant);
+        case HALTING:
           existingContainersIdSet.add(containerConfig.getId());
           break;
-        case CONNECTING:
+        case HALTED:
+          // halted containers can be released or restarted
+          existingContainersIdSet.add(containerConfig.getId());
+          excessHaltedContainers.put(containerConfig.getId(), participant);
           break;
-        case FAILED:
-          //remove the failed instance
-          _helixManager.getClusterManagmentTool().dropInstance(cluster.getId().toString(), new InstanceConfig(participant.getId()));
+        case FINALIZING:
+          existingContainersIdSet.add(containerConfig.getId());
           break;
         case FINALIZED:
           break;
-        case FINALIZING:
-          break;
-        case TEARDOWN:
+        case FAILED:
+          // remove the failed instance
+          _helixManager.getClusterManagmentTool().dropInstance(cluster.getId().toString(),
+              new InstanceConfig(participant.getId()));
           break;
         default:
           break;
@@ -318,18 +323,32 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr
         }
       }
     }
-    
+
     for (int i = 0; i < targetNumContainers; i++) {
       ContainerId containerId = ContainerId.from(resourceId + "_container_" + (i));
-      if(!existingContainersIdSet.contains(containerId)){
+      excessActiveContainers.remove(containerId); // don't stop this container if active
+      if (excessHaltedContainers.containsKey(containerId)) {
+        // Halted containers can be restarted if necessary
+        Participant participant = excessHaltedContainers.get(containerId);
+        containersToStart.add(participant);
+        excessHaltedContainers.remove(containerId); // don't release this container
+      } else if (!existingContainersIdSet.contains(containerId)) {
+        // Unallocated containers must be allocated
         ContainerSpec containerSpec = new ContainerSpec(containerId);
         ParticipantId participantId = ParticipantId.from(containerId.stringify());
-        ParticipantConfig participantConfig = applicationSpec.getParticipantConfig(resourceId.stringify(), participantId);
+        ParticipantConfig participantConfig =
+            applicationSpec.getParticipantConfig(resourceId.stringify(), participantId);
         containerSpec.setMemory(participantConfig.getUserConfig().getIntField("memory", 1024));
         containersToAcquire.add(containerSpec);
       }
     }
-    
+
+    // Add all the containers that should be stopped because they fall outside the target range
+    containersToStop.addAll(excessActiveContainers.values());
+
+    // Add halted containers that should not be restarted
+    containersToRelease.addAll(excessHaltedContainers.values());
+
     response.setContainersToAcquire(containersToAcquire);
     response.setContainersToStart(containersToStart);
     response.setContainersToRelease(containersToRelease);