You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/03/19 03:21:40 UTC
[1/2] incubator-beam git commit: Look up a runner if it is not
registered
Repository: incubator-beam
Updated Branches:
refs/heads/master 81d5ff5a5 -> a461e006a
Look up a runner if it is not registered
If a fully qualified runner is passed as the value of --runner, and it
is not present within the map of registered runners, attempts to look
up the runner using Class#forName, and uses the result class if the
result class is an instance of PipelineRunner. This brings the behavior
in line with the described behavior in PipelineOptions.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e9dd155a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e9dd155a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e9dd155a
Branch: refs/heads/master
Commit: e9dd155a8dcc1233337f2c3ca4ae522d00277bc1
Parents: 81d5ff5
Author: Thomas Groh <tg...@google.com>
Authored: Fri Mar 18 16:20:56 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Mar 18 19:19:43 2016 -0700
----------------------------------------------------------------------
.../sdk/options/PipelineOptionsFactory.java | 31 ++++++++++++++---
.../sdk/options/PipelineOptionsFactoryTest.java | 36 ++++++++++++++++++++
2 files changed, 62 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e9dd155a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/PipelineOptionsFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/PipelineOptionsFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/PipelineOptionsFactory.java
index e77b89f..48cff6d 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/PipelineOptionsFactory.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/PipelineOptionsFactory.java
@@ -16,6 +16,8 @@
package com.google.cloud.dataflow.sdk.options;
+import static com.google.common.base.Preconditions.checkArgument;
+
import com.google.cloud.dataflow.sdk.options.Validation.Required;
import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
import com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar;
@@ -1391,7 +1393,10 @@ public class PipelineOptionsFactory {
* split up each string on ','.
*
* <p>We special case the "runner" option. It is mapped to the class of the {@link PipelineRunner}
- * based off of the {@link PipelineRunner}s simple class name or fully qualified class name.
+ * based off of the {@link PipelineRunner PipelineRunners} simple class name. If the provided
+ * runner name is not registered via a {@link PipelineRunnerRegistrar}, we attempt to obtain the
+ * class that the name represents using {@link Class#forName(String)} and use the result class if
+ * it subclasses {@link PipelineRunner}.
*
* <p>If strict parsing is enabled, unknown options or options that cannot be converted to
* the expected java type using an {@link ObjectMapper} will be ignored.
@@ -1442,10 +1447,26 @@ public class PipelineOptionsFactory {
JavaType type = MAPPER.getTypeFactory().constructType(method.getGenericReturnType());
if ("runner".equals(entry.getKey())) {
String runner = Iterables.getOnlyElement(entry.getValue());
- Preconditions.checkArgument(SUPPORTED_PIPELINE_RUNNERS.containsKey(runner),
- "Unknown 'runner' specified '%s', supported pipeline runners %s",
- runner, Sets.newTreeSet(SUPPORTED_PIPELINE_RUNNERS.keySet()));
- convertedOptions.put("runner", SUPPORTED_PIPELINE_RUNNERS.get(runner));
+ if (SUPPORTED_PIPELINE_RUNNERS.containsKey(runner)) {
+ convertedOptions.put("runner", SUPPORTED_PIPELINE_RUNNERS.get(runner));
+ } else {
+ try {
+ Class<?> runnerClass = Class.forName(runner);
+ checkArgument(
+ PipelineRunner.class.isAssignableFrom(runnerClass),
+ "Class '%s' does not implement PipelineRunner. Supported pipeline runners %s",
+ runner,
+ Sets.newTreeSet(SUPPORTED_PIPELINE_RUNNERS.keySet()));
+ convertedOptions.put("runner", runnerClass);
+ } catch (ClassNotFoundException e) {
+ String msg =
+ String.format(
+ "Unknown 'runner' specified '%s', supported pipeline runners %s",
+ runner,
+ Sets.newTreeSet(SUPPORTED_PIPELINE_RUNNERS.keySet()));
+ throw new IllegalArgumentException(msg, e);
+ }
+ }
} else if ((returnType.isArray() && (SIMPLE_TYPES.contains(returnType.getComponentType())
|| returnType.getComponentType().isEnum()))
|| Collection.class.isAssignableFrom(returnType)) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e9dd155a/sdk/src/test/java/com/google/cloud/dataflow/sdk/options/PipelineOptionsFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/options/PipelineOptionsFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/options/PipelineOptionsFactoryTest.java
index e687f27..045a8ad 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/options/PipelineOptionsFactoryTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/options/PipelineOptionsFactoryTest.java
@@ -25,8 +25,12 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.PipelineResult;
import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
+import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
import com.google.cloud.dataflow.sdk.testing.ExpectedLogs;
import com.google.cloud.dataflow.sdk.testing.RestoreSystemProperties;
import com.google.common.collect.ArrayListMultimap;
@@ -825,6 +829,14 @@ public class PipelineOptionsFactoryTest {
}
@Test
+ public void testSettingRunnerFullName() {
+ String[] args =
+ new String[] {String.format("--runner=%s", DataflowPipelineRunner.class.getName())};
+ PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
+ assertEquals(opts.getRunner(), DataflowPipelineRunner.class);
+ }
+
+ @Test
public void testSettingUnknownRunner() {
String[] args = new String[] {"--runner=UnknownRunner"};
expectedException.expect(IllegalArgumentException.class);
@@ -834,6 +846,30 @@ public class PipelineOptionsFactoryTest {
PipelineOptionsFactory.fromArgs(args).create();
}
+ private static class ExampleTestRunner extends PipelineRunner<PipelineResult> {
+ @Override
+ public PipelineResult run(Pipeline pipeline) {
+ return null;
+ }
+ }
+
+ @Test
+ public void testSettingRunnerCanonicalClassNameNotInSupportedExists() {
+ String[] args = new String[] {String.format("--runner=%s", ExampleTestRunner.class.getName())};
+ PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
+ assertEquals(opts.getRunner(), ExampleTestRunner.class);
+ }
+
+ @Test
+ public void testSettingRunnerCanonicalClassNameNotInSupportedNotPipelineRunner() {
+ String[] args = new String[] {"--runner=java.lang.String"};
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage("does not implement PipelineRunner");
+ expectedException.expectMessage("java.lang.String");
+
+ PipelineOptionsFactory.fromArgs(args).create();
+ }
+
@Test
public void testUsingArgumentWithUnknownPropertyIsNotAllowed() {
String[] args = new String[] {"--unknownProperty=value"};
[2/2] incubator-beam git commit: [BEAM-136] This closes #61
Posted by lc...@apache.org.
[BEAM-136] This closes #61
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a461e006
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a461e006
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a461e006
Branch: refs/heads/master
Commit: a461e006a40f0fd1aef94baef92d0c687dc8e5fa
Parents: 81d5ff5 e9dd155
Author: Luke Cwik <lc...@google.com>
Authored: Fri Mar 18 19:21:05 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Mar 18 19:21:05 2016 -0700
----------------------------------------------------------------------
.../sdk/options/PipelineOptionsFactory.java | 31 ++++++++++++++---
.../sdk/options/PipelineOptionsFactoryTest.java | 36 ++++++++++++++++++++
2 files changed, 62 insertions(+), 5 deletions(-)
----------------------------------------------------------------------