You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2016/11/25 18:30:51 UTC
apex-core git commit: APEXCORE-405 Allow client to query if
application was finished.
Repository: apex-core
Updated Branches:
refs/heads/master a54e0b7f8 -> 0be03527e
APEXCORE-405 Allow client to query if application was finished.
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/0be03527
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/0be03527
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/0be03527
Branch: refs/heads/master
Commit: 0be03527e361b023e79c0d9f9e8ef5e2632d64bf
Parents: a54e0b7
Author: Thomas Weise <th...@apache.org>
Authored: Thu Nov 24 18:22:26 2016 -0800
Committer: Thomas Weise <th...@apache.org>
Committed: Thu Nov 24 18:22:26 2016 -0800
----------------------------------------------------------------------
.../apache/apex/api/EmbeddedAppLauncher.java | 11 ++--
.../main/java/org/apache/apex/api/Launcher.java | 54 ++++++++---------
.../datatorrent/stram/StramLocalCluster.java | 5 ++
.../apex/engine/EmbeddedAppLauncherImpl.java | 33 +++++------
.../apache/apex/engine/YarnAppLauncherImpl.java | 61 +++++++++++++-------
5 files changed, 91 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/0be03527/api/src/main/java/org/apache/apex/api/EmbeddedAppLauncher.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/apex/api/EmbeddedAppLauncher.java b/api/src/main/java/org/apache/apex/api/EmbeddedAppLauncher.java
index 8e3e0f6..4ff705b 100644
--- a/api/src/main/java/org/apache/apex/api/EmbeddedAppLauncher.java
+++ b/api/src/main/java/org/apache/apex/api/EmbeddedAppLauncher.java
@@ -21,7 +21,6 @@ package org.apache.apex.api;
import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.Attribute;
-import com.datatorrent.api.LocalMode;
import com.datatorrent.api.StreamingApplication;
/**
@@ -33,25 +32,25 @@ public abstract class EmbeddedAppLauncher<H extends EmbeddedAppLauncher.Embedded
/**
* Parameter to specify the time after which the application will be shutdown; pass 0 to run indefinitely.
*/
- public static final Attribute<Long> RUN_MILLIS = new Attribute<Long>(0L);
+ public static final Attribute<Long> RUN_MILLIS = new Attribute<>(0L);
/**
* Parameter to launch application asynchronously and return from launch immediately.
*/
- public static final Attribute<Boolean> RUN_ASYNC = new Attribute<Boolean>(false);
+ public static final Attribute<Boolean> RUN_ASYNC = new Attribute<>(false);
/**
* Parameter to enable or disable heartbeat monitoring.
*/
- public static final Attribute<Boolean> HEARTBEAT_MONITORING = new Attribute<Boolean>(true);
+ public static final Attribute<Boolean> HEARTBEAT_MONITORING = new Attribute<>(true);
/**
* Parameter to serialize DAG before launch.
*/
- public static final Attribute<Boolean> SERIALIZE_DAG = new Attribute<Boolean>(false);
+ public static final Attribute<Boolean> SERIALIZE_DAG = new Attribute<>(false);
static {
- Attribute.AttributeMap.AttributeInitializer.initialize(LocalMode.class);
+ Attribute.AttributeMap.AttributeInitializer.initialize(EmbeddedAppLauncher.class);
}
public static EmbeddedAppLauncher newInstance()
http://git-wip-us.apache.org/repos/asf/apex-core/blob/0be03527/api/src/main/java/org/apache/apex/api/Launcher.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/apex/api/Launcher.java b/api/src/main/java/org/apache/apex/api/Launcher.java
index 14c365a..0291ab0 100644
--- a/api/src/main/java/org/apache/apex/api/Launcher.java
+++ b/api/src/main/java/org/apache/apex/api/Launcher.java
@@ -77,8 +77,30 @@ public abstract class Launcher<H extends Launcher.AppHandle>
KILL
}
- // Marker interface
- public interface AppHandle {}
+ /**
+ * Results of application launch. The client can interact with the running application through this handle.
+ */
+ public interface AppHandle
+ {
+ boolean isFinished();
+
+ /**
+ * Shutdown the application.
+ *
+ * The method takes a shutdown mode, which specifies how to shut down the
+ * application.
+ *
+ * If the mode is AWAIT_TERMINATION, an attempt should be made to shutdown the application in an orderly fashion
+ * and wait till termination. If the application does not terminate in a reasonable amount of time the
+ * implementation can forcibly terminate the application.
+ *
+ * If the mode is KILL, the application can be killed immediately.
+ *
+ * @param shutdownMode The shutdown mode
+ */
+ void shutdown(ShutdownMode shutdownMode) throws LauncherException;
+
+ }
/**
* Get a launcher instance.<br><br>
@@ -138,34 +160,6 @@ public abstract class Launcher<H extends Launcher.AppHandle>
*/
public abstract H launchApp(StreamingApplication application, Configuration configuration, Attribute.AttributeMap launchParameters) throws LauncherException;
- /**
- * Shutdown the application and await termination.
- * Also see {@link #shutdownApp(AppHandle, ShutdownMode)}
- *
- * @param app The application handle
- */
- public void shutdownApp(H app) throws LauncherException
- {
- shutdownApp(app, ShutdownMode.AWAIT_TERMINATION);
- }
-
- /**
- * Shutdown the application.
- *
- * The method takes the application handle and a shutdown mode. The shutdown mode specifies how to shutdown the
- * application.
- *
- * If the mode is AWAIT_TERMINATION, an attempt should be made to shutdown the application in an orderly fashion
- * and wait till termination. If the application does not terminate in a reasonable amount of time the
- * implementation can forcibly terminate the application.
- *
- * If the mode is KILL, the application can be killed immediately.
- *
- * @param app The application handle
- * @param shutdownMode The shutdown mode
- */
- public abstract void shutdownApp(H app, ShutdownMode shutdownMode) throws LauncherException;
-
protected static <T> T loadService(Class<T> clazz)
{
ServiceLoader<T> loader = ServiceLoader.load(clazz);
http://git-wip-us.apache.org/repos/asf/apex-core/blob/0be03527/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
index 48ed070..14a2827 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
@@ -411,6 +411,11 @@ public class StramLocalCluster implements Runnable, Controller
appDone = true;
}
+ public boolean isFinished()
+ {
+ return appDone;
+ }
+
@Override
public void setHeartbeatMonitoringEnabled(boolean enabled)
{
http://git-wip-us.apache.org/repos/asf/apex-core/blob/0be03527/engine/src/main/java/org/apache/apex/engine/EmbeddedAppLauncherImpl.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/engine/EmbeddedAppLauncherImpl.java b/engine/src/main/java/org/apache/apex/engine/EmbeddedAppLauncherImpl.java
index 9ace9b5..5930e78 100644
--- a/engine/src/main/java/org/apache/apex/engine/EmbeddedAppLauncherImpl.java
+++ b/engine/src/main/java/org/apache/apex/engine/EmbeddedAppLauncherImpl.java
@@ -68,7 +68,7 @@ public class EmbeddedAppLauncherImpl extends LocalMode<EmbeddedAppLauncherImpl.E
} catch (Exception e) {
throw new LauncherException(e);
}
- LocalMode.Controller lc = getController();
+ StramLocalCluster lc = getController();
boolean launched = false;
if (launchParameters != null) {
if (StramUtils.getValueWithDefault(launchParameters, SERIALIZE_DAG)) {
@@ -100,16 +100,6 @@ public class EmbeddedAppLauncherImpl extends LocalMode<EmbeddedAppLauncherImpl.E
}
@Override
- public void shutdownApp(EmbeddedAppHandleImpl app, ShutdownMode shutdownMode) throws LauncherException
- {
- if (shutdownMode != ShutdownMode.KILL) {
- app.controller.shutdown();
- } else {
- throw new UnsupportedOperationException("Kill not supported");
- }
- }
-
- @Override
public DAG prepareDAG(StreamingApplication app, Configuration conf) throws Exception
{
if (app == null && conf == null) {
@@ -125,7 +115,7 @@ public class EmbeddedAppLauncherImpl extends LocalMode<EmbeddedAppLauncherImpl.E
}
@Override
- public Controller getController()
+ public StramLocalCluster getController()
{
try {
addLibraryJarsToClasspath(lp);
@@ -157,17 +147,26 @@ public class EmbeddedAppLauncherImpl extends LocalMode<EmbeddedAppLauncherImpl.E
}
- /**
- *
- */
public static class EmbeddedAppHandleImpl implements EmbeddedAppLauncher.EmbeddedAppHandle
{
- Controller controller;
+ final StramLocalCluster controller;
- public EmbeddedAppHandleImpl(Controller controller)
+ public EmbeddedAppHandleImpl(StramLocalCluster controller)
{
this.controller = controller;
}
+ @Override
+ public boolean isFinished()
+ {
+ return controller.isFinished();
+ }
+
+ @Override
+ public void shutdown(ShutdownMode shutdownMode) throws LauncherException
+ {
+ controller.shutdown();
+ }
+
}
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/0be03527/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java b/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java
index 4f5c8c8..d7a6dc8 100644
--- a/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java
+++ b/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java
@@ -20,14 +20,15 @@ package org.apache.apex.engine;
import java.io.IOException;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import org.apache.apex.api.YarnAppLauncher;
import org.apache.apex.engine.util.StreamingAppFactory;
+import org.apache.bval.jsr303.util.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -35,7 +36,6 @@ import com.google.common.base.Throwables;
import com.datatorrent.api.Attribute;
import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.stram.StramUtils;
import com.datatorrent.stram.client.StramAppLauncher;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
@@ -54,6 +54,7 @@ public class YarnAppLauncherImpl extends YarnAppLauncher<YarnAppLauncherImpl.Yar
propMapping.put(YarnAppLauncher.QUEUE_NAME, StramAppLauncher.QUEUE_NAME);
}
+ @Override
public YarnAppHandleImpl launchApp(final StreamingApplication app, Configuration conf, Attribute.AttributeMap launchParameters) throws LauncherException
{
if (launchParameters != null) {
@@ -83,27 +84,21 @@ public class YarnAppLauncherImpl extends YarnAppLauncher<YarnAppLauncherImpl.Yar
}
}
- @Override
- public void shutdownApp(YarnAppHandleImpl app, ShutdownMode shutdownMode) throws LauncherException
+ protected void shutdownApp(YarnAppHandleImpl app, ShutdownMode shutdownMode) throws LauncherException
{
if (shutdownMode == ShutdownMode.KILL) {
YarnClient yarnClient = YarnClient.createYarnClient();
try {
- String appId = app.getApplicationId();
- ApplicationId applicationId = null;
- List<ApplicationReport> applications = StramUtils.getApexApplicationList(yarnClient);
- for (ApplicationReport application : applications) {
- if (application.getApplicationId().toString().equals(appId)) {
- applicationId = application.getApplicationId();
- break;
- }
- }
- if (applicationId == null) {
- throw new LauncherException("Application " + appId + " not found");
+ ApplicationId applicationId = app.appId;
+ ApplicationReport appReport = yarnClient.getApplicationReport(applicationId);
+ if (appReport == null) {
+ throw new LauncherException("Application " + app.getApplicationId() + " not found");
}
yarnClient.killApplication(applicationId);
} catch (YarnException | IOException e) {
throw Throwables.propagate(e);
+ } finally {
+ IOUtils.closeQuietly(yarnClient);
}
} else {
throw new UnsupportedOperationException("Orderly shutdown not supported, try kill instead");
@@ -127,12 +122,9 @@ public class YarnAppLauncherImpl extends YarnAppLauncher<YarnAppLauncherImpl.Yar
}
}
- /**
- *
- */
- public static class YarnAppHandleImpl implements YarnAppLauncher.YarnAppHandle
+ public class YarnAppHandleImpl implements YarnAppLauncher.YarnAppHandle
{
- ApplicationId appId;
+ final ApplicationId appId;
public YarnAppHandleImpl(ApplicationId appId)
{
@@ -144,5 +136,34 @@ public class YarnAppLauncherImpl extends YarnAppLauncher<YarnAppLauncherImpl.Yar
{
return appId.toString();
}
+
+ @Override
+ public boolean isFinished()
+ {
+ YarnClient yarnClient = YarnClient.createYarnClient();
+ try {
+ ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+ if (appReport != null) {
+ if (appReport.getFinalApplicationStatus() == null
+ || appReport.getFinalApplicationStatus() == FinalApplicationStatus.UNDEFINED) {
+ return false;
+ }
+ }
+ return true;
+ } catch (YarnException | IOException e) {
+ throw Throwables.propagate(e);
+ } finally {
+ IOUtils.closeQuietly(yarnClient);
+ }
+ }
+
+ @Override
+ public void shutdown(org.apache.apex.api.Launcher.ShutdownMode shutdownMode)
+ throws org.apache.apex.api.Launcher.LauncherException
+ {
+ shutdownApp(this, shutdownMode);
+
+ }
+
}
}