You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2014/02/21 19:22:05 UTC

git commit: Handling stop/release requests

Repository: helix
Updated Branches:
  refs/heads/helix-provisioning 0037b745a -> 2709b07c2


Handling stop/release requests


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/2709b07c
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/2709b07c
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/2709b07c

Branch: refs/heads/helix-provisioning
Commit: 2709b07c26d509ba71b1c83ae9c37dc07a46aa8c
Parents: 0037b74
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Fri Feb 21 10:21:58 2014 -0800
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Fri Feb 21 10:21:58 2014 -0800

----------------------------------------------------------------------
 .../helix/provisioning/yarn/AppLauncher.java    | 67 +++++++++++++++++++-
 .../yarn/GenericApplicationMaster.java          |  9 ++-
 .../provisioning/yarn/RMCallbackHandler.java    | 43 +++++++++----
 .../provisioning/yarn/YarnProvisioner.java      |  4 +-
 4 files changed, 103 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/2709b07c/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
index d06ae67..3bba018 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
@@ -29,16 +29,20 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.YarnClientApplication;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 
@@ -345,10 +349,36 @@ public class AppLauncher {
    * @return true if successfully completed, it will print status every X seconds
    */
   public boolean waitUntilDone() {
+    String prevReport = "";
     while (true) {
       try {
+        // Get application report for the appId we are interested in
+        ApplicationReport report = yarnClient.getApplicationReport(_appId);
+
+        String reportMessage = generateReport(report);
+        if (!reportMessage.equals(prevReport)) {
+          LOG.info(reportMessage);
+        }
+        YarnApplicationState state = report.getYarnApplicationState();
+        FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
+        if (YarnApplicationState.FINISHED == state) {
+          if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
+            LOG.info("Application has completed successfully. Breaking monitoring loop");
+            return true;
+          } else {
+            LOG.info("Application did finished unsuccessfully." + " YarnState=" + state.toString()
+                + ", DSFinalStatus=" + dsStatus.toString() + ". Breaking monitoring loop");
+            return false;
+          }
+        } else if (YarnApplicationState.KILLED == state || YarnApplicationState.FAILED == state) {
+          LOG.info("Application did not finish." + " YarnState=" + state.toString()
+              + ", DSFinalStatus=" + dsStatus.toString() + ". Breaking monitoring loop");
+          return false;
+        }
+        prevReport = reportMessage;
         Thread.sleep(10000);
-      } catch (InterruptedException e) {
+      } catch (Exception e) {
+        LOG.error("Exception while getting info ");
         break;
       }
     }
@@ -356,6 +386,31 @@ public class AppLauncher {
   }
 
   /**
+   * TODO: kill the app only in dev mode. In prod, it's OK for the app to continue running if the
+   * launcher dies after launching.
+   */
+
+  private String generateReport(ApplicationReport report) {
+    return "Got application report from ASM for" + ", appId=" + _appId.getId()
+        + ", clientToAMToken=" + report.getClientToAMToken() + ", appDiagnostics="
+        + report.getDiagnostics() + ", appMasterHost=" + report.getHost() + ", appQueue="
+        + report.getQueue() + ", appMasterRpcPort=" + report.getRpcPort() + ", appStartTime="
+        + report.getStartTime() + ", yarnAppState=" + report.getYarnApplicationState().toString()
+        + ", distributedFinalState=" + report.getFinalApplicationStatus().toString()
+        + ", appTrackingUrl=" + report.getTrackingUrl() + ", appUser=" + report.getUser();
+  }
+
+  protected void cleanup() {
+    LOG.info("Cleaning up");
+    try {
+      ApplicationReport applicationReport = yarnClient.getApplicationReport(_appId);
+      LOG.info("Killing application:"+ _appId + " \n Application report" + generateReport(applicationReport));
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
    * will take the input file and AppSpecFactory class name as input
    * @param args
    * @throws Exception
@@ -364,8 +419,16 @@ public class AppLauncher {
     ApplicationSpecFactory applicationSpecFactory =
         (ApplicationSpecFactory) Class.forName(args[0]).newInstance();
     File yamlConfigFile = new File(args[1]);
-    AppLauncher launcher = new AppLauncher(applicationSpecFactory, yamlConfigFile);
+    final AppLauncher launcher = new AppLauncher(applicationSpecFactory, yamlConfigFile);
     launcher.launch();
+    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+
+      @Override
+      public void run() {
+        launcher.cleanup();
+      }
+    }));
     launcher.waitUntilDone();
   }
+
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/2709b07c/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
index 3adffd6..a006363 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -59,6 +60,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 
@@ -226,7 +228,7 @@ public class GenericApplicationMaster {
   }
 
   public ListenableFuture<ContainerAskResponse> acquireContainer(ContainerRequest containerAsk) {
-    amRMClient.addContainerRequest(containerAsk);
+    LOG.info("Requesting container ACQUIRE:" + containerAsk);
     SettableFuture<ContainerAskResponse> future = SettableFuture.create();
     containerRequestMap.put(containerAsk, future);
     amRMClient.addContainerRequest(containerAsk);
@@ -234,7 +236,7 @@ public class GenericApplicationMaster {
   }
 
   public ListenableFuture<ContainerStopResponse> stopContainer(Container container) {
-    nmClientAsync.stopContainerAsync(container.getId(), container.getNodeId());
+    LOG.info("Requesting container STOP:" + container);
     SettableFuture<ContainerStopResponse> future = SettableFuture.create();
     containerStopMap.put(container.getId(), future);
     nmClientAsync.stopContainerAsync(container.getId(), container.getNodeId());
@@ -242,7 +244,7 @@ public class GenericApplicationMaster {
   }
 
   public ListenableFuture<ContainerReleaseResponse> releaseContainer(Container container) {
-    amRMClient.releaseAssignedContainer(container.getId());
+    LOG.info("Requesting container RELEASE:" + container);
     SettableFuture<ContainerReleaseResponse> future = SettableFuture.create();
     containerReleaseMap.put(container.getId(), future);
     amRMClient.releaseAssignedContainer(container.getId());
@@ -251,6 +253,7 @@ public class GenericApplicationMaster {
 
   public ListenableFuture<ContainerLaunchResponse> launchContainer(Container container,
       ContainerLaunchContext containerLaunchContext) {
+    LOG.info("Requesting container LAUNCH:" + container + " :" + Joiner.on(" ").join(containerLaunchContext.getCommands()));
     SettableFuture<ContainerLaunchResponse> future = SettableFuture.create();
     containerLaunchResponseMap.put(container.getId(), future);
     nmClientAsync.startContainerAsync(container, containerLaunchContext);

http://git-wip-us.apache.org/repos/asf/helix/blob/2709b07c/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
index 50c38b5..dae28a8 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
@@ -35,23 +35,36 @@ class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
   public void onContainersCompleted(List<ContainerStatus> completedContainers) {
     LOG.info("Got response from RM for container ask, completedCnt=" + completedContainers.size());
     for (ContainerStatus containerStatus : completedContainers) {
-      GenericApplicationMaster.LOG.info("Got container status for containerID=" + containerStatus.getContainerId()
-          + ", state=" + containerStatus.getState() + ", exitStatus="
-          + containerStatus.getExitStatus() + ", diagnostics=" + containerStatus.getDiagnostics());
+      GenericApplicationMaster.LOG.info("Got container status for containerID="
+          + containerStatus.getContainerId() + ", state=" + containerStatus.getState()
+          + ", exitStatus=" + containerStatus.getExitStatus() + ", diagnostics="
+          + containerStatus.getDiagnostics());
 
       // non complete containers should not be here
       assert (containerStatus.getState() == ContainerState.COMPLETE);
-     
+      SettableFuture<ContainerStopResponse> stopResponseFuture =
+          _genericApplicationMaster.containerStopMap.get(containerStatus.getContainerId());
+      if (stopResponseFuture != null) {
+        ContainerStopResponse value = new ContainerStopResponse();
+        stopResponseFuture.set(value);
+      } else {
+        SettableFuture<ContainerReleaseResponse> releaseResponseFuture =
+            _genericApplicationMaster.containerReleaseMap.get(containerStatus.getContainerId());
+        if (releaseResponseFuture != null) {
+          ContainerReleaseResponse value = new ContainerReleaseResponse();
+          releaseResponseFuture.set(value);
+        }
+      }
       // increment counters for completed/failed containers
       int exitStatus = containerStatus.getExitStatus();
       if (0 != exitStatus) {
         // container failed
         if (ContainerExitStatus.ABORTED != exitStatus) {
-      
+
         } else {
           // container was killed by framework, possibly preempted
           // we should re-try as the container was lost for some reason
-      
+
           // we do not need to release the container as it would be done
           // by the RM
         }
@@ -66,7 +79,8 @@ class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
 
   @Override
   public void onContainersAllocated(List<Container> allocatedContainers) {
-    GenericApplicationMaster.LOG.info("Got response from RM for container ask, allocatedCnt=" + allocatedContainers.size());
+    GenericApplicationMaster.LOG.info("Got response from RM for container ask, allocatedCnt="
+        + allocatedContainers.size());
     for (Container allocatedContainer : allocatedContainers) {
       GenericApplicationMaster.LOG.info("Allocated new container." + ", containerId="
           + allocatedContainer.getId() + ", containerNode="
@@ -74,15 +88,18 @@ class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
           + allocatedContainer.getNodeId().getPort() + ", containerNodeURI="
           + allocatedContainer.getNodeHttpAddress() + ", containerResourceMemory"
           + allocatedContainer.getResource().getMemory());
-      for(ContainerRequest containerRequest: _genericApplicationMaster.containerRequestMap.keySet()){
-        if(containerRequest.getCapability().getMemory() == allocatedContainer.getResource().getMemory()){
-          SettableFuture<ContainerAskResponse> future = _genericApplicationMaster.containerRequestMap.remove(containerRequest);
+      for (ContainerRequest containerRequest : _genericApplicationMaster.containerRequestMap
+          .keySet()) {
+        if (containerRequest.getCapability().getMemory() == allocatedContainer.getResource()
+            .getMemory()) {
+          SettableFuture<ContainerAskResponse> future =
+              _genericApplicationMaster.containerRequestMap.remove(containerRequest);
           ContainerAskResponse response = new ContainerAskResponse();
           response.setContainer(allocatedContainer);
           future.set(response);
           break;
         }
-      }     
+      }
     }
   }
 
@@ -97,11 +114,11 @@ class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
   @Override
   public float getProgress() {
     // set progress to deliver to RM on next heartbeat
-    return (System.currentTimeMillis()-startTime) % Integer.MAX_VALUE;
+    return (System.currentTimeMillis() - startTime) % Integer.MAX_VALUE;
   }
 
   @Override
   public void onError(Throwable e) {
     _genericApplicationMaster.amRMClient.stop();
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/2709b07c/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
index 8fd308e..2eedfd0 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
@@ -324,8 +324,8 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr
       if (excessHaltedContainers.containsKey(participantId)) {
         // Halted containers can be restarted if necessary
         Participant participant = excessHaltedContainers.get(participantId);
-        containersToStart.add(participant);
-        excessHaltedContainers.remove(participantId); // don't release this container
+        //containersToStart.add(participant);
+        //excessHaltedContainers.remove(participantId); // don't release this container
       } else if (!existingContainersIdSet.contains(participantId)) {
         // Unallocated containers must be allocated
         ContainerSpec containerSpec = new ContainerSpec(participantId);