You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/01/29 16:22:28 UTC

[42/50] beam git commit: BEAM-980 Support configuration of Apex DAG through properties file.

BEAM-980 Support configuration of Apex DAG through properties file.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/31c63cb8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/31c63cb8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/31c63cb8

Branch: refs/heads/python-sdk
Commit: 31c63cb8c14ea71ed45376d19b4fd9f285d80763
Parents: 1c6e667
Author: Thomas Weise <th...@apache.org>
Authored: Wed Jan 25 22:22:36 2017 -0800
Committer: Thomas Weise <th...@apache.org>
Committed: Thu Jan 26 22:54:00 2017 -0800

----------------------------------------------------------------------
 .../beam/runners/apex/ApexPipelineOptions.java  |  7 +-
 .../apache/beam/runners/apex/ApexRunner.java    | 43 ++++++++---
 .../beam/runners/apex/ApexYarnLauncher.java     | 23 +++++-
 .../beam/runners/apex/ApexRunnerTest.java       | 75 ++++++++++++++++++++
 .../beam/runners/apex/ApexYarnLauncherTest.java |  9 ++-
 .../test/resources/beam-runners-apex.properties | 20 ++++++
 6 files changed, 161 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/31c63cb8/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
index 54fdf76..f37e874 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
@@ -56,5 +56,10 @@ public interface ApexPipelineOptions extends PipelineOptions, java.io.Serializab
   @Default.Long(0)
   long getRunMillis();
 
-}
+  @Description("configuration properties file for the Apex engine")
+  void setConfigFile(String name);
+
+  @Default.String("classpath:/beam-runners-apex.properties")
+  String getConfigFile();
 
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/31c63cb8/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index f12ebef..e220e6c 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -22,10 +22,16 @@ import com.datatorrent.api.Context.DAGContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.StreamingApplication;
 import com.google.common.base.Throwables;
+
+import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Properties;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.apex.api.EmbeddedAppLauncher;
 import org.apache.apex.api.Launcher;
@@ -64,6 +70,7 @@ import org.apache.hadoop.conf.Configuration;
 public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
 
   private final ApexPipelineOptions options;
+  public static final String CLASSPATH_SCHEME = "classpath";
 
   /**
    * TODO: this isn't thread safe and may cause issues when tests run in parallel
@@ -126,6 +133,31 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
       }
     };
 
+    Properties configProperties = new Properties();
+    try {
+      if (options.getConfigFile() != null) {
+        URI configURL = new URI(options.getConfigFile());
+        if (CLASSPATH_SCHEME.equals(configURL.getScheme())) {
+          InputStream is = this.getClass().getResourceAsStream(configURL.getPath());
+          if (is != null) {
+            configProperties.load(is);
+            is.close();
+          }
+        } else {
+          if (!configURL.isAbsolute()) {
+            // resolve as local file name
+            File f = new File(options.getConfigFile());
+            configURL = f.toURI();
+          }
+          try (InputStream is = configURL.toURL().openStream()) {
+            configProperties.load(is);
+          }
+        }
+      }
+    } catch (IOException | URISyntaxException ex) {
+      throw new RuntimeException("Error loading properties", ex);
+    }
+
     if (options.isEmbeddedExecution()) {
       Launcher<AppHandle> launcher = Launcher.getLauncher(LaunchMode.EMBEDDED);
       Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
@@ -135,6 +167,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
         launchAttributes.put(EmbeddedAppLauncher.HEARTBEAT_MONITORING, false);
       }
       Configuration conf = new Configuration(false);
+      ApexYarnLauncher.addProperties(conf, configProperties);
       try {
         ApexRunner.ASSERTION_ERROR.set(null);
         AppHandle apexAppResult = launcher.launchApp(apexApp, conf, launchAttributes);
@@ -146,7 +179,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
     } else {
       try {
         ApexYarnLauncher yarnLauncher = new ApexYarnLauncher();
-        AppHandle apexAppResult = yarnLauncher.launchApp(apexApp);
+        AppHandle apexAppResult = yarnLauncher.launchApp(apexApp, configProperties);
         return new ApexRunnerResult(apexDAG.get(), apexAppResult);
       } catch (IOException e) {
         throw new RuntimeException("Failed to launch the application on YARN.", e);
@@ -155,14 +188,6 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
 
   }
 
-  private static class IdentityFn<T> extends DoFn<T, T> {
-    private static final long serialVersionUID = 1L;
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      c.output(c.element());
-    }
-  }
-
 ////////////////////////////////////////////
 // Adapted from FlinkRunner for View support
 

http://git-wip-us.apache.org/repos/asf/beam/blob/31c63cb8/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java
index a2d88f4..6bc42f0 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java
@@ -52,6 +52,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.jar.JarFile;
 import java.util.jar.Manifest;
@@ -80,7 +81,8 @@ import org.slf4j.LoggerFactory;
 public class ApexYarnLauncher {
   private static final Logger LOG = LoggerFactory.getLogger(ApexYarnLauncher.class);
 
-  public AppHandle launchApp(StreamingApplication app) throws IOException {
+  public AppHandle launchApp(StreamingApplication app, Properties configProperties)
+      throws IOException {
 
     List<File> jarsToShip = getYarnDeployDependencies();
     StringBuilder classpath = new StringBuilder();
@@ -103,7 +105,7 @@ public class ApexYarnLauncher {
 
     Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
     launchAttributes.put(YarnAppLauncher.LIB_JARS, classpath.toString().replace(':', ','));
-    LaunchParams lp = new LaunchParams(dag, launchAttributes);
+    LaunchParams lp = new LaunchParams(dag, launchAttributes, configProperties);
     lp.cmd = "hadoop " + ApexYarnLauncher.class.getName();
     HashMap<String, String> env = new HashMap<>();
     env.put("HADOOP_USER_CLASSPATH_FIRST", "1");
@@ -292,6 +294,18 @@ public class ApexYarnLauncher {
   }
 
   /**
+   * Copies each entry of {@code props} into {@code conf} under the same key.
+   * @param conf configuration object that receives the entries via {@code set}
+   * @param props properties whose {@code stringPropertyNames()} keys and values are copied
+   */
+  public static void addProperties(Configuration conf, Properties props) {
+    for (final String propertyName : props.stringPropertyNames()) {
+      String propertyValue = props.getProperty(propertyName);
+      conf.set(propertyName, propertyValue);
+    }
+  }
+
+  /**
    * The main method expects the serialized DAG and will launch the YARN application.
    * @param args location of launch parameters
    * @throws IOException when parameters cannot be read
@@ -309,6 +323,7 @@ public class ApexYarnLauncher {
       }
     };
     Configuration conf = new Configuration(); // configuration from Hadoop client
+    addProperties(conf, params.configProperties);
     AppHandle appHandle = params.getApexLauncher().launchApp(apexApp, conf,
         params.launchAttributes);
     if (appHandle == null) {
@@ -327,12 +342,14 @@ public class ApexYarnLauncher {
     private static final long serialVersionUID = 1L;
     private final DAG dag;
     private final Attribute.AttributeMap launchAttributes;
+    private final Properties configProperties;
     private HashMap<String, String> env;
     private String cmd;
 
-    protected LaunchParams(DAG dag, AttributeMap launchAttributes) {
+    protected LaunchParams(DAG dag, AttributeMap launchAttributes, Properties configProperties) {
       this.dag = dag;
       this.launchAttributes = launchAttributes;
+      this.configProperties = configProperties;
     }
 
     protected Launcher<?> getApexLauncher() {

http://git-wip-us.apache.org/repos/asf/beam/blob/31c63cb8/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerTest.java
new file mode 100644
index 0000000..436c959
--- /dev/null
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.OperatorMeta;
+import com.datatorrent.stram.engine.OperatorContext;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.Collections;
+import java.util.Properties;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests that {@link ApexRunner} applies operator attributes loaded from a properties file.
+ */
+public class ApexRunnerTest {
+
+  @Test
+  public void testConfigProperties() throws Exception {
+
+    String operName = "testProperties";
+    ApexPipelineOptions options = PipelineOptionsFactory.create()
+        .as(ApexPipelineOptions.class);
+    options.setRunner(ApexRunner.class);
+
+    // config file option unset: default classpath properties file supplies MEMORY_MB=32
+    Pipeline p = Pipeline.create(options);
+    p.apply(operName, Create.of(Collections.emptyList()));
+    ApexRunnerResult result = (ApexRunnerResult) p.run();
+    result.cancel();
+
+    DAG dag = result.getApexDAG();
+    OperatorMeta t1Meta = dag.getOperatorMeta(operName);
+    Assert.assertNotNull(t1Meta);
+    Assert.assertEquals(new Integer(32), t1Meta.getValue(OperatorContext.MEMORY_MB));
+
+    File tmp = File.createTempFile("beam-runners-apex-", ".properties");
+    tmp.deleteOnExit();
+    Properties props = new Properties();
+    props.setProperty("dt.operator." + operName + ".attr.MEMORY_MB", "64");
+    try (FileOutputStream fos = new FileOutputStream(tmp)) {
+      props.store(fos, "");
+    }
+    options.setConfigFile(tmp.getAbsolutePath());
+    result = (ApexRunnerResult) p.run();
+    result.cancel();
+    tmp.delete();
+    dag = result.getApexDAG();
+    t1Meta = dag.getOperatorMeta(operName);
+    Assert.assertNotNull(t1Meta);
+    Assert.assertEquals(new Integer(64), t1Meta.getValue(OperatorContext.MEMORY_MB));
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/31c63cb8/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java
index 986818b..6ffb091 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java
@@ -35,6 +35,7 @@ import java.nio.file.Files;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.jar.JarFile;
 
 import org.apache.apex.api.EmbeddedAppLauncher;
@@ -78,15 +79,17 @@ public class ApexYarnLauncherTest {
     Configuration conf = new Configuration(false);
     DAG dag = embeddedLauncher.prepareDAG(app, conf);
     Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    Properties configProperties = new Properties();
     ApexYarnLauncher launcher = new ApexYarnLauncher();
-    launcher.launchApp(new MockApexYarnLauncherParams(dag, launchAttributes));
+    launcher.launchApp(new MockApexYarnLauncherParams(dag, launchAttributes, configProperties));
   }
 
   private static class MockApexYarnLauncherParams extends  ApexYarnLauncher.LaunchParams {
     private static final long serialVersionUID = 1L;
 
-    public MockApexYarnLauncherParams(DAG dag, AttributeMap launchAttributes) {
-      super(dag, launchAttributes);
+    public MockApexYarnLauncherParams(DAG dag, AttributeMap launchAttributes,
+        Properties properties) {
+      super(dag, launchAttributes, properties);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/31c63cb8/runners/apex/src/test/resources/beam-runners-apex.properties
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/resources/beam-runners-apex.properties b/runners/apex/src/test/resources/beam-runners-apex.properties
new file mode 100644
index 0000000..48f8b05
--- /dev/null
+++ b/runners/apex/src/test/resources/beam-runners-apex.properties
@@ -0,0 +1,20 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# properties for unit test
+dt.operator.testProperties.attr.MEMORY_MB=32