You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/14 17:11:18 UTC

[1/8] incubator-beam git commit: Move GcsUtil TextIO Tests to TextIOTest

Repository: incubator-beam
Updated Branches:
  refs/heads/master c8ad2e7dd -> 774944014


Move GcsUtil TextIO Tests to TextIOTest

These tests do not exercise the DataflowRunner or any
DataflowRunner-specific behavior, so they belong in TextIOTest.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f2fb59c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f2fb59c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f2fb59c6

Branch: refs/heads/master
Commit: f2fb59c65119d5da56df5dd4e64fa1873c6ccbbb
Parents: f73bd73
Author: Thomas Groh <tg...@google.com>
Authored: Fri Jun 10 14:43:10 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Jun 10 14:49:33 2016 -0700

----------------------------------------------------------------------
 .../runners/dataflow/io/DataflowTextIOTest.java | 80 +-------------------
 .../java/org/apache/beam/sdk/io/TextIOTest.java | 75 ++++++++++++++++++
 2 files changed, 77 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f2fb59c6/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java
index 0d7c1cb..ae711f0 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java
@@ -27,30 +27,16 @@ import static org.junit.Assert.assertThat;
 import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
 import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.transforms.DataflowDisplayDataEvaluator;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
-import org.apache.beam.sdk.util.GcsUtil;
 import org.apache.beam.sdk.util.TestCredential;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-
-import com.google.common.collect.ImmutableList;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
-import java.io.IOException;
-import java.nio.channels.FileChannel;
-import java.nio.channels.SeekableByteChannel;
-import java.nio.file.Files;
-import java.nio.file.StandardOpenOption;
-import java.util.List;
 import java.util.Set;
 
 /**
@@ -60,73 +46,11 @@ import java.util.Set;
 public class DataflowTextIOTest {
   private TestDataflowPipelineOptions buildTestPipelineOptions() {
     TestDataflowPipelineOptions options =
-        PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
+        TestPipeline.testingPipelineOptions().as(TestDataflowPipelineOptions.class);
     options.setGcpCredential(new TestCredential());
     return options;
   }
 
-  private GcsUtil buildMockGcsUtil() throws IOException {
-    GcsUtil mockGcsUtil = Mockito.mock(GcsUtil.class);
-
-    // Any request to open gets a new bogus channel
-    Mockito
-        .when(mockGcsUtil.open(Mockito.any(GcsPath.class)))
-        .then(new Answer<SeekableByteChannel>() {
-          @Override
-          public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable {
-            return FileChannel.open(
-                Files.createTempFile("channel-", ".tmp"),
-                StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE);
-          }
-        });
-
-    // Any request for expansion returns a list containing the original GcsPath
-    // This is required to pass validation that occurs in TextIO during apply()
-    Mockito
-        .when(mockGcsUtil.expand(Mockito.any(GcsPath.class)))
-        .then(new Answer<List<GcsPath>>() {
-          @Override
-          public List<GcsPath> answer(InvocationOnMock invocation) throws Throwable {
-            return ImmutableList.of((GcsPath) invocation.getArguments()[0]);
-          }
-        });
-
-    return mockGcsUtil;
-  }
-
-  /**
-   * This tests a few corner cases that should not crash.
-   */
-  @Test
-  public void testGoodWildcards() throws Exception {
-    TestDataflowPipelineOptions options = buildTestPipelineOptions();
-    options.setGcsUtil(buildMockGcsUtil());
-
-    Pipeline pipeline = Pipeline.create(options);
-
-    applyRead(pipeline, "gs://bucket/foo");
-    applyRead(pipeline, "gs://bucket/foo/");
-    applyRead(pipeline, "gs://bucket/foo/*");
-    applyRead(pipeline, "gs://bucket/foo/?");
-    applyRead(pipeline, "gs://bucket/foo/[0-9]");
-    applyRead(pipeline, "gs://bucket/foo/*baz*");
-    applyRead(pipeline, "gs://bucket/foo/*baz?");
-    applyRead(pipeline, "gs://bucket/foo/[0-9]baz?");
-    applyRead(pipeline, "gs://bucket/foo/baz/*");
-    applyRead(pipeline, "gs://bucket/foo/baz/*wonka*");
-    applyRead(pipeline, "gs://bucket/foo/*baz/wonka*");
-    applyRead(pipeline, "gs://bucket/foo*/baz");
-    applyRead(pipeline, "gs://bucket/foo?/baz");
-    applyRead(pipeline, "gs://bucket/foo[0-9]/baz");
-
-    // Check that running doesn't fail.
-    pipeline.run();
-  }
-
-  private void applyRead(Pipeline pipeline, String path) {
-    pipeline.apply("Read(" + path + ")", TextIO.Read.from(path));
-  }
-
   @Test
   public void testPrimitiveWriteDisplayData() {
     DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f2fb59c6/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 724a113..9a762d1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -37,6 +37,7 @@ import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.io.TextIO.CompressionType;
 import org.apache.beam.sdk.io.TextIO.TextSource;
+import org.apache.beam.sdk.options.GcsOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
@@ -46,7 +47,9 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.GcsUtil;
 import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 
@@ -59,6 +62,9 @@ import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import java.io.BufferedReader;
 import java.io.File;
@@ -66,8 +72,11 @@ import java.io.FileOutputStream;
 import java.io.FileReader;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SeekableByteChannel;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -685,4 +694,70 @@ public class TextIOTest {
 
     return source;
   }
+
+  ///////////////////////////////////////////////////////////////////////////////////////////////
+  // Test "gs://" paths
+
+  private GcsUtil buildMockGcsUtil() throws IOException {
+    GcsUtil mockGcsUtil = Mockito.mock(GcsUtil.class);
+
+    // Any request to open gets a new bogus channel
+    Mockito
+        .when(mockGcsUtil.open(Mockito.any(GcsPath.class)))
+        .then(new Answer<SeekableByteChannel>() {
+          @Override
+          public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable {
+            return FileChannel.open(
+                Files.createTempFile("channel-", ".tmp"),
+                StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE);
+          }
+        });
+
+    // Any request for expansion returns a list containing the original GcsPath
+    // This is required to pass validation that occurs in TextIO during apply()
+    Mockito
+        .when(mockGcsUtil.expand(Mockito.any(GcsPath.class)))
+        .then(new Answer<List<GcsPath>>() {
+          @Override
+          public List<GcsPath> answer(InvocationOnMock invocation) throws Throwable {
+            return ImmutableList.of((GcsPath) invocation.getArguments()[0]);
+          }
+        });
+
+    return mockGcsUtil;
+  }
+
+  /**
+   * This tests a few corner cases that should not crash.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testGoodWildcards() throws Exception {
+    GcsOptions options = TestPipeline.testingPipelineOptions().as(GcsOptions.class);
+    options.setGcsUtil(buildMockGcsUtil());
+
+    Pipeline pipeline = Pipeline.create(options);
+
+    applyRead(pipeline, "gs://bucket/foo");
+    applyRead(pipeline, "gs://bucket/foo/");
+    applyRead(pipeline, "gs://bucket/foo/*");
+    applyRead(pipeline, "gs://bucket/foo/?");
+    applyRead(pipeline, "gs://bucket/foo/[0-9]");
+    applyRead(pipeline, "gs://bucket/foo/*baz*");
+    applyRead(pipeline, "gs://bucket/foo/*baz?");
+    applyRead(pipeline, "gs://bucket/foo/[0-9]baz?");
+    applyRead(pipeline, "gs://bucket/foo/baz/*");
+    applyRead(pipeline, "gs://bucket/foo/baz/*wonka*");
+    applyRead(pipeline, "gs://bucket/foo/*baz/wonka*");
+    applyRead(pipeline, "gs://bucket/foo*/baz");
+    applyRead(pipeline, "gs://bucket/foo?/baz");
+    applyRead(pipeline, "gs://bucket/foo[0-9]/baz");
+
+    // Check that running doesn't fail.
+    pipeline.run();
+  }
+
+  private void applyRead(Pipeline pipeline, String path) {
+    pipeline.apply("Read(" + path + ")", TextIO.Read.from(path));
+  }
 }


[7/8] incubator-beam git commit: Update the Default Pipeline Runner

Posted by ke...@apache.org.
Update the Default Pipeline Runner

Select the InProcessRunner if it is on the classpath, and throw an
exception otherwise.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a3ffd510
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a3ffd510
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a3ffd510

Branch: refs/heads/master
Commit: a3ffd510896626019723294931a4c3763faf43af
Parents: 816a3bf
Author: Thomas Groh <tg...@google.com>
Authored: Wed May 18 16:56:06 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Jun 14 09:57:17 2016 -0700

----------------------------------------------------------------------
 examples/java/pom.xml                           |  7 ++
 runners/google-cloud-dataflow-java/pom.xml      | 11 +++
 sdks/java/core/pom.xml                          |  3 +
 .../beam/sdk/options/PipelineOptions.java       | 31 +++++++-
 .../sdk/options/PipelineOptionsFactoryTest.java | 79 +++++++++++++++-----
 .../beam/sdk/options/PipelineOptionsTest.java   |  8 --
 .../options/PipelineOptionsValidatorTest.java   | 15 ++++
 .../sdk/runners/DirectPipelineRunnerTest.java   |  1 +
 .../beam/sdk/testing/TestPipelineTest.java      |  5 +-
 sdks/java/extensions/join-library/pom.xml       |  7 ++
 sdks/java/io/google-cloud-platform/pom.xml      |  7 ++
 sdks/java/io/hdfs/pom.xml                       |  7 ++
 sdks/java/io/kafka/pom.xml                      |  7 ++
 sdks/java/java8tests/pom.xml                    |  7 ++
 .../beam/sdk/transforms/WithKeysJava8Test.java  |  3 +-
 .../main/resources/archetype-resources/pom.xml  |  6 ++
 16 files changed, 170 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index 3d81338..5211b80 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -49,6 +49,7 @@
         <artifactId>maven-surefire-plugin</artifactId>
         <configuration>
           <systemPropertyVariables>
+            <beamUseDummyRunner></beamUseDummyRunner>
             <beamTestPipelineOptions>
             </beamTestPipelineOptions>
           </systemPropertyVariables>
@@ -213,6 +214,12 @@
 
     <dependency>
       <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
       <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
       <version>${project.version}</version>
     </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index a6dfae3..6d8e94b 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -84,6 +84,17 @@
 
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <systemPropertyVariables>
+            <beamTestPipelineOptions></beamTestPipelineOptions>
+            <beamUseDummyRunner>true</beamUseDummyRunner>
+          </systemPropertyVariables>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-dependency-plugin</artifactId>
         <executions>
           <execution>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 372a913..c559cff 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -129,6 +129,9 @@
           <excludedGroups>
             org.apache.beam.sdk.testing.NeedsRunner
           </excludedGroups>
+          <systemPropertyVariables>
+            <beamUseDummyRunner>true</beamUseDummyRunner>
+          </systemPropertyVariables>
         </configuration>
       </plugin>
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
index a2f38ed..b1b5280 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java
@@ -21,11 +21,11 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.GoogleApiDebugOptions.GoogleApiTracer;
 import org.apache.beam.sdk.options.ProxyInvocationHandler.Deserializer;
 import org.apache.beam.sdk.options.ProxyInvocationHandler.Serializer;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.Context;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
+
 import com.google.auto.service.AutoService;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -225,7 +225,7 @@ public interface PipelineOptions extends HasDisplayData {
   @Description("The pipeline runner that will be used to execute the pipeline. "
       + "For registered runners, the class name can be specified, otherwise the fully "
       + "qualified name needs to be specified.")
-  @Default.Class(DirectPipelineRunner.class)
+  @Default.InstanceFactory(DirectRunner.class)
   Class<? extends PipelineRunner<?>> getRunner();
   void setRunner(Class<? extends PipelineRunner<?>> kls);
 
@@ -262,4 +262,31 @@ public interface PipelineOptions extends HasDisplayData {
   @Description("A pipeline level default location for storing temporary files.")
   String getTempLocation();
   void setTempLocation(String value);
+
+  /**
+   * A {@link DefaultValueFactory} that obtains the class of the {@code DirectRunner} if it exists
+   * on the classpath, and throws an exception otherwise.
+   *
+   * <p>As the {@code DirectRunner} is in an independent module, it cannot be directly referenced
+   * as the {@link Default}. However, it should still be used if available, and a user is required
+   * to explicitly set the {@code --runner} property if they wish to use an alternative runner.
+   */
+  class DirectRunner implements DefaultValueFactory<Class<? extends PipelineRunner>> {
+    @Override
+    public Class<? extends PipelineRunner> create(PipelineOptions options) {
+      try {
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        Class<? extends PipelineRunner> direct = (Class<? extends PipelineRunner>) Class.forName(
+            "org.apache.beam.runners.direct.InProcessPipelineRunner");
+        return direct;
+      } catch (ClassNotFoundException e) {
+        throw new IllegalArgumentException(String.format(
+            "No Runner was specified and the DirectRunner was not found on the classpath.%n"
+                + "Specify a runner by either:%n"
+                + "    Explicitly specifying a runner by providing the 'runner' property%n"
+                + "    Adding the DirectRunner to the classpath%n"
+                + "    Calling 'PipelineOptions.setRunner(PipelineRunner)' directly"));
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
index 62c6909..8b8337e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
@@ -29,11 +29,13 @@ import static org.junit.Assert.assertTrue;
 
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+import org.apache.beam.sdk.testing.CrashingRunner;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.RestoreSystemProperties;
 
+import com.google.auto.service.AutoService;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -59,8 +61,9 @@ import java.util.Set;
 /** Tests for {@link PipelineOptionsFactory}. */
 @RunWith(JUnit4.class)
 public class PipelineOptionsFactoryTest {
-  private static final Class<? extends PipelineRunner<?>> DEFAULT_RUNNER_CLASS =
-      DirectPipelineRunner.class;
+  private static final String DEFAULT_RUNNER_NAME = "DirectRunner";
+  private static final Class<? extends PipelineRunner> REGISTERED_RUNNER =
+      RegisteredTestRunner.class;
 
   @Rule public ExpectedException expectedException = ExpectedException.none();
   @Rule public TestRule restoreSystemProperties = new RestoreSystemProperties();
@@ -68,14 +71,13 @@ public class PipelineOptionsFactoryTest {
 
   @Test
   public void testAutomaticRegistrationOfPipelineOptions() {
-    assertTrue(PipelineOptionsFactory.getRegisteredOptions().contains(DirectPipelineOptions.class));
+    assertTrue(PipelineOptionsFactory.getRegisteredOptions().contains(RegisteredTestOptions.class));
   }
 
   @Test
   public void testAutomaticRegistrationOfRunners() {
-    assertEquals(
-        DEFAULT_RUNNER_CLASS,
-        PipelineOptionsFactory.getRegisteredRunners().get(DEFAULT_RUNNER_CLASS.getSimpleName()));
+    assertEquals(REGISTERED_RUNNER,
+        PipelineOptionsFactory.getRegisteredRunners().get(REGISTERED_RUNNER.getSimpleName()));
   }
 
   @Test
@@ -85,7 +87,7 @@ public class PipelineOptionsFactoryTest {
   }
 
   /** A simple test interface. */
-  public static interface TestPipelineOptions extends PipelineOptions {
+  public interface TestPipelineOptions extends PipelineOptions {
     String getTestPipelineOption();
     void setTestPipelineOption(String value);
   }
@@ -810,18 +812,18 @@ public class PipelineOptionsFactoryTest {
 
   @Test
   public void testSettingRunner() {
-    String[] args = new String[] {"--runner=DirectPipelineRunner"};
+    String[] args = new String[] {"--runner=" + RegisteredTestRunner.class.getSimpleName()};
 
     PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
-    assertEquals(DirectPipelineRunner.class, options.getRunner());
+    assertEquals(RegisteredTestRunner.class, options.getRunner());
   }
 
   @Test
   public void testSettingRunnerFullName() {
     String[] args =
-        new String[] {String.format("--runner=%s", DirectPipelineRunner.class.getName())};
+        new String[] {String.format("--runner=%s", CrashingRunner.class.getName())};
     PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
-    assertEquals(opts.getRunner(), DirectPipelineRunner.class);
+    assertEquals(opts.getRunner(), CrashingRunner.class);
   }
 
 
@@ -832,7 +834,7 @@ public class PipelineOptionsFactoryTest {
     expectedException.expectMessage(
         "Unknown 'runner' specified 'UnknownRunner', supported " + "pipeline runners");
     Set<String> registeredRunners = PipelineOptionsFactory.getRegisteredRunners().keySet();
-    assertThat(registeredRunners, hasItem(DEFAULT_RUNNER_CLASS.getSimpleName()));
+    assertThat(registeredRunners, hasItem(REGISTERED_RUNNER.getSimpleName()));
     for (String registeredRunner : registeredRunners) {
       expectedException.expectMessage(registeredRunner);
     }
@@ -923,7 +925,7 @@ public class PipelineOptionsFactoryTest {
   public void testEmptyArgumentIsIgnored() {
     String[] args =
         new String[] {
-          "", "--string=100", "", "", "--runner=" + DEFAULT_RUNNER_CLASS.getSimpleName()
+          "", "--string=100", "", "", "--runner=" + REGISTERED_RUNNER.getSimpleName()
         };
     PipelineOptionsFactory.fromArgs(args).as(Objects.class);
   }
@@ -932,7 +934,7 @@ public class PipelineOptionsFactoryTest {
   public void testNullArgumentIsIgnored() {
     String[] args =
         new String[] {
-          "--string=100", null, null, "--runner=" + DEFAULT_RUNNER_CLASS.getSimpleName()
+          "--string=100", null, null, "--runner=" + REGISTERED_RUNNER.getSimpleName()
         };
     PipelineOptionsFactory.fromArgs(args).as(Objects.class);
   }
@@ -985,7 +987,7 @@ public class PipelineOptionsFactoryTest {
     String output = new String(baos.toByteArray());
     assertThat(output, containsString("org.apache.beam.sdk.options.PipelineOptions"));
     assertThat(output, containsString("--runner"));
-    assertThat(output, containsString("Default: " + DEFAULT_RUNNER_CLASS.getSimpleName()));
+    assertThat(output, containsString("Default: " + DEFAULT_RUNNER_NAME));
     assertThat(output,
         containsString("The pipeline runner that will be used to execute the pipeline."));
   }
@@ -1000,7 +1002,7 @@ public class PipelineOptionsFactoryTest {
     String output = new String(baos.toByteArray());
     assertThat(output, containsString("org.apache.beam.sdk.options.PipelineOptions"));
     assertThat(output, containsString("--runner"));
-    assertThat(output, containsString("Default: " + DEFAULT_RUNNER_CLASS.getSimpleName()));
+    assertThat(output, containsString("Default: " + DEFAULT_RUNNER_NAME));
     assertThat(output,
         containsString("The pipeline runner that will be used to execute the pipeline."));
   }
@@ -1015,7 +1017,7 @@ public class PipelineOptionsFactoryTest {
     String output = new String(baos.toByteArray());
     assertThat(output, containsString("org.apache.beam.sdk.options.PipelineOptions"));
     assertThat(output, containsString("--runner"));
-    assertThat(output, containsString("Default: " + DEFAULT_RUNNER_CLASS.getSimpleName()));
+    assertThat(output, containsString("Default: " + DEFAULT_RUNNER_NAME));
     assertThat(output,
         containsString("The pipeline runner that will be used to execute the pipeline."));
   }
@@ -1109,7 +1111,7 @@ public class PipelineOptionsFactoryTest {
     String output = new String(baos.toByteArray());
     assertThat(output, containsString("org.apache.beam.sdk.options.PipelineOptions"));
     assertThat(output, containsString("--runner"));
-    assertThat(output, containsString("Default: " + DEFAULT_RUNNER_CLASS.getSimpleName()));
+    assertThat(output, containsString("Default: " + DEFAULT_RUNNER_NAME));
     assertThat(output,
         containsString("The pipeline runner that will be used to execute the pipeline."));
   }
@@ -1147,4 +1149,43 @@ public class PipelineOptionsFactoryTest {
     thread.join();
     assertEquals(cl, classLoader[0]);
   }
+
+  private static class RegisteredTestRunner extends PipelineRunner<PipelineResult> {
+    public static PipelineRunner fromOptions(PipelineOptions options) {
+      return new RegisteredTestRunner();
+    }
+
+    public PipelineResult run(Pipeline p) {
+      throw new IllegalArgumentException();
+    }
+  }
+
+
+  /**
+   * A {@link PipelineRunnerRegistrar} to demonstrate default {@link PipelineRunner} registration.
+   */
+  @AutoService(PipelineRunnerRegistrar.class)
+  public static class RegisteredTestRunnerRegistrar implements PipelineRunnerRegistrar {
+    @Override
+    public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
+      return ImmutableList.<Class<? extends PipelineRunner<?>>>of(RegisteredTestRunner.class);
+    }
+  }
+
+  private interface RegisteredTestOptions extends PipelineOptions {
+    Object getRegisteredExampleFooBar();
+    void setRegisteredExampleFooBar(Object registeredExampleFooBar);
+  }
+
+
+  /**
+   * A {@link PipelineOptionsRegistrar} to demonstrate default {@link PipelineOptions} registration.
+   */
+  @AutoService(PipelineOptionsRegistrar.class)
+  public static class RegisteredTestOptionsRegistrar implements PipelineOptionsRegistrar {
+    @Override
+    public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+      return ImmutableList.<Class<? extends PipelineOptions>>of(RegisteredTestOptions.class);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java
index dfda528..687271c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java
@@ -17,15 +17,12 @@
  */
 package org.apache.beam.sdk.options;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
-
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
@@ -86,11 +83,6 @@ public class PipelineOptionsTest {
   }
 
   @Test
-  public void testDefaultRunnerIsSet() {
-    assertEquals(DirectPipelineRunner.class, PipelineOptionsFactory.create().getRunner());
-  }
-
-  @Test
   public void testCloneAs() throws IOException {
     DerivedTestOptions options = PipelineOptionsFactory.create().as(DerivedTestOptions.class);
     options.setBaseValue(Lists.<Boolean>newArrayList());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsValidatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsValidatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsValidatorTest.java
index 0250bd1..2b684a8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsValidatorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsValidatorTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.options;
 
+import org.apache.beam.sdk.testing.CrashingRunner;
+
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -40,6 +42,7 @@ public class PipelineOptionsValidatorTest {
   @Test
   public void testWhenRequiredOptionIsSet() {
     Required required = PipelineOptionsFactory.as(Required.class);
+    required.setRunner(CrashingRunner.class);
     required.setObject("blah");
     PipelineOptionsValidator.validate(Required.class, required);
   }
@@ -114,6 +117,7 @@ public class PipelineOptionsValidatorTest {
     GroupRequired groupRequired = PipelineOptionsFactory.as(GroupRequired.class);
     groupRequired.setFoo("foo");
     groupRequired.setBar(null);
+    groupRequired.setRunner(CrashingRunner.class);
 
     PipelineOptionsValidator.validate(GroupRequired.class, groupRequired);
 
@@ -126,6 +130,7 @@ public class PipelineOptionsValidatorTest {
   @Test
   public void testWhenNoneOfRequiredGroupIsSetThrowsException() {
     GroupRequired groupRequired = PipelineOptionsFactory.as(GroupRequired.class);
+    groupRequired.setRunner(CrashingRunner.class);
 
     expectedException.expect(IllegalArgumentException.class);
     expectedException.expectMessage("Missing required value for group [ham]");
@@ -155,6 +160,7 @@ public class PipelineOptionsValidatorTest {
   public void testWhenOneOfMultipleRequiredGroupsIsSetIsValid() {
     MultiGroupRequired multiGroupRequired = PipelineOptionsFactory.as(MultiGroupRequired.class);
 
+    multiGroupRequired.setRunner(CrashingRunner.class);
     multiGroupRequired.setFoo("eggs");
 
     PipelineOptionsValidator.validate(MultiGroupRequired.class, multiGroupRequired);
@@ -194,6 +200,7 @@ public class PipelineOptionsValidatorTest {
   public void testWhenOptionIsDefinedInMultipleSuperInterfacesAndIsNotPresentFailsRequirement() {
     RightOptions rightOptions = PipelineOptionsFactory.as(RightOptions.class);
     rightOptions.setBoth("foo");
+    rightOptions.setRunner(CrashingRunner.class);
 
     expectedException.expect(IllegalArgumentException.class);
     expectedException.expectMessage("Missing required value for group");
@@ -212,6 +219,8 @@ public class PipelineOptionsValidatorTest {
     leftOpts.setFoo("Untrue");
     leftOpts.setBoth("Raise the");
 
+    rightOpts.setRunner(CrashingRunner.class);
+    leftOpts.setRunner(CrashingRunner.class);
     PipelineOptionsValidator.validate(JoinedOptions.class, rightOpts);
     PipelineOptionsValidator.validate(JoinedOptions.class, leftOpts);
   }
@@ -226,6 +235,8 @@ public class PipelineOptionsValidatorTest {
     leftOpts.setFoo("Untrue");
     leftOpts.setBoth("Raise the");
 
+    rightOpts.setRunner(CrashingRunner.class);
+    leftOpts.setRunner(CrashingRunner.class);
     PipelineOptionsValidator.validate(RightOptions.class, leftOpts);
     PipelineOptionsValidator.validate(LeftOptions.class, rightOpts);
   }
@@ -234,6 +245,7 @@ public class PipelineOptionsValidatorTest {
   public void testWhenOptionIsDefinedOnMultipleInterfacesOnlyListedOnceWhenNotPresent() {
     JoinedOptions options = PipelineOptionsFactory.as(JoinedOptions.class);
     options.setFoo("Hello");
+    options.setRunner(CrashingRunner.class);
 
     expectedException.expect(IllegalArgumentException.class);
     expectedException.expectMessage("required value for group [both]");
@@ -273,6 +285,7 @@ public class PipelineOptionsValidatorTest {
   public void testSuperInterfaceRequiredOptionsAlsoRequiredInSubInterface() {
     SubOptions subOpts = PipelineOptionsFactory.as(SubOptions.class);
     subOpts.setFoo("Bar");
+    subOpts.setRunner(CrashingRunner.class);
 
     expectedException.expect(IllegalArgumentException.class);
     expectedException.expectMessage("otherSuper");
@@ -288,6 +301,7 @@ public class PipelineOptionsValidatorTest {
     SubOptions opts = PipelineOptionsFactory.as(SubOptions.class);
     opts.setFoo("Foo");
     opts.setSuperclassObj("Hello world");
+    opts.setRunner(CrashingRunner.class);
 
     // Valid SubOptions, but invalid SuperOptions
     PipelineOptionsValidator.validate(SubOptions.class, opts);
@@ -304,6 +318,7 @@ public class PipelineOptionsValidatorTest {
     subOpts.setFoo("bar");
     subOpts.setBar("bar");
     subOpts.setSuperclassObj("SuperDuper");
+    subOpts.setRunner(CrashingRunner.class);
 
     PipelineOptionsValidator.validate(SubOptions.class, subOpts);
     PipelineOptionsValidator.validate(SuperOptions.class, subOpts);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java
index ae3b4e0..edf6996 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/DirectPipelineRunnerTest.java
@@ -68,6 +68,7 @@ public class DirectPipelineRunnerTest implements Serializable {
   @Test
   public void testToString() {
     PipelineOptions options = PipelineOptionsFactory.create();
+    options.setRunner(DirectPipelineRunner.class);
     DirectPipelineRunner runner = DirectPipelineRunner.fromOptions(options);
     assertEquals("DirectPipelineRunner#" + runner.hashCode(),
         runner.toString());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
index b741e2e..043c06c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
@@ -28,7 +28,6 @@ import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.transforms.Create;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -62,13 +61,13 @@ public class TestPipelineTest {
   public void testCreationOfPipelineOptions() throws Exception {
     ObjectMapper mapper = new ObjectMapper();
     String stringOptions = mapper.writeValueAsString(new String[]{
-      "--runner=DirectPipelineRunner",
+      "--runner=org.apache.beam.sdk.testing.CrashingRunner",
       "--project=testProject"
     });
     System.getProperties().put("beamTestPipelineOptions", stringOptions);
     GcpOptions options =
         TestPipeline.testingPipelineOptions().as(GcpOptions.class);
-    assertEquals(DirectPipelineRunner.class, options.getRunner());
+    assertEquals(CrashingRunner.class, options.getRunner());
     assertEquals(options.getProject(), "testProject");
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/extensions/join-library/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/pom.xml b/sdks/java/extensions/join-library/pom.xml
index 828fcce..0fec148 100644
--- a/sdks/java/extensions/join-library/pom.xml
+++ b/sdks/java/extensions/join-library/pom.xml
@@ -63,6 +63,13 @@
 
     <!-- Dependency for tests -->
     <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>org.hamcrest</groupId>
       <artifactId>hamcrest-all</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml
index 962c7b0..f567261 100644
--- a/sdks/java/io/google-cloud-platform/pom.xml
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -88,6 +88,13 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>org.hamcrest</groupId>
       <artifactId>hamcrest-all</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/io/hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml
index e99dbd7..9c30792 100644
--- a/sdks/java/io/hdfs/pom.xml
+++ b/sdks/java/io/hdfs/pom.xml
@@ -66,6 +66,13 @@
 
     <!-- test dependencies -->
     <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>org.hamcrest</groupId>
       <artifactId>hamcrest-all</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/io/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/pom.xml b/sdks/java/io/kafka/pom.xml
index be19a83..76c0eb6 100644
--- a/sdks/java/io/kafka/pom.xml
+++ b/sdks/java/io/kafka/pom.xml
@@ -71,6 +71,13 @@
 
     <!-- test dependencies-->
     <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>org.hamcrest</groupId>
       <artifactId>hamcrest-all</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/java8tests/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/pom.xml b/sdks/java/java8tests/pom.xml
index 48bf682..8e20228 100644
--- a/sdks/java/java8tests/pom.xml
+++ b/sdks/java/java8tests/pom.xml
@@ -96,6 +96,13 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
index a0d1a63..1ffb147 100644
--- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
+++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithKeysJava8Test.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.transforms;
 
-import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -65,7 +64,7 @@ public class WithKeysJava8Test {
 
     values.apply("ApplyKeysWithWithKeys", WithKeys.of((String s) -> Integer.valueOf(s)));
 
-    thrown.expect(PipelineExecutionException.class);
+    thrown.expect(IllegalStateException.class);
     thrown.expectMessage("Unable to return a default Coder for ApplyKeysWithWithKeys");
     thrown.expectMessage("Cannot provide a coder for type variable K");
     thrown.expectMessage("the actual type is unknown due to erasure.");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3ffd510/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
index d86b9cc..2b2e24b 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
@@ -101,6 +101,12 @@
     <!-- Adds a dependency on a specific version of the Dataflow runnner. -->
     <dependency>
       <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <version>[0-incubating, 2-incubating)</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
       <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
       <version>[0-incubating, 2-incubating)</version>
     </dependency>


[3/8] incubator-beam git commit: Update Direct Module tests to explicitly set Pipeline

Posted by ke...@apache.org.
Update Direct Module tests to explicitly set Pipeline


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a8a33b19
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a8a33b19
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a8a33b19

Branch: refs/heads/master
Commit: a8a33b19933326c28522dee530974c96d4aef0cb
Parents: de49d03
Author: Thomas Groh <tg...@google.com>
Authored: Fri Jun 10 14:38:36 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Jun 10 14:49:33 2016 -0700

----------------------------------------------------------------------
 runners/direct-java/pom.xml                             |  6 ++++++
 .../runners/direct/AvroIOShardedWriteFactoryTest.java   | 12 ++++++++++--
 .../runners/direct/InProcessPipelineRunnerTest.java     | 11 ++++++++++-
 .../runners/direct/KeyedPValueTrackingVisitorTest.java  |  7 ++-----
 .../runners/direct/TextIOShardedWriteFactoryTest.java   | 12 ++++++++++--
 5 files changed, 38 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a8a33b19/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index def7207..b2cb607 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -78,6 +78,12 @@
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <systemPropertyVariables>
+            <!-- Use a dummy runner for component tests -->
+            <beamUseDummyRunner>true</beamUseDummyRunner>
+          </systemPropertyVariables>
+        </configuration>
         <executions>
           <execution>
             <id>runnable-on-service-tests</id>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a8a33b19/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java
index d290a4b..c0c1361 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java
@@ -21,8 +21,10 @@ import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.theInstance;
 import static org.junit.Assert.assertThat;
 
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.AvroIO;
 import org.apache.beam.sdk.io.AvroIOTest;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -82,7 +84,7 @@ public class AvroIOShardedWriteFactoryTest {
 
     assertThat(overridden, not(Matchers.<PTransform<PCollection<String>, PDone>>equalTo(original)));
 
-    TestPipeline p = TestPipeline.create();
+    Pipeline p = getPipeline();
     String[] elems = new String[] {"foo", "bar", "baz"};
     p.apply(Create.<String>of(elems)).apply(overridden);
 
@@ -101,7 +103,7 @@ public class AvroIOShardedWriteFactoryTest {
 
     assertThat(overridden, not(Matchers.<PTransform<PCollection<String>, PDone>>equalTo(original)));
 
-    TestPipeline p = TestPipeline.create();
+    Pipeline p = getPipeline();
     String[] elems = new String[] {"foo", "bar", "baz", "spam", "ham", "eggs"};
     p.apply(Create.<String>of(elems)).apply(overridden);
 
@@ -109,4 +111,10 @@ public class AvroIOShardedWriteFactoryTest {
     p.run();
     AvroIOTest.assertTestOutputs(elems, 3, file.getAbsolutePath(), original.getShardNameTemplate());
   }
+
+  private Pipeline getPipeline() {
+    PipelineOptions options = TestPipeline.testingPipelineOptions();
+    options.setRunner(InProcessPipelineRunner.class);
+    return TestPipeline.fromOptions(options);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a8a33b19/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
index 5c26ac3..ab26c15 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.direct;
 
 import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
 import org.apache.beam.runners.direct.InProcessPipelineRunner.InProcessPipelineResult;
@@ -27,6 +28,7 @@ import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
@@ -46,9 +48,9 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 
 import com.google.common.collect.ImmutableMap;
 
-
 import com.fasterxml.jackson.annotation.JsonValue;
 
+import org.hamcrest.Matchers;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.internal.matchers.ThrowableMessageMatcher;
@@ -77,6 +79,13 @@ public class InProcessPipelineRunnerTest implements Serializable {
   }
 
   @Test
+  public void defaultRunnerLoaded() {
+    assertThat(InProcessPipelineRunner.class,
+        Matchers.<Class<? extends PipelineRunner>>equalTo(PipelineOptionsFactory.create()
+            .getRunner()));
+  }
+
+  @Test
   public void wordCountShouldSucceed() throws Throwable {
     Pipeline p = getPipeline();
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a8a33b19/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
index 24f9715..aa0d976 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
@@ -26,8 +26,7 @@ import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
@@ -61,9 +60,7 @@ public class KeyedPValueTrackingVisitorTest {
 
   @Before
   public void setup() {
-    PipelineOptions options = PipelineOptionsFactory.create();
-
-    p = Pipeline.create(options);
+    p = TestPipeline.create();
     @SuppressWarnings("rawtypes")
     Set<Class<? extends PTransform>> producesKeyed =
         ImmutableSet.<Class<? extends PTransform>>of(PrimitiveKeyer.class, CompositeKeyer.class);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a8a33b19/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java
index 49c9061..fe9866c 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java
@@ -21,9 +21,11 @@ import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.theInstance;
 import static org.junit.Assert.assertThat;
 
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.TextIOTest;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -81,7 +83,7 @@ public class TextIOShardedWriteFactoryTest {
 
     assertThat(overridden, not(Matchers.<PTransform<PCollection<String>, PDone>>equalTo(original)));
 
-    TestPipeline p = TestPipeline.create();
+    Pipeline p = getPipeline();
     String[] elems = new String[] {"foo", "bar", "baz"};
     p.apply(Create.<String>of(elems)).apply(overridden);
 
@@ -100,7 +102,7 @@ public class TextIOShardedWriteFactoryTest {
 
     assertThat(overridden, not(Matchers.<PTransform<PCollection<String>, PDone>>equalTo(original)));
 
-    TestPipeline p = TestPipeline.create();
+    Pipeline p = getPipeline();
     String[] elems = new String[] {"foo", "bar", "baz", "spam", "ham", "eggs"};
     p.apply(Create.<String>of(elems)).apply(overridden);
 
@@ -109,4 +111,10 @@ public class TextIOShardedWriteFactoryTest {
     TextIOTest.assertOutputFiles(
         elems, StringUtf8Coder.of(), 3, tmp, "foo", original.getShardNameTemplate());
   }
+
+  private Pipeline getPipeline() {
+    PipelineOptions options = TestPipeline.testingPipelineOptions();
+    options.setRunner(InProcessPipelineRunner.class);
+    return TestPipeline.fromOptions(options);
+  }
 }


[4/8] incubator-beam git commit: Update Pipeline Execution Style in WindowedWordCountTest

Posted by ke...@apache.org.
Update Pipeline Execution Style in WindowedWordCountTest

This sets the runner at Pipeline creation time rather than sending a
(potentially rewritten) pipeline to a new runner instance.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/de49d032
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/de49d032
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/de49d032

Branch: refs/heads/master
Commit: de49d032730dd21691e6e4358fdcfef249aef46f
Parents: 8291219
Author: Thomas Groh <tg...@google.com>
Authored: Fri Jun 10 14:36:42 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Jun 10 14:49:33 2016 -0700

----------------------------------------------------------------------
 .../spark/translation/WindowedWordCountTest.java | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/de49d032/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
index c6911e1..54af5e3 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
@@ -23,6 +23,7 @@ import org.apache.beam.runners.spark.SimpleWordCountTest;
 import org.apache.beam.runners.spark.SparkPipelineRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Create;
@@ -55,7 +56,9 @@ public class WindowedWordCountTest {
 
   @Test
   public void testFixed() throws Exception {
-    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
+    PipelineOptions opts = PipelineOptionsFactory.create();
+    opts.setRunner(SparkPipelineRunner.class);
+    Pipeline p = Pipeline.create(opts);
     PCollection<String> inputWords =
         p.apply(Create.timestamped(WORDS, TIMESTAMPS)).setCoder(StringUtf8Coder.of());
     PCollection<String> windowedWords =
@@ -65,7 +68,7 @@ public class WindowedWordCountTest {
 
     PAssert.that(output).containsInAnyOrder(EXPECTED_FIXED_SEPARATE_COUNT_SET);
 
-    EvaluationResult res = SparkPipelineRunner.create().run(p);
+    EvaluationResult res = (EvaluationResult) p.run();
     res.close();
   }
 
@@ -74,7 +77,9 @@ public class WindowedWordCountTest {
 
   @Test
   public void testFixed2() throws Exception {
-    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
+    PipelineOptions opts = PipelineOptionsFactory.create();
+    opts.setRunner(SparkPipelineRunner.class);
+    Pipeline p = Pipeline.create(opts);
     PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS)
         .withCoder(StringUtf8Coder.of()));
     PCollection<String> windowedWords = inputWords
@@ -84,7 +89,7 @@ public class WindowedWordCountTest {
 
     PAssert.that(output).containsInAnyOrder(EXPECTED_FIXED_SAME_COUNT_SET);
 
-    EvaluationResult res = SparkPipelineRunner.create().run(p);
+    EvaluationResult res = (EvaluationResult) p.run();
     res.close();
   }
 
@@ -94,7 +99,9 @@ public class WindowedWordCountTest {
 
   @Test
   public void testSliding() throws Exception {
-    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
+    PipelineOptions opts = PipelineOptionsFactory.create();
+    opts.setRunner(SparkPipelineRunner.class);
+    Pipeline p = Pipeline.create(opts);
     PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS)
         .withCoder(StringUtf8Coder.of()));
     PCollection<String> windowedWords = inputWords
@@ -105,7 +112,7 @@ public class WindowedWordCountTest {
 
     PAssert.that(output).containsInAnyOrder(EXPECTED_SLIDING_COUNT_SET);
 
-    EvaluationResult res = SparkPipelineRunner.create().run(p);
+    EvaluationResult res = (EvaluationResult) p.run();
     res.close();
   }
 


[6/8] incubator-beam git commit: Set Runner in DataflowRunner Tests

Posted by ke...@apache.org.
Set Runner in DataflowRunner Tests

Otherwise the Default Runner is used, which may be unavailable.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4d1e68af
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4d1e68af
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4d1e68af

Branch: refs/heads/master
Commit: 4d1e68af96d7ef44403add666333fab29f849c69
Parents: f2fb59c
Author: Thomas Groh <tg...@google.com>
Authored: Fri Jun 10 14:45:58 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Jun 14 09:57:17 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/BlockingDataflowPipelineRunnerTest.java  | 2 ++
 .../apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java   | 2 ++
 .../org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java    | 1 -
 3 files changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d1e68af/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowPipelineRunnerTest.java
index bc570e1..55b4027 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowPipelineRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BlockingDataflowPipelineRunnerTest.java
@@ -196,6 +196,7 @@ public class BlockingDataflowPipelineRunnerTest {
     DataflowPipelineRunner mockRunner = mock(DataflowPipelineRunner.class);
     TestDataflowPipelineOptions options =
         PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
+    options.setRunner(BlockingDataflowPipelineRunner.class);
     options.setProject(job.getProjectId());
 
     when(mockRunner.run(isA(Pipeline.class))).thenReturn(job);
@@ -296,6 +297,7 @@ public class BlockingDataflowPipelineRunnerTest {
     options.setTempLocation("gs://test/temp/location");
     options.setGcpCredential(new TestCredential());
     options.setPathValidatorClass(NoopPathValidator.class);
+    options.setRunner(BlockingDataflowPipelineRunner.class);
     assertEquals("BlockingDataflowPipelineRunner#testjobname",
         BlockingDataflowPipelineRunner.fromOptions(options).toString());
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d1e68af/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java
index aa65dd1..f7068b0 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java
@@ -452,6 +452,7 @@ public class DataflowPipelineRunnerTest {
     options.setProject(PROJECT_ID);
     options.setGcpCredential(new TestCredential());
     options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
+    options.setRunner(DataflowPipelineRunner.class);
 
     DataflowPipelineRunner.fromOptions(options);
 
@@ -866,6 +867,7 @@ public class DataflowPipelineRunnerTest {
     options.setTempLocation("gs://test/temp/location");
     options.setGcpCredential(new TestCredential());
     options.setPathValidatorClass(NoopPathValidator.class);
+    options.setRunner(DataflowPipelineRunner.class);
     assertEquals(
         "DataflowPipelineRunner#testjobname",
         DataflowPipelineRunner.fromOptions(options).toString());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d1e68af/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java
index ae711f0..0340435 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.dataflow.io;
 
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue;
-
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.startsWith;
 import static org.junit.Assert.assertThat;


[8/8] incubator-beam git commit: This closes #446

Posted by ke...@apache.org.
This closes #446


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/77494401
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/77494401
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/77494401

Branch: refs/heads/master
Commit: 774944014af046f55b995a22258cbe7195b7b6f8
Parents: c8ad2e7 a3ffd51
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Jun 14 09:57:31 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Jun 14 09:57:31 2016 -0700

----------------------------------------------------------------------
 examples/java/pom.xml                           |  7 ++
 runners/direct-java/pom.xml                     |  6 ++
 .../direct/AvroIOShardedWriteFactoryTest.java   | 12 ++-
 .../direct/InProcessPipelineRunnerTest.java     | 11 ++-
 .../direct/KeyedPValueTrackingVisitorTest.java  |  7 +-
 .../direct/TextIOShardedWriteFactoryTest.java   | 12 ++-
 .../beam/runners/flink/PipelineOptionsTest.java | 11 +--
 runners/google-cloud-dataflow-java/pom.xml      | 11 +++
 .../BlockingDataflowPipelineRunnerTest.java     |  2 +
 .../dataflow/DataflowPipelineRunnerTest.java    |  2 +
 .../runners/dataflow/io/DataflowTextIOTest.java | 81 +-------------------
 .../translation/WindowedWordCountTest.java      | 19 +++--
 sdks/java/core/pom.xml                          |  3 +
 .../beam/sdk/options/PipelineOptions.java       | 31 +++++++-
 .../org/apache/beam/sdk/io/BigQueryIOTest.java  |  4 +-
 .../java/org/apache/beam/sdk/io/TextIOTest.java | 75 ++++++++++++++++++
 .../java/org/apache/beam/sdk/io/WriteTest.java  |  9 +--
 .../sdk/options/PipelineOptionsFactoryTest.java | 79 ++++++++++++++-----
 .../beam/sdk/options/PipelineOptionsTest.java   |  8 --
 .../options/PipelineOptionsValidatorTest.java   | 15 ++++
 .../sdk/runners/DirectPipelineRunnerTest.java   |  1 +
 .../beam/sdk/testing/TestPipelineTest.java      |  5 +-
 sdks/java/extensions/join-library/pom.xml       |  7 ++
 sdks/java/io/google-cloud-platform/pom.xml      |  7 ++
 sdks/java/io/hdfs/pom.xml                       |  7 ++
 sdks/java/io/kafka/pom.xml                      |  7 ++
 sdks/java/java8tests/pom.xml                    |  7 ++
 .../beam/sdk/transforms/WithKeysJava8Test.java  |  3 +-
 .../main/resources/archetype-resources/pom.xml  |  6 ++
 29 files changed, 314 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77494401/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------


[5/8] incubator-beam git commit: Increase Visibility of Flink Test PipelineOptions

Posted by ke...@apache.org.
Increase Visibility of Flink Test PipelineOptions

This fixes an issue where the private visibility of the MyOptions test
interface would cause an exception


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/816a3bf1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/816a3bf1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/816a3bf1

Branch: refs/heads/master
Commit: 816a3bf19f21f224fdfed2fb5bddb436293f655c
Parents: 4d1e68a
Author: Thomas Groh <tg...@google.com>
Authored: Fri Jun 10 14:47:53 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Jun 14 09:57:17 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/flink/PipelineOptionsTest.java   | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/816a3bf1/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index d571f31..61e219c 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -17,6 +17,10 @@
  */
 package org.apache.beam.runners.flink;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkAbstractParDoWrapper;
 import org.apache.beam.sdk.options.Default;
@@ -30,6 +34,7 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.TupleTag;
+
 import org.apache.commons.lang.SerializationUtils;
 import org.apache.flink.util.Collector;
 import org.joda.time.Instant;
@@ -38,16 +43,12 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
 /**
  * Tests the serialization and deserialization of PipelineOptions.
  */
 public class PipelineOptionsTest {
 
-  private interface MyOptions extends FlinkPipelineOptions {
+  public interface MyOptions extends FlinkPipelineOptions {
     @Description("Bla bla bla")
     @Default.String("Hello")
     String getTestOption();


[2/8] incubator-beam git commit: Use TestPipeline#testingPipelineOptions in IO Tests

Posted by ke...@apache.org.
Use TestPipeline#testingPipelineOptions in IO Tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f73bd73c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f73bd73c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f73bd73c

Branch: refs/heads/master
Commit: f73bd73caa5e8222946cfc20491fd2806edd1d2b
Parents: a8a33b1
Author: Thomas Groh <tg...@google.com>
Authored: Fri Jun 10 14:41:06 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Jun 10 14:49:33 2016 -0700

----------------------------------------------------------------------
 .../test/java/org/apache/beam/sdk/io/BigQueryIOTest.java    | 4 ++--
 .../src/test/java/org/apache/beam/sdk/io/WriteTest.java     | 9 ++++-----
 2 files changed, 6 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f73bd73c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
index 679ae27..2a135ec 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
@@ -384,7 +384,7 @@ public class BigQueryIOTest implements Serializable {
 
   @Before
   public void setUp() throws IOException {
-    bqOptions = PipelineOptionsFactory.as(BigQueryOptions.class);
+    bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
     bqOptions.setProject("defaultProject");
     bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
 
@@ -755,7 +755,7 @@ public class BigQueryIOTest implements Serializable {
     options.setProject("someproject");
     options.setStreaming(streaming);
 
-    Pipeline p = Pipeline.create(options);
+    Pipeline p = TestPipeline.create(options);
 
     TableReference tableRef = new TableReference();
     tableRef.setDatasetId("somedataset");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f73bd73c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
index 45a4374..abda3a5 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.io;
 
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom;
-
 import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
@@ -35,9 +34,9 @@ import org.apache.beam.sdk.io.Sink.WriteOperation;
 import org.apache.beam.sdk.io.Sink.Writer;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.PipelineOptionsFactoryTest.TestPipelineOptions;
 import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
@@ -190,9 +189,9 @@ public class WriteTest {
   private static void runWrite(
       List<String> inputs, PTransform<PCollection<String>, PCollection<String>> transform) {
     // Flag to validate that the pipeline options are passed to the Sink
-    String[] args = {"--testFlag=test_value"};
-    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(WriteOptions.class);
-    Pipeline p = Pipeline.create(options);
+    WriteOptions options = TestPipeline.testingPipelineOptions().as(WriteOptions.class);
+    options.setTestFlag("test_value");
+    Pipeline p = TestPipeline.create(options);
 
     // Clear the sink's contents.
     sinkContents.clear();