You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by av...@apache.org on 2017/05/04 18:10:33 UTC
[1/6] beam git commit: [BEAM-1763] Replace usage of PipelineRule with TestPipeline in Spark runner tests
Repository: beam
Updated Branches:
refs/heads/master 48c8ed176 -> b73918b55
[BEAM-1763] Replace usage of PipelineRule with TestPipeline in Spark runner tests
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8d91a97b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8d91a97b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8d91a97b
Branch: refs/heads/master
Commit: 8d91a97b77fbda74c577d2cdbd507395834e147c
Parents: 0e2bb18
Author: Aviem Zur <av...@gmail.com>
Authored: Wed May 3 21:06:00 2017 +0300
Committer: Aviem Zur <av...@gmail.com>
Committed: Thu May 4 20:48:56 2017 +0300
----------------------------------------------------------------------
runners/spark/pom.xml | 47 +++++++-
.../runners/spark/SparkRunnerRegistrar.java | 4 +-
.../apache/beam/runners/spark/CacheTest.java | 12 +-
.../beam/runners/spark/ForceStreamingTest.java | 18 +--
.../apache/beam/runners/spark/PipelineRule.java | 109 -------------------
.../runners/spark/ProvidedSparkContextTest.java | 10 +-
.../runners/spark/SparkRunnerDebuggerTest.java | 15 +--
.../runners/spark/SparkRunnerRegistrarTest.java | 2 +-
.../beam/runners/spark/StreamingTest.java | 23 ++++
.../metrics/sink/SparkMetricsSinkTest.java | 12 +-
.../beam/runners/spark/io/AvroPipelineTest.java | 10 +-
.../beam/runners/spark/io/NumShardsTest.java | 6 +-
.../spark/translation/StorageLevelTest.java | 31 +++++-
.../translation/streaming/CreateStreamTest.java | 53 ++++-----
.../ResumeFromCheckpointStreamingTest.java | 50 ++++++---
.../streaming/StreamingSourceMetricsTest.java | 14 +--
.../main/java/org/apache/beam/sdk/Pipeline.java | 2 +-
.../apache/beam/sdk/testing/TestPipeline.java | 21 +++-
18 files changed, 217 insertions(+), 222 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 38d250e..f7200d6 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -333,9 +333,6 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
- <excludedGroups>
- org.apache.beam.runners.spark.UsesCheckpointRecovery
- </excludedGroups>
<forkCount>1</forkCount>
<reuseForks>false</reuseForks>
<systemPropertyVariables>
@@ -344,6 +341,50 @@
<spark.ui.showConsoleProgress>false</spark.ui.showConsoleProgress>
</systemPropertyVariables>
</configuration>
+ <executions>
+ <execution>
+ <id>default-test</id>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <excludedGroups>
+ org.apache.beam.runners.spark.UsesCheckpointRecovery,
+ org.apache.beam.runners.spark.StreamingTest
+ </excludedGroups>
+ <systemPropertyVariables>
+ <beamTestPipelineOptions>
+ [
+ "--runner=TestSparkRunner",
+ "--streaming=false",
+ "--enableSparkMetricSinks=true"
+ ]
+ </beamTestPipelineOptions>
+ </systemPropertyVariables>
+ </configuration>
+ </execution>
+ <execution>
+ <id>streaming-tests</id>
+ <phase>test</phase>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <groups>
+ org.apache.beam.runners.spark.StreamingTest
+ </groups>
+ <systemPropertyVariables>
+ <beamTestPipelineOptions>
+ [
+ "--runner=TestSparkRunner",
+ "--forceStreaming=true",
+ "--enableSparkMetricSinks=true"
+ ]
+ </beamTestPipelineOptions>
+ </systemPropertyVariables>
+ </configuration>
+ </execution>
+ </executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
index bedfda4..bf926dc 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
@@ -54,7 +54,9 @@ public final class SparkRunnerRegistrar {
public static class Options implements PipelineOptionsRegistrar {
@Override
public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
- return ImmutableList.<Class<? extends PipelineOptions>>of(SparkPipelineOptions.class);
+ return ImmutableList.<Class<? extends PipelineOptions>>of(
+ SparkPipelineOptions.class,
+ TestSparkPipelineOptions.class);
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java
index c3b48d8..24b2e7b 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java
@@ -23,11 +23,11 @@ import org.apache.beam.runners.spark.translation.EvaluationContext;
import org.apache.beam.runners.spark.translation.SparkContextFactory;
import org.apache.beam.runners.spark.translation.TransformTranslator;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.spark.api.java.JavaSparkContext;
-import org.junit.Rule;
import org.junit.Test;
/**
@@ -36,12 +36,12 @@ import org.junit.Test;
*/
public class CacheTest {
- @Rule
- public final transient PipelineRule pipelineRule = PipelineRule.batch();
-
@Test
public void cacheCandidatesUpdaterTest() throws Exception {
- Pipeline pipeline = pipelineRule.createPipeline();
+ SparkPipelineOptions options =
+ PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class);
+ options.setRunner(TestSparkRunner.class);
+ Pipeline pipeline = Pipeline.create(options);
PCollection<String> pCollection = pipeline.apply(Create.of("foo", "bar"));
// first read
pCollection.apply(Count.<String>globally());
@@ -50,7 +50,7 @@ public class CacheTest {
// will cache the RDD representing this PCollection
pCollection.apply(Count.<String>globally());
- JavaSparkContext jsc = SparkContextFactory.getSparkContext(pipelineRule.getOptions());
+ JavaSparkContext jsc = SparkContextFactory.getSparkContext(options);
EvaluationContext ctxt = new EvaluationContext(jsc, pipeline);
SparkRunner.CacheVisitor cacheVisitor =
new SparkRunner.CacheVisitor(new TransformTranslator.Translator(), ctxt);
http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
index b60faf2..7bfc980 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
@@ -25,9 +25,9 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.PTransform;
-import org.junit.Rule;
import org.junit.Test;
@@ -44,19 +44,23 @@ import org.junit.Test;
*/
public class ForceStreamingTest {
- @Rule
- public final PipelineRule pipelineRule = PipelineRule.streaming();
-
@Test
public void test() throws IOException {
- Pipeline pipeline = pipelineRule.createPipeline();
+ TestSparkPipelineOptions options =
+ PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class);
+ options.setRunner(TestSparkRunner.class);
+ options.setForceStreaming(true);
+
+ // pipeline with a bounded read.
+ Pipeline pipeline = Pipeline.create(options);
// apply the BoundedReadFromUnboundedSource.
BoundedReadFromUnboundedSource<?> boundedRead =
Read.from(CountingSource.unbounded()).withMaxNumRecords(-1);
- //noinspection unchecked
pipeline.apply(boundedRead);
- TestSparkRunner runner = TestSparkRunner.fromOptions(pipelineRule.getOptions());
+
+ // adapt reads
+ TestSparkRunner runner = TestSparkRunner.fromOptions(options);
runner.adaptBoundedReads(pipeline);
UnboundedReadDetector unboundedReadDetector = new UnboundedReadDetector();
http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/PipelineRule.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/PipelineRule.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/PipelineRule.java
deleted file mode 100644
index f8499f3..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/PipelineRule.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.joda.time.Duration;
-import org.junit.rules.ExternalResource;
-import org.junit.rules.RuleChain;
-import org.junit.rules.TemporaryFolder;
-import org.junit.rules.TestName;
-import org.junit.rules.TestRule;
-import org.junit.runner.Description;
-import org.junit.runners.model.Statement;
-
-/**
- * A {@link org.junit.Rule} to provide a {@link Pipeline} instance for Spark runner tests.
- */
-public class PipelineRule implements TestRule {
-
- private final SparkPipelineRule delegate;
- private final RuleChain chain;
-
- private PipelineRule(SparkPipelineRule delegate) {
- TestName testName = new TestName();
- this.delegate = delegate;
- this.delegate.setTestName(testName);
- this.chain = RuleChain.outerRule(testName).around(this.delegate);
- }
-
- public static PipelineRule streaming() {
- return new PipelineRule(new SparkStreamingPipelineRule());
- }
-
- public static PipelineRule batch() {
- return new PipelineRule(new SparkPipelineRule());
- }
-
- public Duration batchDuration() {
- return Duration.millis(delegate.options.getBatchIntervalMillis());
- }
-
- public TestSparkPipelineOptions getOptions() {
- return delegate.options;
- }
-
- public Pipeline createPipeline() {
- return Pipeline.create(delegate.options);
- }
-
- @Override
- public Statement apply(Statement statement, Description description) {
- return chain.apply(statement, description);
- }
-
- private static class SparkStreamingPipelineRule extends SparkPipelineRule {
-
- private final TemporaryFolder temporaryFolder = new TemporaryFolder();
-
- @Override
- protected void before() throws Throwable {
- super.before();
- temporaryFolder.create();
- options.setForceStreaming(true);
- options.setCheckpointDir(
- temporaryFolder.newFolder(options.getJobName()).toURI().toURL().toString());
- }
-
- @Override
- protected void after() {
- temporaryFolder.delete();
- }
- }
-
- private static class SparkPipelineRule extends ExternalResource {
-
- protected final TestSparkPipelineOptions options =
- PipelineOptionsFactory.as(TestSparkPipelineOptions.class);
-
- private TestName testName;
-
- public void setTestName(TestName testName) {
- this.testName = testName;
- }
-
- @Override
- protected void before() throws Throwable {
- options.setRunner(TestSparkRunner.class);
- options.setEnableSparkMetricSinks(false);
- options.setJobName(
- testName != null ? testName.getMethodName() : "test-at-" + System.currentTimeMillis());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
index 36ba863..8112993 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
@@ -27,9 +27,11 @@ import java.util.List;
import java.util.Set;
import org.apache.beam.runners.spark.examples.WordCount;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
@@ -95,7 +97,9 @@ public class ProvidedSparkContextTest {
PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
// Run test from pipeline
- p.run().waitUntilFinish();
+ PipelineResult result = p.run();
+
+ TestPipeline.verifyPAssertsSucceeded(p, result);
}
private void testWithInvalidContext(JavaSparkContext jsc) {
@@ -104,11 +108,9 @@ public class ProvidedSparkContextTest {
Pipeline p = Pipeline.create(options);
PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder
.of()));
- PCollection<String> output = inputWords.apply(new WordCount.CountWords())
+ inputWords.apply(new WordCount.CountWords())
.apply(MapElements.via(new WordCount.FormatAsTextFn()));
- PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
-
try {
p.run().waitUntilFinish();
fail("Should throw an exception when The provided Spark context is null or stopped");
http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
index ea058b2..9009751 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
@@ -27,6 +27,8 @@ import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Distinct;
@@ -48,7 +50,6 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
-import org.junit.Rule;
import org.junit.Test;
@@ -57,15 +58,9 @@ import org.junit.Test;
*/
public class SparkRunnerDebuggerTest {
- @Rule
- public final PipelineRule batchPipelineRule = PipelineRule.batch();
-
- @Rule
- public final PipelineRule streamingPipelineRule = PipelineRule.streaming();
-
@Test
public void debugBatchPipeline() {
- TestSparkPipelineOptions options = batchPipelineRule.getOptions();
+ PipelineOptions options = PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class);
options.setRunner(SparkRunnerDebugger.class);
Pipeline pipeline = Pipeline.create(options);
@@ -111,7 +106,9 @@ public class SparkRunnerDebuggerTest {
@Test
public void debugStreamingPipeline() {
- TestSparkPipelineOptions options = streamingPipelineRule.getOptions();
+ TestSparkPipelineOptions options =
+ PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class);
+ options.setForceStreaming(true);
options.setRunner(SparkRunnerDebugger.class);
Pipeline pipeline = Pipeline.create(options);
http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
index 4e1fd7c..75899f9 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
@@ -38,7 +38,7 @@ public class SparkRunnerRegistrarTest {
@Test
public void testOptions() {
assertEquals(
- ImmutableList.of(SparkPipelineOptions.class),
+ ImmutableList.of(SparkPipelineOptions.class, TestSparkPipelineOptions.class),
new SparkRunnerRegistrar.Options().getPipelineOptions());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/StreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/StreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/StreamingTest.java
new file mode 100644
index 0000000..a34c184
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/StreamingTest.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark;
+
+/**
+ * Category tag for tests that should be run in streaming mode.
+ */
+public interface StreamingTest {}
http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java
index b0ad972..fff95cb 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/SparkMetricsSinkTest.java
@@ -26,11 +26,10 @@ import com.google.common.collect.ImmutableSet;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
-import org.apache.beam.runners.spark.PipelineRule;
import org.apache.beam.runners.spark.examples.WordCount;
-import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
@@ -48,12 +47,7 @@ public class SparkMetricsSinkTest {
public ExternalResource inMemoryMetricsSink = new InMemoryMetricsSinkRule();
@Rule
- public final PipelineRule pipelineRule = PipelineRule.batch();
-
- private Pipeline createSparkPipeline() {
- pipelineRule.getOptions().setEnableSparkMetricSinks(true);
- return pipelineRule.createPipeline();
- }
+ public final TestPipeline pipeline = TestPipeline.create();
private void runPipeline() {
final List<String> words =
@@ -62,8 +56,6 @@ public class SparkMetricsSinkTest {
final Set<String> expectedCounts =
ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2");
- final Pipeline pipeline = createSparkPipeline();
-
final PCollection<String> output =
pipeline
.apply(Create.of(words).withCoder(StringUtf8Coder.of()))
http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index 7188dc5..adde8d2 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -33,9 +33,8 @@ import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
-import org.apache.beam.runners.spark.PipelineRule;
-import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.AvroIO;
+import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Before;
import org.junit.Rule;
@@ -54,7 +53,7 @@ public class AvroPipelineTest {
public final TemporaryFolder tmpDir = new TemporaryFolder();
@Rule
- public final PipelineRule pipelineRule = PipelineRule.batch();
+ public final TestPipeline pipeline = TestPipeline.create();
@Before
public void setUp() throws IOException {
@@ -72,11 +71,10 @@ public class AvroPipelineTest {
savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
populateGenericFile(Lists.newArrayList(savedRecord), schema);
- Pipeline p = pipelineRule.createPipeline();
- PCollection<GenericRecord> input = p.apply(
+ PCollection<GenericRecord> input = pipeline.apply(
AvroIO.readGenericRecords(schema).from(inputFile.getAbsolutePath()));
input.apply(AvroIO.writeGenericRecords(schema).to(outputDir.getAbsolutePath()));
- p.run().waitUntilFinish();
+ pipeline.run();
List<GenericRecord> records = readGenericFile();
assertEquals(Lists.newArrayList(savedRecord), records);
http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
index 5021744..55ee938 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
@@ -30,11 +30,10 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
-import org.apache.beam.runners.spark.PipelineRule;
import org.apache.beam.runners.spark.examples.WordCount;
-import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
@@ -59,7 +58,7 @@ public class NumShardsTest {
public final TemporaryFolder tmpDir = new TemporaryFolder();
@Rule
- public final PipelineRule pipelineRule = PipelineRule.batch();
+ public final TestPipeline p = TestPipeline.create();
@Before
public void setUp() throws IOException {
@@ -69,7 +68,6 @@ public class NumShardsTest {
@Test
public void testText() throws Exception {
- Pipeline p = pipelineRule.createPipeline();
PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
PCollection<String> output = inputWords.apply(new WordCount.CountWords())
.apply(MapElements.via(new WordCount.FormatAsTextFn()));
http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
index 2b7b87b..8f2e681 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
@@ -15,30 +15,49 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.beam.runners.spark.translation;
-import org.apache.beam.runners.spark.PipelineRule;
-import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
+
/**
* Test the RDD storage level defined by user.
*/
public class StorageLevelTest {
+ private static String beamTestPipelineOptions;
+
@Rule
- public final transient PipelineRule pipelineRule = PipelineRule.batch();
+ public final TestPipeline pipeline = TestPipeline.create();
+
+ @BeforeClass
+ public static void init() {
+ beamTestPipelineOptions =
+ System.getProperty(TestPipeline.PROPERTY_BEAM_TEST_PIPELINE_OPTIONS);
+
+ System.setProperty(
+ TestPipeline.PROPERTY_BEAM_TEST_PIPELINE_OPTIONS,
+ beamTestPipelineOptions.replace("]", ", \"--storageLevel=DISK_ONLY\"]"));
+ }
+
+ @AfterClass
+ public static void teardown() {
+ System.setProperty(
+ TestPipeline.PROPERTY_BEAM_TEST_PIPELINE_OPTIONS,
+ beamTestPipelineOptions);
+ }
@Test
public void test() throws Exception {
- pipelineRule.getOptions().setStorageLevel("DISK_ONLY");
- Pipeline pipeline = pipelineRule.createPipeline();
-
PCollection<String> pCollection = pipeline.apply(Create.of("foo"));
// by default, the Spark runner doesn't cache the RDD if it accessed only one time.
http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
index dd52c05..770e0c0 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
@@ -24,13 +24,14 @@ import static org.junit.Assert.assertThat;
import java.io.IOException;
import java.io.Serializable;
-import org.apache.beam.runners.spark.PipelineRule;
import org.apache.beam.runners.spark.ReuseSparkContextRule;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.StreamingTest;
import org.apache.beam.runners.spark.io.CreateStream;
-import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
@@ -61,6 +62,7 @@ import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
@@ -74,10 +76,11 @@ import org.junit.rules.ExpectedException;
* {@link org.apache.spark.streaming.dstream.QueueInputDStream} and advance the system's WMs.
* //TODO: add synchronized/processing time trigger.
*/
+@Category(StreamingTest.class)
public class CreateStreamTest implements Serializable {
@Rule
- public final transient PipelineRule pipelineRule = PipelineRule.streaming();
+ public final transient TestPipeline p = TestPipeline.create();
@Rule
public final transient ReuseSparkContextRule noContextResue = ReuseSparkContextRule.no();
@Rule
@@ -85,10 +88,9 @@ public class CreateStreamTest implements Serializable {
@Test
public void testLateDataAccumulating() throws IOException {
- Pipeline p = pipelineRule.createPipeline();
Instant instant = new Instant(0);
CreateStream<Integer> source =
- CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration())
+ CreateStream.of(VarIntCoder.of(), batchDuration())
.emptyBatch()
.advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(6)))
.nextBatch(
@@ -159,9 +161,8 @@ public class CreateStreamTest implements Serializable {
@Test
public void testDiscardingMode() throws IOException {
- Pipeline p = pipelineRule.createPipeline();
CreateStream<String> source =
- CreateStream.of(StringUtf8Coder.of(), pipelineRule.batchDuration())
+ CreateStream.of(StringUtf8Coder.of(), batchDuration())
.nextBatch(
TimestampedValue.of("firstPane", new Instant(100)),
TimestampedValue.of("alsoFirstPane", new Instant(200)))
@@ -208,10 +209,9 @@ public class CreateStreamTest implements Serializable {
@Test
public void testFirstElementLate() throws IOException {
- Pipeline p = pipelineRule.createPipeline();
Instant lateElementTimestamp = new Instant(-1_000_000);
CreateStream<String> source =
- CreateStream.of(StringUtf8Coder.of(), pipelineRule.batchDuration())
+ CreateStream.of(StringUtf8Coder.of(), batchDuration())
.emptyBatch()
.advanceWatermarkForNextBatch(new Instant(0))
.nextBatch(
@@ -242,10 +242,9 @@ public class CreateStreamTest implements Serializable {
@Test
public void testElementsAtAlmostPositiveInfinity() throws IOException {
- Pipeline p = pipelineRule.createPipeline();
Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp();
CreateStream<String> source =
- CreateStream.of(StringUtf8Coder.of(), pipelineRule.batchDuration())
+ CreateStream.of(StringUtf8Coder.of(), batchDuration())
.nextBatch(
TimestampedValue.of("foo", endOfGlobalWindow),
TimestampedValue.of("bar", endOfGlobalWindow))
@@ -267,13 +266,12 @@ public class CreateStreamTest implements Serializable {
@Test
public void testMultipleStreams() throws IOException {
- Pipeline p = pipelineRule.createPipeline();
CreateStream<String> source =
- CreateStream.of(StringUtf8Coder.of(), pipelineRule.batchDuration())
+ CreateStream.of(StringUtf8Coder.of(), batchDuration())
.nextBatch("foo", "bar")
.advanceNextBatchWatermarkToInfinity();
CreateStream<Integer> other =
- CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration())
+ CreateStream.of(VarIntCoder.of(), batchDuration())
.nextBatch(1, 2, 3, 4)
.advanceNextBatchWatermarkToInfinity();
@@ -298,10 +296,9 @@ public class CreateStreamTest implements Serializable {
@Test
public void testFlattenedWithWatermarkHold() throws IOException {
- Pipeline p = pipelineRule.createPipeline();
Instant instant = new Instant(0);
CreateStream<Integer> source1 =
- CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration())
+ CreateStream.of(VarIntCoder.of(), batchDuration())
.emptyBatch()
.advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(5)))
.nextBatch(
@@ -310,7 +307,7 @@ public class CreateStreamTest implements Serializable {
TimestampedValue.of(3, instant))
.advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(10)));
CreateStream<Integer> source2 =
- CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration())
+ CreateStream.of(VarIntCoder.of(), batchDuration())
.emptyBatch()
.advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(1)))
.nextBatch(
@@ -323,14 +320,14 @@ public class CreateStreamTest implements Serializable {
.advanceNextBatchWatermarkToInfinity();
PCollection<Integer> windowed1 = p
- .apply(source1)
- .apply(Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5)))
+ .apply("CreateStream1", source1)
+ .apply("Window1", Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5)))
.triggering(AfterWatermark.pastEndOfWindow())
.accumulatingFiredPanes()
.withAllowedLateness(Duration.ZERO));
PCollection<Integer> windowed2 = p
- .apply(source2)
- .apply(Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5)))
+ .apply("CreateStream2", source2)
+ .apply("Window2", Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5)))
.triggering(AfterWatermark.pastEndOfWindow())
.accumulatingFiredPanes()
.withAllowedLateness(Duration.ZERO));
@@ -357,10 +354,9 @@ public class CreateStreamTest implements Serializable {
*/
@Test
public void testMultiOutputParDo() throws IOException {
- Pipeline p = pipelineRule.createPipeline();
Instant instant = new Instant(0);
CreateStream<Integer> source1 =
- CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration())
+ CreateStream.of(VarIntCoder.of(), batchDuration())
.emptyBatch()
.advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(5)))
.nextBatch(
@@ -397,7 +393,7 @@ public class CreateStreamTest implements Serializable {
@Test
public void testElementAtPositiveInfinityThrows() {
CreateStream<Integer> source =
- CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration())
+ CreateStream.of(VarIntCoder.of(), batchDuration())
.nextBatch(TimestampedValue.of(-1, BoundedWindow.TIMESTAMP_MAX_VALUE.minus(1L)))
.advanceNextBatchWatermarkToInfinity();
thrown.expect(IllegalArgumentException.class);
@@ -407,7 +403,7 @@ public class CreateStreamTest implements Serializable {
@Test
public void testAdvanceWatermarkNonMonotonicThrows() {
CreateStream<Integer> source =
- CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration())
+ CreateStream.of(VarIntCoder.of(), batchDuration())
.advanceWatermarkForNextBatch(new Instant(0L));
thrown.expect(IllegalArgumentException.class);
source
@@ -418,9 +414,14 @@ public class CreateStreamTest implements Serializable {
@Test
public void testAdvanceWatermarkEqualToPositiveInfinityThrows() {
CreateStream<Integer> source =
- CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration())
+ CreateStream.of(VarIntCoder.of(), batchDuration())
.advanceWatermarkForNextBatch(BoundedWindow.TIMESTAMP_MAX_VALUE.minus(1L));
thrown.expect(IllegalArgumentException.class);
source.advanceWatermarkForNextBatch(BoundedWindow.TIMESTAMP_MAX_VALUE);
}
+
+ private Duration batchDuration() {
+ return Duration.millis(
+ (p.getOptions().as(SparkPipelineOptions.class)).getBatchIntervalMillis());
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index 33571f0..584edac 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -28,15 +28,16 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.IOException;
+import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
-import org.apache.beam.runners.spark.PipelineRule;
import org.apache.beam.runners.spark.ReuseSparkContextRule;
import org.apache.beam.runners.spark.SparkPipelineResult;
import org.apache.beam.runners.spark.TestSparkPipelineOptions;
+import org.apache.beam.runners.spark.TestSparkRunner;
import org.apache.beam.runners.spark.UsesCheckpointRecovery;
import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
import org.apache.beam.runners.spark.io.MicrobatchSource;
@@ -53,6 +54,7 @@ import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
@@ -81,11 +83,12 @@ import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-
+import org.junit.rules.TemporaryFolder;
/**
* Tests DStream recovery from checkpoint.
@@ -96,24 +99,34 @@ import org.junit.experimental.categories.Category;
* {@link Metrics} values that are expected to resume from previous count and a side-input that is
* expected to recover as well.
*/
-public class ResumeFromCheckpointStreamingTest {
+public class ResumeFromCheckpointStreamingTest implements Serializable {
private static final EmbeddedKafkaCluster.EmbeddedZookeeper EMBEDDED_ZOOKEEPER =
new EmbeddedKafkaCluster.EmbeddedZookeeper();
private static final EmbeddedKafkaCluster EMBEDDED_KAFKA_CLUSTER =
new EmbeddedKafkaCluster(EMBEDDED_ZOOKEEPER.getConnection(), new Properties());
private static final String TOPIC = "kafka_beam_test_topic";
+ private transient TemporaryFolder temporaryFolder;
+
@Rule
public final transient ReuseSparkContextRule noContextReuse = ReuseSparkContextRule.no();
- @Rule
- public final transient PipelineRule pipelineRule = PipelineRule.streaming();
@BeforeClass
- public static void init() throws IOException {
+ public static void setup() throws IOException {
EMBEDDED_ZOOKEEPER.startup();
EMBEDDED_KAFKA_CLUSTER.startup();
}
+ @Before
+ public void init() {
+ temporaryFolder = new TemporaryFolder();
+ try {
+ temporaryFolder.create();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
private static void produce(Map<String, Instant> messages) {
Properties producerProps = new Properties();
producerProps.putAll(EMBEDDED_KAFKA_CLUSTER.getProps());
@@ -148,7 +161,7 @@ public class ResumeFromCheckpointStreamingTest {
.build();
// first run should expect EOT matching the last injected element.
- SparkPipelineResult res = run(pipelineRule, Optional.of(new Instant(400)), 0);
+ SparkPipelineResult res = run(Optional.of(new Instant(400)), 0);
assertThat(res.metrics().queryMetrics(metricsFilter).counters(),
hasItem(attemptedMetricsResult(ResumeFromCheckpointStreamingTest.class.getName(),
@@ -169,7 +182,7 @@ public class ResumeFromCheckpointStreamingTest {
));
// recovery should resume from last read offset, and read the second batch of input.
- res = runAgain(pipelineRule, 1);
+ res = runAgain(1);
// assertions 2:
assertThat(res.metrics().queryMetrics(metricsFilter).counters(),
hasItem(attemptedMetricsResult(ResumeFromCheckpointStreamingTest.class.getName(),
@@ -209,18 +222,18 @@ public class ResumeFromCheckpointStreamingTest {
String.format("Found %d failed assertions.", failedAssertions),
failedAssertions,
is(0L));
-
}
- private SparkPipelineResult runAgain(PipelineRule pipelineRule, int expectedAssertions) {
+ private SparkPipelineResult runAgain(int expectedAssertions) {
// sleep before next run.
Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
- return run(pipelineRule, Optional.<Instant>absent(), expectedAssertions);
+ return run(Optional.<Instant>absent(), expectedAssertions);
}
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
- private static SparkPipelineResult run(
- PipelineRule pipelineRule, Optional<Instant> stopWatermarkOption, int expectedAssertions) {
+ private SparkPipelineResult run(
+ Optional<Instant> stopWatermarkOption,
+ int expectedAssertions) {
KafkaIO.Read<String, Instant> read = KafkaIO.<String, Instant>read()
.withBootstrapServers(EMBEDDED_KAFKA_CLUSTER.getBrokerList())
.withTopics(Collections.singletonList(TOPIC))
@@ -242,15 +255,21 @@ public class ResumeFromCheckpointStreamingTest {
}
});
- TestSparkPipelineOptions options = pipelineRule.getOptions();
+ TestSparkPipelineOptions options =
+ PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class);
options.setSparkMaster("local[*]");
options.setCheckpointDurationMillis(options.getBatchIntervalMillis());
options.setExpectedAssertions(expectedAssertions);
+ options.setRunner(TestSparkRunner.class);
+ options.setEnableSparkMetricSinks(false);
+ options.setForceStreaming(true);
+ options.setCheckpointDir(temporaryFolder.getRoot().getPath());
// timeout is per execution so it can be injected by the caller.
if (stopWatermarkOption.isPresent()) {
options.setStopPipelineWatermark(stopWatermarkOption.get().getMillis());
}
- Pipeline p = pipelineRule.createPipeline();
+
+ Pipeline p = Pipeline.create(options);
PCollection<String> expectedCol =
p.apply(Create.of(ImmutableList.of("side1", "side2")).withCoder(StringUtf8Coder.of()));
@@ -354,5 +373,4 @@ public class ResumeFromCheckpointStreamingTest {
}
}
}
-
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java
index 5a4b1b5..df6027c 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java
@@ -23,9 +23,7 @@ import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertThat;
import java.io.Serializable;
-import org.apache.beam.runners.spark.PipelineRule;
-import org.apache.beam.runners.spark.TestSparkPipelineOptions;
-import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.runners.spark.StreamingTest;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.Source;
@@ -34,10 +32,11 @@ import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.metrics.SourceMetrics;
+import org.apache.beam.sdk.testing.TestPipeline;
import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;
-
+import org.junit.experimental.categories.Category;
/**
* Verify metrics support for {@link Source Sources} in streaming pipelines.
@@ -47,14 +46,11 @@ public class StreamingSourceMetricsTest implements Serializable {
// Force streaming pipeline using pipeline rule.
@Rule
- public final transient PipelineRule pipelineRule = PipelineRule.streaming();
+ public final transient TestPipeline pipeline = TestPipeline.create();
@Test
+ @Category(StreamingTest.class)
public void testUnboundedSourceMetrics() {
- TestSparkPipelineOptions options = pipelineRule.getOptions();
-
- Pipeline pipeline = Pipeline.create(options);
-
final long numElements = 1000;
pipeline.apply(
http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index 6b15f0d..6fa7a5a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -460,7 +460,7 @@ public class Pipeline {
private Set<String> usedFullNames = new HashSet<>();
private CoderRegistry coderRegistry;
private final List<String> unstableNames = new ArrayList<>();
- private final PipelineOptions defaultOptions;
+ protected final PipelineOptions defaultOptions;
protected Pipeline(PipelineOptions options) {
this.defaultOptions = options;
http://git-wip-us.apache.org/repos/asf/beam/blob/8d91a97b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index d8fe51d..2d34b22 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -26,6 +26,7 @@ import com.fasterxml.jackson.core.TreeNode;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
@@ -244,8 +245,11 @@ public class TestPipeline extends Pipeline implements TestRule {
}
}
- static final String PROPERTY_BEAM_TEST_PIPELINE_OPTIONS = "beamTestPipelineOptions";
+ /** System property used to set {@link TestPipelineOptions}. */
+ public static final String PROPERTY_BEAM_TEST_PIPELINE_OPTIONS = "beamTestPipelineOptions";
+
static final String PROPERTY_USE_DEFAULT_DUMMY_RUNNER = "beamUseDummyRunner";
+
private static final ObjectMapper MAPPER = new ObjectMapper().registerModules(
ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
@@ -331,7 +335,7 @@ public class TestPipeline extends Pipeline implements TestRule {
try {
enforcement.get().beforePipelineExecution();
pipelineResult = super.run();
- verifyPAssertsSucceeded(pipelineResult);
+ verifyPAssertsSucceeded(this, pipelineResult);
} catch (RuntimeException exc) {
Throwable cause = exc.getCause();
if (cause instanceof AssertionError) {
@@ -377,6 +381,15 @@ public class TestPipeline extends Pipeline implements TestRule {
return this;
}
+ /**
+ * Returns the {@link PipelineOptions} this pipeline was constructed with.
+ */
+ @VisibleForTesting
+ @Override
+ public PipelineOptions getOptions() {
+ return defaultOptions;
+ }
+
@Override
public String toString() {
return "TestPipeline#" + getOptions().as(ApplicationNameOptions.class).getAppName();
@@ -501,9 +514,9 @@ public class TestPipeline extends Pipeline implements TestRule {
* <p>Note this only runs for runners which support Metrics. Runners which do not should verify
* this in some other way. See: https://issues.apache.org/jira/browse/BEAM-2001</p>
*/
- private void verifyPAssertsSucceeded(PipelineResult pipelineResult) {
+ public static void verifyPAssertsSucceeded(Pipeline pipeline, PipelineResult pipelineResult) {
if (MetricsEnvironment.isMetricsSupported()) {
- long expectedNumberOfAssertions = (long) PAssert.countAsserts(this);
+ long expectedNumberOfAssertions = (long) PAssert.countAsserts(pipeline);
long successfulAssertions = 0;
Iterable<MetricResult<Long>> successCounterResults =
[3/6] beam git commit: [BEAM-1763] Verify PAssert execution in
runners which support metrics.
Posted by av...@apache.org.
[BEAM-1763] Verify PAssert execution in runners which support metrics.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/95ade45e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/95ade45e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/95ade45e
Branch: refs/heads/master
Commit: 95ade45eced4787eb67a9d4d13dae48ffb176919
Parents: 48c8ed1
Author: Aviem Zur <av...@gmail.com>
Authored: Tue May 2 19:00:29 2017 +0300
Committer: Aviem Zur <av...@gmail.com>
Committed: Thu May 4 20:48:56 2017 +0300
----------------------------------------------------------------------
.../apache/beam/runners/flink/FlinkRunner.java | 3 ++
.../beam/runners/spark/TestSparkRunner.java | 47 --------------------
.../ResumeFromCheckpointStreamingTest.java | 12 +++--
.../beam/sdk/metrics/MetricsEnvironment.java | 5 +++
.../apache/beam/sdk/testing/TestPipeline.java | 46 ++++++++++++++++---
5 files changed, 57 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/95ade45e/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index 181ffda..a5972ef 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -31,6 +31,7 @@ import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.PipelineRunner;
@@ -103,6 +104,8 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> {
public PipelineResult run(Pipeline pipeline) {
logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
+ MetricsEnvironment.setMetricsSupported(true);
+
LOG.info("Executing pipeline using FlinkRunner.");
FlinkPipelineExecutionEnvironment env = new FlinkPipelineExecutionEnvironment(options);
http://git-wip-us.apache.org/repos/asf/beam/blob/95ade45e/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index 10e98b8..1e67813 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -40,15 +40,11 @@ import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
-import org.apache.beam.sdk.metrics.MetricNameFilter;
-import org.apache.beam.sdk.metrics.MetricResult;
-import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -116,8 +112,6 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
}
SparkPipelineResult result = null;
- int expectedNumberOfAssertions = PAssert.countAsserts(pipeline);
-
// clear state of Aggregators, Metrics and Watermarks if exists.
AggregatorsAccumulator.clear();
MetricsAccumulator.clear();
@@ -137,47 +131,6 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
String.format("Finish state %s is not allowed.", finishState),
finishState,
isOneOf(PipelineResult.State.STOPPED, PipelineResult.State.DONE));
-
- // validate assertion succeeded (at least once).
- long successAssertions = 0;
- Iterable<MetricResult<Long>> counterResults = result.metrics().queryMetrics(
- MetricsFilter.builder()
- .addNameFilter(MetricNameFilter.named(PAssert.class, PAssert.SUCCESS_COUNTER))
- .build()).counters();
- for (MetricResult<Long> counter : counterResults) {
- if (counter.attempted().longValue() > 0) {
- successAssertions++;
- }
- }
- Integer expectedAssertions = testSparkPipelineOptions.getExpectedAssertions() != null
- ? testSparkPipelineOptions.getExpectedAssertions() : expectedNumberOfAssertions;
- assertThat(
- String.format(
- "Expected %d successful assertions, but found %d.",
- expectedAssertions, successAssertions),
- successAssertions,
- is(expectedAssertions.longValue()));
- // validate assertion didn't fail.
- long failedAssertions = 0;
- Iterable<MetricResult<Long>> failCounterResults = result.metrics().queryMetrics(
- MetricsFilter.builder()
- .addNameFilter(MetricNameFilter.named(PAssert.class, PAssert.FAILURE_COUNTER))
- .build()).counters();
- for (MetricResult<Long> counter : failCounterResults) {
- if (counter.attempted().longValue() > 0) {
- failedAssertions++;
- }
- }
- assertThat(
- String.format("Found %d failed assertions.", failedAssertions),
- failedAssertions,
- is(0L));
-
- LOG.info(
- String.format(
- "Successfully asserted pipeline %s with %d successful assertions.",
- testSparkPipelineOptions.getJobName(),
- successAssertions));
} finally {
try {
// cleanup checkpoint dir.
http://git-wip-us.apache.org/repos/asf/beam/blob/95ade45e/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index 7d7fd08..33571f0 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -180,7 +180,8 @@ public class ResumeFromCheckpointStreamingTest {
long successAssertions = 0;
Iterable<MetricResult<Long>> counterResults = res.metrics().queryMetrics(
MetricsFilter.builder()
- .addNameFilter(MetricNameFilter.named(PAssert.class, PAssert.SUCCESS_COUNTER))
+ .addNameFilter(
+ MetricNameFilter.named(PAssertWithoutFlatten.class, PAssert.SUCCESS_COUNTER))
.build()).counters();
for (MetricResult<Long> counter : counterResults) {
if (counter.attempted().longValue() > 0) {
@@ -196,7 +197,8 @@ public class ResumeFromCheckpointStreamingTest {
long failedAssertions = 0;
Iterable<MetricResult<Long>> failCounterResults = res.metrics().queryMetrics(
MetricsFilter.builder()
- .addNameFilter(MetricNameFilter.named(PAssert.class, PAssert.FAILURE_COUNTER))
+ .addNameFilter(MetricNameFilter.named(
+ PAssertWithoutFlatten.class, PAssert.FAILURE_COUNTER))
.build()).counters();
for (MetricResult<Long> counter : failCounterResults) {
if (counter.attempted().longValue() > 0) {
@@ -330,8 +332,10 @@ public class ResumeFromCheckpointStreamingTest {
}
private static class AssertDoFn<T> extends DoFn<Iterable<T>, Void> {
- private final Counter success = Metrics.counter(PAssert.class, PAssert.SUCCESS_COUNTER);
- private final Counter failure = Metrics.counter(PAssert.class, PAssert.FAILURE_COUNTER);
+ private final Counter success =
+ Metrics.counter(PAssertWithoutFlatten.class, PAssert.SUCCESS_COUNTER);
+ private final Counter failure =
+ Metrics.counter(PAssertWithoutFlatten.class, PAssert.FAILURE_COUNTER);
private final T[] expected;
AssertDoFn(T[] expected) {
http://git-wip-us.apache.org/repos/asf/beam/blob/95ade45e/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
index 2942578..a4b311f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
@@ -70,6 +70,11 @@ public class MetricsEnvironment {
METRICS_SUPPORTED.set(supported);
}
+ /** Indicates whether metrics reporting is supported. */
+ public static boolean isMetricsSupported() {
+ return METRICS_SUPPORTED.get();
+ }
+
/**
* Set the {@link MetricsContainer} for the current thread.
*
http://git-wip-us.apache.org/repos/asf/beam/blob/95ade45e/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index 868dcbd..d8fe51d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -18,6 +18,8 @@
package org.apache.beam.sdk.testing;
import static com.google.common.base.Preconditions.checkState;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.TreeNode;
@@ -41,6 +43,10 @@ import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.metrics.MetricNameFilter;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
@@ -186,8 +192,8 @@ public class TestPipeline extends Pipeline implements TestRule {
if (pipelineRunSucceeded() && !visitedAll(pipelineNodes)) {
final boolean hasDanglingPAssert =
FluentIterable.from(pipelineNodes)
- .filter(Predicates.not(Predicates.in(runVisitedNodes)))
- .anyMatch(isPAssertNode);
+ .filter(Predicates.not(Predicates.in(runVisitedNodes)))
+ .anyMatch(isPAssertNode);
if (hasDanglingPAssert) {
throw new AbandonedNodeException("The pipeline contains abandoned PAssert(s).");
} else {
@@ -319,12 +325,13 @@ public class TestPipeline extends Pipeline implements TestRule {
checkState(
enforcement.isPresent(),
"Is your TestPipeline declaration missing a @Rule annotation? Usage: "
- + "@Rule public final transient TestPipeline pipeline = TestPipeline.create();");
+ + "@Rule public final transient TestPipeline pipeline = TestPipeline.create();");
final PipelineResult pipelineResult;
try {
enforcement.get().beforePipelineExecution();
pipelineResult = super.run();
+ verifyPAssertsSucceeded(pipelineResult);
} catch (RuntimeException exc) {
Throwable cause = exc.getCause();
if (cause instanceof AssertionError) {
@@ -385,8 +392,8 @@ public class TestPipeline extends Pipeline implements TestRule {
Strings.isNullOrEmpty(beamTestPipelineOptions)
? PipelineOptionsFactory.create()
: PipelineOptionsFactory.fromArgs(
- MAPPER.readValue(beamTestPipelineOptions, String[].class))
- .as(TestPipelineOptions.class);
+ MAPPER.readValue(beamTestPipelineOptions, String[].class))
+ .as(TestPipelineOptions.class);
options.as(ApplicationNameOptions.class).setAppName(getAppName());
// If no options were specified, set some reasonable defaults
@@ -488,6 +495,35 @@ public class TestPipeline extends Pipeline implements TestRule {
return firstInstanceAfterTestPipeline;
}
+ /**
+ * Verifies all {@link PAssert PAsserts} in the pipeline have been executed and were successful.
+ *
+ * <p>Note this only runs for runners which support Metrics. Runners which do not should verify
+ * this in some other way. See: https://issues.apache.org/jira/browse/BEAM-2001</p>
+ */
+ private void verifyPAssertsSucceeded(PipelineResult pipelineResult) {
+ if (MetricsEnvironment.isMetricsSupported()) {
+ long expectedNumberOfAssertions = (long) PAssert.countAsserts(this);
+
+ long successfulAssertions = 0;
+ Iterable<MetricResult<Long>> successCounterResults =
+ pipelineResult.metrics().queryMetrics(
+ MetricsFilter.builder()
+ .addNameFilter(MetricNameFilter.named(PAssert.class, PAssert.SUCCESS_COUNTER))
+ .build())
+ .counters();
+ for (MetricResult<Long> counter : successCounterResults) {
+ if (counter.attempted() > 0) {
+ successfulAssertions++;
+ }
+ }
+
+ assertThat(String
+ .format("Expected %d successful assertions, but found %d.", expectedNumberOfAssertions,
+ successfulAssertions), successfulAssertions, is(expectedNumberOfAssertions));
+ }
+ }
+
private static class IsEmptyVisitor extends PipelineVisitor.Defaults {
private boolean empty = true;
[6/6] beam git commit: This closes #2729
Posted by av...@apache.org.
This closes #2729
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b73918b5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b73918b5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b73918b5
Branch: refs/heads/master
Commit: b73918b55ab06e5a47ef9dc33ae3dbaebaed330a
Parents: 48c8ed1 8d91a97
Author: Aviem Zur <av...@gmail.com>
Authored: Thu May 4 21:10:14 2017 +0300
Committer: Aviem Zur <av...@gmail.com>
Committed: Thu May 4 21:10:14 2017 +0300
----------------------------------------------------------------------
.../beam/runners/core/SideInputHandler.java | 10 +-
.../apache/beam/runners/flink/FlinkRunner.java | 3 +
.../FlinkStreamingTransformTranslators.java | 26 +++++
.../wrappers/streaming/DoFnOperator.java | 27 ++++-
.../streaming/state/FlinkStateInternals.java | 2 +
runners/spark/pom.xml | 47 +++++++-
.../runners/spark/SparkRunnerRegistrar.java | 4 +-
.../beam/runners/spark/TestSparkRunner.java | 47 --------
.../apache/beam/runners/spark/CacheTest.java | 12 +-
.../beam/runners/spark/ForceStreamingTest.java | 18 +--
.../apache/beam/runners/spark/PipelineRule.java | 109 -------------------
.../runners/spark/ProvidedSparkContextTest.java | 10 +-
.../runners/spark/SparkRunnerDebuggerTest.java | 15 +--
.../runners/spark/SparkRunnerRegistrarTest.java | 2 +-
.../beam/runners/spark/StreamingTest.java | 23 ++++
.../metrics/sink/SparkMetricsSinkTest.java | 12 +-
.../beam/runners/spark/io/AvroPipelineTest.java | 10 +-
.../beam/runners/spark/io/NumShardsTest.java | 6 +-
.../spark/translation/StorageLevelTest.java | 31 +++++-
.../translation/streaming/CreateStreamTest.java | 53 ++++-----
.../ResumeFromCheckpointStreamingTest.java | 62 +++++++----
.../streaming/StreamingSourceMetricsTest.java | 14 +--
.../main/java/org/apache/beam/sdk/Pipeline.java | 2 +-
.../beam/sdk/metrics/MetricsEnvironment.java | 5 +
.../apache/beam/sdk/testing/TestPipeline.java | 61 ++++++++++-
25 files changed, 330 insertions(+), 281 deletions(-)
----------------------------------------------------------------------
[5/6] beam git commit: [BEAM-1726] Fix RuntimeException throwing in
FlinkStateInternals
Posted by av...@apache.org.
[BEAM-1726] Fix RuntimeException throwing in FlinkStateInternals
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7c44935e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7c44935e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7c44935e
Branch: refs/heads/master
Commit: 7c44935e1c47cce2ecfe842e37c2cf89f48d8583
Parents: 5555040
Author: Aviem Zur <av...@gmail.com>
Authored: Sat Mar 18 15:21:45 2017 +0200
Committer: Aviem Zur <av...@gmail.com>
Committed: Thu May 4 20:48:56 2017 +0300
----------------------------------------------------------------------
.../translation/wrappers/streaming/state/FlinkStateInternals.java | 2 ++
1 file changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/7c44935e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
index c033be6..cea6e0f 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -592,6 +592,8 @@ public class FlinkStateInternals<K> implements StateInternals {
}
current = combineFn.addInput(current, value);
state.update(current);
+ } catch (RuntimeException re) {
+ throw re;
} catch (Exception e) {
throw new RuntimeException("Error adding to state." , e);
}
[4/6] beam git commit: [BEAM-1726] Fix empty side inputs in Flink
Streaming Runner
Posted by av...@apache.org.
[BEAM-1726] Fix empty side inputs in Flink Streaming Runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5555040d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5555040d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5555040d
Branch: refs/heads/master
Commit: 5555040d935c67f5cd48f2ffe2721a07fe6e0a50
Parents: 95ade45
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Sat Mar 18 12:16:06 2017 +0100
Committer: Aviem Zur <av...@gmail.com>
Committed: Thu May 4 20:48:56 2017 +0300
----------------------------------------------------------------------
.../beam/runners/core/SideInputHandler.java | 10 ++++----
.../wrappers/streaming/DoFnOperator.java | 27 +++++++++++++++++++-
2 files changed, 31 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5555040d/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
index 5c67148..b29f9d0 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.core;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -161,11 +162,6 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
@Override
public <T> T get(PCollectionView<T> sideInput, BoundedWindow window) {
- if (!isReady(sideInput, window)) {
- throw new IllegalStateException(
- "Side input " + sideInput + " is not ready for window " + window);
- }
-
@SuppressWarnings("unchecked")
Coder<BoundedWindow> windowCoder =
(Coder<BoundedWindow>) sideInput
@@ -181,6 +177,10 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
Iterable<WindowedValue<?>> elements = state.read();
+ if (elements == null) {
+ elements = Collections.emptyList();
+ }
+
return sideInput.getViewFn().apply(elements);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/5555040d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index c624036..16bf5d2 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -463,7 +463,32 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
@Override
public void processWatermark2(Watermark mark) throws Exception {
- // ignore watermarks from the side-input input
+ if (mark.getTimestamp() == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+ // this means we will never see any more side input
+ pushbackDoFnRunner.startBundle();
+
+ BagState<WindowedValue<InputT>> pushedBack =
+ pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+
+ Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read();
+ if (pushedBackContents != null) {
+ for (WindowedValue<InputT> elem : pushedBackContents) {
+
+ // we need to set the correct key in case the operator is
+ // a (keyed) window operator
+ setKeyContextElement1(new StreamRecord<>(elem));
+
+ doFnRunner.processElement(elem);
+ }
+ }
+
+ setPushedBackWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+
+ pushbackDoFnRunner.finishBundle();
+
+ // maybe output a new watermark
+ processWatermark1(new Watermark(currentInputWatermark));
+ }
}
@Override
[2/6] beam git commit: [BEAM-1726] Fix Flatten with input copies in
Flink Streaming Runner
Posted by av...@apache.org.
[BEAM-1726] Fix Flatten with input copies in Flink Streaming Runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0e2bb180
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0e2bb180
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0e2bb180
Branch: refs/heads/master
Commit: 0e2bb1808350cbebf771d0971deb06787732800d
Parents: 7c44935
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Sun Mar 19 07:49:08 2017 +0100
Committer: Aviem Zur <av...@gmail.com>
Committed: Thu May 4 20:48:56 2017 +0300
----------------------------------------------------------------------
.../FlinkStreamingTransformTranslators.java | 26 ++++++++++++++++++++
1 file changed, 26 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/0e2bb180/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index c024493..7339c01 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -966,10 +966,36 @@ class FlinkStreamingTransformTranslators {
} else {
DataStream<T> result = null;
+
+ // Determine DataStreams that we use as input several times. For those, we need to uniquify
+ // input streams because Flink seems to swallow watermarks when we have a union of one and
+ // the same stream.
+ Map<DataStream<T>, Integer> duplicates = new HashMap<>();
+ for (PValue input : allInputs.values()) {
+ DataStream<T> current = context.getInputDataStream(input);
+ Integer oldValue = duplicates.put(current, 1);
+ if (oldValue != null) {
+ duplicates.put(current, oldValue + 1);
+ }
+ }
+
for (PValue input : allInputs.values()) {
DataStream<T> current = context.getInputDataStream(input);
+
+ final Integer timesRequired = duplicates.get(current);
+ if (timesRequired > 1) {
+ current = current.flatMap(new FlatMapFunction<T, T>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void flatMap(T t, Collector<T> collector) throws Exception {
+ collector.collect(t);
+ }
+ });
+ }
result = (result == null) ? current : result.union(current);
}
+
context.setOutputDataStream(context.getOutput(transform), result);
}
}