Posted to commits@beam.apache.org by al...@apache.org on 2016/05/10 20:27:02 UTC

[1/2] incubator-beam git commit: [BEAM-103][BEAM-130] Make Flink Source Parallel and Checkpointed

Repository: incubator-beam
Updated Branches:
  refs/heads/master 874ddef05 -> 4020e3645


[BEAM-103][BEAM-130] Make Flink Source Parallel and Checkpointed

This also changes how the parallelism option of FlinkPipelineOptions
behaves: it now defaults to the default parallelism set in the Flink
configuration, falling back to 1 when no configuration is available.
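
For reference, a minimal sketch of how a pipeline picks up the new default
(assuming a standard options setup; everything besides the two classes from
this commit is ordinary Beam boilerplate):

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    // If --parallelism was not given, the first call to getParallelism()
    // consults DefaultParallelismFactory: the default parallelism from the
    // Flink configuration if one is loaded, otherwise 1.
    int parallelism = options.getParallelism();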


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/336e90fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/336e90fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/336e90fa

Branch: refs/heads/master
Commit: 336e90fa795885e670ff3a523a6fefa6033c2663
Parents: 874ddef
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue May 3 13:35:35 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue May 10 22:20:08 2016 +0200

----------------------------------------------------------------------
 .../flink/DefaultParallelismFactory.java        |  39 +++
 .../runners/flink/FlinkPipelineOptions.java     |   2 +-
 .../FlinkStreamingTransformTranslators.java     |  18 +-
 .../streaming/io/UnboundedSourceWrapper.java    | 337 ++++++++++++++++---
 .../flink/streaming/TestCountingSource.java     | 256 ++++++++++++++
 .../flink/streaming/UnboundedSourceITCase.java  | 208 ------------
 .../streaming/UnboundedSourceWrapperTest.java   | 324 ++++++++++++++++++
 7 files changed, 916 insertions(+), 268 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/336e90fa/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java
new file mode 100644
index 0000000..e512db0
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
+
+/**
+ * {@link DefaultValueFactory} for getting a default value for the parallelism option
+ * on {@link FlinkPipelineOptions}.
+ *
+ * <p>This will return either the default value from {@link GlobalConfiguration} or {@code 1}.
+ * A valid {@link GlobalConfiguration} is only available if the program is executed by the Flink
+ * run scripts.
+ */
+public class DefaultParallelismFactory implements DefaultValueFactory<Integer> {
+  @Override
+  public Integer create(PipelineOptions options) {
+    return GlobalConfiguration.getInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, 1);
+  }
+}

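The factory's result depends on how the program is launched; a hedged sketch
of the two cases:

    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    PipelineOptions options = PipelineOptionsFactory.create();
    // Via the Flink run scripts (bin/flink run), GlobalConfiguration is
    // loaded from flink-conf.yaml and this returns the configured
    // parallelism.default; in a plain JVM process no configuration is
    // loaded and it falls back to 1.
    Integer defaultParallelism = new DefaultParallelismFactory().create(options);
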
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/336e90fa/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index 8c82abd..fd86bc9 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -70,7 +70,7 @@ public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOp
   void setFlinkMaster(String value);
 
   @Description("The degree of parallelism to be used when distributing operations onto workers.")
-  @Default.Integer(-1)
+  @Default.InstanceFactory(DefaultParallelismFactory.class)
   Integer getParallelism();
   void setParallelism(Integer value);
 

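Beam resolves @Default.InstanceFactory values lazily: the factory's create()
only runs when the getter is first invoked on an option that was never set
explicitly. A minimal sketch of that behavior:

    FlinkPipelineOptions options =
        PipelineOptionsFactory.as(FlinkPipelineOptions.class);
    options.getParallelism();  // unset: DefaultParallelismFactory.create() runs
    options.setParallelism(8);
    options.getParallelism();  // returns 8; the factory is not consulted again
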
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/336e90fa/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 618727d..2778d5c 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.Sink;
 import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.Write;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Create;
@@ -284,8 +285,17 @@ public class FlinkStreamingTransformTranslators {
               }
             }).assignTimestampsAndWatermarks(new IngestionTimeExtractor<WindowedValue<T>>());
       } else {
-        source = context.getExecutionEnvironment()
-            .addSource(new UnboundedSourceWrapper<>(context.getPipelineOptions(), transform));
+        try {
+          transform.getSource();
+          UnboundedSourceWrapper<T, ?> sourceWrapper =
+              new UnboundedSourceWrapper<>(
+                  context.getPipelineOptions(),
+                  transform.getSource(),
+                  context.getExecutionEnvironment().getParallelism());
+          source = context.getExecutionEnvironment().addSource(sourceWrapper).name(transform.getName());
+        } catch (Exception e) {
+          throw new RuntimeException("Error while translating UnboundedSource: " + transform.getSource(), e);
+        }
       }
 
       context.setOutputDataStream(output, source);
@@ -310,7 +320,9 @@ public class FlinkStreamingTransformTranslators {
       FlinkParDoBoundWrapper<IN, OUT> doFnWrapper = new FlinkParDoBoundWrapper<>(
           context.getPipelineOptions(), windowingStrategy, transform.getFn());
       DataStream<WindowedValue<IN>> inputDataStream = context.getInputDataStream(context.getInput(transform));
-      SingleOutputStreamOperator<WindowedValue<OUT>> outDataStream = inputDataStream.flatMap(doFnWrapper)
+      SingleOutputStreamOperator<WindowedValue<OUT>> outDataStream = inputDataStream
+          .flatMap(doFnWrapper)
+          .name(transform.getName())
           .returns(outputWindowedValueCoder);
 
       context.setOutputDataStream(context.getOutput(transform), outDataStream);

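A user pipeline that exercises this new translation path could look like the
following sketch (MyUnboundedSource is a hypothetical stand-in for any
UnboundedSource implementation):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.Read;

    Pipeline p = Pipeline.create(options);
    p.apply(Read.from(new MyUnboundedSource()));
    // During translation the runner wraps the source in an
    // UnboundedSourceWrapper sized to the execution environment's
    // parallelism and names the Flink operator after the transform.
    p.run();
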
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/336e90fa/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 9d15a33..b816e2a 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -18,107 +18,286 @@
 package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
 
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
-import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TypeDescriptor;
 
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
- * A wrapper for Beam's unbounded sources. This class wraps around a source implementing the
- * {@link org.apache.beam.sdk.io.Read.Unbounded}  interface.
- *
- * For now we support non-parallel sources, checkpointing is WIP.
- * */
-public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue<T>> implements Triggerable {
+ * Wrapper for executing {@link UnboundedSource UnboundedSources} as a Flink Source.
+ */
+public class UnboundedSourceWrapper<
+    OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark>
+    extends RichParallelSourceFunction<WindowedValue<OutputT>>
+    implements Triggerable, Checkpointed<byte[]> {
 
-  private final String name;
-  private final UnboundedSource<T, ?> source;
+  private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceWrapper.class);
 
-  private StreamingRuntimeContext runtime = null;
-  private StreamSource.ManualWatermarkContext<WindowedValue<T>> context = null;
+  /**
+   * Keep the options so that we can initialize the readers.
+   */
+  private final SerializedPipelineOptions serializedOptions;
 
-  private volatile boolean isRunning = false;
+  /**
+   * For snapshot and restore.
+   */
+  private final ListCoder<
+      KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT>> checkpointCoder;
 
-  private final SerializedPipelineOptions serializedOptions;
+  /**
+   * The split sources. We generate them in the constructor to ensure that all
+   * parallel instances agree on the same list of split sources.
+   */
+  private List<? extends UnboundedSource<OutputT, CheckpointMarkT>> splitSources;
 
-  /** Instantiated during runtime **/
-  private transient UnboundedSource.UnboundedReader<T> reader;
+  /**
+   * Make it a field so that we can access it in {@link #trigger(long)} for
+   * emitting watermarks.
+   */
+  private transient List<UnboundedSource.UnboundedReader<OutputT>> readers;
 
-  public UnboundedSourceWrapper(PipelineOptions pipelineOptions, Read.Unbounded<T> transform) {
-    this.name = transform.getName();
-    this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
-    this.source = transform.getSource();
-  }
+  /**
+   * Initialize here and not in run() to prevent races where we cancel a job before run() is
+   * ever called or run() is called after cancel().
+   */
+  private volatile boolean isRunning = true;
 
-  public String getName() {
-    return this.name;
-  }
+  /**
+   * Make it a field so that we can access it in {@link #trigger(long)} for registering new
+   * triggers.
+   */
+  private transient StreamingRuntimeContext runtimeContext;
 
-  WindowedValue<T> makeWindowedValue(T output, Instant timestamp) {
-    if (timestamp == null) {
-      timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
+  /**
+   * Make it a field so that we can access it in {@link #trigger(long)} for emitting
+   * watermarks.
+   */
+  private transient StreamSource.ManualWatermarkContext<WindowedValue<OutputT>> context;
+
+  /**
+   * When restoring from a snapshot we put the restored sources/checkpoint marks here
+   * and open in {@link #open(Configuration)}.
+   */
+  private transient List<
+      KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT>> restoredState;
+
+  @SuppressWarnings("unchecked")
+  public UnboundedSourceWrapper(
+      PipelineOptions pipelineOptions,
+      UnboundedSource<OutputT, CheckpointMarkT> source,
+      int parallelism) throws Exception {
+    this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
+
+    if (source.requiresDeduping()) {
+      LOG.warn("Source {} requires deduping but Flink runner doesn't support this yet.", source);
     }
-    return WindowedValue.of(output, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
+
+    Coder<CheckpointMarkT> checkpointMarkCoder = source.getCheckpointMarkCoder();
+    Coder<? extends UnboundedSource<OutputT, CheckpointMarkT>> sourceCoder =
+        SerializableCoder.of(new TypeDescriptor<UnboundedSource<OutputT, CheckpointMarkT>>() {});
+
+    checkpointCoder = (ListCoder) ListCoder.of(KvCoder.of(sourceCoder, checkpointMarkCoder));
+
+    // get the splits early. we assume that the generated splits are stable,
+    // this is necessary so that the mapping of state to source is correct
+    // when restoring
+    splitSources = source.generateInitialSplits(parallelism, pipelineOptions);
   }
 
   @Override
-  public void run(SourceContext<WindowedValue<T>> ctx) throws Exception {
+  public void run(SourceContext<WindowedValue<OutputT>> ctx) throws Exception {
     if (!(ctx instanceof StreamSource.ManualWatermarkContext)) {
       throw new RuntimeException(
-          "We assume that all sources in Dataflow are EventTimeSourceFunction. " +
-              "Apparently " + this.name + " is not. " +
-              "Probably you should consider writing your own Wrapper for this source.");
+          "Cannot emit watermarks, this hints at a misconfiguration/bug.");
     }
 
-    context = (StreamSource.ManualWatermarkContext<WindowedValue<T>>) ctx;
-    runtime = (StreamingRuntimeContext) getRuntimeContext();
+    context = (StreamSource.ManualWatermarkContext<WindowedValue<OutputT>>) ctx;
+    runtimeContext = (StreamingRuntimeContext) getRuntimeContext();
 
-    isRunning = true;
+    // figure out which split sources we're responsible for
+    int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+    int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
 
-    reader = source.createReader(serializedOptions.getPipelineOptions(), null);
+    List<UnboundedSource<OutputT, CheckpointMarkT>> localSources = new ArrayList<>();
 
-    boolean inputAvailable = reader.start();
+    for (int i = 0; i < splitSources.size(); i++) {
+      if (i % numSubtasks == subtaskIndex) {
+        localSources.add(splitSources.get(i));
+      }
+    }
+
+    LOG.info("Unbounded Flink Source {}/{} is reading from sources: {}",
+        subtaskIndex,
+        numSubtasks,
+        localSources);
+
+    readers = new ArrayList<>();
+    if (restoredState != null) {
+
+      // restore the splitSources from the checkpoint to ensure consistent ordering
+      // do it using a transform because otherwise we would have to do
+      // unchecked casts
+      splitSources = Lists.transform(
+          restoredState,
+          new Function<
+              KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT>,
+              UnboundedSource<OutputT, CheckpointMarkT>>() {
+        @Override
+        public UnboundedSource<OutputT, CheckpointMarkT> apply(
+            KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> input) {
+          return input.getKey();
+        }
+      });
+
+      for (KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> restored:
+          restoredState) {
+        readers.add(
+            restored.getKey().createReader(
+                serializedOptions.getPipelineOptions(), restored.getValue()));
+      }
+      restoredState = null;
+    } else {
+      // initialize readers from scratch
+      for (UnboundedSource<OutputT, CheckpointMarkT> source : localSources) {
+        readers.add(source.createReader(serializedOptions.getPipelineOptions(), null));
+      }
+    }
+
+    if (readers.size() == 0) {
+      // do nothing, but still look busy ...
+      // also, output a Long.MAX_VALUE watermark since we know that we're not
+      // going to emit anything
+      // we can't return here since Flink requires that all operators stay up,
+      // otherwise checkpointing would not work correctly anymore
+      ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
 
-    setNextWatermarkTimer(this.runtime);
+      // wait until this is canceled
+      final Object waitLock = new Object();
+      while (isRunning) {
+        try {
+          // Flink will interrupt us at some point
+          //noinspection SynchronizationOnLocalVariableOrMethodParameter
+          synchronized (waitLock) {
+            waitLock.wait();
+          }
+        } catch (InterruptedException e) {
+          if (!isRunning) {
+            // restore the interrupted state, and fall through the loop
+            Thread.currentThread().interrupt();
+          }
+        }
+      }
+    } else if (readers.size() == 1) {
+      // the easy case, we just read from one reader
+      UnboundedSource.UnboundedReader<OutputT> reader = readers.get(0);
 
+      boolean dataAvailable = reader.start();
+      if (dataAvailable) {
+        emitElement(ctx, reader);
+      }
 
-    try {
+      setNextWatermarkTimer(this.runtimeContext);
 
       while (isRunning) {
+        dataAvailable = reader.advance();
 
-        if (!inputAvailable && isRunning) {
-          // wait a bit until we retry to pull more records
+        if (dataAvailable)  {
+          emitElement(ctx, reader);
+        } else {
           Thread.sleep(50);
-          inputAvailable = reader.advance();
         }
+      }
+    } else {
+      // a bit more complicated, we are responsible for several readers
+      // loop through them and sleep if none of them had any data
 
-        if (inputAvailable) {
+      int numReaders = readers.size();
+      int currentReader = 0;
 
-          // get it and its timestamp from the source
-          T item = reader.getCurrent();
-          Instant timestamp = reader.getCurrentTimestamp();
+      // start each reader and emit data if immediately available
+      for (UnboundedSource.UnboundedReader<OutputT> reader : readers) {
+        boolean dataAvailable = reader.start();
+        if (dataAvailable) {
+          emitElement(ctx, reader);
+        }
+      }
 
-          // write it to the output collector
-          synchronized (ctx.getCheckpointLock()) {
-            context.collectWithTimestamp(makeWindowedValue(item, timestamp), timestamp.getMillis());
-          }
+      // a flag telling us whether any of the readers had data
+      // if no reader had data, sleep for a bit
+      boolean hadData = false;
+      while (isRunning) {
+        UnboundedSource.UnboundedReader<OutputT> reader = readers.get(currentReader);
+        boolean dataAvailable = reader.advance();
 
-          inputAvailable = reader.advance();
+        if (dataAvailable) {
+          emitElement(ctx, reader);
+          hadData = true;
+        }
+
+        currentReader = (currentReader + 1) % numReaders;
+        if (currentReader == 0 && !hadData) {
+          Thread.sleep(50);
+        } else if (currentReader == 0) {
+          hadData = false;
         }
       }
 
-    } finally {
-      reader.close();
+    }
+  }
+
+  /**
+   * Emit the current element from the given Reader. The reader is guaranteed to have data.
+   */
+  private void emitElement(
+      SourceContext<WindowedValue<OutputT>> ctx,
+      UnboundedSource.UnboundedReader<OutputT> reader) {
+    // make sure that reader state update and element emission are atomic
+    // with respect to snapshots
+    synchronized (ctx.getCheckpointLock()) {
+
+      OutputT item = reader.getCurrent();
+      Instant timestamp = reader.getCurrentTimestamp();
+
+      WindowedValue<OutputT> windowedValue =
+          WindowedValue.of(item, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
+      ctx.collectWithTimestamp(windowedValue, timestamp.getMillis());
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    super.close();
+    if (readers != null) {
+      for (UnboundedSource.UnboundedReader<OutputT> reader: readers) {
+        reader.close();
+      }
     }
   }
 
@@ -128,13 +307,52 @@ public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue<
   }
 
   @Override
+  public byte[] snapshotState(long l, long l1) throws Exception {
+    // we checkpoint the sources along with the CheckpointMarkT to ensure
+    // that we have a correct mapping of checkpoints to sources when
+    // restoring
+    List<KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT>> checkpoints =
+        new ArrayList<>();
+
+    for (int i = 0; i < splitSources.size(); i++) {
+      UnboundedSource<OutputT, CheckpointMarkT> source = splitSources.get(i);
+      UnboundedSource.UnboundedReader<OutputT> reader = readers.get(i);
+
+      @SuppressWarnings("unchecked")
+      CheckpointMarkT mark = (CheckpointMarkT) reader.getCheckpointMark();
+      KV<UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> kv =
+          KV.of(source, mark);
+      checkpoints.add(kv);
+    }
+
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+      checkpointCoder.encode(checkpoints, baos, Coder.Context.OUTER);
+      return baos.toByteArray();
+    }
+  }
+
+  @Override
+  public void restoreState(byte[] bytes) throws Exception {
+    try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes)) {
+      restoredState = checkpointCoder.decode(bais, Coder.Context.OUTER);
+    }
+  }
+
+  @Override
   public void trigger(long timestamp) throws Exception {
     if (this.isRunning) {
       synchronized (context.getCheckpointLock()) {
-        long watermarkMillis = this.reader.getWatermark().getMillis();
+        // find minimum watermark over all readers
+        long watermarkMillis = Long.MAX_VALUE;
+        for (UnboundedSource.UnboundedReader<OutputT> reader: readers) {
+          Instant watermark = reader.getWatermark();
+          if (watermark != null) {
+            watermarkMillis = Math.min(watermark.getMillis(), watermarkMillis);
+          }
+        }
         context.emitWatermark(new Watermark(watermarkMillis));
       }
-      setNextWatermarkTimer(this.runtime);
+      setNextWatermarkTimer(this.runtimeContext);
     }
   }
 
@@ -150,4 +368,11 @@ public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue<
     return System.currentTimeMillis() + watermarkInterval;
   }
 
+  /**
+   * Visible so that we can check this in tests. Must not be used for anything else.
+   */
+  @VisibleForTesting
+  public List<? extends UnboundedSource<OutputT, CheckpointMarkT>> getSplitSources() {
+    return splitSources;
+  }
 }

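The split-to-subtask assignment in run() is a plain modulo round-robin over
the splits generated in the constructor. A hypothetical illustration with 5
splits on 2 subtasks:

    // The ownership rule from run(): i % numSubtasks == subtaskIndex
    int numSplits = 5;
    int numSubtasks = 2;
    for (int subtask = 0; subtask < numSubtasks; subtask++) {
      for (int i = 0; i < numSplits; i++) {
        if (i % numSubtasks == subtask) {
          System.out.println("subtask " + subtask + " owns split " + i);
        }
      }
    }
    // Prints: subtask 0 owns splits 0, 2, 4; subtask 1 owns splits 1, 3.
    // Because generateInitialSplits() is assumed to be stable, every subtask
    // recomputes the same list, and snapshotState()/restoreState() can pair
    // each split with its CheckpointMark by position.
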
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/336e90fa/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
new file mode 100644
index 0000000..3ced02e
--- /dev/null
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.streaming;
+
+import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.DelegateCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.values.KV;
+
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.Nullable;
+
+/**
+ * An unbounded source for testing the unbounded sources framework code.
+ *
+ * <p>Each split of this source produces records of the form KV(split_id, i),
+ * where i counts up from 0.  Each record has a timestamp of i, and the watermark
+ * accurately tracks these timestamps.  The reader will occasionally return false
+ * from {@code advance}, in order to simulate a source where not all the data is
+ * available immediately.
+ */
+public class TestCountingSource
+    extends UnboundedSource<KV<Integer, Integer>, TestCountingSource.CounterMark> {
+  private static final Logger LOG = LoggerFactory.getLogger(TestCountingSource.class);
+
+  private static List<Integer> finalizeTracker;
+  private final int numMessagesPerShard;
+  private final int shardNumber;
+  private final boolean dedup;
+  private final boolean throwOnFirstSnapshot;
+  private final boolean allowSplitting;
+
+  /**
+   * We only allow an exception to be thrown from getCheckpointMark
+   * at most once. This must be static since the entire TestCountingSource
+   * instance may be re-serialized when the pipeline recovers and retries.
+   */
+  private static boolean thrown = false;
+
+  public static void setFinalizeTracker(List<Integer> finalizeTracker) {
+    TestCountingSource.finalizeTracker = finalizeTracker;
+  }
+
+  public TestCountingSource(int numMessagesPerShard) {
+    this(numMessagesPerShard, 0, false, false, true);
+  }
+
+  public TestCountingSource withDedup() {
+    return new TestCountingSource(
+        numMessagesPerShard, shardNumber, true, throwOnFirstSnapshot, true);
+  }
+
+  private TestCountingSource withShardNumber(int shardNumber) {
+    return new TestCountingSource(
+        numMessagesPerShard, shardNumber, dedup, throwOnFirstSnapshot, true);
+  }
+
+  public TestCountingSource withThrowOnFirstSnapshot(boolean throwOnFirstSnapshot) {
+    return new TestCountingSource(
+        numMessagesPerShard, shardNumber, dedup, throwOnFirstSnapshot, true);
+  }
+
+  public TestCountingSource withoutSplitting() {
+    return new TestCountingSource(
+        numMessagesPerShard, shardNumber, dedup, throwOnFirstSnapshot, false);
+  }
+
+  private TestCountingSource(int numMessagesPerShard, int shardNumber, boolean dedup,
+                             boolean throwOnFirstSnapshot, boolean allowSplitting) {
+    this.numMessagesPerShard = numMessagesPerShard;
+    this.shardNumber = shardNumber;
+    this.dedup = dedup;
+    this.throwOnFirstSnapshot = throwOnFirstSnapshot;
+    this.allowSplitting = allowSplitting;
+  }
+
+  public int getShardNumber() {
+    return shardNumber;
+  }
+
+  @Override
+  public List<TestCountingSource> generateInitialSplits(
+      int desiredNumSplits, PipelineOptions options) {
+    List<TestCountingSource> splits = new ArrayList<>();
+    int numSplits = allowSplitting ? desiredNumSplits : 1;
+    for (int i = 0; i < numSplits; i++) {
+      splits.add(withShardNumber(i));
+    }
+    return splits;
+  }
+
+  class CounterMark implements UnboundedSource.CheckpointMark {
+    int current;
+
+    public CounterMark(int current) {
+      this.current = current;
+    }
+
+    @Override
+    public void finalizeCheckpoint() {
+      if (finalizeTracker != null) {
+        finalizeTracker.add(current);
+      }
+    }
+  }
+
+  @Override
+  public Coder<CounterMark> getCheckpointMarkCoder() {
+    return DelegateCoder.of(
+        VarIntCoder.of(),
+        new DelegateCoder.CodingFunction<CounterMark, Integer>() {
+          @Override
+          public Integer apply(CounterMark input) {
+            return input.current;
+          }
+        },
+        new DelegateCoder.CodingFunction<Integer, CounterMark>() {
+          @Override
+          public CounterMark apply(Integer input) {
+            return new CounterMark(input);
+          }
+        });
+  }
+
+  @Override
+  public boolean requiresDeduping() {
+    return dedup;
+  }
+
+  /**
+   * Public only so that the checkpoint can be conveyed from {@link #getCheckpointMark()} to
+   * {@link TestCountingSource#createReader(PipelineOptions, CounterMark)} without a cast.
+   */
+  public class CountingSourceReader extends UnboundedReader<KV<Integer, Integer>> {
+    private int current;
+
+    public CountingSourceReader(int startingPoint) {
+      this.current = startingPoint;
+    }
+
+    @Override
+    public boolean start() {
+      return advance();
+    }
+
+    @Override
+    public boolean advance() {
+      if (current >= numMessagesPerShard - 1) {
+        return false;
+      }
+      // If testing dedup, occasionally insert a duplicate value.
+      if (current >= 0 && dedup && ThreadLocalRandom.current().nextInt(5) == 0) {
+        return true;
+      }
+      current++;
+      return true;
+    }
+
+    @Override
+    public KV<Integer, Integer> getCurrent() {
+      return KV.of(shardNumber, current);
+    }
+
+    @Override
+    public Instant getCurrentTimestamp() {
+      return new Instant(current);
+    }
+
+    @Override
+    public byte[] getCurrentRecordId() {
+      try {
+        return encodeToByteArray(KvCoder.of(VarIntCoder.of(), VarIntCoder.of()), getCurrent());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public void close() {}
+
+    @Override
+    public TestCountingSource getCurrentSource() {
+      return TestCountingSource.this;
+    }
+
+    @Override
+    public Instant getWatermark() {
+      // The watermark is a promise about future elements, and the timestamps of elements are
+      // strictly increasing for this source.
+      return new Instant(current + 1);
+    }
+
+    @Override
+    public CounterMark getCheckpointMark() {
+      if (throwOnFirstSnapshot && !thrown) {
+        thrown = true;
+        LOG.error("Throwing exception while checkpointing counter");
+        throw new RuntimeException("failed during checkpoint");
+      }
+      // The checkpoint can assume all records read, including the current, have
+      // been committed.
+      return new CounterMark(current);
+    }
+
+    @Override
+    public long getSplitBacklogBytes() {
+      return 7L;
+    }
+  }
+
+  @Override
+  public CountingSourceReader createReader(
+      PipelineOptions options, @Nullable CounterMark checkpointMark) {
+    if (checkpointMark == null) {
+      LOG.debug("creating reader");
+    } else {
+      LOG.debug("restoring reader from checkpoint with current = {}", checkpointMark.current);
+    }
+    return new CountingSourceReader(checkpointMark != null ? checkpointMark.current : -1);
+  }
+
+  @Override
+  public void validate() {}
+
+  @Override
+  public Coder<KV<Integer, Integer>> getDefaultOutputCoder() {
+    return KvCoder.of(VarIntCoder.of(), VarIntCoder.of());
+  }
+}

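To see what the test source emits, a reader can be driven by hand; a minimal
sketch (not part of the commit, using only the API defined above):

    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.KV;

    TestCountingSource source = new TestCountingSource(3);
    TestCountingSource.CountingSourceReader reader =
        source.createReader(PipelineOptionsFactory.create(), null);
    // Emits KV(0, 0), KV(0, 1), KV(0, 2); advance() then returns false.
    for (boolean more = reader.start(); more; more = reader.advance()) {
      KV<Integer, Integer> kv = reader.getCurrent();  // KV(shardNumber, i)
      // getCurrentTimestamp() is new Instant(i); the watermark is i + 1.
    }
    reader.close();
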
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/336e90fa/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceITCase.java
deleted file mode 100644
index 8a5de15..0000000
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceITCase.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.streaming;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-
-import org.apache.beam.runners.flink.FlinkTestPipeline;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.AfterPane;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
-
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import javax.annotation.Nullable;
-
-
-public class UnboundedSourceITCase extends StreamingProgramTestBase {
-
-  protected static String resultPath;
-
-  public UnboundedSourceITCase() {
-  }
-
-  static final String[] EXPECTED_RESULT = new String[]{
-      "1", "2", "3", "4", "5", "6", "7", "8", "9"};
-
-  @Override
-  protected void preSubmit() throws Exception {
-    resultPath = getTempDirPath("result");
-  }
-
-  @Override
-  protected void postSubmit() throws Exception {
-    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
-  }
-
-  @Override
-  protected void testProgram() throws Exception {
-    runProgram(resultPath);
-  }
-
-  private static void runProgram(String resultPath) {
-
-    Pipeline p = FlinkTestPipeline.createForStreaming();
-
-    PCollection<String> result = p
-        .apply(Read.from(new RangeReadSource(1, 10)))
-        .apply(Window.<Integer>into(new GlobalWindows())
-            .triggering(AfterPane.elementCountAtLeast(10))
-            .discardingFiredPanes())
-        .apply(ParDo.of(new DoFn<Integer, String>() {
-          @Override
-          public void processElement(ProcessContext c) throws Exception {
-            c.output(c.element().toString());
-          }
-        }));
-
-    result.apply(TextIO.Write.to(resultPath));
-
-    try {
-      p.run();
-      fail();
-    } catch(Exception e) {
-      assertEquals("The source terminates as expected.", e.getCause().getCause().getMessage());
-    }
-  }
-
-
-  private static class RangeReadSource extends UnboundedSource<Integer, UnboundedSource.CheckpointMark> {
-
-    final int from;
-    final int to;
-
-    RangeReadSource(int from, int to) {
-      this.from = from;
-      this.to = to;
-    }
-
-
-    @Override
-    public List<? extends UnboundedSource<Integer, CheckpointMark>> generateInitialSplits(
-        int desiredNumSplits, PipelineOptions options) throws Exception {
-      return ImmutableList.of(this);
-    }
-
-    @Override
-    public UnboundedReader<Integer> createReader(PipelineOptions options, @Nullable CheckpointMark checkpointMark) {
-      return new RangeReadReader(options);
-    }
-
-    @Nullable
-    @Override
-    public Coder<CheckpointMark> getCheckpointMarkCoder() {
-      return null;
-    }
-
-    @Override
-    public void validate() {
-    }
-
-    @Override
-    public Coder<Integer> getDefaultOutputCoder() {
-      return BigEndianIntegerCoder.of();
-    }
-
-    private class RangeReadReader extends UnboundedReader<Integer> {
-
-      private int current;
-
-      private long watermark;
-
-      public RangeReadReader(PipelineOptions options) {
-        assertNotNull(options);
-        current = from;
-      }
-
-      @Override
-      public boolean start() throws IOException {
-        return true;
-      }
-
-      @Override
-      public boolean advance() throws IOException {
-        current++;
-        watermark++;
-
-        if (current >= to) {
-          try {
-            compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
-            throw new IOException("The source terminates as expected.");
-          } catch (IOException e) {
-            // pass on the exception to terminate the source
-            throw e;
-          } catch (Throwable t) {
-            // expected here from the file check
-          }
-        }
-        return current < to;
-      }
-
-      @Override
-      public Integer getCurrent() throws NoSuchElementException {
-        return current;
-      }
-
-      @Override
-      public Instant getCurrentTimestamp() throws NoSuchElementException {
-        return new Instant(current);
-      }
-
-      @Override
-      public void close() throws IOException {
-      }
-
-      @Override
-      public Instant getWatermark() {
-        return new Instant(watermark);
-      }
-
-      @Override
-      public CheckpointMark getCheckpointMark() {
-        return null;
-      }
-
-      @Override
-      public UnboundedSource<Integer, ?> getCurrentSource() {
-        return RangeReadSource.this;
-      }
-    }
-  }
-}
-
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/336e90fa/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
new file mode 100644
index 0000000..f5a52f5
--- /dev/null
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.streaming;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Tests for {@link UnboundedSourceWrapper}.
+ */
+public class UnboundedSourceWrapperTest {
+
+  /**
+   * Creates a {@link UnboundedSourceWrapper} that has exactly one reader per source, since we
+   * specify a parallelism of 1 and also at runtime tell the source that it has 1 parallel subtask.
+   */
+  @Test
+  public void testWithOneReader() throws Exception {
+    final int NUM_ELEMENTS = 20;
+    final Object checkpointLock = new Object();
+    PipelineOptions options = PipelineOptionsFactory.create();
+
+    // this source will emit exactly NUM_ELEMENTS across all parallel readers,
+    // afterwards it will stall. We check whether we also receive NUM_ELEMENTS
+    // elements later.
+    TestCountingSource source = new TestCountingSource(NUM_ELEMENTS);
+    UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
+        new UnboundedSourceWrapper<>(options, source, 1);
+
+    assertEquals(1, flinkWrapper.getSplitSources().size());
+
+    StreamSource<
+        WindowedValue<KV<Integer, Integer>>,
+        UnboundedSourceWrapper<
+            KV<Integer, Integer>,
+            TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper);
+
+    setupSourceOperator(sourceOperator);
+
+
+    try {
+      sourceOperator.run(checkpointLock,
+          new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
+            private int count = 0;
+
+            @Override
+            public void emitWatermark(Watermark watermark) {
+            }
+
+            @Override
+            public void collect(
+                StreamRecord<WindowedValue<KV<Integer,Integer>>> windowedValueStreamRecord) {
+
+              count++;
+              if (count >= NUM_ELEMENTS) {
+                throw new SuccessException();
+              }
+            }
+
+            @Override
+            public void close() {
+
+            }
+          });
+    } catch (SuccessException e) {
+      // success
+    } catch (Exception e) {
+      fail("We caught " + e);
+    }
+  }
+
+  /**
+   * Creates a {@link UnboundedSourceWrapper} that has multiple readers per source, since we
+   * specify a parallelism higher than 1 but at runtime tell the source that it has only
+   * 1 parallel subtask. This means that one source wrapper will manage multiple readers.
+   */
+  @Test
+  public void testWithMultipleReaders() throws Exception {
+    final int NUM_ELEMENTS = 20;
+    final Object checkpointLock = new Object();
+    PipelineOptions options = PipelineOptionsFactory.create();
+
+    // this source will emit NUM_ELEMENTS elements per shard (4 shards here),
+    // afterwards it will stall. We check that we receive at least NUM_ELEMENTS
+    // elements across all parallel readers.
+    TestCountingSource source = new TestCountingSource(NUM_ELEMENTS);
+    UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
+        new UnboundedSourceWrapper<>(options, source, 4);
+
+    assertEquals(4, flinkWrapper.getSplitSources().size());
+
+    StreamSource<WindowedValue<
+        KV<Integer, Integer>>,
+        UnboundedSourceWrapper<
+            KV<Integer, Integer>,
+            TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper);
+
+    setupSourceOperator(sourceOperator);
+
+
+    try {
+      sourceOperator.run(checkpointLock,
+          new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
+            private int count = 0;
+
+            @Override
+            public void emitWatermark(Watermark watermark) {
+            }
+
+            @Override
+            public void collect(
+                StreamRecord<WindowedValue<KV<Integer,Integer>>> windowedValueStreamRecord) {
+
+              count++;
+              if (count >= NUM_ELEMENTS) {
+                throw new SuccessException();
+              }
+            }
+
+            @Override
+            public void close() {
+
+            }
+          });
+    } catch (SuccessException e) {
+      // success
+      return;
+    }
+    fail("Read terminated without producing expected number of outputs");
+  }
+
+  /**
+   * Verify that snapshot/restore work as expected. We bring up a source and cancel
+   * after seeing a certain number of elements. Then we snapshot that source,
+   * bring up a completely new source that we restore from the snapshot and verify
+   * that we see all expected elements in the end.
+   */
+  @Test
+  public void testRestore() throws Exception {
+    final int NUM_ELEMENTS = 20;
+    final Object checkpointLock = new Object();
+    PipelineOptions options = PipelineOptionsFactory.create();
+
+    // this source will emit exactly NUM_ELEMENTS across all parallel readers,
+    // afterwards it will stall. We check whether we also receive NUM_ELEMENTS
+    // elements later.
+    TestCountingSource source = new TestCountingSource(NUM_ELEMENTS);
+    UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
+        new UnboundedSourceWrapper<>(options, source, 1);
+
+    assertEquals(1, flinkWrapper.getSplitSources().size());
+
+    StreamSource<
+        WindowedValue<KV<Integer, Integer>>,
+        UnboundedSourceWrapper<
+            KV<Integer, Integer>,
+            TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper);
+
+    setupSourceOperator(sourceOperator);
+
+    final Set<KV<Integer, Integer>> emittedElements = new HashSet<>();
+
+    boolean readFirstBatchOfElements = false;
+
+    try {
+      sourceOperator.run(checkpointLock,
+          new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
+            private int count = 0;
+
+            @Override
+            public void emitWatermark(Watermark watermark) {
+            }
+
+            @Override
+            public void collect(
+                StreamRecord<WindowedValue<KV<Integer,Integer>>> windowedValueStreamRecord) {
+
+              emittedElements.add(windowedValueStreamRecord.getValue().getValue());
+              count++;
+              if (count >= NUM_ELEMENTS / 2) {
+                throw new SuccessException();
+              }
+            }
+
+            @Override
+            public void close() {
+
+            }
+          });
+    } catch (SuccessException e) {
+      // success
+      readFirstBatchOfElements = true;
+    }
+
+    assertTrue("Did not successfully read first batch of elements.", readFirstBatchOfElements);
+
+    // draw a snapshot
+    byte[] snapshot = flinkWrapper.snapshotState(0, 0);
+
+    // create a completely new source but restore from the snapshot
+    TestCountingSource restoredSource = new TestCountingSource(NUM_ELEMENTS);
+    UnboundedSourceWrapper<
+        KV<Integer, Integer>, TestCountingSource.CounterMark> restoredFlinkWrapper =
+        new UnboundedSourceWrapper<>(options, restoredSource, 1);
+
+    assertEquals(1, restoredFlinkWrapper.getSplitSources().size());
+
+    StreamSource<
+        WindowedValue<KV<Integer, Integer>>,
+        UnboundedSourceWrapper<
+            KV<Integer, Integer>,
+            TestCountingSource.CounterMark>> restoredSourceOperator =
+        new StreamSource<>(restoredFlinkWrapper);
+
+    setupSourceOperator(restoredSourceOperator);
+
+    // restore snapshot
+    restoredFlinkWrapper.restoreState(snapshot);
+
+    boolean readSecondBatchOfElements = false;
+
+    // run again and verify that we see the other elements
+    try {
+      restoredSourceOperator.run(checkpointLock,
+          new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
+            private int count = 0;
+
+            @Override
+            public void emitWatermark(Watermark watermark) {
+            }
+
+            @Override
+            public void collect(
+                StreamRecord<WindowedValue<KV<Integer,Integer>>> windowedValueStreamRecord) {
+              emittedElements.add(windowedValueStreamRecord.getValue().getValue());
+              count++;
+              if (count >= NUM_ELEMENTS / 2) {
+                throw new SuccessException();
+              }
+            }
+
+            @Override
+            public void close() {
+
+            }
+          });
+    } catch (SuccessException e) {
+      // success
+      readSecondBatchOfElements = true;
+    }
+
+    assertTrue("Did not successfully read second batch of elements.", readSecondBatchOfElements);
+
+    // verify that we saw all NUM_ELEMENTS elements
+    assertEquals(NUM_ELEMENTS, emittedElements.size());
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <T> void setupSourceOperator(StreamSource<T, ?> operator) {
+    ExecutionConfig executionConfig = new ExecutionConfig();
+    StreamConfig cfg = new StreamConfig(new Configuration());
+
+    cfg.setTimeCharacteristic(TimeCharacteristic.EventTime);
+
+    Environment env = new DummyEnvironment("MockTwoInputTask", 1, 0);
+
+    StreamTask<?, ?> mockTask = mock(StreamTask.class);
+    when(mockTask.getName()).thenReturn("Mock Task");
+    when(mockTask.getCheckpointLock()).thenReturn(new Object());
+    when(mockTask.getConfiguration()).thenReturn(cfg);
+    when(mockTask.getEnvironment()).thenReturn(env);
+    when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
+    when(mockTask.getAccumulatorMap()).thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap());
+
+    operator.setup(mockTask, cfg, (Output< StreamRecord<T>>) mock(Output.class));
+  }
+
+  /**
+   * A special {@link RuntimeException} that we throw to signal that the test was successful.
+   */
+  private static class SuccessException extends RuntimeException {}
+}


[2/2] incubator-beam git commit: This Closes #274

Posted by al...@apache.org.
This Closes #274


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4020e364
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4020e364
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4020e364

Branch: refs/heads/master
Commit: 4020e364598befbd68458f1c6eada6d90a986358
Parents: 874ddef 336e90f
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue May 10 22:20:47 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue May 10 22:20:47 2016 +0200

----------------------------------------------------------------------
 .../flink/DefaultParallelismFactory.java        |  39 +++
 .../runners/flink/FlinkPipelineOptions.java     |   2 +-
 .../FlinkStreamingTransformTranslators.java     |  18 +-
 .../streaming/io/UnboundedSourceWrapper.java    | 337 ++++++++++++++++---
 .../flink/streaming/TestCountingSource.java     | 256 ++++++++++++++
 .../flink/streaming/UnboundedSourceITCase.java  | 208 ------------
 .../streaming/UnboundedSourceWrapperTest.java   | 324 ++++++++++++++++++
 7 files changed, 916 insertions(+), 268 deletions(-)
----------------------------------------------------------------------