You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/14 17:11:24 UTC
[7/8] incubator-beam git commit: Update the Default Pipeline Runner
Update the Default Pipeline Runner
Select the InProcessRunner if it is on the classpath, and throw an
exception otherwise.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a3ffd510
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a3ffd510
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a3ffd510
Branch: refs/heads/master
Commit: a3ffd510896626019723294931a4c3763faf43af
Parents: 816a3bf
Author: Thomas Groh <tg...@google.com>
Authored: Wed May 18 16:56:06 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Jun 14 09:57:17 2016 -0700
----------------------------------------------------------------------
examples/java/pom.xml | 7 ++
runners/google-cloud-dataflow-java/pom.xml | 11 +++
sdks/java/core/pom.xml | 3 +
.../beam/sdk/options/PipelineOptions.java | 31 +++++++-
.../sdk/options/PipelineOptionsFactoryTest.java | 79 +++++++++++++++-----
.../beam/sdk/options/PipelineOptionsTest.java | 8 --
.../options/PipelineOptionsValidatorTest.java | 15 ++++
.../sdk/runners/DirectPipelineRunnerTest.java | 1 +
.../beam/sdk/testing/TestPipelineTest.java | 5 +-
sdks/java/extensions/join-library/pom.xml | 7 ++
sdks/java/io/google-cloud-platform/pom.xml | 7 ++
sdks/java/io/hdfs/pom.xml | 7 ++
sdks/java/io/kafka/pom.xml | 7 ++
sdks/java/java8tests/pom.xml | 7 ++
.../beam/sdk/transforms/WithKeysJava8Test.java | 3 +-
.../main/resources/archetype-resources/pom.xml | 6 ++
16 files changed, 170 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index 3d81338..5211b80 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -49,6 +49,7 @@
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<systemPropertyVariables>
+ <beamUseDummyRunner></beamUseDummyRunner>
<beamTestPipelineOptions>
</beamTestPipelineOptions>
</systemPropertyVariables>
@@ -213,6 +214,12 @@
<dependency>
<groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-direct-java</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index a6dfae3..6d8e94b 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -84,6 +84,17 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemPropertyVariables>
+ <beamTestPipelineOptions></beamTestPipelineOptions>
+ <beamUseDummyRunner>true</beamUseDummyRunner>
+ </systemPropertyVariables>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 372a913..c559cff 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -129,6 +129,9 @@
<excludedGroups>
org.apache.beam.sdk.testing.NeedsRunner
</excludedGroups>
+ <systemPropertyVariables>
+ <beamUseDummyRunner>true</beamUseDummyRunner>
+ </systemPropertyVariables>
</configuration>
</plugin>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
index a2f38ed..b1b5280 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
@@ -21,11 +21,11 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.GoogleApiDebugOptions.GoogleApiTracer;
import org.apache.beam.sdk.options.ProxyInvocationHandler.Deserializer;
import org.apache.beam.sdk.options.ProxyInvocationHandler.Serializer;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.Context;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
+
import com.google.auto.service.AutoService;
import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -225,7 +225,7 @@ public interface PipelineOptions extends HasDisplayData {
@Description("The pipeline runner that will be used to execute the pipeline. "
+ "For registered runners, the class name can be specified, otherwise the fully "
+ "qualified name needs to be specified.")
- @Default.Class(DirectPipelineRunner.class)
+ @Default.InstanceFactory(DirectRunner.class)
Class<? extends PipelineRunner<?>> getRunner();
void setRunner(Class<? extends PipelineRunner<?>> kls);
@@ -262,4 +262,31 @@ public interface PipelineOptions extends HasDisplayData {
@Description("A pipeline level default location for storing temporary files.")
String getTempLocation();
void setTempLocation(String value);
+
+ /**
+ * A {@link DefaultValueFactory} that obtains the class of the {@code DirectRunner} if it exists
+ * on the classpath, and throws an exception otherwise.
+ *
+ * <p>As the {@code DirectRunner} is in an independent module, it cannot be directly referenced
+ * as the {@link Default}. However, it should still be used if available, and a user is required
+ * to explicitly set the {@code --runner} property if they wish to use an alternative runner.
+ */
+ class DirectRunner implements DefaultValueFactory<Class<? extends PipelineRunner>> {
+ @Override
+ public Class<? extends PipelineRunner> create(PipelineOptions options) {
+ try {
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ Class<? extends PipelineRunner> direct = (Class<? extends PipelineRunner>) Class.forName(
+ "org.apache.beam.runners.direct.InProcessPipelineRunner");
+ return direct;
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException(String.format(
+ "No Runner was specified and the DirectRunner was not found on the classpath.%n"
+ + "Specify a runner by either:%n"
+ + " Explicitly specifying a runner by providing the 'runner' property%n"
+ + " Adding the DirectRunner to the classpath%n"
+ + " Calling 'PipelineOptions.setRunner(PipelineRunner)' directly"));
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
index 62c6909..8b8337e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
@@ -29,11 +29,13 @@ import static org.junit.Assert.assertTrue;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+import org.apache.beam.sdk.testing.CrashingRunner;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.RestoreSystemProperties;
+import com.google.auto.service.AutoService;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -59,8 +61,9 @@ import java.util.Set;
/** Tests for {@link PipelineOptionsFactory}. */
@RunWith(JUnit4.class)
public class PipelineOptionsFactoryTest {
- private static final Class<? extends PipelineRunner<?>> DEFAULT_RUNNER_CLASS =
- DirectPipelineRunner.class;
+ private static final String DEFAULT_RUNNER_NAME = "DirectRunner";
+ private static final Class<? extends PipelineRunner> REGISTERED_RUNNER =
+ RegisteredTestRunner.class;
@Rule public ExpectedException expectedException = ExpectedException.none();
@Rule public TestRule restoreSystemProperties = new RestoreSystemProperties();
@@ -68,14 +71,13 @@ public class PipelineOptionsFactoryTest {
@Test
public void testAutomaticRegistrationOfPipelineOptions() {
- assertTrue(PipelineOptionsFactory.getRegisteredOptions().contains(DirectPipelineOptions.class));
+ assertTrue(PipelineOptionsFactory.getRegisteredOptions().contains(RegisteredTestOptions.class));
}
@Test
public void testAutomaticRegistrationOfRunners() {
- assertEquals(
- DEFAULT_RUNNER_CLASS,
- PipelineOptionsFactory.getRegisteredRunners().get(DEFAULT_RUNNER_CLASS.getSimpleName()));
+ assertEquals(REGISTERED_RUNNER,
+ PipelineOptionsFactory.getRegisteredRunners().get(REGISTERED_RUNNER.getSimpleName()));
}
@Test
@@ -85,7 +87,7 @@ public class PipelineOptionsFactoryTest {
}
/** A simple test interface. */
- public static interface TestPipelineOptions extends PipelineOptions {
+ public interface TestPipelineOptions extends PipelineOptions {
String getTestPipelineOption();
void setTestPipelineOption(String value);
}
@@ -810,18 +812,18 @@ public class PipelineOptionsFactoryTest {
@Test
public void testSettingRunner() {
- String[] args = new String[] {"--runner=DirectPipelineRunner"};
+ String[] args = new String[] {"--runner=" + RegisteredTestRunner.class.getSimpleName()};
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
- assertEquals(DirectPipelineRunner.class, options.getRunner());
+ assertEquals(RegisteredTestRunner.class, options.getRunner());
}
@Test
public void testSettingRunnerFullName() {
String[] args =
- new String[] {String.format("--runner=%s", DirectPipelineRunner.class.getName())};
+ new String[] {String.format("--runner=%s", CrashingRunner.class.getName())};
PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
- assertEquals(opts.getRunner(), DirectPipelineRunner.class);
+ assertEquals(opts.getRunner(), CrashingRunner.class);
}
@@ -832,7 +834,7 @@ public class PipelineOptionsFactoryTest {
expectedException.expectMessage(
"Unknown 'runner' specified 'UnknownRunner', supported " + "pipeline runners");
Set<String> registeredRunners = PipelineOptionsFactory.getRegisteredRunners().keySet();
- assertThat(registeredRunners, hasItem(DEFAULT_RUNNER_CLASS.getSimpleName()));
+ assertThat(registeredRunners, hasItem(REGISTERED_RUNNER.getSimpleName()));
for (String registeredRunner : registeredRunners) {
expectedException.expectMessage(registeredRunner);
}
@@ -923,7 +925,7 @@ public class PipelineOptionsFactoryTest {
public void testEmptyArgumentIsIgnored() {
String[] args =
new String[] {
- "", "--string=100", "", "", "--runner=" + DEFAULT_RUNNER_CLASS.getSimpleName()
+ "", "--string=100", "", "", "--runner=" + REGISTERED_RUNNER.getSimpleName()
};
PipelineOptionsFactory.fromArgs(args).as(Objects.class);
}
@@ -932,7 +934,7 @@ public class PipelineOptionsFactoryTest {
public void testNullArgumentIsIgnored() {
String[] args =
new String[] {
- "--string=100", null, null, "--runner=" + DEFAULT_RUNNER_CLASS.getSimpleName()
+ "--string=100", null, null, "--runner=" + REGISTERED_RUNNER.getSimpleName()
};
PipelineOptionsFactory.fromArgs(args).as(Objects.class);
}
@@ -985,7 +987,7 @@ public class PipelineOptionsFactoryTest {
String output = new String(baos.toByteArray());
assertThat(output, containsString("org.apache.beam.sdk.options.PipelineOptions"));
assertThat(output, containsString("--runner"));
- assertThat(output, containsString("Default: " + DEFAULT_RUNNER_CLASS.getSimpleName()));
+ assertThat(output, containsString("Default: " + DEFAULT_RUNNER_NAME));
assertThat(output,
containsString("The pipeline runner that will be used to execute the pipeline."));
}
@@ -1000,7 +1002,7 @@ public class PipelineOptionsFactoryTest {
String output = new String(baos.toByteArray());
assertThat(output, containsString("org.apache.beam.sdk.options.PipelineOptions"));
assertThat(output, containsString("--runner"));
- assertThat(output, containsString("Default: " + DEFAULT_RUNNER_CLASS.getSimpleName()));
+ assertThat(output, containsString("Default: " + DEFAULT_RUNNER_NAME));
assertThat(output,
containsString("The pipeline runner that will be used to execute the pipeline."));
}
@@ -1015,7 +1017,7 @@ public class PipelineOptionsFactoryTest {
String output = new String(baos.toByteArray());
assertThat(output, containsString("org.apache.beam.sdk.options.PipelineOptions"));
assertThat(output, containsString("--runner"));
- assertThat(output, containsString("Default: " + DEFAULT_RUNNER_CLASS.getSimpleName()));
+ assertThat(output, containsString("Default: " + DEFAULT_RUNNER_NAME));
assertThat(output,
containsString("The pipeline runner that will be used to execute the pipeline."));
}
@@ -1109,7 +1111,7 @@ public class PipelineOptionsFactoryTest {
String output = new String(baos.toByteArray());
assertThat(output, containsString("org.apache.beam.sdk.options.PipelineOptions"));
assertThat(output, containsString("--runner"));
- assertThat(output, containsString("Default: " + DEFAULT_RUNNER_CLASS.getSimpleName()));
+ assertThat(output, containsString("Default: " + DEFAULT_RUNNER_NAME));
assertThat(output,
containsString("The pipeline runner that will be used to execute the pipeline."));
}
@@ -1147,4 +1149,43 @@ public class PipelineOptionsFactoryTest {
thread.join();
assertEquals(cl, classLoader[0]);
}
+
+ private static class RegisteredTestRunner extends PipelineRunner<PipelineResult> {
+ public static PipelineRunner fromOptions(PipelineOptions options) {
+ return new RegisteredTestRunner();
+ }
+
+ public PipelineResult run(Pipeline p) {
+ throw new IllegalArgumentException();
+ }
+ }
+
+
+ /**
+ * A {@link PipelineRunnerRegistrar} to demonstrate default {@link PipelineRunner} registration.
+ */
+ @AutoService(PipelineRunnerRegistrar.class)
+ public static class RegisteredTestRunnerRegistrar implements PipelineRunnerRegistrar {
+ @Override
+ public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
+ return ImmutableList.<Class<? extends PipelineRunner<?>>>of(RegisteredTestRunner.class);
+ }
+ }
+
+ private interface RegisteredTestOptions extends PipelineOptions {
+ Object getRegisteredExampleFooBar();
+ void setRegisteredExampleFooBar(Object registeredExampleFooBar);
+ }
+
+
+ /**
+ * A {@link PipelineOptionsRegistrar} to demonstrate default {@link PipelineOptions} registration.
+ */
+ @AutoService(PipelineOptionsRegistrar.class)
+ public static class RegisteredTestOptionsRegistrar implements PipelineOptionsRegistrar {
+ @Override
+ public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+ return ImmutableList.<Class<? extends PipelineOptions>>of(RegisteredTestOptions.class);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java
index dfda528..687271c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java
@@ -17,15 +17,12 @@
*/
package org.apache.beam.sdk.options;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
-
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -86,11 +83,6 @@ public class PipelineOptionsTest {
}
@Test
- public void testDefaultRunnerIsSet() {
- assertEquals(DirectPipelineRunner.class, PipelineOptionsFactory.create().getRunner());
- }
-
- @Test
public void testCloneAs() throws IOException {
DerivedTestOptions options = PipelineOptionsFactory.create().as(DerivedTestOptions.class);
options.setBaseValue(Lists.<Boolean>newArrayList());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsValidatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsValidatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsValidatorTest.java
index 0250bd1..2b684a8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsValidatorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsValidatorTest.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.options;
+import org.apache.beam.sdk.testing.CrashingRunner;
+
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -40,6 +42,7 @@ public class PipelineOptionsValidatorTest {
@Test
public void testWhenRequiredOptionIsSet() {
Required required = PipelineOptionsFactory.as(Required.class);
+ required.setRunner(CrashingRunner.class);
required.setObject("blah");
PipelineOptionsValidator.validate(Required.class, required);
}
@@ -114,6 +117,7 @@ public class PipelineOptionsValidatorTest {
GroupRequired groupRequired = PipelineOptionsFactory.as(GroupRequired.class);
groupRequired.setFoo("foo");
groupRequired.setBar(null);
+ groupRequired.setRunner(CrashingRunner.class);
PipelineOptionsValidator.validate(GroupRequired.class, groupRequired);
@@ -126,6 +130,7 @@ public class PipelineOptionsValidatorTest {
@Test
public void testWhenNoneOfRequiredGroupIsSetThrowsException() {
GroupRequired groupRequired = PipelineOptionsFactory.as(GroupRequired.class);
+ groupRequired.setRunner(CrashingRunner.class);
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Missing required value for group [ham]");
@@ -155,6 +160,7 @@ public class PipelineOptionsValidatorTest {
public void testWhenOneOfMultipleRequiredGroupsIsSetIsValid() {
MultiGroupRequired multiGroupRequired = PipelineOptionsFactory.as(MultiGroupRequired.class);
+ multiGroupRequired.setRunner(CrashingRunner.class);
multiGroupRequired.setFoo("eggs");
PipelineOptionsValidator.validate(MultiGroupRequired.class, multiGroupRequired);
@@ -194,6 +200,7 @@ public class PipelineOptionsValidatorTest {
public void testWhenOptionIsDefinedInMultipleSuperInterfacesAndIsNotPresentFailsRequirement() {
RightOptions rightOptions = PipelineOptionsFactory.as(RightOptions.class);
rightOptions.setBoth("foo");
+ rightOptions.setRunner(CrashingRunner.class);
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Missing required value for group");
@@ -212,6 +219,8 @@ public class PipelineOptionsValidatorTest {
leftOpts.setFoo("Untrue");
leftOpts.setBoth("Raise the");
+ rightOpts.setRunner(CrashingRunner.class);
+ leftOpts.setRunner(CrashingRunner.class);
PipelineOptionsValidator.validate(JoinedOptions.class, rightOpts);
PipelineOptionsValidator.validate(JoinedOptions.class, leftOpts);
}
@@ -226,6 +235,8 @@ public class PipelineOptionsValidatorTest {
leftOpts.setFoo("Untrue");
leftOpts.setBoth("Raise the");
+ rightOpts.setRunner(CrashingRunner.class);
+ leftOpts.setRunner(CrashingRunner.class);
PipelineOptionsValidator.validate(RightOptions.class, leftOpts);
PipelineOptionsValidator.validate(LeftOptions.class, rightOpts);
}
@@ -234,6 +245,7 @@ public class PipelineOptionsValidatorTest {
public void testWhenOptionIsDefinedOnMultipleInterfacesOnlyListedOnceWhenNotPresent() {
JoinedOptions options = PipelineOptionsFactory.as(JoinedOptions.class);
options.setFoo("Hello");
+ options.setRunner(CrashingRunner.class);
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("required value for group [both]");
@@ -273,6 +285,7 @@ public class PipelineOptionsValidatorTest {
public void testSuperInterfaceRequiredOptionsAlsoRequiredInSubInterface() {
SubOptions subOpts = PipelineOptionsFactory.as(SubOptions.class);
subOpts.setFoo("Bar");
+ subOpts.setRunner(CrashingRunner.class);
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("otherSuper");
@@ -288,6 +301,7 @@ public class PipelineOptionsValidatorTest {
SubOptions opts = PipelineOptionsFactory.as(SubOptions.class);
opts.setFoo("Foo");
opts.setSuperclassObj("Hello world");
+ opts.setRunner(CrashingRunner.class);
// Valid SubOptions, but invalid SuperOptions
PipelineOptionsValidator.validate(SubOptions.class, opts);
@@ -304,6 +318,7 @@ public class PipelineOptionsValidatorTest {
subOpts.setFoo("bar");
subOpts.setBar("bar");
subOpts.setSuperclassObj("SuperDuper");
+ subOpts.setRunner(CrashingRunner.class);
PipelineOptionsValidator.validate(SubOptions.class, subOpts);
PipelineOptionsValidator.validate(SuperOptions.class, subOpts);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java
index ae3b4e0..edf6996 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java
@@ -68,6 +68,7 @@ public class DirectPipelineRunnerTest implements Serializable {
@Test
public void testToString() {
PipelineOptions options = PipelineOptionsFactory.create();
+ options.setRunner(DirectPipelineRunner.class);
DirectPipelineRunner runner = DirectPipelineRunner.fromOptions(options);
assertEquals("DirectPipelineRunner#" + runner.hashCode(),
runner.toString());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
index b741e2e..043c06c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
@@ -28,7 +28,6 @@ import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
import org.apache.beam.sdk.transforms.Create;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -62,13 +61,13 @@ public class TestPipelineTest {
public void testCreationOfPipelineOptions() throws Exception {
ObjectMapper mapper = new ObjectMapper();
String stringOptions = mapper.writeValueAsString(new String[]{
- "--runner=DirectPipelineRunner",
+ "--runner=org.apache.beam.sdk.testing.CrashingRunner",
"--project=testProject"
});
System.getProperties().put("beamTestPipelineOptions", stringOptions);
GcpOptions options =
TestPipeline.testingPipelineOptions().as(GcpOptions.class);
- assertEquals(DirectPipelineRunner.class, options.getRunner());
+ assertEquals(CrashingRunner.class, options.getRunner());
assertEquals(options.getProject(), "testProject");
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/extensions/join-library/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/pom.xml b/sdks/java/extensions/join-library/pom.xml
index 828fcce..0fec148 100644
--- a/sdks/java/extensions/join-library/pom.xml
+++ b/sdks/java/extensions/join-library/pom.xml
@@ -63,6 +63,13 @@
<!-- Dependency for tests -->
<dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-direct-java</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml
index 962c7b0..f567261 100644
--- a/sdks/java/io/google-cloud-platform/pom.xml
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -88,6 +88,13 @@
</dependency>
<dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-direct-java</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/io/hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml
index e99dbd7..9c30792 100644
--- a/sdks/java/io/hdfs/pom.xml
+++ b/sdks/java/io/hdfs/pom.xml
@@ -66,6 +66,13 @@
<!-- test dependencies -->
<dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-direct-java</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/io/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/pom.xml b/sdks/java/io/kafka/pom.xml
index be19a83..76c0eb6 100644
--- a/sdks/java/io/kafka/pom.xml
+++ b/sdks/java/io/kafka/pom.xml
@@ -71,6 +71,13 @@
<!-- test dependencies-->
<dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-direct-java</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/java8tests/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/pom.xml b/sdks/java/java8tests/pom.xml
index 48bf682..8e20228 100644
--- a/sdks/java/java8tests/pom.xml
+++ b/sdks/java/java8tests/pom.xml
@@ -96,6 +96,13 @@
</dependency>
<dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-direct-java</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
index a0d1a63..1ffb147 100644
--- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
+++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.transforms;
-import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -65,7 +64,7 @@ public class WithKeysJava8Test {
values.apply("ApplyKeysWithWithKeys", WithKeys.of((String s) -> Integer.valueOf(s)));
- thrown.expect(PipelineExecutionException.class);
+ thrown.expect(IllegalStateException.class);
thrown.expectMessage("Unable to return a default Coder for ApplyKeysWithWithKeys");
thrown.expectMessage("Cannot provide a coder for type variable K");
thrown.expectMessage("the actual type is unknown due to erasure.");
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
index d86b9cc..2b2e24b 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
@@ -101,6 +101,12 @@
<!-- Adds a dependency on a specific version of the Dataflow runnner. -->
<dependency>
<groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-direct-java</artifactId>
+ <version>[0-incubating, 2-incubating)</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<version>[0-incubating, 2-incubating)</version>
</dependency>