Posted to commits@beam.apache.org by av...@apache.org on 2017/05/04 18:10:33 UTC

[1/6] beam git commit: [BEAM-1763] Replace usage of PipelineRule with TestPipeline in Spark runner tests

Repository: beam
Updated Branches:
  refs/heads/master 48c8ed176 -> b73918b55


[BEAM-1763] Replace usage of PipelineRule with TestPipeline in Spark runner tests


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8d91a97b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8d91a97b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8d91a97b

Branch: refs/heads/master
Commit: 8d91a97b77fbda74c577d2cdbd507395834e147c
Parents: 0e2bb18
Author: Aviem Zur <av...@gmail.com>
Authored: Wed May 3 21:06:00 2017 +0300
Committer: Aviem Zur <av...@gmail.com>
Committed: Thu May 4 20:48:56 2017 +0300

----------------------------------------------------------------------
 runners/spark/pom.xml                           |  47 +++++++-
 .../runners/spark/SparkRunnerRegistrar.java     |   4 +-
 .../apache/beam/runners/spark/CacheTest.java    |  12 +-
 .../beam/runners/spark/ForceStreamingTest.java  |  18 +--
 .../apache/beam/runners/spark/PipelineRule.java | 109 -------------------
 .../runners/spark/ProvidedSparkContextTest.java |  10 +-
 .../runners/spark/SparkRunnerDebuggerTest.java  |  15 +--
 .../runners/spark/SparkRunnerRegistrarTest.java |   2 +-
 .../beam/runners/spark/StreamingTest.java       |  23 ++++
 .../metrics/sink/SparkMetricsSinkTest.java      |  12 +-
 .../beam/runners/spark/io/AvroPipelineTest.java |  10 +-
 .../beam/runners/spark/io/NumShardsTest.java    |   6 +-
 .../spark/translation/StorageLevelTest.java     |  31 +++++-
 .../translation/streaming/CreateStreamTest.java |  53 ++++-----
 .../ResumeFromCheckpointStreamingTest.java      |  50 ++++++---
 .../streaming/StreamingSourceMetricsTest.java   |  14 +--
 .../main/java/org/apache/beam/sdk/Pipeline.java |   2 +-
 .../apache/beam/sdk/testing/TestPipeline.java   |  21 +++-
 18 files changed, 217 insertions(+), 222 deletions(-)
----------------------------------------------------------------------
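
The core of the migration is mechanical: the Spark-specific PipelineRule is dropped in favor of the SDK's TestPipeline JUnit rule, which reads its options from the beamTestPipelineOptions system property (now supplied by the surefire configuration below). A minimal sketch of the before/after pattern, with details per the diffs that follow (test body elided):

    // Before: the Spark runner's own rule built the pipeline and its options.
    @Rule
    public final transient PipelineRule pipelineRule = PipelineRule.batch();
    // ...
    Pipeline pipeline = pipelineRule.createPipeline();

    // After: the SDK rule; options arrive via -DbeamTestPipelineOptions=[...].
    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();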


http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 38d250e..f7200d6 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -333,9 +333,6 @@
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-surefire-plugin</artifactId>
           <configuration>
-            <excludedGroups>
-              org.apache.beam.runners.spark.UsesCheckpointRecovery
-            </excludedGroups>
             <forkCount>1</forkCount>
             <reuseForks>false</reuseForks>
             <systemPropertyVariables>
@@ -344,6 +341,50 @@
               <spark.ui.showConsoleProgress>false</spark.ui.showConsoleProgress>
             </systemPropertyVariables>
           </configuration>
+          <executions>
+            <execution>
+              <id>default-test</id>
+              <goals>
+                <goal>test</goal>
+              </goals>
+              <configuration>
+                <excludedGroups>
+                  org.apache.beam.runners.spark.UsesCheckpointRecovery,
+                  org.apache.beam.runners.spark.StreamingTest
+                </excludedGroups>
+                <systemPropertyVariables>
+                  <beamTestPipelineOptions>
+                    [
+                    "--runner=TestSparkRunner",
+                    "--streaming=false",
+                    "--enableSparkMetricSinks=true"
+                    ]
+                  </beamTestPipelineOptions>
+                </systemPropertyVariables>
+              </configuration>
+            </execution>
+            <execution>
+              <id>streaming-tests</id>
+              <phase>test</phase>
+              <goals>
+                <goal>test</goal>
+              </goals>
+              <configuration>
+                <groups>
+                  org.apache.beam.runners.spark.StreamingTest
+                </groups>
+                <systemPropertyVariables>
+                  <beamTestPipelineOptions>
+                    [
+                    "--runner=TestSparkRunner",
+                    "--forceStreaming=true",
+                    "--enableSparkMetricSinks=true"
+                    ]
+                  </beamTestPipelineOptions>
+                </systemPropertyVariables>
+              </configuration>
+            </execution>
+          </executions>
         </plugin>
         <plugin>
           <groupId>org.codehaus.mojo</groupId>

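With this configuration the module's tests run in two surefire executions bound to the same test phase: default-test excludes the UsesCheckpointRecovery and StreamingTest categories and passes batch pipeline options, while streaming-tests runs only the StreamingTest category with --forceStreaming=true. Assuming a standard multi-module checkout, both executions run from a plain Maven invocation on the module:

    mvn -pl runners/spark test
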
http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
index bedfda4..bf926dc 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
@@ -54,7 +54,9 @@ public final class SparkRunnerRegistrar {
   public static class Options implements PipelineOptionsRegistrar {
     @Override
     public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
-      return ImmutableList.<Class<? extends PipelineOptions>>of(SparkPipelineOptions.class);
+      return ImmutableList.<Class<? extends PipelineOptions>>of(
+          SparkPipelineOptions.class,
+          TestSparkPipelineOptions.class);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java
index c3b48d8..24b2e7b 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java
@@ -23,11 +23,11 @@ import org.apache.beam.runners.spark.translation.EvaluationContext;
 import org.apache.beam.runners.spark.translation.SparkContextFactory;
 import org.apache.beam.runners.spark.translation.TransformTranslator;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.junit.Rule;
 import org.junit.Test;
 
 /**
@@ -36,12 +36,12 @@ import org.junit.Test;
  */
 public class CacheTest {
 
-  @Rule
-  public final transient PipelineRule pipelineRule = PipelineRule.batch();
-
   @Test
   public void cacheCandidatesUpdaterTest() throws Exception {
-    Pipeline pipeline = pipelineRule.createPipeline();
+    SparkPipelineOptions options =
+        PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class);
+    options.setRunner(TestSparkRunner.class);
+    Pipeline pipeline = Pipeline.create(options);
     PCollection<String> pCollection = pipeline.apply(Create.of("foo", "bar"));
     // first read
     pCollection.apply(Count.<String>globally());
@@ -50,7 +50,7 @@ public class CacheTest {
     // will cache the RDD representing this PCollection
     pCollection.apply(Count.<String>globally());
 
-    JavaSparkContext jsc = SparkContextFactory.getSparkContext(pipelineRule.getOptions());
+    JavaSparkContext jsc = SparkContextFactory.getSparkContext(options);
     EvaluationContext ctxt = new EvaluationContext(jsc, pipeline);
     SparkRunner.CacheVisitor cacheVisitor =
         new SparkRunner.CacheVisitor(new TransformTranslator.Translator(), ctxt);

http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
index b60faf2..7bfc980 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
@@ -25,9 +25,9 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
 import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.junit.Rule;
 import org.junit.Test;
 
 
@@ -44,19 +44,23 @@ import org.junit.Test;
  */
 public class ForceStreamingTest {
 
-  @Rule
-  public final PipelineRule pipelineRule = PipelineRule.streaming();
-
   @Test
   public void test() throws IOException {
-    Pipeline pipeline = pipelineRule.createPipeline();
+    TestSparkPipelineOptions options =
+        PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class);
+    options.setRunner(TestSparkRunner.class);
+    options.setForceStreaming(true);
+
+    // pipeline with a bounded read.
+    Pipeline pipeline = Pipeline.create(options);
 
     // apply the BoundedReadFromUnboundedSource.
     BoundedReadFromUnboundedSource<?> boundedRead =
         Read.from(CountingSource.unbounded()).withMaxNumRecords(-1);
-    //noinspection unchecked
     pipeline.apply(boundedRead);
-    TestSparkRunner runner = TestSparkRunner.fromOptions(pipelineRule.getOptions());
+
+    // adapt reads
+    TestSparkRunner runner = TestSparkRunner.fromOptions(options);
     runner.adaptBoundedReads(pipeline);
 
     UnboundedReadDetector unboundedReadDetector = new UnboundedReadDetector();

http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/PipelineRule.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/PipelineRule.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/PipelineRule.java
deleted file mode 100644
index f8499f3..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/PipelineRule.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.joda.time.Duration;
-import org.junit.rules.ExternalResource;
-import org.junit.rules.RuleChain;
-import org.junit.rules.TemporaryFolder;
-import org.junit.rules.TestName;
-import org.junit.rules.TestRule;
-import org.junit.runner.Description;
-import org.junit.runners.model.Statement;
-
-/**
- * A {@link org.junit.Rule} to provide a {@link Pipeline} instance for Spark runner tests.
- */
-public class PipelineRule implements TestRule {
-
-  private final SparkPipelineRule delegate;
-  private final RuleChain chain;
-
-  private PipelineRule(SparkPipelineRule delegate) {
-    TestName testName = new TestName();
-    this.delegate = delegate;
-    this.delegate.setTestName(testName);
-    this.chain = RuleChain.outerRule(testName).around(this.delegate);
-  }
-
-  public static PipelineRule streaming() {
-    return new PipelineRule(new SparkStreamingPipelineRule());
-  }
-
-  public static PipelineRule batch() {
-    return new PipelineRule(new SparkPipelineRule());
-  }
-
-  public Duration batchDuration() {
-    return Duration.millis(delegate.options.getBatchIntervalMillis());
-  }
-
-  public TestSparkPipelineOptions getOptions() {
-    return delegate.options;
-  }
-
-  public Pipeline createPipeline() {
-    return Pipeline.create(delegate.options);
-  }
-
-  @Override
-  public Statement apply(Statement statement, Description description) {
-    return chain.apply(statement, description);
-  }
-
-  private static class SparkStreamingPipelineRule extends SparkPipelineRule {
-
-    private final TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-    @Override
-    protected void before() throws Throwable {
-      super.before();
-      temporaryFolder.create();
-      options.setForceStreaming(true);
-      options.setCheckpointDir(
-          temporaryFolder.newFolder(options.getJobName()).toURI().toURL().toString());
-    }
-
-    @Override
-    protected void after() {
-      temporaryFolder.delete();
-    }
-  }
-
-  private static class SparkPipelineRule extends ExternalResource {
-
-    protected final TestSparkPipelineOptions options =
-        PipelineOptionsFactory.as(TestSparkPipelineOptions.class);
-
-    private TestName testName;
-
-    public void setTestName(TestName testName) {
-      this.testName = testName;
-    }
-
-    @Override
-    protected void before() throws Throwable {
-      options.setRunner(TestSparkRunner.class);
-      options.setEnableSparkMetricSinks(false);
-      options.setJobName(
-          testName != null ? testName.getMethodName() : "test-at-" + System.currentTimeMillis());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
index 36ba863..8112993 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
@@ -27,9 +27,11 @@ import java.util.List;
 import java.util.Set;
 import org.apache.beam.runners.spark.examples.WordCount;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.values.PCollection;
@@ -95,7 +97,9 @@ public class ProvidedSparkContextTest {
         PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
 
         // Run test from pipeline
-        p.run().waitUntilFinish();
+        PipelineResult result = p.run();
+
+        TestPipeline.verifyPAssertsSucceeded(p, result);
     }
 
     private void testWithInvalidContext(JavaSparkContext jsc) {
@@ -104,11 +108,9 @@ public class ProvidedSparkContextTest {
         Pipeline p = Pipeline.create(options);
         PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder
                 .of()));
-        PCollection<String> output = inputWords.apply(new WordCount.CountWords())
+        inputWords.apply(new WordCount.CountWords())
                 .apply(MapElements.via(new WordCount.FormatAsTextFn()));
 
-        PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
-
         try {
             p.run().waitUntilFinish();
             fail("Should throw an exception when The provided Spark context is null or stopped");

http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
index ea058b2..9009751 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
@@ -27,6 +27,8 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Distinct;
@@ -48,7 +50,6 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.hamcrest.Matchers;
 import org.joda.time.Duration;
-import org.junit.Rule;
 import org.junit.Test;
 
 
@@ -57,15 +58,9 @@ import org.junit.Test;
  */
 public class SparkRunnerDebuggerTest {
 
-  @Rule
-  public final PipelineRule batchPipelineRule = PipelineRule.batch();
-
-  @Rule
-  public final PipelineRule streamingPipelineRule = PipelineRule.streaming();
-
   @Test
   public void debugBatchPipeline() {
-    TestSparkPipelineOptions options = batchPipelineRule.getOptions();
+    PipelineOptions options = PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class);
     options.setRunner(SparkRunnerDebugger.class);
 
     Pipeline pipeline = Pipeline.create(options);
@@ -111,7 +106,9 @@ public class SparkRunnerDebuggerTest {
 
   @Test
   public void debugStreamingPipeline() {
-    TestSparkPipelineOptions options = streamingPipelineRule.getOptions();
+    TestSparkPipelineOptions options =
+        PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class);
+    options.setForceStreaming(true);
     options.setRunner(SparkRunnerDebugger.class);
 
     Pipeline pipeline = Pipeline.create(options);

http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
index 4e1fd7c..75899f9 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
@@ -38,7 +38,7 @@ public class SparkRunnerRegistrarTest {
   @Test
   public void testOptions() {
     assertEquals(
-        ImmutableList.of(SparkPipelineOptions.class),
+        ImmutableList.of(SparkPipelineOptions.class, TestSparkPipelineOptions.class),
         new SparkRunnerRegistrar.Options().getPipelineOptions());
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/StreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/StreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/StreamingTest.java
new file mode 100644
index 0000000..a34c184
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/StreamingTest.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark;
+
+/**
+ * Category tag for tests that should be run in streaming mode.
+ */
+public interface StreamingTest {}
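
This empty interface is a JUnit 4 category marker: tests tag themselves with @Category(StreamingTest.class) at class or method level (as in the diffs below), and the surefire <groups>/<excludedGroups> elements above route them to the right execution. A minimal sketch with a hypothetical test class:

    import org.junit.Test;
    import org.junit.experimental.categories.Category;

    @Category(StreamingTest.class) // runs only in the streaming-tests execution
    public class SomeStreamingTest {
      @Test
      public void behavesInStreamingMode() { /* ... */ }
    }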

http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java
index b0ad972..fff95cb 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java
@@ -26,11 +26,10 @@ import com.google.common.collect.ImmutableSet;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
-import org.apache.beam.runners.spark.PipelineRule;
 import org.apache.beam.runners.spark.examples.WordCount;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.values.PCollection;
@@ -48,12 +47,7 @@ public class SparkMetricsSinkTest {
   public ExternalResource inMemoryMetricsSink = new InMemoryMetricsSinkRule();
 
   @Rule
-  public final PipelineRule pipelineRule = PipelineRule.batch();
-
-  private Pipeline createSparkPipeline() {
-    pipelineRule.getOptions().setEnableSparkMetricSinks(true);
-    return pipelineRule.createPipeline();
-  }
+  public final TestPipeline pipeline = TestPipeline.create();
 
   private void runPipeline() {
     final List<String> words =
@@ -62,8 +56,6 @@ public class SparkMetricsSinkTest {
     final Set<String> expectedCounts =
         ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2");
 
-    final Pipeline pipeline = createSparkPipeline();
-
     final PCollection<String> output =
         pipeline
         .apply(Create.of(words).withCoder(StringUtf8Coder.of()))

http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index 7188dc5..adde8d2 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -33,9 +33,8 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.beam.runners.spark.PipelineRule;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.AvroIO;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.Before;
 import org.junit.Rule;
@@ -54,7 +53,7 @@ public class AvroPipelineTest {
   public final TemporaryFolder tmpDir = new TemporaryFolder();
 
   @Rule
-  public final PipelineRule pipelineRule = PipelineRule.batch();
+  public final TestPipeline pipeline = TestPipeline.create();
 
   @Before
   public void setUp() throws IOException {
@@ -72,11 +71,10 @@ public class AvroPipelineTest {
     savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
     populateGenericFile(Lists.newArrayList(savedRecord), schema);
 
-    Pipeline p = pipelineRule.createPipeline();
-    PCollection<GenericRecord> input = p.apply(
+    PCollection<GenericRecord> input = pipeline.apply(
         AvroIO.readGenericRecords(schema).from(inputFile.getAbsolutePath()));
     input.apply(AvroIO.writeGenericRecords(schema).to(outputDir.getAbsolutePath()));
-    p.run().waitUntilFinish();
+    pipeline.run();
 
     List<GenericRecord> records = readGenericFile();
     assertEquals(Lists.newArrayList(savedRecord), records);

http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
index 5021744..55ee938 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
@@ -30,11 +30,10 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
-import org.apache.beam.runners.spark.PipelineRule;
 import org.apache.beam.runners.spark.examples.WordCount;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.values.PCollection;
@@ -59,7 +58,7 @@ public class NumShardsTest {
   public final TemporaryFolder tmpDir = new TemporaryFolder();
 
   @Rule
-  public final PipelineRule pipelineRule = PipelineRule.batch();
+  public final TestPipeline p = TestPipeline.create();
 
   @Before
   public void setUp() throws IOException {
@@ -69,7 +68,6 @@ public class NumShardsTest {
 
   @Test
   public void testText() throws Exception {
-    Pipeline p = pipelineRule.createPipeline();
     PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
     PCollection<String> output = inputWords.apply(new WordCount.CountWords())
         .apply(MapElements.via(new WordCount.FormatAsTextFn()));

http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
index 2b7b87b..8f2e681 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
@@ -15,30 +15,49 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.beam.runners.spark.translation;
 
-import org.apache.beam.runners.spark.PipelineRule;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.PCollection;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 
+
 /**
  * Test the RDD storage level defined by user.
  */
 public class StorageLevelTest {
 
+  private static String beamTestPipelineOptions;
+
   @Rule
-  public final transient PipelineRule pipelineRule = PipelineRule.batch();
+  public final TestPipeline pipeline = TestPipeline.create();
+
+  @BeforeClass
+  public static void init() {
+    beamTestPipelineOptions =
+        System.getProperty(TestPipeline.PROPERTY_BEAM_TEST_PIPELINE_OPTIONS);
+
+    System.setProperty(
+        TestPipeline.PROPERTY_BEAM_TEST_PIPELINE_OPTIONS,
+        beamTestPipelineOptions.replace("]", ", \"--storageLevel=DISK_ONLY\"]"));
+  }
+
+  @AfterClass
+  public static void teardown() {
+    System.setProperty(
+        TestPipeline.PROPERTY_BEAM_TEST_PIPELINE_OPTIONS,
+        beamTestPipelineOptions);
+  }
 
   @Test
   public void test() throws Exception {
-    pipelineRule.getOptions().setStorageLevel("DISK_ONLY");
-    Pipeline pipeline = pipelineRule.createPipeline();
-
     PCollection<String> pCollection = pipeline.apply(Create.of("foo"));
 
     // by default, the Spark runner doesn't cache the RDD if it accessed only one time.

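The @BeforeClass/@AfterClass pair above works by string-editing the JSON array in the beamTestPipelineOptions system property before TestPipeline reads it, then restoring the original value afterwards. Illustratively, taking the batch options from the pom configuration (whitespace elided):

    before: ["--runner=TestSparkRunner","--streaming=false","--enableSparkMetricSinks=true"]
    after:  ["--runner=TestSparkRunner","--streaming=false","--enableSparkMetricSinks=true", "--storageLevel=DISK_ONLY"]
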
http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
index dd52c05..770e0c0 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
@@ -24,13 +24,14 @@ import static org.junit.Assert.assertThat;
 
 import java.io.IOException;
 import java.io.Serializable;
-import org.apache.beam.runners.spark.PipelineRule;
 import org.apache.beam.runners.spark.ReuseSparkContextRule;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.StreamingTest;
 import org.apache.beam.runners.spark.io.CreateStream;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -61,6 +62,7 @@ import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 
 
@@ -74,10 +76,11 @@ import org.junit.rules.ExpectedException;
  * {@link org.apache.spark.streaming.dstream.QueueInputDStream} and advance the system's WMs.
  * //TODO: add synchronized/processing time trigger.
  */
+@Category(StreamingTest.class)
 public class CreateStreamTest implements Serializable {
 
   @Rule
-  public final transient PipelineRule pipelineRule = PipelineRule.streaming();
+  public final transient TestPipeline p = TestPipeline.create();
   @Rule
   public final transient ReuseSparkContextRule noContextResue = ReuseSparkContextRule.no();
   @Rule
@@ -85,10 +88,9 @@ public class CreateStreamTest implements Serializable {
 
   @Test
   public void testLateDataAccumulating() throws IOException {
-    Pipeline p = pipelineRule.createPipeline();
     Instant instant = new Instant(0);
     CreateStream<Integer> source =
-        CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration())
+        CreateStream.of(VarIntCoder.of(), batchDuration())
             .emptyBatch()
             .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(6)))
             .nextBatch(
@@ -159,9 +161,8 @@ public class CreateStreamTest implements Serializable {
 
   @Test
   public void testDiscardingMode() throws IOException {
-    Pipeline p = pipelineRule.createPipeline();
     CreateStream<String> source =
-        CreateStream.of(StringUtf8Coder.of(), pipelineRule.batchDuration())
+        CreateStream.of(StringUtf8Coder.of(), batchDuration())
             .nextBatch(
                 TimestampedValue.of("firstPane", new Instant(100)),
                 TimestampedValue.of("alsoFirstPane", new Instant(200)))
@@ -208,10 +209,9 @@ public class CreateStreamTest implements Serializable {
 
   @Test
   public void testFirstElementLate() throws IOException {
-    Pipeline p = pipelineRule.createPipeline();
     Instant lateElementTimestamp = new Instant(-1_000_000);
     CreateStream<String> source =
-        CreateStream.of(StringUtf8Coder.of(), pipelineRule.batchDuration())
+        CreateStream.of(StringUtf8Coder.of(), batchDuration())
             .emptyBatch()
             .advanceWatermarkForNextBatch(new Instant(0))
             .nextBatch(
@@ -242,10 +242,9 @@ public class CreateStreamTest implements Serializable {
 
   @Test
   public void testElementsAtAlmostPositiveInfinity() throws IOException {
-    Pipeline p = pipelineRule.createPipeline();
     Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp();
     CreateStream<String> source =
-        CreateStream.of(StringUtf8Coder.of(), pipelineRule.batchDuration())
+        CreateStream.of(StringUtf8Coder.of(), batchDuration())
             .nextBatch(
                 TimestampedValue.of("foo", endOfGlobalWindow),
                 TimestampedValue.of("bar", endOfGlobalWindow))
@@ -267,13 +266,12 @@ public class CreateStreamTest implements Serializable {
 
   @Test
   public void testMultipleStreams() throws IOException {
-    Pipeline p = pipelineRule.createPipeline();
     CreateStream<String> source =
-        CreateStream.of(StringUtf8Coder.of(), pipelineRule.batchDuration())
+        CreateStream.of(StringUtf8Coder.of(), batchDuration())
             .nextBatch("foo", "bar")
             .advanceNextBatchWatermarkToInfinity();
     CreateStream<Integer> other =
-        CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration())
+        CreateStream.of(VarIntCoder.of(), batchDuration())
             .nextBatch(1, 2, 3, 4)
             .advanceNextBatchWatermarkToInfinity();
 
@@ -298,10 +296,9 @@ public class CreateStreamTest implements Serializable {
 
   @Test
   public void testFlattenedWithWatermarkHold() throws IOException {
-    Pipeline p = pipelineRule.createPipeline();
     Instant instant = new Instant(0);
     CreateStream<Integer> source1 =
-        CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration())
+        CreateStream.of(VarIntCoder.of(), batchDuration())
             .emptyBatch()
             .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(5)))
             .nextBatch(
@@ -310,7 +307,7 @@ public class CreateStreamTest implements Serializable {
                 TimestampedValue.of(3, instant))
             .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(10)));
     CreateStream<Integer> source2 =
-        CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration())
+        CreateStream.of(VarIntCoder.of(), batchDuration())
             .emptyBatch()
             .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(1)))
             .nextBatch(
@@ -323,14 +320,14 @@ public class CreateStreamTest implements Serializable {
             .advanceNextBatchWatermarkToInfinity();
 
     PCollection<Integer> windowed1 = p
-        .apply(source1)
-        .apply(Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5)))
+        .apply("CreateStream1", source1)
+        .apply("Window1", Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5)))
             .triggering(AfterWatermark.pastEndOfWindow())
             .accumulatingFiredPanes()
             .withAllowedLateness(Duration.ZERO));
     PCollection<Integer> windowed2 = p
-        .apply(source2)
-        .apply(Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5)))
+        .apply("CreateStream2", source2)
+        .apply("Window2", Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5)))
             .triggering(AfterWatermark.pastEndOfWindow())
             .accumulatingFiredPanes()
             .withAllowedLateness(Duration.ZERO));
@@ -357,10 +354,9 @@ public class CreateStreamTest implements Serializable {
    */
   @Test
   public void testMultiOutputParDo() throws IOException {
-    Pipeline p = pipelineRule.createPipeline();
     Instant instant = new Instant(0);
     CreateStream<Integer> source1 =
-        CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration())
+        CreateStream.of(VarIntCoder.of(), batchDuration())
             .emptyBatch()
             .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(5)))
             .nextBatch(
@@ -397,7 +393,7 @@ public class CreateStreamTest implements Serializable {
   @Test
   public void testElementAtPositiveInfinityThrows() {
     CreateStream<Integer> source =
-        CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration())
+        CreateStream.of(VarIntCoder.of(), batchDuration())
             .nextBatch(TimestampedValue.of(-1, BoundedWindow.TIMESTAMP_MAX_VALUE.minus(1L)))
             .advanceNextBatchWatermarkToInfinity();
     thrown.expect(IllegalArgumentException.class);
@@ -407,7 +403,7 @@ public class CreateStreamTest implements Serializable {
   @Test
   public void testAdvanceWatermarkNonMonotonicThrows() {
     CreateStream<Integer> source =
-        CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration())
+        CreateStream.of(VarIntCoder.of(), batchDuration())
             .advanceWatermarkForNextBatch(new Instant(0L));
     thrown.expect(IllegalArgumentException.class);
     source
@@ -418,9 +414,14 @@ public class CreateStreamTest implements Serializable {
   @Test
   public void testAdvanceWatermarkEqualToPositiveInfinityThrows() {
     CreateStream<Integer> source =
-        CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration())
+        CreateStream.of(VarIntCoder.of(), batchDuration())
             .advanceWatermarkForNextBatch(BoundedWindow.TIMESTAMP_MAX_VALUE.minus(1L));
     thrown.expect(IllegalArgumentException.class);
     source.advanceWatermarkForNextBatch(BoundedWindow.TIMESTAMP_MAX_VALUE);
   }
+
+  private Duration batchDuration() {
+    return Duration.millis(
+        (p.getOptions().as(SparkPipelineOptions.class)).getBatchIntervalMillis());
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index 33571f0..584edac 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -28,15 +28,16 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
-import org.apache.beam.runners.spark.PipelineRule;
 import org.apache.beam.runners.spark.ReuseSparkContextRule;
 import org.apache.beam.runners.spark.SparkPipelineResult;
 import org.apache.beam.runners.spark.TestSparkPipelineOptions;
+import org.apache.beam.runners.spark.TestSparkRunner;
 import org.apache.beam.runners.spark.UsesCheckpointRecovery;
 import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
 import org.apache.beam.runners.spark.io.MicrobatchSource;
@@ -53,6 +54,7 @@ import org.apache.beam.sdk.metrics.MetricNameFilter;
 import org.apache.beam.sdk.metrics.MetricResult;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -81,11 +83,12 @@ import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-
+import org.junit.rules.TemporaryFolder;
 
 /**
  * Tests DStream recovery from checkpoint.
@@ -96,24 +99,34 @@ import org.junit.experimental.categories.Category;
  * {@link Metrics} values that are expected to resume from previous count and a side-input that is
  * expected to recover as well.
  */
-public class ResumeFromCheckpointStreamingTest {
+public class ResumeFromCheckpointStreamingTest implements Serializable {
   private static final EmbeddedKafkaCluster.EmbeddedZookeeper EMBEDDED_ZOOKEEPER =
       new EmbeddedKafkaCluster.EmbeddedZookeeper();
   private static final EmbeddedKafkaCluster EMBEDDED_KAFKA_CLUSTER =
       new EmbeddedKafkaCluster(EMBEDDED_ZOOKEEPER.getConnection(), new Properties());
   private static final String TOPIC = "kafka_beam_test_topic";
 
+  private transient TemporaryFolder temporaryFolder;
+
   @Rule
   public final transient ReuseSparkContextRule noContextReuse = ReuseSparkContextRule.no();
-  @Rule
-  public final transient PipelineRule pipelineRule = PipelineRule.streaming();
 
   @BeforeClass
-  public static void init() throws IOException {
+  public static void setup() throws IOException {
     EMBEDDED_ZOOKEEPER.startup();
     EMBEDDED_KAFKA_CLUSTER.startup();
   }
 
+  @Before
+  public void init() {
+    temporaryFolder = new TemporaryFolder();
+    try {
+      temporaryFolder.create();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   private static void produce(Map<String, Instant> messages) {
     Properties producerProps = new Properties();
     producerProps.putAll(EMBEDDED_KAFKA_CLUSTER.getProps());
@@ -148,7 +161,7 @@ public class ResumeFromCheckpointStreamingTest {
             .build();
 
     // first run should expect EOT matching the last injected element.
-    SparkPipelineResult res = run(pipelineRule, Optional.of(new Instant(400)), 0);
+    SparkPipelineResult res = run(Optional.of(new Instant(400)), 0);
 
     assertThat(res.metrics().queryMetrics(metricsFilter).counters(),
         hasItem(attemptedMetricsResult(ResumeFromCheckpointStreamingTest.class.getName(),
@@ -169,7 +182,7 @@ public class ResumeFromCheckpointStreamingTest {
     ));
 
     // recovery should resume from last read offset, and read the second batch of input.
-    res = runAgain(pipelineRule, 1);
+    res = runAgain(1);
     // assertions 2:
     assertThat(res.metrics().queryMetrics(metricsFilter).counters(),
         hasItem(attemptedMetricsResult(ResumeFromCheckpointStreamingTest.class.getName(),
@@ -209,18 +222,18 @@ public class ResumeFromCheckpointStreamingTest {
         String.format("Found %d failed assertions.", failedAssertions),
         failedAssertions,
         is(0L));
-
   }
 
-  private SparkPipelineResult runAgain(PipelineRule pipelineRule, int expectedAssertions) {
+  private SparkPipelineResult runAgain(int expectedAssertions) {
     // sleep before next run.
     Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
-    return run(pipelineRule, Optional.<Instant>absent(), expectedAssertions);
+    return run(Optional.<Instant>absent(), expectedAssertions);
   }
 
   @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
-  private static SparkPipelineResult run(
-      PipelineRule pipelineRule, Optional<Instant> stopWatermarkOption, int expectedAssertions) {
+  private SparkPipelineResult run(
+      Optional<Instant> stopWatermarkOption,
+      int expectedAssertions) {
     KafkaIO.Read<String, Instant> read = KafkaIO.<String, Instant>read()
         .withBootstrapServers(EMBEDDED_KAFKA_CLUSTER.getBrokerList())
         .withTopics(Collections.singletonList(TOPIC))
@@ -242,15 +255,21 @@ public class ResumeFromCheckpointStreamingTest {
           }
         });
 
-    TestSparkPipelineOptions options = pipelineRule.getOptions();
+    TestSparkPipelineOptions options =
+        PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class);
     options.setSparkMaster("local[*]");
     options.setCheckpointDurationMillis(options.getBatchIntervalMillis());
     options.setExpectedAssertions(expectedAssertions);
+    options.setRunner(TestSparkRunner.class);
+    options.setEnableSparkMetricSinks(false);
+    options.setForceStreaming(true);
+    options.setCheckpointDir(temporaryFolder.getRoot().getPath());
     // timeout is per execution so it can be injected by the caller.
     if (stopWatermarkOption.isPresent()) {
       options.setStopPipelineWatermark(stopWatermarkOption.get().getMillis());
     }
-    Pipeline p = pipelineRule.createPipeline();
+
+    Pipeline p = Pipeline.create(options);
 
     PCollection<String> expectedCol =
         p.apply(Create.of(ImmutableList.of("side1", "side2")).withCoder(StringUtf8Coder.of()));
@@ -354,5 +373,4 @@ public class ResumeFromCheckpointStreamingTest {
       }
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java
index 5a4b1b5..df6027c 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java
@@ -23,9 +23,7 @@ import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertThat;
 
 import java.io.Serializable;
-import org.apache.beam.runners.spark.PipelineRule;
-import org.apache.beam.runners.spark.TestSparkPipelineOptions;
-import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.runners.spark.StreamingTest;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.Source;
@@ -34,10 +32,11 @@ import org.apache.beam.sdk.metrics.MetricNameFilter;
 import org.apache.beam.sdk.metrics.MetricQueryResults;
 import org.apache.beam.sdk.metrics.MetricsFilter;
 import org.apache.beam.sdk.metrics.SourceMetrics;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.joda.time.Duration;
 import org.junit.Rule;
 import org.junit.Test;
-
+import org.junit.experimental.categories.Category;
 
 /**
  * Verify metrics support for {@link Source Sources} in streaming pipelines.
@@ -47,14 +46,11 @@ public class StreamingSourceMetricsTest implements Serializable {
 
   // Force streaming pipeline using pipeline rule.
   @Rule
-  public final transient PipelineRule pipelineRule = PipelineRule.streaming();
+  public final transient TestPipeline pipeline = TestPipeline.create();
 
   @Test
+  @Category(StreamingTest.class)
   public void testUnboundedSourceMetrics() {
-    TestSparkPipelineOptions options = pipelineRule.getOptions();
-
-    Pipeline pipeline = Pipeline.create(options);
-
     final long numElements = 1000;
 
     pipeline.apply(

http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index 6b15f0d..6fa7a5a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -460,7 +460,7 @@ public class Pipeline {
   private Set<String> usedFullNames = new HashSet<>();
   private CoderRegistry coderRegistry;
   private final List<String> unstableNames = new ArrayList<>();
-  private final PipelineOptions defaultOptions;
+  protected final PipelineOptions defaultOptions;
 
   protected Pipeline(PipelineOptions options) {
     this.defaultOptions = options;

http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index d8fe51d..2d34b22 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -26,6 +26,7 @@ import com.fasterxml.jackson.core.TreeNode;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
@@ -244,8 +245,11 @@ public class TestPipeline extends Pipeline implements TestRule {
     }
   }
 
-  static final String PROPERTY_BEAM_TEST_PIPELINE_OPTIONS = "beamTestPipelineOptions";
+  /** System property used to set {@link TestPipelineOptions}. */
+  public static final String PROPERTY_BEAM_TEST_PIPELINE_OPTIONS = "beamTestPipelineOptions";
+
   static final String PROPERTY_USE_DEFAULT_DUMMY_RUNNER = "beamUseDummyRunner";
+
   private static final ObjectMapper MAPPER = new ObjectMapper().registerModules(
       ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
 
@@ -331,7 +335,7 @@ public class TestPipeline extends Pipeline implements TestRule {
     try {
       enforcement.get().beforePipelineExecution();
       pipelineResult = super.run();
-      verifyPAssertsSucceeded(pipelineResult);
+      verifyPAssertsSucceeded(this, pipelineResult);
     } catch (RuntimeException exc) {
       Throwable cause = exc.getCause();
       if (cause instanceof AssertionError) {
@@ -377,6 +381,15 @@ public class TestPipeline extends Pipeline implements TestRule {
     return this;
   }
 
+  @VisibleForTesting
+  @Override
+  /**
+   * Get this pipeline's options.
+   */
+  public PipelineOptions getOptions() {
+    return defaultOptions;
+  }
+
   @Override
   public String toString() {
     return "TestPipeline#" + getOptions().as(ApplicationNameOptions.class).getAppName();
@@ -501,9 +514,9 @@ public class TestPipeline extends Pipeline implements TestRule {
    * <p>Note this only runs for runners which support Metrics. Runners which do not should verify
    * this in some other way. See: https://issues.apache.org/jira/browse/BEAM-2001</p>
    */
-  private void verifyPAssertsSucceeded(PipelineResult pipelineResult) {
+  public static void verifyPAssertsSucceeded(Pipeline pipeline, PipelineResult pipelineResult) {
     if (MetricsEnvironment.isMetricsSupported()) {
-      long expectedNumberOfAssertions = (long) PAssert.countAsserts(this);
+      long expectedNumberOfAssertions = (long) PAssert.countAsserts(pipeline);
 
       long successfulAssertions = 0;
       Iterable<MetricResult<Long>> successCounterResults =

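Making verifyPAssertsSucceeded public and static lets tests that build a Pipeline by hand reuse the same PAssert verification, as ProvidedSparkContextTest now does. A sketch of the call pattern, mirroring that test:

    PipelineResult result = p.run();
    TestPipeline.verifyPAssertsSucceeded(p, result);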

[3/6] beam git commit: [BEAM-1763] Verify PAssert execution in runners which support metrics.

Posted by av...@apache.org.
[BEAM-1763] Verify PAssert execution in runners which support metrics.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/95ade45e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/95ade45e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/95ade45e

Branch: refs/heads/master
Commit: 95ade45eced4787eb67a9d4d13dae48ffb176919
Parents: 48c8ed1
Author: Aviem Zur <av...@gmail.com>
Authored: Tue May 2 19:00:29 2017 +0300
Committer: Aviem Zur <av...@gmail.com>
Committed: Thu May 4 20:48:56 2017 +0300

----------------------------------------------------------------------
 .../apache/beam/runners/flink/FlinkRunner.java  |  3 ++
 .../beam/runners/spark/TestSparkRunner.java     | 47 --------------------
 .../ResumeFromCheckpointStreamingTest.java      | 12 +++--
 .../beam/sdk/metrics/MetricsEnvironment.java    |  5 +++
 .../apache/beam/sdk/testing/TestPipeline.java   | 46 ++++++++++++++++---
 5 files changed, 57 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
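
This commit moves the per-runner assertion bookkeeping out of TestSparkRunner and into TestPipeline, gated on a MetricsEnvironment flag that metrics-capable runners set (the Flink change is shown below; the Spark runner's equivalent is outside this excerpt). In outline, the centralized check does roughly the following (simplified from the TestPipeline diff above, not verbatim):

    if (MetricsEnvironment.isMetricsSupported()) {
      long expected = PAssert.countAsserts(pipeline);
      // Count PAssert.SUCCESS_COUNTER metric results with attempted() > 0,
      // assert the count equals `expected`, and assert that no
      // PAssert.FAILURE_COUNTER was incremented.
    }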


http://git-wip-us.apache.org/repos/asf/beam/blob/95ade45e/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index 181ffda..a5972ef 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -31,6 +31,7 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.PipelineRunner;
@@ -103,6 +104,8 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> {
   public PipelineResult run(Pipeline pipeline) {
     logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
 
+    MetricsEnvironment.setMetricsSupported(true);
+
     LOG.info("Executing pipeline using FlinkRunner.");
 
     FlinkPipelineExecutionEnvironment env = new FlinkPipelineExecutionEnvironment(options);

http://git-wip-us.apache.org/repos/asf/beam/blob/95ade45e/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index 10e98b8..1e67813 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -40,15 +40,11 @@ import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
-import org.apache.beam.sdk.metrics.MetricNameFilter;
-import org.apache.beam.sdk.metrics.MetricResult;
-import org.apache.beam.sdk.metrics.MetricsFilter;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -116,8 +112,6 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
     }
     SparkPipelineResult result = null;
 
-    int expectedNumberOfAssertions = PAssert.countAsserts(pipeline);
-
     // clear state of Aggregators, Metrics and Watermarks if exists.
     AggregatorsAccumulator.clear();
     MetricsAccumulator.clear();
@@ -137,47 +131,6 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
             String.format("Finish state %s is not allowed.", finishState),
             finishState,
             isOneOf(PipelineResult.State.STOPPED, PipelineResult.State.DONE));
-
-        // validate assertion succeeded (at least once).
-        long successAssertions = 0;
-        Iterable<MetricResult<Long>> counterResults = result.metrics().queryMetrics(
-            MetricsFilter.builder()
-                .addNameFilter(MetricNameFilter.named(PAssert.class, PAssert.SUCCESS_COUNTER))
-                .build()).counters();
-        for (MetricResult<Long> counter : counterResults) {
-          if (counter.attempted().longValue() > 0) {
-            successAssertions++;
-          }
-        }
-        Integer expectedAssertions = testSparkPipelineOptions.getExpectedAssertions() != null
-            ? testSparkPipelineOptions.getExpectedAssertions() : expectedNumberOfAssertions;
-        assertThat(
-            String.format(
-                "Expected %d successful assertions, but found %d.",
-                expectedAssertions, successAssertions),
-            successAssertions,
-            is(expectedAssertions.longValue()));
-        // validate assertion didn't fail.
-        long failedAssertions = 0;
-        Iterable<MetricResult<Long>> failCounterResults = result.metrics().queryMetrics(
-            MetricsFilter.builder()
-                .addNameFilter(MetricNameFilter.named(PAssert.class, PAssert.FAILURE_COUNTER))
-                .build()).counters();
-        for (MetricResult<Long> counter : failCounterResults) {
-          if (counter.attempted().longValue() > 0) {
-            failedAssertions++;
-          }
-        }
-        assertThat(
-            String.format("Found %d failed assertions.", failedAssertions),
-            failedAssertions,
-            is(0L));
-
-        LOG.info(
-            String.format(
-                "Successfully asserted pipeline %s with %d successful assertions.",
-                testSparkPipelineOptions.getJobName(),
-                successAssertions));
       } finally {
         try {
           // cleanup checkpoint dir.

http://git-wip-us.apache.org/repos/asf/beam/blob/95ade45e/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index 7d7fd08..33571f0 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -180,7 +180,8 @@ public class ResumeFromCheckpointStreamingTest {
     long successAssertions = 0;
     Iterable<MetricResult<Long>> counterResults = res.metrics().queryMetrics(
         MetricsFilter.builder()
-            .addNameFilter(MetricNameFilter.named(PAssert.class, PAssert.SUCCESS_COUNTER))
+            .addNameFilter(
+                MetricNameFilter.named(PAssertWithoutFlatten.class, PAssert.SUCCESS_COUNTER))
             .build()).counters();
     for (MetricResult<Long> counter : counterResults) {
       if (counter.attempted().longValue() > 0) {
@@ -196,7 +197,8 @@ public class ResumeFromCheckpointStreamingTest {
     long failedAssertions = 0;
     Iterable<MetricResult<Long>> failCounterResults = res.metrics().queryMetrics(
         MetricsFilter.builder()
-            .addNameFilter(MetricNameFilter.named(PAssert.class, PAssert.FAILURE_COUNTER))
+            .addNameFilter(MetricNameFilter.named(
+                PAssertWithoutFlatten.class, PAssert.FAILURE_COUNTER))
             .build()).counters();
     for (MetricResult<Long> counter : failCounterResults) {
       if (counter.attempted().longValue() > 0) {
@@ -330,8 +332,10 @@ public class ResumeFromCheckpointStreamingTest {
     }
 
     private static class AssertDoFn<T> extends DoFn<Iterable<T>, Void> {
-      private final Counter success = Metrics.counter(PAssert.class, PAssert.SUCCESS_COUNTER);
-      private final Counter failure = Metrics.counter(PAssert.class, PAssert.FAILURE_COUNTER);
+      private final Counter success =
+          Metrics.counter(PAssertWithoutFlatten.class, PAssert.SUCCESS_COUNTER);
+      private final Counter failure =
+          Metrics.counter(PAssertWithoutFlatten.class, PAssert.FAILURE_COUNTER);
       private final T[] expected;
 
       AssertDoFn(T[] expected) {
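
As a rough illustration of why the counter creation and the query filter change together in this diff: a Beam metric is identified by its (namespace, name) pair, so the filter must name the same namespace class that created the counter. A sketch using the identifiers from the diff above (res stands for the test's PipelineResult):

    // Counter created inside the test's AssertDoFn:
    Counter success =
        Metrics.counter(PAssertWithoutFlatten.class, PAssert.SUCCESS_COUNTER);

    // Query filter in the test's verification code must use the same class:
    MetricsFilter filter =
        MetricsFilter.builder()
            .addNameFilter(
                MetricNameFilter.named(PAssertWithoutFlatten.class, PAssert.SUCCESS_COUNTER))
            .build();
    Iterable<MetricResult<Long>> counters = res.metrics().queryMetrics(filter).counters();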

http://git-wip-us.apache.org/repos/asf/beam/blob/95ade45e/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
index 2942578..a4b311f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
@@ -70,6 +70,11 @@ public class MetricsEnvironment {
     METRICS_SUPPORTED.set(supported);
   }
 
+  /** Indicates whether metrics reporting is supported. */
+  public static boolean isMetricsSupported() {
+    return METRICS_SUPPORTED.get();
+  }
+
   /**
    * Set the {@link MetricsContainer} for the current thread.
    *

http://git-wip-us.apache.org/repos/asf/beam/blob/95ade45e/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index 868dcbd..d8fe51d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -18,6 +18,8 @@
 package org.apache.beam.sdk.testing;
 
 import static com.google.common.base.Preconditions.checkState;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
 
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.TreeNode;
@@ -41,6 +43,10 @@ import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.metrics.MetricNameFilter;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.beam.sdk.metrics.MetricsFilter;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
@@ -186,8 +192,8 @@ public class TestPipeline extends Pipeline implements TestRule {
           if (pipelineRunSucceeded() && !visitedAll(pipelineNodes)) {
             final boolean hasDanglingPAssert =
                 FluentIterable.from(pipelineNodes)
-                              .filter(Predicates.not(Predicates.in(runVisitedNodes)))
-                              .anyMatch(isPAssertNode);
+                    .filter(Predicates.not(Predicates.in(runVisitedNodes)))
+                    .anyMatch(isPAssertNode);
             if (hasDanglingPAssert) {
               throw new AbandonedNodeException("The pipeline contains abandoned PAssert(s).");
             } else {
@@ -319,12 +325,13 @@ public class TestPipeline extends Pipeline implements TestRule {
     checkState(
         enforcement.isPresent(),
         "Is your TestPipeline declaration missing a @Rule annotation? Usage: "
-        + "@Rule public final transient TestPipeline pipeline = TestPipeline.create();");
+            + "@Rule public final transient TestPipeline pipeline = TestPipeline.create();");
 
     final PipelineResult pipelineResult;
     try {
       enforcement.get().beforePipelineExecution();
       pipelineResult = super.run();
+      verifyPAssertsSucceeded(pipelineResult);
     } catch (RuntimeException exc) {
       Throwable cause = exc.getCause();
       if (cause instanceof AssertionError) {
@@ -385,8 +392,8 @@ public class TestPipeline extends Pipeline implements TestRule {
           Strings.isNullOrEmpty(beamTestPipelineOptions)
               ? PipelineOptionsFactory.create()
               : PipelineOptionsFactory.fromArgs(
-                      MAPPER.readValue(beamTestPipelineOptions, String[].class))
-                  .as(TestPipelineOptions.class);
+              MAPPER.readValue(beamTestPipelineOptions, String[].class))
+              .as(TestPipelineOptions.class);
 
       options.as(ApplicationNameOptions.class).setAppName(getAppName());
       // If no options were specified, set some reasonable defaults
@@ -488,6 +495,35 @@ public class TestPipeline extends Pipeline implements TestRule {
     return firstInstanceAfterTestPipeline;
   }
 
+  /**
+   * Verifies that all {@link PAssert PAsserts} in the pipeline have been executed and were successful.
+   *
+   * <p>Note this only runs for runners which support Metrics. Runners which do not should verify
+   * this in some other way. See: https://issues.apache.org/jira/browse/BEAM-2001</p>
+   */
+  private void verifyPAssertsSucceeded(PipelineResult pipelineResult) {
+    if (MetricsEnvironment.isMetricsSupported()) {
+      long expectedNumberOfAssertions = (long) PAssert.countAsserts(this);
+
+      long successfulAssertions = 0;
+      Iterable<MetricResult<Long>> successCounterResults =
+          pipelineResult.metrics().queryMetrics(
+              MetricsFilter.builder()
+                  .addNameFilter(MetricNameFilter.named(PAssert.class, PAssert.SUCCESS_COUNTER))
+                  .build())
+              .counters();
+      for (MetricResult<Long> counter : successCounterResults) {
+        if (counter.attempted() > 0) {
+          successfulAssertions++;
+        }
+      }
+
+      assertThat(String
+          .format("Expected %d successful assertions, but found %d.", expectedNumberOfAssertions,
+              successfulAssertions), successfulAssertions, is(expectedNumberOfAssertions));
+    }
+  }
+
   private static class IsEmptyVisitor extends PipelineVisitor.Defaults {
     private boolean empty = true;
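
With verifyPAssertsSucceeded wired into run(), an ordinary TestPipeline test gets its PAsserts checked automatically on metrics-capable runners; no runner-specific counting (like the code removed from TestSparkRunner above) is needed. A hedged usage sketch, with a hypothetical test class:

    import org.apache.beam.sdk.testing.PAssert;
    import org.apache.beam.sdk.testing.TestPipeline;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.Sum;
    import org.apache.beam.sdk.values.PCollection;
    import org.junit.Rule;
    import org.junit.Test;

    public class SumPipelineTest {
      @Rule public final transient TestPipeline pipeline = TestPipeline.create();

      @Test
      public void testGlobalSum() {
        PCollection<Integer> sum =
            pipeline.apply(Create.of(1, 2, 3)).apply(Sum.integersGlobally());
        PAssert.that(sum).containsInAnyOrder(6);
        // run() now also asserts that every PAssert's success counter fired,
        // when MetricsEnvironment.isMetricsSupported() is true.
        pipeline.run();
      }
    }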
 


[6/6] beam git commit: This closes #2729

Posted by av...@apache.org.
This closes #2729


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b73918b5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b73918b5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b73918b5

Branch: refs/heads/master
Commit: b73918b55ab06e5a47ef9dc33ae3dbaebaed330a
Parents: 48c8ed1 8d91a97
Author: Aviem Zur <av...@gmail.com>
Authored: Thu May 4 21:10:14 2017 +0300
Committer: Aviem Zur <av...@gmail.com>
Committed: Thu May 4 21:10:14 2017 +0300

----------------------------------------------------------------------
 .../beam/runners/core/SideInputHandler.java     |  10 +-
 .../apache/beam/runners/flink/FlinkRunner.java  |   3 +
 .../FlinkStreamingTransformTranslators.java     |  26 +++++
 .../wrappers/streaming/DoFnOperator.java        |  27 ++++-
 .../streaming/state/FlinkStateInternals.java    |   2 +
 runners/spark/pom.xml                           |  47 +++++++-
 .../runners/spark/SparkRunnerRegistrar.java     |   4 +-
 .../beam/runners/spark/TestSparkRunner.java     |  47 --------
 .../apache/beam/runners/spark/CacheTest.java    |  12 +-
 .../beam/runners/spark/ForceStreamingTest.java  |  18 +--
 .../apache/beam/runners/spark/PipelineRule.java | 109 -------------------
 .../runners/spark/ProvidedSparkContextTest.java |  10 +-
 .../runners/spark/SparkRunnerDebuggerTest.java  |  15 +--
 .../runners/spark/SparkRunnerRegistrarTest.java |   2 +-
 .../beam/runners/spark/StreamingTest.java       |  23 ++++
 .../metrics/sink/SparkMetricsSinkTest.java      |  12 +-
 .../beam/runners/spark/io/AvroPipelineTest.java |  10 +-
 .../beam/runners/spark/io/NumShardsTest.java    |   6 +-
 .../spark/translation/StorageLevelTest.java     |  31 +++++-
 .../translation/streaming/CreateStreamTest.java |  53 ++++-----
 .../ResumeFromCheckpointStreamingTest.java      |  62 +++++++----
 .../streaming/StreamingSourceMetricsTest.java   |  14 +--
 .../main/java/org/apache/beam/sdk/Pipeline.java |   2 +-
 .../beam/sdk/metrics/MetricsEnvironment.java    |   5 +
 .../apache/beam/sdk/testing/TestPipeline.java   |  61 ++++++++++-
 25 files changed, 330 insertions(+), 281 deletions(-)
----------------------------------------------------------------------



[5/6] beam git commit: [BEAM-1726] Fix RuntimeException throwing in FlinkStateInternals

Posted by av...@apache.org.
[BEAM-1726] Fix RuntimeException throwing in FlinkStateInternals


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7c44935e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7c44935e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7c44935e

Branch: refs/heads/master
Commit: 7c44935e1c47cce2ecfe842e37c2cf89f48d8583
Parents: 5555040
Author: Aviem Zur <av...@gmail.com>
Authored: Sat Mar 18 15:21:45 2017 +0200
Committer: Aviem Zur <av...@gmail.com>
Committed: Thu May 4 20:48:56 2017 +0300

----------------------------------------------------------------------
 .../translation/wrappers/streaming/state/FlinkStateInternals.java  | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7c44935e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
index c033be6..cea6e0f 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -592,6 +592,8 @@ public class FlinkStateInternals<K> implements StateInternals {
         }
         current = combineFn.addInput(current, value);
         state.update(current);
+      } catch (RuntimeException re) {
+        throw re;
       } catch (Exception e) {
         throw new RuntimeException("Error adding to state." , e);
       }


[4/6] beam git commit: [BEAM-1726] Fix empty side inputs in Flink Streaming Runner

Posted by av...@apache.org.
[BEAM-1726] Fix empty side inputs in Flink Streaming Runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5555040d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5555040d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5555040d

Branch: refs/heads/master
Commit: 5555040d935c67f5cd48f2ffe2721a07fe6e0a50
Parents: 95ade45
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Sat Mar 18 12:16:06 2017 +0100
Committer: Aviem Zur <av...@gmail.com>
Committed: Thu May 4 20:48:56 2017 +0300

----------------------------------------------------------------------
 .../beam/runners/core/SideInputHandler.java     | 10 ++++----
 .../wrappers/streaming/DoFnOperator.java        | 27 +++++++++++++++++++-
 2 files changed, 31 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5555040d/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
index 5c67148..b29f9d0 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.core;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -161,11 +162,6 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
   @Override
   public <T> T get(PCollectionView<T> sideInput, BoundedWindow window) {
 
-    if (!isReady(sideInput, window)) {
-      throw new IllegalStateException(
-          "Side input " + sideInput + " is not ready for window " + window);
-    }
-
     @SuppressWarnings("unchecked")
     Coder<BoundedWindow> windowCoder =
         (Coder<BoundedWindow>) sideInput
@@ -181,6 +177,10 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
 
     Iterable<WindowedValue<?>> elements = state.read();
 
+    if (elements == null) {
+      elements = Collections.emptyList();
+    }
+
     return sideInput.getViewFn().apply(elements);
   }
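
The effect of dropping the isReady check and defaulting a null state read to an empty list is that a side-input window which never received data is handed to the ViewFn as empty rather than failing. A hypothetical user-level illustration (sideData and mainInput are assumed PCollections; Iterables is Guava's):

    PCollectionView<Iterable<Integer>> view =
        sideData.apply(View.<Integer>asIterable());
    mainInput.apply(ParDo.of(new DoFn<String, Integer>() {
      @ProcessElement
      public void process(ProcessContext c) {
        // With this fix, an empty side-input window reads as an empty
        // Iterable rather than throwing or returning null.
        c.output(Iterables.size(c.sideInput(view)));
      }
    }).withSideInputs(view));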
 

http://git-wip-us.apache.org/repos/asf/beam/blob/5555040d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index c624036..16bf5d2 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -463,7 +463,32 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
 
   @Override
   public void processWatermark2(Watermark mark) throws Exception {
-    // ignore watermarks from the side-input input
+    if (mark.getTimestamp() == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+      // this means we will never see any more side input
+      pushbackDoFnRunner.startBundle();
+
+      BagState<WindowedValue<InputT>> pushedBack =
+          pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+
+      Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read();
+      if (pushedBackContents != null) {
+        for (WindowedValue<InputT> elem : pushedBackContents) {
+
+          // we need to set the correct key in case the operator is
+          // a (keyed) window operator
+          setKeyContextElement1(new StreamRecord<>(elem));
+
+          doFnRunner.processElement(elem);
+        }
+      }
+
+      setPushedBackWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+
+      pushbackDoFnRunner.finishBundle();
+
+      // maybe output a new watermark
+      processWatermark1(new Watermark(currentInputWatermark));
+    }
   }
 
   @Override
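
The new processWatermark2 body relies on the watermark contract: TIMESTAMP_MAX_VALUE on the side-input stream means no further side input can arrive, so every pushed-back element can safely be processed with whatever side-input state exists. Stripped of the operator plumbing, the shape of the logic is (field names simplified, not the operator's real API):

    if (mark.getTimestamp() == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
      for (WindowedValue<InputT> elem : bufferedElements) {
        doFnRunner.processElement(elem); // side input is final; safe to fire
      }
      bufferedElements.clear();
      // then advance and emit the main-input watermark, as in the diff
    }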


[2/6] beam git commit: [BEAM-1726] Fix Flatten with input copies in Flink Streaming Runner

Posted by av...@apache.org.
[BEAM-1726] Fix Flatten with input copies in Flink Streaming Runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0e2bb180
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0e2bb180
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0e2bb180

Branch: refs/heads/master
Commit: 0e2bb1808350cbebf771d0971deb06787732800d
Parents: 7c44935
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Sun Mar 19 07:49:08 2017 +0100
Committer: Aviem Zur <av...@gmail.com>
Committed: Thu May 4 20:48:56 2017 +0300

----------------------------------------------------------------------
 .../FlinkStreamingTransformTranslators.java     | 26 ++++++++++++++++++++
 1 file changed, 26 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0e2bb180/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index c024493..7339c01 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -966,10 +966,36 @@ class FlinkStreamingTransformTranslators {
 
       } else {
         DataStream<T> result = null;
+
+        // Determine DataStreams that we use as input several times. For those, we need to uniquify
+        // input streams because Flink seems to swallow watermarks when we have a union of one and
+        // the same stream.
+        Map<DataStream<T>, Integer> duplicates = new HashMap<>();
+        for (PValue input : allInputs.values()) {
+          DataStream<T> current = context.getInputDataStream(input);
+          Integer oldValue = duplicates.put(current, 1);
+          if (oldValue != null) {
+            duplicates.put(current, oldValue + 1);
+          }
+        }
+
         for (PValue input : allInputs.values()) {
           DataStream<T> current = context.getInputDataStream(input);
+
+          final Integer timesRequired = duplicates.get(current);
+          if (timesRequired > 1) {
+            current = current.flatMap(new FlatMapFunction<T, T>() {
+              private static final long serialVersionUID = 1L;
+
+              @Override
+              public void flatMap(T t, Collector<T> collector) throws Exception {
+                collector.collect(t);
+              }
+            });
+          }
           result = (result == null) ? current : result.union(current);
         }
+
         context.setOutputDataStream(context.getOutput(transform), result);
       }
     }
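
A hypothetical pipeline that exercises this code path: the same PCollection appears twice in one Flatten, so without the pass-through flatMap the translator would union a DataStream with itself and, per the comment in the diff, watermarks would stop advancing (p is an assumed Pipeline):

    PCollection<String> words = p.apply(Create.of("a", "b"));
    PCollection<String> doubled =
        PCollectionList.of(words).and(words)
            .apply(Flatten.<String>pCollections()); // yields "a", "b", "a", "b"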