You are viewing a plain text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by th...@apache.org on 2017/01/27 22:01:32 UTC
[1/2] beam git commit: BEAM-980 Support configuration of Apex DAG
through properties file.
Repository: beam
Updated Branches:
refs/heads/master b21bdf475 -> 34b4a6d9d
BEAM-980 Support configuration of Apex DAG through properties file.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/31c63cb8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/31c63cb8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/31c63cb8
Branch: refs/heads/master
Commit: 31c63cb8c14ea71ed45376d19b4fd9f285d80763
Parents: 1c6e667
Author: Thomas Weise <th...@apache.org>
Authored: Wed Jan 25 22:22:36 2017 -0800
Committer: Thomas Weise <th...@apache.org>
Committed: Thu Jan 26 22:54:00 2017 -0800
----------------------------------------------------------------------
.../beam/runners/apex/ApexPipelineOptions.java | 7 +-
.../apache/beam/runners/apex/ApexRunner.java | 43 ++++++++---
.../beam/runners/apex/ApexYarnLauncher.java | 23 +++++-
.../beam/runners/apex/ApexRunnerTest.java | 75 ++++++++++++++++++++
.../beam/runners/apex/ApexYarnLauncherTest.java | 9 ++-
.../test/resources/beam-runners-apex.properties | 20 ++++++
6 files changed, 161 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/31c63cb8/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
index 54fdf76..f37e874 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
@@ -56,5 +56,10 @@ public interface ApexPipelineOptions extends PipelineOptions, java.io.Serializab
@Default.Long(0)
long getRunMillis();
-}
+ /**
+ * Sets the location of the configuration properties file for the Apex engine.
+ */
+ @Description("configuration properties file for the Apex engine")
+ void setConfigFile(String configFile);
+
+ /**
+ * Location of the properties file whose entries are applied to the Apex configuration. A
+ * {@code classpath:} URI names a class path resource, a value without a URI scheme is a local
+ * file path, and any other URI is opened through {@link java.net.URL}.
+ */
+ @Default.String("classpath:/beam-runners-apex.properties")
+ String getConfigFile();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/31c63cb8/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index f12ebef..e220e6c 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -22,10 +22,16 @@ import com.datatorrent.api.Context.DAGContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.google.common.base.Throwables;
+
+import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.apex.api.EmbeddedAppLauncher;
import org.apache.apex.api.Launcher;
@@ -64,6 +70,7 @@ import org.apache.hadoop.conf.Configuration;
public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
private final ApexPipelineOptions options;
+ /** URI scheme that marks a {@code configFile} option value as a class path resource. */
+ public static final String CLASSPATH_SCHEME = "classpath";
/**
* TODO: this isn't thread safe and may cause issues when tests run in parallel
@@ -126,6 +133,31 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
}
};
+ Properties configProperties = new Properties();
+ try {
+ if (options.getConfigFile() != null) {
+ URI configURL = new URI(options.getConfigFile());
+ if (CLASSPATH_SCHEME.equals(configURL.getScheme())) {
+ InputStream is = this.getClass().getResourceAsStream(configURL.getPath());
+ if (is != null) {
+ configProperties.load(is);
+ is.close();
+ }
+ } else {
+ if (!configURL.isAbsolute()) {
+ // resolve as local file name
+ File f = new File(options.getConfigFile());
+ configURL = f.toURI();
+ }
+ try (InputStream is = configURL.toURL().openStream()) {
+ configProperties.load(is);
+ }
+ }
+ }
+ } catch (IOException | URISyntaxException ex) {
+ throw new RuntimeException("Error loading properties", ex);
+ }
+
if (options.isEmbeddedExecution()) {
Launcher<AppHandle> launcher = Launcher.getLauncher(LaunchMode.EMBEDDED);
Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
@@ -135,6 +167,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
launchAttributes.put(EmbeddedAppLauncher.HEARTBEAT_MONITORING, false);
}
Configuration conf = new Configuration(false);
+ ApexYarnLauncher.addProperties(conf, configProperties);
try {
ApexRunner.ASSERTION_ERROR.set(null);
AppHandle apexAppResult = launcher.launchApp(apexApp, conf, launchAttributes);
@@ -146,7 +179,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
} else {
try {
ApexYarnLauncher yarnLauncher = new ApexYarnLauncher();
- AppHandle apexAppResult = yarnLauncher.launchApp(apexApp);
+ AppHandle apexAppResult = yarnLauncher.launchApp(apexApp, configProperties);
return new ApexRunnerResult(apexDAG.get(), apexAppResult);
} catch (IOException e) {
throw new RuntimeException("Failed to launch the application on YARN.", e);
@@ -155,14 +188,6 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
}
- private static class IdentityFn<T> extends DoFn<T, T> {
- private static final long serialVersionUID = 1L;
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(c.element());
- }
- }
-
////////////////////////////////////////////
// Adapted from FlinkRunner for View support
http://git-wip-us.apache.org/repos/asf/beam/blob/31c63cb8/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java
index a2d88f4..6bc42f0 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java
@@ -52,6 +52,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import java.util.jar.JarFile;
import java.util.jar.Manifest;
@@ -80,7 +81,8 @@ import org.slf4j.LoggerFactory;
public class ApexYarnLauncher {
private static final Logger LOG = LoggerFactory.getLogger(ApexYarnLauncher.class);
- public AppHandle launchApp(StreamingApplication app) throws IOException {
+ public AppHandle launchApp(StreamingApplication app, Properties configProperties)
+ throws IOException {
List<File> jarsToShip = getYarnDeployDependencies();
StringBuilder classpath = new StringBuilder();
@@ -103,7 +105,7 @@ public class ApexYarnLauncher {
Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
launchAttributes.put(YarnAppLauncher.LIB_JARS, classpath.toString().replace(':', ','));
- LaunchParams lp = new LaunchParams(dag, launchAttributes);
+ LaunchParams lp = new LaunchParams(dag, launchAttributes, configProperties);
lp.cmd = "hadoop " + ApexYarnLauncher.class.getName();
HashMap<String, String> env = new HashMap<>();
env.put("HADOOP_USER_CLASSPATH_FIRST", "1");
@@ -292,6 +294,18 @@ public class ApexYarnLauncher {
}
/**
+ * Copies every string-valued entry of {@code props} into {@code conf}.
+ * @param conf configuration to populate via {@link Configuration#set(String, String)}
+ * @param props source properties; only entries whose key and value are strings are copied
+ */
+ public static void addProperties(Configuration conf, Properties props) {
+ // stringPropertyNames() also covers keys inherited from the default property list
+ for (String key : props.stringPropertyNames()) {
+ conf.set(key, props.getProperty(key));
+ }
+ }
+
+ /**
* The main method expects the serialized DAG and will launch the YARN application.
* @param args location of launch parameters
* @throws IOException when parameters cannot be read
@@ -309,6 +323,7 @@ public class ApexYarnLauncher {
}
};
Configuration conf = new Configuration(); // configuration from Hadoop client
+ addProperties(conf, params.configProperties);
AppHandle appHandle = params.getApexLauncher().launchApp(apexApp, conf,
params.launchAttributes);
if (appHandle == null) {
@@ -327,12 +342,14 @@ public class ApexYarnLauncher {
private static final long serialVersionUID = 1L;
private final DAG dag;
private final Attribute.AttributeMap launchAttributes;
+ private final Properties configProperties;
private HashMap<String, String> env;
private String cmd;
- protected LaunchParams(DAG dag, AttributeMap launchAttributes) {
+ protected LaunchParams(DAG dag, AttributeMap launchAttributes, Properties configProperties) {
this.dag = dag;
this.launchAttributes = launchAttributes;
+ this.configProperties = configProperties;
}
protected Launcher<?> getApexLauncher() {
http://git-wip-us.apache.org/repos/asf/beam/blob/31c63cb8/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerTest.java
new file mode 100644
index 0000000..436c959
--- /dev/null
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.OperatorMeta;
+import com.datatorrent.stram.engine.OperatorContext;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.Collections;
+import java.util.Properties;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for the Apex runner.
+ */
+public class ApexRunnerTest {
+
+ /**
+ * Verifies that operator attributes are taken from the Apex configuration properties: first from
+ * the default {@code classpath:/beam-runners-apex.properties} test resource, then from a file
+ * set through {@link ApexPipelineOptions#setConfigFile(String)}.
+ */
+ @Test
+ public void testConfigProperties() throws Exception {
+ String operName = "testProperties";
+ ApexPipelineOptions options = PipelineOptionsFactory.create()
+ .as(ApexPipelineOptions.class);
+ options.setRunner(ApexRunner.class);
+
+ // default configuration from class path; the test resource sets MEMORY_MB=32
+ Pipeline p = Pipeline.create(options);
+ p.apply(operName, Create.of(Collections.emptyList()));
+ ApexRunnerResult result = (ApexRunnerResult) p.run();
+ result.cancel();
+ assertOperatorMemoryMb(result.getApexDAG(), operName, 32);
+
+ // an explicitly configured file is loaded instead of the class path default
+ File tmp = File.createTempFile("beam-runners-apex-", ".properties");
+ tmp.deleteOnExit();
+ try {
+ Properties props = new Properties();
+ props.setProperty("dt.operator." + operName + ".attr.MEMORY_MB", "64");
+ try (FileOutputStream fos = new FileOutputStream(tmp)) {
+ props.store(fos, "");
+ }
+ options.setConfigFile(tmp.getAbsolutePath());
+ result = (ApexRunnerResult) p.run();
+ result.cancel();
+ } finally {
+ // delete eagerly even when the run fails; deleteOnExit() above is only a fallback
+ tmp.delete();
+ }
+ assertOperatorMemoryMb(result.getApexDAG(), operName, 64);
+ }
+
+ /**
+ * Asserts that {@code dag} contains operator {@code operName} and that its
+ * {@code MEMORY_MB} attribute equals {@code expectedMb}.
+ */
+ private static void assertOperatorMemoryMb(DAG dag, String operName, int expectedMb) {
+ OperatorMeta meta = dag.getOperatorMeta(operName);
+ Assert.assertNotNull(meta);
+ Assert.assertEquals(Integer.valueOf(expectedMb), meta.getValue(OperatorContext.MEMORY_MB));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/31c63cb8/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java
index 986818b..6ffb091 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java
@@ -35,6 +35,7 @@ import java.nio.file.Files;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.jar.JarFile;
import org.apache.apex.api.EmbeddedAppLauncher;
@@ -78,15 +79,17 @@ public class ApexYarnLauncherTest {
Configuration conf = new Configuration(false);
DAG dag = embeddedLauncher.prepareDAG(app, conf);
Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+ Properties configProperties = new Properties();
ApexYarnLauncher launcher = new ApexYarnLauncher();
- launcher.launchApp(new MockApexYarnLauncherParams(dag, launchAttributes));
+ launcher.launchApp(new MockApexYarnLauncherParams(dag, launchAttributes, configProperties));
}
private static class MockApexYarnLauncherParams extends ApexYarnLauncher.LaunchParams {
private static final long serialVersionUID = 1L;
- public MockApexYarnLauncherParams(DAG dag, AttributeMap launchAttributes) {
- super(dag, launchAttributes);
+ public MockApexYarnLauncherParams(DAG dag, AttributeMap launchAttributes,
+ Properties properties) {
+ super(dag, launchAttributes, properties);
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/31c63cb8/runners/apex/src/test/resources/beam-runners-apex.properties
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/resources/beam-runners-apex.properties b/runners/apex/src/test/resources/beam-runners-apex.properties
new file mode 100644
index 0000000..48f8b05
--- /dev/null
+++ b/runners/apex/src/test/resources/beam-runners-apex.properties
@@ -0,0 +1,20 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# properties read by unit test ApexRunnerTest.testConfigProperties (operator "testProperties")
+dt.operator.testProperties.attr.MEMORY_MB=32
[2/2] beam git commit: This closes #1850
Posted by th...@apache.org.
This closes #1850
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/34b4a6d9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/34b4a6d9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/34b4a6d9
Branch: refs/heads/master
Commit: 34b4a6d9dc2cf5e8da43346077a36b460501afe2
Parents: b21bdf4 31c63cb
Author: Thomas Weise <th...@apache.org>
Authored: Fri Jan 27 14:01:09 2017 -0800
Committer: Thomas Weise <th...@apache.org>
Committed: Fri Jan 27 14:01:09 2017 -0800
----------------------------------------------------------------------
.../beam/runners/apex/ApexPipelineOptions.java | 7 +-
.../apache/beam/runners/apex/ApexRunner.java | 43 ++++++++---
.../beam/runners/apex/ApexYarnLauncher.java | 23 +++++-
.../beam/runners/apex/ApexRunnerTest.java | 75 ++++++++++++++++++++
.../beam/runners/apex/ApexYarnLauncherTest.java | 9 ++-
.../test/resources/beam-runners-apex.properties | 20 ++++++
6 files changed, 161 insertions(+), 16 deletions(-)
----------------------------------------------------------------------