You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/11/21 21:28:03 UTC

[1/2] apex-core git commit: APEXCORE-405 API to launch app on local mode or cluster

Repository: apex-core
Updated Branches:
  refs/heads/master 5fb9d045d -> cf8141846


APEXCORE-405 API to launch app on local mode or cluster


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/7ad1d75d
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/7ad1d75d
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/7ad1d75d

Branch: refs/heads/master
Commit: 7ad1d75db581641916d3c7f68fb9dcd145d8cc66
Parents: 81b8c92
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Fri Jun 24 00:44:57 2016 -0700
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Mon Nov 21 08:43:17 2016 -0800

----------------------------------------------------------------------
 .idea/codeStyleSettings.xml                     |   2 +-
 .../java/com/datatorrent/api/LocalMode.java     |  41 +---
 .../apache/apex/api/EmbeddedAppLauncher.java    |  94 +++++++++
 .../main/java/org/apache/apex/api/Launcher.java | 192 +++++++++++++++++++
 .../org/apache/apex/api/YarnAppLauncher.java    |  88 +++++++++
 .../com/datatorrent/stram/LocalModeImpl.java    | 103 ----------
 .../java/com/datatorrent/stram/StramUtils.java  |  23 ++-
 .../java/com/datatorrent/stram/cli/ApexCli.java |   5 +-
 .../stram/client/StramAppLauncher.java          |  28 +--
 .../apex/engine/EmbeddedAppLauncherImpl.java    | 173 +++++++++++++++++
 .../apache/apex/engine/YarnAppLauncherImpl.java | 148 ++++++++++++++
 .../apex/engine/util/StreamingAppFactory.java   |  64 +++++++
 .../services/com.datatorrent.api.LocalMode      |   2 +-
 .../org.apache.apex.api.EmbeddedAppLauncher     |   1 +
 .../org.apache.apex.api.YarnAppLauncher         |   1 +
 15 files changed, 796 insertions(+), 169 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/.idea/codeStyleSettings.xml
----------------------------------------------------------------------
diff --git a/.idea/codeStyleSettings.xml b/.idea/codeStyleSettings.xml
index 8dfc54e..7b75d12 100644
--- a/.idea/codeStyleSettings.xml
+++ b/.idea/codeStyleSettings.xml
@@ -105,4 +105,4 @@
     </option>
     <option name="USE_PER_PROJECT_SETTINGS" value="true" />
   </component>
-</project>
\ No newline at end of file
+</project>

http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/api/src/main/java/com/datatorrent/api/LocalMode.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/LocalMode.java b/api/src/main/java/com/datatorrent/api/LocalMode.java
index 51d3da3..7d6f1ee 100644
--- a/api/src/main/java/com/datatorrent/api/LocalMode.java
+++ b/api/src/main/java/com/datatorrent/api/LocalMode.java
@@ -18,17 +18,17 @@
  */
 package com.datatorrent.api;
 
-import java.util.Iterator;
-import java.util.ServiceLoader;
-
+import org.apache.apex.api.EmbeddedAppLauncher;
 import org.apache.hadoop.conf.Configuration;
 
 /**
  * Local mode execution for single application
  *
+ * @deprecated
  * @since 0.3.2
  */
-public abstract class LocalMode
+@Deprecated
+public abstract class LocalMode<H extends EmbeddedAppLauncher.EmbeddedAppHandle> extends EmbeddedAppLauncher<H>
 {
 
   /**
@@ -96,38 +96,7 @@ public abstract class LocalMode
    */
   public static LocalMode newInstance()
   {
-    ServiceLoader<LocalMode> loader = ServiceLoader.load(LocalMode.class);
-    Iterator<LocalMode> impl = loader.iterator();
-    if (!impl.hasNext()) {
-      throw new RuntimeException("No implementation for " + LocalMode.class);
-    }
-    return impl.next();
-  }
-
-  /**
-   * Shortcut to run an application. Used for testing.
-   *
-   * @param app
-   * @param runMillis
-   */
-  public static void runApp(StreamingApplication app, int runMillis)
-  {
-    runApp(app, null, runMillis);
-  }
-
-  /**
-   * Shortcut to run an application with the modified configuration.
-   *
-   * @param app           - Application to be run
-   * @param configuration - Configuration
-   * @param runMillis     - The time after which the application will be shutdown; pass 0 to run indefinitely.
-   */
-  public static void runApp(StreamingApplication app, Configuration configuration, int runMillis)
-  {
-    LocalMode lma = newInstance();
-    app.populateDAG(lma.getDAG(), configuration == null ? new Configuration(false) : configuration);
-    LocalMode.Controller lc = lma.getController();
-    lc.run(runMillis);
+    return loadService(LocalMode.class);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/api/src/main/java/org/apache/apex/api/EmbeddedAppLauncher.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/apex/api/EmbeddedAppLauncher.java b/api/src/main/java/org/apache/apex/api/EmbeddedAppLauncher.java
new file mode 100644
index 0000000..8e3e0f6
--- /dev/null
+++ b/api/src/main/java/org/apache/apex/api/EmbeddedAppLauncher.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.api;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+
+/**
+ * Launcher for running the application directly in the current Java VM. For basic operations such as launching or
+ * stopping the application, {@link Launcher} can be used directly.
+ */
+public abstract class EmbeddedAppLauncher<H extends EmbeddedAppLauncher.EmbeddedAppHandle> extends Launcher<H>
+{
+  /**
+   * Parameter to specify the time after which the application will be shutdown; pass 0 to run indefinitely.
+   */
+  public static final Attribute<Long> RUN_MILLIS = new Attribute<Long>(0L);
+
+  /**
+   * Parameter to launch application asynchronously and return from launch immediately.
+   */
+  public static final Attribute<Boolean> RUN_ASYNC = new Attribute<Boolean>(false);
+
+  /**
+   * Parameter to enable or disable heartbeat monitoring.
+   */
+  public static final Attribute<Boolean> HEARTBEAT_MONITORING = new Attribute<Boolean>(true);
+
+  /**
+   * Parameter to serialize DAG before launch.
+   */
+  public static final Attribute<Boolean> SERIALIZE_DAG = new Attribute<Boolean>(false);
+
+  static {
+    Attribute.AttributeMap.AttributeInitializer.initialize(LocalMode.class);
+  }
+
+  public static EmbeddedAppLauncher newInstance()
+  {
+    return loadService(EmbeddedAppLauncher.class);
+  }
+
+  /**
+   * The EmbeddedAppHandle class would be useful in future to provide additional information without breaking backwards
+   * compatibility of the launchApp method
+   */
+  public interface EmbeddedAppHandle extends AppHandle {}
+
+  /**
+   * Shortcut to run an application. Used for testing.
+   *
+   * @param app
+   * @param runMillis
+   */
+  public static void runApp(StreamingApplication app, int runMillis)
+  {
+    runApp(app, null, runMillis);
+  }
+
+  /**
+   * Shortcut to run an application with the modified configuration.
+   *
+   * @param app           - Application to be run
+   * @param configuration - Configuration
+   * @param runMillis     - The time after which the application will be shutdown; pass 0 to run indefinitely.
+   */
+  public static void runApp(StreamingApplication app, Configuration configuration, int runMillis)
+  {
+    EmbeddedAppLauncher launcher = newInstance();
+    Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    launchAttributes.put(RUN_MILLIS, (long)runMillis);
+    launcher.launchApp(app, configuration, launchAttributes);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/api/src/main/java/org/apache/apex/api/Launcher.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/apex/api/Launcher.java b/api/src/main/java/org/apache/apex/api/Launcher.java
new file mode 100644
index 0000000..14c365a
--- /dev/null
+++ b/api/src/main/java/org/apache/apex/api/Launcher.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.api;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Iterator;
+import java.util.ServiceLoader;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Throwables;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.StreamingApplication;
+
+/**
+ * A class that provides an entry point for functionality to run applications in different environments such as current
+ * Java VM, Hadoop YARN etc.
+ */
+public abstract class Launcher<H extends Launcher.AppHandle>
+{
+
+  public static final String NEW_INSTANCE_METHOD = "newInstance";
+
+  /**
+   * Denotes an environment in which to launch the application. Also, contains the list of supported environments.
+   * @param <L> The launcher for the specific environment
+   */
+  public static class LaunchMode<L extends Launcher>
+  {
+    /**
+     * Launch application in the current Java VM
+     */
+    public static final LaunchMode<EmbeddedAppLauncher> EMBEDDED = new LaunchMode<>(EmbeddedAppLauncher.class);
+    /**
+     * Launch application on Hadoop YARN
+     */
+    public static final LaunchMode<YarnAppLauncher> YARN = new LaunchMode<>(YarnAppLauncher.class);
+
+    Class<L> clazz;
+
+    public LaunchMode(Class<L> clazz)
+    {
+      this.clazz = clazz;
+    }
+  }
+
+  /**
+   * Specifies the manner in which a running application be stopped.
+   */
+  public enum ShutdownMode
+  {
+    /**
+     * Shutdown the application in an orderly fashion and wait till it stops running
+     */
+    AWAIT_TERMINATION,
+    /**
+     * Kill the application immediately
+     */
+    KILL
+  }
+
+  // Marker interface
+  public interface AppHandle {}
+
+  /**
+   * Get a launcher instance.<br><br>
+   *
+   * Returns a launcher specific to the given launch mode. This allows the user to also use custom methods supported by
+   * the specific launcher along with the basic launch methods from this class.
+   *
+   * @param launchMode - The launch mode to use
+   *
+   * @return The launcher
+   */
+  public static <L extends Launcher<?>> L getLauncher(LaunchMode<L> launchMode)
+  {
+    L launcher;
+    // If the static method for creating a new instance is present in the launcher, it is invoked to create an instance.
+    // This gives an opportunity for the launcher to do something custom when creating an instance. If the method is not
+    // present, the service is loaded from the class name. A factory approach would be cleaner and type safe but adds
+    // unnecessary complexity, going with the static method for now.
+    try {
+      Method m = launchMode.clazz.getDeclaredMethod(NEW_INSTANCE_METHOD);
+      launcher = (L)m.invoke(null);
+    } catch (NoSuchMethodException e) {
+      launcher = loadService(launchMode.clazz);
+    } catch (InvocationTargetException | IllegalAccessException e) {
+      throw Throwables.propagate(e);
+    }
+    return launcher;
+  }
+
+  /**
+   * Launch application with configuration.<br><br>
+   *
+   * Launch the given streaming application with the given configuration.
+   *
+   * @param application  - Application to be run
+   * @param configuration - Application Configuration
+   *
+   * @return The application handle
+   */
+  public H launchApp(StreamingApplication application, Configuration configuration) throws LauncherException
+  {
+    return launchApp(application, configuration, null);
+  }
+
+  /**
+   * Launch application with configuration and launch parameters.
+   *
+   * Launch the given streaming application with the given configuration and parameters. The parameters should be from
+   * the list of parameters supported by the launcher. To find out more about the supported parameters look at the
+   * documentation of the individual launcher.
+   *
+   * @param application  - Application to be run
+   * @param configuration - Application Configuration
+   * @param launchParameters - Launch Parameters
+   *
+   * @return The application handle
+   */
+  public abstract H launchApp(StreamingApplication application, Configuration configuration, Attribute.AttributeMap launchParameters) throws LauncherException;
+
+  /**
+   * Shutdown the application and await termination.
+   * Also see {@link #shutdownApp(AppHandle, ShutdownMode)}
+   *
+   * @param app The application handle
+   */
+  public void shutdownApp(H app) throws LauncherException
+  {
+    shutdownApp(app, ShutdownMode.AWAIT_TERMINATION);
+  }
+
+  /**
+   * Shutdown the application.
+   *
+   * The method takes the application handle and a shutdown mode. The shutdown mode specifies how to shutdown the
+   * application.
+   *
+   * If the mode is AWAIT_TERMINATION, an attempt should be made to shutdown the application in an orderly fashion
+   * and wait till termination. If the application does not terminate in a reasonable amount of time the
+   * implementation can forcibly terminate the application.
+   *
+   * If the mode is KILL, the application can be killed immediately.
+   *
+   * @param app The application handle
+   * @param shutdownMode The shutdown mode
+   */
+  public abstract void shutdownApp(H app, ShutdownMode shutdownMode) throws LauncherException;
+
+  protected static <T> T loadService(Class<T> clazz)
+  {
+    ServiceLoader<T> loader = ServiceLoader.load(clazz);
+    Iterator<T> impl = loader.iterator();
+    if (!impl.hasNext()) {
+      throw new RuntimeException("No implementation for " + clazz);
+    }
+    return impl.next();
+  }
+
+  public static class LauncherException extends RuntimeException
+  {
+    public LauncherException(String message)
+    {
+      super(message);
+    }
+
+    public LauncherException(Throwable cause)
+    {
+      super(cause);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java b/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java
new file mode 100644
index 0000000..e3b36db
--- /dev/null
+++ b/api/src/main/java/org/apache/apex/api/YarnAppLauncher.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.api;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.StringCodec;
+
+/**
+ * Launcher for running the application on Hadoop YARN. For basic operations such as launching or stopping the
+ * application, {@link Launcher} can be used directly.
+ */
+public abstract class YarnAppLauncher<H extends YarnAppLauncher.YarnAppHandle> extends Launcher<H>
+{
+
+  /**
+   * Parameter to specify extra jars for launch.
+   */
+  public static final Attribute<String> LIB_JARS = new Attribute<String>(new StringCodec.String2String());
+
+  /**
+   * Parameter to specify the previous application id to use to resume launch from.
+   */
+  public static final Attribute<String> ORIGINAL_APP_ID = new Attribute<String>(new StringCodec.String2String());
+
+  /**
+   * Parameter to specify the queue name to use for launch.
+   */
+  public static final Attribute<String> QUEUE_NAME = new Attribute<String>(new StringCodec.String2String());
+
+  static {
+    Attribute.AttributeMap.AttributeInitializer.initialize(YarnAppLauncher.class);
+  }
+
+  public static YarnAppLauncher newInstance()
+  {
+    return loadService(YarnAppLauncher.class);
+  }
+
+  public interface YarnAppHandle extends AppHandle
+  {
+    String getApplicationId();
+  }
+
+  /**
+   * Shortcut to run an application with the modified configuration.
+   *
+   * @param app           - Application to be run
+   * @param configuration - Application Configuration
+   */
+  public static void runApp(StreamingApplication app, Configuration configuration) throws LauncherException
+  {
+    runApp(app, configuration, null);
+  }
+
+  /**
+   * Shortcut to run an application with the modified configuration.
+   *
+   * @param app           - Application to be run
+   * @param configuration - Application Configuration
+   * @param launchAttributes - Launch Configuration
+   */
+  public static void runApp(StreamingApplication app, Configuration configuration, Attribute.AttributeMap launchAttributes) throws LauncherException
+
+  {
+    YarnAppLauncher launcher = newInstance();
+    launcher.launchApp(app, configuration, launchAttributes);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/engine/src/main/java/com/datatorrent/stram/LocalModeImpl.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/LocalModeImpl.java b/engine/src/main/java/com/datatorrent/stram/LocalModeImpl.java
deleted file mode 100644
index 3fedc7c..0000000
--- a/engine/src/main/java/com/datatorrent/stram/LocalModeImpl.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.stram;
-
-import java.io.File;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.stram.plan.logical.LogicalPlan;
-import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
-
-/**
- * <p>LocalModeImpl class.</p>
- *
- * @since 0.3.2
- */
-public class LocalModeImpl extends LocalMode
-{
-  private final LogicalPlan lp = new LogicalPlan();
-
-  @Override
-  public DAG getDAG()
-  {
-    return lp;
-  }
-
-  @Override
-  public DAG cloneDAG() throws Exception
-  {
-    return StramLocalCluster.cloneLogicalPlan(lp);
-  }
-
-  @Override
-  public DAG prepareDAG(StreamingApplication app, Configuration conf) throws Exception
-  {
-    if (app == null && conf == null) {
-      throw new IllegalArgumentException("Require app or configuration to populate logical plan.");
-    }
-    if (conf == null) {
-      conf = new Configuration(false);
-    }
-    LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(conf);
-    String appName = app != null ? app.getClass().getName() : "unknown";
-    lpc.prepareDAG(lp, app, appName);
-    return lp;
-  }
-
-  @Override
-  public Controller getController()
-  {
-    try {
-      addLibraryJarsToClasspath(lp);
-      return new StramLocalCluster(lp);
-    } catch (Exception e) {
-      throw new RuntimeException("Error creating local cluster", e);
-    }
-  }
-
-  private void addLibraryJarsToClasspath(LogicalPlan lp) throws MalformedURLException
-  {
-    String libJarsCsv = lp.getAttributes().get(Context.DAGContext.LIBRARY_JARS);
-
-    if (libJarsCsv != null && libJarsCsv.length() != 0) {
-      String[] split = libJarsCsv.split(StramClient.LIB_JARS_SEP);
-      if (split.length != 0) {
-        URL[] urlList = new URL[split.length];
-        for (int i = 0; i < split.length; i++) {
-          File file = new File(split[i]);
-          urlList[i] = file.toURI().toURL();
-        }
-
-        // Set class loader.
-        ClassLoader prevCl = Thread.currentThread().getContextClassLoader();
-        URLClassLoader cl = URLClassLoader.newInstance(urlList, prevCl);
-        Thread.currentThread().setContextClassLoader(cl);
-      }
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/engine/src/main/java/com/datatorrent/stram/StramUtils.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramUtils.java b/engine/src/main/java/com/datatorrent/stram/StramUtils.java
index 2b2baa6..99f7bd4 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramUtils.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramUtils.java
@@ -18,7 +18,8 @@
  */
 package com.datatorrent.stram;
 
-
+import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 
 import org.codehaus.jettison.json.JSONArray;
@@ -28,10 +29,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
+import com.datatorrent.api.Attribute;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.stram.util.LoggerUtil;
 
@@ -148,4 +154,19 @@ public abstract class StramUtils
     return jsonObject;
   }
 
+
+  public static <T> T getValueWithDefault(Attribute.AttributeMap map, Attribute<T> key)
+  {
+    T value = map.get(key);
+    if (value == null) {
+      value = key.defaultValue;
+    }
+    return value;
+  }
+
+  public static List<ApplicationReport> getApexApplicationList(YarnClient yarnClient) throws IOException, YarnException
+  {
+    return yarnClient.getApplications(Sets.newHashSet(StramClient.YARN_APPLICATION_TYPE, StramClient.YARN_APPLICATION_TYPE_DEPRECATED));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
index 5cfde36..422f0c6 100644
--- a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
+++ b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
@@ -89,14 +89,13 @@ import org.apache.log4j.Level;
 import org.apache.tools.ant.DirectoryScanner;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
 import com.sun.jersey.api.client.WebResource;
 
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG.GenericOperator;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.stram.StramClient;
+import com.datatorrent.stram.StramUtils;
 import com.datatorrent.stram.client.AppPackage;
 import com.datatorrent.stram.client.AppPackage.AppInfo;
 import com.datatorrent.stram.client.ConfigPackage;
@@ -1593,7 +1592,7 @@ public class ApexCli
   private List<ApplicationReport> getApplicationList()
   {
     try {
-      return yarnClient.getApplications(Sets.newHashSet(StramClient.YARN_APPLICATION_TYPE, StramClient.YARN_APPLICATION_TYPE_DEPRECATED));
+      return StramUtils.getApexApplicationList(yarnClient);
     } catch (Exception e) {
       throw new CliException("Error getting application list from resource manager", e);
     }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
index 961a97b..216771d 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java
@@ -44,6 +44,7 @@ import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.engine.util.StreamingAppFactory;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.NotImplementedException;
@@ -62,7 +63,6 @@ import com.google.common.collect.Sets;
 
 import com.datatorrent.api.Context;
 import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.stram.StramClient;
 import com.datatorrent.stram.StramLocalCluster;
 import com.datatorrent.stram.StramUtils;
@@ -462,36 +462,16 @@ public class StramAppLauncher
       try {
         final Class<?> clazz = cl.loadClass(className);
         if (!Modifier.isAbstract(clazz.getModifiers()) && StreamingApplication.class.isAssignableFrom(clazz)) {
-          final AppFactory appConfig = new AppFactory()
+          final AppFactory appConfig = new StreamingAppFactory(classFileName, clazz)
           {
             @Override
-            public String getName()
-            {
-              return classFileName;
-            }
-
-            @Override
-            public String getDisplayName()
-            {
-              ApplicationAnnotation an = clazz.getAnnotation(ApplicationAnnotation.class);
-              if (an != null) {
-                return an.name();
-              } else {
-                return classFileName;
-              }
-            }
-
-            @Override
-            public LogicalPlan createApp(LogicalPlanConfiguration conf)
+            public LogicalPlan createApp(LogicalPlanConfiguration planConfig)
             {
               // load class from current context class loader
               Class<? extends StreamingApplication> c = StramUtils.classForName(className, StreamingApplication.class);
               StreamingApplication app = StramUtils.newInstance(c);
-              LogicalPlan dag = new LogicalPlan();
-              conf.prepareDAG(dag, app, getName());
-              return dag;
+              return super.createApp(app, planConfig);
             }
-
           };
           appResourceList.add(appConfig);
         }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/engine/src/main/java/org/apache/apex/engine/EmbeddedAppLauncherImpl.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/engine/EmbeddedAppLauncherImpl.java b/engine/src/main/java/org/apache/apex/engine/EmbeddedAppLauncherImpl.java
new file mode 100644
index 0000000..9ace9b5
--- /dev/null
+++ b/engine/src/main/java/org/apache/apex/engine/EmbeddedAppLauncherImpl.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.engine;
+
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+import org.apache.apex.api.EmbeddedAppLauncher;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.stram.StramClient;
+import com.datatorrent.stram.StramLocalCluster;
+import com.datatorrent.stram.StramUtils;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
+
+/**
+ * An implementation of {@link EmbeddedAppLauncher} to launch applications directly in the current Java VM.
+ *
+ * TODO: When LocalMode is removed, make this class extend EmbeddedAppLauncher directly
+ * @since 0.3.2
+ */
+public class EmbeddedAppLauncherImpl extends LocalMode<EmbeddedAppLauncherImpl.EmbeddedAppHandleImpl>
+{
+  private final LogicalPlan lp = new LogicalPlan();
+
+  @Override
+  public DAG getDAG()
+  {
+    return lp;
+  }
+
+  @Override
+  public DAG cloneDAG() throws Exception
+  {
+    return StramLocalCluster.cloneLogicalPlan(lp);
+  }
+
+  @Override
+  public EmbeddedAppHandleImpl launchApp(StreamingApplication application, Configuration configuration, Attribute.AttributeMap
+      launchParameters) throws LauncherException
+  {
+    try {
+      prepareDAG(application, configuration);
+    } catch (Exception e) {
+      throw new LauncherException(e);
+    }
+    LocalMode.Controller lc = getController();
+    boolean launched = false;
+    if (launchParameters != null) {
+      if (StramUtils.getValueWithDefault(launchParameters, SERIALIZE_DAG)) {
+        // Check if DAG can be serialized
+        try {
+          cloneDAG();
+        } catch (Exception e) {
+          throw new LauncherException(e);
+        }
+      }
+      if (StramUtils.getValueWithDefault(launchParameters, HEARTBEAT_MONITORING)) {
+        lc.setHeartbeatMonitoringEnabled(true);
+      }
+      if (StramUtils.getValueWithDefault(launchParameters, RUN_ASYNC)) {
+        lc.runAsync();
+        launched = true;
+      } else {
+        Long runMillis = StramUtils.getValueWithDefault(launchParameters, RUN_MILLIS);
+        if (runMillis != null) {
+          lc.run(runMillis);
+          launched = true;
+        }
+      }
+    }
+    if (!launched) {
+      lc.run();
+    }
+    return new EmbeddedAppHandleImpl(lc);
+  }
+
+  @Override
+  public void shutdownApp(EmbeddedAppHandleImpl app, ShutdownMode shutdownMode) throws LauncherException
+  {
+    if (shutdownMode != ShutdownMode.KILL) {
+      app.controller.shutdown();
+    } else {
+      throw new UnsupportedOperationException("Kill not supported");
+    }
+  }
+
+  @Override
+  public DAG prepareDAG(StreamingApplication app, Configuration conf) throws Exception
+  {
+    if (app == null && conf == null) {
+      throw new IllegalArgumentException("Require app or configuration to populate logical plan.");
+    }
+    if (conf == null) {
+      conf = new Configuration(false);
+    }
+    LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(conf);
+    String appName = app != null ? app.getClass().getName() : "unknown";
+    lpc.prepareDAG(lp, app, appName);
+    return lp;
+  }
+
+  @Override
+  public Controller getController()
+  {
+    try {
+      addLibraryJarsToClasspath(lp);
+      return new StramLocalCluster(lp);
+    } catch (Exception e) {
+      throw new RuntimeException("Error creating local cluster", e);
+    }
+  }
+
+  private void addLibraryJarsToClasspath(LogicalPlan lp) throws MalformedURLException
+  {
+    String libJarsCsv = lp.getAttributes().get(Context.DAGContext.LIBRARY_JARS);
+
+    if (libJarsCsv != null && libJarsCsv.length() != 0) {
+      String[] split = libJarsCsv.split(StramClient.LIB_JARS_SEP);
+      if (split.length != 0) {
+        URL[] urlList = new URL[split.length];
+        for (int i = 0; i < split.length; i++) {
+          File file = new File(split[i]);
+          urlList[i] = file.toURI().toURL();
+        }
+
+        // Set class loader.
+        ClassLoader prevCl = Thread.currentThread().getContextClassLoader();
+        URLClassLoader cl = URLClassLoader.newInstance(urlList, prevCl);
+        Thread.currentThread().setContextClassLoader(cl);
+      }
+    }
+
+  }
+
+  /**
+   *
+   */
+  public static class EmbeddedAppHandleImpl implements EmbeddedAppLauncher.EmbeddedAppHandle
+  {
+    Controller controller;
+
+    public EmbeddedAppHandleImpl(Controller controller)
+    {
+      this.controller = controller;
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java b/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java
new file mode 100644
index 0000000..4f5c8c8
--- /dev/null
+++ b/engine/src/main/java/org/apache/apex/engine/YarnAppLauncherImpl.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.engine;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.apex.api.YarnAppLauncher;
+import org.apache.apex.engine.util.StreamingAppFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import com.google.common.base.Throwables;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.stram.StramUtils;
+import com.datatorrent.stram.client.StramAppLauncher;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
+
+/**
+ * An implementation of {@link YarnAppLauncher} to launch applications on Hadoop YARN.
+ */
+public class YarnAppLauncherImpl extends YarnAppLauncher<YarnAppLauncherImpl.YarnAppHandleImpl>
+{
+
+  private static final Map<Attribute<?>, String> propMapping = new HashMap<>();
+
+  static {
+    propMapping.put(YarnAppLauncher.LIB_JARS, StramAppLauncher.LIBJARS_CONF_KEY_NAME);
+    propMapping.put(YarnAppLauncher.ORIGINAL_APP_ID, StramAppLauncher.ORIGINAL_APP_ID);
+    propMapping.put(YarnAppLauncher.QUEUE_NAME, StramAppLauncher.QUEUE_NAME);
+  }
+
+  public YarnAppHandleImpl launchApp(final StreamingApplication app, Configuration conf, Attribute.AttributeMap launchParameters) throws LauncherException
+  {
+    if (launchParameters != null) {
+      for (Map.Entry<Attribute<?>, Object> entry : launchParameters.entrySet()) {
+        String property = propMapping.get(entry.getKey());
+        if (property != null) {
+          setConfiguration(conf, property, entry.getValue());
+        }
+      }
+    }
+    try {
+      String name = app.getClass().getName();
+      StramAppLauncher appLauncher = new StramAppLauncher(name, conf);
+      appLauncher.loadDependencies();
+      StreamingAppFactory appFactory = new StreamingAppFactory(name, app.getClass())
+      {
+        @Override
+        public LogicalPlan createApp(LogicalPlanConfiguration planConfig)
+        {
+          return super.createApp(app, planConfig);
+        }
+      };
+      ApplicationId appId = appLauncher.launchApp(appFactory);
+      return new YarnAppHandleImpl(appId);
+    } catch (Exception ex) {
+      throw new LauncherException(ex);
+    }
+  }
+
+  @Override
+  public void shutdownApp(YarnAppHandleImpl app, ShutdownMode shutdownMode) throws LauncherException
+  {
+    if (shutdownMode == ShutdownMode.KILL) {
+      YarnClient yarnClient = YarnClient.createYarnClient();
+      try {
+        String appId = app.getApplicationId();
+        ApplicationId applicationId = null;
+        List<ApplicationReport> applications = StramUtils.getApexApplicationList(yarnClient);
+        for (ApplicationReport application : applications) {
+          if (application.getApplicationId().toString().equals(appId)) {
+            applicationId = application.getApplicationId();
+            break;
+          }
+        }
+        if (applicationId == null) {
+          throw new LauncherException("Application " + appId + " not found");
+        }
+        yarnClient.killApplication(applicationId);
+      } catch (YarnException | IOException e) {
+        throw Throwables.propagate(e);
+      }
+    } else {
+      throw new UnsupportedOperationException("Orderly shutdown not supported, try kill instead");
+    }
+  }
+
+  private void setConfiguration(Configuration conf, String property, Object value)
+  {
+    if (value instanceof Integer) {
+      conf.setInt(property, (Integer)value);
+    } else if (value instanceof Boolean) {
+      conf.setBoolean(property, (Boolean)value);
+    } else if (value instanceof Long) {
+      conf.setLong(property, (Long)value);
+    } else if (value instanceof Float) {
+      conf.setFloat(property, (Float)value);
+    } else if (value instanceof Double) {
+      conf.setDouble(property, (Double)value);
+    } else {
+      conf.set(property, value.toString());
+    }
+  }
+
+  /**
+   *
+   */
+  public static class YarnAppHandleImpl implements YarnAppLauncher.YarnAppHandle
+  {
+    ApplicationId appId;
+
+    public YarnAppHandleImpl(ApplicationId appId)
+    {
+      this.appId = appId;
+    }
+
+    @Override
+    public String getApplicationId()
+    {
+      return appId.toString();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/engine/src/main/java/org/apache/apex/engine/util/StreamingAppFactory.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/engine/util/StreamingAppFactory.java b/engine/src/main/java/org/apache/apex/engine/util/StreamingAppFactory.java
new file mode 100644
index 0000000..6c943fc
--- /dev/null
+++ b/engine/src/main/java/org/apache/apex/engine/util/StreamingAppFactory.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.engine.util;
+
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.stram.client.StramAppLauncher;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
+
+/**
+ *
+ */
+public abstract class StreamingAppFactory implements StramAppLauncher.AppFactory
+{
+  private Class<?> appClazz;
+  private String name;
+
+  public StreamingAppFactory(String name, Class<?> appClazz)
+  {
+    this.name = name;
+    this.appClazz = appClazz;
+  }
+
+  public abstract LogicalPlan createApp(LogicalPlanConfiguration planConfig);
+
+  protected LogicalPlan createApp(StreamingApplication app, LogicalPlanConfiguration planConfig)
+  {
+    LogicalPlan dag = new LogicalPlan();
+    planConfig.prepareDAG(dag, app, getName());
+    return dag;
+  }
+
+  public String getName()
+  {
+    return name;
+  }
+
+  public String getDisplayName()
+  {
+    ApplicationAnnotation an = appClazz.getAnnotation(ApplicationAnnotation.class);
+    if (an != null) {
+      return an.name();
+    } else {
+      return name;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/engine/src/main/resources/META-INF/services/com.datatorrent.api.LocalMode
----------------------------------------------------------------------
diff --git a/engine/src/main/resources/META-INF/services/com.datatorrent.api.LocalMode b/engine/src/main/resources/META-INF/services/com.datatorrent.api.LocalMode
index 712a323..cb9119f 100644
--- a/engine/src/main/resources/META-INF/services/com.datatorrent.api.LocalMode
+++ b/engine/src/main/resources/META-INF/services/com.datatorrent.api.LocalMode
@@ -1 +1 @@
-com.datatorrent.stram.LocalModeImpl
\ No newline at end of file
+org.apache.apex.engine.EmbeddedAppLauncherImpl

http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/engine/src/main/resources/META-INF/services/org.apache.apex.api.EmbeddedAppLauncher
----------------------------------------------------------------------
diff --git a/engine/src/main/resources/META-INF/services/org.apache.apex.api.EmbeddedAppLauncher b/engine/src/main/resources/META-INF/services/org.apache.apex.api.EmbeddedAppLauncher
new file mode 100644
index 0000000..cb9119f
--- /dev/null
+++ b/engine/src/main/resources/META-INF/services/org.apache.apex.api.EmbeddedAppLauncher
@@ -0,0 +1 @@
+org.apache.apex.engine.EmbeddedAppLauncherImpl

http://git-wip-us.apache.org/repos/asf/apex-core/blob/7ad1d75d/engine/src/main/resources/META-INF/services/org.apache.apex.api.YarnAppLauncher
----------------------------------------------------------------------
diff --git a/engine/src/main/resources/META-INF/services/org.apache.apex.api.YarnAppLauncher b/engine/src/main/resources/META-INF/services/org.apache.apex.api.YarnAppLauncher
new file mode 100644
index 0000000..45989a4
--- /dev/null
+++ b/engine/src/main/resources/META-INF/services/org.apache.apex.api.YarnAppLauncher
@@ -0,0 +1 @@
+org.apache.apex.engine.YarnAppLauncherImpl


[2/2] apex-core git commit: Merge commit 'refs/pull/351/head' of https://github.com/apache/apex-core

Posted by th...@apache.org.
Merge commit 'refs/pull/351/head' of https://github.com/apache/apex-core


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/cf814184
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/cf814184
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/cf814184

Branch: refs/heads/master
Commit: cf81418464dc2fad2b75b68a9759c799ffff8b48
Parents: 5fb9d04 7ad1d75
Author: Thomas Weise <th...@apache.org>
Authored: Mon Nov 21 13:19:11 2016 -0800
Committer: Thomas Weise <th...@apache.org>
Committed: Mon Nov 21 13:19:11 2016 -0800

----------------------------------------------------------------------
 .idea/codeStyleSettings.xml                     |   2 +-
 .../java/com/datatorrent/api/LocalMode.java     |  41 +---
 .../apache/apex/api/EmbeddedAppLauncher.java    |  94 +++++++++
 .../main/java/org/apache/apex/api/Launcher.java | 192 +++++++++++++++++++
 .../org/apache/apex/api/YarnAppLauncher.java    |  88 +++++++++
 .../com/datatorrent/stram/LocalModeImpl.java    | 103 ----------
 .../java/com/datatorrent/stram/StramUtils.java  |  23 ++-
 .../java/com/datatorrent/stram/cli/ApexCli.java |   5 +-
 .../stram/client/StramAppLauncher.java          |  28 +--
 .../apex/engine/EmbeddedAppLauncherImpl.java    | 173 +++++++++++++++++
 .../apache/apex/engine/YarnAppLauncherImpl.java | 148 ++++++++++++++
 .../apex/engine/util/StreamingAppFactory.java   |  64 +++++++
 .../services/com.datatorrent.api.LocalMode      |   2 +-
 .../org.apache.apex.api.EmbeddedAppLauncher     |   1 +
 .../org.apache.apex.api.YarnAppLauncher         |   1 +
 15 files changed, 796 insertions(+), 169 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/cf814184/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
----------------------------------------------------------------------