You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2016/12/10 19:01:34 UTC

[1/2] incubator-beam git commit: BEAM-830 Support launch on YARN cluster.

Repository: incubator-beam
Updated Branches:
  refs/heads/master 58000b397 -> a834fb0eb


BEAM-830 Support launch on YARN cluster.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b6b2e202
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b6b2e202
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b6b2e202

Branch: refs/heads/master
Commit: b6b2e202ae8c5d3d1c081a1e24033380d7f55593
Parents: 1fab152
Author: Thomas Weise <th...@apache.org>
Authored: Thu Nov 24 18:36:11 2016 -0800
Committer: Thomas Weise <th...@apache.org>
Committed: Fri Dec 9 23:46:40 2016 -0800

----------------------------------------------------------------------
 runners/apex/pom.xml                            |  52 ++-
 .../apache/beam/runners/apex/ApexRunner.java    |  48 ++-
 .../beam/runners/apex/ApexRunnerResult.java     |  50 +--
 .../beam/runners/apex/ApexYarnLauncher.java     | 395 +++++++++++++++++++
 .../beam/runners/apex/ApexYarnLauncherTest.java | 138 +++++++
 5 files changed, 631 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b6b2e202/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index b604237..9f1455a 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -35,7 +35,7 @@
   <packaging>jar</packaging>
 
   <properties>
-    <apex.core.version>3.5.0-SNAPSHOT</apex.core.version>
+    <apex.core.version>3.5.0</apex.core.version>
     <apex.malhar.version>3.4.0</apex.malhar.version>
     <skipIntegrationTests>true</skipIntegrationTests>
     <!-- memory limit for embedded cluster -->
@@ -218,22 +218,64 @@
             </goals>
             <configuration>
               <ignoredUsedUndeclaredDependencies>
-                <ignoredUsedUndeclaredDependency>org.apache.apex:apex-api:jar:3.5.0-SNAPSHOT</ignoredUsedUndeclaredDependency>
+                <ignoredUsedUndeclaredDependency>org.apache.apex:apex-api:jar:3.5.0</ignoredUsedUndeclaredDependency>
                 <ignoredUsedUndeclaredDependency>org.apache.commons:commons-lang3::3.1</ignoredUsedUndeclaredDependency>
-                <ignoredUsedUndeclaredDependency>commons-io:commons-io:jar:</ignoredUsedUndeclaredDependency>
+                <ignoredUsedUndeclaredDependency>commons-io:commons-io:jar:2.4</ignoredUsedUndeclaredDependency>
                 <ignoredUsedUndeclaredDependency>com.esotericsoftware.kryo:kryo::2.24.0</ignoredUsedUndeclaredDependency>
-                <ignoredUsedUndeclaredDependency>com.datatorrent:netlet::</ignoredUsedUndeclaredDependency>
+                <ignoredUsedUndeclaredDependency>com.datatorrent:netlet::1.3.0</ignoredUsedUndeclaredDependency>
                 <ignoredUsedUndeclaredDependency>org.slf4j:slf4j-api:jar:1.7.14</ignoredUsedUndeclaredDependency>
-                <ignoredUsedUndeclaredDependency>org.apache.hadoop:hadoop-common:jar:</ignoredUsedUndeclaredDependency>
+                <ignoredUsedUndeclaredDependency>org.apache.hadoop:hadoop-common:jar:2.6.0</ignoredUsedUndeclaredDependency>
                 <ignoredUsedUndeclaredDependency>joda-time:joda-time:jar:2.4</ignoredUsedUndeclaredDependency>
                 <ignoredUsedUndeclaredDependency>com.google.guava:guava:jar:19.0</ignoredUsedUndeclaredDependency>
               </ignoredUsedUndeclaredDependencies>
             </configuration>
           </execution>
+          <execution>
+            <!--  used in ApexYarnLauncher to filter compile time Hadoop dependencies -->
+            <id>dependency-tree</id>
+            <phase>generate-test-resources</phase>
+            <goals>
+              <goal>tree</goal>
+            </goals>
+            <configuration>
+              <outputFile>${project.build.directory}/classes/org/apache/beam/runners/apex/dependency-tree</outputFile>
+            </configuration>
+          </execution>
         </executions>
       </plugin>
 
     </plugins>
+
+    <pluginManagement>
+      <plugins>
+        <!-- Eclipse has a problem with dependency:tree when it is not in package phase -->
+        <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+        <plugin>
+          <groupId>org.eclipse.m2e</groupId>
+          <artifactId>lifecycle-mapping</artifactId>
+          <version>1.0.0</version>
+          <configuration>
+            <lifecycleMappingMetadata>
+              <pluginExecutions>
+                <pluginExecution>
+                  <pluginExecutionFilter>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-dependency-plugin</artifactId>
+                    <versionRange>[2.10,)</versionRange>
+                    <goals>
+                      <goal>tree</goal>
+                    </goals>
+                  </pluginExecutionFilter>
+                  <action>
+                    <ignore></ignore>
+                  </action>
+                </pluginExecution>
+              </pluginExecutions>
+            </lifecycleMappingMetadata>
+          </configuration>
+        </plugin>
+      </plugins>
+    </pluginManagement>
   </build>
 
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b6b2e202/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index 9507fb9..899efa3 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -17,19 +17,22 @@
  */
 package org.apache.beam.runners.apex;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
+import com.datatorrent.api.Attribute;
 import com.datatorrent.api.Context.DAGContext;
 import com.datatorrent.api.DAG;
-import com.datatorrent.api.LocalMode;
 import com.datatorrent.api.StreamingApplication;
 import com.google.common.base.Throwables;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.apex.api.EmbeddedAppLauncher;
+import org.apache.apex.api.Launcher;
+import org.apache.apex.api.Launcher.AppHandle;
+import org.apache.apex.api.Launcher.LaunchMode;
 import org.apache.beam.runners.apex.translation.ApexPipelineTranslator;
 import org.apache.beam.runners.core.AssignWindows;
 import org.apache.beam.sdk.Pipeline;
@@ -122,33 +125,44 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
   public ApexRunnerResult run(final Pipeline pipeline) {
 
     final ApexPipelineTranslator translator = new ApexPipelineTranslator(options);
+    // populateDAG is invoked by the launcher; keep a reference to the DAG it receives so it
+    // can be handed to ApexRunnerResult
+    final AtomicReference<DAG> apexDAG = new AtomicReference<>();
 
     StreamingApplication apexApp = new StreamingApplication() {
       @Override
       public void populateDAG(DAG dag, Configuration conf) {
+        apexDAG.set(dag);
         dag.setAttribute(DAGContext.APPLICATION_NAME, options.getApplicationName());
         translator.translate(pipeline, dag);
       }
     };
 
-    checkArgument(options.isEmbeddedExecution(),
-        "only embedded execution is supported at this time");
-    LocalMode lma = LocalMode.newInstance();
-    Configuration conf = new Configuration(false);
-    try {
-      lma.prepareDAG(apexApp, conf);
-      LocalMode.Controller lc = lma.getController();
+    // embedded execution uses the Apex embedded launcher in this JVM; otherwise the
+    // application is handed to ApexYarnLauncher for submission to YARN
+    if (options.isEmbeddedExecution()) {
+      Launcher<AppHandle> launcher = Launcher.getLauncher(LaunchMode.EMBEDDED);
+      Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+      // presumably makes launchApp return without waiting for completion (TODO confirm);
+      // ApexRunnerResult polls the returned handle for completion
+      launchAttributes.put(EmbeddedAppLauncher.RUN_ASYNC, true);
       if (options.isEmbeddedExecutionDebugMode()) {
         // turns off timeout checking for operator progress
-        lc.setHeartbeatMonitoringEnabled(false);
+        launchAttributes.put(EmbeddedAppLauncher.HEARTBEAT_MONITORING, false);
+      }
+      Configuration conf = new Configuration(false);
+      try {
+        // clear an assertion failure left over from a previous run; ApexRunnerResult checks it
+        ApexRunner.ASSERTION_ERROR.set(null);
+        AppHandle apexAppResult = launcher.launchApp(apexApp, conf, launchAttributes);
+        return new ApexRunnerResult(apexDAG.get(), apexAppResult);
+      } catch (Exception e) {
+        Throwables.propagateIfPossible(e);
+        throw new RuntimeException(e);
+      }
+    } else {
+      try {
+        // ApexYarnLauncher calls apexApp.populateDAG itself, so apexDAG is set on return
+        ApexYarnLauncher yarnLauncher = new ApexYarnLauncher();
+        AppHandle apexAppResult = yarnLauncher.launchApp(apexApp);
+        return new ApexRunnerResult(apexDAG.get(), apexAppResult);
+      } catch (IOException e) {
+        throw new RuntimeException("Failed to launch the application on YARN.", e);
       }
-      ApexRunner.ASSERTION_ERROR.set(null);
-      lc.runAsync();
-      return new ApexRunnerResult(lma.getDAG(), lc);
-    } catch (Exception e) {
-      Throwables.propagateIfPossible(e);
-      throw new RuntimeException(e);
     }
+
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b6b2e202/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
index 18b50bc..8548194 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
@@ -18,11 +18,11 @@
 package org.apache.beam.runners.apex;
 
 import com.datatorrent.api.DAG;
-import com.datatorrent.api.LocalMode;
 
 import java.io.IOException;
-import java.lang.reflect.Field;
 
+import org.apache.apex.api.Launcher.AppHandle;
+import org.apache.apex.api.Launcher.ShutdownMode;
 import org.apache.beam.sdk.AggregatorRetrievalException;
 import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.Pipeline;
@@ -36,12 +36,12 @@ import org.joda.time.Duration;
  */
 public class ApexRunnerResult implements PipelineResult {
   private final DAG apexDAG;
-  private final LocalMode.Controller ctrl;
+  private final AppHandle apexApp;
   private State state = State.UNKNOWN;
 
+  /**
+   * Creates the result for a launched pipeline.
+   *
+   * @param dag the Apex DAG the pipeline was translated into
+   * @param apexApp handle of the launched Apex application, used to poll and shut it down
+   */
-  public ApexRunnerResult(DAG dag, LocalMode.Controller ctrl) {
+  public ApexRunnerResult(DAG dag, AppHandle apexApp) {
     this.apexDAG = dag;
-    this.ctrl = ctrl;
+    this.apexApp = apexApp;
   }
 
   @Override
@@ -57,19 +57,31 @@ public class ApexRunnerResult implements PipelineResult {
 
   @Override
   public State cancel() throws IOException {
-    ctrl.shutdown();
+    // requests a KILL shutdown; note the handle returned by ApexYarnLauncher does not
+    // support shutdown yet and throws UnsupportedOperationException
+    apexApp.shutdown(ShutdownMode.KILL);
     state = State.CANCELLED;
     return state;
   }
 
   @Override
   public State waitUntilFinish(Duration duration) {
-    return ApexRunnerResult.waitUntilFinished(ctrl, duration);
+    // Poll until the application finishes, an assertion failure is reported, or the deadline
+    // passes. A null or non-positive duration waits without limit; durations too large to
+    // add to the current time are clamped instead of overflowing into a past deadline.
+    long now = System.currentTimeMillis();
+    long timeout = (duration == null || duration.getMillis() < 1
+        || duration.getMillis() > Long.MAX_VALUE - now) ? Long.MAX_VALUE
+        : now + duration.getMillis();
+    try {
+      while (!apexApp.isFinished() && System.currentTimeMillis() < timeout) {
+        if (ApexRunner.ASSERTION_ERROR.get() != null) {
+          throw ApexRunner.ASSERTION_ERROR.get();
+        }
+        Thread.sleep(500);
+      }
+      // null signals that the deadline passed before the application finished
+      return apexApp.isFinished() ? State.DONE : null;
+    } catch (InterruptedException e) {
+      // restore the interrupt status so the caller can still observe the interruption
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
   public State waitUntilFinish() {
-    return ApexRunnerResult.waitUntilFinished(ctrl, null);
+    // a null duration makes waitUntilFinish(Duration) wait without a timeout
+    return waitUntilFinish(null);
   }
 
   @Override
@@ -85,26 +97,4 @@ public class ApexRunnerResult implements PipelineResult {
     return apexDAG;
   }
 
-  public static State waitUntilFinished(LocalMode.Controller ctrl, Duration duration) {
-    // we need to rely on internal field for now
-    // Apex should make it available through API in upcoming release.
-    long timeout = (duration == null || duration.getMillis() < 1) ? Long.MAX_VALUE
-        : System.currentTimeMillis() + duration.getMillis();
-    Field appDoneField;
-    try {
-      appDoneField = ctrl.getClass().getDeclaredField("appDone");
-      appDoneField.setAccessible(true);
-      while (!appDoneField.getBoolean(ctrl) && System.currentTimeMillis() < timeout) {
-        if (ApexRunner.ASSERTION_ERROR.get() != null) {
-          throw ApexRunner.ASSERTION_ERROR.get();
-        }
-        Thread.sleep(500);
-      }
-      return appDoneField.getBoolean(ctrl) ? State.DONE : null;
-    } catch (NoSuchFieldException | SecurityException | IllegalArgumentException
-        | IllegalAccessException | InterruptedException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b6b2e202/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java
new file mode 100644
index 0000000..0ae4cc7
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Attribute.AttributeMap;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.lang.reflect.AccessibleObject;
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.FileSystem;
+import java.nio.file.FileSystems;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.jar.JarFile;
+import java.util.jar.Manifest;
+
+import org.apache.apex.api.EmbeddedAppLauncher;
+import org.apache.apex.api.Launcher;
+import org.apache.apex.api.Launcher.AppHandle;
+import org.apache.apex.api.Launcher.LaunchMode;
+import org.apache.apex.api.Launcher.LauncherException;
+import org.apache.apex.api.Launcher.ShutdownMode;
+import org.apache.apex.api.YarnAppLauncher;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Proxy to launch the YARN application through the hadoop script to run in the
+ * pre-configured environment (class path, configuration, native libraries etc.).
+ *
+ * <p>The proxy takes the DAG and communicates with the Hadoop services to launch
+ * it on the cluster.
+ */
+public class ApexYarnLauncher {
+  private static final Logger LOG = LoggerFactory.getLogger(ApexYarnLauncher.class);
+
+  /**
+   * Launches the application on YARN. The DAG is populated locally, serialized together with
+   * the launch attributes and submitted by a child process started through the {@code hadoop}
+   * script, so that the pre-configured Hadoop environment is used.
+   *
+   * @param app the application to launch
+   * @return handle for the launched application; status tracking is not implemented yet
+   * @throws IOException when the dependencies cannot be resolved or packaged, or the launch
+   *     parameters cannot be written
+   */
+  public AppHandle launchApp(StreamingApplication app) throws IOException {
+
+    List<File> jarsToShip = getYarnDeployDependencies();
+    StringBuilder classpath = new StringBuilder();
+    for (File path : jarsToShip) {
+      if (path.isDirectory()) {
+        // class directories (e.g. from the build reactor) are shipped as temporary jar files
+        File tmpJar = File.createTempFile("beam-runners-apex-", ".jar");
+        // register the cleanup first so the file is removed even if packaging fails
+        tmpJar.deleteOnExit();
+        createJar(path, tmpJar);
+        path = tmpJar;
+      }
+      if (classpath.length() != 0) {
+        classpath.append(':');
+      }
+      classpath.append(path.getAbsolutePath());
+    }
+
+    // the embedded launcher is only used to obtain a DAG instance to populate
+    EmbeddedAppLauncher<?> embeddedLauncher = Launcher.getLauncher(LaunchMode.EMBEDDED);
+    DAG dag = embeddedLauncher.getDAG();
+    app.populateDAG(dag, new Configuration(false));
+
+    Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    launchAttributes.put(YarnAppLauncher.LIB_JARS, classpath.toString().replace(':', ','));
+    LaunchParams lp = new LaunchParams(dag, launchAttributes);
+    lp.cmd = "hadoop " + ApexYarnLauncher.class.getName();
+    HashMap<String, String> env = new HashMap<>();
+    env.put("HADOOP_USER_CLASSPATH_FIRST", "1");
+    env.put("HADOOP_CLASSPATH", classpath.toString());
+    lp.env = env;
+    return launchApp(lp);
+  }
+
+  /**
+   * Serializes the launch parameters to a temporary file and runs {@link #main} with it, either
+   * in-process (when no command is configured) or in a child process started through bash.
+   *
+   * @param params the DAG, launch attributes and optional command and environment
+   * @return handle for the launched application
+   * @throws IOException when the parameters cannot be written, the child process cannot be
+   *     started, or waiting for it is interrupted
+   */
+  protected AppHandle launchApp(LaunchParams params) throws IOException {
+    File tmpFile = File.createTempFile("beam-runner-apex", "params");
+    tmpFile.deleteOnExit();
+    try (FileOutputStream fos = new FileOutputStream(tmpFile)) {
+      SerializationUtils.serialize(params, fos);
+    }
+    if (params.getCmd() == null) {
+      ApexYarnLauncher.main(new String[] {tmpFile.getAbsolutePath()});
+    } else {
+      String cmd = params.getCmd() + " " + tmpFile.getAbsolutePath();
+      LOG.info("Executing: {} with {}", cmd, params.getEnv());
+
+      ProcessBuilder pb = new ProcessBuilder("bash", "-c", cmd);
+      if (params.getEnv() != null) {
+        pb.environment().putAll(params.getEnv());
+      }
+      // Merge stderr into stdout so that a single reader drains both pipes. Reading stdout to
+      // EOF before touching stderr deadlocks once the child fills the stderr pipe buffer.
+      pb.redirectErrorStream(true);
+      Process p = pb.start();
+      ByteArrayOutputStream consoleOutput = new ByteArrayOutputStream();
+      try (InputStream output = p.getInputStream()) {
+        IOUtils.copy(output, consoleOutput);
+      }
+      int rc;
+      try {
+        rc = p.waitFor();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        p.destroy();
+        throw new IOException("Interrupted while waiting for: " + cmd, e);
+      }
+      if (rc != 0) {
+        String msg = "The Beam Apex runner in non-embedded mode requires the Hadoop client"
+            + " to be installed on the machine from which you launch the job"
+            + " and the 'hadoop' script in $PATH";
+        LOG.error(msg);
+        throw new RuntimeException("Failed to run: " + cmd + " (exit code " + rc + ")" + "\n"
+            + consoleOutput.toString());
+      }
+    }
+    return new AppHandle() {
+      @Override
+      public boolean isFinished() {
+        // TODO (future PR): interaction with child process
+        LOG.warn("YARN application runs asynchronously and status check not implemented.");
+        return true;
+      }
+      @Override
+      public void shutdown(ShutdownMode arg0) throws LauncherException {
+        // TODO (future PR): interaction with child process
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+
+  /**
+   * From the current classpath, find the jar files that need to be deployed
+   * with the application to run on YARN. Hadoop dependencies are provided
+   * through the Hadoop installation and the application should not bundle them
+   * to avoid conflicts. This is done by removing the Hadoop compile
+   * dependencies (transitively) by parsing the Maven dependency tree.
+   *
+   * @return list of jar files to ship
+   * @throws IOException when dependency information cannot be read
+   */
+  public static List<File> getYarnDeployDependencies() throws IOException {
+    List<String> excludes = new ArrayList<>();
+    try (InputStream dependencyTree = ApexRunner.class.getResourceAsStream("dependency-tree")) {
+      if (dependencyTree == null) {
+        throw new IOException("Resource 'dependency-tree' not found next to "
+            + ApexRunner.class.getName() + "; it is generated by the Maven build");
+      }
+      BufferedReader br = new BufferedReader(new InputStreamReader(dependencyTree, "UTF-8"));
+      // The column of the first letter of an entry encodes its depth in the tree. Everything
+      // nested below an org.apache.hadoop artifact is excluded along with it.
+      int excludeLevel = Integer.MAX_VALUE;
+      String line;
+      while ((line = br.readLine()) != null) {
+        for (int i = 0; i < line.length(); i++) {
+          if (Character.isLetter(line.charAt(i))) {
+            String artifact = line.substring(i);
+            if (i > excludeLevel) {
+              excludes.add(artifact);
+            } else if (artifact.startsWith("org.apache.hadoop")) {
+              excludeLevel = i;
+              excludes.add(artifact);
+            } else {
+              excludeLevel = Integer.MAX_VALUE;
+            }
+            break;
+          }
+        }
+      }
+    }
+
+    Set<String> excludeJarFileNames = Sets.newHashSet();
+    for (String exclude : excludes) {
+      // groupId:artifactId:type[:classifier]:version[:scope]
+      String[] mvnc = exclude.split(":");
+      if (mvnc.length < 4) {
+        LOG.warn("Ignoring unexpected dependency tree entry: {}", exclude);
+        continue;
+      }
+      String fileName = mvnc[1] + "-";
+      if (mvnc.length == 6) {
+        fileName += mvnc[4] + "-" + mvnc[3]; // with classifier
+      } else {
+        fileName += mvnc[3];
+      }
+      fileName += ".jar";
+      excludeJarFileNames.add(fileName);
+    }
+
+    List<File> dependencyJars = new ArrayList<>();
+    for (File f : getClassPathEntries()) {
+      // dependencies can also be directories in the build reactor,
+      // launchApp packages those as jar files before shipping them.
+      if (f.exists() && !excludeJarFileNames.contains(f.getName())) {
+        dependencyJars.add(f);
+      }
+    }
+    return dependencyJars;
+  }
+
+  /**
+   * Lists the class path entries of the loader of this class. Falls back to the
+   * {@code java.class.path} system property when that loader is not a {@link URLClassLoader}.
+   */
+  private static List<File> getClassPathEntries() {
+    List<File> entries = new ArrayList<>();
+    ClassLoader classLoader = ApexYarnLauncher.class.getClassLoader();
+    if (classLoader instanceof URLClassLoader) {
+      for (URL url : ((URLClassLoader) classLoader).getURLs()) {
+        entries.add(new File(url.getFile()));
+      }
+    } else {
+      for (String entry : System.getProperty("java.class.path", "").split(File.pathSeparator)) {
+        if (!entry.isEmpty()) {
+          entries.add(new File(entry));
+        }
+      }
+    }
+    return entries;
+  }
+
+  /**
+   * Create a jar file from the given directory. A manifest from the directory is used when
+   * present; otherwise an empty manifest is written.
+   * @param dir source directory
+   * @param jarFile jar file name
+   * @throws IOException when file cannot be created
+   */
+  public static void createJar(File dir, File jarFile) throws IOException {
+
+    final Map<String, ?> env = Collections.singletonMap("create", "true");
+    if (jarFile.exists() && !jarFile.delete()) {
+      throw new RuntimeException("Failed to remove " + jarFile);
+    }
+    URI uri = URI.create("jar:" + jarFile.toURI());
+    try (final FileSystem zipfs = FileSystems.newFileSystem(uri, env)) {
+
+      File manifestFile = new File(dir, JarFile.MANIFEST_NAME);
+      Files.createDirectory(zipfs.getPath("META-INF"));
+      try (OutputStream out = Files.newOutputStream(zipfs.getPath(JarFile.MANIFEST_NAME))) {
+        if (!manifestFile.exists()) {
+          new Manifest().write(out);
+        } else {
+          FileUtils.copyFile(manifestFile, out);
+        }
+      }
+
+      final Path root = dir.toPath();
+      Files.walkFileTree(root, new java.nio.file.SimpleFileVisitor<Path>() {
+        @Override
+        public FileVisitResult preVisitDirectory(Path subDir, BasicFileAttributes attrs)
+            throws IOException {
+          String entryName = root.relativize(subDir).toString().replace(File.separatorChar, '/');
+          if (!entryName.isEmpty()) {
+            // createDirectories tolerates entries that already exist, such as META-INF
+            Files.createDirectories(zipfs.getPath(entryName));
+          }
+          return super.preVisitDirectory(subDir, attrs);
+        }
+
+        @Override
+        public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+          // zip entry names always use '/' regardless of the platform separator
+          String name = root.relativize(file).toString().replace(File.separatorChar, '/');
+          if (!JarFile.MANIFEST_NAME.equals(name)) {
+            try (OutputStream out = Files.newOutputStream(zipfs.getPath(name))) {
+              FileUtils.copyFile(file.toFile(), out);
+            }
+          }
+          return super.visitFile(file, attrs);
+        }
+      });
+    }
+  }
+
+  /**
+   * The main method expects the serialized DAG and will launch the YARN application.
+   * @param args location of launch parameters
+   * @throws IOException when parameters cannot be read
+   */
+  public static void main(String[] args) throws IOException {
+    checkArgument(args.length == 1, "exactly one argument expected");
+    File file = new File(args[0]);
+    checkArgument(file.exists() && file.isFile(), "invalid file path %s", file);
+    // NOTE: Java deserialization; the file is expected to be written by launchApp and must
+    // never come from an untrusted source.
+    final LaunchParams params;
+    try (InputStream in = new FileInputStream(file)) {
+      params = (LaunchParams) SerializationUtils.deserialize(in);
+    }
+    StreamingApplication apexApp = new StreamingApplication() {
+      @Override
+      public void populateDAG(DAG dag, Configuration conf) {
+        copyShallow(params.dag, dag);
+      }
+    };
+    Configuration conf = new Configuration(); // configuration from Hadoop client
+    AppHandle appHandle = params.getApexLauncher().launchApp(apexApp, conf,
+        params.launchAttributes);
+    if (appHandle == null) {
+      throw new AssertionError("Launch returns null handle.");
+    }
+    // TODO (future PR)
+    // At this point the application is running, but this process should remain active to
+    // allow the parent to implement the runner result.
+  }
+
+  /**
+   * Launch parameters that will be serialized and passed to the child process.
+   */
+  @VisibleForTesting
+  protected static class LaunchParams implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private final DAG dag;
+    private final Attribute.AttributeMap launchAttributes;
+    // environment added to the child process; may be null
+    private HashMap<String, String> env;
+    // command used to start the child process; null launches in-process
+    private String cmd;
+
+    protected LaunchParams(DAG dag, AttributeMap launchAttributes) {
+      this.dag = dag;
+      this.launchAttributes = launchAttributes;
+    }
+
+    protected Launcher<?> getApexLauncher() {
+      return Launcher.getLauncher(LaunchMode.YARN);
+    }
+
+    protected String getCmd() {
+      return cmd;
+    }
+
+    protected Map<String, String> getEnv() {
+      return env;
+    }
+
+  }
+
+  /**
+   * Copies every non-static field, including inherited ones, from one DAG instance to another
+   * of the same class. Referenced objects are shared, not copied.
+   */
+  private static void copyShallow(DAG from, DAG to) {
+    checkArgument(from.getClass() == to.getClass(), "must be same class %s %s",
+        from.getClass(), to.getClass());
+    for (Class<?> clazz = from.getClass(); clazz != null && clazz != Object.class;
+        clazz = clazz.getSuperclass()) {
+      Field[] fields = clazz.getDeclaredFields();
+      AccessibleObject.setAccessible(fields, true);
+      for (Field field : fields) {
+        if (!java.lang.reflect.Modifier.isStatic(field.getModifiers())) {
+          try {
+            field.set(to, field.get(from));
+          } catch (IllegalArgumentException | IllegalAccessException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Watches a started process from a background thread and records its exit code.
+   */
+  public static class ProcessWatcher implements Runnable {
+    private final Process p;
+    private volatile boolean finished = false;
+    private volatile int rc;
+
+    public ProcessWatcher(Process p) {
+      this.p = p;
+      new Thread(this).start();
+    }
+
+    public boolean isFinished() {
+      return finished;
+    }
+
+    /**
+     * Returns the exit code of the process; only meaningful once {@link #isFinished()} is
+     * true. A value of -1 means the wait was interrupted and the real status is unknown.
+     */
+    public int getExitCode() {
+      return rc;
+    }
+
+    @Override
+    public void run() {
+      try {
+        rc = p.waitFor();
+      } catch (InterruptedException e) {
+        // exit status unknown: report failure rather than a misleading success code
+        rc = -1;
+        Thread.currentThread().interrupt();
+      } finally {
+        finished = true;
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b6b2e202/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java
new file mode 100644
index 0000000..986818b
--- /dev/null
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Attribute.AttributeMap;
+import com.datatorrent.api.Context.DAGContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+
+import java.io.File;
+import java.net.URI;
+import java.nio.file.FileSystem;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.jar.JarFile;
+
+import org.apache.apex.api.EmbeddedAppLauncher;
+import org.apache.apex.api.Launcher;
+import org.apache.apex.api.Launcher.AppHandle;
+import org.apache.apex.api.Launcher.LaunchMode;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for dependency resolution for pipeline execution on YARN.
+ */
+public class ApexYarnLauncherTest {
+
+  /** Hadoop artifacts and their transitive dependencies must not be shipped to YARN. */
+  @Test
+  public void testGetYarnDeployDependencies() throws Exception {
+    List<File> deps = ApexYarnLauncher.getYarnDeployDependencies();
+    String depsToString = deps.toString();
+    // the beam dependencies are not present as jar when running within the Maven build reactor
+    //assertThat(depsToString, containsString("beam-runners-core-"));
+    //assertThat(depsToString, containsString("beam-runners-apex-"));
+    assertThat(depsToString, containsString("apex-common-"));
+    assertThat(depsToString, not(containsString("hadoop-")));
+    assertThat(depsToString, not(containsString("zookeeper-")));
+  }
+
+  /**
+   * Runs the launch path in-process (no command configured); the mock launcher asserts that
+   * the DAG arrives with its attributes intact.
+   */
+  @Test
+  public void testProxyLauncher() throws Exception {
+    // use the embedded launcher to build the DAG only
+    EmbeddedAppLauncher<?> embeddedLauncher = Launcher.getLauncher(LaunchMode.EMBEDDED);
+
+    StreamingApplication app = new StreamingApplication() {
+      @Override
+      public void populateDAG(DAG dag, Configuration conf) {
+        dag.setAttribute(DAGContext.APPLICATION_NAME, "DummyApp");
+      }
+    };
+
+    Configuration conf = new Configuration(false);
+    DAG dag = embeddedLauncher.prepareDAG(app, conf);
+    Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    ApexYarnLauncher launcher = new ApexYarnLauncher();
+    launcher.launchApp(new MockApexYarnLauncherParams(dag, launchAttributes));
+  }
+
+  /** Launch parameters whose launcher verifies the deserialized DAG instead of using YARN. */
+  private static class MockApexYarnLauncherParams extends  ApexYarnLauncher.LaunchParams {
+    private static final long serialVersionUID = 1L;
+
+    public MockApexYarnLauncherParams(DAG dag, AttributeMap launchAttributes) {
+      super(dag, launchAttributes);
+    }
+
+    @Override
+    protected Launcher<?> getApexLauncher() {
+      return new Launcher<AppHandle>() {
+        @Override
+        public AppHandle launchApp(StreamingApplication application,
+            Configuration configuration, AttributeMap launchParameters)
+            throws org.apache.apex.api.Launcher.LauncherException {
+          EmbeddedAppLauncher<?> embeddedLauncher = Launcher.getLauncher(LaunchMode.EMBEDDED);
+          DAG dag = embeddedLauncher.getDAG();
+          application.populateDAG(dag, new Configuration(false));
+          String appName = dag.getValue(DAGContext.APPLICATION_NAME);
+          Assert.assertEquals("DummyApp", appName);
+          return new AppHandle() {
+            @Override
+            public boolean isFinished() {
+              return true;
+            }
+            @Override
+            public void shutdown(org.apache.apex.api.Launcher.ShutdownMode arg0) {
+            }
+          };
+        }
+      };
+    }
+
+  }
+
+  /** A directory is packaged into a jar containing a manifest and the directory's files. */
+  @Test
+  public void testCreateJar() throws Exception {
+    File baseDir = new File("./target/testCreateJar");
+    // start from a clean directory so leftovers from earlier runs cannot affect the result
+    FileUtils.deleteQuietly(baseDir);
+    File srcDir = new File(baseDir, "src");
+    String file1 = "file1";
+    FileUtils.forceMkdir(srcDir);
+    FileUtils.write(new File(srcDir, file1), "file1");
+
+    File jarFile = new File(baseDir, "test.jar");
+    ApexYarnLauncher.createJar(srcDir, jarFile);
+    Assert.assertTrue("exists: " + jarFile, jarFile.exists());
+    URI uri = URI.create("jar:" + jarFile.toURI());
+    // open the existing jar without "create" so a missing file cannot be silently created
+    final Map<String, ?> env = Collections.emptyMap();
+    try (final FileSystem zipfs = FileSystems.newFileSystem(uri, env)) {
+      Assert.assertTrue("manifest", Files.isRegularFile(zipfs.getPath(JarFile.MANIFEST_NAME)));
+      Assert.assertTrue("file1", Files.isRegularFile(zipfs.getPath(file1)));
+    }
+
+  }
+}


[2/2] incubator-beam git commit: This closes #1517

Posted by th...@apache.org.
This closes #1517


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a834fb0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a834fb0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a834fb0e

Branch: refs/heads/master
Commit: a834fb0eb555ed121550dbe883250207baacb841
Parents: 58000b3 b6b2e20
Author: Thomas Weise <th...@apache.org>
Authored: Sat Dec 10 11:00:09 2016 -0800
Committer: Thomas Weise <th...@apache.org>
Committed: Sat Dec 10 11:00:58 2016 -0800

----------------------------------------------------------------------
 runners/apex/pom.xml                            |  52 ++-
 .../apache/beam/runners/apex/ApexRunner.java    |  48 ++-
 .../beam/runners/apex/ApexRunnerResult.java     |  50 +--
 .../beam/runners/apex/ApexYarnLauncher.java     | 395 +++++++++++++++++++
 .../beam/runners/apex/ApexYarnLauncherTest.java | 138 +++++++
 5 files changed, 631 insertions(+), 52 deletions(-)
----------------------------------------------------------------------