You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/25 18:55:50 UTC
[36/50] incubator-beam git commit: SparkRunner batch interval as a
configuration instead of Beam Windows.
SparkRunner batch interval as a configuration instead of Beam Windows.
Add the batch interval to the pipeline options, defaulting arbitrarily to 1000 msec.
Pick up the batch interval from pipeline options and remove StreamingWindowPipelineDetector.
Use SDK API to get the window function.
Update the README
Update streaming tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/08dca30a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/08dca30a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/08dca30a
Branch: refs/heads/gearpump-runner
Commit: 08dca30a38e11c13e8a4b2db1529f1306cc489b0
Parents: 95e7f01
Author: Sela <an...@paypal.com>
Authored: Wed Aug 10 13:30:30 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Wed Aug 10 20:42:17 2016 +0300
----------------------------------------------------------------------
runners/spark/README.md | 3 +-
.../runners/spark/SparkPipelineOptions.java | 5 +
.../apache/beam/runners/spark/SparkRunner.java | 19 +---
.../streaming/StreamingTransformTranslator.java | 10 +-
.../StreamingWindowPipelineDetector.java | 102 -------------------
.../streaming/FlattenStreamingTest.java | 1 +
.../streaming/KafkaStreamingTest.java | 2 +
.../streaming/SimpleStreamingWordCountTest.java | 2 +
8 files changed, 18 insertions(+), 126 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08dca30a/runners/spark/README.md
----------------------------------------------------------------------
diff --git a/runners/spark/README.md b/runners/spark/README.md
index d2bfd3e..ef42fa7 100644
--- a/runners/spark/README.md
+++ b/runners/spark/README.md
@@ -63,8 +63,7 @@ The Spark runner provides support for batch processing of Beam bounded PCollecti
### Streaming
The Spark runner currently provides partial support for stream processing of Beam unbounded PCollections as Spark [DStream](http://spark.apache.org/docs/latest/streaming-programming-guide.html#discretized-streams-dstreams)s.
-Current implementation of *Windowing* takes the first window size in the pipeline and treats it as the Spark "batch interval", while following windows will be treated as *Processing Time* windows.
-Further work is required to provide full support for the Beam Model *event-time* and *out-of-order* stream processing.
+Currently, both *FixedWindows* and *SlidingWindows* are supported, but only with processing-time triggers and discarding pane.
### issue tracking
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08dca30a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index 6ef3741..080ff19 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -39,4 +39,9 @@ public interface SparkPipelineOptions extends PipelineOptions, StreamingOptions,
@Default.Long(-1)
Long getTimeout();
void setTimeout(Long batchInterval);
+
+ @Description("Batch interval for Spark streaming in milliseconds.")
+ @Default.Long(1000)
+ Long getBatchIntervalMillis();
+ void setBatchIntervalMillis(Long batchInterval);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08dca30a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index d994ec4..be50f70 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -26,7 +26,6 @@ import org.apache.beam.runners.spark.translation.SparkProcessContext;
import org.apache.beam.runners.spark.translation.TransformTranslator;
import org.apache.beam.runners.spark.translation.streaming.StreamingEvaluationContext;
import org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator;
-import org.apache.beam.runners.spark.translation.streaming.StreamingWindowPipelineDetector;
import org.apache.beam.runners.spark.util.SinglePrimitiveOutputPTransform;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -145,24 +144,16 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> {
public EvaluationResult run(Pipeline pipeline) {
try {
LOG.info("Executing pipeline using the SparkRunner.");
- JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions
- .getSparkMaster(), mOptions.getAppName());
+ JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions.getSparkMaster(),
+ mOptions.getAppName());
if (mOptions.isStreaming()) {
SparkPipelineTranslator translator =
- new StreamingTransformTranslator.Translator(new TransformTranslator.Translator());
- // if streaming - fixed window should be defined on all UNBOUNDED inputs
- StreamingWindowPipelineDetector streamingWindowPipelineDetector =
- new StreamingWindowPipelineDetector(translator);
- pipeline.traverseTopologically(streamingWindowPipelineDetector);
- if (!streamingWindowPipelineDetector.isWindowing()) {
- throw new IllegalStateException("Spark streaming pipeline must be windowed!");
- }
-
- Duration batchInterval = streamingWindowPipelineDetector.getBatchDuration();
+ new StreamingTransformTranslator.Translator(new TransformTranslator.Translator());
+ Duration batchInterval = new Duration(mOptions.getBatchIntervalMillis());
LOG.info("Setting Spark streaming batchInterval to {} msec", batchInterval.milliseconds());
- EvaluationContext ctxt = createStreamingEvaluationContext(jsc, pipeline, batchInterval);
+ EvaluationContext ctxt = createStreamingEvaluationContext(jsc, pipeline, batchInterval);
pipeline.traverseTopologically(new SparkPipelineEvaluator(ctxt, translator));
ctxt.computeOutputs();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08dca30a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index b0fb931..2ce2c29 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -25,7 +25,6 @@ import org.apache.beam.runners.spark.translation.DoFnFunction;
import org.apache.beam.runners.spark.translation.EvaluationContext;
import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
import org.apache.beam.runners.spark.translation.TransformEvaluator;
-import org.apache.beam.runners.spark.translation.TransformTranslator;
import org.apache.beam.runners.spark.translation.WindowingHelpers;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.AvroIO;
@@ -35,7 +34,6 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
@@ -290,16 +288,12 @@ public final class StreamingTransformTranslator {
}
}
- private static final TransformTranslator.FieldGetter WINDOW_FG =
- new TransformTranslator.FieldGetter(Window.Bound.class);
-
- private static <T, W extends BoundedWindow> TransformEvaluator<Window.Bound<T>> window() {
+ private static <T> TransformEvaluator<Window.Bound<T>> window() {
return new TransformEvaluator<Window.Bound<T>>() {
@Override
public void evaluate(Window.Bound<T> transform, EvaluationContext context) {
StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
- //--- first we apply windowing to the stream
- WindowFn<? super T, W> windowFn = WINDOW_FG.get("windowFn", transform);
+ WindowFn<? super T, ?> windowFn = transform.getWindowFn();
@SuppressWarnings("unchecked")
JavaDStream<WindowedValue<T>> dStream =
(JavaDStream<WindowedValue<T>>) sec.getStream(transform);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08dca30a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java
deleted file mode 100644
index 394b2c5..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.translation.streaming;
-
-import org.apache.beam.runners.spark.SparkRunner;
-import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
-import org.apache.beam.runners.spark.translation.TransformTranslator;
-import org.apache.beam.sdk.runners.TransformTreeNode;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.Durations;
-
-
-/**
- * Pipeline {@link SparkRunner.Evaluator} to detect windowing.
- */
-public final class StreamingWindowPipelineDetector extends SparkRunner.Evaluator {
-
- // Currently, Spark streaming recommends batches no smaller then 500 msec
- private static final Duration SPARK_MIN_WINDOW = Durations.milliseconds(500);
-
- private boolean windowing;
- private Duration batchDuration;
-
- public StreamingWindowPipelineDetector(SparkPipelineTranslator translator) {
- super(translator);
- }
-
- private static final TransformTranslator.FieldGetter WINDOW_FG =
- new TransformTranslator.FieldGetter(Window.Bound.class);
-
- // Use the smallest window (fixed or sliding) as Spark streaming's batch duration
- @Override
- protected <TransformT extends PTransform<? super PInput, POutput>> void
- doVisitTransform(TransformTreeNode node) {
- @SuppressWarnings("unchecked")
- TransformT transform = (TransformT) node.getTransform();
- @SuppressWarnings("unchecked")
- Class<TransformT> transformClass = (Class<TransformT>) (Class<?>) transform.getClass();
- if (transformClass.isAssignableFrom(Window.Bound.class)) {
- WindowFn<?, ?> windowFn = WINDOW_FG.get("windowFn", transform);
- if (windowFn instanceof FixedWindows) {
- setBatchDuration(((FixedWindows) windowFn).getSize());
- } else if (windowFn instanceof SlidingWindows) {
- if (((SlidingWindows) windowFn).getOffset().getMillis() > 0) {
- throw new UnsupportedOperationException("Spark does not support window offsets");
- }
- // Sliding window size might as well set the batch duration. Applying the transformation
- // will add the "slide"
- setBatchDuration(((SlidingWindows) windowFn).getSize());
- } else if (!(windowFn instanceof GlobalWindows)) {
- throw new IllegalStateException("Windowing function not supported: " + windowFn);
- }
- }
- }
-
- private void setBatchDuration(org.joda.time.Duration duration) {
- Long durationMillis = duration.getMillis();
- // validate window size
- if (durationMillis < SPARK_MIN_WINDOW.milliseconds()) {
- throw new IllegalArgumentException("Windowing of size " + durationMillis
- + "msec is not supported!");
- }
- // choose the smallest duration to be Spark's batch duration, larger ones will be handled
- // as window functions over the batched-stream
- if (!windowing || this.batchDuration.milliseconds() > durationMillis) {
- this.batchDuration = Durations.milliseconds(durationMillis);
- }
- windowing = true;
- }
-
- public boolean isWindowing() {
- return windowing;
- }
-
- public Duration getBatchDuration() {
- return batchDuration;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08dca30a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
index deb1b6a..6f4d8fb 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
@@ -61,6 +61,7 @@ public class FlattenStreamingTest {
PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setStreaming(true);
+ // using the default 1000 msec interval
options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
Pipeline p = Pipeline.create(options);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08dca30a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
index 17044aa..2527152 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
@@ -40,6 +40,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.spark.streaming.Durations;
import org.joda.time.Duration;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -92,6 +93,7 @@ public class KafkaStreamingTest {
PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setStreaming(true);
+ options.setBatchIntervalMillis(Durations.seconds(1).milliseconds());
options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
Pipeline p = Pipeline.create(options);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08dca30a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
index 5627056..c761fae 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.spark.streaming.Durations;
import org.joda.time.Duration;
import org.junit.Test;
@@ -58,6 +59,7 @@ public class SimpleStreamingWordCountTest implements Serializable {
PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setStreaming(true);
+ options.setBatchIntervalMillis(Durations.seconds(1).milliseconds());
options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
Pipeline p = Pipeline.create(options);