You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/05/24 11:59:48 UTC
[1/2] flink git commit: [hotfix] [tests] Minor code cleanups in
SavepointITCase
Repository: flink
Updated Branches:
refs/heads/master 6e54f107b -> 9bdd70cad
[hotfix] [tests] Minor code cleanups in SavepointITCase
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9bdd70ca
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9bdd70ca
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9bdd70ca
Branch: refs/heads/master
Commit: 9bdd70cad9cea82210173afc3bbd7d7aa14ebff5
Parents: e1d1234
Author: Stephan Ewen <se...@apache.org>
Authored: Wed May 23 17:24:54 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 24 13:14:13 2018 +0200
----------------------------------------------------------------------
.../test/checkpointing/SavepointITCase.java | 30 ++++----------------
1 file changed, 6 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9bdd70ca/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 9549dc7..2805144 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -314,24 +314,12 @@ public class SavepointITCase extends TestLogger {
env.setParallelism(parallelism);
env.addSource(new InfiniteTestSource())
.shuffle()
- .map(new MapFunction<Integer, Integer>() {
-
- @Override
- public Integer map(Integer value) throws Exception {
- return 4 * value;
- }
- })
+ .map(value -> 4 * value)
.shuffle()
.map(statefulCounter).uid("statefulCounter")
.shuffle()
- .map(new MapFunction<Integer, Integer>() {
-
- @Override
- public Integer map(Integer value) throws Exception {
- return 2 * value;
- }
- })
- .addSink(new DiscardingSink<Integer>());
+ .map(value -> 2 * value)
+ .addSink(new DiscardingSink<>());
JobGraph originalJobGraph = env.getStreamGraph().getJobGraph();
@@ -375,14 +363,8 @@ public class SavepointITCase extends TestLogger {
.shuffle()
.map(new StatefulCounter()).uid("statefulCounter")
.shuffle()
- .map(new MapFunction<Integer, Integer>() {
-
- @Override
- public Integer map(Integer value) throws Exception {
- return value;
- }
- })
- .addSink(new DiscardingSink<Integer>());
+ .map(value -> value)
+ .addSink(new DiscardingSink<>());
JobGraph modifiedJobGraph = env.getStreamGraph().getJobGraph();
@@ -428,7 +410,7 @@ public class SavepointITCase extends TestLogger {
.shuffle()
.map(new StatefulCounter());
- stream.addSink(new DiscardingSink<Integer>());
+ stream.addSink(new DiscardingSink<>());
return env.getStreamGraph().getJobGraph();
}
[2/2] flink git commit: [FLINK-9428] [runtime] Add a
'pre-barrier-emit' checkpoint notification to stream operators.
Posted by se...@apache.org.
[FLINK-9428] [runtime] Add a 'pre-barrier-emit' checkpoint notification to stream operators.
This allows operators with small transient unmanaged state (for example pre-aggregates) to
flush the state prior to the checkpoint barrier.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e1d12344
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e1d12344
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e1d12344
Branch: refs/heads/master
Commit: e1d1234477c731fe3f398c7f3f12123f73764242
Parents: 6e54f10
Author: Stephan Ewen <se...@apache.org>
Authored: Wed May 23 17:24:40 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 24 13:14:13 2018 +0200
----------------------------------------------------------------------
.../api/operators/AbstractStreamOperator.java | 8 +-
.../streaming/api/operators/StreamOperator.java | 20 +++
.../streaming/runtime/tasks/OperatorChain.java | 40 ++++-
.../streaming/runtime/tasks/StreamTask.java | 16 +-
.../AbstractUdfStreamOperatorLifecycleTest.java | 32 ++--
.../runtime/tasks/OperatorChainTest.java | 148 +++++++++++++++++++
6 files changed, 245 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e1d12344/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index e447cbe..9a6e8db 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -347,6 +347,12 @@ public abstract class AbstractStreamOperator<OUT>
}
@Override
+ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+ // The default implementation does nothing and accepts the checkpoint.
+ // This method exists purely for subclasses to override.
+ }
+
+ @Override
public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions,
CheckpointStreamFactory factory) throws Exception {
@@ -654,7 +660,7 @@ public abstract class AbstractStreamOperator<OUT>
/**
* Wrapping {@link Output} that updates metrics on the number of emitted elements.
*/
- public class CountingOutput implements Output<StreamRecord<OUT>> {
+ public static class CountingOutput<OUT> implements Output<StreamRecord<OUT>> {
private final Output<StreamRecord<OUT>> output;
private final Counter numRecordsOut;
http://git-wip-us.apache.org/repos/asf/flink/blob/e1d12344/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index 71f508b..c6c77ba 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -94,6 +94,26 @@ public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Dis
// ------------------------------------------------------------------------
/**
+ * This method is called when the operator should do a snapshot, before it emits its
+ * own checkpoint barrier.
+ *
+ * <p>This method is not intended for any actual state persistence, but only for emitting some
+ * data before emitting the checkpoint barrier. It is meant for operators that maintain some small
+ * transient state that is inefficient to checkpoint (especially when it would need to be
+ * checkpointed in a re-scalable way) but can simply be sent downstream before the checkpoint.
+ * Examples are opportunistic pre-aggregation operators, which have a small pre-aggregation state
+ * that is frequently flushed downstream.
+ *
+ * <p><b>Important:</b> This method should not be used for any actual state snapshot logic, because
+ * it will inherently be within the synchronous part of the operator's checkpoint. If heavy work is done
+ * within this method, it will affect latency and downstream checkpoint alignments.
+ *
+ * @param checkpointId The ID of the checkpoint.
+ * @throws Exception Throwing an exception here causes the operator to fail and go into recovery.
+ */
+ void prepareSnapshotPreBarrier(long checkpointId) throws Exception;
+
+ /**
* Called to draw a state snapshot from the operator.
*
* @return a runnable future to the state handle that points to the snapshotted state. For synchronous implementations,
http://git-wip-us.apache.org/repos/asf/flink/blob/e1d12344/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 1db4346..8d1cbde 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.Counter;
@@ -54,6 +55,8 @@ import org.apache.flink.util.XORShiftRandom;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -61,6 +64,8 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* The {@code OperatorChain} contains all operators that are executed as one chain within a single
* {@link StreamTask}.
@@ -158,7 +163,19 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
}
}
}
+ }
+ @VisibleForTesting
+ OperatorChain(
+ StreamOperator<?>[] allOperators,
+ RecordWriterOutput<?>[] streamOutputs,
+ WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainEntryPoint,
+ OP headOperator) {
+
+ this.allOperators = checkNotNull(allOperators);
+ this.streamOutputs = checkNotNull(streamOutputs);
+ this.chainEntryPoint = checkNotNull(chainEntryPoint);
+ this.headOperator = checkNotNull(headOperator);
}
@Override
@@ -192,6 +209,18 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
}
}
+ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+ // go forward through the operator chain (the array holds the operators in reverse chain
+ // order, hence the backwards iteration) and tell each operator to prepare the checkpoint
+ final StreamOperator<?>[] operators = this.allOperators;
+ for (int i = operators.length - 1; i >= 0; --i) {
+ final StreamOperator<?> op = operators[i];
+ if (op != null) {
+ op.prepareSnapshotPreBarrier(checkpointId);
+ }
+ }
+ }
+
public RecordWriterOutput<?>[] getStreamOutputs() {
return streamOutputs;
}
@@ -392,7 +421,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
Gauge<Long> getWatermarkGauge();
}
- private static class ChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
+ static class ChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
protected final OneInputStreamOperator<T, ?> operator;
protected final Counter numRecordsIn;
@@ -400,12 +429,13 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
protected final StreamStatusProvider streamStatusProvider;
+ @Nullable
protected final OutputTag<T> outputTag;
public ChainingOutput(
OneInputStreamOperator<T, ?> operator,
StreamStatusProvider streamStatusProvider,
- OutputTag<T> outputTag) {
+ @Nullable OutputTag<T> outputTag) {
this.operator = operator;
{
@@ -502,7 +532,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
}
}
- private static final class CopyingChainingOutput<T> extends ChainingOutput<T> {
+ static final class CopyingChainingOutput<T> extends ChainingOutput<T> {
private final TypeSerializer<T> serializer;
@@ -570,7 +600,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
}
}
- private static class BroadcastingOutputCollector<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
+ static class BroadcastingOutputCollector<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
protected final Output<StreamRecord<T>>[] outputs;
@@ -640,7 +670,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
* Special version of {@link BroadcastingOutputCollector} that performs a shallow copy of the
* {@link StreamRecord} to ensure that multi-chaining works correctly.
*/
- private static final class CopyingBroadcastingOutputCollector<T> extends BroadcastingOutputCollector<T> {
+ static final class CopyingBroadcastingOutputCollector<T> extends BroadcastingOutputCollector<T> {
public CopyingBroadcastingOutputCollector(
Output<StreamRecord<T>>[] outputs,
http://git-wip-us.apache.org/repos/asf/flink/blob/e1d12344/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index d6e088d..629dcd9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -611,15 +611,23 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
if (isRunning) {
// we can do a checkpoint
- // Since both state checkpointing and downstream barrier emission occurs in this
- // lock scope, they are an atomic operation regardless of the order in which they occur.
- // Given this, we immediately emit the checkpoint barriers, so the downstream operators
- // can start their checkpoint work as soon as possible
+ // All of the following steps happen as an atomic step from the perspective of barriers and
+ // records/watermarks/timers/callbacks.
+ // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
+ // checkpoint alignments
+
+ // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
+ // The pre-barrier work should be nothing or minimal in the common case.
+ operatorChain.prepareSnapshotPreBarrier(checkpointMetaData.getCheckpointId());
+
+ // Step (2): Send the checkpoint barrier downstream
operatorChain.broadcastCheckpointBarrier(
checkpointMetaData.getCheckpointId(),
checkpointMetaData.getTimestamp(),
checkpointOptions);
+ // Step (3): Take the state snapshot. This should be largely asynchronous, to not
+ // impact progress of the streaming topology
checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
return true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e1d12344/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
index f82e403..4fcc813 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
@@ -64,6 +64,7 @@ public class AbstractUdfStreamOperatorLifecycleTest {
"UDF::open",
"OPERATOR::run",
"UDF::run",
+ "OPERATOR::prepareSnapshotPreBarrier",
"OPERATOR::snapshotState",
"OPERATOR::close",
"UDF::close",
@@ -82,15 +83,22 @@ public class AbstractUdfStreamOperatorLifecycleTest {
"OPERATOR::dispose",
"UDF::close");
- private static final String ALL_METHODS_STREAM_OPERATOR = "[close[], dispose[], getChainingStrategy[], " +
- "getCurrentKey[], getMetricGroup[], getOperatorID[], initializeState[], " +
- "notifyCheckpointComplete[long], open[], setChainingStrategy[class " +
- "org.apache.flink.streaming.api.operators.ChainingStrategy], setCurrentKey[class java.lang.Object], " +
+ private static final String ALL_METHODS_STREAM_OPERATOR = "[" +
+ "close[], " +
+ "dispose[], " +
+ "getChainingStrategy[], " +
+ "getCurrentKey[], " +
+ "getMetricGroup[], " +
+ "getOperatorID[], " +
+ "initializeState[], " +
+ "notifyCheckpointComplete[long], " +
+ "open[], " +
+ "prepareSnapshotPreBarrier[long], " +
+ "setChainingStrategy[class org.apache.flink.streaming.api.operators.ChainingStrategy], " +
+ "setCurrentKey[class java.lang.Object], " +
"setKeyContextElement1[class org.apache.flink.streaming.runtime.streamrecord.StreamRecord], " +
"setKeyContextElement2[class org.apache.flink.streaming.runtime.streamrecord.StreamRecord], " +
- "setup[class org.apache.flink.streaming.runtime.tasks.StreamTask, class " +
- "org.apache.flink.streaming.api.graph.StreamConfig, interface " +
- "org.apache.flink.streaming.api.operators.Output], " +
+ "setup[class org.apache.flink.streaming.runtime.tasks.StreamTask, class org.apache.flink.streaming.api.graph.StreamConfig, interface org.apache.flink.streaming.api.operators.Output], " +
"snapshotState[long, long, class org.apache.flink.runtime.checkpoint.CheckpointOptions, interface org.apache.flink.runtime.state.CheckpointStreamFactory]]";
private static final String ALL_METHODS_RICH_FUNCTION = "[close[], getIterationRuntimeContext[], getRuntimeContext[]" +
@@ -129,7 +137,7 @@ public class AbstractUdfStreamOperatorLifecycleTest {
StreamConfig cfg = new StreamConfig(new Configuration());
MockSourceFunction srcFun = new MockSourceFunction();
- cfg.setStreamOperator(new LifecycleTrackingStreamSource(srcFun, true));
+ cfg.setStreamOperator(new LifecycleTrackingStreamSource<>(srcFun, true));
cfg.setOperatorID(new OperatorID());
cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
@@ -152,7 +160,7 @@ public class AbstractUdfStreamOperatorLifecycleTest {
Configuration taskManagerConfig = new Configuration();
StreamConfig cfg = new StreamConfig(new Configuration());
MockSourceFunction srcFun = new MockSourceFunction();
- cfg.setStreamOperator(new LifecycleTrackingStreamSource(srcFun, false));
+ cfg.setStreamOperator(new LifecycleTrackingStreamSource<>(srcFun, false));
cfg.setOperatorID(new OperatorID());
cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
@@ -269,6 +277,12 @@ public class AbstractUdfStreamOperatorLifecycleTest {
}
@Override
+ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+ ACTUAL_ORDER_TRACKING.add("OPERATOR::prepareSnapshotPreBarrier");
+ super.prepareSnapshotPreBarrier(checkpointId);
+ }
+
+ @Override
public void open() throws Exception {
ACTUAL_ORDER_TRACKING.add("OPERATOR::open");
super.open();
http://git-wip-us.apache.org/repos/asf/flink/blob/e1d12344/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java
new file mode 100644
index 0000000..be51aa6
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
+import org.apache.flink.streaming.runtime.operators.StreamOperatorChainingTest;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
+import org.apache.flink.streaming.runtime.tasks.OperatorChain.BroadcastingOutputCollector;
+import org.apache.flink.streaming.runtime.tasks.OperatorChain.ChainingOutput;
+import org.apache.flink.streaming.runtime.tasks.OperatorChain.WatermarkGaugeExposingOutput;
+
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+/**
+ * This class tests the {@link OperatorChain}.
+ *
+ * <p>It takes a different (simpler) approach at testing the operator chain than
+ * {@link StreamOperatorChainingTest}.
+ */
+public class OperatorChainTest {
+
+ @Test
+ public void testPrepareCheckpointPreBarrier() throws Exception {
+ final AtomicInteger intRef = new AtomicInteger();
+
+ final OneInputStreamOperator<String, String> one = new ValidatingOperator(intRef, 0);
+ final OneInputStreamOperator<String, String> two = new ValidatingOperator(intRef, 1);
+ final OneInputStreamOperator<String, String> three = new ValidatingOperator(intRef, 2);
+
+ final OperatorChain<?, ?> chain = setupOperatorChain(one, two, three);
+ chain.prepareSnapshotPreBarrier(ValidatingOperator.CHECKPOINT_ID);
+
+ assertEquals(3, intRef.get());
+ }
+
+ // ------------------------------------------------------------------------
+ // Operator Chain Setup Utils
+ // ------------------------------------------------------------------------
+
+ @SafeVarargs
+ private static <T, OP extends StreamOperator<T>> OperatorChain<T, OP> setupOperatorChain(
+ OneInputStreamOperator<T, T>... operators) {
+
+ checkNotNull(operators);
+ checkArgument(operators.length > 0);
+
+ try (MockEnvironment env = MockEnvironment.builder().build()) {
+
+ final StreamTask<?, ?> containingTask = new OneInputStreamTask<T, OneInputStreamOperator<T, T>>(env);
+
+ final StreamStatusProvider statusProvider = mock(StreamStatusProvider.class);
+ final StreamConfig cfg = new StreamConfig(new Configuration());
+
+ final StreamOperator<?>[] ops = new StreamOperator<?>[operators.length];
+
+ // initial output goes to nowhere
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ WatermarkGaugeExposingOutput<StreamRecord<T>> lastWriter = new BroadcastingOutputCollector<>(
+ new Output[0], statusProvider);
+
+ // build the reverse operators array
+ for (int i = 0; i < ops.length; i++) {
+ OneInputStreamOperator<T, T> op = operators[ops.length - i - 1];
+ op.setup(containingTask, cfg, lastWriter);
+ lastWriter = new ChainingOutput<>(op, statusProvider, null);
+ ops[i] = op;
+ }
+
+ @SuppressWarnings("unchecked")
+ final OP head = (OP) operators[0];
+
+ return new OperatorChain<>(
+ ops,
+ new RecordWriterOutput<?>[0],
+ lastWriter,
+ head);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Test Operator Implementations
+ // ------------------------------------------------------------------------
+
+ private static class ValidatingOperator
+ extends AbstractStreamOperator<String>
+ implements OneInputStreamOperator<String, String> {
+
+ private static final long serialVersionUID = 1L;
+
+ static final long CHECKPOINT_ID = 5765167L;
+
+ final AtomicInteger toUpdate;
+ final int expected;
+
+ public ValidatingOperator(AtomicInteger toUpdate, int expected) {
+ this.toUpdate = toUpdate;
+ this.expected = expected;
+ }
+
+ @Override
+ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+ assertEquals("wrong checkpointId", CHECKPOINT_ID, checkpointId);
+ assertEquals("wrong order", expected, toUpdate.getAndIncrement());
+ }
+
+ @Override
+ public void processElement(StreamRecord<String> element) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public OperatorID getOperatorID() {
+ return new OperatorID();
+ }
+ }
+}