You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/25 18:55:50 UTC

[36/50] incubator-beam git commit: SparkRunner batch interval as a configuration instead of Beam Windows.

SparkRunner batch interval as a configuration instead of Beam Windows.

Add the batch interval to the pipeline options, default arbitrarily to 1000 msec.

Pick-up the batch interval from pipeline options and remove StreamingWindowPipelineDetector.

Use SDK API to get the window function.

Update the README

Update streaming tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/08dca30a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/08dca30a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/08dca30a

Branch: refs/heads/gearpump-runner
Commit: 08dca30a38e11c13e8a4b2db1529f1306cc489b0
Parents: 95e7f01
Author: Sela <an...@paypal.com>
Authored: Wed Aug 10 13:30:30 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Wed Aug 10 20:42:17 2016 +0300

----------------------------------------------------------------------
 runners/spark/README.md                         |   3 +-
 .../runners/spark/SparkPipelineOptions.java     |   5 +
 .../apache/beam/runners/spark/SparkRunner.java  |  19 +---
 .../streaming/StreamingTransformTranslator.java |  10 +-
 .../StreamingWindowPipelineDetector.java        | 102 -------------------
 .../streaming/FlattenStreamingTest.java         |   1 +
 .../streaming/KafkaStreamingTest.java           |   2 +
 .../streaming/SimpleStreamingWordCountTest.java |   2 +
 8 files changed, 18 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08dca30a/runners/spark/README.md
----------------------------------------------------------------------
diff --git a/runners/spark/README.md b/runners/spark/README.md
index d2bfd3e..ef42fa7 100644
--- a/runners/spark/README.md
+++ b/runners/spark/README.md
@@ -63,8 +63,7 @@ The Spark runner provides support for batch processing of Beam bounded PCollecti
 ### Streaming
 
 The Spark runner currently provides partial support for stream processing of Beam unbounded PCollections as Spark [DStream](http://spark.apache.org/docs/latest/streaming-programming-guide.html#discretized-streams-dstreams)s.  
-Current implementation of *Windowing* takes the first window size in the pipeline and treats it as the Spark "batch interval", while following windows will be treated as *Processing Time* windows.  
-Further work is required to provide full support for the Beam Model *event-time* and *out-of-order* stream processing.
+Currently, both *FixedWindows* and *SlidingWindows* are supported, but only with processing-time triggers and discarding pane.  
 
 ### issue tracking
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08dca30a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index 6ef3741..080ff19 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -39,4 +39,9 @@ public interface SparkPipelineOptions extends PipelineOptions, StreamingOptions,
   @Default.Long(-1)
   Long getTimeout();
   void setTimeout(Long batchInterval);
+
+  @Description("Batch interval for Spark streaming in milliseconds.")
+  @Default.Long(1000)
+  Long getBatchIntervalMillis();
+  void setBatchIntervalMillis(Long batchInterval);
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08dca30a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index d994ec4..be50f70 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -26,7 +26,6 @@ import org.apache.beam.runners.spark.translation.SparkProcessContext;
 import org.apache.beam.runners.spark.translation.TransformTranslator;
 import org.apache.beam.runners.spark.translation.streaming.StreamingEvaluationContext;
 import org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator;
-import org.apache.beam.runners.spark.translation.streaming.StreamingWindowPipelineDetector;
 import org.apache.beam.runners.spark.util.SinglePrimitiveOutputPTransform;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -145,24 +144,16 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> {
   public EvaluationResult run(Pipeline pipeline) {
     try {
       LOG.info("Executing pipeline using the SparkRunner.");
-      JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions
-              .getSparkMaster(), mOptions.getAppName());
+      JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions.getSparkMaster(),
+          mOptions.getAppName());
 
       if (mOptions.isStreaming()) {
         SparkPipelineTranslator translator =
-                new StreamingTransformTranslator.Translator(new TransformTranslator.Translator());
-        // if streaming - fixed window should be defined on all UNBOUNDED inputs
-        StreamingWindowPipelineDetector streamingWindowPipelineDetector =
-            new StreamingWindowPipelineDetector(translator);
-        pipeline.traverseTopologically(streamingWindowPipelineDetector);
-        if (!streamingWindowPipelineDetector.isWindowing()) {
-          throw new IllegalStateException("Spark streaming pipeline must be windowed!");
-        }
-
-        Duration batchInterval = streamingWindowPipelineDetector.getBatchDuration();
+            new StreamingTransformTranslator.Translator(new TransformTranslator.Translator());
+        Duration batchInterval = new Duration(mOptions.getBatchIntervalMillis());
         LOG.info("Setting Spark streaming batchInterval to {} msec", batchInterval.milliseconds());
-        EvaluationContext ctxt = createStreamingEvaluationContext(jsc, pipeline, batchInterval);
 
+        EvaluationContext ctxt = createStreamingEvaluationContext(jsc, pipeline, batchInterval);
         pipeline.traverseTopologically(new SparkPipelineEvaluator(ctxt, translator));
         ctxt.computeOutputs();
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08dca30a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index b0fb931..2ce2c29 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -25,7 +25,6 @@ import org.apache.beam.runners.spark.translation.DoFnFunction;
 import org.apache.beam.runners.spark.translation.EvaluationContext;
 import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
 import org.apache.beam.runners.spark.translation.TransformEvaluator;
-import org.apache.beam.runners.spark.translation.TransformTranslator;
 import org.apache.beam.runners.spark.translation.WindowingHelpers;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.AvroIO;
@@ -35,7 +34,6 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -290,16 +288,12 @@ public final class StreamingTransformTranslator {
     }
   }
 
-  private static final TransformTranslator.FieldGetter WINDOW_FG =
-      new TransformTranslator.FieldGetter(Window.Bound.class);
-
-  private static <T, W extends BoundedWindow> TransformEvaluator<Window.Bound<T>> window() {
+  private static <T> TransformEvaluator<Window.Bound<T>> window() {
     return new TransformEvaluator<Window.Bound<T>>() {
       @Override
       public void evaluate(Window.Bound<T> transform, EvaluationContext context) {
         StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
-        //--- first we apply windowing to the stream
-        WindowFn<? super T, W> windowFn = WINDOW_FG.get("windowFn", transform);
+        WindowFn<? super T, ?> windowFn = transform.getWindowFn();
         @SuppressWarnings("unchecked")
         JavaDStream<WindowedValue<T>> dStream =
             (JavaDStream<WindowedValue<T>>) sec.getStream(transform);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08dca30a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java
deleted file mode 100644
index 394b2c5..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.translation.streaming;
-
-import org.apache.beam.runners.spark.SparkRunner;
-import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
-import org.apache.beam.runners.spark.translation.TransformTranslator;
-import org.apache.beam.sdk.runners.TransformTreeNode;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.Durations;
-
-
-/**
- * Pipeline {@link SparkRunner.Evaluator} to detect windowing.
- */
-public final class StreamingWindowPipelineDetector extends SparkRunner.Evaluator {
-
-  // Currently, Spark streaming recommends batches no smaller then 500 msec
-  private static final Duration SPARK_MIN_WINDOW = Durations.milliseconds(500);
-
-  private boolean windowing;
-  private Duration batchDuration;
-
-  public StreamingWindowPipelineDetector(SparkPipelineTranslator translator) {
-    super(translator);
-  }
-
-  private static final TransformTranslator.FieldGetter WINDOW_FG =
-      new TransformTranslator.FieldGetter(Window.Bound.class);
-
-  // Use the smallest window (fixed or sliding) as Spark streaming's batch duration
-  @Override
-  protected <TransformT extends PTransform<? super PInput, POutput>> void
-      doVisitTransform(TransformTreeNode node) {
-    @SuppressWarnings("unchecked")
-    TransformT transform = (TransformT) node.getTransform();
-    @SuppressWarnings("unchecked")
-    Class<TransformT> transformClass = (Class<TransformT>) (Class<?>) transform.getClass();
-    if (transformClass.isAssignableFrom(Window.Bound.class)) {
-      WindowFn<?, ?> windowFn = WINDOW_FG.get("windowFn", transform);
-      if (windowFn instanceof FixedWindows) {
-        setBatchDuration(((FixedWindows) windowFn).getSize());
-      } else if (windowFn instanceof SlidingWindows) {
-        if (((SlidingWindows) windowFn).getOffset().getMillis() > 0) {
-          throw new UnsupportedOperationException("Spark does not support window offsets");
-        }
-        // Sliding window size might as well set the batch duration. Applying the transformation
-        // will add the "slide"
-        setBatchDuration(((SlidingWindows) windowFn).getSize());
-      } else if (!(windowFn instanceof GlobalWindows)) {
-        throw new IllegalStateException("Windowing function not supported: " + windowFn);
-      }
-    }
-  }
-
-  private void setBatchDuration(org.joda.time.Duration duration) {
-    Long durationMillis = duration.getMillis();
-    // validate window size
-    if (durationMillis < SPARK_MIN_WINDOW.milliseconds()) {
-      throw new IllegalArgumentException("Windowing of size " + durationMillis
-          + "msec is not supported!");
-    }
-    // choose the smallest duration to be Spark's batch duration, larger ones will be handled
-    // as window functions  over the batched-stream
-    if (!windowing || this.batchDuration.milliseconds() > durationMillis) {
-      this.batchDuration = Durations.milliseconds(durationMillis);
-    }
-    windowing = true;
-  }
-
-  public boolean isWindowing() {
-    return windowing;
-  }
-
-  public Duration getBatchDuration() {
-    return batchDuration;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08dca30a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
index deb1b6a..6f4d8fb 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
@@ -61,6 +61,7 @@ public class FlattenStreamingTest {
         PipelineOptionsFactory.as(SparkPipelineOptions.class);
     options.setRunner(SparkRunner.class);
     options.setStreaming(true);
+    // using the default 1000 msec interval
     options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
     Pipeline p = Pipeline.create(options);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08dca30a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
index 17044aa..2527152 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
@@ -40,6 +40,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.spark.streaming.Durations;
 import org.joda.time.Duration;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -92,6 +93,7 @@ public class KafkaStreamingTest {
         PipelineOptionsFactory.as(SparkPipelineOptions.class);
     options.setRunner(SparkRunner.class);
     options.setStreaming(true);
+    options.setBatchIntervalMillis(Durations.seconds(1).milliseconds());
     options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
     Pipeline p = Pipeline.create(options);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08dca30a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
index 5627056..c761fae 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.PCollection;
 
+import org.apache.spark.streaming.Durations;
 import org.joda.time.Duration;
 import org.junit.Test;
 
@@ -58,6 +59,7 @@ public class SimpleStreamingWordCountTest implements Serializable {
         PipelineOptionsFactory.as(SparkPipelineOptions.class);
     options.setRunner(SparkRunner.class);
     options.setStreaming(true);
+    options.setBatchIntervalMillis(Durations.seconds(1).milliseconds());
     options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
     Pipeline p = Pipeline.create(options);