Posted to commits@beam.apache.org by al...@apache.org on 2018/03/27 04:50:28 UTC

[beam] 02/03: [BEAM-2393] Make BoundedSource fault-tolerant

This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 0e44feb120a73c36116b2bd6b89e4f1676d7266f
Author: Grzegorz Kołakowski <gr...@getindata.com>
AuthorDate: Wed Feb 21 11:11:53 2018 +0100

    [BEAM-2393] Make BoundedSource fault-tolerant
---
 .../UnboundedReadFromBoundedSource.java            |   5 +-
 .../flink/FlinkStreamingTransformTranslators.java  | 138 ++++++++++-
 .../streaming/io/BoundedSourceWrapper.java         | 259 ---------------------
 .../flink/streaming/BoundedSourceRestoreTest.java  | 236 +++++++++++++++++++
 4 files changed, 367 insertions(+), 271 deletions(-)
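
The change, in brief: the Flink runner previously ran a BoundedSource through its own
BoundedSourceWrapper, which implemented only StoppableFunction and no Flink checkpoint
hooks, so reader progress could not survive a restore. This commit deletes that wrapper
and instead adapts the bounded source to an unbounded one, reusing the existing
checkpoint-aware UnboundedSourceWrapper. A minimal sketch of that adaptation, restating
the constructor calls that appear in the diff and the new test below (the sketch itself
is illustration, not part of the patch):

    import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
    import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint;
    import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
    import org.apache.beam.sdk.io.BoundedSource;
    import org.apache.beam.sdk.io.CountingSource;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    class BoundedAsUnboundedSketch {
      public static void main(String[] args) throws Exception {
        PipelineOptions options = PipelineOptionsFactory.create();
        // A bounded source, made to look unbounded so the checkpointable wrapper can run it.
        BoundedSource<Long> bounded = CountingSource.upTo(100);
        BoundedToUnboundedSourceAdapter<Long> adapted =
            new BoundedToUnboundedSourceAdapter<>(bounded);
        // Checkpoint<Long> is the adapter's CheckpointMark; see the first hunk below.
        UnboundedSourceWrapper<Long, Checkpoint<Long>> wrapper =
            new UnboundedSourceWrapper<>("stepName", options, adapted, 1 /* parallelism */);
        // In the translator this wrapper is then handed to env.addSource(...).
      }
    }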

diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
index 09acc82..88119d1 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
@@ -169,8 +169,11 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
       return new CheckpointCoder<>(boundedSource.getDefaultOutputCoder());
     }
 
+    /**
+     * A marker representing the progress and state of a {@link BoundedToUnboundedSourceAdapter}.
+     */
     @VisibleForTesting
-    static class Checkpoint<T> implements UnboundedSource.CheckpointMark {
+    public static class Checkpoint<T> implements UnboundedSource.CheckpointMark {
       private final @Nullable List<TimestampedValue<T>> residualElements;
       private final @Nullable BoundedSource<T> residualSource;
 
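
For context: UnboundedSource.CheckpointMark is the single-method interface that makes the
adapter restorable, and Checkpoint fulfills it by carrying the not-yet-emitted residual
elements plus the residual source. An illustrative stub of the contract (shown only to
clarify the interface; it is not in the patch):

    import java.io.IOException;
    import org.apache.beam.sdk.io.UnboundedSource;

    // Stub CheckpointMark: the runner calls finalizeCheckpoint() once the checkpoint
    // has been durably persisted. A source that acknowledges consumed input would do
    // so here; resuming happens separately, when a reader is re-created from the mark.
    class NoOpCheckpointMark implements UnboundedSource.CheckpointMark {
      @Override
      public void finalizeCheckpoint() throws IOException {}
    }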
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 970ece1..74ca168 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -42,6 +42,7 @@ import org.apache.beam.runners.core.construction.ParDoTranslation;
 import org.apache.beam.runners.core.construction.ReadTranslation;
 import org.apache.beam.runners.core.construction.SplittableParDo;
 import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
+import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
 import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
@@ -51,7 +52,6 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKey
 import org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDoFnOperator;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.io.BoundedSourceWrapper;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.DedupingOperator;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
 import org.apache.beam.sdk.coders.Coder;
@@ -87,18 +87,27 @@ import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.StoppableFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
 
@@ -276,8 +285,7 @@ class FlinkStreamingTransformTranslators {
       PCollection<T> output = context.getOutput(transform);
 
       TypeInformation<WindowedValue<T>> outputTypeInfo =
-          context.getTypeInfo(context.getOutput(transform));
-
+                    context.getTypeInfo(context.getOutput(transform));
 
       BoundedSource<T> rawSource;
       try {
@@ -289,24 +297,26 @@ class FlinkStreamingTransformTranslators {
       }
 
       String fullName = getCurrentTransformName(context);
+      UnboundedSource<T, ?> adaptedRawSource = new BoundedToUnboundedSourceAdapter<>(rawSource);
       DataStream<WindowedValue<T>> source;
       try {
-        BoundedSourceWrapper<T> sourceWrapper =
-            new BoundedSourceWrapper<>(
+        UnboundedSourceWrapperNoValueWithRecordId<T, ?> sourceWrapper =
+            new UnboundedSourceWrapperNoValueWithRecordId<>(
+                new UnboundedSourceWrapper<>(
                 fullName,
                 context.getPipelineOptions(),
-                rawSource,
-                context.getExecutionEnvironment().getParallelism());
+                adaptedRawSource,
+                context.getExecutionEnvironment().getParallelism())
+            );
         source = context
             .getExecutionEnvironment()
             .addSource(sourceWrapper)
-            .name(fullName).uid(fullName)
+            .name(fullName)
+            .uid(fullName)
             .returns(outputTypeInfo);
       } catch (Exception e) {
-        throw new RuntimeException(
-            "Error while translating BoundedSource: " + rawSource, e);
+        throw new RuntimeException("Error while translating BoundedSource: " + rawSource, e);
       }
-
       context.setOutputDataStream(output, source);
     }
   }
@@ -1259,4 +1269,110 @@ class FlinkStreamingTransformTranslators {
       return CreateStreamingFlinkView.CREATE_STREAMING_FLINK_VIEW_URN;
     }
   }
+
+  /**
+   * Wrapper for {@link UnboundedSourceWrapper} that simplifies the output type by stripping
+   * {@link ValueWithRecordId} from each emitted element.
+   */
+  private static class UnboundedSourceWrapperNoValueWithRecordId<
+      OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark>
+      extends RichParallelSourceFunction<WindowedValue<OutputT>>
+      implements ProcessingTimeCallback, StoppableFunction,
+      CheckpointListener, CheckpointedFunction {
+
+    private final UnboundedSourceWrapper<OutputT, CheckpointMarkT> unboundedSourceWrapper;
+
+    private UnboundedSourceWrapperNoValueWithRecordId(
+        UnboundedSourceWrapper<OutputT, CheckpointMarkT> unboundedSourceWrapper) {
+      this.unboundedSourceWrapper = unboundedSourceWrapper;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+      unboundedSourceWrapper.setRuntimeContext(getRuntimeContext());
+      unboundedSourceWrapper.open(parameters);
+    }
+
+    @Override
+    public void run(SourceContext<WindowedValue<OutputT>> ctx) throws Exception {
+      unboundedSourceWrapper.run(new SourceContextWrapper(ctx));
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+      unboundedSourceWrapper.initializeState(context);
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+      unboundedSourceWrapper.snapshotState(context);
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+      unboundedSourceWrapper.notifyCheckpointComplete(checkpointId);
+    }
+
+    @Override
+    public void stop() {
+      unboundedSourceWrapper.stop();
+    }
+
+    @Override
+    public void cancel() {
+      unboundedSourceWrapper.cancel();
+    }
+
+    @Override
+    public void onProcessingTime(long timestamp) throws Exception {
+      unboundedSourceWrapper.onProcessingTime(timestamp);
+    }
+
+    private final class SourceContextWrapper implements
+        SourceContext<WindowedValue<ValueWithRecordId<OutputT>>> {
+
+      private final SourceContext<WindowedValue<OutputT>> ctx;
+
+      private SourceContextWrapper(SourceContext<WindowedValue<OutputT>> ctx) {
+        this.ctx = ctx;
+      }
+
+      @Override
+      public void collect(WindowedValue<ValueWithRecordId<OutputT>> element) {
+        OutputT originalValue = element.getValue().getValue();
+        WindowedValue<OutputT> output = WindowedValue
+            .of(originalValue, element.getTimestamp(), element.getWindows(), element.getPane());
+        ctx.collect(output);
+      }
+
+      @Override
+      public void collectWithTimestamp(WindowedValue<ValueWithRecordId<OutputT>> element,
+          long timestamp) {
+        OutputT originalValue = element.getValue().getValue();
+        WindowedValue<OutputT> output = WindowedValue
+            .of(originalValue, element.getTimestamp(), element.getWindows(), element.getPane());
+        ctx.collectWithTimestamp(output, timestamp);
+      }
+
+      @Override
+      public void emitWatermark(Watermark mark) {
+        ctx.emitWatermark(mark);
+      }
+
+      @Override
+      public void markAsTemporarilyIdle() {
+        ctx.markAsTemporarilyIdle();
+      }
+
+      @Override
+      public Object getCheckpointLock() {
+        return ctx.getCheckpointLock();
+      }
+
+      @Override
+      public void close() {
+        ctx.close();
+      }
+    }
+  }
 }
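
The SourceContextWrapper above is what keeps the translator's output type unchanged: it
strips the ValueWithRecordId that UnboundedSourceWrapper attaches to every element (the
record id supports deduplication, which a bounded source does not need). The per-element
rewrite, restated as a standalone sketch:

    import org.apache.beam.sdk.util.WindowedValue;
    import org.apache.beam.sdk.values.ValueWithRecordId;

    class UnwrapSketch {
      // Rebuild the element without its record id, preserving timestamp, windows and pane.
      static <T> WindowedValue<T> stripRecordId(WindowedValue<ValueWithRecordId<T>> element) {
        return WindowedValue.of(
            element.getValue().getValue(),
            element.getTimestamp(),
            element.getWindows(),
            element.getPane());
      }
    }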
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
deleted file mode 100644
index 6db5426..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
-import org.apache.beam.runners.flink.FlinkPipelineOptions;
-import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
-import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.flink.api.common.functions.StoppableFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Wrapper for executing {@link BoundedSource BoundedSources} as a Flink Source.
- */
-public class BoundedSourceWrapper<OutputT>
-    extends RichParallelSourceFunction<WindowedValue<OutputT>>
-    implements StoppableFunction {
-
-  private static final Logger LOG = LoggerFactory.getLogger(BoundedSourceWrapper.class);
-
-  private String stepName;
-  /**
-   * Keep the options so that we can initialize the readers.
-   */
-  private final SerializablePipelineOptions serializedOptions;
-
-  /**
-   * The split sources. We split them in the constructor to ensure that all parallel
-   * sources are consistent about the split sources.
-   */
-  private List<? extends BoundedSource<OutputT>> splitSources;
-
-  /**
-   * Make it a field so that we can access it in {@link #close()}.
-   */
-  private transient List<BoundedSource.BoundedReader<OutputT>> readers;
-
-  /**
-   * Initialize here and not in run() to prevent races where we cancel a job before run() is
-   * ever called or run() is called after cancel().
-   */
-  private volatile boolean isRunning = true;
-
-  @SuppressWarnings("unchecked")
-  public BoundedSourceWrapper(
-      String stepName,
-      PipelineOptions pipelineOptions,
-      BoundedSource<OutputT> source,
-      int parallelism) throws Exception {
-    this.stepName = stepName;
-    this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
-
-    long desiredBundleSize = source.getEstimatedSizeBytes(pipelineOptions) / parallelism;
-
-    // get the splits early. we assume that the generated splits are stable,
-    // this is necessary so that the mapping of state to source is correct
-    // when restoring
-    splitSources = source.split(desiredBundleSize, pipelineOptions);
-  }
-
-  @Override
-  public void run(SourceContext<WindowedValue<OutputT>> ctx) throws Exception {
-
-    // figure out which split sources we're responsible for
-    int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
-    int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
-
-    List<BoundedSource<OutputT>> localSources = new ArrayList<>();
-
-    for (int i = 0; i < splitSources.size(); i++) {
-      if (i % numSubtasks == subtaskIndex) {
-        localSources.add(splitSources.get(i));
-      }
-    }
-
-    LOG.info("Bounded Flink Source {}/{} is reading from sources: {}",
-        subtaskIndex,
-        numSubtasks,
-        localSources);
-
-    FlinkMetricContainer metricContainer = new FlinkMetricContainer(getRuntimeContext());
-
-    ReaderInvocationUtil<OutputT, BoundedSource.BoundedReader<OutputT>> readerInvoker =
-        new ReaderInvocationUtil<>(
-            stepName,
-            serializedOptions.get(),
-            metricContainer);
-
-    readers = new ArrayList<>();
-    // initialize readers from scratch
-    for (BoundedSource<OutputT> source : localSources) {
-      readers.add(source.createReader(serializedOptions.get()));
-    }
-
-   if (readers.size() == 1) {
-      // the easy case, we just read from one reader
-      BoundedSource.BoundedReader<OutputT> reader = readers.get(0);
-
-      boolean dataAvailable = readerInvoker.invokeStart(reader);
-      if (dataAvailable) {
-        emitElement(ctx, reader);
-      }
-
-      while (isRunning) {
-        dataAvailable = readerInvoker.invokeAdvance(reader);
-
-        if (dataAvailable)  {
-          emitElement(ctx, reader);
-        } else {
-          break;
-        }
-      }
-    } else {
-      // a bit more complicated, we are responsible for several readers
-      // loop through them and sleep if none of them had any data
-
-      int currentReader = 0;
-
-      // start each reader and emit data if immediately available
-      for (BoundedSource.BoundedReader<OutputT> reader : readers) {
-        boolean dataAvailable = readerInvoker.invokeStart(reader);
-        if (dataAvailable) {
-          emitElement(ctx, reader);
-        }
-      }
-
-      // a flag telling us whether any of the readers had data
-      // if no reader had data, sleep for a bit
-      boolean hadData = false;
-      while (isRunning && !readers.isEmpty()) {
-        BoundedSource.BoundedReader<OutputT> reader = readers.get(currentReader);
-        boolean dataAvailable = readerInvoker.invokeAdvance(reader);
-
-        if (dataAvailable) {
-          emitElement(ctx, reader);
-          hadData = true;
-        } else {
-          readers.remove(currentReader);
-          currentReader--;
-          if (readers.isEmpty()) {
-            break;
-          }
-        }
-
-        currentReader = (currentReader + 1) % readers.size();
-        if (currentReader == 0 && !hadData) {
-          Thread.sleep(50);
-        } else if (currentReader == 0) {
-          hadData = false;
-        }
-      }
-
-    }
-
-    // emit final Long.MAX_VALUE watermark, just to be sure
-    ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
-
-    FlinkPipelineOptions options = serializedOptions.get().as(FlinkPipelineOptions.class);
-    if (!options.isShutdownSourcesOnFinalWatermark()) {
-      // do nothing, but still look busy ...
-      // we can't return here since Flink requires that all operators stay up,
-      // otherwise checkpointing would not work correctly anymore
-      //
-      // See https://issues.apache.org/jira/browse/FLINK-2491 for progress on this issue
-
-      // wait until this is canceled
-      final Object waitLock = new Object();
-      while (isRunning) {
-        try {
-          // Flink will interrupt us at some point
-          //noinspection SynchronizationOnLocalVariableOrMethodParameter
-          synchronized (waitLock) {
-            // don't wait indefinitely, in case something goes horribly wrong
-            waitLock.wait(1000);
-          }
-        } catch (InterruptedException e) {
-          if (!isRunning) {
-            // restore the interrupted state, and fall through the loop
-            Thread.currentThread().interrupt();
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * Emit the current element from the given Reader. The reader is guaranteed to have data.
-   */
-  private void emitElement(
-      SourceContext<WindowedValue<OutputT>> ctx,
-      BoundedSource.BoundedReader<OutputT> reader) {
-    // make sure that reader state update and element emission are atomic
-    // with respect to snapshots
-    synchronized (ctx.getCheckpointLock()) {
-
-      OutputT item = reader.getCurrent();
-      Instant timestamp = reader.getCurrentTimestamp();
-
-      WindowedValue<OutputT> windowedValue =
-          WindowedValue.of(item, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
-      ctx.collectWithTimestamp(windowedValue, timestamp.getMillis());
-    }
-  }
-
-  @Override
-  public void close() throws Exception {
-    super.close();
-    if (readers != null) {
-      for (BoundedSource.BoundedReader<OutputT> reader: readers) {
-        reader.close();
-      }
-    }
-  }
-
-  @Override
-  public void cancel() {
-    isRunning = false;
-  }
-
-  @Override
-  public void stop() {
-    this.isRunning = false;
-  }
-
-  /**
-   * Visible so that we can check this in tests. Must not be used for anything else.
-   */
-  @VisibleForTesting
-  public List<? extends BoundedSource<OutputT>> getSplitSources() {
-    return splitSources;
-  }
-}
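
Worth noting about the file just deleted: BoundedSourceWrapper implemented only
StoppableFunction, with no snapshot/restore hooks, so Flink had nothing to checkpoint.
Its replacement implements Flink's CheckpointedFunction (see the new wrapper earlier in
this patch), whose contract looks like this sketch:

    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

    class CheckpointHooksSketch implements CheckpointedFunction {
      @Override
      public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Persist reader progress here; the deleted wrapper had no such hook,
        // so a restore restarted every split from scratch.
      }

      @Override
      public void initializeState(FunctionInitializationContext context) throws Exception {
        // Restore previously snapshotted progress here.
      }
    }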
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/BoundedSourceRestoreTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/BoundedSourceRestoreTest.java
new file mode 100644
index 0000000..7701602
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/BoundedSourceRestoreTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.streaming;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
+import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.ValueWithRecordId;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.util.OutputTag;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Test for bounded source restore in streaming mode.
+ */
+@RunWith(Parameterized.class)
+public class BoundedSourceRestoreTest {
+
+  private final int numTasks;
+  private final int numSplits;
+
+  public BoundedSourceRestoreTest(int numTasks, int numSplits) {
+    this.numTasks = numTasks;
+    this.numSplits = numSplits;
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() {
+    /* Parameters for initializing the tests: {numTasks, numSplits} */
+    return Arrays.asList(new Object[][]{
+        {1, 1},
+        {1, 2},
+        {1, 4},
+    });
+  }
+
+  @Test
+  public void testRestore() throws Exception {
+    final int numElements = 102;
+    final int firstBatchSize = 23;
+    final int secondBatchSize = numElements - firstBatchSize;
+    final Set<Long> emittedElements = new HashSet<>();
+    final Object checkpointLock = new Object();
+    PipelineOptions options = PipelineOptionsFactory.create();
+
+    // bounded source wrapped as unbounded source
+    BoundedSource<Long> source = CountingSource.upTo(numElements);
+    BoundedToUnboundedSourceAdapter<Long> unboundedSource =
+        new BoundedToUnboundedSourceAdapter<>(source);
+    UnboundedSourceWrapper<Long, Checkpoint<Long>> flinkWrapper = new UnboundedSourceWrapper<>(
+        "stepName", options, unboundedSource, numSplits);
+
+    StreamSource<WindowedValue<ValueWithRecordId<Long>>,
+        UnboundedSourceWrapper<Long, Checkpoint<Long>>> sourceOperator =
+        new StreamSource<>(flinkWrapper);
+
+    AbstractStreamOperatorTestHarness<WindowedValue<ValueWithRecordId<Long>>> testHarness =
+        new AbstractStreamOperatorTestHarness<>(sourceOperator,
+            numTasks /* max parallelism */,
+            numTasks /* parallelism */,
+            0 /* subtask index */);
+    testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
+
+    // the first batch of elements is read
+    boolean readFirstBatchOfElements = false;
+    try {
+      testHarness.open();
+      sourceOperator.run(checkpointLock,
+          new TestStreamStatusMaintainer(),
+          new PartialCollector<>(emittedElements, firstBatchSize)
+      );
+    } catch (SuccessException e) {
+      // success
+      readFirstBatchOfElements = true;
+    }
+    assertTrue("Did not successfully read first batch of elements.", readFirstBatchOfElements);
+
+    // draw a snapshot
+    OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+
+    // finalize checkpoint
+    final ArrayList<Integer> finalizeList = new ArrayList<>();
+    TestCountingSource.setFinalizeTracker(finalizeList);
+    testHarness.notifyOfCompletedCheckpoint(0);
+
+    // create a completely new source but restore from the snapshot
+    BoundedSource<Long> restoredSource = CountingSource.upTo(numElements);
+    BoundedToUnboundedSourceAdapter<Long> restoredUnboundedSource =
+        new BoundedToUnboundedSourceAdapter<>(restoredSource);
+    UnboundedSourceWrapper<Long, Checkpoint<Long>> restoredFlinkWrapper =
+        new UnboundedSourceWrapper<>("stepName", options, restoredUnboundedSource, numSplits);
+    StreamSource<WindowedValue<ValueWithRecordId<Long>>,
+        UnboundedSourceWrapper<Long, Checkpoint<Long>>> restoredSourceOperator =
+        new StreamSource<>(restoredFlinkWrapper);
+
+    // set parallelism to 1 to ensure that our testing operator gets all checkpointed state
+    AbstractStreamOperatorTestHarness<WindowedValue<ValueWithRecordId<Long>>>
+        restoredTestHarness =
+        new AbstractStreamOperatorTestHarness<>(
+            restoredSourceOperator,
+            numTasks /* max parallelism */,
+            1 /* parallelism */,
+            0 /* subtask index */);
+
+    restoredTestHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
+
+    // restore snapshot
+    restoredTestHarness.initializeState(snapshot);
+
+    // run again and verify that we see the other elements
+    boolean readSecondBatchOfElements = false;
+    try {
+      restoredTestHarness.open();
+      restoredSourceOperator.run(checkpointLock,
+          new TestStreamStatusMaintainer(),
+          new PartialCollector<>(emittedElements, secondBatchSize)
+      );
+    } catch (SuccessException e) {
+      // success
+      readSecondBatchOfElements = true;
+    }
+    assertTrue("Did not successfully read second batch of elements.", readSecondBatchOfElements);
+
+    // verify that we saw all numElements elements
+    assertTrue(emittedElements.size() == numElements);
+  }
+
+  /**
+   * A special {@link RuntimeException} that we throw to signal that the test was successful.
+   */
+  private static class SuccessException extends RuntimeException {
+
+  }
+
+  /**
+   * A collector which consumes only a specified number of elements.
+   */
+  private static class PartialCollector<T>
+      implements Output<StreamRecord<WindowedValue<ValueWithRecordId<T>>>> {
+
+    private final Set<T> emittedElements;
+    private final int elementsToConsumeLimit;
+
+    private int count = 0;
+
+    private PartialCollector(Set<T> emittedElements, int elementsToConsumeLimit) {
+      this.emittedElements = emittedElements;
+      this.elementsToConsumeLimit = elementsToConsumeLimit;
+    }
+
+    @Override
+    public void emitWatermark(Watermark watermark) {
+
+    }
+
+    @Override
+    public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
+      collect((StreamRecord) streamRecord);
+    }
+
+    @Override
+    public void emitLatencyMarker(LatencyMarker latencyMarker) {
+
+    }
+
+    @Override
+    public void collect(StreamRecord<WindowedValue<ValueWithRecordId<T>>> record) {
+      emittedElements.add(record.getValue().getValue().getValue());
+      count++;
+      if (count >= elementsToConsumeLimit) {
+        throw new SuccessException();
+      }
+    }
+
+    @Override
+    public void close() {
+
+    }
+  }
+
+  private static final class TestStreamStatusMaintainer implements StreamStatusMaintainer {
+
+    StreamStatus currentStreamStatus = StreamStatus.ACTIVE;
+
+    @Override
+    public void toggleStreamStatus(StreamStatus streamStatus) {
+      if (!currentStreamStatus.equals(streamStatus)) {
+        currentStreamStatus = streamStatus;
+      }
+    }
+
+    @Override
+    public StreamStatus getStreamStatus() {
+      return currentStreamStatus;
+    }
+  }
+
+}
