You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by st...@apache.org on 2017/08/24 06:43:42 UTC
[1/2] beam git commit: [BEAM-2671] Implemented an InputDStream that
syncs up with the watermark values,
this should help with streaming tests in spark-runner.
Repository: beam
Updated Branches:
refs/heads/master c4517d04c -> 5181e619f
[BEAM-2671] Implemented an InputDStream that syncs up with the watermark values, this should help with streaming tests in spark-runner.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/15472b28
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/15472b28
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/15472b28
Branch: refs/heads/master
Commit: 15472b28c649381b90a0405d80012aa8523d13c5
Parents: c4517d0
Author: Stas Levin <st...@apache.org>
Authored: Sun Aug 20 16:48:57 2017 +0300
Committer: Stas Levin <st...@apache.org>
Committed: Thu Aug 24 09:42:12 2017 +0300
----------------------------------------------------------------------
.../apache/beam/runners/spark/SparkRunner.java | 5 +-
.../beam/runners/spark/io/CreateStream.java | 104 ++++---
.../SparkGroupAlsoByWindowViaWindowSet.java | 158 +++++++---
.../spark/stateful/SparkTimerInternals.java | 6 +
.../streaming/StreamingTransformTranslator.java | 71 +++--
.../streaming/WatermarkSyncedDStream.java | 149 +++++++++
.../spark/util/GlobalWatermarkHolder.java | 302 +++++++++++++------
.../runners/spark/SparkPipelineStateTest.java | 4 +-
.../translation/streaming/CreateStreamTest.java | 33 +-
.../spark/src/test/resources/log4j.properties | 11 +-
10 files changed, 633 insertions(+), 210 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/15472b28/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 595521f..98ca1be 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -40,7 +40,7 @@ import org.apache.beam.runners.spark.translation.TransformEvaluator;
import org.apache.beam.runners.spark.translation.TransformTranslator;
import org.apache.beam.runners.spark.translation.streaming.Checkpoint.CheckpointDir;
import org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory;
-import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.WatermarksListener;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.WatermarkAdvancingStreamingListener;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.io.Read;
@@ -171,7 +171,8 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
}
// register Watermarks listener to broadcast the advanced WMs.
- jssc.addStreamingListener(new JavaStreamingListenerWrapper(new WatermarksListener()));
+ jssc.addStreamingListener(
+ new JavaStreamingListenerWrapper(new WatermarkAdvancingStreamingListener()));
// The reason we call initAccumulators here even though it is called in
// SparkRunnerStreamingContextFactory is because the factory is not called when resuming
http://git-wip-us.apache.org/repos/asf/beam/blob/15472b28/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
index d485d25..4c73d95 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
@@ -41,34 +41,34 @@ import org.joda.time.Instant;
/**
* Create an input stream from Queue. For SparkRunner tests only.
*
- * <p>To properly compose a stream of micro-batches with their Watermarks, please keep in mind
- * that eventually there a two queues here - one for batches and another for Watermarks.
+ * <p>To properly compose a stream of micro-batches with their Watermarks, please keep in mind that
+ * eventually there are two queues here - one for batches and another for Watermarks.
*
- * <p>While both queues advance according to Spark's batch-interval, there is a slight difference
- * in how data is pushed into the stream compared to the advancement of Watermarks since Watermarks
+ * <p>While both queues advance according to Spark's batch-interval, there is a slight difference in
+ * how data is pushed into the stream compared to the advancement of Watermarks since Watermarks
* advance onBatchCompleted hook call so if you'd want to set the watermark advance for a specific
- * batch it should be called before that batch.
- * Also keep in mind that being a queue that is polled per batch interval, if there is a need to
- * "hold" the same Watermark without advancing it it should be stated explicitly or the Watermark
- * will advance as soon as it can (in the next batch completed hook).
+ * batch it should be called before that batch. Also keep in mind that being a queue that is polled
+ * per batch interval, if there is a need to "hold" the same Watermark without advancing it, it
+ * should be stated explicitly or the Watermark will advance as soon as it can (in the next batch
+ * completed hook).
*
* <p>Example 1:
*
- * {@code
- * CreateStream.<TimestampedValue<String>>withBatchInterval(batchDuration)
- * .nextBatch(
- * TimestampedValue.of("foo", endOfGlobalWindow),
- * TimestampedValue.of("bar", endOfGlobalWindow))
- * .advanceNextBatchWatermarkToInfinity();
- * }
- * The first batch will see the default start-of-time WM of
- * {@link BoundedWindow#TIMESTAMP_MIN_VALUE} and any following batch will see
- * the end-of-time WM {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
+ * <pre>{@code
+ * CreateStream.of(StringUtf8Coder.of(), batchDuration)
+ * .nextBatch(
+ * TimestampedValue.of("foo", endOfGlobalWindow),
+ * TimestampedValue.of("bar", endOfGlobalWindow))
+ * .advanceNextBatchWatermarkToInfinity();
+ * }</pre>
+ * The first batch will see the default start-of-time WM of {@link
+ * BoundedWindow#TIMESTAMP_MIN_VALUE} and any following batch will see the end-of-time WM {@link
+ * BoundedWindow#TIMESTAMP_MAX_VALUE}.
*
* <p>Example 2:
*
- * {@code
- * CreateStream.<TimestampedValue<String>>withBatchInterval(batchDuration)
+ * <pre>{@code
+ * CreateStream.of(VarIntCoder.of(), batchDuration)
* .nextBatch(
* TimestampedValue.of(1, instant))
* .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(20)))
@@ -77,32 +77,59 @@ import org.joda.time.Instant;
* .nextBatch(
* TimestampedValue.of(3, instant))
* .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(30)))
- * }
- * The first batch will see the start-of-time WM and the second will see the advanced (+20 min.) WM.
- * The third WM will see the WM advanced to +30 min, because this is the next advancement of the WM
- * regardless of where it ws called in the construction of CreateStream.
- * //TODO: write a proper Builder enforcing all those rules mentioned.
- * @param <T> stream type.
+ * }</pre>
+ *
+ * <p>
+ * The first batch will see the start-of-time WM and the second will see the advanced (+20 min.)
+ * WM. The third batch will see the WM advanced to +30 min, because this is the next advancement
+ * of the WM regardless of where it was called in the construction of CreateStream.
+ * </p>
+ *
+ * @param <T> The type of the element in this stream.
*/
+//TODO: write a proper Builder enforcing all those rules mentioned.
public final class CreateStream<T> extends PTransform<PBegin, PCollection<T>> {
- private final Duration batchInterval;
+ private final Duration batchDuration;
private final Queue<Iterable<TimestampedValue<T>>> batches = new LinkedList<>();
private final Deque<SparkWatermarks> times = new LinkedList<>();
private final Coder<T> coder;
private Instant initialSystemTime;
+ private final boolean forceWatermarkSync;
private Instant lowWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE; //for test purposes.
- private CreateStream(Duration batchInterval, Instant initialSystemTime, Coder<T> coder) {
- this.batchInterval = batchInterval;
+ private CreateStream(Duration batchDuration,
+ Instant initialSystemTime,
+ Coder<T> coder,
+ boolean forceWatermarkSync) {
+ this.batchDuration = batchDuration;
this.initialSystemTime = initialSystemTime;
this.coder = coder;
+ this.forceWatermarkSync = forceWatermarkSync;
}
- /** Set the batch interval for the stream. */
- public static <T> CreateStream<T> of(Coder<T> coder, Duration batchInterval) {
- return new CreateStream<>(batchInterval, new Instant(0), coder);
+ /**
+ * Creates a new Spark based stream intended for test purposes.
+ *
+ * @param batchDuration the batch duration (interval) to be used for creating this stream.
+ * @param coder the coder to be used for this stream.
+ * @param forceWatermarkSync whether this stream should be synced with the advancement of the
+ * watermark maintained by the
+ * {@link org.apache.beam.runners.spark.util.GlobalWatermarkHolder}.
+ */
+ public static <T> CreateStream<T> of(Coder<T> coder,
+ Duration batchDuration,
+ boolean forceWatermarkSync) {
+ return new CreateStream<>(batchDuration, new Instant(0), coder, forceWatermarkSync);
+ }
+
+ /**
+ * Creates a new Spark based stream with forced watermark sync, intended for test purposes.
+ * See also {@link CreateStream#of(Coder, Duration, boolean)}.
+ */
+ public static <T> CreateStream<T> of(Coder<T> coder, Duration batchDuration) {
+ return of(coder, batchDuration, true);
}
/**
@@ -112,8 +139,7 @@ public final class CreateStream<T> extends PTransform<PBegin, PCollection<T>> {
@SafeVarargs
public final CreateStream<T> nextBatch(TimestampedValue<T>... batchElements) {
// validate timestamps if timestamped elements.
- for (TimestampedValue<T> element: batchElements) {
- TimestampedValue timestampedValue = (TimestampedValue) element;
+ for (final TimestampedValue<T> timestampedValue: batchElements) {
checkArgument(
timestampedValue.getTimestamp().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE),
"Elements must have timestamps before %s. Got: %s",
@@ -177,7 +203,7 @@ public final class CreateStream<T> extends PTransform<PBegin, PCollection<T>> {
// advance the system time.
Instant currentSynchronizedProcessingTime = times.peekLast() == null ? initialSystemTime
: times.peekLast().getSynchronizedProcessingTime();
- Instant nextSynchronizedProcessingTime = currentSynchronizedProcessingTime.plus(batchInterval);
+ Instant nextSynchronizedProcessingTime = currentSynchronizedProcessingTime.plus(batchDuration);
checkArgument(
nextSynchronizedProcessingTime.isAfter(currentSynchronizedProcessingTime),
"Synchronized processing time must always advance.");
@@ -186,6 +212,10 @@ public final class CreateStream<T> extends PTransform<PBegin, PCollection<T>> {
return this;
}
+ public long getBatchDuration() {
+ return batchDuration.getMillis();
+ }
+
/** Get the underlying queue representing the mock stream of micro-batches. */
public Queue<Iterable<TimestampedValue<T>>> getBatches() {
return batches;
@@ -199,6 +229,10 @@ public final class CreateStream<T> extends PTransform<PBegin, PCollection<T>> {
return times;
}
+ public boolean isForceWatermarkSync() {
+ return forceWatermarkSync;
+ }
+
@Override
public PCollection<T> expand(PBegin input) {
return PCollection.createPrimitiveOutputInternal(
http://git-wip-us.apache.org/repos/asf/beam/blob/15472b28/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
index 1263618..52f7376 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
@@ -17,12 +17,15 @@
*/
package org.apache.beam.runners.spark.stateful;
+import com.google.common.base.Joiner;
import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Lists;
import com.google.common.collect.Table;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import org.apache.beam.runners.core.GroupAlsoByWindowsAggregators;
import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
import org.apache.beam.runners.core.LateDataUtils;
@@ -46,6 +49,7 @@ import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -61,6 +65,7 @@ import org.apache.spark.api.java.JavaSparkContext$;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.dstream.DStream;
@@ -104,12 +109,13 @@ public class SparkGroupAlsoByWindowViaWindowSet {
public static <K, InputT, W extends BoundedWindow>
JavaDStream<WindowedValue<KV<K, Iterable<InputT>>>> groupAlsoByWindow(
- final JavaDStream<WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>>> inputDStream,
- final Coder<K> keyCoder,
- final Coder<WindowedValue<InputT>> wvCoder,
- final WindowingStrategy<?, W> windowingStrategy,
- final SerializablePipelineOptions options,
- final List<Integer> sourceIds) {
+ final JavaDStream<WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>>> inputDStream,
+ final Coder<K> keyCoder,
+ final Coder<WindowedValue<InputT>> wvCoder,
+ final WindowingStrategy<?, W> windowingStrategy,
+ final SerializablePipelineOptions options,
+ final List<Integer> sourceIds,
+ final String transformFullName) {
final long batchDurationMillis =
options.get().as(SparkPipelineOptions.class).getBatchIntervalMillis();
@@ -140,30 +146,44 @@ public class SparkGroupAlsoByWindowViaWindowSet {
DStream<Tuple2</*K*/ ByteArray, /*Itr<WV<I>>*/ byte[]>> pairDStream =
inputDStream
.transformToPair(
- new Function<
+ new org.apache.spark.api.java.function.Function2<
JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>>>,
- JavaPairRDD<ByteArray, byte[]>>() {
+ Time, JavaPairRDD<ByteArray, byte[]>>() {
// we use mapPartitions with the RDD API because its the only available API
// that allows to preserve partitioning.
@Override
public JavaPairRDD<ByteArray, byte[]> call(
- JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>>> rdd)
+ JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>>> rdd,
+ final Time time)
throws Exception {
return rdd.mapPartitions(
- TranslationUtils.functionToFlatMapFunction(
- WindowingHelpers
- .<KV<K, Iterable<WindowedValue<InputT>>>>unwindowFunction()),
- true)
- .mapPartitionsToPair(
- TranslationUtils
- .<K, Iterable<WindowedValue<InputT>>>toPairFlatMapFunction(),
- true)
- // move to bytes representation and use coders for deserialization
- // because of checkpointing.
- .mapPartitionsToPair(
- TranslationUtils.pairFunctionToPairFlatMapFunction(
- CoderHelpers.toByteFunction(keyCoder, itrWvCoder)),
- true);
+ TranslationUtils.functionToFlatMapFunction(
+ WindowingHelpers
+ .<KV<K, Iterable<WindowedValue<InputT>>>>unwindowFunction()),
+ true)
+ .mapPartitionsToPair(
+ TranslationUtils
+ .<K, Iterable<WindowedValue<InputT>>>toPairFlatMapFunction(),
+ true)
+ .mapValues(new Function<Iterable<WindowedValue<InputT>>, KV<Long,
+ Iterable<WindowedValue<InputT>>>>() {
+
+ @Override
+ public KV<Long, Iterable<WindowedValue<InputT>>> call
+ (Iterable<WindowedValue<InputT>> values)
+ throws Exception {
+ // add the batch timestamp for visibility (e.g., debugging)
+ return KV.of(time.milliseconds(), values);
+ }
+ })
+ // move to bytes representation and use coders for deserialization
+ // because of checkpointing.
+ .mapPartitionsToPair(
+ TranslationUtils.pairFunctionToPairFlatMapFunction(
+ CoderHelpers.toByteFunction(keyCoder,
+ KvCoder.of(VarLongCoder.of(),
+ itrWvCoder))),
+ true);
}
})
.dstream();
@@ -219,9 +239,10 @@ public class SparkGroupAlsoByWindowViaWindowSet {
GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_LATENESS_COUNTER));
AbstractIterator<
- Tuple2</*K*/ ByteArray, Tuple2<StateAndTimers, /*WV<KV<K, Itr<I>>>*/ List<byte[]>>>>
+ Tuple2</*K*/ ByteArray, Tuple2<StateAndTimers, /*WV<KV<K, KV<Long(Time),Itr<I>>>>*/
+ List<byte[]>>>>
outIter = new AbstractIterator<Tuple2</*K*/ ByteArray,
- Tuple2<StateAndTimers, /*WV<KV<K, Itr<I>>>*/ List<byte[]>>>>() {
+ Tuple2<StateAndTimers, /*WV<KV<K, KV<Long(Time),Itr<I>>>>*/ List<byte[]>>>>() {
@Override
protected Tuple2</*K*/ ByteArray, Tuple2<StateAndTimers,
/*WV<KV<K, Itr<I>>>*/ List<byte[]>>> computeNext() {
@@ -240,8 +261,11 @@ public class SparkGroupAlsoByWindowViaWindowSet {
List<byte[]>>> prevStateAndTimersOpt = next._3();
SparkStateInternals<K> stateInternals;
+ Map<Integer, GlobalWatermarkHolder.SparkWatermarks> watermarks =
+ GlobalWatermarkHolder.get(batchDurationMillis);
SparkTimerInternals timerInternals = SparkTimerInternals.forStreamFromSources(
- sourceIds, GlobalWatermarkHolder.get(batchDurationMillis));
+ sourceIds, watermarks);
+
// get state(internals) per key.
if (prevStateAndTimersOpt.isEmpty()) {
// no previous state.
@@ -271,20 +295,49 @@ public class SparkGroupAlsoByWindowViaWindowSet {
options.get());
outputHolder.clear(); // clear before potential use.
+
if (!seq.isEmpty()) {
// new input for key.
try {
- Iterable<WindowedValue<InputT>> elementsIterable =
- CoderHelpers.fromByteArray(seq.head(), itrWvCoder);
- Iterable<WindowedValue<InputT>> validElements =
- LateDataUtils
- .dropExpiredWindows(
- key,
- elementsIterable,
- timerInternals,
- windowingStrategy,
- droppedDueToLateness);
- reduceFnRunner.processElements(validElements);
+ final KV<Long, Iterable<WindowedValue<InputT>>> keyedElements =
+ CoderHelpers.fromByteArray(seq.head(),
+ KvCoder.of(VarLongCoder.of(), itrWvCoder));
+
+ final Long rddTimestamp = keyedElements.getKey();
+
+ LOG.debug(
+ transformFullName
+ + ": processing RDD with timestamp: {}, watermarks: {}",
+ rddTimestamp,
+ watermarks);
+
+ final Iterable<WindowedValue<InputT>> elements = keyedElements.getValue();
+
+ LOG.trace(transformFullName + ": input elements: {}", elements);
+
+ /*
+ Incoming expired windows are filtered based on
+ timerInternals.currentInputWatermarkTime() and the configured allowed
+ lateness. Note that this is done prior to calling
+ timerInternals.advanceWatermark so essentially the inputWatermark is
+ the highWatermark of the previous batch and the lowWatermark of the
+ current batch.
+ The highWatermark of the current batch will only affect filtering
+ as of the next batch.
+ */
+ final Iterable<WindowedValue<InputT>> nonExpiredElements =
+ Lists.newArrayList(LateDataUtils
+ .dropExpiredWindows(
+ key,
+ elements,
+ timerInternals,
+ windowingStrategy,
+ droppedDueToLateness));
+
+ LOG.trace(transformFullName + ": non expired input elements: {}",
+ elements);
+
+ reduceFnRunner.processElements(nonExpiredElements);
} catch (Exception e) {
throw new RuntimeException(
"Failed to process element with ReduceFnRunner", e);
@@ -295,9 +348,28 @@ public class SparkGroupAlsoByWindowViaWindowSet {
}
try {
// advance the watermark to HWM to fire by timers.
+ LOG.debug(transformFullName + ": timerInternals before advance are {}",
+ timerInternals.toString());
+
+ // store the highWatermark as the new inputWatermark to calculate triggers
timerInternals.advanceWatermark();
+
+ LOG.debug(transformFullName + ": timerInternals after advance are {}",
+ timerInternals.toString());
+
// call on timers that are ready.
- reduceFnRunner.onTimers(timerInternals.getTimersReadyToProcess());
+ final Collection<TimerInternals.TimerData> readyToProcess =
+ timerInternals.getTimersReadyToProcess();
+
+ LOG.debug(transformFullName + ": ready timers are {}", readyToProcess);
+
+ /*
+ Note that at this point, the watermark has already advanced since
+ timerInternals.advanceWatermark() has been called and the highWatermark
+ is now stored as the new inputWatermark, according to which triggers are
+ calculated.
+ */
+ reduceFnRunner.onTimers(readyToProcess);
} catch (Exception e) {
throw new RuntimeException(
"Failed to process ReduceFnRunner onTimer.", e);
@@ -306,10 +378,20 @@ public class SparkGroupAlsoByWindowViaWindowSet {
reduceFnRunner.persist();
// obtain output, if fired.
List<WindowedValue<KV<K, Iterable<InputT>>>> outputs = outputHolder.get();
+
if (!outputs.isEmpty() || !stateInternals.getState().isEmpty()) {
+ // empty outputs are filtered later using DStream filtering
StateAndTimers updated = new StateAndTimers(stateInternals.getState(),
SparkTimerInternals.serializeTimers(
timerInternals.getTimers(), timerDataCoder));
+
+ /*
+ Not something we want to happen in production, but is very helpful
+ when debugging - TRACE.
+ */
+ LOG.trace(transformFullName + ": output elements are {}",
+ Joiner.on(", ").join(outputs));
+
// persist Spark's state by outputting.
List<byte[]> serOutput = CoderHelpers.toByteArrays(outputs, wvKvIterCoder);
return new Tuple2<>(encodedKey, new Tuple2<>(updated, serOutput));
http://git-wip-us.apache.org/repos/asf/beam/blob/15472b28/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
index a68da55..c998328 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
@@ -188,4 +188,10 @@ public class SparkTimerInternals implements TimerInternals {
return CoderHelpers.fromByteArrays(serTimers, timerDataCoder);
}
+ @Override
+ public String toString() {
+ return "SparkTimerInternals{" + "highWatermark=" + highWatermark
+ + ", synchronizedProcessingTime=" + synchronizedProcessingTime + ", timers=" + timers
+ + ", inputWatermark=" + inputWatermark + '}';
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/15472b28/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 38d6119..4114803 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -82,6 +82,7 @@ import org.apache.spark.Accumulator;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.JavaSparkContext$;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
@@ -139,18 +140,41 @@ public final class StreamingTransformTranslator {
return new TransformEvaluator<CreateStream<T>>() {
@Override
public void evaluate(CreateStream<T> transform, EvaluationContext context) {
- Coder<T> coder = context.getOutput(transform).getCoder();
- JavaStreamingContext jssc = context.getStreamingContext();
- Queue<Iterable<TimestampedValue<T>>> values = transform.getBatches();
- WindowedValue.FullWindowedValueCoder<T> windowCoder =
+
+ final Queue<JavaRDD<WindowedValue<T>>> rddQueue =
+ buildRdds(
+ transform.getBatches(),
+ context.getStreamingContext(),
+ context.getOutput(transform).getCoder());
+
+ final JavaInputDStream<WindowedValue<T>> javaInputDStream =
+ buildInputStream(rddQueue, transform, context);
+
+ final UnboundedDataset<T> unboundedDataset =
+ new UnboundedDataset<>(
+ javaInputDStream, Collections.singletonList(javaInputDStream.inputDStream().id()));
+
+ // add pre-baked Watermarks for the pre-baked batches.
+ GlobalWatermarkHolder.addAll(
+ ImmutableMap.of(unboundedDataset.getStreamSources().get(0), transform.getTimes()));
+
+ context.putDataset(transform, unboundedDataset);
+ }
+
+ private Queue<JavaRDD<WindowedValue<T>>> buildRdds(
+ Queue<Iterable<TimestampedValue<T>>> batches, JavaStreamingContext jssc, Coder<T> coder) {
+
+ final WindowedValue.FullWindowedValueCoder<T> windowCoder =
WindowedValue.FullWindowedValueCoder.of(coder, GlobalWindow.Coder.INSTANCE);
- // create the DStream from queue.
- Queue<JavaRDD<WindowedValue<T>>> rddQueue = new LinkedBlockingQueue<>();
- for (Iterable<TimestampedValue<T>> tv : values) {
- Iterable<WindowedValue<T>> windowedValues =
+
+ final Queue<JavaRDD<WindowedValue<T>>> rddQueue = new LinkedBlockingQueue<>();
+
+ for (final Iterable<TimestampedValue<T>> timestampedValues : batches) {
+ final Iterable<WindowedValue<T>> windowedValues =
Iterables.transform(
- tv,
+ timestampedValues,
new com.google.common.base.Function<TimestampedValue<T>, WindowedValue<T>>() {
+
@Override
public WindowedValue<T> apply(@Nonnull TimestampedValue<T> timestampedValue) {
return WindowedValue.of(
@@ -159,22 +183,28 @@ public final class StreamingTransformTranslator {
GlobalWindow.INSTANCE,
PaneInfo.NO_FIRING);
}
- });
- JavaRDD<WindowedValue<T>> rdd =
+ });
+
+ final JavaRDD<WindowedValue<T>> rdd =
jssc.sparkContext()
.parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
.map(CoderHelpers.fromByteFunction(windowCoder));
+
rddQueue.offer(rdd);
}
+ return rddQueue;
+ }
- JavaInputDStream<WindowedValue<T>> inputDStream = jssc.queueStream(rddQueue, true);
- UnboundedDataset<T> unboundedDataset = new UnboundedDataset<T>(
- inputDStream, Collections.singletonList(inputDStream.inputDStream().id()));
- // add pre-baked Watermarks for the pre-baked batches.
- Queue<GlobalWatermarkHolder.SparkWatermarks> times = transform.getTimes();
- GlobalWatermarkHolder.addAll(
- ImmutableMap.of(unboundedDataset.getStreamSources().get(0), times));
- context.putDataset(transform, unboundedDataset);
+ private JavaInputDStream<WindowedValue<T>> buildInputStream(
+ Queue<JavaRDD<WindowedValue<T>>> rddQueue,
+ CreateStream<T> transform,
+ EvaluationContext context) {
+ return transform.isForceWatermarkSync()
+ ? new JavaInputDStream<>(
+ new WatermarkSyncedDStream<>(
+ rddQueue, transform.getBatchDuration(), context.getStreamingContext().ssc()),
+ JavaSparkContext$.MODULE$.<WindowedValue<T>>fakeClassTag())
+ : context.getStreamingContext().queueStream(rddQueue, true);
}
@Override
@@ -301,7 +331,8 @@ public final class StreamingTransformTranslator {
wvCoder,
windowingStrategy,
context.getSerializableOptions(),
- streamSources);
+ streamSources,
+ context.getCurrentTransform().getFullName());
context.putDataset(transform, new UnboundedDataset<>(outStream, streamSources));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/15472b28/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/WatermarkSyncedDStream.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/WatermarkSyncedDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/WatermarkSyncedDStream.java
new file mode 100644
index 0000000..e2a7b44
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/WatermarkSyncedDStream.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation.streaming;
+
+import static com.google.common.base.Preconditions.checkState;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.Queue;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext$;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.streaming.StreamingContext;
+import org.apache.spark.streaming.Time;
+import org.apache.spark.streaming.dstream.InputDStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link InputDStream} that keeps track of the {@link GlobalWatermarkHolder} status and only
+ * generates RDDs when they are in sync. If an RDD for time <code>CURRENT_BATCH_TIME</code> is
+ * requested, this input source will wait until the time of the batch which set the watermark has
+ * caught up and the following holds:
+ *
+ * {@code
+ * CURRENT_BATCH_TIME - TIME_OF_BATCH_WHICH_SET_THE_WATERMARK <= BATCH_DURATION
+ * }
+ *
+ * <p>In other words, this input source will stall and will NOT generate RDDs when the watermark is
+ * too far behind. Once the watermark has caught up with the current batch time, an RDD will be
+ * generated and emitted downstream.
+ *
+ * <p>NOTE: This input source is intended for test-use only, where one needs to be able to simulate
+ * non-trivial scenarios under a deterministic execution even at the cost of incorporating test-only
+ * code. Unlike tests, in production <code>InputDStream</code>s will not be synchronous with the
+ * watermark, and the watermark is allowed to lag behind in a non-deterministic manner (since at
+ * this point in time we are reluctant to apply complex and possibly overly synchronous mechanisms
+ * at large scale).
+ *
+ * <p>See also <a href="https://issues.apache.org/jira/browse/BEAM-2671">BEAM-2671</a>, <a
+ * href="https://issues.apache.org/jira/browse/BEAM-2789">BEAM-2789</a>.
+ */
+class WatermarkSyncedDStream<T> extends InputDStream<WindowedValue<T>> {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(WatermarkSyncedDStream.class.getCanonicalName() + "#compute");
+
+ private static final int SLEEP_DURATION_MILLIS = 10;
+
+ private final Queue<JavaRDD<WindowedValue<T>>> rdds;
+ private final Long batchDuration;
+ private volatile boolean isFirst = true;
+
+ public WatermarkSyncedDStream(final Queue<JavaRDD<WindowedValue<T>>> rdds,
+ final Long batchDuration,
+ final StreamingContext ssc) {
+ super(ssc, JavaSparkContext$.MODULE$.<WindowedValue<T>>fakeClassTag());
+ this.rdds = rdds;
+ this.batchDuration = batchDuration;
+ }
+
+ private void awaitWatermarkSyncWith(final long batchTime) {
+ while (!isFirstBatch() && watermarkOutOfSync(batchTime)) {
+ Uninterruptibles.sleepUninterruptibly(SLEEP_DURATION_MILLIS, TimeUnit.MILLISECONDS);
+ }
+
+ checkState(
+ isFirstBatch() || watermarkIsOneBatchBehind(batchTime),
+ String.format(
+ "Watermark batch time:[%d] should be exactly one batch behind current batch time:[%d]",
+ GlobalWatermarkHolder.getLastWatermarkedBatchTime(), batchTime));
+ }
+
+ private boolean watermarkOutOfSync(final long batchTime) {
+ return batchTime - GlobalWatermarkHolder.getLastWatermarkedBatchTime() > batchDuration;
+ }
+
+ private boolean isFirstBatch() {
+ return isFirst;
+ }
+
+ private RDD<WindowedValue<T>> generateRdd() {
+ return rdds.size() > 0
+ ? rdds.poll().rdd()
+ : ssc().sparkContext().emptyRDD(JavaSparkContext$.MODULE$.<WindowedValue<T>>fakeClassTag());
+ }
+
+ private boolean watermarkIsOneBatchBehind(final long batchTime) {
+ return GlobalWatermarkHolder.getLastWatermarkedBatchTime() == batchTime - batchDuration;
+ }
+
+ @Override
+ public scala.Option<RDD<WindowedValue<T>>> compute(final Time validTime) {
+ final long batchTime = validTime.milliseconds();
+
+ LOG.trace("BEFORE waiting for watermark sync, "
+ + "LastWatermarkedBatchTime: {}, current batch time: {}",
+ GlobalWatermarkHolder.getLastWatermarkedBatchTime(),
+ batchTime);
+
+ final Stopwatch stopwatch = Stopwatch.createStarted();
+
+ awaitWatermarkSyncWith(batchTime);
+
+ stopwatch.stop();
+
+ LOG.info("Waited {} millis for watermarks to sync up with the current batch ({})",
+ stopwatch.elapsed(TimeUnit.MILLISECONDS),
+ batchTime);
+
+ LOG.info("Watermarks are now: {}", GlobalWatermarkHolder.get(batchDuration));
+
+ LOG.trace("AFTER waiting for watermark sync, "
+ + "LastWatermarkedBatchTime: {}, current batch time: {}",
+ GlobalWatermarkHolder.getLastWatermarkedBatchTime(),
+ batchTime);
+
+ final RDD<WindowedValue<T>> rdd = generateRdd();
+ isFirst = false;
+ return scala.Option.apply(rdd);
+ }
+
+ @Override
+ public void start() {
+
+ }
+
+ @Override
+ public void stop() {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/15472b28/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
index 2cb6f26..8ad3ca4 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
@@ -41,9 +41,12 @@ import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.BlockResult;
import org.apache.spark.storage.BlockStore;
import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaBatchInfo;
import org.apache.spark.streaming.api.java.JavaStreamingListener;
import org.apache.spark.streaming.api.java.JavaStreamingListenerBatchCompleted;
import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.Option;
/**
@@ -53,11 +56,18 @@ import scala.Option;
* and advances the watermarks according to the queue (first-in-first-out).
*/
public class GlobalWatermarkHolder {
+
+ private static final Logger LOG = LoggerFactory.getLogger(GlobalWatermarkHolder.class);
+
private static final Map<Integer, Queue<SparkWatermarks>> sourceTimes = new HashMap<>();
private static final BlockId WATERMARKS_BLOCK_ID = BlockId.apply("broadcast_0WATERMARKS");
- private static volatile Map<Integer, SparkWatermarks> driverWatermarks = null;
+ // a local copy of the watermarks is stored on the driver node so that it can be
+ // accessed in test mode instead of fetching blocks remotely
+ private static volatile Map<Integer, SparkWatermarks> driverNodeWatermarks = null;
+
private static volatile LoadingCache<String, Map<Integer, SparkWatermarks>> watermarkCache = null;
+ private static volatile long lastWatermarkedBatchTime = 0;
public static void add(int sourceId, SparkWatermarks sparkWatermarks) {
Queue<SparkWatermarks> timesQueue = sourceTimes.get(sourceId);
@@ -79,18 +89,33 @@ public class GlobalWatermarkHolder {
}
}
+ public static long getLastWatermarkedBatchTime() {
+ return lastWatermarkedBatchTime;
+ }
+
/**
* Returns the {@link Broadcast} containing the {@link SparkWatermarks} mapped
* to their sources.
*/
@SuppressWarnings("unchecked")
public static Map<Integer, SparkWatermarks> get(Long cacheInterval) {
- if (driverWatermarks != null) {
- // if we are executing in local mode simply return the local values.
- return driverWatermarks;
+ if (canBypassRemoteWatermarkFetching()) {
+ /*
+ driverNodeWatermarks != null =>
+ => advance() was called
+ => WatermarkAdvancingStreamingListener#onBatchCompleted() was called
+ => we are currently running on the driver node
+ => we can get the watermarks from the driver local copy instead of fetching their block
+ remotely using block manager
+ /------------------------------------------------------------------------------------------/
+ In test mode, the system is running inside a single JVM, and thus both driver and executors
+ "canBypassRemoteWatermarkFetching" by using the static driverNodeWatermarks copy.
+ This allows tests to avoid the asynchronous nature of using the BlockManager directly.
+ */
+ return getLocalWatermarkCopy();
} else {
if (watermarkCache == null) {
- initWatermarkCache(cacheInterval);
+ watermarkCache = createWatermarkCache(cacheInterval);
}
try {
return watermarkCache.get("SINGLETON");
@@ -100,103 +125,178 @@ public class GlobalWatermarkHolder {
}
}
- private static synchronized void initWatermarkCache(Long batchDuration) {
- if (watermarkCache == null) {
- watermarkCache =
- CacheBuilder.newBuilder()
- // expire watermarks every half batch duration to ensure they update in every batch.
- .expireAfterWrite(batchDuration / 2, TimeUnit.MILLISECONDS)
- .build(new WatermarksLoader());
- }
+ private static boolean canBypassRemoteWatermarkFetching() {
+ return driverNodeWatermarks != null;
+ }
+
+ private static synchronized LoadingCache<String, Map<Integer, SparkWatermarks>>
+ createWatermarkCache(final Long batchDuration) {
+ return CacheBuilder.newBuilder()
+ // expire watermarks every half batch duration to ensure they update in every batch.
+ .expireAfterWrite(batchDuration / 2, TimeUnit.MILLISECONDS)
+ .build(new WatermarksLoader());
}
/**
* Advances the watermarks to the next-in-line watermarks.
* SparkWatermarks are monotonically increasing.
*/
- @SuppressWarnings("unchecked")
- public static void advance() {
+ public static void advance(final String batchId) {
synchronized (GlobalWatermarkHolder.class) {
- BlockManager blockManager = SparkEnv.get().blockManager();
+ final BlockManager blockManager = SparkEnv.get().blockManager();
+ final Map<Integer, SparkWatermarks> newWatermarks = computeNewWatermarks(blockManager);
- if (sourceTimes.isEmpty()) {
- return;
+ if (!newWatermarks.isEmpty()) {
+ writeRemoteWatermarkBlock(newWatermarks, blockManager);
+ writeLocalWatermarkCopy(newWatermarks);
+ } else {
+ LOG.info("No new watermarks could be computed upon completion of batch: {}", batchId);
}
+ }
+ }
+
+ private static void writeLocalWatermarkCopy(Map<Integer, SparkWatermarks> newWatermarks) {
+ driverNodeWatermarks = newWatermarks;
+ }
- // update all sources' watermarks into the new broadcast.
- Map<Integer, SparkWatermarks> newValues = new HashMap<>();
-
- for (Map.Entry<Integer, Queue<SparkWatermarks>> en: sourceTimes.entrySet()) {
- if (en.getValue().isEmpty()) {
- continue;
- }
- Integer sourceId = en.getKey();
- Queue<SparkWatermarks> timesQueue = en.getValue();
-
- // current state, if exists.
- Instant currentLowWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
- Instant currentHighWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
- Instant currentSynchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
-
- Option<BlockResult> currentOption = blockManager.getRemote(WATERMARKS_BLOCK_ID);
- Map<Integer, SparkWatermarks> current;
- if (currentOption.isDefined()) {
- current = (Map<Integer, SparkWatermarks>) currentOption.get().data().next();
- } else {
- current = Maps.newHashMap();
- blockManager.putSingle(
- WATERMARKS_BLOCK_ID,
- current,
- StorageLevel.MEMORY_ONLY(),
- true);
- }
-
- if (current.containsKey(sourceId)) {
- SparkWatermarks currentTimes = current.get(sourceId);
- currentLowWatermark = currentTimes.getLowWatermark();
- currentHighWatermark = currentTimes.getHighWatermark();
- currentSynchronizedProcessingTime = currentTimes.getSynchronizedProcessingTime();
- }
-
- SparkWatermarks next = timesQueue.poll();
- // advance watermarks monotonically.
- Instant nextLowWatermark = next.getLowWatermark().isAfter(currentLowWatermark)
- ? next.getLowWatermark() : currentLowWatermark;
- Instant nextHighWatermark = next.getHighWatermark().isAfter(currentHighWatermark)
- ? next.getHighWatermark() : currentHighWatermark;
- Instant nextSynchronizedProcessingTime = next.getSynchronizedProcessingTime();
- checkState(!nextLowWatermark.isAfter(nextHighWatermark),
- String.format(
- "Low watermark %s cannot be later then high watermark %s",
- nextLowWatermark, nextHighWatermark));
- checkState(nextSynchronizedProcessingTime.isAfter(currentSynchronizedProcessingTime),
- "Synchronized processing time must advance.");
- newValues.put(
- sourceId,
- new SparkWatermarks(
- nextLowWatermark, nextHighWatermark, nextSynchronizedProcessingTime));
+ private static Map<Integer, SparkWatermarks> getLocalWatermarkCopy() {
+ return driverNodeWatermarks;
+ }
+
+ /** See {@link GlobalWatermarkHolder#advance(String)}. */
+ public static void advance() {
+ advance("N/A");
+ }
+
+ /**
+ * Computes the next watermark values per source id.
+ *
+ * @return The new watermark values, or an empty map if no source has reported its progress.
+ */
+ private static Map<Integer, SparkWatermarks> computeNewWatermarks(BlockManager blockManager) {
+
+ if (sourceTimes.isEmpty()) {
+ return new HashMap<>();
+ }
+
+ // update all sources' watermarks into the new broadcast.
+ final Map<Integer, SparkWatermarks> newValues = new HashMap<>();
+
+ for (final Map.Entry<Integer, Queue<SparkWatermarks>> watermarkInfo: sourceTimes.entrySet()) {
+
+ if (watermarkInfo.getValue().isEmpty()) {
+ continue;
}
- // update the watermarks broadcast only if something has changed.
- if (!newValues.isEmpty()) {
- driverWatermarks = newValues;
- blockManager.removeBlock(WATERMARKS_BLOCK_ID, true);
- blockManager.putSingle(
- WATERMARKS_BLOCK_ID,
- newValues,
- StorageLevel.MEMORY_ONLY(),
- true);
+ final Integer sourceId = watermarkInfo.getKey();
+
+ // current state, if exists.
+ Instant currentLowWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
+ Instant currentHighWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
+ Instant currentSynchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+ final Map<Integer, SparkWatermarks> currentWatermarks = initWatermarks(blockManager);
+
+ if (currentWatermarks.containsKey(sourceId)) {
+ final SparkWatermarks currentTimes = currentWatermarks.get(sourceId);
+ currentLowWatermark = currentTimes.getLowWatermark();
+ currentHighWatermark = currentTimes.getHighWatermark();
+ currentSynchronizedProcessingTime = currentTimes.getSynchronizedProcessingTime();
}
+
+ final Queue<SparkWatermarks> timesQueue = watermarkInfo.getValue();
+ final SparkWatermarks next = timesQueue.poll();
+
+ // advance watermarks monotonically.
+
+ final Instant nextLowWatermark =
+ next.getLowWatermark().isAfter(currentLowWatermark)
+ ? next.getLowWatermark()
+ : currentLowWatermark;
+
+ final Instant nextHighWatermark =
+ next.getHighWatermark().isAfter(currentHighWatermark)
+ ? next.getHighWatermark()
+ : currentHighWatermark;
+
+ final Instant nextSynchronizedProcessingTime = next.getSynchronizedProcessingTime();
+
+ checkState(
+ !nextLowWatermark.isAfter(nextHighWatermark),
+ String.format(
+ "Low watermark %s cannot be later then high watermark %s",
+ nextLowWatermark, nextHighWatermark));
+
+ checkState(
+ nextSynchronizedProcessingTime.isAfter(currentSynchronizedProcessingTime),
+ "Synchronized processing time must advance.");
+
+ newValues.put(
+ sourceId,
+ new SparkWatermarks(
+ nextLowWatermark, nextHighWatermark, nextSynchronizedProcessingTime));
+ }
+
+ return newValues;
+ }
+
+ private static void writeRemoteWatermarkBlock(
+ final Map<Integer, SparkWatermarks> newWatermarks, final BlockManager blockManager) {
+ blockManager.removeBlock(WATERMARKS_BLOCK_ID, true);
+ // if an executor tries to fetch the watermark block here, it will fail to do so since
+ // the watermark block has just been removed, but the new copy has not been put yet.
+ blockManager.putSingle(WATERMARKS_BLOCK_ID, newWatermarks, StorageLevel.MEMORY_ONLY(), true);
+ // if an executor tries to fetch the watermark block here, it still may fail to do so since
+ // the put operation might not have been executed yet
+ // see also https://issues.apache.org/jira/browse/BEAM-2789
+ LOG.info("Put new watermark block: {}", newWatermarks);
+ }
+
+ private static Map<Integer, SparkWatermarks> initWatermarks(final BlockManager blockManager) {
+
+ final Map<Integer, SparkWatermarks> watermarks = fetchSparkWatermarks(blockManager);
+
+ if (watermarks == null) {
+ final HashMap<Integer, SparkWatermarks> empty = Maps.newHashMap();
+ blockManager.putSingle(
+ WATERMARKS_BLOCK_ID,
+ empty,
+ StorageLevel.MEMORY_ONLY(),
+ true);
+ return empty;
+ } else {
+ return watermarks;
+ }
+ }
+
+ private static Map<Integer, SparkWatermarks> fetchSparkWatermarks(BlockManager blockManager) {
+ final Option<BlockResult> blockResultOption = blockManager.getRemote(WATERMARKS_BLOCK_ID);
+ if (blockResultOption.isDefined()) {
+ return (Map<Integer, SparkWatermarks>) blockResultOption.get().data().next();
+ } else {
+ return null;
+ }
+ }
+
+ private static class WatermarksLoader extends CacheLoader<String, Map<Integer, SparkWatermarks>> {
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Map<Integer, SparkWatermarks> load(@Nonnull String key) throws Exception {
+ final BlockManager blockManager = SparkEnv.get().blockManager();
+ final Map<Integer, SparkWatermarks> watermarks = fetchSparkWatermarks(blockManager);
+ return watermarks != null ? watermarks : Maps.<Integer, SparkWatermarks>newHashMap();
}
}
@VisibleForTesting
public static synchronized void clear() {
sourceTimes.clear();
- driverWatermarks = null;
- SparkEnv sparkEnv = SparkEnv.get();
+ lastWatermarkedBatchTime = 0;
+ writeLocalWatermarkCopy(null);
+ final SparkEnv sparkEnv = SparkEnv.get();
if (sparkEnv != null) {
- BlockManager blockManager = sparkEnv.blockManager();
+ final BlockManager blockManager = sparkEnv.blockManager();
blockManager.removeBlock(WATERMARKS_BLOCK_ID, true);
}
}
@@ -242,25 +342,33 @@ public class GlobalWatermarkHolder {
}
/** Advance the WMs onBatchCompleted event. */
- public static class WatermarksListener extends JavaStreamingListener {
- @Override
- public void onBatchCompleted(JavaStreamingListenerBatchCompleted batchCompleted) {
- GlobalWatermarkHolder.advance();
+ public static class WatermarkAdvancingStreamingListener extends JavaStreamingListener {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(WatermarkAdvancingStreamingListener.class);
+
+ private long timeOf(JavaBatchInfo info) {
+ return info.batchTime().milliseconds();
}
- }
- private static class WatermarksLoader extends CacheLoader<String, Map<Integer, SparkWatermarks>> {
+ private long laterOf(long t1, long t2) {
+ return Math.max(t1, t2);
+ }
- @SuppressWarnings("unchecked")
@Override
- public Map<Integer, SparkWatermarks> load(@Nonnull String key) throws Exception {
- Option<BlockResult> blockResultOption =
- SparkEnv.get().blockManager().getRemote(WATERMARKS_BLOCK_ID);
- if (blockResultOption.isDefined()) {
- return (Map<Integer, SparkWatermarks>) blockResultOption.get().data().next();
- } else {
- return Maps.newHashMap();
- }
+ public void onBatchCompleted(JavaStreamingListenerBatchCompleted batchCompleted) {
+
+ final long currentBatchTime = timeOf(batchCompleted.batchInfo());
+
+ GlobalWatermarkHolder.advance(Long.toString(currentBatchTime));
+
+ // make sure to update the last watermarked batch time AFTER the watermarks have already
+ // been updated (i.e., after the call to GlobalWatermarkHolder.advance(...))
+ // in addition, the watermark's block in the BlockManager is updated in an asynchronous manner
+ lastWatermarkedBatchTime =
+ laterOf(lastWatermarkedBatchTime, currentBatchTime);
+
+ LOG.info("Batch with timestamp: {} has completed, watermarks have been updated.",
+ lastWatermarkedBatchTime);
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/15472b28/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java
index cfbad01..a5455da 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java
@@ -73,8 +73,10 @@ public class SparkPipelineStateTest implements Serializable {
}
private PTransform<PBegin, PCollection<String>> getValues(final SparkPipelineOptions options) {
+ final boolean doNotSyncWithWatermark = false;
return options.isStreaming()
- ? CreateStream.of(StringUtf8Coder.of(), Duration.millis(1)).nextBatch("one", "two")
+ ? CreateStream.of(StringUtf8Coder.of(), Duration.millis(1), doNotSyncWithWatermark)
+ .nextBatch("one", "two")
: Create.of("one", "two");
}
http://git-wip-us.apache.org/repos/asf/beam/blob/15472b28/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
index 770e0c0..a432fda 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
@@ -163,16 +163,16 @@ public class CreateStreamTest implements Serializable {
public void testDiscardingMode() throws IOException {
CreateStream<String> source =
CreateStream.of(StringUtf8Coder.of(), batchDuration())
- .nextBatch(
- TimestampedValue.of("firstPane", new Instant(100)),
- TimestampedValue.of("alsoFirstPane", new Instant(200)))
- .advanceWatermarkForNextBatch(new Instant(1001L))
- .nextBatch(
- TimestampedValue.of("onTimePane", new Instant(500)))
- .advanceNextBatchWatermarkToInfinity()
- .nextBatch(
- TimestampedValue.of("finalLatePane", new Instant(750)),
- TimestampedValue.of("alsoFinalLatePane", new Instant(250)));
+ .nextBatch(
+ TimestampedValue.of("firstPane", new Instant(100)),
+ TimestampedValue.of("alsoFirstPane", new Instant(200)))
+ .advanceWatermarkForNextBatch(new Instant(1001L))
+ .nextBatch(
+ TimestampedValue.of("onTimePane", new Instant(500)))
+ .advanceNextBatchWatermarkToInfinity()
+ .nextBatch(
+ TimestampedValue.of("finalLatePane", new Instant(750)),
+ TimestampedValue.of("alsoFinalLatePane", new Instant(250)));
FixedWindows windowFn = FixedWindows.of(Duration.millis(1000L));
Duration allowedLateness = Duration.millis(5000L);
@@ -212,12 +212,13 @@ public class CreateStreamTest implements Serializable {
Instant lateElementTimestamp = new Instant(-1_000_000);
CreateStream<String> source =
CreateStream.of(StringUtf8Coder.of(), batchDuration())
- .emptyBatch()
- .advanceWatermarkForNextBatch(new Instant(0))
- .nextBatch(
- TimestampedValue.of("late", lateElementTimestamp),
- TimestampedValue.of("onTime", new Instant(100)))
- .advanceNextBatchWatermarkToInfinity();
+ .emptyBatch()
+ .advanceWatermarkForNextBatch(new Instant(0))
+ .emptyBatch()
+ .nextBatch(
+ TimestampedValue.of("late", lateElementTimestamp),
+ TimestampedValue.of("onTime", new Instant(100)))
+ .advanceNextBatchWatermarkToInfinity();
FixedWindows windowFn = FixedWindows.of(Duration.millis(1000L));
Duration allowedLateness = Duration.millis(5000L);
http://git-wip-us.apache.org/repos/asf/beam/blob/15472b28/runners/spark/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/resources/log4j.properties b/runners/spark/src/test/resources/log4j.properties
index 66e83c8..010c7df 100644
--- a/runners/spark/src/test/resources/log4j.properties
+++ b/runners/spark/src/test/resources/log4j.properties
@@ -24,7 +24,16 @@ log4j.rootLogger=ERROR, testlogger
log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
log4j.appender.testlogger.target = System.err
log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+log4j.appender.testlogger.layout.ConversionPattern=%d [%t] %-5p %c %x - %m%n
# TestSparkRunner prints general information abut test pipelines execution.
log4j.logger.org.apache.beam.runners.spark.TestSparkRunner=INFO
+
+# in case of an emergency - uncomment (or better yet, stay calm and uncomment).
+#log4j.logger.org.apache.beam=TRACE
+#log4j.logger.org.apache.beam.sdk.Pipeline=INFO
+#log4j.logger.org.apache.beam.sdk.coders=INFO
+#log4j.logger.org.apache.beam.sdk.runners.TransformHierarchy=ERROR
+#log4j.logger.org.apache.beam.runners.spark.SparkRunner$Evaluator=ERROR
+#log4j.logger.org.apache.beam.runners.spark.translation.streaming.WatermarkSyncedDStream#compute=INFO
+#log4j.logger.org.apache.beam.runners.spark.translation.streaming.WatermarkSyncedDStream=ERROR
[2/2] beam git commit: This closes #3738
Posted by st...@apache.org.
This closes #3738
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5181e619
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5181e619
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5181e619
Branch: refs/heads/master
Commit: 5181e619f17e1f69fabe8d5bdfc7a3a6a2142cde
Parents: c4517d0 15472b2
Author: Stas Levin <st...@apache.org>
Authored: Thu Aug 24 09:43:16 2017 +0300
Committer: Stas Levin <st...@apache.org>
Committed: Thu Aug 24 09:43:16 2017 +0300
----------------------------------------------------------------------
.../apache/beam/runners/spark/SparkRunner.java | 5 +-
.../beam/runners/spark/io/CreateStream.java | 104 ++++---
.../SparkGroupAlsoByWindowViaWindowSet.java | 158 +++++++---
.../spark/stateful/SparkTimerInternals.java | 6 +
.../streaming/StreamingTransformTranslator.java | 71 +++--
.../streaming/WatermarkSyncedDStream.java | 149 +++++++++
.../spark/util/GlobalWatermarkHolder.java | 302 +++++++++++++------
.../runners/spark/SparkPipelineStateTest.java | 4 +-
.../translation/streaming/CreateStreamTest.java | 33 +-
.../spark/src/test/resources/log4j.properties | 11 +-
10 files changed, 633 insertions(+), 210 deletions(-)
----------------------------------------------------------------------