You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2018/03/27 04:50:28 UTC
[beam] 02/03: [BEAM-2393] Make BoundedSource fault-tolerant
This is an automated email from the ASF dual-hosted git repository.
aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 0e44feb120a73c36116b2bd6b89e4f1676d7266f
Author: Grzegorz Kołakowski <gr...@getindata.com>
AuthorDate: Wed Feb 21 11:11:53 2018 +0100
[BEAM-2393] Make BoundedSource fault-tolerant
---
.../UnboundedReadFromBoundedSource.java | 5 +-
.../flink/FlinkStreamingTransformTranslators.java | 138 ++++++++++-
.../streaming/io/BoundedSourceWrapper.java | 259 ---------------------
.../flink/streaming/BoundedSourceRestoreTest.java | 236 +++++++++++++++++++
4 files changed, 367 insertions(+), 271 deletions(-)
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
index 09acc82..88119d1 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
@@ -169,8 +169,11 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
return new CheckpointCoder<>(boundedSource.getDefaultOutputCoder());
}
+ /**
+ * A marker representing the progress and state of an {@link BoundedToUnboundedSourceAdapter}.
+ */
@VisibleForTesting
- static class Checkpoint<T> implements UnboundedSource.CheckpointMark {
+ public static class Checkpoint<T> implements UnboundedSource.CheckpointMark {
private final @Nullable List<TimestampedValue<T>> residualElements;
private final @Nullable BoundedSource<T> residualSource;
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 970ece1..74ca168 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -42,6 +42,7 @@ import org.apache.beam.runners.core.construction.ParDoTranslation;
import org.apache.beam.runners.core.construction.ReadTranslation;
import org.apache.beam.runners.core.construction.SplittableParDo;
import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
+import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
@@ -51,7 +52,6 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKey
import org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDoFnOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.io.BoundedSourceWrapper;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.DedupingOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
import org.apache.beam.sdk.coders.Coder;
@@ -87,18 +87,27 @@ import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
@@ -276,8 +285,7 @@ class FlinkStreamingTransformTranslators {
PCollection<T> output = context.getOutput(transform);
TypeInformation<WindowedValue<T>> outputTypeInfo =
- context.getTypeInfo(context.getOutput(transform));
-
+ context.getTypeInfo(context.getOutput(transform));
BoundedSource<T> rawSource;
try {
@@ -289,24 +297,26 @@ class FlinkStreamingTransformTranslators {
}
String fullName = getCurrentTransformName(context);
+ UnboundedSource<T, ?> adaptedRawSource = new BoundedToUnboundedSourceAdapter<>(rawSource);
DataStream<WindowedValue<T>> source;
try {
- BoundedSourceWrapper<T> sourceWrapper =
- new BoundedSourceWrapper<>(
+ UnboundedSourceWrapperNoValueWithRecordId<T, ?> sourceWrapper =
+ new UnboundedSourceWrapperNoValueWithRecordId<>(
+ new UnboundedSourceWrapper<>(
fullName,
context.getPipelineOptions(),
- rawSource,
- context.getExecutionEnvironment().getParallelism());
+ adaptedRawSource,
+ context.getExecutionEnvironment().getParallelism())
+ );
source = context
.getExecutionEnvironment()
.addSource(sourceWrapper)
- .name(fullName).uid(fullName)
+ .name(fullName)
+ .uid(fullName)
.returns(outputTypeInfo);
} catch (Exception e) {
- throw new RuntimeException(
- "Error while translating BoundedSource: " + rawSource, e);
+ throw new RuntimeException("Error while translating BoundedSource: " + rawSource, e);
}
-
context.setOutputDataStream(output, source);
}
}
@@ -1259,4 +1269,110 @@ class FlinkStreamingTransformTranslators {
return CreateStreamingFlinkView.CREATE_STREAMING_FLINK_VIEW_URN;
}
}
+
+ /**
+ * Wrapper for {@link UnboundedSourceWrapper}, which simplifies output type, namely, removes
+ * {@link ValueWithRecordId}.
+ */
+ private static class UnboundedSourceWrapperNoValueWithRecordId<
+ OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark>
+ extends RichParallelSourceFunction<WindowedValue<OutputT>>
+ implements ProcessingTimeCallback, StoppableFunction,
+ CheckpointListener, CheckpointedFunction {
+
+ private final UnboundedSourceWrapper<OutputT, CheckpointMarkT> unboundedSourceWrapper;
+
+ private UnboundedSourceWrapperNoValueWithRecordId(
+ UnboundedSourceWrapper<OutputT, CheckpointMarkT> unboundedSourceWrapper) {
+ this.unboundedSourceWrapper = unboundedSourceWrapper;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ unboundedSourceWrapper.setRuntimeContext(getRuntimeContext());
+ unboundedSourceWrapper.open(parameters);
+ }
+
+ @Override
+ public void run(SourceContext<WindowedValue<OutputT>> ctx) throws Exception {
+ unboundedSourceWrapper.run(new SourceContextWrapper(ctx));
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ unboundedSourceWrapper.initializeState(context);
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ unboundedSourceWrapper.snapshotState(context);
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ unboundedSourceWrapper.notifyCheckpointComplete(checkpointId);
+ }
+
+ @Override
+ public void stop() {
+ unboundedSourceWrapper.stop();
+ }
+
+ @Override
+ public void cancel() {
+ unboundedSourceWrapper.cancel();
+ }
+
+ @Override
+ public void onProcessingTime(long timestamp) throws Exception {
+ unboundedSourceWrapper.onProcessingTime(timestamp);
+ }
+
+ private final class SourceContextWrapper implements
+ SourceContext<WindowedValue<ValueWithRecordId<OutputT>>> {
+
+ private final SourceContext<WindowedValue<OutputT>> ctx;
+
+ private SourceContextWrapper(SourceContext<WindowedValue<OutputT>> ctx) {
+ this.ctx = ctx;
+ }
+
+ @Override
+ public void collect(WindowedValue<ValueWithRecordId<OutputT>> element) {
+ OutputT originalValue = element.getValue().getValue();
+ WindowedValue<OutputT> output = WindowedValue
+ .of(originalValue, element.getTimestamp(), element.getWindows(), element.getPane());
+ ctx.collect(output);
+ }
+
+ @Override
+ public void collectWithTimestamp(WindowedValue<ValueWithRecordId<OutputT>> element,
+ long timestamp) {
+ OutputT originalValue = element.getValue().getValue();
+ WindowedValue<OutputT> output = WindowedValue
+ .of(originalValue, element.getTimestamp(), element.getWindows(), element.getPane());
+ ctx.collectWithTimestamp(output, timestamp);
+ }
+
+ @Override
+ public void emitWatermark(Watermark mark) {
+ ctx.emitWatermark(mark);
+ }
+
+ @Override
+ public void markAsTemporarilyIdle() {
+ ctx.markAsTemporarilyIdle();
+ }
+
+ @Override
+ public Object getCheckpointLock() {
+ return ctx.getCheckpointLock();
+ }
+
+ @Override
+ public void close() {
+ ctx.close();
+ }
+ }
+ }
}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
deleted file mode 100644
index 6db5426..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
-import org.apache.beam.runners.flink.FlinkPipelineOptions;
-import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
-import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.flink.api.common.functions.StoppableFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Wrapper for executing {@link BoundedSource BoundedSources} as a Flink Source.
- */
-public class BoundedSourceWrapper<OutputT>
- extends RichParallelSourceFunction<WindowedValue<OutputT>>
- implements StoppableFunction {
-
- private static final Logger LOG = LoggerFactory.getLogger(BoundedSourceWrapper.class);
-
- private String stepName;
- /**
- * Keep the options so that we can initialize the readers.
- */
- private final SerializablePipelineOptions serializedOptions;
-
- /**
- * The split sources. We split them in the constructor to ensure that all parallel
- * sources are consistent about the split sources.
- */
- private List<? extends BoundedSource<OutputT>> splitSources;
-
- /**
- * Make it a field so that we can access it in {@link #close()}.
- */
- private transient List<BoundedSource.BoundedReader<OutputT>> readers;
-
- /**
- * Initialize here and not in run() to prevent races where we cancel a job before run() is
- * ever called or run() is called after cancel().
- */
- private volatile boolean isRunning = true;
-
- @SuppressWarnings("unchecked")
- public BoundedSourceWrapper(
- String stepName,
- PipelineOptions pipelineOptions,
- BoundedSource<OutputT> source,
- int parallelism) throws Exception {
- this.stepName = stepName;
- this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
-
- long desiredBundleSize = source.getEstimatedSizeBytes(pipelineOptions) / parallelism;
-
- // get the splits early. we assume that the generated splits are stable,
- // this is necessary so that the mapping of state to source is correct
- // when restoring
- splitSources = source.split(desiredBundleSize, pipelineOptions);
- }
-
- @Override
- public void run(SourceContext<WindowedValue<OutputT>> ctx) throws Exception {
-
- // figure out which split sources we're responsible for
- int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
- int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
-
- List<BoundedSource<OutputT>> localSources = new ArrayList<>();
-
- for (int i = 0; i < splitSources.size(); i++) {
- if (i % numSubtasks == subtaskIndex) {
- localSources.add(splitSources.get(i));
- }
- }
-
- LOG.info("Bounded Flink Source {}/{} is reading from sources: {}",
- subtaskIndex,
- numSubtasks,
- localSources);
-
- FlinkMetricContainer metricContainer = new FlinkMetricContainer(getRuntimeContext());
-
- ReaderInvocationUtil<OutputT, BoundedSource.BoundedReader<OutputT>> readerInvoker =
- new ReaderInvocationUtil<>(
- stepName,
- serializedOptions.get(),
- metricContainer);
-
- readers = new ArrayList<>();
- // initialize readers from scratch
- for (BoundedSource<OutputT> source : localSources) {
- readers.add(source.createReader(serializedOptions.get()));
- }
-
- if (readers.size() == 1) {
- // the easy case, we just read from one reader
- BoundedSource.BoundedReader<OutputT> reader = readers.get(0);
-
- boolean dataAvailable = readerInvoker.invokeStart(reader);
- if (dataAvailable) {
- emitElement(ctx, reader);
- }
-
- while (isRunning) {
- dataAvailable = readerInvoker.invokeAdvance(reader);
-
- if (dataAvailable) {
- emitElement(ctx, reader);
- } else {
- break;
- }
- }
- } else {
- // a bit more complicated, we are responsible for several readers
- // loop through them and sleep if none of them had any data
-
- int currentReader = 0;
-
- // start each reader and emit data if immediately available
- for (BoundedSource.BoundedReader<OutputT> reader : readers) {
- boolean dataAvailable = readerInvoker.invokeStart(reader);
- if (dataAvailable) {
- emitElement(ctx, reader);
- }
- }
-
- // a flag telling us whether any of the readers had data
- // if no reader had data, sleep for bit
- boolean hadData = false;
- while (isRunning && !readers.isEmpty()) {
- BoundedSource.BoundedReader<OutputT> reader = readers.get(currentReader);
- boolean dataAvailable = readerInvoker.invokeAdvance(reader);
-
- if (dataAvailable) {
- emitElement(ctx, reader);
- hadData = true;
- } else {
- readers.remove(currentReader);
- currentReader--;
- if (readers.isEmpty()) {
- break;
- }
- }
-
- currentReader = (currentReader + 1) % readers.size();
- if (currentReader == 0 && !hadData) {
- Thread.sleep(50);
- } else if (currentReader == 0) {
- hadData = false;
- }
- }
-
- }
-
- // emit final Long.MAX_VALUE watermark, just to be sure
- ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
-
- FlinkPipelineOptions options = serializedOptions.get().as(FlinkPipelineOptions.class);
- if (!options.isShutdownSourcesOnFinalWatermark()) {
- // do nothing, but still look busy ...
- // we can't return here since Flink requires that all operators stay up,
- // otherwise checkpointing would not work correctly anymore
- //
- // See https://issues.apache.org/jira/browse/FLINK-2491 for progress on this issue
-
- // wait until this is canceled
- final Object waitLock = new Object();
- while (isRunning) {
- try {
- // Flink will interrupt us at some point
- //noinspection SynchronizationOnLocalVariableOrMethodParameter
- synchronized (waitLock) {
- // don't wait indefinitely, in case something goes horribly wrong
- waitLock.wait(1000);
- }
- } catch (InterruptedException e) {
- if (!isRunning) {
- // restore the interrupted state, and fall through the loop
- Thread.currentThread().interrupt();
- }
- }
- }
- }
- }
-
- /**
- * Emit the current element from the given Reader. The reader is guaranteed to have data.
- */
- private void emitElement(
- SourceContext<WindowedValue<OutputT>> ctx,
- BoundedSource.BoundedReader<OutputT> reader) {
- // make sure that reader state update and element emission are atomic
- // with respect to snapshots
- synchronized (ctx.getCheckpointLock()) {
-
- OutputT item = reader.getCurrent();
- Instant timestamp = reader.getCurrentTimestamp();
-
- WindowedValue<OutputT> windowedValue =
- WindowedValue.of(item, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
- ctx.collectWithTimestamp(windowedValue, timestamp.getMillis());
- }
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- if (readers != null) {
- for (BoundedSource.BoundedReader<OutputT> reader: readers) {
- reader.close();
- }
- }
- }
-
- @Override
- public void cancel() {
- isRunning = false;
- }
-
- @Override
- public void stop() {
- this.isRunning = false;
- }
-
- /**
- * Visible so that we can check this in tests. Must not be used for anything else.
- */
- @VisibleForTesting
- public List<? extends BoundedSource<OutputT>> getSplitSources() {
- return splitSources;
- }
-}
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/BoundedSourceRestoreTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/BoundedSourceRestoreTest.java
new file mode 100644
index 0000000..7701602
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/BoundedSourceRestoreTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.streaming;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
+import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.ValueWithRecordId;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.util.OutputTag;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Test for bounded source restore in streaming mode.
+ *
+ * <p>A {@link CountingSource} is adapted to an unbounded source and run inside Flink's
+ * {@link UnboundedSourceWrapper} until a first batch of elements has been emitted. The operator is
+ * then snapshotted, and the snapshot is restored into a brand-new operator that must emit the
+ * remaining elements. Across both runs every element must be seen exactly once.
+ */
+@RunWith(Parameterized.class)
+public class BoundedSourceRestoreTest {
+
+  private final int numTasks;
+  private final int numSplits;
+
+  public BoundedSourceRestoreTest(int numTasks, int numSplits) {
+    this.numTasks = numTasks;
+    this.numSplits = numSplits;
+  }
+
+  /** Parameters for initializing the tests: {numTasks, numSplits}. */
+  @Parameterized.Parameters(name = "numTasks = {0}, numSplits = {1}")
+  public static Collection<Object[]> data() {
+    return Arrays.asList(new Object[][]{
+        {1, 1},
+        {1, 2},
+        {1, 4},
+    });
+  }
+
+  @Test
+  public void testRestore() throws Exception {
+    final int numElements = 102;
+    final int firstBatchSize = 23;
+    final int secondBatchSize = numElements - firstBatchSize;
+    final Set<Long> emittedElements = new HashSet<>();
+    final Object checkpointLock = new Object();
+    PipelineOptions options = PipelineOptionsFactory.create();
+
+    // bounded source wrapped as unbounded source
+    BoundedSource<Long> source = CountingSource.upTo(numElements);
+    BoundedToUnboundedSourceAdapter<Long> unboundedSource =
+        new BoundedToUnboundedSourceAdapter<>(source);
+    UnboundedSourceWrapper<Long, Checkpoint<Long>> flinkWrapper = new UnboundedSourceWrapper<>(
+        "stepName", options, unboundedSource, numSplits);
+
+    StreamSource<WindowedValue<ValueWithRecordId<Long>>,
+        UnboundedSourceWrapper<Long, Checkpoint<Long>>> sourceOperator =
+        new StreamSource<>(flinkWrapper);
+
+    AbstractStreamOperatorTestHarness<WindowedValue<ValueWithRecordId<Long>>> testHarness =
+        new AbstractStreamOperatorTestHarness<>(sourceOperator,
+            numTasks /* max parallelism */,
+            numTasks /* parallelism */,
+            0 /* subtask index */);
+    testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
+
+    // read the first batch of elements; PartialCollector aborts run() once it has enough
+    boolean readFirstBatchOfElements = false;
+    try {
+      testHarness.open();
+      sourceOperator.run(checkpointLock,
+          new TestStreamStatusMaintainer(),
+          new PartialCollector<>(emittedElements, firstBatchSize)
+      );
+    } catch (SuccessException e) {
+      // success
+      readFirstBatchOfElements = true;
+    }
+    assertTrue("Did not successfully read first batch of elements.", readFirstBatchOfElements);
+
+    // draw a snapshot
+    OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+
+    // finalize checkpoint
+    // NOTE(review): the finalize tracker belongs to TestCountingSource, but this test reads from
+    // CountingSource, so the tracker is never consulted here.
+    final ArrayList<Integer> finalizeList = new ArrayList<>();
+    TestCountingSource.setFinalizeTracker(finalizeList);
+    testHarness.notifyOfCompletedCheckpoint(0);
+
+    // the first operator is done; release it before restoring into a fresh one
+    testHarness.close();
+
+    // create a completely new source but restore from the snapshot
+    BoundedSource<Long> restoredSource = CountingSource.upTo(numElements);
+    BoundedToUnboundedSourceAdapter<Long> restoredUnboundedSource =
+        new BoundedToUnboundedSourceAdapter<>(restoredSource);
+    UnboundedSourceWrapper<Long, Checkpoint<Long>> restoredFlinkWrapper =
+        new UnboundedSourceWrapper<>("stepName", options, restoredUnboundedSource, numSplits);
+    StreamSource<WindowedValue<ValueWithRecordId<Long>>,
+        UnboundedSourceWrapper<Long, Checkpoint<Long>>> restoredSourceOperator =
+        new StreamSource<>(restoredFlinkWrapper);
+
+    // set parallelism to 1 to ensure that our testing operator gets all checkpointed state
+    AbstractStreamOperatorTestHarness<WindowedValue<ValueWithRecordId<Long>>>
+        restoredTestHarness =
+        new AbstractStreamOperatorTestHarness<>(
+            restoredSourceOperator,
+            numTasks /* max parallelism */,
+            1 /* parallelism */,
+            0 /* subtask index */);
+
+    restoredTestHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
+
+    // restore snapshot
+    restoredTestHarness.initializeState(snapshot);
+
+    // run again and verify that we see the other elements
+    boolean readSecondBatchOfElements = false;
+    try {
+      restoredTestHarness.open();
+      restoredSourceOperator.run(checkpointLock,
+          new TestStreamStatusMaintainer(),
+          new PartialCollector<>(emittedElements, secondBatchSize)
+      );
+    } catch (SuccessException e) {
+      // success
+      readSecondBatchOfElements = true;
+    }
+    assertTrue("Did not successfully read second batch of elements.", readSecondBatchOfElements);
+    restoredTestHarness.close();
+
+    // Both batches together emitted exactly numElements records. A set of that size therefore
+    // means every element was seen and none was emitted twice.
+    assertEquals(numElements, emittedElements.size());
+  }
+
+  /**
+   * A special {@link RuntimeException} that we throw to signal that the test was successful.
+   */
+  private static class SuccessException extends RuntimeException {
+
+  }
+
+  /**
+   * An {@link Output} that records the values it receives. Once the given number of records has
+   * been collected, it throws {@link SuccessException}, which aborts the operator's
+   * {@code run()} call.
+   */
+  private static class PartialCollector<T>
+      implements Output<StreamRecord<WindowedValue<ValueWithRecordId<T>>>> {
+
+    private final Set<T> emittedElements;
+    private final int elementsToConsumeLimit;
+
+    private int count = 0;
+
+    private PartialCollector(Set<T> emittedElements, int elementsToConsumeLimit) {
+      this.emittedElements = emittedElements;
+      this.elementsToConsumeLimit = elementsToConsumeLimit;
+    }
+
+    @Override
+    public void emitWatermark(Watermark watermark) {
+      // watermarks are irrelevant for this test
+    }
+
+    @Override
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
+      // records sent to a side output are counted like main-output records
+      collect((StreamRecord) streamRecord);
+    }
+
+    @Override
+    public void emitLatencyMarker(LatencyMarker latencyMarker) {
+      // latency markers are irrelevant for this test
+    }
+
+    @Override
+    public void collect(StreamRecord<WindowedValue<ValueWithRecordId<T>>> record) {
+      emittedElements.add(record.getValue().getValue().getValue());
+      count++;
+      if (count >= elementsToConsumeLimit) {
+        throw new SuccessException();
+      }
+    }
+
+    @Override
+    public void close() {
+      // nothing to release
+    }
+  }
+
+  /** {@link StreamStatusMaintainer} that remembers the last status it was given. */
+  private static final class TestStreamStatusMaintainer implements StreamStatusMaintainer {
+
+    StreamStatus currentStreamStatus = StreamStatus.ACTIVE;
+
+    @Override
+    public void toggleStreamStatus(StreamStatus streamStatus) {
+      if (!currentStreamStatus.equals(streamStatus)) {
+        currentStreamStatus = streamStatus;
+      }
+    }
+
+    @Override
+    public StreamStatus getStreamStatus() {
+      return currentStreamStatus;
+    }
+  }
+
+}
--
To stop receiving notification emails like this one, please contact
aljoscha@apache.org.