You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/11/21 21:28:03 UTC
[1/2] apex-core git commit: APEXCORE-405 API to launch app on local
mode or cluster
Repository: apex-core
Updated Branches:
refs/heads/master 5fb9d045d -> cf8141846
APEXCORE-405 API to launch app on local mode or cluster
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/7ad1d75d
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/7ad1d75d
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/7ad1d75d
Branch: refs/heads/master
Commit: 7ad1d75db581641916d3c7f68fb9dcd145d8cc66
Parents: 81b8c92
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Fri Jun 24 00:44:57 2016 -0700
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Mon Nov 21 08:43:17 2016 -0800
----------------------------------------------------------------------
.idea/codeStyleSettings.xml | 2 +-
.../java/com/datatorrent/api/LocalMode.java | 41 +---
.../apache/apex/api/EmbeddedAppLauncher.java | 94 +++++++++
.../main/java/org/apache/apex/api/Launcher.java | 192 +++++++++++++++++++
.../org/apache/apex/api/YarnAppLauncher.java | 88 +++++++++
.../com/datatorrent/stram/LocalModeImpl.java | 103 ----------
.../java/com/datatorrent/stram/StramUtils.java | 23 ++-
.../java/com/datatorrent/stram/cli/ApexCli.java | 5 +-
.../stram/client/StramAppLauncher.java | 28 +--
.../apex/engine/EmbeddedAppLauncherImpl.java | 173 +++++++++++++++++
.../apache/apex/engine/YarnAppLauncherImpl.java | 148 ++++++++++++++
.../apex/engine/util/StreamingAppFactory.java | 64 +++++++
.../services/com.datatorrent.api.LocalMode | 2 +-
.../org.apache.apex.api.EmbeddedAppLauncher | 1 +
.../org.apache.apex.api.YarnAppLauncher | 1 +
15 files changed, 796 insertions(+), 169 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/.idea/codeStyleSettings.xml
----------------------------------------------------------------------
diff --git a/.idea/codeStyleSettings.xml b/.idea/codeStyleSettings.xml
index 8dfc54e..7b75d12 100644
--- a/.idea/codeStyleSettings.xml
+++ b/.idea/codeStyleSettings.xml
@@ -105,4 +105,4 @@
</option>
<option name="USE_PER_PROJECT_SETTINGS" value="true" />
</component>
-</project>
\ No newline at end of file
+</project>
http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/api/src/main/java/com/datatorrent/api/LocalMode.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/LocalMode.java b/api/src/main/java/com/datatorrent/api/LocalMode.java
index 51d3da3..7d6f1ee 100644
--- a/api/src/main/java/com/datatorrent/api/LocalMode.java
+++ b/api/src/main/java/com/datatorrent/api/LocalMode.java
@@ -18,17 +18,17 @@
*/
package com.datatorrent.api;
-import java.util.Iterator;
-import java.util.ServiceLoader;
-
+import org.apache.apex.api.EmbeddedAppLauncher;
import org.apache.hadoop.conf.Configuration;
/**
* Local mode execution for single application
*
+ * @deprecated
* @since 0.3.2
*/
-public abstract class LocalMode
+@Deprecated
+public abstract class LocalMode<H extends EmbeddedAppLauncher.EmbeddedAppHandle> extends EmbeddedAppLauncher<H>
{
/**
@@ -96,38 +96,7 @@ public abstract class LocalMode
*/
public static LocalMode newInstance()
{
- ServiceLoader<LocalMode> loader = ServiceLoader.load(LocalMode.class);
- Iterator<LocalMode> impl = loader.iterator();
- if (!impl.hasNext()) {
- throw new RuntimeException("No implementation for " + LocalMode.class);
- }
- return impl.next();
- }
-
- /**
- * Shortcut to run an application. Used for testing.
- *
- * @param app
- * @param runMillis
- */
- public static void runApp(StreamingApplication app, int runMillis)
- {
- runApp(app, null, runMillis);
- }
-
- /**
- * Shortcut to run an application with the modified configuration.
- *
- * @param app - Application to be run
- * @param configuration - Configuration
- * @param runMillis - The time after which the application will be shutdown; pass 0 to run indefinitely.
- */
- public static void runApp(StreamingApplication app, Configuration configuration, int runMillis)
- {
- LocalMode lma = newInstance();
- app.populateDAG(lma.getDAG(), configuration == null ? new Configuration(false) : configuration);
- LocalMode.Controller lc = lma.getController();
- lc.run(runMillis);
+ return loadService(LocalMode.class);
}
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/api/src/main/java/org/apache/apex/api/EmbeddedAppLauncher.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/apex/api/EmbeddedAppLauncher.java b/api/src/main/java/org/apache/apex/api/EmbeddedAppLauncher.java
new file mode 100644
index 0000000..8e3e0f6
--- /dev/null
+++ b/api/src/main/java/org/apache/apex/api/EmbeddedAppLauncher.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.api;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+
+/**
+ * Launcher for running the application directly in the current Java VM. For basic operations such as launching or
+ * stopping the application, {@link Launcher} can be used directly.
+ */
+public abstract class EmbeddedAppLauncher<H extends EmbeddedAppLauncher.EmbeddedAppHandle> extends Launcher<H>
+{
+ /**
+ * Parameter to specify the time after which the application will be shutdown; pass 0 to run indefinitely.
+ */
+ public static final Attribute<Long> RUN_MILLIS = new Attribute<Long>(0L);
+
+ /**
+ * Parameter to launch application asynchronously and return from launch immediately.
+ */
+ public static final Attribute<Boolean> RUN_ASYNC = new Attribute<Boolean>(false);
+
+ /**
+ * Parameter to enable or disable heartbeat monitoring.
+ */
+ public static final Attribute<Boolean> HEARTBEAT_MONITORING = new Attribute<Boolean>(true);
+
+ /**
+ * Parameter to serialize DAG before launch.
+ */
+ public static final Attribute<Boolean> SERIALIZE_DAG = new Attribute<Boolean>(false);
+
+ static {
+ Attribute.AttributeMap.AttributeInitializer.initialize(LocalMode.class);
+ }
+
+ public static EmbeddedAppLauncher newInstance()
+ {
+ return loadService(EmbeddedAppLauncher.class);
+ }
+
+ /**
+ * The EmbeddedAppHandle class would be useful in future to provide additional information without breaking backwards
+ * compatibility of the launchApp method
+ */
+ public interface EmbeddedAppHandle extends AppHandle {}
+
+ /**
+ * Shortcut to run an application. Used for testing.
+ *
+ * @param app
+ * @param runMillis
+ */
+ public static void runApp(StreamingApplication app, int runMillis)
+ {
+ runApp(app, null, runMillis);
+ }
+
+ /**
+ * Shortcut to run an application with the modified configuration.
+ *
+ * @param app - Application to be run
+ * @param configuration - Configuration
+ * @param runMillis - The time after which the application will be shutdown; pass 0 to run indefinitely.
+ */
+ public static void runApp(StreamingApplication app, Configuration configuration, int runMillis)
+ {
+ EmbeddedAppLauncher launcher = newInstance();
+ Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+ launchAttributes.put(RUN_MILLIS, (long)runMillis);
+ launcher.launchApp(app, configuration, launchAttributes);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/api/src/main/java/org/apache/apex/api/Launcher.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/apex/api/Launcher.java b/api/src/main/java/org/apache/apex/api/Launcher.java
new file mode 100644
index 0000000..14c365a
--- /dev/null
+++ b/api/src/main/java/org/apache/apex/api/Launcher.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.api;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Iterator;
+import java.util.ServiceLoader;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Throwables;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.StreamingApplication;
+
+/**
+ * A class that provides an entry point for functionality to run applications in different environments such as current
+ * Java VM, Hadoop YARN etc.
+ */
+public abstract class Launcher<H extends Launcher.AppHandle>
+{
+
+ public static final String NEW_INSTANCE_METHOD = "newInstance";
+
+ /**
+ * Denotes an environment in which to launch the application. Also, contains the list of supported environments.
+ * @param <L> The launcher for the specific environment
+ */
+ public static class LaunchMode<L extends Launcher>
+ {
+ /**
+ * Launch application in the current Java VM
+ */
+ public static final LaunchMode<EmbeddedAppLauncher> EMBEDDED = new LaunchMode<>(EmbeddedAppLauncher.class);
+ /**
+ * Launch application on Hadoop YARN
+ */
+ public static final LaunchMode<YarnAppLauncher> YARN = new LaunchMode<>(YarnAppLauncher.class);
+
+ Class<L> clazz;
+
+ public LaunchMode(Class<L> clazz)
+ {
+ this.clazz = clazz;
+ }
+ }
+
+ /**
+ * Specifies the manner in which a running application be stopped.
+ */
+ public enum ShutdownMode
+ {
+ /**
+ * Shutdown the application in an orderly fashion and wait till it stops running
+ */
+ AWAIT_TERMINATION,
+ /**
+ * Kill the application immediately
+ */
+ KILL
+ }
+
+ // Marker interface
+ public interface AppHandle {}
+
+ /**
+ * Get a launcher instance.<br><br>
+ *
+ * Returns a launcher specific to the given launch mode. This allows the user to also use custom methods supported by
+ * the specific launcher along with the basic launch methods from this class.
+ *
+ * @param launchMode - The launch mode to use
+ *
+ * @return The launcher
+ */
+ public static <L extends Launcher<?>> L getLauncher(LaunchMode<L> launchMode)
+ {
+ L launcher;
+ // If the static method for creating a new instance is present in the launcher, it is invoked to create an instance.
+ // This gives an opportunity for the launcher to do something custom when creating an instance. If the method is not
+ // present, the service is loaded from the class name. A factory approach would be cleaner and type safe but adds
+ // unnecessary complexity, going with the static method for now.
+ try {
+ Method m = launchMode.clazz.getDeclaredMethod(NEW_INSTANCE_METHOD);
+ launcher = (L)m.invoke(null);
+ } catch (NoSuchMethodException e) {
+ launcher = loadService(launchMode.clazz);
+ } catch (InvocationTargetException | IllegalAccessException e) {
+ throw Throwables.propagate(e);
+ }
+ return launcher;
+ }
+
+ /**
+ * Launch application with configuration.<br><br>
+ *
+ * Launch the given streaming application with the given configuration.
+ *
+ * @param application - Application to be run
+ * @param configuration - Application Configuration
+ *
+ * @return The application handle
+ */
+ public H launchApp(StreamingApplication application, Configuration configuration) throws LauncherException
+ {
+ return launchApp(application, configuration, null);
+ }
+
+ /**
+ * Launch application with configuration and launch parameters.
+ *
+ * Launch the given streaming application with the given configuration and parameters. The parameters should be from
+ * the list of parameters supported by the launcher. To find out more about the supported parameters look at the
+ * documentation of the individual launcher.
+ *
+ * @param application - Application to be run
+ * @param configuration - Application Configuration
+ * @param launchParameters - Launch Parameters
+ *
+ * @return The application handle
+ */
+ public abstract H launchApp(StreamingApplication application, Configuration configuration, Attribute.AttributeMap launchParameters) throws LauncherException;
+
+ /**
+ * Shutdown the application and await termination.
+ * Also see {@link #shutdownApp(AppHandle, ShutdownMode)}
+ *
+ * @param app The application handle
+ */
+ public void shutdownApp(H app) throws LauncherException
+ {
+ shutdownApp(app, ShutdownMode.AWAIT_TERMINATION);
+ }
+
+ /**
+ * Shutdown the application.
+ *
+ * The method takes the application handle and a shutdown mode. The shutdown mode specifies how to shutdown the
+ * application.
+ *
+ * If the mode is AWAIT_TERMINATION, an attempt should be made to shutdown the application in an orderly fashion
+ * and wait till termination. If the application does not terminate in a reasonable amount of time the
+ * implementation can forcibly terminate the application.
+ *
+ * If the mode is KILL, the application can be killed immediately.
+ *
+ * @param app The application handle
+ * @param shutdownMode The shutdown mode
+ */
+ public abstract void shutdownApp(H app, ShutdownMode shutdownMode) throws LauncherException;
+
+ protected static <T> T loadService(Class<T> clazz)
+ {
+ ServiceLoader<T> loader = ServiceLoader.load(clazz);
+ Iterator<T> impl = loader.iterator();
+ if (!impl.hasNext()) {
+ throw new RuntimeException("No implementation for " + clazz);
+ }
+ return impl.next();
+ }
+
+ public static class LauncherException extends RuntimeException
+ {
+ public LauncherException(String message)
+ {
+ super(message);
+ }
+
+ public LauncherException(Throwable cause)
+ {
+ super(cause);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java b/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java
new file mode 100644
index 0000000..e3b36db
--- /dev/null
+++ b/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.api;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.StringCodec;
+
+/**
+ * Launcher for running the application on Hadoop YARN. For basic operations such as launching or stopping the
+ * application, {@link Launcher} can be used directly.
+ */
+public abstract class YarnAppLauncher<H extends YarnAppLauncher.YarnAppHandle> extends Launcher<H>
+{
+
+ /**
+ * Parameter to specify extra jars for launch.
+ */
+ public static final Attribute<String> LIB_JARS = new Attribute<String>(new StringCodec.String2String());
+
+ /**
+ * Parameter to specify the previous application id to use to resume launch from.
+ */
+ public static final Attribute<String> ORIGINAL_APP_ID = new Attribute<String>(new StringCodec.String2String());
+
+ /**
+ * Parameter to specify the queue name to use for launch.
+ */
+ public static final Attribute<String> QUEUE_NAME = new Attribute<String>(new StringCodec.String2String());
+
+ static {
+ Attribute.AttributeMap.AttributeInitializer.initialize(YarnAppLauncher.class);
+ }
+
+ public static YarnAppLauncher newInstance()
+ {
+ return loadService(YarnAppLauncher.class);
+ }
+
+ public interface YarnAppHandle extends AppHandle
+ {
+ String getApplicationId();
+ }
+
+ /**
+ * Shortcut to run an application with the modified configuration.
+ *
+ * @param app - Application to be run
+ * @param configuration - Application Configuration
+ */
+ public static void runApp(StreamingApplication app, Configuration configuration) throws LauncherException
+ {
+ runApp(app, configuration, null);
+ }
+
+ /**
+ * Shortcut to run an application with the modified configuration.
+ *
+ * @param app - Application to be run
+ * @param configuration - Application Configuration
+ * @param launchAttributes - Launch Configuration
+ */
+ public static void runApp(StreamingApplication app, Configuration configuration, Attribute.AttributeMap launchAttributes) throws LauncherException
+
+ {
+ YarnAppLauncher launcher = newInstance();
+ launcher.launchApp(app, configuration, launchAttributes);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/engine/src/main/java/com/datatorrent/stram/LocalModeImpl.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/LocalModeImpl.java b/engine/src/main/java/com/datatorrent/stram/LocalModeImpl.java
deleted file mode 100644
index 3fedc7c..0000000
--- a/engine/src/main/java/com/datatorrent/stram/LocalModeImpl.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.stram;
-
-import java.io.File;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.stram.plan.logical.LogicalPlan;
-import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
-
-/**
- * <p>LocalModeImpl class.</p>
- *
- * @since 0.3.2
- */
-public class LocalModeImpl extends LocalMode
-{
- private final LogicalPlan lp = new LogicalPlan();
-
- @Override
- public DAG getDAG()
- {
- return lp;
- }
-
- @Override
- public DAG cloneDAG() throws Exception
- {
- return StramLocalCluster.cloneLogicalPlan(lp);
- }
-
- @Override
- public DAG prepareDAG(StreamingApplication app, Configuration conf) throws Exception
- {
- if (app == null && conf == null) {
- throw new IllegalArgumentException("Require app or configuration to populate logical plan.");
- }
- if (conf == null) {
- conf = new Configuration(false);
- }
- LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(conf);
- String appName = app != null ? app.getClass().getName() : "unknown";
- lpc.prepareDAG(lp, app, appName);
- return lp;
- }
-
- @Override
- public Controller getController()
- {
- try {
- addLibraryJarsToClasspath(lp);
- return new StramLocalCluster(lp);
- } catch (Exception e) {
- throw new RuntimeException("Error creating local cluster", e);
- }
- }
-
- private void addLibraryJarsToClasspath(LogicalPlan lp) throws MalformedURLException
- {
- String libJarsCsv = lp.getAttributes().get(Context.DAGContext.LIBRARY_JARS);
-
- if (libJarsCsv != null && libJarsCsv.length() != 0) {
- String[] split = libJarsCsv.split(StramClient.LIB_JARS_SEP);
- if (split.length != 0) {
- URL[] urlList = new URL[split.length];
- for (int i = 0; i < split.length; i++) {
- File file = new File(split[i]);
- urlList[i] = file.toURI().toURL();
- }
-
- // Set class loader.
- ClassLoader prevCl = Thread.currentThread().getContextClassLoader();
- URLClassLoader cl = URLClassLoader.newInstance(urlList, prevCl);
- Thread.currentThread().setContextClassLoader(cl);
- }
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/engine/src/main/java/com/datatorrent/stram/StramUtils.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramUtils.java b/engine/src/main/java/com/datatorrent/stram/StramUtils.java
index 2b2baa6..99f7bd4 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramUtils.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramUtils.java
@@ -18,7 +18,8 @@
*/
package com.datatorrent.stram;
-
+import java.io.IOException;
+import java.util.List;
import java.util.Map;
import org.codehaus.jettison.json.JSONArray;
@@ -28,10 +29,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.datatorrent.api.Attribute;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.stram.util.LoggerUtil;
@@ -148,4 +154,19 @@ public abstract class StramUtils
return jsonObject;
}
+
+ public static <T> T getValueWithDefault(Attribute.AttributeMap map, Attribute<T> key)
+ {
+ T value = map.get(key);
+ if (value == null) {
+ value = key.defaultValue;
+ }
+ return value;
+ }
+
+ public static List<ApplicationReport> getApexApplicationList(YarnClient yarnClient) throws IOException, YarnException
+ {
+ return yarnClient.getApplications(Sets.newHashSet(StramClient.YARN_APPLICATION_TYPE, StramClient.YARN_APPLICATION_TYPE_DEPRECATED));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
index 5cfde36..422f0c6 100644
--- a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
+++ b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
@@ -89,14 +89,13 @@ import org.apache.log4j.Level;
import org.apache.tools.ant.DirectoryScanner;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
import com.sun.jersey.api.client.WebResource;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG.GenericOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.stram.StramClient;
+import com.datatorrent.stram.StramUtils;
import com.datatorrent.stram.client.AppPackage;
import com.datatorrent.stram.client.AppPackage.AppInfo;
import com.datatorrent.stram.client.ConfigPackage;
@@ -1593,7 +1592,7 @@ public class ApexCli
private List<ApplicationReport> getApplicationList()
{
try {
- return yarnClient.getApplications(Sets.newHashSet(StramClient.YARN_APPLICATION_TYPE, StramClient.YARN_APPLICATION_TYPE_DEPRECATED));
+ return StramUtils.getApexApplicationList(yarnClient);
} catch (Exception e) {
throw new CliException("Error getting application list from resource manager", e);
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
index 961a97b..216771d 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
@@ -44,6 +44,7 @@ import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.apex.engine.util.StreamingAppFactory;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.NotImplementedException;
@@ -62,7 +63,6 @@ import com.google.common.collect.Sets;
import com.datatorrent.api.Context;
import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.stram.StramClient;
import com.datatorrent.stram.StramLocalCluster;
import com.datatorrent.stram.StramUtils;
@@ -462,36 +462,16 @@ public class StramAppLauncher
try {
final Class<?> clazz = cl.loadClass(className);
if (!Modifier.isAbstract(clazz.getModifiers()) && StreamingApplication.class.isAssignableFrom(clazz)) {
- final AppFactory appConfig = new AppFactory()
+ final AppFactory appConfig = new StreamingAppFactory(classFileName, clazz)
{
@Override
- public String getName()
- {
- return classFileName;
- }
-
- @Override
- public String getDisplayName()
- {
- ApplicationAnnotation an = clazz.getAnnotation(ApplicationAnnotation.class);
- if (an != null) {
- return an.name();
- } else {
- return classFileName;
- }
- }
-
- @Override
- public LogicalPlan createApp(LogicalPlanConfiguration conf)
+ public LogicalPlan createApp(LogicalPlanConfiguration planConfig)
{
// load class from current context class loader
Class<? extends StreamingApplication> c = StramUtils.classForName(className, StreamingApplication.class);
StreamingApplication app = StramUtils.newInstance(c);
- LogicalPlan dag = new LogicalPlan();
- conf.prepareDAG(dag, app, getName());
- return dag;
+ return super.createApp(app, planConfig);
}
-
};
appResourceList.add(appConfig);
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/engine/src/main/java/org/apache/apex/engine/EmbeddedAppLauncherImpl.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/engine/EmbeddedAppLauncherImpl.java b/engine/src/main/java/org/apache/apex/engine/EmbeddedAppLauncherImpl.java
new file mode 100644
index 0000000..9ace9b5
--- /dev/null
+++ b/engine/src/main/java/org/apache/apex/engine/EmbeddedAppLauncherImpl.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.engine;
+
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+import org.apache.apex.api.EmbeddedAppLauncher;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.stram.StramClient;
+import com.datatorrent.stram.StramLocalCluster;
+import com.datatorrent.stram.StramUtils;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
+
+/**
+ * An implementation of {@link EmbeddedAppLauncher} to launch applications directly in the current Java VM.
+ *
+ * TODO: When LocalMode is removed, make this class extend EmbeddedAppLauncher directly
+ * @since 0.3.2
+ */
+public class EmbeddedAppLauncherImpl extends LocalMode<EmbeddedAppLauncherImpl.EmbeddedAppHandleImpl>
+{
+ private final LogicalPlan lp = new LogicalPlan();
+
+ @Override
+ public DAG getDAG()
+ {
+ return lp;
+ }
+
+ @Override
+ public DAG cloneDAG() throws Exception
+ {
+ return StramLocalCluster.cloneLogicalPlan(lp);
+ }
+
+ @Override
+ public EmbeddedAppHandleImpl launchApp(StreamingApplication application, Configuration configuration, Attribute.AttributeMap
+ launchParameters) throws LauncherException
+ {
+ try {
+ prepareDAG(application, configuration);
+ } catch (Exception e) {
+ throw new LauncherException(e);
+ }
+ LocalMode.Controller lc = getController();
+ boolean launched = false;
+ if (launchParameters != null) {
+ if (StramUtils.getValueWithDefault(launchParameters, SERIALIZE_DAG)) {
+ // Check if DAG can be serialized
+ try {
+ cloneDAG();
+ } catch (Exception e) {
+ throw new LauncherException(e);
+ }
+ }
+ if (StramUtils.getValueWithDefault(launchParameters, HEARTBEAT_MONITORING)) {
+ lc.setHeartbeatMonitoringEnabled(true);
+ }
+ if (StramUtils.getValueWithDefault(launchParameters, RUN_ASYNC)) {
+ lc.runAsync();
+ launched = true;
+ } else {
+ Long runMillis = StramUtils.getValueWithDefault(launchParameters, RUN_MILLIS);
+ if (runMillis != null) {
+ lc.run(runMillis);
+ launched = true;
+ }
+ }
+ }
+ if (!launched) {
+ lc.run();
+ }
+ return new EmbeddedAppHandleImpl(lc);
+ }
+
+ @Override
+ public void shutdownApp(EmbeddedAppHandleImpl app, ShutdownMode shutdownMode) throws LauncherException
+ {
+ if (shutdownMode != ShutdownMode.KILL) {
+ app.controller.shutdown();
+ } else {
+ throw new UnsupportedOperationException("Kill not supported");
+ }
+ }
+
+ @Override
+ public DAG prepareDAG(StreamingApplication app, Configuration conf) throws Exception
+ {
+ if (app == null && conf == null) {
+ throw new IllegalArgumentException("Require app or configuration to populate logical plan.");
+ }
+ if (conf == null) {
+ conf = new Configuration(false);
+ }
+ LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(conf);
+ String appName = app != null ? app.getClass().getName() : "unknown";
+ lpc.prepareDAG(lp, app, appName);
+ return lp;
+ }
+
+ @Override
+ public Controller getController()
+ {
+ try {
+ addLibraryJarsToClasspath(lp);
+ return new StramLocalCluster(lp);
+ } catch (Exception e) {
+ throw new RuntimeException("Error creating local cluster", e);
+ }
+ }
+
+ private void addLibraryJarsToClasspath(LogicalPlan lp) throws MalformedURLException
+ {
+ String libJarsCsv = lp.getAttributes().get(Context.DAGContext.LIBRARY_JARS);
+
+ if (libJarsCsv != null && libJarsCsv.length() != 0) {
+ String[] split = libJarsCsv.split(StramClient.LIB_JARS_SEP);
+ if (split.length != 0) {
+ URL[] urlList = new URL[split.length];
+ for (int i = 0; i < split.length; i++) {
+ File file = new File(split[i]);
+ urlList[i] = file.toURI().toURL();
+ }
+
+ // Set class loader.
+ ClassLoader prevCl = Thread.currentThread().getContextClassLoader();
+ URLClassLoader cl = URLClassLoader.newInstance(urlList, prevCl);
+ Thread.currentThread().setContextClassLoader(cl);
+ }
+ }
+
+ }
+
+ /**
+ *
+ */
+ public static class EmbeddedAppHandleImpl implements EmbeddedAppLauncher.EmbeddedAppHandle
+ {
+ Controller controller;
+
+ public EmbeddedAppHandleImpl(Controller controller)
+ {
+ this.controller = controller;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java b/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java
new file mode 100644
index 0000000..4f5c8c8
--- /dev/null
+++ b/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.engine;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.apex.api.YarnAppLauncher;
+import org.apache.apex.engine.util.StreamingAppFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import com.google.common.base.Throwables;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.stram.StramUtils;
+import com.datatorrent.stram.client.StramAppLauncher;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
+
+/**
+ * An implementation of {@link YarnAppLauncher} to launch applications on Hadoop YARN.
+ */
+public class YarnAppLauncherImpl extends YarnAppLauncher<YarnAppLauncherImpl.YarnAppHandleImpl>
+{
+
+ private static final Map<Attribute<?>, String> propMapping = new HashMap<>();
+
+ static {
+ propMapping.put(YarnAppLauncher.LIB_JARS, StramAppLauncher.LIBJARS_CONF_KEY_NAME);
+ propMapping.put(YarnAppLauncher.ORIGINAL_APP_ID, StramAppLauncher.ORIGINAL_APP_ID);
+ propMapping.put(YarnAppLauncher.QUEUE_NAME, StramAppLauncher.QUEUE_NAME);
+ }
+
+ public YarnAppHandleImpl launchApp(final StreamingApplication app, Configuration conf, Attribute.AttributeMap launchParameters) throws LauncherException
+ {
+ if (launchParameters != null) {
+ for (Map.Entry<Attribute<?>, Object> entry : launchParameters.entrySet()) {
+ String property = propMapping.get(entry.getKey());
+ if (property != null) {
+ setConfiguration(conf, property, entry.getValue());
+ }
+ }
+ }
+ try {
+ String name = app.getClass().getName();
+ StramAppLauncher appLauncher = new StramAppLauncher(name, conf);
+ appLauncher.loadDependencies();
+ StreamingAppFactory appFactory = new StreamingAppFactory(name, app.getClass())
+ {
+ @Override
+ public LogicalPlan createApp(LogicalPlanConfiguration planConfig)
+ {
+ return super.createApp(app, planConfig);
+ }
+ };
+ ApplicationId appId = appLauncher.launchApp(appFactory);
+ return new YarnAppHandleImpl(appId);
+ } catch (Exception ex) {
+ throw new LauncherException(ex);
+ }
+ }
+
+ @Override
+ public void shutdownApp(YarnAppHandleImpl app, ShutdownMode shutdownMode) throws LauncherException
+ {
+ if (shutdownMode == ShutdownMode.KILL) {
+ YarnClient yarnClient = YarnClient.createYarnClient();
+ try {
+ String appId = app.getApplicationId();
+ ApplicationId applicationId = null;
+ List<ApplicationReport> applications = StramUtils.getApexApplicationList(yarnClient);
+ for (ApplicationReport application : applications) {
+ if (application.getApplicationId().toString().equals(appId)) {
+ applicationId = application.getApplicationId();
+ break;
+ }
+ }
+ if (applicationId == null) {
+ throw new LauncherException("Application " + appId + " not found");
+ }
+ yarnClient.killApplication(applicationId);
+ } catch (YarnException | IOException e) {
+ throw Throwables.propagate(e);
+ }
+ } else {
+ throw new UnsupportedOperationException("Orderly shutdown not supported, try kill instead");
+ }
+ }
+
+ private void setConfiguration(Configuration conf, String property, Object value)
+ {
+ if (value instanceof Integer) {
+ conf.setInt(property, (Integer)value);
+ } else if (value instanceof Boolean) {
+ conf.setBoolean(property, (Boolean)value);
+ } else if (value instanceof Long) {
+ conf.setLong(property, (Long)value);
+ } else if (value instanceof Float) {
+ conf.setFloat(property, (Float)value);
+ } else if (value instanceof Double) {
+ conf.setDouble(property, (Double)value);
+ } else {
+ conf.set(property, value.toString());
+ }
+ }
+
+ /**
+ *
+ */
+ public static class YarnAppHandleImpl implements YarnAppLauncher.YarnAppHandle
+ {
+ ApplicationId appId;
+
+ public YarnAppHandleImpl(ApplicationId appId)
+ {
+ this.appId = appId;
+ }
+
+ @Override
+ public String getApplicationId()
+ {
+ return appId.toString();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/engine/src/main/java/org/apache/apex/engine/util/StreamingAppFactory.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/engine/util/StreamingAppFactory.java b/engine/src/main/java/org/apache/apex/engine/util/StreamingAppFactory.java
new file mode 100644
index 0000000..6c943fc
--- /dev/null
+++ b/engine/src/main/java/org/apache/apex/engine/util/StreamingAppFactory.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.engine.util;
+
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.stram.client.StramAppLauncher;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
+
+/**
+ *
+ */
+public abstract class StreamingAppFactory implements StramAppLauncher.AppFactory
+{
+ private Class<?> appClazz;
+ private String name;
+
+ public StreamingAppFactory(String name, Class<?> appClazz)
+ {
+ this.name = name;
+ this.appClazz = appClazz;
+ }
+
+ public abstract LogicalPlan createApp(LogicalPlanConfiguration planConfig);
+
+ protected LogicalPlan createApp(StreamingApplication app, LogicalPlanConfiguration planConfig)
+ {
+ LogicalPlan dag = new LogicalPlan();
+ planConfig.prepareDAG(dag, app, getName());
+ return dag;
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+ public String getDisplayName()
+ {
+ ApplicationAnnotation an = appClazz.getAnnotation(ApplicationAnnotation.class);
+ if (an != null) {
+ return an.name();
+ } else {
+ return name;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/engine/src/main/resources/META-INF/services/com.datatorrent.api.LocalMode
----------------------------------------------------------------------
diff --git a/engine/src/main/resources/META-INF/services/com.datatorrent.api.LocalMode b/engine/src/main/resources/META-INF/services/com.datatorrent.api.LocalMode
index 712a323..cb9119f 100644
--- a/engine/src/main/resources/META-INF/services/com.datatorrent.api.LocalMode
+++ b/engine/src/main/resources/META-INF/services/com.datatorrent.api.LocalMode
@@ -1 +1 @@
-com.datatorrent.stram.LocalModeImpl
\ No newline at end of file
+org.apache.apex.engine.EmbeddedAppLauncherImpl
http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/engine/src/main/resources/META-INF/services/org.apache.apex.api.EmbeddedAppLauncher
----------------------------------------------------------------------
diff --git a/engine/src/main/resources/META-INF/services/org.apache.apex.api.EmbeddedAppLauncher b/engine/src/main/resources/META-INF/services/org.apache.apex.api.EmbeddedAppLauncher
new file mode 100644
index 0000000..cb9119f
--- /dev/null
+++ b/engine/src/main/resources/META-INF/services/org.apache.apex.api.EmbeddedAppLauncher
@@ -0,0 +1 @@
+org.apache.apex.engine.EmbeddedAppLauncherImpl
http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/engine/src/main/resources/META-INF/services/org.apache.apex.api.YarnAppLauncher
----------------------------------------------------------------------
diff --git a/engine/src/main/resources/META-INF/services/org.apache.apex.api.YarnAppLauncher b/engine/src/main/resources/META-INF/services/org.apache.apex.api.YarnAppLauncher
new file mode 100644
index 0000000..45989a4
--- /dev/null
+++ b/engine/src/main/resources/META-INF/services/org.apache.apex.api.YarnAppLauncher
@@ -0,0 +1 @@
+org.apache.apex.engine.YarnAppLauncherImpl
[2/2] apex-core git commit: Merge commit 'refs/pull/351/head' of
https://github.com/apache/apex-core
Posted by th...@apache.org.
Merge commit 'refs/pull/351/head' of https://github.com/apache/apex-core
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/cf814184
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/cf814184
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/cf814184
Branch: refs/heads/master
Commit: cf81418464dc2fad2b75b68a9759c799ffff8b48
Parents: 5fb9d04 7ad1d75
Author: Thomas Weise <th...@apache.org>
Authored: Mon Nov 21 13:19:11 2016 -0800
Committer: Thomas Weise <th...@apache.org>
Committed: Mon Nov 21 13:19:11 2016 -0800
----------------------------------------------------------------------
.idea/codeStyleSettings.xml | 2 +-
.../java/com/datatorrent/api/LocalMode.java | 41 +---
.../apache/apex/api/EmbeddedAppLauncher.java | 94 +++++++++
.../main/java/org/apache/apex/api/Launcher.java | 192 +++++++++++++++++++
.../org/apache/apex/api/YarnAppLauncher.java | 88 +++++++++
.../com/datatorrent/stram/LocalModeImpl.java | 103 ----------
.../java/com/datatorrent/stram/StramUtils.java | 23 ++-
.../java/com/datatorrent/stram/cli/ApexCli.java | 5 +-
.../stram/client/StramAppLauncher.java | 28 +--
.../apex/engine/EmbeddedAppLauncherImpl.java | 173 +++++++++++++++++
.../apache/apex/engine/YarnAppLauncherImpl.java | 148 ++++++++++++++
.../apex/engine/util/StreamingAppFactory.java | 64 +++++++
.../services/com.datatorrent.api.LocalMode | 2 +-
.../org.apache.apex.api.EmbeddedAppLauncher | 1 +
.../org.apache.apex.api.YarnAppLauncher | 1 +
15 files changed, 796 insertions(+), 169 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/cf814184/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
----------------------------------------------------------------------