You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2016/11/25 18:30:51 UTC

apex-core git commit: APEXCORE-405 Allow client to query if application was finished.

Repository: apex-core
Updated Branches:
  refs/heads/master a54e0b7f8 -> 0be03527e


APEXCORE-405 Allow client to query if application was finished.


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/0be03527
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/0be03527
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/0be03527

Branch: refs/heads/master
Commit: 0be03527e361b023e79c0d9f9e8ef5e2632d64bf
Parents: a54e0b7
Author: Thomas Weise <th...@apache.org>
Authored: Thu Nov 24 18:22:26 2016 -0800
Committer: Thomas Weise <th...@apache.org>
Committed: Thu Nov 24 18:22:26 2016 -0800

----------------------------------------------------------------------
 .../apache/apex/api/EmbeddedAppLauncher.java    | 11 ++--
 .../main/java/org/apache/apex/api/Launcher.java | 54 ++++++++---------
 .../datatorrent/stram/StramLocalCluster.java    |  5 ++
 .../apex/engine/EmbeddedAppLauncherImpl.java    | 33 +++++------
 .../apache/apex/engine/YarnAppLauncherImpl.java | 61 +++++++++++++-------
 5 files changed, 91 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/0be03527/api/src/main/java/org/apache/apex/api/EmbeddedAppLauncher.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/apex/api/EmbeddedAppLauncher.java b/api/src/main/java/org/apache/apex/api/EmbeddedAppLauncher.java
index 8e3e0f6..4ff705b 100644
--- a/api/src/main/java/org/apache/apex/api/EmbeddedAppLauncher.java
+++ b/api/src/main/java/org/apache/apex/api/EmbeddedAppLauncher.java
@@ -21,7 +21,6 @@ package org.apache.apex.api;
 import org.apache.hadoop.conf.Configuration;
 
 import com.datatorrent.api.Attribute;
-import com.datatorrent.api.LocalMode;
 import com.datatorrent.api.StreamingApplication;
 
 /**
@@ -33,25 +32,25 @@ public abstract class EmbeddedAppLauncher<H extends EmbeddedAppLauncher.Embedded
   /**
    * Parameter to specify the time after which the application will be shutdown; pass 0 to run indefinitely.
    */
-  public static final Attribute<Long> RUN_MILLIS = new Attribute<Long>(0L);
+  public static final Attribute<Long> RUN_MILLIS = new Attribute<>(0L);
 
   /**
    * Parameter to launch application asynchronously and return from launch immediately.
    */
-  public static final Attribute<Boolean> RUN_ASYNC = new Attribute<Boolean>(false);
+  public static final Attribute<Boolean> RUN_ASYNC = new Attribute<>(false);
 
   /**
    * Parameter to enable or disable heartbeat monitoring.
    */
-  public static final Attribute<Boolean> HEARTBEAT_MONITORING = new Attribute<Boolean>(true);
+  public static final Attribute<Boolean> HEARTBEAT_MONITORING = new Attribute<>(true);
 
   /**
    * Parameter to serialize DAG before launch.
    */
-  public static final Attribute<Boolean> SERIALIZE_DAG = new Attribute<Boolean>(false);
+  public static final Attribute<Boolean> SERIALIZE_DAG = new Attribute<>(false);
 
   static {
-    Attribute.AttributeMap.AttributeInitializer.initialize(LocalMode.class);
+    Attribute.AttributeMap.AttributeInitializer.initialize(EmbeddedAppLauncher.class);
   }
 
   public static EmbeddedAppLauncher newInstance()

http://git-wip-us.apache.org/repos/asf/apex-core/blob/0be03527/api/src/main/java/org/apache/apex/api/Launcher.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/apex/api/Launcher.java b/api/src/main/java/org/apache/apex/api/Launcher.java
index 14c365a..0291ab0 100644
--- a/api/src/main/java/org/apache/apex/api/Launcher.java
+++ b/api/src/main/java/org/apache/apex/api/Launcher.java
@@ -77,8 +77,30 @@ public abstract class Launcher<H extends Launcher.AppHandle>
     KILL
   }
 
-  // Marker interface
-  public interface AppHandle {}
+  /**
+   * Results of application launch. The client can interact with the running application through this handle.
+   */
+  public interface AppHandle
+  {
+    boolean isFinished();
+
+    /**
+     * Shutdown the application.
+     *
+     * The method takes a shutdown mode, which specifies how to shut down the application represented by this
+     * handle.
+     *
+     * If the mode is AWAIT_TERMINATION, an attempt should be made to shutdown the application in an orderly fashion
+     * and wait till termination. If the application does not terminate in a reasonable amount of time the
+     * implementation can forcibly terminate the application.
+     *
+     * If the mode is KILL, the application can be killed immediately.
+     *
+     * @param shutdownMode The shutdown mode
+     */
+    void shutdown(ShutdownMode shutdownMode) throws LauncherException;
+
+  }
 
   /**
    * Get a launcher instance.<br><br>
@@ -138,34 +160,6 @@ public abstract class Launcher<H extends Launcher.AppHandle>
    */
   public abstract H launchApp(StreamingApplication application, Configuration configuration, Attribute.AttributeMap launchParameters) throws LauncherException;
 
-  /**
-   * Shutdown the application and await termination.
-   * Also see {@link #shutdownApp(AppHandle, ShutdownMode)}
-   *
-   * @param app The application handle
-   */
-  public void shutdownApp(H app) throws LauncherException
-  {
-    shutdownApp(app, ShutdownMode.AWAIT_TERMINATION);
-  }
-
-  /**
-   * Shutdown the application.
-   *
-   * The method takes the application handle and a shutdown mode. The shutdown mode specifies how to shutdown the
-   * application.
-   *
-   * If the mode is AWAIT_TERMINATION, an attempt should be made to shutdown the application in an orderly fashion
-   * and wait till termination. If the application does not terminate in a reasonable amount of time the
-   * implementation can forcibly terminate the application.
-   *
-   * If the mode is KILL, the application can be killed immediately.
-   *
-   * @param app The application handle
-   * @param shutdownMode The shutdown mode
-   */
-  public abstract void shutdownApp(H app, ShutdownMode shutdownMode) throws LauncherException;
-
   protected static <T> T loadService(Class<T> clazz)
   {
     ServiceLoader<T> loader = ServiceLoader.load(clazz);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/0be03527/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
index 48ed070..14a2827 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
@@ -411,6 +411,11 @@ public class StramLocalCluster implements Runnable, Controller
     appDone = true;
   }
 
+  public boolean isFinished()
+  {
+    return appDone;
+  }
+
   @Override
   public void setHeartbeatMonitoringEnabled(boolean enabled)
   {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/0be03527/engine/src/main/java/org/apache/apex/engine/EmbeddedAppLauncherImpl.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/engine/EmbeddedAppLauncherImpl.java b/engine/src/main/java/org/apache/apex/engine/EmbeddedAppLauncherImpl.java
index 9ace9b5..5930e78 100644
--- a/engine/src/main/java/org/apache/apex/engine/EmbeddedAppLauncherImpl.java
+++ b/engine/src/main/java/org/apache/apex/engine/EmbeddedAppLauncherImpl.java
@@ -68,7 +68,7 @@ public class EmbeddedAppLauncherImpl extends LocalMode<EmbeddedAppLauncherImpl.E
     } catch (Exception e) {
       throw new LauncherException(e);
     }
-    LocalMode.Controller lc = getController();
+    StramLocalCluster lc = getController();
     boolean launched = false;
     if (launchParameters != null) {
       if (StramUtils.getValueWithDefault(launchParameters, SERIALIZE_DAG)) {
@@ -100,16 +100,6 @@ public class EmbeddedAppLauncherImpl extends LocalMode<EmbeddedAppLauncherImpl.E
   }
 
   @Override
-  public void shutdownApp(EmbeddedAppHandleImpl app, ShutdownMode shutdownMode) throws LauncherException
-  {
-    if (shutdownMode != ShutdownMode.KILL) {
-      app.controller.shutdown();
-    } else {
-      throw new UnsupportedOperationException("Kill not supported");
-    }
-  }
-
-  @Override
   public DAG prepareDAG(StreamingApplication app, Configuration conf) throws Exception
   {
     if (app == null && conf == null) {
@@ -125,7 +115,7 @@ public class EmbeddedAppLauncherImpl extends LocalMode<EmbeddedAppLauncherImpl.E
   }
 
   @Override
-  public Controller getController()
+  public StramLocalCluster getController()
   {
     try {
       addLibraryJarsToClasspath(lp);
@@ -157,17 +147,26 @@ public class EmbeddedAppLauncherImpl extends LocalMode<EmbeddedAppLauncherImpl.E
 
   }
 
-  /**
-   *
-   */
   public static class EmbeddedAppHandleImpl implements EmbeddedAppLauncher.EmbeddedAppHandle
   {
-    Controller controller;
+    final StramLocalCluster controller;
 
-    public EmbeddedAppHandleImpl(Controller controller)
+    public EmbeddedAppHandleImpl(StramLocalCluster controller)
     {
       this.controller = controller;
     }
 
+    @Override
+    public boolean isFinished()
+    {
+      return controller.isFinished();
+    }
+
+    @Override
+    public void shutdown(ShutdownMode shutdownMode) throws LauncherException
+    {
+      controller.shutdown();
+    }
+
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/0be03527/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java b/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java
index 4f5c8c8..d7a6dc8 100644
--- a/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java
+++ b/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java
@@ -20,14 +20,15 @@ package org.apache.apex.engine;
 
 import java.io.IOException;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.apex.api.YarnAppLauncher;
 import org.apache.apex.engine.util.StreamingAppFactory;
+import org.apache.bval.jsr303.util.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 
@@ -35,7 +36,6 @@ import com.google.common.base.Throwables;
 
 import com.datatorrent.api.Attribute;
 import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.stram.StramUtils;
 import com.datatorrent.stram.client.StramAppLauncher;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
 import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
@@ -54,6 +54,7 @@ public class YarnAppLauncherImpl extends YarnAppLauncher<YarnAppLauncherImpl.Yar
     propMapping.put(YarnAppLauncher.QUEUE_NAME, StramAppLauncher.QUEUE_NAME);
   }
 
+  @Override
   public YarnAppHandleImpl launchApp(final StreamingApplication app, Configuration conf, Attribute.AttributeMap launchParameters) throws LauncherException
   {
     if (launchParameters != null) {
@@ -83,27 +84,21 @@ public class YarnAppLauncherImpl extends YarnAppLauncher<YarnAppLauncherImpl.Yar
     }
   }
 
-  @Override
-  public void shutdownApp(YarnAppHandleImpl app, ShutdownMode shutdownMode) throws LauncherException
+  protected void shutdownApp(YarnAppHandleImpl app, ShutdownMode shutdownMode) throws LauncherException
   {
     if (shutdownMode == ShutdownMode.KILL) {
       YarnClient yarnClient = YarnClient.createYarnClient();
       try {
-        String appId = app.getApplicationId();
-        ApplicationId applicationId = null;
-        List<ApplicationReport> applications = StramUtils.getApexApplicationList(yarnClient);
-        for (ApplicationReport application : applications) {
-          if (application.getApplicationId().toString().equals(appId)) {
-            applicationId = application.getApplicationId();
-            break;
-          }
-        }
-        if (applicationId == null) {
-          throw new LauncherException("Application " + appId + " not found");
+        ApplicationId applicationId = app.appId;
+        ApplicationReport appReport = yarnClient.getApplicationReport(applicationId);
+        if (appReport == null) {
+          throw new LauncherException("Application " + app.getApplicationId() + " not found");
         }
         yarnClient.killApplication(applicationId);
       } catch (YarnException | IOException e) {
         throw Throwables.propagate(e);
+      } finally {
+        IOUtils.closeQuietly(yarnClient);
       }
     } else {
       throw new UnsupportedOperationException("Orderly shutdown not supported, try kill instead");
@@ -127,12 +122,9 @@ public class YarnAppLauncherImpl extends YarnAppLauncher<YarnAppLauncherImpl.Yar
     }
   }
 
-  /**
-   *
-   */
-  public static class YarnAppHandleImpl implements YarnAppLauncher.YarnAppHandle
+  public class YarnAppHandleImpl implements YarnAppLauncher.YarnAppHandle
   {
-    ApplicationId appId;
+    final ApplicationId appId;
 
     public YarnAppHandleImpl(ApplicationId appId)
     {
@@ -144,5 +136,34 @@ public class YarnAppLauncherImpl extends YarnAppLauncher<YarnAppLauncherImpl.Yar
     {
       return appId.toString();
     }
+
+    @Override
+    public boolean isFinished()
+    {
+      YarnClient yarnClient = YarnClient.createYarnClient();
+      try {
+        ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+        if (appReport != null) {
+          if (appReport.getFinalApplicationStatus() == null
+              || appReport.getFinalApplicationStatus() == FinalApplicationStatus.UNDEFINED) {
+            return false;
+          }
+        }
+        return true;
+      } catch (YarnException | IOException e) {
+        throw Throwables.propagate(e);
+      } finally {
+        IOUtils.closeQuietly(yarnClient);
+      }
+    }
+
+    @Override
+    public void shutdown(org.apache.apex.api.Launcher.ShutdownMode shutdownMode)
+        throws org.apache.apex.api.Launcher.LauncherException
+    {
+      shutdownApp(this, shutdownMode);
+
+    }
+
   }
 }