You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/03/19 03:21:40 UTC

[1/2] incubator-beam git commit: Look up a runner if it is not registered

Repository: incubator-beam
Updated Branches:
  refs/heads/master 81d5ff5a5 -> a461e006a


Look up a runner if it is not registered

If a fully qualified runner is passed as the value of --runner, and it
is not present within the map of registered runners, attempts to look
up the runner using Class#forName, and uses the result class if the
result class is an instance of PipelineRunner. This brings the behavior
in line with the described behavior in PipelineOptions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e9dd155a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e9dd155a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e9dd155a

Branch: refs/heads/master
Commit: e9dd155a8dcc1233337f2c3ca4ae522d00277bc1
Parents: 81d5ff5
Author: Thomas Groh <tg...@google.com>
Authored: Fri Mar 18 16:20:56 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Mar 18 19:19:43 2016 -0700

----------------------------------------------------------------------
 .../sdk/options/PipelineOptionsFactory.java     | 31 ++++++++++++++---
 .../sdk/options/PipelineOptionsFactoryTest.java | 36 ++++++++++++++++++++
 2 files changed, 62 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e9dd155a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/PipelineOptionsFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/PipelineOptionsFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/PipelineOptionsFactory.java
index e77b89f..48cff6d 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/PipelineOptionsFactory.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/PipelineOptionsFactory.java
@@ -16,6 +16,8 @@
 
 package com.google.cloud.dataflow.sdk.options;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import com.google.cloud.dataflow.sdk.options.Validation.Required;
 import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
 import com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar;
@@ -1391,7 +1393,10 @@ public class PipelineOptionsFactory {
    * split up each string on ','.
    *
    * <p>We special case the "runner" option. It is mapped to the class of the {@link PipelineRunner}
-   * based off of the {@link PipelineRunner}s simple class name or fully qualified class name.
+   * based off of the {@link PipelineRunner PipelineRunners} simple class name. If the provided
+   * runner name is not registered via a {@link PipelineRunnerRegistrar}, we attempt to obtain the
+   * class that the name represents using {@link Class#forName(String)} and use the result class if
+   * it subclasses {@link PipelineRunner}.
    *
    * <p>If strict parsing is enabled, unknown options or options that cannot be converted to
    * the expected java type using an {@link ObjectMapper} will be ignored.
@@ -1442,10 +1447,26 @@ public class PipelineOptionsFactory {
         JavaType type = MAPPER.getTypeFactory().constructType(method.getGenericReturnType());
         if ("runner".equals(entry.getKey())) {
           String runner = Iterables.getOnlyElement(entry.getValue());
-          Preconditions.checkArgument(SUPPORTED_PIPELINE_RUNNERS.containsKey(runner),
-              "Unknown 'runner' specified '%s', supported pipeline runners %s",
-              runner, Sets.newTreeSet(SUPPORTED_PIPELINE_RUNNERS.keySet()));
-          convertedOptions.put("runner", SUPPORTED_PIPELINE_RUNNERS.get(runner));
+          if (SUPPORTED_PIPELINE_RUNNERS.containsKey(runner)) {
+            convertedOptions.put("runner", SUPPORTED_PIPELINE_RUNNERS.get(runner));
+          } else {
+            try {
+              Class<?> runnerClass = Class.forName(runner);
+              checkArgument(
+                  PipelineRunner.class.isAssignableFrom(runnerClass),
+                  "Class '%s' does not implement PipelineRunner. Supported pipeline runners %s",
+                  runner,
+                  Sets.newTreeSet(SUPPORTED_PIPELINE_RUNNERS.keySet()));
+              convertedOptions.put("runner", runnerClass);
+            } catch (ClassNotFoundException e) {
+              String msg =
+                  String.format(
+                      "Unknown 'runner' specified '%s', supported pipeline runners %s",
+                      runner,
+                      Sets.newTreeSet(SUPPORTED_PIPELINE_RUNNERS.keySet()));
+                throw new IllegalArgumentException(msg, e);
+            }
+          }
         } else if ((returnType.isArray() && (SIMPLE_TYPES.contains(returnType.getComponentType())
                 || returnType.getComponentType().isEnum()))
             || Collection.class.isAssignableFrom(returnType)) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e9dd155a/sdk/src/test/java/com/google/cloud/dataflow/sdk/options/PipelineOptionsFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/options/PipelineOptionsFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/options/PipelineOptionsFactoryTest.java
index e687f27..045a8ad 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/options/PipelineOptionsFactoryTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/options/PipelineOptionsFactoryTest.java
@@ -25,8 +25,12 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.PipelineResult;
 import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
 import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
+import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
 import com.google.cloud.dataflow.sdk.testing.ExpectedLogs;
 import com.google.cloud.dataflow.sdk.testing.RestoreSystemProperties;
 import com.google.common.collect.ArrayListMultimap;
@@ -825,6 +829,14 @@ public class PipelineOptionsFactoryTest {
   }
 
   @Test
+  public void testSettingRunnerFullName() {
+    String[] args =
+        new String[] {String.format("--runner=%s", DataflowPipelineRunner.class.getName())};
+    PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
+    assertEquals(opts.getRunner(), DataflowPipelineRunner.class);
+  }
+
+  @Test
   public void testSettingUnknownRunner() {
     String[] args = new String[] {"--runner=UnknownRunner"};
     expectedException.expect(IllegalArgumentException.class);
@@ -834,6 +846,30 @@ public class PipelineOptionsFactoryTest {
     PipelineOptionsFactory.fromArgs(args).create();
   }
 
+  private static class ExampleTestRunner extends PipelineRunner<PipelineResult> {
+    @Override
+    public PipelineResult run(Pipeline pipeline) {
+      return null;
+    }
+  }
+
+  @Test
+  public void testSettingRunnerCanonicalClassNameNotInSupportedExists() {
+    String[] args = new String[] {String.format("--runner=%s", ExampleTestRunner.class.getName())};
+    PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
+    assertEquals(opts.getRunner(), ExampleTestRunner.class);
+  }
+
+  @Test
+  public void testSettingRunnerCanonicalClassNameNotInSupportedNotPipelineRunner() {
+    String[] args = new String[] {"--runner=java.lang.String"};
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("does not implement PipelineRunner");
+    expectedException.expectMessage("java.lang.String");
+
+    PipelineOptionsFactory.fromArgs(args).create();
+  }
+
   @Test
   public void testUsingArgumentWithUnknownPropertyIsNotAllowed() {
     String[] args = new String[] {"--unknownProperty=value"};


[2/2] incubator-beam git commit: [BEAM-136] This closes #61

Posted by lc...@apache.org.
[BEAM-136] This closes #61


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a461e006
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a461e006
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a461e006

Branch: refs/heads/master
Commit: a461e006a40f0fd1aef94baef92d0c687dc8e5fa
Parents: 81d5ff5 e9dd155
Author: Luke Cwik <lc...@google.com>
Authored: Fri Mar 18 19:21:05 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Mar 18 19:21:05 2016 -0700

----------------------------------------------------------------------
 .../sdk/options/PipelineOptionsFactory.java     | 31 ++++++++++++++---
 .../sdk/options/PipelineOptionsFactoryTest.java | 36 ++++++++++++++++++++
 2 files changed, 62 insertions(+), 5 deletions(-)
----------------------------------------------------------------------