You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/02/21 21:41:36 UTC
git commit: Set future for release response when it comes
Repository: helix
Updated Branches:
refs/heads/helix-provisioning 186283457 -> 64e153144
Set future for release response when it comes
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/64e15314
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/64e15314
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/64e15314
Branch: refs/heads/helix-provisioning
Commit: 64e153144c1b5602808a3f99f950f14465ca1e17
Parents: 1862834
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Fri Feb 21 12:40:03 2014 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Fri Feb 21 12:40:03 2014 -0800
----------------------------------------------------------------------
.../helix/controller/stages/ContainerProvisioningStage.java | 8 ++++++++
.../apache/helix/provisioning/yarn/NMCallbackHandler.java | 5 ++---
.../apache/helix/provisioning/yarn/RMCallbackHandler.java | 4 ++--
.../org/apache/helix/provisioning/yarn/YarnProvisioner.java | 9 ++++-----
4 files changed, 16 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/64e15314/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
index 5cccd68..f258525 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
@@ -155,10 +155,12 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
// create the helix participant and add it to cluster
helixAdmin.addInstance(cluster.getId().toString(), instanceConfig);
}
+ LOG.info("Allocating container for " + participantId);
ListenableFuture<ContainerId> future = containerProvider.allocateContainer(spec);
FutureCallback<ContainerId> callback = new FutureCallback<ContainerId>() {
@Override
public void onSuccess(ContainerId containerId) {
+ LOG.info("Container " + containerId + " acquired. Marking " + participantId);
InstanceConfig existingInstance =
helixAdmin
.getInstanceConfig(cluster.getId().toString(), participantId.toString());
@@ -188,12 +190,14 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()),
existingInstance);
// create the helix participant and add it to cluster
+ LOG.info("Starting container " + containerId + " for " + participant.getId());
ListenableFuture<Boolean> future =
containerProvider.startContainer(containerId, participant);
FutureCallback<Boolean> callback = new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
// Do nothing yet, need to wait for live instance
+ LOG.info("Container " + containerId + " started for " + participant.getId());
}
@Override
@@ -218,10 +222,12 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()),
existingInstance);
// remove the participant
+ LOG.info("Deallocating container " + containerId + " for " + participant.getId());
ListenableFuture<Boolean> future = containerProvider.deallocateContainer(containerId);
FutureCallback<Boolean> callback = new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
+ LOG.info("Container " + containerId + " deallocated. Dropping " + participant.getId());
InstanceConfig existingInstance =
helixAdmin.getInstanceConfig(cluster.getId().toString(), participant.getId()
.toString());
@@ -251,10 +257,12 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()),
existingInstance);
// stop the container
+ LOG.info("Stopping container " + containerId + " for " + participant.getId());
ListenableFuture<Boolean> future = containerProvider.stopContainer(containerId);
FutureCallback<Boolean> callback = new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
+ LOG.info("Container " + containerId + " stopped. Marking " + participant.getId());
updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(),
ContainerState.HALTED);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/64e15314/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/NMCallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/NMCallbackHandler.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/NMCallbackHandler.java
index da6c01f..1566c28 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/NMCallbackHandler.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/NMCallbackHandler.java
@@ -8,7 +8,6 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.log4j.Logger;
@@ -39,7 +38,7 @@ class NMCallbackHandler implements NMClientAsync.CallbackHandler {
applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
}
SettableFuture<ContainerStopResponse> settableFuture =
- applicationMaster.containerStopMap.get(containerId);
+ applicationMaster.containerStopMap.remove(containerId);
ContainerStopResponse value = new ContainerStopResponse();
settableFuture.set(value);
containers.remove(containerId);
@@ -59,7 +58,7 @@ class NMCallbackHandler implements NMClientAsync.CallbackHandler {
applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
}
SettableFuture<ContainerLaunchResponse> settableFuture =
- applicationMaster.containerLaunchResponseMap.get(containerId);
+ applicationMaster.containerLaunchResponseMap.remove(containerId);
ContainerLaunchResponse value = new ContainerLaunchResponse();
settableFuture.set(value);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/64e15314/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
index dae28a8..fe2c854 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
@@ -43,13 +43,13 @@ class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
// non complete containers should not be here
assert (containerStatus.getState() == ContainerState.COMPLETE);
SettableFuture<ContainerStopResponse> stopResponseFuture =
- _genericApplicationMaster.containerStopMap.get(containerStatus.getContainerId());
+ _genericApplicationMaster.containerStopMap.remove(containerStatus.getContainerId());
if (stopResponseFuture != null) {
ContainerStopResponse value = new ContainerStopResponse();
stopResponseFuture.set(value);
} else {
SettableFuture<ContainerReleaseResponse> releaseResponseFuture =
- _genericApplicationMaster.containerReleaseMap.get(containerStatus.getContainerId());
+ _genericApplicationMaster.containerReleaseMap.remove(containerStatus.getContainerId());
if (releaseResponseFuture != null) {
ContainerReleaseResponse value = new ContainerReleaseResponse();
releaseResponseFuture.set(value);
http://git-wip-us.apache.org/repos/asf/helix/blob/64e15314/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
index 2eedfd0..2d6e306 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
@@ -33,7 +33,6 @@ import org.apache.helix.HelixManager;
import org.apache.helix.api.Cluster;
import org.apache.helix.api.Participant;
import org.apache.helix.api.config.ContainerConfig;
-import org.apache.helix.api.config.ParticipantConfig;
import org.apache.helix.api.config.ResourceConfig;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.ResourceId;
@@ -210,7 +209,7 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr
vargs.add("--cluster " + appName);
vargs.add("--participantId " + participant.getId().stringify());
vargs.add("--participantClass " + mainClass);
-
+
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/ContainerParticipant.stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/ContainerParticipant.stderr");
@@ -323,9 +322,9 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr
excessActiveContainers.remove(participantId); // don't stop this container if active
if (excessHaltedContainers.containsKey(participantId)) {
// Halted containers can be restarted if necessary
- Participant participant = excessHaltedContainers.get(participantId);
- //containersToStart.add(participant);
- //excessHaltedContainers.remove(participantId); // don't release this container
+ // Participant participant = excessHaltedContainers.get(participantId);
+ // containersToStart.add(participant);
+ // excessHaltedContainers.remove(participantId); // don't release this container
} else if (!existingContainersIdSet.contains(participantId)) {
// Unallocated containers must be allocated
ContainerSpec containerSpec = new ContainerSpec(participantId);