Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/17 06:16:10 UTC
[1/2] incubator-beam git commit: [BEAM-853] Force streaming execution
on batch pipelines for testing. Expose the adapted source.
Repository: incubator-beam
Updated Branches:
refs/heads/master ffe3ab3d6 -> 4206408bf
[BEAM-853] Force streaming execution on batch pipelines for testing.
Expose the adapted source.
Force streaming execution, if set in PipelineOptions.
Added test.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5f41deda
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5f41deda
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5f41deda
Branch: refs/heads/master
Commit: 5f41deda509acbbbc6280323e583bb3c1af2dad2
Parents: 1ad638e
Author: Sela <an...@paypal.com>
Authored: Wed Dec 14 12:20:08 2016 +0200
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Dec 16 22:03:44 2016 -0800
----------------------------------------------------------------------
.../runners/spark/SparkPipelineOptions.java | 5 +
.../beam/runners/spark/TestSparkRunner.java | 80 +++++++++++-
.../beam/runners/spark/ForceStreamingTest.java | 123 +++++++++++++++++++
.../sdk/io/BoundedReadFromUnboundedSource.java | 14 ++-
4 files changed, 217 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5f41deda/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index a2cd887..04c559e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -100,4 +100,9 @@ public interface SparkPipelineOptions
@Default.Boolean(false)
boolean getUsesProvidedSparkContext();
void setUsesProvidedSparkContext(boolean value);
+
+ @Description("A special flag that forces streaming in tests.")
+ @Default.Boolean(false)
+ boolean isForceStreaming();
+ void setForceStreaming(boolean forceStreaming);
}
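
The new option is intended to be set from test code before the pipeline is constructed. A minimal sketch of that setup (a condensed form of the ForceStreamingTest added below; nothing here beyond the new option and the TestSparkRunner is new API):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    SparkPipelineOptions options =
        PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
    options.setRunner(TestSparkRunner.class);
    // force the batch pipeline to execute as a streaming pipeline.
    options.setForceStreaming(true);
    Pipeline pipeline = Pipeline.create(options);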
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5f41deda/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index 2c26d84..798ca47 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -19,16 +19,26 @@
package org.apache.beam.runners.spark;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import org.apache.beam.runners.core.UnboundedReadFromBoundedSource;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.ValueWithRecordId;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
+
/**
* The SparkRunner translates operations defined on a pipeline to a representation executable
* by Spark, and then submits the job to Spark to be executed. If we wanted to run a Beam
@@ -53,9 +63,12 @@ import org.apache.beam.sdk.values.POutput;
public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
private SparkRunner delegate;
+ private boolean isForceStreaming;
+ private int expectedNumberOfAssertions = 0;
private TestSparkRunner(SparkPipelineOptions options) {
this.delegate = SparkRunner.fromOptions(options);
+ this.isForceStreaming = options.isForceStreaming();
}
public static TestSparkRunner fromOptions(PipelineOptions options) {
@@ -65,19 +78,78 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
return new TestSparkRunner(sparkOptions);
}
+ /**
+ * Overrides for the test runner.
+ */
+ @SuppressWarnings("unchecked")
@Override
- public <OutputT extends POutput, InputT extends PInput>
- OutputT apply(PTransform<InputT, OutputT> transform, InputT input) {
- return delegate.apply(transform, input);
- };
+ public <OutputT extends POutput, InputT extends PInput> OutputT apply(
+ PTransform<InputT, OutputT> transform, InputT input) {
+ // if the pipeline forces execution as a streaming pipeline,
+ // and the source is an adapted unbounded source (read as bounded),
+ // read it as an unbounded source again via UnboundedReadFromBoundedSource.
+ if (isForceStreaming && transform instanceof BoundedReadFromUnboundedSource) {
+ return (OutputT) delegate.apply(new AdaptedBoundedAsUnbounded(
+ (BoundedReadFromUnboundedSource) transform), input);
+ } else {
+ // no actual override, simply counts asserting transforms in the pipeline.
+ if (transform instanceof PAssert.OneSideInputAssert
+ || transform instanceof PAssert.GroupThenAssert
+ || transform instanceof PAssert.GroupThenAssertForSingleton) {
+ expectedNumberOfAssertions += 1;
+ }
+
+ return delegate.apply(transform, input);
+ }
+ }
@Override
public SparkPipelineResult run(Pipeline pipeline) {
TestPipelineOptions testPipelineOptions = pipeline.getOptions().as(TestPipelineOptions.class);
SparkPipelineResult result = delegate.run(pipeline);
result.waitUntilFinish();
+
+ // make sure the test pipeline finished successfully.
+ State resultState = result.getState();
+ assertThat(
+ String.format("Test pipeline result state was %s instead of %s", resultState, State.DONE),
+ resultState,
+ is(State.DONE));
assertThat(result, testPipelineOptions.getOnCreateMatcher());
assertThat(result, testPipelineOptions.getOnSuccessMatcher());
+
+ // if the pipeline was executed in streaming mode, validate aggregators.
+ if (isForceStreaming) {
+ // validate assertion succeeded (at least once).
+ int success = result.getAggregatorValue(PAssert.SUCCESS_COUNTER, Integer.class);
+ assertThat(
+ String.format(
+ "Expected %d successful assertions, but found %d.",
+ expectedNumberOfAssertions, success),
+ success,
+ is(expectedNumberOfAssertions));
+ // validate assertion didn't fail.
+ int failure = result.getAggregatorValue(PAssert.FAILURE_COUNTER, Integer.class);
+ assertThat("Failure aggregator should be zero.", failure, is(0));
+ }
return result;
}
+
+ private static class AdaptedBoundedAsUnbounded<T> extends PTransform<PBegin, PCollection<T>> {
+ private final BoundedReadFromUnboundedSource<T> source;
+
+ AdaptedBoundedAsUnbounded(BoundedReadFromUnboundedSource<T> source) {
+ this.source = source;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public PCollection<T> expand(PBegin input) {
+ PTransform<PBegin, ? extends PCollection<ValueWithRecordId<T>>> replacingTransform =
+ new UnboundedReadFromBoundedSource<>(source.getAdaptedSource());
+ return (PCollection<T>) input.apply(replacingTransform)
+ .apply("StripIds", ParDo.of(new ValueWithRecordId.StripIdsDoFn()));
+ }
+ }
+
}
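
For reference, a BoundedReadFromUnboundedSource is what Read.from(unboundedSource).withMaxNumRecords(n) produces (the new test below builds one the same way), so the override above applies to pipelines of the following shape. MyUnboundedSource is a hypothetical placeholder for any UnboundedSource implementation, and options are assumed to be set up as in the earlier sketch:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.Read;

    Pipeline pipeline = Pipeline.create(options);
    // A capped read over an unbounded source is a BoundedReadFromUnboundedSource;
    // with forceStreaming=true the TestSparkRunner replaces it with
    // AdaptedBoundedAsUnbounded, which reads the adapted source as unbounded
    // again and strips the record ids.
    pipeline.apply(Read.from(new MyUnboundedSource()).withMaxNumRecords(1000));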
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5f41deda/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
new file mode 100644
index 0000000..eb17eea
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.junit.Test;
+
+
+/**
+ * Test that we can "force streaming" on pipelines with {@link BoundedReadFromUnboundedSource}
+ * inputs using the {@link TestSparkRunner}.
+ *
+ * <p>The test validates that when a pipeline reads from a {@link BoundedReadFromUnboundedSource},
+ * with {@link SparkPipelineOptions#setForceStreaming(boolean)} set to true
+ * and using the {@link TestSparkRunner}, the {@link Read.Bounded} transform
+ * is replaced by a {@link Read.Unbounded} transform.
+ *
+ * <p>This test does not execute a pipeline.
+ */
+public class ForceStreamingTest {
+
+ @Test
+ public void test() throws IOException {
+ SparkPipelineOptions options = PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
+ options.setRunner(TestSparkRunner.class);
+ // force streaming.
+ options.setForceStreaming(true);
+
+ Pipeline pipeline = Pipeline.create(options);
+
+ // apply the BoundedReadFromUnboundedSource.
+ @SuppressWarnings("unchecked")
+ BoundedReadFromUnboundedSource boundedRead =
+ Read.from(new FakeUnboundedSource()).withMaxNumRecords(-1);
+ //noinspection unchecked
+ pipeline.apply(boundedRead);
+
+ UnboundedReadDetector unboundedReadDetector = new UnboundedReadDetector();
+ pipeline.traverseTopologically(unboundedReadDetector);
+
+ // assert that the applied BoundedReadFromUnboundedSource
+ // is being treated as an unbounded read.
+ assertThat("Expected to have an unbounded read.", unboundedReadDetector.isUnbounded);
+ }
+
+ /**
+ * Traverses the Pipeline to check if the input is indeed a {@link Read.Unbounded}.
+ */
+ private class UnboundedReadDetector extends Pipeline.PipelineVisitor.Defaults {
+ private boolean isUnbounded = false;
+
+ @Override
+ public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+ Class<? extends PTransform> transformClass = node.getTransform().getClass();
+ if (transformClass == Read.Unbounded.class) {
+ isUnbounded = true;
+ }
+ }
+
+ }
+
+ /**
+ * A fake {@link UnboundedSource} to satisfy the compiler.
+ */
+ private static class FakeUnboundedSource extends UnboundedSource {
+
+ @Override
+ public List<? extends UnboundedSource> generateInitialSplits(
+ int desiredNumSplits,
+ PipelineOptions options) throws Exception {
+ return null;
+ }
+
+ @Override
+ public UnboundedReader createReader(
+ PipelineOptions options,
+ CheckpointMark checkpointMark) throws IOException {
+ return null;
+ }
+
+ @Override
+ public Coder getCheckpointMarkCoder() {
+ return null;
+ }
+
+ @Override
+ public void validate() { }
+
+ @Override
+ public Coder getDefaultOutputCoder() {
+ return null;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5f41deda/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index f2ef358..84e3044 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Distinct;
@@ -50,6 +51,7 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
private final UnboundedSource<T, ?> source;
private final long maxNumRecords;
private final Duration maxReadTime;
+ private final BoundedSource<ValueWithRecordId<T>> adaptedSource;
private static final FluentBackoff BACKOFF_FACTORY =
FluentBackoff.DEFAULT
.withInitialBackoff(Duration.millis(10))
@@ -81,12 +83,22 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
this.source = source;
this.maxNumRecords = maxNumRecords;
this.maxReadTime = maxReadTime;
+ this.adaptedSource = new UnboundedToBoundedSourceAdapter<>(source, maxNumRecords, maxReadTime);
+ }
+
+ /**
+ * Returns an adapted {@link BoundedSource} wrapping the underlying {@link UnboundedSource},
+ * with the specified bounds on number of records and read time.
+ */
+ @Experimental
+ public BoundedSource<ValueWithRecordId<T>> getAdaptedSource() {
+ return adaptedSource;
}
@Override
public PCollection<T> expand(PBegin input) {
PCollection<ValueWithRecordId<T>> read = Pipeline.applyTransform(input,
- Read.from(new UnboundedToBoundedSourceAdapter<>(source, maxNumRecords, maxReadTime)));
+ Read.from(getAdaptedSource()));
if (source.requiresDeduping()) {
read = read.apply(Distinct.withRepresentativeValueFn(
new SerializableFunction<ValueWithRecordId<T>, byte[]>() {
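
The getter added here is what AdaptedBoundedAsUnbounded in TestSparkRunner relies on. A sketch of the same re-wrapping outside the runner (MyUnboundedSource is again a hypothetical stand-in, and UnboundedReadFromBoundedSource comes from runners-core, as in the TestSparkRunner imports above):

    import org.apache.beam.runners.core.UnboundedReadFromBoundedSource;
    import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
    import org.apache.beam.sdk.io.BoundedSource;
    import org.apache.beam.sdk.io.Read;
    import org.apache.beam.sdk.util.ValueWithRecordId;
    import org.apache.beam.sdk.values.PCollection;

    BoundedReadFromUnboundedSource<String> boundedRead =
        Read.from(new MyUnboundedSource()).withMaxNumRecords(100);
    // The adapter that was previously built only inside expand() is now exposed,
    // so a runner can feed it back through an unbounded read.
    BoundedSource<ValueWithRecordId<String>> adapted = boundedRead.getAdaptedSource();
    PCollection<ValueWithRecordId<String>> read =
        pipeline.apply(new UnboundedReadFromBoundedSource<>(adapted));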
[2/2] incubator-beam git commit: This closes #1614: Spark runner:
Force streaming execution on batch pipelines for testing
Posted by ke...@apache.org.
This closes #1614: Spark runner: Force streaming execution on batch pipelines for testing
[BEAM-853] Force streaming execution on batch pipelines for testing. Expose the adapted source.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4206408b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4206408b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4206408b
Branch: refs/heads/master
Commit: 4206408bf98898be3c37c1d9f518de8ea9acd170
Parents: ffe3ab3 5f41ded
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Dec 16 22:03:59 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Dec 16 22:03:59 2016 -0800
----------------------------------------------------------------------
.../runners/spark/SparkPipelineOptions.java | 5 +
.../beam/runners/spark/TestSparkRunner.java | 80 +++++++++++-
.../beam/runners/spark/ForceStreamingTest.java | 123 +++++++++++++++++++
.../sdk/io/BoundedReadFromUnboundedSource.java | 14 ++-
4 files changed, 217 insertions(+), 5 deletions(-)
----------------------------------------------------------------------