You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2016/11/15 18:09:20 UTC
[1/2] incubator-beam git commit: [BEAM-891] fix build occasionally
fails on IndexOutOfBoundsException.
Repository: incubator-beam
Updated Branches:
refs/heads/master 47646d641 -> 503f26f44
[BEAM-891] fix build occasionally fails on IndexOutOfBoundsException.
Moved "TestPipelineOptions#withTmpCheckpointDir" to TestPipelineOptionsForStreaming.
Removed an unused member in ProvidedSparkContextTest.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0331dd1c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0331dd1c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0331dd1c
Branch: refs/heads/master
Commit: 0331dd1cd75e60484f0b15723e4e7edc280a4d12
Parents: 47646d6
Author: Stas Levin <st...@gmail.com>
Authored: Thu Nov 10 13:32:51 2016 +0200
Committer: Sela <an...@paypal.com>
Committed: Tue Nov 15 19:52:12 2016 +0200
----------------------------------------------------------------------
runners/spark/pom.xml | 3 +-
.../runners/spark/SparkPipelineOptions.java | 4 +-
.../spark/translation/SparkRuntimeContext.java | 4 +-
.../runners/spark/ProvidedSparkContextTest.java | 26 ++++-----
.../metrics/sink/NamedAggregatorsTest.java | 13 +++--
.../beam/runners/spark/io/AvroPipelineTest.java | 12 ++---
.../beam/runners/spark/io/NumShardsTest.java | 10 ++--
.../io/hadoop/HadoopFileFormatPipelineTest.java | 12 ++---
.../spark/translation/SideEffectsTest.java | 11 ++--
.../streaming/EmptyStreamAssertionTest.java | 4 +-
.../streaming/FlattenStreamingTest.java | 4 +-
.../streaming/KafkaStreamingTest.java | 4 +-
.../ResumeFromCheckpointStreamingTest.java | 4 +-
.../streaming/SimpleStreamingWordCountTest.java | 6 +--
.../utils/TestOptionsForStreaming.java | 55 --------------------
.../streaming/utils/TestPipelineOptions.java | 25 +++++++++
.../utils/TestPipelineOptionsForStreaming.java | 44 ++++++++++++++++
17 files changed, 132 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 1e4a720..4c5b3f5 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -82,7 +82,8 @@
<beamTestPipelineOptions>
[
"--runner=TestSparkRunner",
- "--streaming=false"
+ "--streaming=false",
+ "--enableSparkMetricSinks=false"
]
</beamTestPipelineOptions>
<beam.spark.test.reuseSparkContext>true</beam.spark.test.reuseSparkContext>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index 5168c6c..b1ebde9 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -88,8 +88,8 @@ public interface SparkPipelineOptions
@Description("Enable/disable sending aggregator values to Spark's metric sinks")
@Default.Boolean(true)
- Boolean getEnableSparkSinks();
- void setEnableSparkSinks(Boolean enableSparkSinks);
+ Boolean getEnableSparkMetricSinks();
+ void setEnableSparkMetricSinks(Boolean enableSparkMetricSinks);
@Description("If the spark runner will be initialized with a provided Spark Context. "
+ "The Spark Context should be provided with SparkContextOptions.")
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
index 181a111..564db39 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
@@ -86,11 +86,11 @@ public class SparkRuntimeContext implements Serializable {
final Accumulator<NamedAggregators> accum = AccumulatorSingleton.getInstance(jsc);
final NamedAggregators initialValue = accum.value();
- if (opts.getEnableSparkSinks()) {
+ if (opts.getEnableSparkMetricSinks()) {
final MetricsSystem metricsSystem = SparkEnv$.MODULE$.get().metricsSystem();
final AggregatorMetricSource aggregatorMetricSource =
new AggregatorMetricSource(opts.getAppName(), initialValue);
- // in case the context was not cleared
+ // re-register the metrics in case of context re-use
metricsSystem.removeSource(aggregatorMetricSource);
metricsSystem.registerSource(aggregatorMetricSource);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
index c225073..bc337c7 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
@@ -25,6 +25,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Set;
import org.apache.beam.runners.spark.examples.WordCount;
+import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -33,6 +34,7 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.Rule;
import org.junit.Test;
/**
@@ -48,6 +50,15 @@ public class ProvidedSparkContextTest {
private static final String PROVIDED_CONTEXT_EXCEPTION =
"The provided Spark context was not created or was stopped";
+ private SparkContextOptions getSparkContextOptions(JavaSparkContext jsc) {
+ final SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class);
+ options.setRunner(SparkRunner.class);
+ options.setUsesProvidedSparkContext(true);
+ options.setProvidedSparkContext(jsc);
+ options.setEnableSparkMetricSinks(false);
+ return options;
+ }
+
/**
* Provide a context and call pipeline run.
* @throws Exception
@@ -56,10 +67,7 @@ public class ProvidedSparkContextTest {
public void testWithProvidedContext() throws Exception {
JavaSparkContext jsc = new JavaSparkContext("local[*]", "Existing_Context");
- SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class);
- options.setRunner(SparkRunner.class);
- options.setUsesProvidedSparkContext(true);
- options.setProvidedSparkContext(jsc);
+ SparkContextOptions options = getSparkContextOptions(jsc);
Pipeline p = Pipeline.create(options);
PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder
@@ -83,10 +91,7 @@ public class ProvidedSparkContextTest {
public void testWithNullContext() throws Exception {
JavaSparkContext jsc = null;
- SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class);
- options.setRunner(SparkRunner.class);
- options.setUsesProvidedSparkContext(true);
- options.setProvidedSparkContext(jsc);
+ SparkContextOptions options = getSparkContextOptions(jsc);
Pipeline p = Pipeline.create(options);
PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder
@@ -114,10 +119,7 @@ public class ProvidedSparkContextTest {
// Stop the provided Spark context directly
jsc.stop();
- SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class);
- options.setRunner(SparkRunner.class);
- options.setUsesProvidedSparkContext(true);
- options.setProvidedSparkContext(jsc);
+ SparkContextOptions options = getSparkContextOptions(jsc);
Pipeline p = Pipeline.create(options);
PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
index 194d66a..c220f2b 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
@@ -27,11 +27,10 @@ import java.util.Arrays;
import java.util.List;
import java.util.Set;
import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.runners.spark.examples.WordCount;
+import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
@@ -52,9 +51,12 @@ public class NamedAggregatorsTest {
@Rule
public ClearAggregatorsRule clearAggregators = new ClearAggregatorsRule();
+ @Rule
+ public final TestPipelineOptions pipelineOptions = new TestPipelineOptions();
+
private Pipeline createSparkPipeline() {
- final SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
- options.setRunner(SparkRunner.class);
+ SparkPipelineOptions options = pipelineOptions.getOptions();
+ options.setEnableSparkMetricSinks(true);
return Pipeline.create(options);
}
@@ -82,6 +84,9 @@ public class NamedAggregatorsTest {
@Test
public void testNamedAggregators() throws Exception {
+ // don't reuse context in this test, as is tends to mess up Spark's MetricsSystem thread-safety
+ System.setProperty("beam.spark.test.reuseSparkContext", "false");
+
assertThat(InMemoryMetrics.valueOf("emptyLines"), is(nullValue()));
runPipeline();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index fc53dbd..396a30d 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -33,11 +33,10 @@ import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
-import org.apache.beam.runners.spark.SparkRunner;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.AvroIO;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Before;
import org.junit.Rule;
@@ -55,6 +54,9 @@ public class AvroPipelineTest {
@Rule
public final TemporaryFolder tmpDir = new TemporaryFolder();
+ @Rule
+ public final TestPipelineOptions pipelineOptions = new TestPipelineOptions();
+
@Before
public void setUp() throws IOException {
inputFile = tmpDir.newFile("test.avro");
@@ -71,9 +73,7 @@ public class AvroPipelineTest {
savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
populateGenericFile(Lists.newArrayList(savedRecord), schema);
- PipelineOptions options = PipelineOptionsFactory.create();
- options.setRunner(SparkRunner.class);
- Pipeline p = Pipeline.create(options);
+ Pipeline p = Pipeline.create(pipelineOptions.getOptions());
PCollection<GenericRecord> input = p.apply(
AvroIO.Read.from(inputFile.getAbsolutePath()).withSchema(schema));
input.apply(AvroIO.Write.to(outputDir.getAbsolutePath()).withSchema(schema));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
index 0ff30ac..922046c 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
@@ -31,12 +31,11 @@ import java.util.Arrays;
import java.util.List;
import java.util.Set;
import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.runners.spark.examples.WordCount;
+import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
@@ -60,6 +59,9 @@ public class NumShardsTest {
@Rule
public final TemporaryFolder tmpDir = new TemporaryFolder();
+ @Rule
+ public final TestPipelineOptions pipelineOptions = new TestPipelineOptions();
+
@Before
public void setUp() throws IOException {
outputDir = tmpDir.newFolder("out");
@@ -68,9 +70,7 @@ public class NumShardsTest {
@Test
public void testText() throws Exception {
- SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
- options.setRunner(SparkRunner.class);
- Pipeline p = Pipeline.create(options);
+ Pipeline p = Pipeline.create(pipelineOptions.getOptions());
PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
PCollection<String> output = inputWords.apply(new WordCount.CountWords())
.apply(MapElements.via(new WordCount.FormatAsTextFn()));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
index aa1e1ce..628d8b6 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
@@ -22,12 +22,11 @@ import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
-import org.apache.beam.runners.spark.SparkRunner;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.coders.WritableCoder;
+import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hadoop.conf.Configuration;
@@ -54,6 +53,9 @@ public class HadoopFileFormatPipelineTest {
private File outputFile;
@Rule
+ public final TestPipelineOptions pipelineOptions = new TestPipelineOptions();
+
+ @Rule
public final TemporaryFolder tmpDir = new TemporaryFolder();
@Before
@@ -67,9 +69,7 @@ public class HadoopFileFormatPipelineTest {
public void testSequenceFile() throws Exception {
populateFile();
- PipelineOptions options = PipelineOptionsFactory.create();
- options.setRunner(SparkRunner.class);
- Pipeline p = Pipeline.create(options);
+ Pipeline p = Pipeline.create(pipelineOptions.getOptions());
@SuppressWarnings("unchecked")
Class<? extends FileInputFormat<IntWritable, Text>> inputFormatClass =
(Class<? extends FileInputFormat<IntWritable, Text>>)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
index 1304e12..7d39d89 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
@@ -25,13 +25,13 @@ import static org.junit.Assert.fail;
import java.io.Serializable;
import java.net.URI;
import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.SparkRunner;
+import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringDelegateCoder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
+import org.junit.Rule;
import org.junit.Test;
/**
@@ -42,11 +42,12 @@ public class SideEffectsTest implements Serializable {
static class UserException extends RuntimeException {
}
+ @Rule
+ public transient final TestPipelineOptions pipelineOptions = new TestPipelineOptions();
+
@Test
public void test() throws Exception {
- SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
- options.setRunner(SparkRunner.class);
- Pipeline p = Pipeline.create(options);
+ Pipeline p = Pipeline.create(pipelineOptions.getOptions());
p.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
index 3e95b4d..2a38e30 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
@@ -25,7 +25,7 @@ import java.util.Collections;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.io.CreateStream;
import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
-import org.apache.beam.runners.spark.translation.streaming.utils.TestOptionsForStreaming;
+import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptionsForStreaming;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
@@ -51,7 +51,7 @@ public class EmptyStreamAssertionTest implements Serializable {
public TemporaryFolder checkpointParentDir = new TemporaryFolder();
@Rule
- public TestOptionsForStreaming commonOptions = new TestOptionsForStreaming();
+ public TestPipelineOptionsForStreaming commonOptions = new TestPipelineOptionsForStreaming();
@Test
public void testAssertion() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
index 319b5e9..bd544e9 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
@@ -23,7 +23,7 @@ import java.util.List;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.io.CreateStream;
import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
-import org.apache.beam.runners.spark.translation.streaming.utils.TestOptionsForStreaming;
+import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptionsForStreaming;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Create;
@@ -57,7 +57,7 @@ public class FlattenStreamingTest {
public TemporaryFolder checkpointParentDir = new TemporaryFolder();
@Rule
- public TestOptionsForStreaming commonOptions = new TestOptionsForStreaming();
+ public TestPipelineOptionsForStreaming commonOptions = new TestPipelineOptionsForStreaming();
@Test
public void testFlattenUnbounded() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
index 29e4609..5841331 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
@@ -28,7 +28,7 @@ import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster;
import org.apache.beam.runners.spark.translation.streaming.utils.KafkaWriteOnBatchCompleted;
import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
-import org.apache.beam.runners.spark.translation.streaming.utils.TestOptionsForStreaming;
+import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptionsForStreaming;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -71,7 +71,7 @@ public class KafkaStreamingTest {
public TemporaryFolder checkpointParentDir = new TemporaryFolder();
@Rule
- public TestOptionsForStreaming commonOptions = new TestOptionsForStreaming();
+ public TestPipelineOptionsForStreaming commonOptions = new TestPipelineOptionsForStreaming();
@Test
public void testEarliest2Topics() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index 34ffbe2..e345831 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -32,7 +32,7 @@ import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster;
import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
-import org.apache.beam.runners.spark.translation.streaming.utils.TestOptionsForStreaming;
+import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptionsForStreaming;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
@@ -80,7 +80,7 @@ public class ResumeFromCheckpointStreamingTest {
public TemporaryFolder checkpointParentDir = new TemporaryFolder();
@Rule
- public TestOptionsForStreaming commonOptions = new TestOptionsForStreaming();
+ public TestPipelineOptionsForStreaming commonOptions = new TestPipelineOptionsForStreaming();
@BeforeClass
public static void init() throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
index edba507..bdfc24f 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
@@ -25,7 +25,7 @@ import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.examples.WordCount;
import org.apache.beam.runners.spark.io.CreateStream;
import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
-import org.apache.beam.runners.spark.translation.streaming.utils.TestOptionsForStreaming;
+import org.apache.beam.runners.spark.translation.streaming.utils.TestPipelineOptionsForStreaming;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.MapElements;
@@ -47,7 +47,7 @@ public class SimpleStreamingWordCountTest implements Serializable {
public TemporaryFolder checkpointParentDir = new TemporaryFolder();
@Rule
- public TestOptionsForStreaming commonOptions = new TestOptionsForStreaming();
+ public TestPipelineOptionsForStreaming pipelineOptions = new TestPipelineOptionsForStreaming();
private static final String[] WORDS = {"hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"};
@@ -62,7 +62,7 @@ public class SimpleStreamingWordCountTest implements Serializable {
@Test
public void testFixedWindows() throws Exception {
- SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);
+ SparkPipelineOptions options = pipelineOptions.withTmpCheckpointDir(checkpointParentDir);
// override defaults
options.setBatchIntervalMillis(BATCH_INTERVAL.getMillis());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestOptionsForStreaming.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestOptionsForStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestOptionsForStreaming.java
deleted file mode 100644
index 2861d9f..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestOptionsForStreaming.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.translation.streaming.utils;
-
-
-import java.io.IOException;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.SparkRunner;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.junit.rules.ExternalResource;
-import org.junit.rules.TemporaryFolder;
-
-
-
-/**
- * A rule to create a common {@link SparkPipelineOptions} for testing streaming pipelines.
- */
-public class TestOptionsForStreaming extends ExternalResource {
- private final SparkPipelineOptions options =
- PipelineOptionsFactory.as(SparkPipelineOptions.class);
-
- @Override
- protected void before() throws Throwable {
- options.setRunner(SparkRunner.class);
- options.setStreaming(true);
- options.setTimeout(1000L);
- }
-
- public SparkPipelineOptions withTmpCheckpointDir(TemporaryFolder parent)
- throws IOException {
- // tests use JUnit's TemporaryFolder path in the form of: /.../junit/...
- options.setCheckpointDir(parent.newFolder(options.getJobName()).toURI().toURL().toString());
- return options;
- }
-
- public SparkPipelineOptions getOptions() {
- return options;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptions.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptions.java
new file mode 100644
index 0000000..ccfb29e
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptions.java
@@ -0,0 +1,25 @@
+package org.apache.beam.runners.spark.translation.streaming.utils;
+
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.SparkRunner;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.rules.ExternalResource;
+
+/**
+ * A rule to create a common {@link SparkPipelineOptions} test options for spark-runner.
+ */
+public class TestPipelineOptions extends ExternalResource {
+
+ protected final SparkPipelineOptions options =
+ PipelineOptionsFactory.as(SparkPipelineOptions.class);
+
+ @Override
+ protected void before() throws Throwable {
+ options.setRunner(SparkRunner.class);
+ options.setEnableSparkMetricSinks(false);
+ }
+
+ public SparkPipelineOptions getOptions() {
+ return options;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0331dd1c/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptionsForStreaming.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptionsForStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptionsForStreaming.java
new file mode 100644
index 0000000..3d178ae
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/TestPipelineOptionsForStreaming.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.translation.streaming.utils;
+
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.junit.rules.TemporaryFolder;
+import java.io.IOException;
+
+
+/**
+ * A rule to create a common {@link SparkPipelineOptions} for testing streaming pipelines.
+ */
+public class TestPipelineOptionsForStreaming extends TestPipelineOptions {
+
+ @Override
+ protected void before() throws Throwable {
+ super.before();
+ options.setTimeout(1000L);
+ options.setStreaming(true);
+ }
+
+ public SparkPipelineOptions withTmpCheckpointDir(TemporaryFolder parent)
+ throws IOException {
+ // tests use JUnit's TemporaryFolder path in the form of: /.../junit/...
+ options.setCheckpointDir(parent.newFolder(options.getJobName()).toURI().toURL().toString());
+ return options;
+ }
+}
[2/2] incubator-beam git commit: This closes #1332
Posted by am...@apache.org.
This closes #1332
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/503f26f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/503f26f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/503f26f4
Branch: refs/heads/master
Commit: 503f26f448ea9f46fcfcdd46e60cba80e4844e28
Parents: 47646d6 0331dd1
Author: Sela <an...@paypal.com>
Authored: Tue Nov 15 19:53:52 2016 +0200
Committer: Sela <an...@paypal.com>
Committed: Tue Nov 15 19:53:52 2016 +0200
----------------------------------------------------------------------
runners/spark/pom.xml | 3 +-
.../runners/spark/SparkPipelineOptions.java | 4 +-
.../spark/translation/SparkRuntimeContext.java | 4 +-
.../runners/spark/ProvidedSparkContextTest.java | 26 ++++-----
.../metrics/sink/NamedAggregatorsTest.java | 13 +++--
.../beam/runners/spark/io/AvroPipelineTest.java | 12 ++---
.../beam/runners/spark/io/NumShardsTest.java | 10 ++--
.../io/hadoop/HadoopFileFormatPipelineTest.java | 12 ++---
.../spark/translation/SideEffectsTest.java | 11 ++--
.../streaming/EmptyStreamAssertionTest.java | 4 +-
.../streaming/FlattenStreamingTest.java | 4 +-
.../streaming/KafkaStreamingTest.java | 4 +-
.../ResumeFromCheckpointStreamingTest.java | 4 +-
.../streaming/SimpleStreamingWordCountTest.java | 6 +--
.../utils/TestOptionsForStreaming.java | 55 --------------------
.../streaming/utils/TestPipelineOptions.java | 25 +++++++++
.../utils/TestPipelineOptionsForStreaming.java | 44 ++++++++++++++++
17 files changed, 132 insertions(+), 109 deletions(-)
----------------------------------------------------------------------