You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/02/21 23:57:23 UTC

git commit: Fix deallocation issue for killed processes

Repository: helix
Updated Branches:
  refs/heads/helix-provisioning 64e153144 -> b3dacb7ab


Fix deallocation issue for killed processes


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/b3dacb7a
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/b3dacb7a
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/b3dacb7a

Branch: refs/heads/helix-provisioning
Commit: b3dacb7abadb526c8cb1661a6b51ad47d39d9537
Parents: 64e1531
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Fri Feb 21 14:57:15 2014 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Fri Feb 21 14:57:15 2014 -0800

----------------------------------------------------------------------
 .../yarn/GenericApplicationMaster.java          | 19 +++++++++-----
 .../provisioning/yarn/RMCallbackHandler.java    | 27 ++++++++++++--------
 2 files changed, 29 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/b3dacb7a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
index a006363..79eb402 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
@@ -23,13 +23,11 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.Set;
 
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.logging.Log;
@@ -61,6 +59,7 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 
@@ -104,6 +103,7 @@ public class GenericApplicationMaster {
       new LinkedHashMap<ContainerId, SettableFuture<ContainerStopResponse>>();
   Map<ContainerId, SettableFuture<ContainerLaunchResponse>> containerLaunchResponseMap =
       new LinkedHashMap<ContainerId, SettableFuture<ContainerLaunchResponse>>();
+  Set<ContainerId> allocatedContainerSet = Sets.newHashSet();
 
   ByteBuffer allTokens;
 
@@ -246,14 +246,21 @@ public class GenericApplicationMaster {
   public ListenableFuture<ContainerReleaseResponse> releaseContainer(Container container) {
     LOG.info("Requesting container RELEASE:" + container);
     SettableFuture<ContainerReleaseResponse> future = SettableFuture.create();
-    containerReleaseMap.put(container.getId(), future);
-    amRMClient.releaseAssignedContainer(container.getId());
+    synchronized (allocatedContainerSet) {
+      if (!allocatedContainerSet.contains(container.getId())) {
+        future.set(new ContainerReleaseResponse());
+      } else {
+        containerReleaseMap.put(container.getId(), future);
+        amRMClient.releaseAssignedContainer(container.getId());
+      }
+    }
     return future;
   }
 
   public ListenableFuture<ContainerLaunchResponse> launchContainer(Container container,
       ContainerLaunchContext containerLaunchContext) {
-    LOG.info("Requesting container LAUNCH:" + container + " :" + Joiner.on(" ").join(containerLaunchContext.getCommands()));
+    LOG.info("Requesting container LAUNCH:" + container + " :"
+        + Joiner.on(" ").join(containerLaunchContext.getCommands()));
     SettableFuture<ContainerLaunchResponse> future = SettableFuture.create();
     containerLaunchResponseMap.put(container.getId(), future);
     nmClientAsync.startContainerAsync(container, containerLaunchContext);

http://git-wip-us.apache.org/repos/asf/helix/blob/b3dacb7a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
index fe2c854..8612d3a 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
@@ -42,17 +42,21 @@ class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
 
       // non complete containers should not be here
       assert (containerStatus.getState() == ContainerState.COMPLETE);
-      SettableFuture<ContainerStopResponse> stopResponseFuture =
-          _genericApplicationMaster.containerStopMap.remove(containerStatus.getContainerId());
-      if (stopResponseFuture != null) {
-        ContainerStopResponse value = new ContainerStopResponse();
-        stopResponseFuture.set(value);
-      } else {
-        SettableFuture<ContainerReleaseResponse> releaseResponseFuture =
-            _genericApplicationMaster.containerReleaseMap.remove(containerStatus.getContainerId());
-        if (releaseResponseFuture != null) {
-          ContainerReleaseResponse value = new ContainerReleaseResponse();
-          releaseResponseFuture.set(value);
+      synchronized (_genericApplicationMaster.allocatedContainerSet) {
+        _genericApplicationMaster.allocatedContainerSet.remove(containerStatus.getContainerId());
+        SettableFuture<ContainerStopResponse> stopResponseFuture =
+            _genericApplicationMaster.containerStopMap.remove(containerStatus.getContainerId());
+        if (stopResponseFuture != null) {
+          ContainerStopResponse value = new ContainerStopResponse();
+          stopResponseFuture.set(value);
+        } else {
+          SettableFuture<ContainerReleaseResponse> releaseResponseFuture =
+              _genericApplicationMaster.containerReleaseMap
+                  .remove(containerStatus.getContainerId());
+          if (releaseResponseFuture != null) {
+            ContainerReleaseResponse value = new ContainerReleaseResponse();
+            releaseResponseFuture.set(value);
+          }
         }
       }
       // increment counters for completed/failed containers
@@ -96,6 +100,7 @@ class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
               _genericApplicationMaster.containerRequestMap.remove(containerRequest);
           ContainerAskResponse response = new ContainerAskResponse();
           response.setContainer(allocatedContainer);
+          _genericApplicationMaster.allocatedContainerSet.add(allocatedContainer.getId());
           future.set(response);
           break;
         }