You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/02/21 21:41:36 UTC

git commit: Set future for release response when it comes

Repository: helix
Updated Branches:
  refs/heads/helix-provisioning 186283457 -> 64e153144


Set future for release response when it comes


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/64e15314
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/64e15314
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/64e15314

Branch: refs/heads/helix-provisioning
Commit: 64e153144c1b5602808a3f99f950f14465ca1e17
Parents: 1862834
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Fri Feb 21 12:40:03 2014 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Fri Feb 21 12:40:03 2014 -0800

----------------------------------------------------------------------
 .../helix/controller/stages/ContainerProvisioningStage.java | 8 ++++++++
 .../apache/helix/provisioning/yarn/NMCallbackHandler.java   | 5 ++---
 .../apache/helix/provisioning/yarn/RMCallbackHandler.java   | 4 ++--
 .../org/apache/helix/provisioning/yarn/YarnProvisioner.java | 9 ++++-----
 4 files changed, 16 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/64e15314/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
index 5cccd68..f258525 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
@@ -155,10 +155,12 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
             // create the helix participant and add it to cluster
             helixAdmin.addInstance(cluster.getId().toString(), instanceConfig);
           }
+          LOG.info("Allocating container for " + participantId);
           ListenableFuture<ContainerId> future = containerProvider.allocateContainer(spec);
           FutureCallback<ContainerId> callback = new FutureCallback<ContainerId>() {
             @Override
             public void onSuccess(ContainerId containerId) {
+              LOG.info("Container " + containerId + " acquired. Marking " + participantId);
               InstanceConfig existingInstance =
                   helixAdmin
                       .getInstanceConfig(cluster.getId().toString(), participantId.toString());
@@ -188,12 +190,14 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
           accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()),
               existingInstance);
           // create the helix participant and add it to cluster
+          LOG.info("Starting container " + containerId + " for " + participant.getId());
           ListenableFuture<Boolean> future =
               containerProvider.startContainer(containerId, participant);
           FutureCallback<Boolean> callback = new FutureCallback<Boolean>() {
             @Override
             public void onSuccess(Boolean result) {
               // Do nothing yet, need to wait for live instance
+              LOG.info("Container " + containerId + " started for " + participant.getId());
             }
 
             @Override
@@ -218,10 +222,12 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
           accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()),
               existingInstance);
           // remove the participant
+          LOG.info("Deallocating container " + containerId + " for " + participant.getId());
           ListenableFuture<Boolean> future = containerProvider.deallocateContainer(containerId);
           FutureCallback<Boolean> callback = new FutureCallback<Boolean>() {
             @Override
             public void onSuccess(Boolean result) {
+              LOG.info("Container " + containerId + " deallocated. Dropping " + participant.getId());
               InstanceConfig existingInstance =
                   helixAdmin.getInstanceConfig(cluster.getId().toString(), participant.getId()
                       .toString());
@@ -251,10 +257,12 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
           accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()),
               existingInstance);
           // stop the container
+          LOG.info("Stopping container " + containerId + " for " + participant.getId());
           ListenableFuture<Boolean> future = containerProvider.stopContainer(containerId);
           FutureCallback<Boolean> callback = new FutureCallback<Boolean>() {
             @Override
             public void onSuccess(Boolean result) {
+              LOG.info("Container " + containerId + " stopped. Marking " + participant.getId());
               updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(),
                   ContainerState.HALTED);
             }

http://git-wip-us.apache.org/repos/asf/helix/blob/64e15314/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/NMCallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/NMCallbackHandler.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/NMCallbackHandler.java
index da6c01f..1566c28 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/NMCallbackHandler.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/NMCallbackHandler.java
@@ -8,7 +8,6 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
 import org.apache.log4j.Logger;
 
@@ -39,7 +38,7 @@ class NMCallbackHandler implements NMClientAsync.CallbackHandler {
       applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
     }
     SettableFuture<ContainerStopResponse> settableFuture =
-        applicationMaster.containerStopMap.get(containerId);
+        applicationMaster.containerStopMap.remove(containerId);
     ContainerStopResponse value = new ContainerStopResponse();
     settableFuture.set(value);
     containers.remove(containerId);
@@ -59,7 +58,7 @@ class NMCallbackHandler implements NMClientAsync.CallbackHandler {
       applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
     }
     SettableFuture<ContainerLaunchResponse> settableFuture =
-        applicationMaster.containerLaunchResponseMap.get(containerId);
+        applicationMaster.containerLaunchResponseMap.remove(containerId);
     ContainerLaunchResponse value = new ContainerLaunchResponse();
     settableFuture.set(value);
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/64e15314/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
index dae28a8..fe2c854 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
@@ -43,13 +43,13 @@ class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
       // non complete containers should not be here
       assert (containerStatus.getState() == ContainerState.COMPLETE);
       SettableFuture<ContainerStopResponse> stopResponseFuture =
-          _genericApplicationMaster.containerStopMap.get(containerStatus.getContainerId());
+          _genericApplicationMaster.containerStopMap.remove(containerStatus.getContainerId());
       if (stopResponseFuture != null) {
         ContainerStopResponse value = new ContainerStopResponse();
         stopResponseFuture.set(value);
       } else {
         SettableFuture<ContainerReleaseResponse> releaseResponseFuture =
-            _genericApplicationMaster.containerReleaseMap.get(containerStatus.getContainerId());
+            _genericApplicationMaster.containerReleaseMap.remove(containerStatus.getContainerId());
         if (releaseResponseFuture != null) {
           ContainerReleaseResponse value = new ContainerReleaseResponse();
           releaseResponseFuture.set(value);

http://git-wip-us.apache.org/repos/asf/helix/blob/64e15314/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
index 2eedfd0..2d6e306 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
@@ -33,7 +33,6 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.api.Cluster;
 import org.apache.helix.api.Participant;
 import org.apache.helix.api.config.ContainerConfig;
-import org.apache.helix.api.config.ParticipantConfig;
 import org.apache.helix.api.config.ResourceConfig;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.ResourceId;
@@ -210,7 +209,7 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr
     vargs.add("--cluster " + appName);
     vargs.add("--participantId " + participant.getId().stringify());
     vargs.add("--participantClass " + mainClass);
-    
+
     vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/ContainerParticipant.stdout");
     vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/ContainerParticipant.stderr");
 
@@ -323,9 +322,9 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr
       excessActiveContainers.remove(participantId); // don't stop this container if active
       if (excessHaltedContainers.containsKey(participantId)) {
         // Halted containers can be restarted if necessary
-        Participant participant = excessHaltedContainers.get(participantId);
-        //containersToStart.add(participant);
-        //excessHaltedContainers.remove(participantId); // don't release this container
+        // Participant participant = excessHaltedContainers.get(participantId);
+        // containersToStart.add(participant);
+        // excessHaltedContainers.remove(participantId); // don't release this container
       } else if (!existingContainersIdSet.contains(participantId)) {
         // Unallocated containers must be allocated
         ContainerSpec containerSpec = new ContainerSpec(participantId);