You are viewing a plain-text version of this content. The canonical version was linked from the original archive page; that link is not preserved in this plain-text rendering.
Posted to commits@helix.apache.org by ki...@apache.org on 2014/02/21 19:22:05 UTC
git commit: Handling stop/release requests
Repository: helix
Updated Branches:
refs/heads/helix-provisioning 0037b745a -> 2709b07c2
Handling stop/release requests
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/2709b07c
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/2709b07c
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/2709b07c
Branch: refs/heads/helix-provisioning
Commit: 2709b07c26d509ba71b1c83ae9c37dc07a46aa8c
Parents: 0037b74
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Fri Feb 21 10:21:58 2014 -0800
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Fri Feb 21 10:21:58 2014 -0800
----------------------------------------------------------------------
.../helix/provisioning/yarn/AppLauncher.java | 67 +++++++++++++++++++-
.../yarn/GenericApplicationMaster.java | 9 ++-
.../provisioning/yarn/RMCallbackHandler.java | 43 +++++++++----
.../provisioning/yarn/YarnProvisioner.java | 4 +-
4 files changed, 103 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/2709b07c/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
index d06ae67..3bba018 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
@@ -29,16 +29,20 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
@@ -345,10 +349,36 @@ public class AppLauncher {
* @return true if successfully completed, it will print status every X seconds
*/
public boolean waitUntilDone() {
+ String prevReport = "";
while (true) {
try {
+ // Get application report for the appId we are interested in
+ ApplicationReport report = yarnClient.getApplicationReport(_appId);
+
+ String reportMessage = generateReport(report);
+ if (!reportMessage.equals(prevReport)) {
+ LOG.info(reportMessage);
+ }
+ YarnApplicationState state = report.getYarnApplicationState();
+ FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
+ if (YarnApplicationState.FINISHED == state) {
+ if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
+ LOG.info("Application has completed successfully. Breaking monitoring loop");
+ return true;
+ } else {
+ LOG.info("Application did finished unsuccessfully." + " YarnState=" + state.toString()
+ + ", DSFinalStatus=" + dsStatus.toString() + ". Breaking monitoring loop");
+ return false;
+ }
+ } else if (YarnApplicationState.KILLED == state || YarnApplicationState.FAILED == state) {
+ LOG.info("Application did not finish." + " YarnState=" + state.toString()
+ + ", DSFinalStatus=" + dsStatus.toString() + ". Breaking monitoring loop");
+ return false;
+ }
+ prevReport = reportMessage;
Thread.sleep(10000);
- } catch (InterruptedException e) {
+ } catch (Exception e) {
+ LOG.error("Exception while getting info ");
break;
}
}
@@ -356,6 +386,31 @@ public class AppLauncher {
}
/**
+ * Formats the given application report for the launched application as a single log line.
+ * @param report the report returned by the ResourceManager for {@code _appId}
+ * @return a human-readable summary of the report's fields
+ */
+ private String generateReport(ApplicationReport report) {
+ return "Got application report from ASM for" + ", appId=" + _appId.getId()
+ + ", clientToAMToken=" + report.getClientToAMToken() + ", appDiagnostics="
+ + report.getDiagnostics() + ", appMasterHost=" + report.getHost() + ", appQueue="
+ + report.getQueue() + ", appMasterRpcPort=" + report.getRpcPort() + ", appStartTime="
+ + report.getStartTime() + ", yarnAppState=" + report.getYarnApplicationState().toString()
+ + ", distributedFinalState=" + report.getFinalApplicationStatus().toString()
+ + ", appTrackingUrl=" + report.getTrackingUrl() + ", appUser=" + report.getUser();
+ }
+
+ /**
+ * Cleanup run from the launcher's shutdown hook: kills the launched application if it has not
+ * yet reached a terminal state (FINISHED, KILLED or FAILED). Failures are logged, not rethrown,
+ * because this runs while the JVM is shutting down.
+ * TODO: kill the app only in dev mode. In prod, it's ok for the app to continue running if the
+ * launcher dies after launching.
+ */
+ protected void cleanup() {
+ LOG.info("Cleaning up");
+ try {
+ ApplicationReport applicationReport = yarnClient.getApplicationReport(_appId);
+ YarnApplicationState state = applicationReport.getYarnApplicationState();
+ if (state == YarnApplicationState.FINISHED || state == YarnApplicationState.KILLED
+ || state == YarnApplicationState.FAILED) {
+ // The hook also runs after waitUntilDone() returns normally; nothing is left to kill then.
+ LOG.info("Application " + _appId + " already in terminal state " + state + ", not killing");
+ return;
+ }
+ LOG.info("Killing application:" + _appId + " \n Application report" + generateReport(applicationReport));
+ yarnClient.killApplication(_appId);
+ } catch (Exception e) {
+ // best-effort during shutdown: record the failure with its stack trace via the logger
+ LOG.error("Failed to clean up application " + _appId, e);
+ }
+ }
+
+ /**
* will take the input file and AppSpecFactory class name as input
* @param args
* @throws Exception
@@ -364,8 +419,16 @@ public class AppLauncher {
ApplicationSpecFactory applicationSpecFactory =
(ApplicationSpecFactory) Class.forName(args[0]).newInstance();
File yamlConfigFile = new File(args[1]);
- AppLauncher launcher = new AppLauncher(applicationSpecFactory, yamlConfigFile);
+ final AppLauncher launcher = new AppLauncher(applicationSpecFactory, yamlConfigFile);
launcher.launch();
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ launcher.cleanup();
+ }
+ }));
launcher.waitUntilDone();
}
+
}
http://git-wip-us.apache.org/repos/asf/helix/blob/2709b07c/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
index 3adffd6..a006363 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@@ -59,6 +60,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
@@ -226,7 +228,7 @@ public class GenericApplicationMaster {
}
public ListenableFuture<ContainerAskResponse> acquireContainer(ContainerRequest containerAsk) {
- amRMClient.addContainerRequest(containerAsk);
+ LOG.info("Requesting container ACQUIRE:" + containerAsk);
SettableFuture<ContainerAskResponse> future = SettableFuture.create();
containerRequestMap.put(containerAsk, future);
amRMClient.addContainerRequest(containerAsk);
@@ -234,7 +236,7 @@ public class GenericApplicationMaster {
}
public ListenableFuture<ContainerStopResponse> stopContainer(Container container) {
- nmClientAsync.stopContainerAsync(container.getId(), container.getNodeId());
+ LOG.info("Requesting container STOP:" + container);
SettableFuture<ContainerStopResponse> future = SettableFuture.create();
containerStopMap.put(container.getId(), future);
nmClientAsync.stopContainerAsync(container.getId(), container.getNodeId());
@@ -242,7 +244,7 @@ public class GenericApplicationMaster {
}
public ListenableFuture<ContainerReleaseResponse> releaseContainer(Container container) {
- amRMClient.releaseAssignedContainer(container.getId());
+ LOG.info("Requesting container RELEASE:" + container);
SettableFuture<ContainerReleaseResponse> future = SettableFuture.create();
containerReleaseMap.put(container.getId(), future);
amRMClient.releaseAssignedContainer(container.getId());
@@ -251,6 +253,7 @@ public class GenericApplicationMaster {
public ListenableFuture<ContainerLaunchResponse> launchContainer(Container container,
ContainerLaunchContext containerLaunchContext) {
+ LOG.info("Requesting container LAUNCH:" + container + " :" + Joiner.on(" ").join(containerLaunchContext.getCommands()));
SettableFuture<ContainerLaunchResponse> future = SettableFuture.create();
containerLaunchResponseMap.put(container.getId(), future);
nmClientAsync.startContainerAsync(container, containerLaunchContext);
http://git-wip-us.apache.org/repos/asf/helix/blob/2709b07c/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
index 50c38b5..dae28a8 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
@@ -35,23 +35,36 @@ class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
public void onContainersCompleted(List<ContainerStatus> completedContainers) {
LOG.info("Got response from RM for container ask, completedCnt=" + completedContainers.size());
for (ContainerStatus containerStatus : completedContainers) {
- GenericApplicationMaster.LOG.info("Got container status for containerID=" + containerStatus.getContainerId()
- + ", state=" + containerStatus.getState() + ", exitStatus="
- + containerStatus.getExitStatus() + ", diagnostics=" + containerStatus.getDiagnostics());
+ GenericApplicationMaster.LOG.info("Got container status for containerID="
+ + containerStatus.getContainerId() + ", state=" + containerStatus.getState()
+ + ", exitStatus=" + containerStatus.getExitStatus() + ", diagnostics="
+ + containerStatus.getDiagnostics());
// non complete containers should not be here
assert (containerStatus.getState() == ContainerState.COMPLETE);
-
+ SettableFuture<ContainerStopResponse> stopResponseFuture =
+ _genericApplicationMaster.containerStopMap.get(containerStatus.getContainerId());
+ if (stopResponseFuture != null) {
+ ContainerStopResponse value = new ContainerStopResponse();
+ stopResponseFuture.set(value);
+ } else {
+ SettableFuture<ContainerReleaseResponse> releaseResponseFuture =
+ _genericApplicationMaster.containerReleaseMap.get(containerStatus.getContainerId());
+ if (releaseResponseFuture != null) {
+ ContainerReleaseResponse value = new ContainerReleaseResponse();
+ releaseResponseFuture.set(value);
+ }
+ }
// increment counters for completed/failed containers
int exitStatus = containerStatus.getExitStatus();
if (0 != exitStatus) {
// container failed
if (ContainerExitStatus.ABORTED != exitStatus) {
-
+
} else {
// container was killed by framework, possibly preempted
// we should re-try as the container was lost for some reason
-
+
// we do not need to release the container as it would be done
// by the RM
}
@@ -66,7 +79,8 @@ class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
@Override
public void onContainersAllocated(List<Container> allocatedContainers) {
- GenericApplicationMaster.LOG.info("Got response from RM for container ask, allocatedCnt=" + allocatedContainers.size());
+ GenericApplicationMaster.LOG.info("Got response from RM for container ask, allocatedCnt="
+ + allocatedContainers.size());
for (Container allocatedContainer : allocatedContainers) {
GenericApplicationMaster.LOG.info("Allocated new container." + ", containerId="
+ allocatedContainer.getId() + ", containerNode="
@@ -74,15 +88,18 @@ class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
+ allocatedContainer.getNodeId().getPort() + ", containerNodeURI="
+ allocatedContainer.getNodeHttpAddress() + ", containerResourceMemory"
+ allocatedContainer.getResource().getMemory());
- for(ContainerRequest containerRequest: _genericApplicationMaster.containerRequestMap.keySet()){
- if(containerRequest.getCapability().getMemory() == allocatedContainer.getResource().getMemory()){
- SettableFuture<ContainerAskResponse> future = _genericApplicationMaster.containerRequestMap.remove(containerRequest);
+ for (ContainerRequest containerRequest : _genericApplicationMaster.containerRequestMap
+ .keySet()) {
+ if (containerRequest.getCapability().getMemory() == allocatedContainer.getResource()
+ .getMemory()) {
+ SettableFuture<ContainerAskResponse> future =
+ _genericApplicationMaster.containerRequestMap.remove(containerRequest);
ContainerAskResponse response = new ContainerAskResponse();
response.setContainer(allocatedContainer);
future.set(response);
break;
}
- }
+ }
}
}
@@ -97,11 +114,11 @@ class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
@Override
public float getProgress() {
// set progress to deliver to RM on next heartbeat
- return (System.currentTimeMillis()-startTime) % Integer.MAX_VALUE;
+ // YARN expects an application progress value in [0, 1]; elapsed milliseconds since startTime
+ // fall far outside that range. This long-running service has no meaningful completion
+ // fraction, so report 0 rather than an out-of-range number.
+ return 0.0f;
}
@Override
public void onError(Throwable e) {
_genericApplicationMaster.amRMClient.stop();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/2709b07c/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
index 8fd308e..2eedfd0 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
@@ -324,8 +324,8 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr
if (excessHaltedContainers.containsKey(participantId)) {
// Halted containers can be restarted if necessary
Participant participant = excessHaltedContainers.get(participantId);
- containersToStart.add(participant);
- excessHaltedContainers.remove(participantId); // don't release this container
+ //containersToStart.add(participant);
+ //excessHaltedContainers.remove(participantId); // don't release this container
} else if (!existingContainersIdSet.contains(participantId)) {
// Unallocated containers must be allocated
ContainerSpec containerSpec = new ContainerSpec(participantId);