You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/14 17:11:18 UTC
[1/8] incubator-beam git commit: Move GcsUtil TextIO Tests to
TextIOTest
Repository: incubator-beam
Updated Branches:
refs/heads/master c8ad2e7dd -> 774944014
Move GcsUtil TextIO Tests to TextIOTest
These tests are not a test of the DataflowRunner, nor any
DataflowRunner specific behavior, so they should be part of TextIOTest
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f2fb59c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f2fb59c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f2fb59c6
Branch: refs/heads/master
Commit: f2fb59c65119d5da56df5dd4e64fa1873c6ccbbb
Parents: f73bd73
Author: Thomas Groh <tg...@google.com>
Authored: Fri Jun 10 14:43:10 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Jun 10 14:49:33 2016 -0700
----------------------------------------------------------------------
.../runners/dataflow/io/DataflowTextIOTest.java | 80 +-------------------
.../java/org/apache/beam/sdk/io/TextIOTest.java | 75 ++++++++++++++++++
2 files changed, 77 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f2fb59c6/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java
index 0d7c1cb..ae711f0 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java
@@ -27,30 +27,16 @@ import static org.junit.Assert.assertThat;
import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
import org.apache.beam.runners.dataflow.transforms.DataflowDisplayDataEvaluator;
-import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
-import org.apache.beam.sdk.util.GcsUtil;
import org.apache.beam.sdk.util.TestCredential;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-
-import com.google.common.collect.ImmutableList;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import java.io.IOException;
-import java.nio.channels.FileChannel;
-import java.nio.channels.SeekableByteChannel;
-import java.nio.file.Files;
-import java.nio.file.StandardOpenOption;
-import java.util.List;
import java.util.Set;
/**
@@ -60,73 +46,11 @@ import java.util.Set;
public class DataflowTextIOTest {
private TestDataflowPipelineOptions buildTestPipelineOptions() {
TestDataflowPipelineOptions options =
- PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
+ TestPipeline.testingPipelineOptions().as(TestDataflowPipelineOptions.class);
options.setGcpCredential(new TestCredential());
return options;
}
- private GcsUtil buildMockGcsUtil() throws IOException {
- GcsUtil mockGcsUtil = Mockito.mock(GcsUtil.class);
-
- // Any request to open gets a new bogus channel
- Mockito
- .when(mockGcsUtil.open(Mockito.any(GcsPath.class)))
- .then(new Answer<SeekableByteChannel>() {
- @Override
- public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable {
- return FileChannel.open(
- Files.createTempFile("channel-", ".tmp"),
- StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE);
- }
- });
-
- // Any request for expansion returns a list containing the original GcsPath
- // This is required to pass validation that occurs in TextIO during apply()
- Mockito
- .when(mockGcsUtil.expand(Mockito.any(GcsPath.class)))
- .then(new Answer<List<GcsPath>>() {
- @Override
- public List<GcsPath> answer(InvocationOnMock invocation) throws Throwable {
- return ImmutableList.of((GcsPath) invocation.getArguments()[0]);
- }
- });
-
- return mockGcsUtil;
- }
-
- /**
- * This tests a few corner cases that should not crash.
- */
- @Test
- public void testGoodWildcards() throws Exception {
- TestDataflowPipelineOptions options = buildTestPipelineOptions();
- options.setGcsUtil(buildMockGcsUtil());
-
- Pipeline pipeline = Pipeline.create(options);
-
- applyRead(pipeline, "gs://bucket/foo");
- applyRead(pipeline, "gs://bucket/foo/");
- applyRead(pipeline, "gs://bucket/foo/*");
- applyRead(pipeline, "gs://bucket/foo/?");
- applyRead(pipeline, "gs://bucket/foo/[0-9]");
- applyRead(pipeline, "gs://bucket/foo/*baz*");
- applyRead(pipeline, "gs://bucket/foo/*baz?");
- applyRead(pipeline, "gs://bucket/foo/[0-9]baz?");
- applyRead(pipeline, "gs://bucket/foo/baz/*");
- applyRead(pipeline, "gs://bucket/foo/baz/*wonka*");
- applyRead(pipeline, "gs://bucket/foo/*baz/wonka*");
- applyRead(pipeline, "gs://bucket/foo*/baz");
- applyRead(pipeline, "gs://bucket/foo?/baz");
- applyRead(pipeline, "gs://bucket/foo[0-9]/baz");
-
- // Check that running doesn't fail.
- pipeline.run();
- }
-
- private void applyRead(Pipeline pipeline, String path) {
- pipeline.apply("Read(" + path + ")", TextIO.Read.from(path));
- }
-
@Test
public void testPrimitiveWriteDisplayData() {
DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f2fb59c6/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 724a113..9a762d1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -37,6 +37,7 @@ import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
import org.apache.beam.sdk.io.TextIO.CompressionType;
import org.apache.beam.sdk.io.TextIO.TextSource;
+import org.apache.beam.sdk.options.GcsOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
@@ -46,7 +47,9 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.GcsUtil;
import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
@@ -59,6 +62,9 @@ import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import java.io.BufferedReader;
import java.io.File;
@@ -66,8 +72,11 @@ import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.PrintStream;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SeekableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -685,4 +694,70 @@ public class TextIOTest {
return source;
}
+
+ ///////////////////////////////////////////////////////////////////////////////////////////////
+ // Test "gs://" paths
+
+ private GcsUtil buildMockGcsUtil() throws IOException {
+ GcsUtil mockGcsUtil = Mockito.mock(GcsUtil.class);
+
+ // Any request to open gets a new bogus channel
+ Mockito
+ .when(mockGcsUtil.open(Mockito.any(GcsPath.class)))
+ .then(new Answer<SeekableByteChannel>() {
+ @Override
+ public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable {
+ return FileChannel.open(
+ Files.createTempFile("channel-", ".tmp"),
+ StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE);
+ }
+ });
+
+ // Any request for expansion returns a list containing the original GcsPath
+ // This is required to pass validation that occurs in TextIO during apply()
+ Mockito
+ .when(mockGcsUtil.expand(Mockito.any(GcsPath.class)))
+ .then(new Answer<List<GcsPath>>() {
+ @Override
+ public List<GcsPath> answer(InvocationOnMock invocation) throws Throwable {
+ return ImmutableList.of((GcsPath) invocation.getArguments()[0]);
+ }
+ });
+
+ return mockGcsUtil;
+ }
+
+ /**
+ * This tests a few corner cases that should not crash.
+ */
+ @Test
+ @Category(NeedsRunner.class)
+ public void testGoodWildcards() throws Exception {
+ GcsOptions options = TestPipeline.testingPipelineOptions().as(GcsOptions.class);
+ options.setGcsUtil(buildMockGcsUtil());
+
+ Pipeline pipeline = Pipeline.create(options);
+
+ applyRead(pipeline, "gs://bucket/foo");
+ applyRead(pipeline, "gs://bucket/foo/");
+ applyRead(pipeline, "gs://bucket/foo/*");
+ applyRead(pipeline, "gs://bucket/foo/?");
+ applyRead(pipeline, "gs://bucket/foo/[0-9]");
+ applyRead(pipeline, "gs://bucket/foo/*baz*");
+ applyRead(pipeline, "gs://bucket/foo/*baz?");
+ applyRead(pipeline, "gs://bucket/foo/[0-9]baz?");
+ applyRead(pipeline, "gs://bucket/foo/baz/*");
+ applyRead(pipeline, "gs://bucket/foo/baz/*wonka*");
+ applyRead(pipeline, "gs://bucket/foo/*baz/wonka*");
+ applyRead(pipeline, "gs://bucket/foo*/baz");
+ applyRead(pipeline, "gs://bucket/foo?/baz");
+ applyRead(pipeline, "gs://bucket/foo[0-9]/baz");
+
+ // Check that running doesn't fail.
+ pipeline.run();
+ }
+
+ private void applyRead(Pipeline pipeline, String path) {
+ pipeline.apply("Read(" + path + ")", TextIO.Read.from(path));
+ }
}
[7/8] incubator-beam git commit: Update the Default Pipeline Runner
Posted by ke...@apache.org.
Update the Default Pipeline Runner
Select the InProcessRunner if it is on the classpath, and throw an
exception otherwise.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a3ffd510
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a3ffd510
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a3ffd510
Branch: refs/heads/master
Commit: a3ffd510896626019723294931a4c3763faf43af
Parents: 816a3bf
Author: Thomas Groh <tg...@google.com>
Authored: Wed May 18 16:56:06 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Jun 14 09:57:17 2016 -0700
----------------------------------------------------------------------
examples/java/pom.xml | 7 ++
runners/google-cloud-dataflow-java/pom.xml | 11 +++
sdks/java/core/pom.xml | 3 +
.../beam/sdk/options/PipelineOptions.java | 31 +++++++-
.../sdk/options/PipelineOptionsFactoryTest.java | 79 +++++++++++++++-----
.../beam/sdk/options/PipelineOptionsTest.java | 8 --
.../options/PipelineOptionsValidatorTest.java | 15 ++++
.../sdk/runners/DirectPipelineRunnerTest.java | 1 +
.../beam/sdk/testing/TestPipelineTest.java | 5 +-
sdks/java/extensions/join-library/pom.xml | 7 ++
sdks/java/io/google-cloud-platform/pom.xml | 7 ++
sdks/java/io/hdfs/pom.xml | 7 ++
sdks/java/io/kafka/pom.xml | 7 ++
sdks/java/java8tests/pom.xml | 7 ++
.../beam/sdk/transforms/WithKeysJava8Test.java | 3 +-
.../main/resources/archetype-resources/pom.xml | 6 ++
16 files changed, 170 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index 3d81338..5211b80 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -49,6 +49,7 @@
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<systemPropertyVariables>
+ <beamUseDummyRunner></beamUseDummyRunner>
<beamTestPipelineOptions>
</beamTestPipelineOptions>
</systemPropertyVariables>
@@ -213,6 +214,12 @@
<dependency>
<groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-direct-java</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index a6dfae3..6d8e94b 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -84,6 +84,17 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemPropertyVariables>
+ <beamTestPipelineOptions></beamTestPipelineOptions>
+ <beamUseDummyRunner>true</beamUseDummyRunner>
+ </systemPropertyVariables>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 372a913..c559cff 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -129,6 +129,9 @@
<excludedGroups>
org.apache.beam.sdk.testing.NeedsRunner
</excludedGroups>
+ <systemPropertyVariables>
+ <beamUseDummyRunner>true</beamUseDummyRunner>
+ </systemPropertyVariables>
</configuration>
</plugin>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
index a2f38ed..b1b5280 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
@@ -21,11 +21,11 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.GoogleApiDebugOptions.GoogleApiTracer;
import org.apache.beam.sdk.options.ProxyInvocationHandler.Deserializer;
import org.apache.beam.sdk.options.ProxyInvocationHandler.Serializer;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.Context;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
+
import com.google.auto.service.AutoService;
import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -225,7 +225,7 @@ public interface PipelineOptions extends HasDisplayData {
@Description("The pipeline runner that will be used to execute the pipeline. "
+ "For registered runners, the class name can be specified, otherwise the fully "
+ "qualified name needs to be specified.")
- @Default.Class(DirectPipelineRunner.class)
+ @Default.InstanceFactory(DirectRunner.class)
Class<? extends PipelineRunner<?>> getRunner();
void setRunner(Class<? extends PipelineRunner<?>> kls);
@@ -262,4 +262,31 @@ public interface PipelineOptions extends HasDisplayData {
@Description("A pipeline level default location for storing temporary files.")
String getTempLocation();
void setTempLocation(String value);
+
+ /**
+ * A {@link DefaultValueFactory} that obtains the class of the {@code DirectRunner} if it exists
+ * on the classpath, and throws an exception otherwise.
+ *
+ * <p>As the {@code DirectRunner} is in an independent module, it cannot be directly referenced
+ * as the {@link Default}. However, it should still be used if available, and a user is required
+ * to explicitly set the {@code --runner} property if they wish to use an alternative runner.
+ */
+ class DirectRunner implements DefaultValueFactory<Class<? extends PipelineRunner>> {
+ @Override
+ public Class<? extends PipelineRunner> create(PipelineOptions options) {
+ try {
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ Class<? extends PipelineRunner> direct = (Class<? extends PipelineRunner>) Class.forName(
+ "org.apache.beam.runners.direct.InProcessPipelineRunner");
+ return direct;
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException(String.format(
+ "No Runner was specified and the DirectRunner was not found on the classpath.%n"
+ + "Specify a runner by either:%n"
+ + " Explicitly specifying a runner by providing the 'runner' property%n"
+ + " Adding the DirectRunner to the classpath%n"
+ + " Calling 'PipelineOptions.setRunner(PipelineRunner)' directly"));
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
index 62c6909..8b8337e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
@@ -29,11 +29,13 @@ import static org.junit.Assert.assertTrue;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+import org.apache.beam.sdk.testing.CrashingRunner;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.RestoreSystemProperties;
+import com.google.auto.service.AutoService;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -59,8 +61,9 @@ import java.util.Set;
/** Tests for {@link PipelineOptionsFactory}. */
@RunWith(JUnit4.class)
public class PipelineOptionsFactoryTest {
- private static final Class<? extends PipelineRunner<?>> DEFAULT_RUNNER_CLASS =
- DirectPipelineRunner.class;
+ private static final String DEFAULT_RUNNER_NAME = "DirectRunner";
+ private static final Class<? extends PipelineRunner> REGISTERED_RUNNER =
+ RegisteredTestRunner.class;
@Rule public ExpectedException expectedException = ExpectedException.none();
@Rule public TestRule restoreSystemProperties = new RestoreSystemProperties();
@@ -68,14 +71,13 @@ public class PipelineOptionsFactoryTest {
@Test
public void testAutomaticRegistrationOfPipelineOptions() {
- assertTrue(PipelineOptionsFactory.getRegisteredOptions().contains(DirectPipelineOptions.class));
+ assertTrue(PipelineOptionsFactory.getRegisteredOptions().contains(RegisteredTestOptions.class));
}
@Test
public void testAutomaticRegistrationOfRunners() {
- assertEquals(
- DEFAULT_RUNNER_CLASS,
- PipelineOptionsFactory.getRegisteredRunners().get(DEFAULT_RUNNER_CLASS.getSimpleName()));
+ assertEquals(REGISTERED_RUNNER,
+ PipelineOptionsFactory.getRegisteredRunners().get(REGISTERED_RUNNER.getSimpleName()));
}
@Test
@@ -85,7 +87,7 @@ public class PipelineOptionsFactoryTest {
}
/** A simple test interface. */
- public static interface TestPipelineOptions extends PipelineOptions {
+ public interface TestPipelineOptions extends PipelineOptions {
String getTestPipelineOption();
void setTestPipelineOption(String value);
}
@@ -810,18 +812,18 @@ public class PipelineOptionsFactoryTest {
@Test
public void testSettingRunner() {
- String[] args = new String[] {"--runner=DirectPipelineRunner"};
+ String[] args = new String[] {"--runner=" + RegisteredTestRunner.class.getSimpleName()};
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
- assertEquals(DirectPipelineRunner.class, options.getRunner());
+ assertEquals(RegisteredTestRunner.class, options.getRunner());
}
@Test
public void testSettingRunnerFullName() {
String[] args =
- new String[] {String.format("--runner=%s", DirectPipelineRunner.class.getName())};
+ new String[] {String.format("--runner=%s", CrashingRunner.class.getName())};
PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
- assertEquals(opts.getRunner(), DirectPipelineRunner.class);
+ assertEquals(opts.getRunner(), CrashingRunner.class);
}
@@ -832,7 +834,7 @@ public class PipelineOptionsFactoryTest {
expectedException.expectMessage(
"Unknown 'runner' specified 'UnknownRunner', supported " + "pipeline runners");
Set<String> registeredRunners = PipelineOptionsFactory.getRegisteredRunners().keySet();
- assertThat(registeredRunners, hasItem(DEFAULT_RUNNER_CLASS.getSimpleName()));
+ assertThat(registeredRunners, hasItem(REGISTERED_RUNNER.getSimpleName()));
for (String registeredRunner : registeredRunners) {
expectedException.expectMessage(registeredRunner);
}
@@ -923,7 +925,7 @@ public class PipelineOptionsFactoryTest {
public void testEmptyArgumentIsIgnored() {
String[] args =
new String[] {
- "", "--string=100", "", "", "--runner=" + DEFAULT_RUNNER_CLASS.getSimpleName()
+ "", "--string=100", "", "", "--runner=" + REGISTERED_RUNNER.getSimpleName()
};
PipelineOptionsFactory.fromArgs(args).as(Objects.class);
}
@@ -932,7 +934,7 @@ public class PipelineOptionsFactoryTest {
public void testNullArgumentIsIgnored() {
String[] args =
new String[] {
- "--string=100", null, null, "--runner=" + DEFAULT_RUNNER_CLASS.getSimpleName()
+ "--string=100", null, null, "--runner=" + REGISTERED_RUNNER.getSimpleName()
};
PipelineOptionsFactory.fromArgs(args).as(Objects.class);
}
@@ -985,7 +987,7 @@ public class PipelineOptionsFactoryTest {
String output = new String(baos.toByteArray());
assertThat(output, containsString("org.apache.beam.sdk.options.PipelineOptions"));
assertThat(output, containsString("--runner"));
- assertThat(output, containsString("Default: " + DEFAULT_RUNNER_CLASS.getSimpleName()));
+ assertThat(output, containsString("Default: " + DEFAULT_RUNNER_NAME));
assertThat(output,
containsString("The pipeline runner that will be used to execute the pipeline."));
}
@@ -1000,7 +1002,7 @@ public class PipelineOptionsFactoryTest {
String output = new String(baos.toByteArray());
assertThat(output, containsString("org.apache.beam.sdk.options.PipelineOptions"));
assertThat(output, containsString("--runner"));
- assertThat(output, containsString("Default: " + DEFAULT_RUNNER_CLASS.getSimpleName()));
+ assertThat(output, containsString("Default: " + DEFAULT_RUNNER_NAME));
assertThat(output,
containsString("The pipeline runner that will be used to execute the pipeline."));
}
@@ -1015,7 +1017,7 @@ public class PipelineOptionsFactoryTest {
String output = new String(baos.toByteArray());
assertThat(output, containsString("org.apache.beam.sdk.options.PipelineOptions"));
assertThat(output, containsString("--runner"));
- assertThat(output, containsString("Default: " + DEFAULT_RUNNER_CLASS.getSimpleName()));
+ assertThat(output, containsString("Default: " + DEFAULT_RUNNER_NAME));
assertThat(output,
containsString("The pipeline runner that will be used to execute the pipeline."));
}
@@ -1109,7 +1111,7 @@ public class PipelineOptionsFactoryTest {
String output = new String(baos.toByteArray());
assertThat(output, containsString("org.apache.beam.sdk.options.PipelineOptions"));
assertThat(output, containsString("--runner"));
- assertThat(output, containsString("Default: " + DEFAULT_RUNNER_CLASS.getSimpleName()));
+ assertThat(output, containsString("Default: " + DEFAULT_RUNNER_NAME));
assertThat(output,
containsString("The pipeline runner that will be used to execute the pipeline."));
}
@@ -1147,4 +1149,43 @@ public class PipelineOptionsFactoryTest {
thread.join();
assertEquals(cl, classLoader[0]);
}
+
+ private static class RegisteredTestRunner extends PipelineRunner<PipelineResult> {
+ public static PipelineRunner fromOptions(PipelineOptions options) {
+ return new RegisteredTestRunner();
+ }
+
+ public PipelineResult run(Pipeline p) {
+ throw new IllegalArgumentException();
+ }
+ }
+
+
+ /**
+ * A {@link PipelineRunnerRegistrar} to demonstrate default {@link PipelineRunner} registration.
+ */
+ @AutoService(PipelineRunnerRegistrar.class)
+ public static class RegisteredTestRunnerRegistrar implements PipelineRunnerRegistrar {
+ @Override
+ public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
+ return ImmutableList.<Class<? extends PipelineRunner<?>>>of(RegisteredTestRunner.class);
+ }
+ }
+
+ private interface RegisteredTestOptions extends PipelineOptions {
+ Object getRegisteredExampleFooBar();
+ void setRegisteredExampleFooBar(Object registeredExampleFooBar);
+ }
+
+
+ /**
+ * A {@link PipelineOptionsRegistrar} to demonstrate default {@link PipelineOptions} registration.
+ */
+ @AutoService(PipelineOptionsRegistrar.class)
+ public static class RegisteredTestOptionsRegistrar implements PipelineOptionsRegistrar {
+ @Override
+ public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+ return ImmutableList.<Class<? extends PipelineOptions>>of(RegisteredTestOptions.class);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java
index dfda528..687271c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java
@@ -17,15 +17,12 @@
*/
package org.apache.beam.sdk.options;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
-
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -86,11 +83,6 @@ public class PipelineOptionsTest {
}
@Test
- public void testDefaultRunnerIsSet() {
- assertEquals(DirectPipelineRunner.class, PipelineOptionsFactory.create().getRunner());
- }
-
- @Test
public void testCloneAs() throws IOException {
DerivedTestOptions options = PipelineOptionsFactory.create().as(DerivedTestOptions.class);
options.setBaseValue(Lists.<Boolean>newArrayList());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsValidatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsValidatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsValidatorTest.java
index 0250bd1..2b684a8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsValidatorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsValidatorTest.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.options;
+import org.apache.beam.sdk.testing.CrashingRunner;
+
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -40,6 +42,7 @@ public class PipelineOptionsValidatorTest {
@Test
public void testWhenRequiredOptionIsSet() {
Required required = PipelineOptionsFactory.as(Required.class);
+ required.setRunner(CrashingRunner.class);
required.setObject("blah");
PipelineOptionsValidator.validate(Required.class, required);
}
@@ -114,6 +117,7 @@ public class PipelineOptionsValidatorTest {
GroupRequired groupRequired = PipelineOptionsFactory.as(GroupRequired.class);
groupRequired.setFoo("foo");
groupRequired.setBar(null);
+ groupRequired.setRunner(CrashingRunner.class);
PipelineOptionsValidator.validate(GroupRequired.class, groupRequired);
@@ -126,6 +130,7 @@ public class PipelineOptionsValidatorTest {
@Test
public void testWhenNoneOfRequiredGroupIsSetThrowsException() {
GroupRequired groupRequired = PipelineOptionsFactory.as(GroupRequired.class);
+ groupRequired.setRunner(CrashingRunner.class);
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Missing required value for group [ham]");
@@ -155,6 +160,7 @@ public class PipelineOptionsValidatorTest {
public void testWhenOneOfMultipleRequiredGroupsIsSetIsValid() {
MultiGroupRequired multiGroupRequired = PipelineOptionsFactory.as(MultiGroupRequired.class);
+ multiGroupRequired.setRunner(CrashingRunner.class);
multiGroupRequired.setFoo("eggs");
PipelineOptionsValidator.validate(MultiGroupRequired.class, multiGroupRequired);
@@ -194,6 +200,7 @@ public class PipelineOptionsValidatorTest {
public void testWhenOptionIsDefinedInMultipleSuperInterfacesAndIsNotPresentFailsRequirement() {
RightOptions rightOptions = PipelineOptionsFactory.as(RightOptions.class);
rightOptions.setBoth("foo");
+ rightOptions.setRunner(CrashingRunner.class);
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Missing required value for group");
@@ -212,6 +219,8 @@ public class PipelineOptionsValidatorTest {
leftOpts.setFoo("Untrue");
leftOpts.setBoth("Raise the");
+ rightOpts.setRunner(CrashingRunner.class);
+ leftOpts.setRunner(CrashingRunner.class);
PipelineOptionsValidator.validate(JoinedOptions.class, rightOpts);
PipelineOptionsValidator.validate(JoinedOptions.class, leftOpts);
}
@@ -226,6 +235,8 @@ public class PipelineOptionsValidatorTest {
leftOpts.setFoo("Untrue");
leftOpts.setBoth("Raise the");
+ rightOpts.setRunner(CrashingRunner.class);
+ leftOpts.setRunner(CrashingRunner.class);
PipelineOptionsValidator.validate(RightOptions.class, leftOpts);
PipelineOptionsValidator.validate(LeftOptions.class, rightOpts);
}
@@ -234,6 +245,7 @@ public class PipelineOptionsValidatorTest {
public void testWhenOptionIsDefinedOnMultipleInterfacesOnlyListedOnceWhenNotPresent() {
JoinedOptions options = PipelineOptionsFactory.as(JoinedOptions.class);
options.setFoo("Hello");
+ options.setRunner(CrashingRunner.class);
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("required value for group [both]");
@@ -273,6 +285,7 @@ public class PipelineOptionsValidatorTest {
public void testSuperInterfaceRequiredOptionsAlsoRequiredInSubInterface() {
SubOptions subOpts = PipelineOptionsFactory.as(SubOptions.class);
subOpts.setFoo("Bar");
+ subOpts.setRunner(CrashingRunner.class);
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("otherSuper");
@@ -288,6 +301,7 @@ public class PipelineOptionsValidatorTest {
SubOptions opts = PipelineOptionsFactory.as(SubOptions.class);
opts.setFoo("Foo");
opts.setSuperclassObj("Hello world");
+ opts.setRunner(CrashingRunner.class);
// Valid SubOptions, but invalid SuperOptions
PipelineOptionsValidator.validate(SubOptions.class, opts);
@@ -304,6 +318,7 @@ public class PipelineOptionsValidatorTest {
subOpts.setFoo("bar");
subOpts.setBar("bar");
subOpts.setSuperclassObj("SuperDuper");
+ subOpts.setRunner(CrashingRunner.class);
PipelineOptionsValidator.validate(SubOptions.class, subOpts);
PipelineOptionsValidator.validate(SuperOptions.class, subOpts);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java
index ae3b4e0..edf6996 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java
@@ -68,6 +68,7 @@ public class DirectPipelineRunnerTest implements Serializable {
@Test
public void testToString() {
PipelineOptions options = PipelineOptionsFactory.create();
+ options.setRunner(DirectPipelineRunner.class);
DirectPipelineRunner runner = DirectPipelineRunner.fromOptions(options);
assertEquals("DirectPipelineRunner#" + runner.hashCode(),
runner.toString());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
index b741e2e..043c06c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
@@ -28,7 +28,6 @@ import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
import org.apache.beam.sdk.transforms.Create;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -62,13 +61,13 @@ public class TestPipelineTest {
public void testCreationOfPipelineOptions() throws Exception {
ObjectMapper mapper = new ObjectMapper();
String stringOptions = mapper.writeValueAsString(new String[]{
- "--runner=DirectPipelineRunner",
+ "--runner=org.apache.beam.sdk.testing.CrashingRunner",
"--project=testProject"
});
System.getProperties().put("beamTestPipelineOptions", stringOptions);
GcpOptions options =
TestPipeline.testingPipelineOptions().as(GcpOptions.class);
- assertEquals(DirectPipelineRunner.class, options.getRunner());
+ assertEquals(CrashingRunner.class, options.getRunner());
assertEquals(options.getProject(), "testProject");
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/extensions/join-library/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/pom.xml b/sdks/java/extensions/join-library/pom.xml
index 828fcce..0fec148 100644
--- a/sdks/java/extensions/join-library/pom.xml
+++ b/sdks/java/extensions/join-library/pom.xml
@@ -63,6 +63,13 @@
<!-- Dependency for tests -->
<dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-direct-java</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml
index 962c7b0..f567261 100644
--- a/sdks/java/io/google-cloud-platform/pom.xml
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -88,6 +88,13 @@
</dependency>
<dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-direct-java</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/io/hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml
index e99dbd7..9c30792 100644
--- a/sdks/java/io/hdfs/pom.xml
+++ b/sdks/java/io/hdfs/pom.xml
@@ -66,6 +66,13 @@
<!-- test dependencies -->
<dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-direct-java</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/io/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/pom.xml b/sdks/java/io/kafka/pom.xml
index be19a83..76c0eb6 100644
--- a/sdks/java/io/kafka/pom.xml
+++ b/sdks/java/io/kafka/pom.xml
@@ -71,6 +71,13 @@
<!-- test dependencies-->
<dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-direct-java</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/java8tests/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/pom.xml b/sdks/java/java8tests/pom.xml
index 48bf682..8e20228 100644
--- a/sdks/java/java8tests/pom.xml
+++ b/sdks/java/java8tests/pom.xml
@@ -96,6 +96,13 @@
</dependency>
<dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-direct-java</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
index a0d1a63..1ffb147 100644
--- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
+++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.transforms;
-import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -65,7 +64,7 @@ public class WithKeysJava8Test {
values.apply("ApplyKeysWithWithKeys", WithKeys.of((String s) -> Integer.valueOf(s)));
- thrown.expect(PipelineExecutionException.class);
+ thrown.expect(IllegalStateException.class);
thrown.expectMessage("Unable to return a default Coder for ApplyKeysWithWithKeys");
thrown.expectMessage("Cannot provide a coder for type variable K");
thrown.expectMessage("the actual type is unknown due to erasure.");
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
index d86b9cc..2b2e24b 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
@@ -101,6 +101,12 @@
<!-- Adds a dependency on a specific version of the Dataflow runnner. -->
<dependency>
<groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-direct-java</artifactId>
+ <version>[0-incubating, 2-incubating)</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<version>[0-incubating, 2-incubating)</version>
</dependency>
[3/8] incubator-beam git commit: Update Direct Module tests to
explicitly set Pipeline
Posted by ke...@apache.org.
Update Direct Module tests to explicitly set Pipeline
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a8a33b19
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a8a33b19
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a8a33b19
Branch: refs/heads/master
Commit: a8a33b19933326c28522dee530974c96d4aef0cb
Parents: de49d03
Author: Thomas Groh <tg...@google.com>
Authored: Fri Jun 10 14:38:36 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Jun 10 14:49:33 2016 -0700
----------------------------------------------------------------------
runners/direct-java/pom.xml | 6 ++++++
.../runners/direct/AvroIOShardedWriteFactoryTest.java | 12 ++++++++++--
.../runners/direct/InProcessPipelineRunnerTest.java | 11 ++++++++++-
.../runners/direct/KeyedPValueTrackingVisitorTest.java | 7 ++-----
.../runners/direct/TextIOShardedWriteFactoryTest.java | 12 ++++++++++--
5 files changed, 38 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a8a33b19/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index def7207..b2cb607 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -78,6 +78,12 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemPropertyVariables>
+ <!-- Use a dummy runner for component tests -->
+ <beamUseDummyRunner>true</beamUseDummyRunner>
+ </systemPropertyVariables>
+ </configuration>
<executions>
<execution>
<id>runnable-on-service-tests</id>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a8a33b19/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java
index d290a4b..c0c1361 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java
@@ -21,8 +21,10 @@ import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.theInstance;
import static org.junit.Assert.assertThat;
+import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.AvroIOTest;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
@@ -82,7 +84,7 @@ public class AvroIOShardedWriteFactoryTest {
assertThat(overridden, not(Matchers.<PTransform<PCollection<String>, PDone>>equalTo(original)));
- TestPipeline p = TestPipeline.create();
+ Pipeline p = getPipeline();
String[] elems = new String[] {"foo", "bar", "baz"};
p.apply(Create.<String>of(elems)).apply(overridden);
@@ -101,7 +103,7 @@ public class AvroIOShardedWriteFactoryTest {
assertThat(overridden, not(Matchers.<PTransform<PCollection<String>, PDone>>equalTo(original)));
- TestPipeline p = TestPipeline.create();
+ Pipeline p = getPipeline();
String[] elems = new String[] {"foo", "bar", "baz", "spam", "ham", "eggs"};
p.apply(Create.<String>of(elems)).apply(overridden);
@@ -109,4 +111,10 @@ public class AvroIOShardedWriteFactoryTest {
p.run();
AvroIOTest.assertTestOutputs(elems, 3, file.getAbsolutePath(), original.getShardNameTemplate());
}
+
+ private Pipeline getPipeline() {
+ PipelineOptions options = TestPipeline.testingPipelineOptions();
+ options.setRunner(InProcessPipelineRunner.class);
+ return TestPipeline.fromOptions(options);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a8a33b19/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
index 5c26ac3..ab26c15 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.direct;
import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import org.apache.beam.runners.direct.InProcessPipelineRunner.InProcessPipelineResult;
@@ -27,6 +28,7 @@ import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
@@ -46,9 +48,9 @@ import org.apache.beam.sdk.values.TypeDescriptor;
import com.google.common.collect.ImmutableMap;
-
import com.fasterxml.jackson.annotation.JsonValue;
+import org.hamcrest.Matchers;
import org.junit.Rule;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
@@ -77,6 +79,13 @@ public class InProcessPipelineRunnerTest implements Serializable {
}
@Test
+ public void defaultRunnerLoaded() {
+ assertThat(InProcessPipelineRunner.class,
+ Matchers.<Class<? extends PipelineRunner>>equalTo(PipelineOptionsFactory.create()
+ .getRunner()));
+ }
+
+ @Test
public void wordCountShouldSucceed() throws Throwable {
Pipeline p = getPipeline();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a8a33b19/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
index 24f9715..aa0d976 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
@@ -26,8 +26,7 @@ import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
@@ -61,9 +60,7 @@ public class KeyedPValueTrackingVisitorTest {
@Before
public void setup() {
- PipelineOptions options = PipelineOptionsFactory.create();
-
- p = Pipeline.create(options);
+ p = TestPipeline.create();
@SuppressWarnings("rawtypes")
Set<Class<? extends PTransform>> producesKeyed =
ImmutableSet.<Class<? extends PTransform>>of(PrimitiveKeyer.class, CompositeKeyer.class);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a8a33b19/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java
index 49c9061..fe9866c 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java
@@ -21,9 +21,11 @@ import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.theInstance;
import static org.junit.Assert.assertThat;
+import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.TextIOTest;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
@@ -81,7 +83,7 @@ public class TextIOShardedWriteFactoryTest {
assertThat(overridden, not(Matchers.<PTransform<PCollection<String>, PDone>>equalTo(original)));
- TestPipeline p = TestPipeline.create();
+ Pipeline p = getPipeline();
String[] elems = new String[] {"foo", "bar", "baz"};
p.apply(Create.<String>of(elems)).apply(overridden);
@@ -100,7 +102,7 @@ public class TextIOShardedWriteFactoryTest {
assertThat(overridden, not(Matchers.<PTransform<PCollection<String>, PDone>>equalTo(original)));
- TestPipeline p = TestPipeline.create();
+ Pipeline p = getPipeline();
String[] elems = new String[] {"foo", "bar", "baz", "spam", "ham", "eggs"};
p.apply(Create.<String>of(elems)).apply(overridden);
@@ -109,4 +111,10 @@ public class TextIOShardedWriteFactoryTest {
TextIOTest.assertOutputFiles(
elems, StringUtf8Coder.of(), 3, tmp, "foo", original.getShardNameTemplate());
}
+
+ private Pipeline getPipeline() {
+ PipelineOptions options = TestPipeline.testingPipelineOptions();
+ options.setRunner(InProcessPipelineRunner.class);
+ return TestPipeline.fromOptions(options);
+ }
}
[4/8] incubator-beam git commit: Update Pipeline Execution Style in
WindowedWordCountTest
Posted by ke...@apache.org.
Update Pipeline Execution Style in WindowedWordCountTest
This sets the runner a Pipeline creation time rather than sending a
(potentially rewritten) pipeline to a new runner instance.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/de49d032
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/de49d032
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/de49d032
Branch: refs/heads/master
Commit: de49d032730dd21691e6e4358fdcfef249aef46f
Parents: 8291219
Author: Thomas Groh <tg...@google.com>
Authored: Fri Jun 10 14:36:42 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Jun 10 14:49:33 2016 -0700
----------------------------------------------------------------------
.../spark/translation/WindowedWordCountTest.java | 19 +++++++++++++------
1 file changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/de49d032/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
index c6911e1..54af5e3 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
@@ -23,6 +23,7 @@ import org.apache.beam.runners.spark.SimpleWordCountTest;
import org.apache.beam.runners.spark.SparkPipelineRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Create;
@@ -55,7 +56,9 @@ public class WindowedWordCountTest {
@Test
public void testFixed() throws Exception {
- Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
+ PipelineOptions opts = PipelineOptionsFactory.create();
+ opts.setRunner(SparkPipelineRunner.class);
+ Pipeline p = Pipeline.create(opts);
PCollection<String> inputWords =
p.apply(Create.timestamped(WORDS, TIMESTAMPS)).setCoder(StringUtf8Coder.of());
PCollection<String> windowedWords =
@@ -65,7 +68,7 @@ public class WindowedWordCountTest {
PAssert.that(output).containsInAnyOrder(EXPECTED_FIXED_SEPARATE_COUNT_SET);
- EvaluationResult res = SparkPipelineRunner.create().run(p);
+ EvaluationResult res = (EvaluationResult) p.run();
res.close();
}
@@ -74,7 +77,9 @@ public class WindowedWordCountTest {
@Test
public void testFixed2() throws Exception {
- Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
+ PipelineOptions opts = PipelineOptionsFactory.create();
+ opts.setRunner(SparkPipelineRunner.class);
+ Pipeline p = Pipeline.create(opts);
PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS)
.withCoder(StringUtf8Coder.of()));
PCollection<String> windowedWords = inputWords
@@ -84,7 +89,7 @@ public class WindowedWordCountTest {
PAssert.that(output).containsInAnyOrder(EXPECTED_FIXED_SAME_COUNT_SET);
- EvaluationResult res = SparkPipelineRunner.create().run(p);
+ EvaluationResult res = (EvaluationResult) p.run();
res.close();
}
@@ -94,7 +99,9 @@ public class WindowedWordCountTest {
@Test
public void testSliding() throws Exception {
- Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
+ PipelineOptions opts = PipelineOptionsFactory.create();
+ opts.setRunner(SparkPipelineRunner.class);
+ Pipeline p = Pipeline.create(opts);
PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS)
.withCoder(StringUtf8Coder.of()));
PCollection<String> windowedWords = inputWords
@@ -105,7 +112,7 @@ public class WindowedWordCountTest {
PAssert.that(output).containsInAnyOrder(EXPECTED_SLIDING_COUNT_SET);
- EvaluationResult res = SparkPipelineRunner.create().run(p);
+ EvaluationResult res = (EvaluationResult) p.run();
res.close();
}
[6/8] incubator-beam git commit: Set Runner in DataflowRunner Tests
Posted by ke...@apache.org.
Set Runner in DataflowRunner Tests
Otherwise the Default Runner is used, which may be unavailable.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4d1e68af
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4d1e68af
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4d1e68af
Branch: refs/heads/master
Commit: 4d1e68af96d7ef44403add666333fab29f849c69
Parents: f2fb59c
Author: Thomas Groh <tg...@google.com>
Authored: Fri Jun 10 14:45:58 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Jun 14 09:57:17 2016 -0700
----------------------------------------------------------------------
.../beam/runners/dataflow/BlockingDataflowPipelineRunnerTest.java | 2 ++
.../apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java | 2 ++
.../org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java | 1 -
3 files changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d1e68af/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowPipelineRunnerTest.java
index bc570e1..55b4027 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowPipelineRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowPipelineRunnerTest.java
@@ -196,6 +196,7 @@ public class BlockingDataflowPipelineRunnerTest {
DataflowPipelineRunner mockRunner = mock(DataflowPipelineRunner.class);
TestDataflowPipelineOptions options =
PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
+ options.setRunner(BlockingDataflowPipelineRunner.class);
options.setProject(job.getProjectId());
when(mockRunner.run(isA(Pipeline.class))).thenReturn(job);
@@ -296,6 +297,7 @@ public class BlockingDataflowPipelineRunnerTest {
options.setTempLocation("gs://test/temp/location");
options.setGcpCredential(new TestCredential());
options.setPathValidatorClass(NoopPathValidator.class);
+ options.setRunner(BlockingDataflowPipelineRunner.class);
assertEquals("BlockingDataflowPipelineRunner#testjobname",
BlockingDataflowPipelineRunner.fromOptions(options).toString());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d1e68af/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java
index aa65dd1..f7068b0 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java
@@ -452,6 +452,7 @@ public class DataflowPipelineRunnerTest {
options.setProject(PROJECT_ID);
options.setGcpCredential(new TestCredential());
options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
+ options.setRunner(DataflowPipelineRunner.class);
DataflowPipelineRunner.fromOptions(options);
@@ -866,6 +867,7 @@ public class DataflowPipelineRunnerTest {
options.setTempLocation("gs://test/temp/location");
options.setGcpCredential(new TestCredential());
options.setPathValidatorClass(NoopPathValidator.class);
+ options.setRunner(DataflowPipelineRunner.class);
assertEquals(
"DataflowPipelineRunner#testjobname",
DataflowPipelineRunner.fromOptions(options).toString());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d1e68af/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java
index ae711f0..0340435 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.dataflow.io;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue;
-
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertThat;
[8/8] incubator-beam git commit: This closes #446
Posted by ke...@apache.org.
This closes #446
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/77494401
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/77494401
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/77494401
Branch: refs/heads/master
Commit: 774944014af046f55b995a22258cbe7195b7b6f8
Parents: c8ad2e7 a3ffd51
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Jun 14 09:57:31 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Jun 14 09:57:31 2016 -0700
----------------------------------------------------------------------
examples/java/pom.xml | 7 ++
runners/direct-java/pom.xml | 6 ++
.../direct/AvroIOShardedWriteFactoryTest.java | 12 ++-
.../direct/InProcessPipelineRunnerTest.java | 11 ++-
.../direct/KeyedPValueTrackingVisitorTest.java | 7 +-
.../direct/TextIOShardedWriteFactoryTest.java | 12 ++-
.../beam/runners/flink/PipelineOptionsTest.java | 11 +--
runners/google-cloud-dataflow-java/pom.xml | 11 +++
.../BlockingDataflowPipelineRunnerTest.java | 2 +
.../dataflow/DataflowPipelineRunnerTest.java | 2 +
.../runners/dataflow/io/DataflowTextIOTest.java | 81 +-------------------
.../translation/WindowedWordCountTest.java | 19 +++--
sdks/java/core/pom.xml | 3 +
.../beam/sdk/options/PipelineOptions.java | 31 +++++++-
.../org/apache/beam/sdk/io/BigQueryIOTest.java | 4 +-
.../java/org/apache/beam/sdk/io/TextIOTest.java | 75 ++++++++++++++++++
.../java/org/apache/beam/sdk/io/WriteTest.java | 9 +--
.../sdk/options/PipelineOptionsFactoryTest.java | 79 ++++++++++++++-----
.../beam/sdk/options/PipelineOptionsTest.java | 8 --
.../options/PipelineOptionsValidatorTest.java | 15 ++++
.../sdk/runners/DirectPipelineRunnerTest.java | 1 +
.../beam/sdk/testing/TestPipelineTest.java | 5 +-
sdks/java/extensions/join-library/pom.xml | 7 ++
sdks/java/io/google-cloud-platform/pom.xml | 7 ++
sdks/java/io/hdfs/pom.xml | 7 ++
sdks/java/io/kafka/pom.xml | 7 ++
sdks/java/java8tests/pom.xml | 7 ++
.../beam/sdk/transforms/WithKeysJava8Test.java | 3 +-
.../main/resources/archetype-resources/pom.xml | 6 ++
29 files changed, 314 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77494401/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
[5/8] incubator-beam git commit: Increase Visibility of Flink Test
PipelineOptions
Posted by ke...@apache.org.
Increase Visibility of Flink Test PipelineOptions
This fixes an issue where the package-private nature would cause an
exception
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/816a3bf1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/816a3bf1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/816a3bf1
Branch: refs/heads/master
Commit: 816a3bf19f21f224fdfed2fb5bddb436293f655c
Parents: 4d1e68a
Author: Thomas Groh <tg...@google.com>
Authored: Fri Jun 10 14:47:53 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Jun 14 09:57:17 2016 -0700
----------------------------------------------------------------------
.../apache/beam/runners/flink/PipelineOptionsTest.java | 11 ++++++-----
1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/816a3bf1/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index d571f31..61e219c 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -17,6 +17,10 @@
*/
package org.apache.beam.runners.flink;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkAbstractParDoWrapper;
import org.apache.beam.sdk.options.Default;
@@ -30,6 +34,7 @@ import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingInternals;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.TupleTag;
+
import org.apache.commons.lang.SerializationUtils;
import org.apache.flink.util.Collector;
import org.joda.time.Instant;
@@ -38,16 +43,12 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
/**
* Tests the serialization and deserialization of PipelineOptions.
*/
public class PipelineOptionsTest {
- private interface MyOptions extends FlinkPipelineOptions {
+ public interface MyOptions extends FlinkPipelineOptions {
@Description("Bla bla bla")
@Default.String("Hello")
String getTestOption();
[2/8] incubator-beam git commit: Use
TestPipeline#testingPipelineOptions in IO Tests
Posted by ke...@apache.org.
Use TestPipeline#testingPipelineOptions in IO Tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f73bd73c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f73bd73c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f73bd73c
Branch: refs/heads/master
Commit: f73bd73caa5e8222946cfc20491fd2806edd1d2b
Parents: a8a33b1
Author: Thomas Groh <tg...@google.com>
Authored: Fri Jun 10 14:41:06 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Jun 10 14:49:33 2016 -0700
----------------------------------------------------------------------
.../test/java/org/apache/beam/sdk/io/BigQueryIOTest.java | 4 ++--
.../src/test/java/org/apache/beam/sdk/io/WriteTest.java | 9 ++++-----
2 files changed, 6 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f73bd73c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
index 679ae27..2a135ec 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
@@ -384,7 +384,7 @@ public class BigQueryIOTest implements Serializable {
@Before
public void setUp() throws IOException {
- bqOptions = PipelineOptionsFactory.as(BigQueryOptions.class);
+ bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
bqOptions.setProject("defaultProject");
bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
@@ -755,7 +755,7 @@ public class BigQueryIOTest implements Serializable {
options.setProject("someproject");
options.setStreaming(streaming);
- Pipeline p = Pipeline.create(options);
+ Pipeline p = TestPipeline.create(options);
TableReference tableRef = new TableReference();
tableRef.setDatasetId("somedataset");
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f73bd73c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
index 45a4374..abda3a5 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.io;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom;
-
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
@@ -35,9 +34,9 @@ import org.apache.beam.sdk.io.Sink.WriteOperation;
import org.apache.beam.sdk.io.Sink.Writer;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PipelineOptionsFactoryTest.TestPipelineOptions;
import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
@@ -190,9 +189,9 @@ public class WriteTest {
private static void runWrite(
List<String> inputs, PTransform<PCollection<String>, PCollection<String>> transform) {
// Flag to validate that the pipeline options are passed to the Sink
- String[] args = {"--testFlag=test_value"};
- PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(WriteOptions.class);
- Pipeline p = Pipeline.create(options);
+ WriteOptions options = TestPipeline.testingPipelineOptions().as(WriteOptions.class);
+ options.setTestFlag("test_value");
+ Pipeline p = TestPipeline.create(options);
// Clear the sink's contents.
sinkContents.clear();