You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/02/21 23:57:23 UTC
git commit: Fix deallocation issue for killed processes
Repository: helix
Updated Branches:
refs/heads/helix-provisioning 64e153144 -> b3dacb7ab
Fix deallocation issue for killed processes
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/b3dacb7a
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/b3dacb7a
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/b3dacb7a
Branch: refs/heads/helix-provisioning
Commit: b3dacb7abadb526c8cb1661a6b51ad47d39d9537
Parents: 64e1531
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Fri Feb 21 14:57:15 2014 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Fri Feb 21 14:57:15 2014 -0800
----------------------------------------------------------------------
.../yarn/GenericApplicationMaster.java | 19 +++++++++-----
.../provisioning/yarn/RMCallbackHandler.java | 27 ++++++++++++--------
2 files changed, 29 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/b3dacb7a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
index a006363..79eb402 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
@@ -23,13 +23,11 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.Set;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
@@ -61,6 +59,7 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
@@ -104,6 +103,7 @@ public class GenericApplicationMaster {
new LinkedHashMap<ContainerId, SettableFuture<ContainerStopResponse>>();
Map<ContainerId, SettableFuture<ContainerLaunchResponse>> containerLaunchResponseMap =
new LinkedHashMap<ContainerId, SettableFuture<ContainerLaunchResponse>>();
+ Set<ContainerId> allocatedContainerSet = Sets.newHashSet();
ByteBuffer allTokens;
@@ -246,14 +246,21 @@ public class GenericApplicationMaster {
public ListenableFuture<ContainerReleaseResponse> releaseContainer(Container container) {
LOG.info("Requesting container RELEASE:" + container);
SettableFuture<ContainerReleaseResponse> future = SettableFuture.create();
- containerReleaseMap.put(container.getId(), future);
- amRMClient.releaseAssignedContainer(container.getId());
+ synchronized (allocatedContainerSet) {
+ if (!allocatedContainerSet.contains(container.getId())) {
+ future.set(new ContainerReleaseResponse());
+ } else {
+ containerReleaseMap.put(container.getId(), future);
+ amRMClient.releaseAssignedContainer(container.getId());
+ }
+ }
return future;
}
public ListenableFuture<ContainerLaunchResponse> launchContainer(Container container,
ContainerLaunchContext containerLaunchContext) {
- LOG.info("Requesting container LAUNCH:" + container + " :" + Joiner.on(" ").join(containerLaunchContext.getCommands()));
+ LOG.info("Requesting container LAUNCH:" + container + " :"
+ + Joiner.on(" ").join(containerLaunchContext.getCommands()));
SettableFuture<ContainerLaunchResponse> future = SettableFuture.create();
containerLaunchResponseMap.put(container.getId(), future);
nmClientAsync.startContainerAsync(container, containerLaunchContext);
http://git-wip-us.apache.org/repos/asf/helix/blob/b3dacb7a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
index fe2c854..8612d3a 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
@@ -42,17 +42,21 @@ class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
// non complete containers should not be here
assert (containerStatus.getState() == ContainerState.COMPLETE);
- SettableFuture<ContainerStopResponse> stopResponseFuture =
- _genericApplicationMaster.containerStopMap.remove(containerStatus.getContainerId());
- if (stopResponseFuture != null) {
- ContainerStopResponse value = new ContainerStopResponse();
- stopResponseFuture.set(value);
- } else {
- SettableFuture<ContainerReleaseResponse> releaseResponseFuture =
- _genericApplicationMaster.containerReleaseMap.remove(containerStatus.getContainerId());
- if (releaseResponseFuture != null) {
- ContainerReleaseResponse value = new ContainerReleaseResponse();
- releaseResponseFuture.set(value);
+ synchronized (_genericApplicationMaster.allocatedContainerSet) {
+ _genericApplicationMaster.allocatedContainerSet.remove(containerStatus.getContainerId());
+ SettableFuture<ContainerStopResponse> stopResponseFuture =
+ _genericApplicationMaster.containerStopMap.remove(containerStatus.getContainerId());
+ if (stopResponseFuture != null) {
+ ContainerStopResponse value = new ContainerStopResponse();
+ stopResponseFuture.set(value);
+ } else {
+ SettableFuture<ContainerReleaseResponse> releaseResponseFuture =
+ _genericApplicationMaster.containerReleaseMap
+ .remove(containerStatus.getContainerId());
+ if (releaseResponseFuture != null) {
+ ContainerReleaseResponse value = new ContainerReleaseResponse();
+ releaseResponseFuture.set(value);
+ }
}
}
// increment counters for completed/failed containers
@@ -96,6 +100,7 @@ class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
_genericApplicationMaster.containerRequestMap.remove(containerRequest);
ContainerAskResponse response = new ContainerAskResponse();
response.setContainer(allocatedContainer);
+ _genericApplicationMaster.allocatedContainerSet.add(allocatedContainer.getId());
future.set(response);
break;
}