Posted to commits@beam.apache.org by st...@apache.org on 2017/08/24 06:43:42 UTC

[1/2] beam git commit: [BEAM-2671] Implemented an InputDStream that syncs up with the watermark values; this should help with streaming tests in spark-runner.

Repository: beam
Updated Branches:
  refs/heads/master c4517d04c -> 5181e619f


[BEAM-2671] Implemented an InputDStream that syncs up with the watermark values; this should help with streaming tests in spark-runner.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/15472b28
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/15472b28
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/15472b28

Branch: refs/heads/master
Commit: 15472b28c649381b90a0405d80012aa8523d13c5
Parents: c4517d0
Author: Stas Levin <st...@apache.org>
Authored: Sun Aug 20 16:48:57 2017 +0300
Committer: Stas Levin <st...@apache.org>
Committed: Thu Aug 24 09:42:12 2017 +0300

----------------------------------------------------------------------
 .../apache/beam/runners/spark/SparkRunner.java  |   5 +-
 .../beam/runners/spark/io/CreateStream.java     | 104 ++++---
 .../SparkGroupAlsoByWindowViaWindowSet.java     | 158 +++++++---
 .../spark/stateful/SparkTimerInternals.java     |   6 +
 .../streaming/StreamingTransformTranslator.java |  71 +++--
 .../streaming/WatermarkSyncedDStream.java       | 149 +++++++++
 .../spark/util/GlobalWatermarkHolder.java       | 302 +++++++++++++------
 .../runners/spark/SparkPipelineStateTest.java   |   4 +-
 .../translation/streaming/CreateStreamTest.java |  33 +-
 .../spark/src/test/resources/log4j.properties   |  11 +-
 10 files changed, 633 insertions(+), 210 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/15472b28/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 595521f..98ca1be 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -40,7 +40,7 @@ import org.apache.beam.runners.spark.translation.TransformEvaluator;
 import org.apache.beam.runners.spark.translation.TransformTranslator;
 import org.apache.beam.runners.spark.translation.streaming.Checkpoint.CheckpointDir;
 import org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory;
-import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.WatermarksListener;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.WatermarkAdvancingStreamingListener;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.io.Read;
@@ -171,7 +171,8 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
       }
 
       // register Watermarks listener to broadcast the advanced WMs.
-      jssc.addStreamingListener(new JavaStreamingListenerWrapper(new WatermarksListener()));
+      jssc.addStreamingListener(
+          new JavaStreamingListenerWrapper(new WatermarkAdvancingStreamingListener()));
 
       // The reason we call initAccumulators here even though it is called in
       // SparkRunnerStreamingContextFactory is because the factory is not called when resuming

http://git-wip-us.apache.org/repos/asf/beam/blob/15472b28/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
index d485d25..4c73d95 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
@@ -41,34 +41,34 @@ import org.joda.time.Instant;
 /**
  * Create an input stream from Queue. For SparkRunner tests only.
  *
- * <p>To properly compose a stream of micro-batches with their Watermarks, please keep in mind
- * that eventually there a two queues here - one for batches and another for Watermarks.
+ * <p>To properly compose a stream of micro-batches with their Watermarks, please keep in mind that
+ * there are eventually two queues here - one for batches and another for Watermarks.
  *
- * <p>While both queues advance according to Spark's batch-interval, there is a slight difference
- * in how data is pushed into the stream compared to the advancement of Watermarks since Watermarks
+ * <p>While both queues advance according to Spark's batch-interval, there is a slight difference in
+ * how data is pushed into the stream compared to the advancement of Watermarks, since Watermarks
  * advance onBatchCompleted hook call so if you'd want to set the watermark advance for a specific
- * batch it should be called before that batch.
- * Also keep in mind that being a queue that is polled per batch interval, if there is a need to
- * "hold" the same Watermark without advancing it it should be stated explicitly or the Watermark
- * will advance as soon as it can (in the next batch completed hook).
+ * batch, it should be called before that batch. Also keep in mind that, being a queue that is
+ * polled per batch interval, if there is a need to "hold" the same Watermark without advancing
+ * it, this should be stated explicitly or the Watermark will advance as soon as it can (in the
+ * next batch completed hook).
  *
  * <p>Example 1:
  *
- * {@code
- * CreateStream.<TimestampedValue<String>>withBatchInterval(batchDuration)
- *     .nextBatch(
- *         TimestampedValue.of("foo", endOfGlobalWindow),
- *         TimestampedValue.of("bar", endOfGlobalWindow))
- *     .advanceNextBatchWatermarkToInfinity();
- * }
- * The first batch will see the default start-of-time WM of
- * {@link BoundedWindow#TIMESTAMP_MIN_VALUE} and any following batch will see
- * the end-of-time WM {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
+ * <pre>{@code
+ * CreateStream.of(StringUtf8Coder.of(), batchDuration)
+ *   .nextBatch(
+ *     TimestampedValue.of("foo", endOfGlobalWindow),
+ *     TimestampedValue.of("bar", endOfGlobalWindow))
+ *   .advanceNextBatchWatermarkToInfinity();
+ * }</pre>
+ * The first batch will see the default start-of-time WM of {@link
+ * BoundedWindow#TIMESTAMP_MIN_VALUE} and any following batch will see the end-of-time WM {@link
+ * BoundedWindow#TIMESTAMP_MAX_VALUE}.
  *
  * <p>Example 2:
  *
- * {@code
- * CreateStream.<TimestampedValue<String>>withBatchInterval(batchDuration)
+ * <pre>{@code
+ * CreateStream.of(VarIntCoder.of(), batchDuration)
  *     .nextBatch(
  *         TimestampedValue.of(1, instant))
  *     .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(20)))
@@ -77,32 +77,59 @@ import org.joda.time.Instant;
  *     .nextBatch(
  *         TimestampedValue.of(3, instant))
  *     .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(30)))
- * }
- * The first batch will see the start-of-time WM and the second will see the advanced (+20 min.) WM.
- * The third WM will see the WM advanced to +30 min, because this is the next advancement of the WM
- * regardless of where it ws called in the construction of CreateStream.
- * //TODO: write a proper Builder enforcing all those rules mentioned.
- * @param <T> stream type.
+ * }</pre>
+ *
+ * <p>
+ *   The first batch will see the start-of-time WM and the second will see the advanced (+20 min.)
+ *   WM. The third batch will see the WM advanced to +30 min, because this is the next advancement
+ *   of the WM regardless of where it was called in the construction of CreateStream.
+ * </p>
+ *
+ * @param <T> The type of the element in this stream.
  */
+//TODO: write a proper Builder enforcing all those rules mentioned.
 public final class CreateStream<T> extends PTransform<PBegin, PCollection<T>> {
 
-  private final Duration batchInterval;
+  private final Duration batchDuration;
   private final Queue<Iterable<TimestampedValue<T>>> batches = new LinkedList<>();
   private final Deque<SparkWatermarks> times = new LinkedList<>();
   private final Coder<T> coder;
   private Instant initialSystemTime;
+  private final boolean forceWatermarkSync;
 
   private Instant lowWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE; //for test purposes.
 
-  private CreateStream(Duration batchInterval, Instant initialSystemTime, Coder<T> coder) {
-    this.batchInterval = batchInterval;
+  private CreateStream(Duration batchDuration,
+                       Instant initialSystemTime,
+                       Coder<T> coder,
+                       boolean forceWatermarkSync) {
+    this.batchDuration = batchDuration;
     this.initialSystemTime = initialSystemTime;
     this.coder = coder;
+    this.forceWatermarkSync = forceWatermarkSync;
   }
 
-  /** Set the batch interval for the stream. */
-  public static <T> CreateStream<T> of(Coder<T> coder, Duration batchInterval) {
-    return new CreateStream<>(batchInterval, new Instant(0), coder);
+  /**
+   * Creates a new Spark-based stream intended for test purposes.
+   *
+   * @param batchDuration the batch duration (interval) to be used for creating this stream.
+   * @param coder the coder to be used for this stream.
+   * @param forceWatermarkSync whether this stream should be synced with the advancement of the
+   *                           watermark maintained by the
+   *                           {@link org.apache.beam.runners.spark.util.GlobalWatermarkHolder}.
+   */
+  public static <T> CreateStream<T> of(Coder<T> coder,
+                                       Duration batchDuration,
+                                       boolean forceWatermarkSync) {
+    return new CreateStream<>(batchDuration, new Instant(0), coder, forceWatermarkSync);
+  }
+
+  /**
+   * Creates a new Spark-based stream with forced watermark sync, intended for test purposes.
+   * See also {@link CreateStream#of(Coder, Duration, boolean)}.
+   */
+  public static <T> CreateStream<T> of(Coder<T> coder, Duration batchDuration) {
+    return of(coder, batchDuration, true);
   }
 
   /**
@@ -112,8 +139,7 @@ public final class CreateStream<T> extends PTransform<PBegin, PCollection<T>> {
   @SafeVarargs
   public final CreateStream<T> nextBatch(TimestampedValue<T>... batchElements) {
     // validate timestamps if timestamped elements.
-    for (TimestampedValue<T> element: batchElements) {
-      TimestampedValue timestampedValue = (TimestampedValue) element;
+    for (final TimestampedValue<T> timestampedValue: batchElements) {
       checkArgument(
           timestampedValue.getTimestamp().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE),
           "Elements must have timestamps before %s. Got: %s",
@@ -177,7 +203,7 @@ public final class CreateStream<T> extends PTransform<PBegin, PCollection<T>> {
     // advance the system time.
     Instant currentSynchronizedProcessingTime = times.peekLast() == null ? initialSystemTime
         : times.peekLast().getSynchronizedProcessingTime();
-    Instant nextSynchronizedProcessingTime = currentSynchronizedProcessingTime.plus(batchInterval);
+    Instant nextSynchronizedProcessingTime = currentSynchronizedProcessingTime.plus(batchDuration);
     checkArgument(
         nextSynchronizedProcessingTime.isAfter(currentSynchronizedProcessingTime),
         "Synchronized processing time must always advance.");
@@ -186,6 +212,10 @@ public final class CreateStream<T> extends PTransform<PBegin, PCollection<T>> {
     return this;
   }
 
+  public long getBatchDuration() {
+    return batchDuration.getMillis();
+  }
+
   /** Get the underlying queue representing the mock stream of micro-batches. */
   public Queue<Iterable<TimestampedValue<T>>> getBatches() {
     return batches;
@@ -199,6 +229,10 @@ public final class CreateStream<T> extends PTransform<PBegin, PCollection<T>> {
     return times;
   }
 
+  public boolean isForceWatermarkSync() {
+    return forceWatermarkSync;
+  }
+
   @Override
   public PCollection<T> expand(PBegin input) {
     return PCollection.createPrimitiveOutputInternal(
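
For reference, a minimal sketch of how a test might drive the factory method added above
(the batch duration and the element timestamps below are hypothetical, not part of this
commit):

    CreateStream<String> source =
        CreateStream.of(StringUtf8Coder.of(), Duration.millis(500), true /* forceWatermarkSync */)
            .nextBatch(
                TimestampedValue.of("foo", new Instant(100)))
            .advanceWatermarkForNextBatch(new Instant(1000))
            .nextBatch(
                TimestampedValue.of("bar", new Instant(1100)))
            .advanceNextBatchWatermarkToInfinity();

The two-argument overload of(coder, batchDuration) defaults forceWatermarkSync to true, so
the explicit boolean is only needed when opting out of the sync.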

http://git-wip-us.apache.org/repos/asf/beam/blob/15472b28/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
index 1263618..52f7376 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
@@ -17,12 +17,15 @@
  */
 package org.apache.beam.runners.spark.stateful;
 
+import com.google.common.base.Joiner;
 import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Table;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import org.apache.beam.runners.core.GroupAlsoByWindowsAggregators;
 import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
 import org.apache.beam.runners.core.LateDataUtils;
@@ -46,6 +49,7 @@ import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -61,6 +65,7 @@ import org.apache.spark.api.java.JavaSparkContext$;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.Time;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
 import org.apache.spark.streaming.dstream.DStream;
@@ -104,12 +109,13 @@ public class SparkGroupAlsoByWindowViaWindowSet {
 
   public static <K, InputT, W extends BoundedWindow>
       JavaDStream<WindowedValue<KV<K, Iterable<InputT>>>> groupAlsoByWindow(
-          final JavaDStream<WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>>> inputDStream,
-          final Coder<K> keyCoder,
-          final Coder<WindowedValue<InputT>> wvCoder,
-          final WindowingStrategy<?, W> windowingStrategy,
-          final SerializablePipelineOptions options,
-          final List<Integer> sourceIds) {
+      final JavaDStream<WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>>> inputDStream,
+      final Coder<K> keyCoder,
+      final Coder<WindowedValue<InputT>> wvCoder,
+      final WindowingStrategy<?, W> windowingStrategy,
+      final SerializablePipelineOptions options,
+      final List<Integer> sourceIds,
+      final String transformFullName) {
 
     final long batchDurationMillis =
         options.get().as(SparkPipelineOptions.class).getBatchIntervalMillis();
@@ -140,30 +146,44 @@ public class SparkGroupAlsoByWindowViaWindowSet {
     DStream<Tuple2</*K*/ ByteArray, /*Itr<WV<I>>*/ byte[]>> pairDStream =
         inputDStream
             .transformToPair(
-                new Function<
+                new org.apache.spark.api.java.function.Function2<
                     JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>>>,
-                    JavaPairRDD<ByteArray, byte[]>>() {
+                    Time, JavaPairRDD<ByteArray, byte[]>>() {
                   // we use mapPartitions with the RDD API because its the only available API
                   // that allows to preserve partitioning.
                   @Override
                   public JavaPairRDD<ByteArray, byte[]> call(
-                      JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>>> rdd)
+                      JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>>> rdd,
+                      final Time time)
                       throws Exception {
                     return rdd.mapPartitions(
-                            TranslationUtils.functionToFlatMapFunction(
-                                WindowingHelpers
-                                    .<KV<K, Iterable<WindowedValue<InputT>>>>unwindowFunction()),
-                            true)
-                        .mapPartitionsToPair(
-                            TranslationUtils
-                                .<K, Iterable<WindowedValue<InputT>>>toPairFlatMapFunction(),
-                            true)
-                        // move to bytes representation and use coders for deserialization
-                        // because of checkpointing.
-                        .mapPartitionsToPair(
-                            TranslationUtils.pairFunctionToPairFlatMapFunction(
-                                CoderHelpers.toByteFunction(keyCoder, itrWvCoder)),
-                            true);
+                        TranslationUtils.functionToFlatMapFunction(
+                            WindowingHelpers
+                                .<KV<K, Iterable<WindowedValue<InputT>>>>unwindowFunction()),
+                        true)
+                              .mapPartitionsToPair(
+                                  TranslationUtils
+                                      .<K, Iterable<WindowedValue<InputT>>>toPairFlatMapFunction(),
+                                  true)
+                              .mapValues(new Function<Iterable<WindowedValue<InputT>>, KV<Long,
+                                  Iterable<WindowedValue<InputT>>>>() {
+
+                                @Override
+                                public KV<Long, Iterable<WindowedValue<InputT>>> call
+                                    (Iterable<WindowedValue<InputT>> values)
+                                    throws Exception {
+                                  // add the batch timestamp for visibility (e.g., debugging)
+                                  return KV.of(time.milliseconds(), values);
+                                }
+                              })
+                              // move to bytes representation and use coders for deserialization
+                              // because of checkpointing.
+                              .mapPartitionsToPair(
+                                  TranslationUtils.pairFunctionToPairFlatMapFunction(
+                                      CoderHelpers.toByteFunction(keyCoder,
+                                                                  KvCoder.of(VarLongCoder.of(),
+                                                                             itrWvCoder))),
+                                  true);
                   }
                 })
             .dstream();
@@ -219,9 +239,10 @@ public class SparkGroupAlsoByWindowViaWindowSet {
                 GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_LATENESS_COUNTER));
 
         AbstractIterator<
-            Tuple2</*K*/ ByteArray, Tuple2<StateAndTimers, /*WV<KV<K, Itr<I>>>*/ List<byte[]>>>>
+            Tuple2</*K*/ ByteArray, Tuple2<StateAndTimers, /*WV<KV<K, KV<Long(Time),Itr<I>>>>*/
+                List<byte[]>>>>
                 outIter = new AbstractIterator<Tuple2</*K*/ ByteArray,
-                    Tuple2<StateAndTimers, /*WV<KV<K, Itr<I>>>*/ List<byte[]>>>>() {
+                    Tuple2<StateAndTimers, /*WV<KV<K, KV<Long(Time),Itr<I>>>>*/ List<byte[]>>>>() {
                   @Override
                   protected Tuple2</*K*/ ByteArray, Tuple2<StateAndTimers,
                       /*WV<KV<K, Itr<I>>>*/ List<byte[]>>> computeNext() {
@@ -240,8 +261,11 @@ public class SparkGroupAlsoByWindowViaWindowSet {
                           List<byte[]>>> prevStateAndTimersOpt = next._3();
 
                       SparkStateInternals<K> stateInternals;
+                      Map<Integer, GlobalWatermarkHolder.SparkWatermarks> watermarks =
+                          GlobalWatermarkHolder.get(batchDurationMillis);
                       SparkTimerInternals timerInternals = SparkTimerInternals.forStreamFromSources(
-                          sourceIds, GlobalWatermarkHolder.get(batchDurationMillis));
+                          sourceIds, watermarks);
+
                       // get state(internals) per key.
                       if (prevStateAndTimersOpt.isEmpty()) {
                         // no previous state.
@@ -271,20 +295,49 @@ public class SparkGroupAlsoByWindowViaWindowSet {
                               options.get());
 
                       outputHolder.clear(); // clear before potential use.
+
                       if (!seq.isEmpty()) {
                         // new input for key.
                         try {
-                          Iterable<WindowedValue<InputT>> elementsIterable =
-                              CoderHelpers.fromByteArray(seq.head(), itrWvCoder);
-                          Iterable<WindowedValue<InputT>> validElements =
-                              LateDataUtils
-                                  .dropExpiredWindows(
-                                      key,
-                                      elementsIterable,
-                                      timerInternals,
-                                      windowingStrategy,
-                                      droppedDueToLateness);
-                          reduceFnRunner.processElements(validElements);
+                          final KV<Long, Iterable<WindowedValue<InputT>>> keyedElements =
+                              CoderHelpers.fromByteArray(seq.head(),
+                                                         KvCoder.of(VarLongCoder.of(), itrWvCoder));
+
+                          final Long rddTimestamp = keyedElements.getKey();
+
+                          LOG.debug(
+                              transformFullName
+                                  + ": processing RDD with timestamp: {}, watermarks: {}",
+                              rddTimestamp,
+                              watermarks);
+
+                          final Iterable<WindowedValue<InputT>> elements = keyedElements.getValue();
+
+                          LOG.trace(transformFullName + ": input elements: {}", elements);
+
+                          /*
+                          Incoming expired windows are filtered based on
+                          timerInternals.currentInputWatermarkTime() and the configured allowed
+                          lateness. Note that this is done prior to calling
+                          timerInternals.advanceWatermark so essentially the inputWatermark is
+                          the highWatermark of the previous batch and the lowWatermark of the
+                          current batch.
+                          The highWatermark of the current batch will only affect filtering
+                          as of the next batch.
+                           */
+                          final Iterable<WindowedValue<InputT>> nonExpiredElements =
+                              Lists.newArrayList(LateDataUtils
+                                                     .dropExpiredWindows(
+                                                         key,
+                                                         elements,
+                                                         timerInternals,
+                                                         windowingStrategy,
+                                                         droppedDueToLateness));
+
+                          LOG.trace(transformFullName + ": non-expired input elements: {}",
+                                    nonExpiredElements);
+
+                          reduceFnRunner.processElements(nonExpiredElements);
                         } catch (Exception e) {
                           throw new RuntimeException(
                               "Failed to process element with ReduceFnRunner", e);
@@ -295,9 +348,28 @@ public class SparkGroupAlsoByWindowViaWindowSet {
                       }
                       try {
                         // advance the watermark to HWM to fire by timers.
+                        LOG.debug(transformFullName + ": timerInternals before advance are {}",
+                                  timerInternals.toString());
+
+                        // store the highWatermark as the new inputWatermark to calculate triggers
                         timerInternals.advanceWatermark();
+
+                        LOG.debug(transformFullName + ": timerInternals after advance are {}",
+                                  timerInternals.toString());
+
                         // call on timers that are ready.
-                        reduceFnRunner.onTimers(timerInternals.getTimersReadyToProcess());
+                        final Collection<TimerInternals.TimerData> readyToProcess =
+                            timerInternals.getTimersReadyToProcess();
+
+                        LOG.debug(transformFullName + ": ready timers are {}", readyToProcess);
+
+                        /*
+                        Note that at this point, the watermark has already advanced since
+                        timerInternals.advanceWatermark() has been called and the highWatermark
+                        is now stored as the new inputWatermark, according to which triggers are
+                        calculated.
+                         */
+                        reduceFnRunner.onTimers(readyToProcess);
                       } catch (Exception e) {
                         throw new RuntimeException(
                             "Failed to process ReduceFnRunner onTimer.", e);
@@ -306,10 +378,20 @@ public class SparkGroupAlsoByWindowViaWindowSet {
                       reduceFnRunner.persist();
                       // obtain output, if fired.
                       List<WindowedValue<KV<K, Iterable<InputT>>>> outputs = outputHolder.get();
+
                       if (!outputs.isEmpty() || !stateInternals.getState().isEmpty()) {
+                        // empty outputs are filtered later using DStream filtering
                         StateAndTimers updated = new StateAndTimers(stateInternals.getState(),
                             SparkTimerInternals.serializeTimers(
                                 timerInternals.getTimers(), timerDataCoder));
+
+                        /*
+                        Not something we want to happen in production, but is very helpful
+                        when debugging - TRACE.
+                         */
+                        LOG.trace(transformFullName + ": output elements are {}",
+                                  Joiner.on(", ").join(outputs));
+
                         // persist Spark's state by outputting.
                         List<byte[]> serOutput = CoderHelpers.toByteArrays(outputs, wvKvIterCoder);
                         return new Tuple2<>(encodedKey, new Tuple2<>(updated, serOutput));
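
Since the batch timestamp now travels through the checkpointed state, each key's values are
encoded as a KV<Long, Iterable<WindowedValue<InputT>>> using
KvCoder.of(VarLongCoder.of(), itrWvCoder). A self-contained sketch of that round trip, with
the SDK's CoderUtils and StringUtf8Coder standing in for the runner's CoderHelpers and the
real element coder (illustrative only, exception handling elided):

    KvCoder<Long, Iterable<String>> timestampedCoder =
        KvCoder.of(VarLongCoder.of(), IterableCoder.of(StringUtf8Coder.of()));

    // tag the values with the RDD's batch time, mirroring the mapValues step above.
    KV<Long, Iterable<String>> tagged = KV.of(1000L, Arrays.asList("a", "b"));

    byte[] bytes = CoderUtils.encodeToByteArray(timestampedCoder, tagged);
    KV<Long, Iterable<String>> restored =
        CoderUtils.decodeFromByteArray(timestampedCoder, bytes);
    // restored.getKey() is 1000, i.e. the batch timestamp is recoverable for debugging.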

http://git-wip-us.apache.org/repos/asf/beam/blob/15472b28/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
index a68da55..c998328 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
@@ -188,4 +188,10 @@ public class SparkTimerInternals implements TimerInternals {
     return CoderHelpers.fromByteArrays(serTimers, timerDataCoder);
   }
 
+  @Override
+  public String toString() {
+    return "SparkTimerInternals{" + "highWatermark=" + highWatermark
+        + ", synchronizedProcessingTime=" + synchronizedProcessingTime + ", timers=" + timers
+        + ", inputWatermark=" + inputWatermark + '}';
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/15472b28/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 38d6119..4114803 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -82,6 +82,7 @@ import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.JavaSparkContext$;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaInputDStream;
@@ -139,18 +140,41 @@ public final class StreamingTransformTranslator {
     return new TransformEvaluator<CreateStream<T>>() {
       @Override
       public void evaluate(CreateStream<T> transform, EvaluationContext context) {
-        Coder<T> coder = context.getOutput(transform).getCoder();
-        JavaStreamingContext jssc = context.getStreamingContext();
-        Queue<Iterable<TimestampedValue<T>>> values = transform.getBatches();
-        WindowedValue.FullWindowedValueCoder<T> windowCoder =
+
+        final Queue<JavaRDD<WindowedValue<T>>> rddQueue =
+            buildRdds(
+                transform.getBatches(),
+                context.getStreamingContext(),
+                context.getOutput(transform).getCoder());
+
+        final JavaInputDStream<WindowedValue<T>> javaInputDStream =
+            buildInputStream(rddQueue, transform, context);
+
+        final UnboundedDataset<T> unboundedDataset =
+            new UnboundedDataset<>(
+                javaInputDStream, Collections.singletonList(javaInputDStream.inputDStream().id()));
+
+        // add pre-baked Watermarks for the pre-baked batches.
+        GlobalWatermarkHolder.addAll(
+            ImmutableMap.of(unboundedDataset.getStreamSources().get(0), transform.getTimes()));
+
+        context.putDataset(transform, unboundedDataset);
+      }
+
+      private Queue<JavaRDD<WindowedValue<T>>> buildRdds(
+          Queue<Iterable<TimestampedValue<T>>> batches, JavaStreamingContext jssc, Coder<T> coder) {
+
+        final WindowedValue.FullWindowedValueCoder<T> windowCoder =
             WindowedValue.FullWindowedValueCoder.of(coder, GlobalWindow.Coder.INSTANCE);
-        // create the DStream from queue.
-        Queue<JavaRDD<WindowedValue<T>>> rddQueue = new LinkedBlockingQueue<>();
-        for (Iterable<TimestampedValue<T>> tv : values) {
-          Iterable<WindowedValue<T>> windowedValues =
+
+        final Queue<JavaRDD<WindowedValue<T>>> rddQueue = new LinkedBlockingQueue<>();
+
+        for (final Iterable<TimestampedValue<T>> timestampedValues : batches) {
+          final Iterable<WindowedValue<T>> windowedValues =
               Iterables.transform(
-                  tv,
+                  timestampedValues,
                   new com.google.common.base.Function<TimestampedValue<T>, WindowedValue<T>>() {
+
                     @Override
                     public WindowedValue<T> apply(@Nonnull TimestampedValue<T> timestampedValue) {
                       return WindowedValue.of(
@@ -159,22 +183,28 @@ public final class StreamingTransformTranslator {
                           GlobalWindow.INSTANCE,
                           PaneInfo.NO_FIRING);
                     }
-              });
-          JavaRDD<WindowedValue<T>> rdd =
+                  });
+
+          final JavaRDD<WindowedValue<T>> rdd =
               jssc.sparkContext()
                   .parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
                   .map(CoderHelpers.fromByteFunction(windowCoder));
+
           rddQueue.offer(rdd);
         }
+        return rddQueue;
+      }
 
-        JavaInputDStream<WindowedValue<T>> inputDStream = jssc.queueStream(rddQueue, true);
-        UnboundedDataset<T> unboundedDataset = new UnboundedDataset<T>(
-            inputDStream, Collections.singletonList(inputDStream.inputDStream().id()));
-        // add pre-baked Watermarks for the pre-baked batches.
-        Queue<GlobalWatermarkHolder.SparkWatermarks> times = transform.getTimes();
-        GlobalWatermarkHolder.addAll(
-            ImmutableMap.of(unboundedDataset.getStreamSources().get(0), times));
-        context.putDataset(transform, unboundedDataset);
+      private JavaInputDStream<WindowedValue<T>> buildInputStream(
+          Queue<JavaRDD<WindowedValue<T>>> rddQueue,
+          CreateStream<T> transform,
+          EvaluationContext context) {
+        return transform.isForceWatermarkSync()
+            ? new JavaInputDStream<>(
+                new WatermarkSyncedDStream<>(
+                    rddQueue, transform.getBatchDuration(), context.getStreamingContext().ssc()),
+                JavaSparkContext$.MODULE$.<WindowedValue<T>>fakeClassTag())
+            : context.getStreamingContext().queueStream(rddQueue, true);
       }
 
       @Override
@@ -301,7 +331,8 @@ public final class StreamingTransformTranslator {
                 wvCoder,
                 windowingStrategy,
                 context.getSerializableOptions(),
-                streamSources);
+                streamSources,
+                context.getCurrentTransform().getFullName());
 
         context.putDataset(transform, new UnboundedDataset<>(outStream, streamSources));
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/15472b28/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/WatermarkSyncedDStream.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/WatermarkSyncedDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/WatermarkSyncedDStream.java
new file mode 100644
index 0000000..e2a7b44
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/WatermarkSyncedDStream.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation.streaming;
+
+import static com.google.common.base.Preconditions.checkState;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.Queue;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext$;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.streaming.StreamingContext;
+import org.apache.spark.streaming.Time;
+import org.apache.spark.streaming.dstream.InputDStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link InputDStream} that keeps track of the {@link GlobalWatermarkHolder} status and only
+ * generates RDDs when they are in sync. If an RDD for time <code>CURRENT_BATCH_TIME</code> is
+ * requested, this input source will wait until the time of the batch which set the watermark has
+ * caught up and the following holds:
+ *
+ * <pre>{@code
+ * CURRENT_BATCH_TIME - TIME_OF_BATCH_WHICH_SET_THE_WATERMARK <= BATCH_DURATION
+ * }</pre>
+ *
+ * <p>In other words, this input source will stall and will NOT generate RDDs when the watermark is
+ * too far behind. Once the watermark has caught up with the current batch time, an RDD will be
+ * generated and emitted downstream.
+ *
+ * <p>NOTE: This input source is intended for test-use only, where one needs to be able to simulate
+ * non-trivial scenarios under a deterministic execution, even at the cost of incorporating
+ * test-only code. Unlike tests, in production <code>InputDStream</code>s will not be synchronous
+ * with the watermark, and the watermark is allowed to lag behind in a non-deterministic manner
+ * (since at this point in time we are reluctant to apply complex and possibly overly synchronous
+ * mechanisms at large scale).
+ *
+ * <p>See also <a href="https://issues.apache.org/jira/browse/BEAM-2671">BEAM-2671</a>, <a
+ * href="https://issues.apache.org/jira/browse/BEAM-2789">BEAM-2789</a>.
+ */
+class WatermarkSyncedDStream<T> extends InputDStream<WindowedValue<T>> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(WatermarkSyncedDStream.class.getCanonicalName() + "#compute");
+
+  private static final int SLEEP_DURATION_MILLIS = 10;
+
+  private final Queue<JavaRDD<WindowedValue<T>>> rdds;
+  private final Long batchDuration;
+  private volatile boolean isFirst = true;
+
+  public WatermarkSyncedDStream(final Queue<JavaRDD<WindowedValue<T>>> rdds,
+                                final Long batchDuration,
+                                final StreamingContext ssc) {
+    super(ssc, JavaSparkContext$.MODULE$.<WindowedValue<T>>fakeClassTag());
+    this.rdds = rdds;
+    this.batchDuration = batchDuration;
+  }
+
+  private void awaitWatermarkSyncWith(final long batchTime) {
+    while (!isFirstBatch() && watermarkOutOfSync(batchTime)) {
+      Uninterruptibles.sleepUninterruptibly(SLEEP_DURATION_MILLIS, TimeUnit.MILLISECONDS);
+    }
+
+    checkState(
+        isFirstBatch() || watermarkIsOneBatchBehind(batchTime),
+        String.format(
+            "Watermark batch time:[%d] should be exactly one batch behind current batch time:[%d]",
+            GlobalWatermarkHolder.getLastWatermarkedBatchTime(), batchTime));
+  }
+
+  private boolean watermarkOutOfSync(final long batchTime) {
+    return batchTime - GlobalWatermarkHolder.getLastWatermarkedBatchTime() > batchDuration;
+  }
+
+  private boolean isFirstBatch() {
+    return isFirst;
+  }
+
+  private RDD<WindowedValue<T>> generateRdd() {
+    return rdds.size() > 0
+        ? rdds.poll().rdd()
+        : ssc().sparkContext().emptyRDD(JavaSparkContext$.MODULE$.<WindowedValue<T>>fakeClassTag());
+  }
+
+  private boolean watermarkIsOneBatchBehind(final long batchTime) {
+    return GlobalWatermarkHolder.getLastWatermarkedBatchTime() == batchTime - batchDuration;
+  }
+
+  @Override
+  public scala.Option<RDD<WindowedValue<T>>> compute(final Time validTime) {
+    final long batchTime = validTime.milliseconds();
+
+    LOG.trace("BEFORE waiting for watermark sync, "
+                  + "LastWatermarkedBatchTime: {}, current batch time: {}",
+              GlobalWatermarkHolder.getLastWatermarkedBatchTime(),
+              batchTime);
+
+    final Stopwatch stopwatch = Stopwatch.createStarted();
+
+    awaitWatermarkSyncWith(batchTime);
+
+    stopwatch.stop();
+
+    LOG.info("Waited {} millis for watermarks to sync up with the current batch ({})",
+             stopwatch.elapsed(TimeUnit.MILLISECONDS),
+             batchTime);
+
+    LOG.info("Watermarks are now: {}", GlobalWatermarkHolder.get(batchDuration));
+
+    LOG.trace("AFTER waiting for watermark sync, "
+                  + "LastWatermarkedBatchTime: {}, current batch time: {}",
+              GlobalWatermarkHolder.getLastWatermarkedBatchTime(),
+              batchTime);
+
+    final RDD<WindowedValue<T>> rdd = generateRdd();
+    isFirst = false;
+    return scala.Option.apply(rdd);
+  }
+
+  @Override
+  public void start() {
+
+  }
+
+  @Override
+  public void stop() {
+
+  }
+}
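
To make the sync condition concrete, here is the predicate above spelled out with
hypothetical numbers and a batch duration of 1000ms:

    final long batchDuration = 1000L;
    final long currentBatchTime = 3000L;    // the batch compute() was asked to generate
    long lastWatermarkedBatchTime = 1000L;  // the batch that last advanced the watermark

    // 3000 - 1000 > 1000: out of sync, so compute() keeps polling every 10 millis.
    boolean outOfSync = currentBatchTime - lastWatermarkedBatchTime > batchDuration;

    // once onBatchCompleted fires for the batch at t=2000...
    lastWatermarkedBatchTime = 2000L;

    // ...the watermark is exactly one batch behind (2000 == 3000 - 1000), the
    // checkState in awaitWatermarkSyncWith holds, and the RDD is emitted downstream.
    boolean oneBatchBehind = lastWatermarkedBatchTime == currentBatchTime - batchDuration;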

http://git-wip-us.apache.org/repos/asf/beam/blob/15472b28/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
index 2cb6f26..8ad3ca4 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
@@ -41,9 +41,12 @@ import org.apache.spark.storage.BlockManager;
 import org.apache.spark.storage.BlockResult;
 import org.apache.spark.storage.BlockStore;
 import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaBatchInfo;
 import org.apache.spark.streaming.api.java.JavaStreamingListener;
 import org.apache.spark.streaming.api.java.JavaStreamingListenerBatchCompleted;
 import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.Option;
 
 /**
@@ -53,11 +56,18 @@ import scala.Option;
  * and advances the watermarks according to the queue (first-in-first-out).
  */
 public class GlobalWatermarkHolder {
+
+  private static final Logger LOG = LoggerFactory.getLogger(GlobalWatermarkHolder.class);
+
   private static final Map<Integer, Queue<SparkWatermarks>> sourceTimes = new HashMap<>();
   private static final BlockId WATERMARKS_BLOCK_ID = BlockId.apply("broadcast_0WATERMARKS");
 
-  private static volatile Map<Integer, SparkWatermarks> driverWatermarks = null;
+  // a local copy of the watermarks is stored on the driver node so that it can be
+  // accessed in test mode instead of fetching blocks remotely
+  private static volatile Map<Integer, SparkWatermarks> driverNodeWatermarks = null;
+
   private static volatile LoadingCache<String, Map<Integer, SparkWatermarks>> watermarkCache = null;
+  private static volatile long lastWatermarkedBatchTime = 0;
 
   public static void add(int sourceId, SparkWatermarks sparkWatermarks) {
     Queue<SparkWatermarks> timesQueue = sourceTimes.get(sourceId);
@@ -79,18 +89,33 @@ public class GlobalWatermarkHolder {
     }
   }
 
+  public static long getLastWatermarkedBatchTime() {
+    return lastWatermarkedBatchTime;
+  }
+
   /**
    * Returns the {@link Broadcast} containing the {@link SparkWatermarks} mapped
    * to their sources.
    */
   @SuppressWarnings("unchecked")
   public static Map<Integer, SparkWatermarks> get(Long cacheInterval) {
-    if (driverWatermarks != null) {
-      // if we are executing in local mode simply return the local values.
-      return driverWatermarks;
+    if (canBypassRemoteWatermarkFetching()) {
+      /*
+      driverNodeWatermarks != null =>
+      => advance() was called
+      => WatermarkAdvancingStreamingListener#onBatchCompleted() was called
+      => we are currently running on the driver node
+      => we can get the watermarks from the driver local copy instead of fetching their block
+      remotely using the block manager
+      /------------------------------------------------------------------------------------------/
+      In test mode, the system is running inside a single JVM, and thus both driver and executors
+      "canBypassWatermarkBlockFetching" by using the static driverNodeWatermarks copy.
+      This allows tests to avoid the asynchronous nature of using the BlockManager directly.
+      */
+      return getLocalWatermarkCopy();
     } else {
       if (watermarkCache == null) {
-        initWatermarkCache(cacheInterval);
+        watermarkCache = createWatermarkCache(cacheInterval);
       }
       try {
         return watermarkCache.get("SINGLETON");
@@ -100,103 +125,178 @@ public class GlobalWatermarkHolder {
     }
   }
 
-  private static synchronized void initWatermarkCache(Long batchDuration) {
-    if (watermarkCache == null) {
-      watermarkCache =
-          CacheBuilder.newBuilder()
-              // expire watermarks every half batch duration to ensure they update in every batch.
-              .expireAfterWrite(batchDuration / 2, TimeUnit.MILLISECONDS)
-              .build(new WatermarksLoader());
-    }
+  private static boolean canBypassRemoteWatermarkFetching() {
+    return driverNodeWatermarks != null;
+  }
+
+  private static synchronized LoadingCache<String, Map<Integer, SparkWatermarks>>
+      createWatermarkCache(final Long batchDuration) {
+    return CacheBuilder.newBuilder()
+        // expire watermarks every half batch duration to ensure they update in every batch.
+        .expireAfterWrite(batchDuration / 2, TimeUnit.MILLISECONDS)
+        .build(new WatermarksLoader());
   }
 
   /**
    * Advances the watermarks to the next-in-line watermarks.
    * SparkWatermarks are monotonically increasing.
    */
-  @SuppressWarnings("unchecked")
-  public static void advance() {
+  public static void advance(final String batchId) {
     synchronized (GlobalWatermarkHolder.class) {
-      BlockManager blockManager = SparkEnv.get().blockManager();
+      final BlockManager blockManager = SparkEnv.get().blockManager();
+      final Map<Integer, SparkWatermarks> newWatermarks = computeNewWatermarks(blockManager);
 
-      if (sourceTimes.isEmpty()) {
-        return;
+      if (!newWatermarks.isEmpty()) {
+        writeRemoteWatermarkBlock(newWatermarks, blockManager);
+        writeLocalWatermarkCopy(newWatermarks);
+      } else {
+        LOG.info("No new watermarks could be computed upon completion of batch: {}", batchId);
       }
+    }
+  }
+
+  private static void writeLocalWatermarkCopy(Map<Integer, SparkWatermarks> newWatermarks) {
+    driverNodeWatermarks = newWatermarks;
+  }
 
-      // update all sources' watermarks into the new broadcast.
-      Map<Integer, SparkWatermarks> newValues = new HashMap<>();
-
-      for (Map.Entry<Integer, Queue<SparkWatermarks>> en: sourceTimes.entrySet()) {
-        if (en.getValue().isEmpty()) {
-          continue;
-        }
-        Integer sourceId = en.getKey();
-        Queue<SparkWatermarks> timesQueue = en.getValue();
-
-        // current state, if exists.
-        Instant currentLowWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
-        Instant currentHighWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
-        Instant currentSynchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
-
-        Option<BlockResult> currentOption = blockManager.getRemote(WATERMARKS_BLOCK_ID);
-        Map<Integer, SparkWatermarks> current;
-        if (currentOption.isDefined()) {
-          current = (Map<Integer, SparkWatermarks>) currentOption.get().data().next();
-        } else {
-          current = Maps.newHashMap();
-          blockManager.putSingle(
-              WATERMARKS_BLOCK_ID,
-              current,
-              StorageLevel.MEMORY_ONLY(),
-              true);
-        }
-
-        if (current.containsKey(sourceId)) {
-          SparkWatermarks currentTimes = current.get(sourceId);
-          currentLowWatermark = currentTimes.getLowWatermark();
-          currentHighWatermark = currentTimes.getHighWatermark();
-          currentSynchronizedProcessingTime = currentTimes.getSynchronizedProcessingTime();
-        }
-
-        SparkWatermarks next = timesQueue.poll();
-        // advance watermarks monotonically.
-        Instant nextLowWatermark = next.getLowWatermark().isAfter(currentLowWatermark)
-            ? next.getLowWatermark() : currentLowWatermark;
-        Instant nextHighWatermark = next.getHighWatermark().isAfter(currentHighWatermark)
-            ? next.getHighWatermark() : currentHighWatermark;
-        Instant nextSynchronizedProcessingTime = next.getSynchronizedProcessingTime();
-        checkState(!nextLowWatermark.isAfter(nextHighWatermark),
-            String.format(
-                "Low watermark %s cannot be later then high watermark %s",
-                nextLowWatermark, nextHighWatermark));
-        checkState(nextSynchronizedProcessingTime.isAfter(currentSynchronizedProcessingTime),
-            "Synchronized processing time must advance.");
-        newValues.put(
-            sourceId,
-            new SparkWatermarks(
-                nextLowWatermark, nextHighWatermark, nextSynchronizedProcessingTime));
+  private static Map<Integer, SparkWatermarks> getLocalWatermarkCopy() {
+    return driverNodeWatermarks;
+  }
+
+  /** See {@link GlobalWatermarkHolder#advance(String)}. */
+  public static void advance() {
+    advance("N/A");
+  }
+
+  /**
+   * Computes the next watermark values per source id.
+   *
+   * @return The new watermark values, or an empty map if no source has reported its progress.
+   */
+  private static Map<Integer, SparkWatermarks> computeNewWatermarks(BlockManager blockManager) {
+
+    if (sourceTimes.isEmpty()) {
+      return new HashMap<>();
+    }
+
+    // update all sources' watermarks into the new broadcast.
+    final Map<Integer, SparkWatermarks> newValues = new HashMap<>();
+
+    for (final Map.Entry<Integer, Queue<SparkWatermarks>> watermarkInfo: sourceTimes.entrySet()) {
+
+      if (watermarkInfo.getValue().isEmpty()) {
+        continue;
       }
 
-      // update the watermarks broadcast only if something has changed.
-      if (!newValues.isEmpty()) {
-        driverWatermarks = newValues;
-        blockManager.removeBlock(WATERMARKS_BLOCK_ID, true);
-        blockManager.putSingle(
-            WATERMARKS_BLOCK_ID,
-            newValues,
-            StorageLevel.MEMORY_ONLY(),
-            true);
+      final Integer sourceId = watermarkInfo.getKey();
+
+      // current state, if exists.
+      Instant currentLowWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
+      Instant currentHighWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
+      Instant currentSynchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+      final Map<Integer, SparkWatermarks> currentWatermarks = initWatermarks(blockManager);
+
+      if (currentWatermarks.containsKey(sourceId)) {
+        final SparkWatermarks currentTimes = currentWatermarks.get(sourceId);
+        currentLowWatermark = currentTimes.getLowWatermark();
+        currentHighWatermark = currentTimes.getHighWatermark();
+        currentSynchronizedProcessingTime = currentTimes.getSynchronizedProcessingTime();
       }
+
+      final Queue<SparkWatermarks> timesQueue = watermarkInfo.getValue();
+      final SparkWatermarks next = timesQueue.poll();
+
+      // advance watermarks monotonically.
+
+      final Instant nextLowWatermark =
+          next.getLowWatermark().isAfter(currentLowWatermark)
+              ? next.getLowWatermark()
+              : currentLowWatermark;
+
+      final Instant nextHighWatermark =
+          next.getHighWatermark().isAfter(currentHighWatermark)
+              ? next.getHighWatermark()
+              : currentHighWatermark;
+
+      final Instant nextSynchronizedProcessingTime = next.getSynchronizedProcessingTime();
+
+      checkState(
+          !nextLowWatermark.isAfter(nextHighWatermark),
+          String.format(
+              "Low watermark %s cannot be later then high watermark %s",
+              nextLowWatermark, nextHighWatermark));
+
+      checkState(
+          nextSynchronizedProcessingTime.isAfter(currentSynchronizedProcessingTime),
+          "Synchronized processing time must advance.");
+
+      newValues.put(
+          sourceId,
+          new SparkWatermarks(
+              nextLowWatermark, nextHighWatermark, nextSynchronizedProcessingTime));
+    }
+
+    return newValues;
+  }
+
+  private static void writeRemoteWatermarkBlock(
+      final Map<Integer, SparkWatermarks> newWatermarks, final BlockManager blockManager) {
+    blockManager.removeBlock(WATERMARKS_BLOCK_ID, true);
+    // if an executor tries to fetch the watermark block here, it will fail to do so since
+    // the watermark block has just been removed, but the new copy has not been put yet.
+    blockManager.putSingle(WATERMARKS_BLOCK_ID, newWatermarks, StorageLevel.MEMORY_ONLY(), true);
+    // if an executor tries to fetch the watermark block here, it still may fail to do so since
+    // the put operation might not have been executed yet
+    // see also https://issues.apache.org/jira/browse/BEAM-2789
+    LOG.info("Put new watermark block: {}", newWatermarks);
+  }
+
+  private static Map<Integer, SparkWatermarks> initWatermarks(final BlockManager blockManager) {
+
+    final Map<Integer, SparkWatermarks> watermarks = fetchSparkWatermarks(blockManager);
+
+    if (watermarks == null) {
+      final HashMap<Integer, SparkWatermarks> empty = Maps.newHashMap();
+      blockManager.putSingle(
+          WATERMARKS_BLOCK_ID,
+          empty,
+          StorageLevel.MEMORY_ONLY(),
+          true);
+      return empty;
+    } else {
+      return watermarks;
+    }
+  }
+
+  private static Map<Integer, SparkWatermarks> fetchSparkWatermarks(BlockManager blockManager) {
+    final Option<BlockResult> blockResultOption = blockManager.getRemote(WATERMARKS_BLOCK_ID);
+    if (blockResultOption.isDefined()) {
+      return (Map<Integer, SparkWatermarks>) blockResultOption.get().data().next();
+    } else {
+      return null;
+    }
+  }
+
+  private static class WatermarksLoader extends CacheLoader<String, Map<Integer, SparkWatermarks>> {
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public Map<Integer, SparkWatermarks> load(@Nonnull String key) throws Exception {
+      final BlockManager blockManager = SparkEnv.get().blockManager();
+      final Map<Integer, SparkWatermarks> watermarks = fetchSparkWatermarks(blockManager);
+      return watermarks != null ? watermarks : Maps.<Integer, SparkWatermarks>newHashMap();
     }
   }
 
   @VisibleForTesting
   public static synchronized void clear() {
     sourceTimes.clear();
-    driverWatermarks = null;
-    SparkEnv sparkEnv = SparkEnv.get();
+    lastWatermarkedBatchTime = 0;
+    writeLocalWatermarkCopy(null);
+    final SparkEnv sparkEnv = SparkEnv.get();
     if (sparkEnv != null) {
-      BlockManager blockManager = sparkEnv.blockManager();
+      final BlockManager blockManager = sparkEnv.blockManager();
       blockManager.removeBlock(WATERMARKS_BLOCK_ID, true);
     }
   }
@@ -242,25 +342,33 @@ public class GlobalWatermarkHolder {
   }
 
   /** Advance the WMs onBatchCompleted event. */
-  public static class WatermarksListener extends JavaStreamingListener {
-    @Override
-    public void onBatchCompleted(JavaStreamingListenerBatchCompleted batchCompleted) {
-      GlobalWatermarkHolder.advance();
+  public static class WatermarkAdvancingStreamingListener extends JavaStreamingListener {
+    private static final Logger LOG =
+        LoggerFactory.getLogger(WatermarkAdvancingStreamingListener.class);
+
+    private long timeOf(JavaBatchInfo info) {
+      return info.batchTime().milliseconds();
     }
-  }
 
-  private static class WatermarksLoader extends CacheLoader<String, Map<Integer, SparkWatermarks>> {
+    private long laterOf(long t1, long t2) {
+      return Math.max(t1, t2);
+    }
 
-    @SuppressWarnings("unchecked")
     @Override
-    public Map<Integer, SparkWatermarks> load(@Nonnull String key) throws Exception {
-      Option<BlockResult> blockResultOption =
-          SparkEnv.get().blockManager().getRemote(WATERMARKS_BLOCK_ID);
-      if (blockResultOption.isDefined()) {
-        return (Map<Integer, SparkWatermarks>) blockResultOption.get().data().next();
-      } else {
-        return Maps.newHashMap();
-      }
+    public void onBatchCompleted(JavaStreamingListenerBatchCompleted batchCompleted) {
+
+      final long currentBatchTime = timeOf(batchCompleted.batchInfo());
+
+      GlobalWatermarkHolder.advance(Long.toString(currentBatchTime));
+
+      // Make sure to update the last watermarked batch time AFTER the watermarks have been
+      // updated (i.e., after the call to GlobalWatermarkHolder.advance(...)).
+      // In addition, the watermark block in the BlockManager is updated asynchronously.
+      lastWatermarkedBatchTime =
+          laterOf(lastWatermarkedBatchTime, currentBatchTime);
+
+      LOG.info("Batch with timestamp: {} has completed; watermarks have been updated.",
+               lastWatermarkedBatchTime);
     }
   }
 }
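
For context on the WatermarksLoader above: it is a Guava CacheLoader, which suggests
executors read the watermarks through a LoadingCache keyed by batch time (note that
advance(...) is called with Long.toString(currentBatchTime)) rather than hitting the
BlockManager on every lookup. Below is a minimal sketch of how such a loader is
typically wired up; the cache construction, the expiry interval, and the class names
are illustrative assumptions, as the actual wiring is not part of this hunk.

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;

    final class WatermarkCacheSketch {

      // Stand-in for GlobalWatermarkHolder.SparkWatermarks (hypothetical here).
      static final class SparkWatermarks {}

      // Illustrative loader: the real WatermarksLoader fetches the watermark
      // block from the BlockManager, falling back to an empty map.
      private static final CacheLoader<String, Map<Integer, SparkWatermarks>> LOADER =
          new CacheLoader<String, Map<Integer, SparkWatermarks>>() {
            @Override
            public Map<Integer, SparkWatermarks> load(String key) {
              return new HashMap<>(); // placeholder for the BlockManager fetch
            }
          };

      // The expiry value is an assumption for illustration; it bounds how stale
      // an executor's cached view of the watermarks can get between lookups.
      private static final LoadingCache<String, Map<Integer, SparkWatermarks>> CACHE =
          CacheBuilder.newBuilder()
              .expireAfterWrite(100, TimeUnit.MILLISECONDS)
              .build(LOADER);

      static Map<Integer, SparkWatermarks> watermarks(String batchTime) throws Exception {
        return CACHE.get(batchTime);
      }
    }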

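On the remove-then-put race flagged in the comments near the top of this hunk
(BEAM-2789): because removeBlock and the asynchronous putSingle are not atomic with
respect to remote readers, an executor's getRemote can land in the window where no
watermark block exists. A minimal sketch of a defensive fetch that retries until the
block reappears is shown below; the helper name fetchWithRetry, the attempt count, and
the backoff are illustrative assumptions, not part of this commit.

    import org.apache.spark.storage.BlockId;
    import org.apache.spark.storage.BlockManager;
    import org.apache.spark.storage.BlockResult;
    import scala.Option;

    final class WatermarkFetchSketch {

      // Hypothetical helper: retries the remote fetch to tolerate the window in
      // which the watermark block has been removed but not yet re-put.
      @SuppressWarnings("unchecked")
      static <T> T fetchWithRetry(BlockManager blockManager, BlockId blockId, int maxAttempts)
          throws InterruptedException {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
          final Option<BlockResult> result = blockManager.getRemote(blockId);
          if (result.isDefined()) {
            return (T) result.get().data().next();
          }
          // Block not visible yet; back off briefly before retrying.
          Thread.sleep(10L << attempt);
        }
        return null; // callers decide how to handle a persistent miss
      }
    }
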
http://git-wip-us.apache.org/repos/asf/beam/blob/15472b28/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java
index cfbad01..a5455da 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java
@@ -73,8 +73,10 @@ public class SparkPipelineStateTest implements Serializable {
   }
 
   private PTransform<PBegin, PCollection<String>> getValues(final SparkPipelineOptions options) {
+    final boolean doNotSyncWithWatermark = false;
     return options.isStreaming()
-        ? CreateStream.of(StringUtf8Coder.of(), Duration.millis(1)).nextBatch("one", "two")
+        ? CreateStream.of(StringUtf8Coder.of(), Duration.millis(1), doNotSyncWithWatermark)
+                      .nextBatch("one", "two")
         : Create.of("one", "two");
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/15472b28/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
index 770e0c0..a432fda 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
@@ -163,16 +163,16 @@ public class CreateStreamTest implements Serializable {
   public void testDiscardingMode() throws IOException {
     CreateStream<String> source =
         CreateStream.of(StringUtf8Coder.of(), batchDuration())
-            .nextBatch(
-                TimestampedValue.of("firstPane", new Instant(100)),
-                TimestampedValue.of("alsoFirstPane", new Instant(200)))
-            .advanceWatermarkForNextBatch(new Instant(1001L))
-            .nextBatch(
-                TimestampedValue.of("onTimePane", new Instant(500)))
-            .advanceNextBatchWatermarkToInfinity()
-            .nextBatch(
-                TimestampedValue.of("finalLatePane", new Instant(750)),
-                TimestampedValue.of("alsoFinalLatePane", new Instant(250)));
+                    .nextBatch(
+                        TimestampedValue.of("firstPane", new Instant(100)),
+                        TimestampedValue.of("alsoFirstPane", new Instant(200)))
+                    .advanceWatermarkForNextBatch(new Instant(1001L))
+                    .nextBatch(
+                        TimestampedValue.of("onTimePane", new Instant(500)))
+                    .advanceNextBatchWatermarkToInfinity()
+                    .nextBatch(
+                        TimestampedValue.of("finalLatePane", new Instant(750)),
+                        TimestampedValue.of("alsoFinalLatePane", new Instant(250)));
 
     FixedWindows windowFn = FixedWindows.of(Duration.millis(1000L));
     Duration allowedLateness = Duration.millis(5000L);
@@ -212,12 +212,13 @@ public class CreateStreamTest implements Serializable {
     Instant lateElementTimestamp = new Instant(-1_000_000);
     CreateStream<String> source =
         CreateStream.of(StringUtf8Coder.of(), batchDuration())
-            .emptyBatch()
-            .advanceWatermarkForNextBatch(new Instant(0))
-            .nextBatch(
-                TimestampedValue.of("late", lateElementTimestamp),
-                TimestampedValue.of("onTime", new Instant(100)))
-            .advanceNextBatchWatermarkToInfinity();
+                    .emptyBatch()
+                    .advanceWatermarkForNextBatch(new Instant(0))
+                    .emptyBatch()
+                    .nextBatch(
+                        TimestampedValue.of("late", lateElementTimestamp),
+                        TimestampedValue.of("onTime", new Instant(100)))
+                    .advanceNextBatchWatermarkToInfinity();
 
     FixedWindows windowFn = FixedWindows.of(Duration.millis(1000L));
     Duration allowedLateness = Duration.millis(5000L);
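
The substantive change in the hunk above is the extra emptyBatch() after
advanceWatermarkForNextBatch(new Instant(0)); the rest is re-indentation. A plausible
reading, hedged since the diff does not state the intent, is that the empty batch gives
the advanced watermark a full batch interval to propagate through the
GlobalWatermarkHolder before the late and on-time elements arrive. The pattern in
isolation, using only calls shown in this patch:

    import org.apache.beam.runners.spark.io.CreateStream;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.values.TimestampedValue;
    import org.joda.time.Duration;
    import org.joda.time.Instant;

    class WatermarkPropagationSketch {

      CreateStream<String> source(Duration batchDuration) {
        return CreateStream.of(StringUtf8Coder.of(), batchDuration)
            .emptyBatch()                                  // batch 1: no elements
            .advanceWatermarkForNextBatch(new Instant(0))  // watermark set for batch 2
            .emptyBatch()                                  // batch 2: lets the watermark settle
            .nextBatch(                                    // batch 3: judged against Instant(0)
                TimestampedValue.of("late", new Instant(-1_000_000)),
                TimestampedValue.of("onTime", new Instant(100)))
            .advanceNextBatchWatermarkToInfinity();
      }
    }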

http://git-wip-us.apache.org/repos/asf/beam/blob/15472b28/runners/spark/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/resources/log4j.properties b/runners/spark/src/test/resources/log4j.properties
index 66e83c8..010c7df 100644
--- a/runners/spark/src/test/resources/log4j.properties
+++ b/runners/spark/src/test/resources/log4j.properties
@@ -24,7 +24,16 @@ log4j.rootLogger=ERROR, testlogger
 log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
 log4j.appender.testlogger.target = System.err
 log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+log4j.appender.testlogger.layout.ConversionPattern=%d [%t] %-5p %c %x - %m%n
 
 # TestSparkRunner prints general information about test pipeline execution.
 log4j.logger.org.apache.beam.runners.spark.TestSparkRunner=INFO
+
+# in case of an emergency - uncomment (or better yet, stay calm and uncomment).
+#log4j.logger.org.apache.beam=TRACE
+#log4j.logger.org.apache.beam.sdk.Pipeline=INFO
+#log4j.logger.org.apache.beam.sdk.coders=INFO
+#log4j.logger.org.apache.beam.sdk.runners.TransformHierarchy=ERROR
+#log4j.logger.org.apache.beam.runners.spark.SparkRunner$Evaluator=ERROR
+#log4j.logger.org.apache.beam.runners.spark.translation.streaming.WatermarkSyncedDStream#compute=INFO
+#log4j.logger.org.apache.beam.runners.spark.translation.streaming.WatermarkSyncedDStream=ERROR


[2/2] beam git commit: This closes #3738

Posted by st...@apache.org.
This closes #3738


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5181e619
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5181e619
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5181e619

Branch: refs/heads/master
Commit: 5181e619f17e1f69fabe8d5bdfc7a3a6a2142cde
Parents: c4517d0 15472b2
Author: Stas Levin <st...@apache.org>
Authored: Thu Aug 24 09:43:16 2017 +0300
Committer: Stas Levin <st...@apache.org>
Committed: Thu Aug 24 09:43:16 2017 +0300

----------------------------------------------------------------------
 .../apache/beam/runners/spark/SparkRunner.java  |   5 +-
 .../beam/runners/spark/io/CreateStream.java     | 104 ++++---
 .../SparkGroupAlsoByWindowViaWindowSet.java     | 158 +++++++---
 .../spark/stateful/SparkTimerInternals.java     |   6 +
 .../streaming/StreamingTransformTranslator.java |  71 +++--
 .../streaming/WatermarkSyncedDStream.java       | 149 +++++++++
 .../spark/util/GlobalWatermarkHolder.java       | 302 +++++++++++++------
 .../runners/spark/SparkPipelineStateTest.java   |   4 +-
 .../translation/streaming/CreateStreamTest.java |  33 +-
 .../spark/src/test/resources/log4j.properties   |  11 +-
 10 files changed, 633 insertions(+), 210 deletions(-)
----------------------------------------------------------------------