You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2016/11/15 18:09:20 UTC

[1/2] incubator-beam git commit: [BEAM-891] Fix build that occasionally fails with IndexOutOfBoundsException.

Repository: incubator-beam
Updated Branches:
  refs/heads/master 47646d641 -> 503f26f44


[BEAM-891] Fix build that occasionally fails with IndexOutOfBoundsException.

Moved "TestPipelineOptions#withTmpCheckpointDir" to TestPipelineOptionsForStreaming.
Removed an unused member in ProvidedSparkContextTest.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0331dd1c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0331dd1c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0331dd1c

Branch: refs/heads/master
Commit: 0331dd1cd75e60484f0b15723e4e7edc280a4d12
Parents: 47646d6
Author: Stas Levin <st...@gmail.com>
Authored: Thu Nov 10 13:32:51 2016 +0200
Committer: Sela <an...@paypal.com>
Committed: Tue Nov 15 19:52:12 2016 +0200

----------------------------------------------------------------------
 runners/spark/pom.xml                           |  3 +-
 .../runners/spark/SparkPipelineOptions.java     |  4 +-
 .../spark/translation/SparkRuntimeContext.java  |  4 +-
 .../runners/spark/ProvidedSparkContextTest.java | 26 ++++-----
 .../metrics/sink/NamedAggregatorsTest.java      | 13 +++--
 .../beam/runners/spark/io/AvroPipelineTest.java | 12 ++---
 .../beam/runners/spark/io/NumShardsTest.java    | 10 ++--
 .../io/hadoop/HadoopFileFormatPipelineTest.java | 12 ++---
 .../spark/translation/SideEffectsTest.java      | 11 ++--
 .../streaming/EmptyStreamAssertionTest.java     |  4 +-
 .../streaming/FlattenStreamingTest.java         |  4 +-
 .../streaming/KafkaStreamingTest.java           |  4 +-
 .../ResumeFromCheckpointStreamingTest.java      |  4 +-
 .../streaming/SimpleStreamingWordCountTest.java |  6 +--
 .../utils/TestOptionsForStreaming.java          | 55 --------------------
 .../streaming/utils/TestPipelineOptions.java    | 25 +++++++++
 .../utils/TestPipelineOptionsForStreaming.java  | 44 ++++++++++++++++
 17 files changed, 132 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 1e4a720..4c5b3f5 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -82,7 +82,8 @@
                     <beamTestPipelineOptions>
                       [
                         "--runner=TestSparkRunner",
-                        "--streaming=false"
+                        "--streaming=false",
+                        "--enableSparkMetricSinks=false"
                       ]
                     </beamTestPipelineOptions>
                     <beam.spark.test.reuseSparkContext>true</beam.spark.test.reuseSparkContext>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index 5168c6c..b1ebde9 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -88,8 +88,8 @@ public interface SparkPipelineOptions
 
   @Description("Enable/disable sending aggregator values to Spark's metric sinks")
   @Default.Boolean(true)
-  Boolean getEnableSparkSinks();
-  void setEnableSparkSinks(Boolean enableSparkSinks);
+  Boolean getEnableSparkMetricSinks();
+  void setEnableSparkMetricSinks(Boolean enableSparkMetricSinks);
 
   @Description("If the spark runner will be initialized with a provided Spark Context. "
       + "The Spark Context should be provided with SparkContextOptions.")

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
index 181a111..564db39 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
@@ -86,11 +86,11 @@ public class SparkRuntimeContext implements Serializable {
     final Accumulator<NamedAggregators> accum = AccumulatorSingleton.getInstance(jsc);
     final NamedAggregators initialValue = accum.value();
 
-    if (opts.getEnableSparkSinks()) {
+    if (opts.getEnableSparkMetricSinks()) {
       final MetricsSystem metricsSystem = SparkEnv$.MODULE$.get().metricsSystem();
       final AggregatorMetricSource aggregatorMetricSource =
           new AggregatorMetricSource(opts.getAppName(), initialValue);
-      // in case the context was not cleared
+      // re-register the metrics in case of context re-use
       metricsSystem.removeSource(aggregatorMetricSource);
       metricsSystem.registerSource(aggregatorMetricSource);
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
index c225073..bc337c7 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
@@ -25,6 +25,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 import org.apache.beam.runners.spark.examples.WordCount;
+import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -33,6 +34,7 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.Rule;
 import org.junit.Test;
 
 /**
@@ -48,6 +50,15 @@ public class ProvidedSparkContextTest {
     private static final String PROVIDED_CONTEXT_EXCEPTION =
             "The provided Spark context was not created or was stopped";
 
+    private SparkContextOptions getSparkContextOptions(JavaSparkContext jsc) {
+        final SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class);
+        options.setRunner(SparkRunner.class);
+        options.setUsesProvidedSparkContext(true);
+        options.setProvidedSparkContext(jsc);
+        options.setEnableSparkMetricSinks(false);
+        return options;
+    }
+
     /**
      * Provide a context and call pipeline run.
      * @throws Exception
@@ -56,10 +67,7 @@ public class ProvidedSparkContextTest {
     public void testWithProvidedContext() throws Exception {
         JavaSparkContext jsc = new JavaSparkContext("local[*]", "Existing_Context");
 
-        SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class);
-        options.setRunner(SparkRunner.class);
-        options.setUsesProvidedSparkContext(true);
-        options.setProvidedSparkContext(jsc);
+        SparkContextOptions options = getSparkContextOptions(jsc);
 
         Pipeline p = Pipeline.create(options);
         PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder
@@ -83,10 +91,7 @@ public class ProvidedSparkContextTest {
     public void testWithNullContext() throws Exception {
         JavaSparkContext jsc = null;
 
-        SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class);
-        options.setRunner(SparkRunner.class);
-        options.setUsesProvidedSparkContext(true);
-        options.setProvidedSparkContext(jsc);
+        SparkContextOptions options = getSparkContextOptions(jsc);
 
         Pipeline p = Pipeline.create(options);
         PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder
@@ -114,10 +119,7 @@ public class ProvidedSparkContextTest {
         // Stop the provided Spark context directly
         jsc.stop();
 
-        SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class);
-        options.setRunner(SparkRunner.class);
-        options.setUsesProvidedSparkContext(true);
-        options.setProvidedSparkContext(jsc);
+        SparkContextOptions options = getSparkContextOptions(jsc);
 
         Pipeline p = Pipeline.create(options);
         PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
index 194d66a..c220f2b 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
@@ -27,11 +27,10 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.runners.spark.examples.WordCount;
+import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
@@ -52,9 +51,12 @@ public class NamedAggregatorsTest {
   @Rule
   public ClearAggregatorsRule clearAggregators = new ClearAggregatorsRule();
 
+  @Rule
+  public final TestPipelineOptions pipelineOptions = new TestPipelineOptions();
+
   private Pipeline createSparkPipeline() {
-    final SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
-    options.setRunner(SparkRunner.class);
+    SparkPipelineOptions options = pipelineOptions.getOptions();
+    options.setEnableSparkMetricSinks(true);
     return Pipeline.create(options);
   }
 
@@ -82,6 +84,9 @@ public class NamedAggregatorsTest {
   @Test
   public void testNamedAggregators() throws Exception {
 
+    // don't reuse context in this test, as it tends to mess up Spark's MetricsSystem thread-safety
+    System.setProperty("beam.spark.test.reuseSparkContext", "false");
+
     assertThat(InMemoryMetrics.valueOf("emptyLines"), is(nullValue()));
 
     runPipeline();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index fc53dbd..396a30d 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -33,11 +33,10 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.beam.runners.spark.SparkRunner;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.AvroIO;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.Before;
 import org.junit.Rule;
@@ -55,6 +54,9 @@ public class AvroPipelineTest {
   @Rule
   public final TemporaryFolder tmpDir = new TemporaryFolder();
 
+  @Rule
+  public final TestPipelineOptions pipelineOptions = new TestPipelineOptions();
+
   @Before
   public void setUp() throws IOException {
     inputFile = tmpDir.newFile("test.avro");
@@ -71,9 +73,7 @@ public class AvroPipelineTest {
     savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
     populateGenericFile(Lists.newArrayList(savedRecord), schema);
 
-    PipelineOptions options = PipelineOptionsFactory.create();
-    options.setRunner(SparkRunner.class);
-    Pipeline p = Pipeline.create(options);
+    Pipeline p = Pipeline.create(pipelineOptions.getOptions());
     PCollection<GenericRecord> input = p.apply(
         AvroIO.Read.from(inputFile.getAbsolutePath()).withSchema(schema));
     input.apply(AvroIO.Write.to(outputDir.getAbsolutePath()).withSchema(schema));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
index 0ff30ac..922046c 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
@@ -31,12 +31,11 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.runners.spark.examples.WordCount;
+import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.values.PCollection;
@@ -60,6 +59,9 @@ public class NumShardsTest {
   @Rule
   public final TemporaryFolder tmpDir = new TemporaryFolder();
 
+  @Rule
+  public final TestPipelineOptions pipelineOptions = new TestPipelineOptions();
+
   @Before
   public void setUp() throws IOException {
     outputDir = tmpDir.newFolder("out");
@@ -68,9 +70,7 @@ public class NumShardsTest {
 
   @Test
   public void testText() throws Exception {
-    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
-    options.setRunner(SparkRunner.class);
-    Pipeline p = Pipeline.create(options);
+    Pipeline p = Pipeline.create(pipelineOptions.getOptions());
     PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
     PCollection<String> output = inputWords.apply(new WordCount.CountWords())
         .apply(MapElements.via(new WordCount.FormatAsTextFn()));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
index aa1e1ce..628d8b6 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
@@ -22,12 +22,11 @@ import static org.junit.Assert.assertEquals;
 
 import java.io.File;
 import java.io.IOException;
-import org.apache.beam.runners.spark.SparkRunner;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.coders.WritableCoder;
+import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.hadoop.conf.Configuration;
@@ -54,6 +53,9 @@ public class HadoopFileFormatPipelineTest {
   private File outputFile;
 
   @Rule
+  public final TestPipelineOptions pipelineOptions = new TestPipelineOptions();
+
+  @Rule
   public final TemporaryFolder tmpDir = new TemporaryFolder();
 
   @Before
@@ -67,9 +69,7 @@ public class HadoopFileFormatPipelineTest {
   public void testSequenceFile() throws Exception {
     populateFile();
 
-    PipelineOptions options = PipelineOptionsFactory.create();
-    options.setRunner(SparkRunner.class);
-    Pipeline p = Pipeline.create(options);
+    Pipeline p = Pipeline.create(pipelineOptions.getOptions());
     @SuppressWarnings("unchecked")
     Class<? extends FileInputFormat<IntWritable, Text>> inputFormatClass =
         (Class<? extends FileInputFormat<IntWritable, Text>>)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
index 1304e12..7d39d89 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
@@ -25,13 +25,13 @@ import static org.junit.Assert.fail;
 import java.io.Serializable;
 import java.net.URI;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.SparkRunner;
+import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringDelegateCoder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.junit.Rule;
 import org.junit.Test;
 
 /**
@@ -42,11 +42,12 @@ public class SideEffectsTest implements Serializable {
   static class UserException extends RuntimeException {
   }
 
+  @Rule
+  public transient final TestPipelineOptions pipelineOptions = new TestPipelineOptions();
+
   @Test
   public void test() throws Exception {
-    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
-    options.setRunner(SparkRunner.class);
-    Pipeline p = Pipeline.create(options);
+    Pipeline p = Pipeline.create(pipelineOptions.getOptions());
 
     p.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
index 3e95b4d..2a38e30 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
@@ -25,7 +25,7 @@ import java.util.Collections;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
-import org.apache.beam.runners.spark.translation.streaming.utils.TestOptionsForStreaming;
+import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptionsForStreaming;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
@@ -51,7 +51,7 @@ public class EmptyStreamAssertionTest implements Serializable {
   public TemporaryFolder checkpointParentDir = new TemporaryFolder();
 
   @Rule
-  public TestOptionsForStreaming commonOptions = new TestOptionsForStreaming();
+  public TestPipelineOptionsForStreaming commonOptions = new TestPipelineOptionsForStreaming();
 
   @Test
   public void testAssertion() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
index 319b5e9..bd544e9 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
@@ -23,7 +23,7 @@ import java.util.List;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
-import org.apache.beam.runners.spark.translation.streaming.utils.TestOptionsForStreaming;
+import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptionsForStreaming;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.transforms.Create;
@@ -57,7 +57,7 @@ public class FlattenStreamingTest {
   public TemporaryFolder checkpointParentDir = new TemporaryFolder();
 
   @Rule
-  public TestOptionsForStreaming commonOptions = new TestOptionsForStreaming();
+  public TestPipelineOptionsForStreaming commonOptions = new TestPipelineOptionsForStreaming();
 
   @Test
   public void testFlattenUnbounded() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
index 29e4609..5841331 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
@@ -28,7 +28,7 @@ import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster;
 import org.apache.beam.runners.spark.translation.streaming.utils.KafkaWriteOnBatchCompleted;
 import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
-import org.apache.beam.runners.spark.translation.streaming.utils.TestOptionsForStreaming;
+import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptionsForStreaming;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -71,7 +71,7 @@ public class KafkaStreamingTest {
   public TemporaryFolder checkpointParentDir = new TemporaryFolder();
 
   @Rule
-  public TestOptionsForStreaming commonOptions = new TestOptionsForStreaming();
+  public TestPipelineOptionsForStreaming commonOptions = new TestPipelineOptionsForStreaming();
 
   @Test
   public void testEarliest2Topics() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index 34ffbe2..e345831 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -32,7 +32,7 @@ import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
 import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster;
 import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
-import org.apache.beam.runners.spark.translation.streaming.utils.TestOptionsForStreaming;
+import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptionsForStreaming;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
@@ -80,7 +80,7 @@ public class ResumeFromCheckpointStreamingTest {
   public TemporaryFolder checkpointParentDir = new TemporaryFolder();
 
   @Rule
-  public TestOptionsForStreaming commonOptions = new TestOptionsForStreaming();
+  public TestPipelineOptionsForStreaming commonOptions = new TestPipelineOptionsForStreaming();
 
   @BeforeClass
   public static void init() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
index edba507..bdfc24f 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
@@ -25,7 +25,7 @@ import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.examples.WordCount;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
-import org.apache.beam.runners.spark.translation.streaming.utils.TestOptionsForStreaming;
+import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptionsForStreaming;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.transforms.MapElements;
@@ -47,7 +47,7 @@ public class SimpleStreamingWordCountTest implements Serializable {
   public TemporaryFolder checkpointParentDir = new TemporaryFolder();
 
   @Rule
-  public TestOptionsForStreaming commonOptions = new TestOptionsForStreaming();
+  public TestPipelineOptionsForStreaming pipelineOptions = new TestPipelineOptionsForStreaming();
 
   private static final String[] WORDS = {"hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"};
 
@@ -62,7 +62,7 @@ public class SimpleStreamingWordCountTest implements Serializable {
 
   @Test
   public void testFixedWindows() throws Exception {
-    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
+    SparkPipelineOptions options = pipelineOptions.withTmpCheckpointDir(checkpointParentDir);
 
     // override defaults
     options.setBatchIntervalMillis(BATCH_INTERVAL.getMillis());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestOptionsForStreaming.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestOptionsForStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestOptionsForStreaming.java
deleted file mode 100644
index 2861d9f..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestOptionsForStreaming.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.translation.streaming.utils;
-
-
-import java.io.IOException;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.SparkRunner;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.junit.rules.ExternalResource;
-import org.junit.rules.TemporaryFolder;
-
-
-
-/**
- * A rule to create a common {@link SparkPipelineOptions} for testing streaming pipelines.
- */
-public class TestOptionsForStreaming extends ExternalResource {
-  private final SparkPipelineOptions options =
-      PipelineOptionsFactory.as(SparkPipelineOptions.class);
-
-  @Override
-  protected void before() throws Throwable {
-    options.setRunner(SparkRunner.class);
-    options.setStreaming(true);
-    options.setTimeout(1000L);
-  }
-
-  public SparkPipelineOptions withTmpCheckpointDir(TemporaryFolder parent)
-      throws IOException {
-    // tests use JUnit's TemporaryFolder path in the form of: /.../junit/...
-    options.setCheckpointDir(parent.newFolder(options.getJobName()).toURI().toURL().toString());
-    return options;
-  }
-
-  public SparkPipelineOptions getOptions() {
-    return options;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptions.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptions.java
new file mode 100644
index 0000000..ccfb29e
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptions.java
@@ -0,0 +1,25 @@
+package org.apache.beam.runners.spark.translation.streaming.utils;
+
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.SparkRunner;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.rules.ExternalResource;
+
+/**
+ * A rule to create a common {@link SparkPipelineOptions} test options for spark-runner.
+ */
+public class TestPipelineOptions extends ExternalResource {
+
+  protected final SparkPipelineOptions options =
+      PipelineOptionsFactory.as(SparkPipelineOptions.class);
+
+  @Override
+  protected void before() throws Throwable {
+    options.setRunner(SparkRunner.class);
+    options.setEnableSparkMetricSinks(false);
+  }
+
+  public SparkPipelineOptions getOptions() {
+    return options;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptionsForStreaming.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptionsForStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptionsForStreaming.java
new file mode 100644
index 0000000..3d178ae
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptionsForStreaming.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.translation.streaming.utils;
+
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.junit.rules.TemporaryFolder;
+import java.io.IOException;
+
+
+/**
+ * A rule to create a common {@link SparkPipelineOptions} for testing streaming pipelines.
+ */
+public class TestPipelineOptionsForStreaming extends TestPipelineOptions {
+
+  @Override
+  protected void before() throws Throwable {
+    super.before();
+    options.setTimeout(1000L);
+    options.setStreaming(true);
+  }
+
+  public SparkPipelineOptions withTmpCheckpointDir(TemporaryFolder parent)
+      throws IOException {
+    // tests use JUnit's TemporaryFolder path in the form of: /.../junit/...
+    options.setCheckpointDir(parent.newFolder(options.getJobName()).toURI().toURL().toString());
+    return options;
+  }
+}


[2/2] incubator-beam git commit: This closes #1332

Posted by am...@apache.org.
This closes #1332


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/503f26f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/503f26f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/503f26f4

Branch: refs/heads/master
Commit: 503f26f448ea9f46fcfcdd46e60cba80e4844e28
Parents: 47646d6 0331dd1
Author: Sela <an...@paypal.com>
Authored: Tue Nov 15 19:53:52 2016 +0200
Committer: Sela <an...@paypal.com>
Committed: Tue Nov 15 19:53:52 2016 +0200

----------------------------------------------------------------------
 runners/spark/pom.xml                           |  3 +-
 .../runners/spark/SparkPipelineOptions.java     |  4 +-
 .../spark/translation/SparkRuntimeContext.java  |  4 +-
 .../runners/spark/ProvidedSparkContextTest.java | 26 ++++-----
 .../metrics/sink/NamedAggregatorsTest.java      | 13 +++--
 .../beam/runners/spark/io/AvroPipelineTest.java | 12 ++---
 .../beam/runners/spark/io/NumShardsTest.java    | 10 ++--
 .../io/hadoop/HadoopFileFormatPipelineTest.java | 12 ++---
 .../spark/translation/SideEffectsTest.java      | 11 ++--
 .../streaming/EmptyStreamAssertionTest.java     |  4 +-
 .../streaming/FlattenStreamingTest.java         |  4 +-
 .../streaming/KafkaStreamingTest.java           |  4 +-
 .../ResumeFromCheckpointStreamingTest.java      |  4 +-
 .../streaming/SimpleStreamingWordCountTest.java |  6 +--
 .../utils/TestOptionsForStreaming.java          | 55 --------------------
 .../streaming/utils/TestPipelineOptions.java    | 25 +++++++++
 .../utils/TestPipelineOptionsForStreaming.java  | 44 ++++++++++++++++
 17 files changed, 132 insertions(+), 109 deletions(-)
----------------------------------------------------------------------