You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2016/12/10 19:01:34 UTC
[1/2] incubator-beam git commit: BEAM-830 Support launch on YARN cluster.
Repository: incubator-beam
Updated Branches:
refs/heads/master 58000b397 -> a834fb0eb
BEAM-830 Support launch on YARN cluster.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b6b2e202
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b6b2e202
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b6b2e202
Branch: refs/heads/master
Commit: b6b2e202ae8c5d3d1c081a1e24033380d7f55593
Parents: 1fab152
Author: Thomas Weise <th...@apache.org>
Authored: Thu Nov 24 18:36:11 2016 -0800
Committer: Thomas Weise <th...@apache.org>
Committed: Fri Dec 9 23:46:40 2016 -0800
----------------------------------------------------------------------
runners/apex/pom.xml | 52 ++-
.../apache/beam/runners/apex/ApexRunner.java | 48 ++-
.../beam/runners/apex/ApexRunnerResult.java | 50 +--
.../beam/runners/apex/ApexYarnLauncher.java | 395 +++++++++++++++++++
.../beam/runners/apex/ApexYarnLauncherTest.java | 138 +++++++
5 files changed, 631 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b6b2e202/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index b604237..9f1455a 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -35,7 +35,7 @@
<packaging>jar</packaging>
<properties>
- <apex.core.version>3.5.0-SNAPSHOT</apex.core.version>
+ <apex.core.version>3.5.0</apex.core.version>
<apex.malhar.version>3.4.0</apex.malhar.version>
<skipIntegrationTests>true</skipIntegrationTests>
<!-- memory limit for embedded cluster -->
@@ -218,22 +218,64 @@
</goals>
<configuration>
<ignoredUsedUndeclaredDependencies>
- <ignoredUsedUndeclaredDependency>org.apache.apex:apex-api:jar:3.5.0-SNAPSHOT</ignoredUsedUndeclaredDependency>
+ <ignoredUsedUndeclaredDependency>org.apache.apex:apex-api:jar:3.5.0</ignoredUsedUndeclaredDependency>
<ignoredUsedUndeclaredDependency>org.apache.commons:commons-lang3::3.1</ignoredUsedUndeclaredDependency>
- <ignoredUsedUndeclaredDependency>commons-io:commons-io:jar:</ignoredUsedUndeclaredDependency>
+ <ignoredUsedUndeclaredDependency>commons-io:commons-io:jar:2.4</ignoredUsedUndeclaredDependency>
<ignoredUsedUndeclaredDependency>com.esotericsoftware.kryo:kryo::2.24.0</ignoredUsedUndeclaredDependency>
- <ignoredUsedUndeclaredDependency>com.datatorrent:netlet::</ignoredUsedUndeclaredDependency>
+ <ignoredUsedUndeclaredDependency>com.datatorrent:netlet::1.3.0</ignoredUsedUndeclaredDependency>
<ignoredUsedUndeclaredDependency>org.slf4j:slf4j-api:jar:1.7.14</ignoredUsedUndeclaredDependency>
- <ignoredUsedUndeclaredDependency>org.apache.hadoop:hadoop-common:jar:</ignoredUsedUndeclaredDependency>
+ <ignoredUsedUndeclaredDependency>org.apache.hadoop:hadoop-common:jar:2.6.0</ignoredUsedUndeclaredDependency>
<ignoredUsedUndeclaredDependency>joda-time:joda-time:jar:2.4</ignoredUsedUndeclaredDependency>
<ignoredUsedUndeclaredDependency>com.google.guava:guava:jar:19.0</ignoredUsedUndeclaredDependency>
</ignoredUsedUndeclaredDependencies>
</configuration>
</execution>
+ <execution>
+ <!-- used in ApexYarnLauncher to filter compile time Hadoop dependencies -->
+ <id>dependency-tree</id>
+ <phase>generate-test-resources</phase>
+ <goals>
+ <goal>tree</goal>
+ </goals>
+ <configuration>
+ <outputFile>${project.build.directory}/classes/org/apache/beam/runners/apex/dependency-tree</outputFile>
+ </configuration>
+ </execution>
</executions>
</plugin>
</plugins>
+
+ <pluginManagement>
+ <plugins>
+ <!-- Eclipse has a problem with dependency:tree when it is not in package phase -->
+ <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+ <plugin>
+ <groupId>org.eclipse.m2e</groupId>
+ <artifactId>lifecycle-mapping</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <lifecycleMappingMetadata>
+ <pluginExecutions>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <versionRange>[2.10,)</versionRange>
+ <goals>
+ <goal>tree</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore></ignore>
+ </action>
+ </pluginExecution>
+ </pluginExecutions>
+ </lifecycleMappingMetadata>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
</build>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b6b2e202/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index 9507fb9..899efa3 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -17,19 +17,22 @@
*/
package org.apache.beam.runners.apex;
-import static com.google.common.base.Preconditions.checkArgument;
-
+import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context.DAGContext;
import com.datatorrent.api.DAG;
-import com.datatorrent.api.LocalMode;
import com.datatorrent.api.StreamingApplication;
import com.google.common.base.Throwables;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.apex.api.EmbeddedAppLauncher;
+import org.apache.apex.api.Launcher;
+import org.apache.apex.api.Launcher.AppHandle;
+import org.apache.apex.api.Launcher.LaunchMode;
import org.apache.beam.runners.apex.translation.ApexPipelineTranslator;
import org.apache.beam.runners.core.AssignWindows;
import org.apache.beam.sdk.Pipeline;
@@ -122,33 +125,44 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
public ApexRunnerResult run(final Pipeline pipeline) {
final ApexPipelineTranslator translator = new ApexPipelineTranslator(options);
+ final AtomicReference<DAG> apexDAG = new AtomicReference<>();
StreamingApplication apexApp = new StreamingApplication() {
@Override
public void populateDAG(DAG dag, Configuration conf) {
+ apexDAG.set(dag);
dag.setAttribute(DAGContext.APPLICATION_NAME, options.getApplicationName());
translator.translate(pipeline, dag);
}
};
- checkArgument(options.isEmbeddedExecution(),
- "only embedded execution is supported at this time");
- LocalMode lma = LocalMode.newInstance();
- Configuration conf = new Configuration(false);
- try {
- lma.prepareDAG(apexApp, conf);
- LocalMode.Controller lc = lma.getController();
+ if (options.isEmbeddedExecution()) {
+ Launcher<AppHandle> launcher = Launcher.getLauncher(LaunchMode.EMBEDDED);
+ Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+ launchAttributes.put(EmbeddedAppLauncher.RUN_ASYNC, true);
if (options.isEmbeddedExecutionDebugMode()) {
// turns off timeout checking for operator progress
- lc.setHeartbeatMonitoringEnabled(false);
+ launchAttributes.put(EmbeddedAppLauncher.HEARTBEAT_MONITORING, false);
+ }
+ Configuration conf = new Configuration(false);
+ try {
+ ApexRunner.ASSERTION_ERROR.set(null);
+ AppHandle apexAppResult = launcher.launchApp(apexApp, conf, launchAttributes);
+ return new ApexRunnerResult(apexDAG.get(), apexAppResult);
+ } catch (Exception e) {
+ Throwables.propagateIfPossible(e);
+ throw new RuntimeException(e);
+ }
+ } else {
+ try {
+ ApexYarnLauncher yarnLauncher = new ApexYarnLauncher();
+ AppHandle apexAppResult = yarnLauncher.launchApp(apexApp);
+ return new ApexRunnerResult(apexDAG.get(), apexAppResult);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to launch the application on YARN.", e);
}
- ApexRunner.ASSERTION_ERROR.set(null);
- lc.runAsync();
- return new ApexRunnerResult(lma.getDAG(), lc);
- } catch (Exception e) {
- Throwables.propagateIfPossible(e);
- throw new RuntimeException(e);
}
+
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b6b2e202/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
index 18b50bc..8548194 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
@@ -18,11 +18,11 @@
package org.apache.beam.runners.apex;
import com.datatorrent.api.DAG;
-import com.datatorrent.api.LocalMode;
import java.io.IOException;
-import java.lang.reflect.Field;
+import org.apache.apex.api.Launcher.AppHandle;
+import org.apache.apex.api.Launcher.ShutdownMode;
import org.apache.beam.sdk.AggregatorRetrievalException;
import org.apache.beam.sdk.AggregatorValues;
import org.apache.beam.sdk.Pipeline;
@@ -36,12 +36,12 @@ import org.joda.time.Duration;
*/
public class ApexRunnerResult implements PipelineResult {
private final DAG apexDAG;
- private final LocalMode.Controller ctrl;
+ private final AppHandle apexApp;
private State state = State.UNKNOWN;
- public ApexRunnerResult(DAG dag, LocalMode.Controller ctrl) {
+ public ApexRunnerResult(DAG dag, AppHandle apexApp) {
this.apexDAG = dag;
- this.ctrl = ctrl;
+ this.apexApp = apexApp;
}
@Override
@@ -57,19 +57,31 @@ public class ApexRunnerResult implements PipelineResult {
@Override
public State cancel() throws IOException {
- ctrl.shutdown();
+ apexApp.shutdown(ShutdownMode.KILL);
state = State.CANCELLED;
return state;
}
@Override
public State waitUntilFinish(Duration duration) {
- return ApexRunnerResult.waitUntilFinished(ctrl, duration);
+ long timeout = (duration == null || duration.getMillis() < 1) ? Long.MAX_VALUE
+ : System.currentTimeMillis() + duration.getMillis();
+ try {
+ while (!apexApp.isFinished() && System.currentTimeMillis() < timeout) {
+ if (ApexRunner.ASSERTION_ERROR.get() != null) {
+ throw ApexRunner.ASSERTION_ERROR.get();
+ }
+ Thread.sleep(500);
+ }
+ return apexApp.isFinished() ? State.DONE : null;
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
}
@Override
public State waitUntilFinish() {
- return ApexRunnerResult.waitUntilFinished(ctrl, null);
+ return waitUntilFinish(null);
}
@Override
@@ -85,26 +97,4 @@ public class ApexRunnerResult implements PipelineResult {
return apexDAG;
}
- public static State waitUntilFinished(LocalMode.Controller ctrl, Duration duration) {
- // we need to rely on internal field for now
- // Apex should make it available through API in upcoming release.
- long timeout = (duration == null || duration.getMillis() < 1) ? Long.MAX_VALUE
- : System.currentTimeMillis() + duration.getMillis();
- Field appDoneField;
- try {
- appDoneField = ctrl.getClass().getDeclaredField("appDone");
- appDoneField.setAccessible(true);
- while (!appDoneField.getBoolean(ctrl) && System.currentTimeMillis() < timeout) {
- if (ApexRunner.ASSERTION_ERROR.get() != null) {
- throw ApexRunner.ASSERTION_ERROR.get();
- }
- Thread.sleep(500);
- }
- return appDoneField.getBoolean(ctrl) ? State.DONE : null;
- } catch (NoSuchFieldException | SecurityException | IllegalArgumentException
- | IllegalAccessException | InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b6b2e202/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java
new file mode 100644
index 0000000..0ae4cc7
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Attribute.AttributeMap;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.lang.reflect.AccessibleObject;
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.FileSystem;
+import java.nio.file.FileSystems;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.jar.JarFile;
+import java.util.jar.Manifest;
+
+import org.apache.apex.api.EmbeddedAppLauncher;
+import org.apache.apex.api.Launcher;
+import org.apache.apex.api.Launcher.AppHandle;
+import org.apache.apex.api.Launcher.LaunchMode;
+import org.apache.apex.api.Launcher.LauncherException;
+import org.apache.apex.api.Launcher.ShutdownMode;
+import org.apache.apex.api.YarnAppLauncher;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Proxy to launch the YARN application through the hadoop script to run in the
+ * pre-configured environment (class path, configuration, native libraries etc.).
+ *
+ * <p>The proxy takes the DAG and communicates with the Hadoop services to launch
+ * it on the cluster.
+ */
+public class ApexYarnLauncher {
+ private static final Logger LOG = LoggerFactory.getLogger(ApexYarnLauncher.class);
+
+ public AppHandle launchApp(StreamingApplication app) throws IOException {
+
+ List<File> jarsToShip = getYarnDeployDependencies();
+ StringBuilder classpath = new StringBuilder();
+ for (File path : jarsToShip) {
+ if (path.isDirectory()) {
+ File tmpJar = File.createTempFile("beam-runners-apex-", ".jar");
+ createJar(path, tmpJar);
+ tmpJar.deleteOnExit();
+ path = tmpJar;
+ }
+ if (classpath.length() != 0) {
+ classpath.append(':');
+ }
+ classpath.append(path.getAbsolutePath());
+ }
+
+ EmbeddedAppLauncher<?> embeddedLauncher = Launcher.getLauncher(LaunchMode.EMBEDDED);
+ DAG dag = embeddedLauncher.getDAG();
+ app.populateDAG(dag, new Configuration(false));
+
+ Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+ launchAttributes.put(YarnAppLauncher.LIB_JARS, classpath.toString().replace(':', ','));
+ LaunchParams lp = new LaunchParams(dag, launchAttributes);
+ lp.cmd = "hadoop " + ApexYarnLauncher.class.getName();
+ HashMap<String, String> env = new HashMap<>();
+ env.put("HADOOP_USER_CLASSPATH_FIRST", "1");
+ env.put("HADOOP_CLASSPATH", classpath.toString());
+ lp.env = env;
+ return launchApp(lp);
+ }
+
+ protected AppHandle launchApp(LaunchParams params) throws IOException {
+ File tmpFile = File.createTempFile("beam-runner-apex", "params");
+ tmpFile.deleteOnExit();
+ try (FileOutputStream fos = new FileOutputStream(tmpFile)) {
+ SerializationUtils.serialize(params, fos);
+ }
+ if (params.getCmd() == null) {
+ ApexYarnLauncher.main(new String[] {tmpFile.getAbsolutePath()});
+ } else {
+ String cmd = params.getCmd() + " " + tmpFile.getAbsolutePath();
+ ByteArrayOutputStream consoleOutput = new ByteArrayOutputStream();
+ LOG.info("Executing: {} with {}", cmd, params.getEnv());
+
+ ProcessBuilder pb = new ProcessBuilder("bash", "-c", cmd);
+ Map<String, String> env = pb.environment();
+ env.putAll(params.getEnv());
+ Process p = pb.start();
+ ProcessWatcher pw = new ProcessWatcher(p);
+ InputStream output = p.getInputStream();
+ InputStream error = p.getErrorStream();
+ while (!pw.isFinished()) {
+ IOUtils.copy(output, consoleOutput);
+ IOUtils.copy(error, consoleOutput);
+ }
+ if (pw.rc != 0) {
+ String msg = "The Beam Apex runner in non-embedded mode requires the Hadoop client"
+ + " to be installed on the machine from which you launch the job"
+ + " and the 'hadoop' script in $PATH";
+ LOG.error(msg);
+ throw new RuntimeException("Failed to run: " + cmd + " (exit code " + pw.rc + ")" + "\n"
+ + consoleOutput.toString());
+ }
+ }
+ return new AppHandle() {
+ @Override
+ public boolean isFinished() {
+ // TODO (future PR): interaction with child process
+ LOG.warn("YARN application runs asynchronously and status check not implemented.");
+ return true;
+ }
+ @Override
+ public void shutdown(ShutdownMode arg0) throws LauncherException {
+ // TODO (future PR): interaction with child process
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ /**
+ * From the current classpath, find the jar files that need to be deployed
+ * with the application to run on YARN. Hadoop dependencies are provided
+ * through the Hadoop installation and the application should not bundle them
+ * to avoid conflicts. This is done by removing the Hadoop compile
+ * dependencies (transitively) by parsing the Maven dependency tree.
+ *
+ * @return list of jar files to ship
+ * @throws IOException when dependency information cannot be read
+ */
+ public static List<File> getYarnDeployDependencies() throws IOException {
+ InputStream dependencyTree = ApexRunner.class.getResourceAsStream("dependency-tree");
+ BufferedReader br = new BufferedReader(new InputStreamReader(dependencyTree));
+ String line = null;
+ List<String> excludes = new ArrayList<>();
+ int excludeLevel = Integer.MAX_VALUE;
+ while ((line = br.readLine()) != null) {
+ for (int i = 0; i < line.length(); i++) {
+ char c = line.charAt(i);
+ if (Character.isLetter(c)) {
+ if (i > excludeLevel) {
+ excludes.add(line.substring(i));
+ } else {
+ if (line.substring(i).startsWith("org.apache.hadoop")) {
+ excludeLevel = i;
+ excludes.add(line.substring(i));
+ } else {
+ excludeLevel = Integer.MAX_VALUE;
+ }
+ }
+ break;
+ }
+ }
+ }
+ br.close();
+
+ Set<String> excludeJarFileNames = Sets.newHashSet();
+ for (String exclude : excludes) {
+ String[] mvnc = exclude.split(":");
+ String fileName = mvnc[1] + "-";
+ if (mvnc.length == 6) {
+ fileName += mvnc[4] + "-" + mvnc[3]; // with classifier
+ } else {
+ fileName += mvnc[3];
+ }
+ fileName += ".jar";
+ excludeJarFileNames.add(fileName);
+ }
+
+ ClassLoader classLoader = ApexYarnLauncher.class.getClassLoader();
+ URL[] urls = ((URLClassLoader) classLoader).getURLs();
+ List<File> dependencyJars = new ArrayList<>();
+ for (int i = 0; i < urls.length; i++) {
+ File f = new File(urls[i].getFile());
+ // dependencies can also be directories in the build reactor,
+ // the Apex client will automatically create jar files for those.
+ if (f.exists() && !excludeJarFileNames.contains(f.getName())) {
+ dependencyJars.add(f);
+ }
+ }
+ return dependencyJars;
+ }
+
+ /**
+ * Create a jar file from the given directory.
+ * @param dir source directory
+ * @param jarFile jar file name
+ * @throws IOException when file cannot be created
+ */
+ public static void createJar(File dir, File jarFile) throws IOException {
+
+ final Map<String, ?> env = Collections.singletonMap("create", "true");
+ if (jarFile.exists() && !jarFile.delete()) {
+ throw new RuntimeException("Failed to remove " + jarFile);
+ }
+ URI uri = URI.create("jar:" + jarFile.toURI());
+ try (final FileSystem zipfs = FileSystems.newFileSystem(uri, env);) {
+
+ File manifestFile = new File(dir, JarFile.MANIFEST_NAME);
+ Files.createDirectory(zipfs.getPath("META-INF"));
+ final OutputStream out = Files.newOutputStream(zipfs.getPath(JarFile.MANIFEST_NAME));
+ if (!manifestFile.exists()) {
+ new Manifest().write(out);
+ } else {
+ FileUtils.copyFile(manifestFile, out);
+ }
+ out.close();
+
+ final java.nio.file.Path root = dir.toPath();
+ Files.walkFileTree(root, new java.nio.file.SimpleFileVisitor<Path>() {
+ String relativePath;
+
+ @Override
+ public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
+ throws IOException {
+ relativePath = root.relativize(dir).toString();
+ if (!relativePath.isEmpty()) {
+ if (!relativePath.endsWith("/")) {
+ relativePath += "/";
+ }
+ final Path dstDir = zipfs.getPath(relativePath);
+ Files.createDirectory(dstDir);
+ }
+ return super.preVisitDirectory(dir, attrs);
+ }
+
+ @Override
+ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+ String name = relativePath + file.getFileName();
+ if (!JarFile.MANIFEST_NAME.equals(name)) {
+ final OutputStream out = Files.newOutputStream(zipfs.getPath(name));
+ FileUtils.copyFile(file.toFile(), out);
+ out.close();
+ }
+ return super.visitFile(file, attrs);
+ }
+
+ @Override
+ public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
+ relativePath = root.relativize(dir.getParent()).toString();
+ if (!relativePath.isEmpty() && !relativePath.endsWith("/")) {
+ relativePath += "/";
+ }
+ return super.postVisitDirectory(dir, exc);
+ }
+ });
+ }
+ }
+
+ /**
+ * The main method expects the serialized DAG and will launch the YARN application.
+ * @param args location of launch parameters
+ * @throws IOException when parameters cannot be read
+ */
+ public static void main(String[] args) throws IOException {
+ checkArgument(args.length == 1, "exactly one argument expected");
+ File file = new File(args[0]);
+ checkArgument(file.exists() && file.isFile(), "invalid file path %s", file);
+ final LaunchParams params = (LaunchParams) SerializationUtils.deserialize(
+ new FileInputStream(file));
+ StreamingApplication apexApp = new StreamingApplication() {
+ @Override
+ public void populateDAG(DAG dag, Configuration conf) {
+ copyShallow(params.dag, dag);
+ }
+ };
+ Configuration conf = new Configuration(); // configuration from Hadoop client
+ AppHandle appHandle = params.getApexLauncher().launchApp(apexApp, conf,
+ params.launchAttributes);
+ if (appHandle == null) {
+ throw new AssertionError("Launch returns null handle.");
+ }
+ // TODO (future PR)
+ // At this point the application is running, but this process should remain active to
+ // allow the parent to implement the runner result.
+ }
+
+ /**
+ * Launch parameters that will be serialized and passed to the child process.
+ */
+ @VisibleForTesting
+ protected static class LaunchParams implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private final DAG dag;
+ private final Attribute.AttributeMap launchAttributes;
+ private HashMap<String, String> env;
+ private String cmd;
+
+ protected LaunchParams(DAG dag, AttributeMap launchAttributes) {
+ this.dag = dag;
+ this.launchAttributes = launchAttributes;
+ }
+
+ protected Launcher<?> getApexLauncher() {
+ return Launcher.getLauncher(LaunchMode.YARN);
+ }
+
+ protected String getCmd() {
+ return cmd;
+ }
+
+ protected Map<String, String> getEnv() {
+ return env;
+ }
+
+ }
+
+ private static void copyShallow(DAG from, DAG to) {
+ checkArgument(from.getClass() == to.getClass(), "must be same class %s %s",
+ from.getClass(), to.getClass());
+ Field[] fields = from.getClass().getDeclaredFields();
+ AccessibleObject.setAccessible(fields, true);
+ for (int i = 0; i < fields.length; i++) {
+ Field field = fields[i];
+ if (!java.lang.reflect.Modifier.isStatic(field.getModifiers())) {
+ try {
+ field.set(to, field.get(from));
+ } catch (IllegalArgumentException | IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Starts a command and waits for it to complete.
+ */
+ public static class ProcessWatcher implements Runnable {
+ private final Process p;
+ private volatile boolean finished = false;
+ private volatile int rc;
+
+ public ProcessWatcher(Process p) {
+ this.p = p;
+ new Thread(this).start();
+ }
+
+ public boolean isFinished() {
+ return finished;
+ }
+
+ @Override
+ public void run() {
+ try {
+ rc = p.waitFor();
+ } catch (Exception e) {
+ // ignore
+ }
+ finished = true;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b6b2e202/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java
new file mode 100644
index 0000000..986818b
--- /dev/null
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Attribute.AttributeMap;
+import com.datatorrent.api.Context.DAGContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+
+import java.io.File;
+import java.net.URI;
+import java.nio.file.FileSystem;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.jar.JarFile;
+
+import org.apache.apex.api.EmbeddedAppLauncher;
+import org.apache.apex.api.Launcher;
+import org.apache.apex.api.Launcher.AppHandle;
+import org.apache.apex.api.Launcher.LaunchMode;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for dependency resolution for pipeline execution on YARN.
+ */
+public class ApexYarnLauncherTest {
+
+ @Test
+ public void testGetYarnDeployDependencies() throws Exception {
+ List<File> deps = ApexYarnLauncher.getYarnDeployDependencies();
+ String depsToString = deps.toString();
+ // the beam dependencies are not present as jar when running within the Maven build reactor
+ //assertThat(depsToString, containsString("beam-runners-core-"));
+ //assertThat(depsToString, containsString("beam-runners-apex-"));
+ assertThat(depsToString, containsString("apex-common-"));
+ assertThat(depsToString, not(containsString("hadoop-")));
+ assertThat(depsToString, not(containsString("zookeeper-")));
+ }
+
+ @Test
+ public void testProxyLauncher() throws Exception {
+ // use the embedded launcher to build the DAG only
+ EmbeddedAppLauncher<?> embeddedLauncher = Launcher.getLauncher(LaunchMode.EMBEDDED);
+
+ StreamingApplication app = new StreamingApplication() {
+ @Override
+ public void populateDAG(DAG dag, Configuration conf) {
+ dag.setAttribute(DAGContext.APPLICATION_NAME, "DummyApp");
+ }
+ };
+
+ Configuration conf = new Configuration(false);
+ DAG dag = embeddedLauncher.prepareDAG(app, conf);
+ Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+ ApexYarnLauncher launcher = new ApexYarnLauncher();
+ launcher.launchApp(new MockApexYarnLauncherParams(dag, launchAttributes));
+ }
+
+ private static class MockApexYarnLauncherParams extends ApexYarnLauncher.LaunchParams {
+ private static final long serialVersionUID = 1L;
+
+ public MockApexYarnLauncherParams(DAG dag, AttributeMap launchAttributes) {
+ super(dag, launchAttributes);
+ }
+
+ @Override
+ protected Launcher<?> getApexLauncher() {
+ return new Launcher<AppHandle>() {
+ @Override
+ public AppHandle launchApp(StreamingApplication application,
+ Configuration configuration, AttributeMap launchParameters)
+ throws org.apache.apex.api.Launcher.LauncherException {
+ EmbeddedAppLauncher<?> embeddedLauncher = Launcher.getLauncher(LaunchMode.EMBEDDED);
+ DAG dag = embeddedLauncher.getDAG();
+ application.populateDAG(dag, new Configuration(false));
+ String appName = dag.getValue(DAGContext.APPLICATION_NAME);
+ Assert.assertEquals("DummyApp", appName);
+ return new AppHandle() {
+ @Override
+ public boolean isFinished() {
+ return true;
+ }
+ @Override
+ public void shutdown(org.apache.apex.api.Launcher.ShutdownMode arg0) {
+ }
+ };
+ }
+ };
+ }
+
+ }
+
+ @Test
+ public void testCreateJar() throws Exception {
+ File baseDir = new File("./target/testCreateJar");
+ File srcDir = new File(baseDir, "src");
+ String file1 = "file1";
+ FileUtils.forceMkdir(srcDir);
+ FileUtils.write(new File(srcDir, file1), "file1");
+
+ File jarFile = new File(baseDir, "test.jar");
+ ApexYarnLauncher.createJar(srcDir, jarFile);
+ Assert.assertTrue("exists: " + jarFile, jarFile.exists());
+ URI uri = URI.create("jar:" + jarFile.toURI());
+ final Map<String, ?> env = Collections.singletonMap("create", "true");
+ try (final FileSystem zipfs = FileSystems.newFileSystem(uri, env);) {
+ Assert.assertTrue("manifest", Files.isRegularFile(zipfs.getPath(JarFile.MANIFEST_NAME)));
+ Assert.assertTrue("file1", Files.isRegularFile(zipfs.getPath(file1)));
+ }
+
+ }
+}
[2/2] incubator-beam git commit: This closes #1517
Posted by th...@apache.org.
This closes #1517
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a834fb0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a834fb0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a834fb0e
Branch: refs/heads/master
Commit: a834fb0eb555ed121550dbe883250207baacb841
Parents: 58000b3 b6b2e20
Author: Thomas Weise <th...@apache.org>
Authored: Sat Dec 10 11:00:09 2016 -0800
Committer: Thomas Weise <th...@apache.org>
Committed: Sat Dec 10 11:00:58 2016 -0800
----------------------------------------------------------------------
runners/apex/pom.xml | 52 ++-
.../apache/beam/runners/apex/ApexRunner.java | 48 ++-
.../beam/runners/apex/ApexRunnerResult.java | 50 +--
.../beam/runners/apex/ApexYarnLauncher.java | 395 +++++++++++++++++++
.../beam/runners/apex/ApexYarnLauncherTest.java | 138 +++++++
5 files changed, 631 insertions(+), 52 deletions(-)
----------------------------------------------------------------------