Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/21 20:23:24 UTC

[16/50] incubator-beam git commit: [BEAM-853] Force streaming execution on batch pipelines for testing. Expose the adapted source.

[BEAM-853] Force streaming execution on batch pipelines for testing.
Expose the adapted source.

Force streaming execution if the forceStreaming flag is set in PipelineOptions.

Added test.
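
For context, this is how a test opts in to forced streaming (a minimal sketch mirroring the test added in this commit; all names come from the diff below):

    // Enable forced streaming on the Spark test runner.
    SparkPipelineOptions options =
        PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
    options.setRunner(TestSparkRunner.class);
    options.setForceStreaming(true);
    Pipeline pipeline = Pipeline.create(options);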


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5f41deda
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5f41deda
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5f41deda

Branch: refs/heads/gearpump-runner
Commit: 5f41deda509acbbbc6280323e583bb3c1af2dad2
Parents: 1ad638e
Author: Sela <an...@paypal.com>
Authored: Wed Dec 14 12:20:08 2016 +0200
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Dec 16 22:03:44 2016 -0800

----------------------------------------------------------------------
 .../runners/spark/SparkPipelineOptions.java     |   5 +
 .../beam/runners/spark/TestSparkRunner.java     |  80 +++++++++++-
 .../beam/runners/spark/ForceStreamingTest.java  | 123 +++++++++++++++++++
 .../sdk/io/BoundedReadFromUnboundedSource.java  |  14 ++-
 4 files changed, 217 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5f41deda/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index a2cd887..04c559e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -100,4 +100,9 @@ public interface SparkPipelineOptions
   @Default.Boolean(false)
   boolean getUsesProvidedSparkContext();
   void setUsesProvidedSparkContext(boolean value);
+
+  @Description("A special flag that forces streaming in tests.")
+  @Default.Boolean(false)
+  boolean isForceStreaming();
+  void setForceStreaming(boolean forceStreaming);
 }
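
Since PipelineOptions properties map to command-line flags by property name, the new option should also be settable from args; a small sketch (standard Beam options parsing, nothing specific to this commit):

    // Parse the new flag from command-line style args (a sketch).
    SparkPipelineOptions options = PipelineOptionsFactory
        .fromArgs(new String[] {"--forceStreaming=true"})
        .as(SparkPipelineOptions.class);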

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5f41deda/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index 2c26d84..798ca47 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -19,16 +19,26 @@
 package org.apache.beam.runners.spark;
 
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
 
+import org.apache.beam.runners.core.UnboundedReadFromBoundedSource;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipelineOptions;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.ValueWithRecordId;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 
+
 /**
  * The SparkRunner translates operations defined on a pipeline to a representation executable
  * by Spark, and then submits the job to Spark to be executed. If we wanted to run a Beam
@@ -53,9 +63,12 @@ import org.apache.beam.sdk.values.POutput;
 public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
 
   private SparkRunner delegate;
+  private boolean isForceStreaming;
+  private int expectedNumberOfAssertions = 0;
 
   private TestSparkRunner(SparkPipelineOptions options) {
     this.delegate = SparkRunner.fromOptions(options);
+    this.isForceStreaming = options.isForceStreaming();
   }
 
   public static TestSparkRunner fromOptions(PipelineOptions options) {
@@ -65,19 +78,78 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
     return new TestSparkRunner(sparkOptions);
   }
 
+  /**
+   * Overrides for the test runner.
+   */
+  @SuppressWarnings("unchecked")
   @Override
-  public <OutputT extends POutput, InputT extends PInput>
-      OutputT apply(PTransform<InputT, OutputT> transform, InputT input) {
-    return delegate.apply(transform, input);
-  };
+  public <OutputT extends POutput, InputT extends PInput> OutputT apply(
+          PTransform<InputT, OutputT> transform, InputT input) {
+    // if the pipeline forces streaming execution and the source is an
+    // unbounded source adapted to a bounded one, read it back as an
+    // unbounded source via UnboundedReadFromBoundedSource.
+    if (isForceStreaming && transform instanceof BoundedReadFromUnboundedSource) {
+      return (OutputT) delegate.apply(new AdaptedBoundedAsUnbounded(
+          (BoundedReadFromUnboundedSource) transform), input);
+    } else {
+      // no actual override; simply count the asserting transforms in the pipeline.
+      if (transform instanceof PAssert.OneSideInputAssert
+          || transform instanceof PAssert.GroupThenAssert
+          || transform instanceof PAssert.GroupThenAssertForSingleton) {
+        expectedNumberOfAssertions += 1;
+      }
+
+      return delegate.apply(transform, input);
+    }
+  }
 
   @Override
   public SparkPipelineResult run(Pipeline pipeline) {
     TestPipelineOptions testPipelineOptions = pipeline.getOptions().as(TestPipelineOptions.class);
     SparkPipelineResult result = delegate.run(pipeline);
     result.waitUntilFinish();
+
+    // make sure the test pipeline finished successfully.
+    State resultState = result.getState();
+    assertThat(
+        String.format("Test pipeline result state was %s instead of %s", resultState, State.DONE),
+        resultState,
+        is(State.DONE));
     assertThat(result, testPipelineOptions.getOnCreateMatcher());
     assertThat(result, testPipelineOptions.getOnSuccessMatcher());
+
+    // if the pipeline was executed in streaming mode, validate aggregators.
+    if (isForceStreaming) {
+      // validate that the expected number of assertions succeeded.
+      int success = result.getAggregatorValue(PAssert.SUCCESS_COUNTER, Integer.class);
+      assertThat(
+          String.format(
+              "Expected %d successful assertions, but found %d.",
+              expectedNumberOfAssertions, success),
+          success,
+          is(expectedNumberOfAssertions));
+      // validate that no assertion failed.
+      int failure = result.getAggregatorValue(PAssert.FAILURE_COUNTER, Integer.class);
+      assertThat("Failure aggregator should be zero.", failure, is(0));
+    }
     return result;
   }
+
+  private static class AdaptedBoundedAsUnbounded<T> extends PTransform<PBegin, PCollection<T>> {
+    private final BoundedReadFromUnboundedSource<T> source;
+
+    AdaptedBoundedAsUnbounded(BoundedReadFromUnboundedSource<T> source) {
+      this.source = source;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public PCollection<T> expand(PBegin input) {
+      PTransform<PBegin, ? extends PCollection<ValueWithRecordId<T>>> replacingTransform =
+          new UnboundedReadFromBoundedSource<>(source.getAdaptedSource());
+      return (PCollection<T>) input.apply(replacingTransform)
+          .apply("StripIds", ParDo.of(new ValueWithRecordId.StripIdsDoFn()));
+    }
+  }
+
 }
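
In effect, when forceStreaming is set the runner undoes the bounding adapter, so a pipeline authored as a capped read over an unbounded source executes as a genuinely unbounded read. A conceptual sketch of the substitution (CountingSource is an illustrative stand-in for any unbounded source; imports as in the files above):

    // What the user applies: a capped read, which expands to a Read.Bounded.
    BoundedReadFromUnboundedSource<Long> boundedRead =
        Read.from(CountingSource.unbounded()).withMaxNumRecords(100);

    // What TestSparkRunner substitutes via AdaptedBoundedAsUnbounded: the
    // adapted source is wrapped back into an unbounded read, and the record
    // ids the adapter attaches for deduplication are stripped afterwards.
    PCollection<Long> streamed = pipeline
        .apply(new UnboundedReadFromBoundedSource<>(boundedRead.getAdaptedSource()))
        .apply("StripIds", ParDo.of(new ValueWithRecordId.StripIdsDoFn<Long>()));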

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5f41deda/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
new file mode 100644
index 0000000..eb17eea
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.junit.Test;
+
+
+/**
+ * Test that we can "force streaming" on pipelines with {@link BoundedReadFromUnboundedSource}
+ * inputs using the {@link TestSparkRunner}.
+ *
+ * <p>The test validates that when a pipeline reads from a {@link BoundedReadFromUnboundedSource}
+ * with {@link SparkPipelineOptions#setForceStreaming(boolean)} set to true,
+ * and runs with the {@link TestSparkRunner}, the {@link Read.Bounded} transform
+ * is replaced by a {@link Read.Unbounded} transform.
+ *
+ * <p>This test does not execute a pipeline.
+ */
+public class ForceStreamingTest {
+
+  @Test
+  public void test() throws IOException {
+    SparkPipelineOptions options = PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
+    options.setRunner(TestSparkRunner.class);
+    // force streaming.
+    options.setForceStreaming(true);
+
+    Pipeline pipeline = Pipeline.create(options);
+
+    // apply the BoundedReadFromUnboundedSource.
+    @SuppressWarnings("unchecked")
+    BoundedReadFromUnboundedSource boundedRead =
+        Read.from(new FakeUnboundedSource()).withMaxNumRecords(-1);
+    //noinspection unchecked
+    pipeline.apply(boundedRead);
+
+    UnboundedReadDetector unboundedReadDetector = new UnboundedReadDetector();
+    pipeline.traverseTopologically(unboundedReadDetector);
+
+    // assert that the applied BoundedReadFromUnboundedSource
+    // is being treated as an unbounded read.
+    assertThat("Expected to have an unbounded read.", unboundedReadDetector.isUnbounded);
+  }
+
+  /**
+   * Traverses the Pipeline to check if the input is indeed a {@link Read.Unbounded}.
+   */
+  private class UnboundedReadDetector extends Pipeline.PipelineVisitor.Defaults {
+    private boolean isUnbounded = false;
+
+    @Override
+    public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+      Class<? extends PTransform> transformClass = node.getTransform().getClass();
+      if (transformClass == Read.Unbounded.class) {
+        isUnbounded = true;
+      }
+    }
+
+  }
+
+  /**
+   * A fake {@link UnboundedSource} to satisfy the compiler.
+   */
+  private static class FakeUnboundedSource extends UnboundedSource {
+
+    @Override
+    public List<? extends UnboundedSource> generateInitialSplits(
+        int desiredNumSplits,
+        PipelineOptions options) throws Exception {
+      return null;
+    }
+
+    @Override
+    public UnboundedReader createReader(
+        PipelineOptions options,
+        CheckpointMark checkpointMark) throws IOException {
+      return null;
+    }
+
+    @Override
+    public Coder getCheckpointMarkCoder() {
+      return null;
+    }
+
+    @Override
+    public void validate() { }
+
+    @Override
+    public Coder getDefaultOutputCoder() {
+      return null;
+    }
+  }
+
+}
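
The visitor used above generalizes naturally to asserting on any transform in the expanded pipeline; a hypothetical reusable variant (not part of this commit; imports as in the test file):

    // Hypothetical generalization of UnboundedReadDetector: records whether
    // any primitive transform of the given class appears in the pipeline.
    static class TransformDetector extends Pipeline.PipelineVisitor.Defaults {
      private final Class<? extends PTransform> target;
      private boolean found = false;

      TransformDetector(Class<? extends PTransform> target) {
        this.target = target;
      }

      @Override
      public void visitPrimitiveTransform(TransformHierarchy.Node node) {
        if (target.equals(node.getTransform().getClass())) {
          found = true;
        }
      }
    }

    // Usage: pipeline.traverseTopologically(new TransformDetector(Read.Unbounded.class));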

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5f41deda/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index f2ef358..84e3044 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Distinct;
@@ -50,6 +51,7 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
   private final UnboundedSource<T, ?> source;
   private final long maxNumRecords;
   private final Duration maxReadTime;
+  private final BoundedSource<ValueWithRecordId<T>> adaptedSource;
   private static final FluentBackoff BACKOFF_FACTORY =
       FluentBackoff.DEFAULT
           .withInitialBackoff(Duration.millis(10))
@@ -81,12 +83,22 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
     this.source = source;
     this.maxNumRecords = maxNumRecords;
     this.maxReadTime = maxReadTime;
+    this.adaptedSource = new UnboundedToBoundedSourceAdapter<>(source, maxNumRecords, maxReadTime);
+  }
+
+  /**
+   * Returns an adapted {@link BoundedSource} wrapping the underlying {@link UnboundedSource},
+   * with the specified bounds on number of records and read time.
+   */
+  @Experimental
+  public BoundedSource<ValueWithRecordId<T>> getAdaptedSource() {
+    return adaptedSource;
   }
 
   @Override
   public PCollection<T> expand(PBegin input) {
     PCollection<ValueWithRecordId<T>> read = Pipeline.applyTransform(input,
-        Read.from(new UnboundedToBoundedSourceAdapter<>(source, maxNumRecords, maxReadTime)));
+        Read.from(getAdaptedSource()));
     if (source.requiresDeduping()) {
       read = read.apply(Distinct.withRepresentativeValueFn(
           new SerializableFunction<ValueWithRecordId<T>, byte[]>() {