You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/05/24 11:59:48 UTC

[1/2] flink git commit: [hotfix] [tests] Minor code cleanups in SavepointITCase

Repository: flink
Updated Branches:
  refs/heads/master 6e54f107b -> 9bdd70cad


[hotfix] [tests] Minor code cleanups in SavepointITCase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9bdd70ca
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9bdd70ca
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9bdd70ca

Branch: refs/heads/master
Commit: 9bdd70cad9cea82210173afc3bbd7d7aa14ebff5
Parents: e1d1234
Author: Stephan Ewen <se...@apache.org>
Authored: Wed May 23 17:24:54 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 24 13:14:13 2018 +0200

----------------------------------------------------------------------
 .../test/checkpointing/SavepointITCase.java     | 30 ++++----------------
 1 file changed, 6 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9bdd70ca/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 9549dc7..2805144 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -314,24 +314,12 @@ public class SavepointITCase extends TestLogger {
 			env.setParallelism(parallelism);
 			env.addSource(new InfiniteTestSource())
 					.shuffle()
-					.map(new MapFunction<Integer, Integer>() {
-
-						@Override
-						public Integer map(Integer value) throws Exception {
-							return 4 * value;
-						}
-					})
+					.map(value -> 4 * value)
 					.shuffle()
 					.map(statefulCounter).uid("statefulCounter")
 					.shuffle()
-					.map(new MapFunction<Integer, Integer>() {
-
-						@Override
-						public Integer map(Integer value) throws Exception {
-							return 2 * value;
-						}
-					})
-					.addSink(new DiscardingSink<Integer>());
+					.map(value -> 2 * value)
+					.addSink(new DiscardingSink<>());
 
 			JobGraph originalJobGraph = env.getStreamGraph().getJobGraph();
 
@@ -375,14 +363,8 @@ public class SavepointITCase extends TestLogger {
 					.shuffle()
 					.map(new StatefulCounter()).uid("statefulCounter")
 					.shuffle()
-					.map(new MapFunction<Integer, Integer>() {
-
-						@Override
-						public Integer map(Integer value) throws Exception {
-							return value;
-						}
-					})
-					.addSink(new DiscardingSink<Integer>());
+					.map(value -> value)
+					.addSink(new DiscardingSink<>());
 
 			JobGraph modifiedJobGraph = env.getStreamGraph().getJobGraph();
 
@@ -428,7 +410,7 @@ public class SavepointITCase extends TestLogger {
 			.shuffle()
 			.map(new StatefulCounter());
 
-		stream.addSink(new DiscardingSink<Integer>());
+		stream.addSink(new DiscardingSink<>());
 
 		return env.getStreamGraph().getJobGraph();
 	}


[2/2] flink git commit: [FLINK-9428] [runtime] Add a 'pre-barrier-emit' checkpoint notification to stream operators.

Posted by se...@apache.org.
[FLINK-9428] [runtime] Add a 'pre-barrier-emit' checkpoint notification to stream operators.

This allows operators with small transient unmanaged state (for example pre-aggregates) to
flush the state prior to the checkpoint barrier.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e1d12344
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e1d12344
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e1d12344

Branch: refs/heads/master
Commit: e1d1234477c731fe3f398c7f3f12123f73764242
Parents: 6e54f10
Author: Stephan Ewen <se...@apache.org>
Authored: Wed May 23 17:24:40 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 24 13:14:13 2018 +0200

----------------------------------------------------------------------
 .../api/operators/AbstractStreamOperator.java   |   8 +-
 .../streaming/api/operators/StreamOperator.java |  20 +++
 .../streaming/runtime/tasks/OperatorChain.java  |  40 ++++-
 .../streaming/runtime/tasks/StreamTask.java     |  16 +-
 .../AbstractUdfStreamOperatorLifecycleTest.java |  32 ++--
 .../runtime/tasks/OperatorChainTest.java        | 148 +++++++++++++++++++
 6 files changed, 245 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e1d12344/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index e447cbe..9a6e8db 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -347,6 +347,12 @@ public abstract class AbstractStreamOperator<OUT>
 	}
 
 	@Override
+	public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+		// the default implementation does nothing and accepts the checkpoint
+		// this is purely for subclasses to override
+	}
+
+	@Override
 	public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions,
 			CheckpointStreamFactory factory) throws Exception {
 
@@ -654,7 +660,7 @@ public abstract class AbstractStreamOperator<OUT>
 	/**
 	 * Wrapping {@link Output} that updates metrics on the number of emitted elements.
 	 */
-	public class CountingOutput implements Output<StreamRecord<OUT>> {
+	public static class CountingOutput<OUT> implements Output<StreamRecord<OUT>> {
 		private final Output<StreamRecord<OUT>> output;
 		private final Counter numRecordsOut;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e1d12344/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index 71f508b..c6c77ba 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -94,6 +94,26 @@ public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Dis
 	// ------------------------------------------------------------------------
 
 	/**
+	 * This method is called when the operator should do a snapshot, before it emits its
+	 * own checkpoint barrier.
+	 *
+	 * <p>This method is intended not for any actual state persistence, but only for emitting some
+	 * data before emitting the checkpoint barrier. Operators that maintain some small transient state
+	 * that is inefficient to checkpoint (especially when it would need to be checkpointed in a
+	 * re-scalable way) but can simply be sent downstream before the checkpoint. An example are
+	 * opportunistic pre-aggregation operators, which have small the pre-aggregation state that is
+	 * frequently flushed downstream.
+	 *
+	 * <p><b>Important:</b> This method should not be used for any actual state snapshot logic, because
+	 * it will inherently be within the synchronous part of the operator's checkpoint. If heavy work is done
+	 * within this method, it will affect latency and downstream checkpoint alignments.
+	 *
+	 * @param checkpointId The ID of the checkpoint.
+	 * @throws Exception Throwing an exception here causes the operator to fail and go into recovery.
+	 */
+	void prepareSnapshotPreBarrier(long checkpointId) throws Exception;
+
+	/**
 	 * Called to draw a state snapshot from the operator.
 	 *
 	 * @return a runnable future to the state handle that points to the snapshotted state. For synchronous implementations,

http://git-wip-us.apache.org/repos/asf/flink/blob/e1d12344/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 1db4346..8d1cbde 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.metrics.Counter;
@@ -54,6 +55,8 @@ import org.apache.flink.util.XORShiftRandom;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -61,6 +64,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * The {@code OperatorChain} contains all operators that are executed as one chain within a single
  * {@link StreamTask}.
@@ -158,7 +163,19 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 				}
 			}
 		}
+	}
 
+	@VisibleForTesting
+	OperatorChain(
+			StreamOperator<?>[] allOperators,
+			RecordWriterOutput<?>[] streamOutputs,
+			WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainEntryPoint,
+			OP headOperator) {
+
+		this.allOperators = checkNotNull(allOperators);
+		this.streamOutputs = checkNotNull(streamOutputs);
+		this.chainEntryPoint = checkNotNull(chainEntryPoint);
+		this.headOperator = checkNotNull(headOperator);
 	}
 
 	@Override
@@ -192,6 +209,18 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 		}
 	}
 
+	public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+		// go forward through the operator chain and tell each operator
+		// to prepare the checkpoint
+		final StreamOperator<?>[] operators = this.allOperators;
+		for (int i = operators.length - 1; i >= 0; --i) {
+			final StreamOperator<?> op = operators[i];
+			if (op != null) {
+				op.prepareSnapshotPreBarrier(checkpointId);
+			}
+		}
+	}
+
 	public RecordWriterOutput<?>[] getStreamOutputs() {
 		return streamOutputs;
 	}
@@ -392,7 +421,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 		Gauge<Long> getWatermarkGauge();
 	}
 
-	private static class ChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
+	static class ChainingOutput<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
 
 		protected final OneInputStreamOperator<T, ?> operator;
 		protected final Counter numRecordsIn;
@@ -400,12 +429,13 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 
 		protected final StreamStatusProvider streamStatusProvider;
 
+		@Nullable
 		protected final OutputTag<T> outputTag;
 
 		public ChainingOutput(
 				OneInputStreamOperator<T, ?> operator,
 				StreamStatusProvider streamStatusProvider,
-				OutputTag<T> outputTag) {
+				@Nullable OutputTag<T> outputTag) {
 			this.operator = operator;
 
 			{
@@ -502,7 +532,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 		}
 	}
 
-	private static final class CopyingChainingOutput<T> extends ChainingOutput<T> {
+	static final class CopyingChainingOutput<T> extends ChainingOutput<T> {
 
 		private final TypeSerializer<T> serializer;
 
@@ -570,7 +600,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 		}
 	}
 
-	private static class BroadcastingOutputCollector<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
+	static class BroadcastingOutputCollector<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
 
 		protected final Output<StreamRecord<T>>[] outputs;
 
@@ -640,7 +670,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 	 * Special version of {@link BroadcastingOutputCollector} that performs a shallow copy of the
 	 * {@link StreamRecord} to ensure that multi-chaining works correctly.
 	 */
-	private static final class CopyingBroadcastingOutputCollector<T> extends BroadcastingOutputCollector<T> {
+	static final class CopyingBroadcastingOutputCollector<T> extends BroadcastingOutputCollector<T> {
 
 		public CopyingBroadcastingOutputCollector(
 				Output<StreamRecord<T>>[] outputs,

http://git-wip-us.apache.org/repos/asf/flink/blob/e1d12344/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index d6e088d..629dcd9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -611,15 +611,23 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			if (isRunning) {
 				// we can do a checkpoint
 
-				// Since both state checkpointing and downstream barrier emission occurs in this
-				// lock scope, they are an atomic operation regardless of the order in which they occur.
-				// Given this, we immediately emit the checkpoint barriers, so the downstream operators
-				// can start their checkpoint work as soon as possible
+				// All of the following steps happen as an atomic step from the perspective of barriers and
+				// records/watermarks/timers/callbacks.
+				// We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
+				// checkpoint alignments
+
+				// Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
+				//           The pre-barrier work should be nothing or minimal in the common case.
+				operatorChain.prepareSnapshotPreBarrier(checkpointMetaData.getCheckpointId());
+
+				// Step (2): Send the checkpoint barrier downstream
 				operatorChain.broadcastCheckpointBarrier(
 						checkpointMetaData.getCheckpointId(),
 						checkpointMetaData.getTimestamp(),
 						checkpointOptions);
 
+				// Step (3): Take the state snapshot. This should be largely asynchronous, to not
+				//           impact progress of the streaming topology
 				checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
 				return true;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/e1d12344/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
index f82e403..4fcc813 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
@@ -64,6 +64,7 @@ public class AbstractUdfStreamOperatorLifecycleTest {
 			"UDF::open",
 			"OPERATOR::run",
 			"UDF::run",
+			"OPERATOR::prepareSnapshotPreBarrier",
 			"OPERATOR::snapshotState",
 			"OPERATOR::close",
 			"UDF::close",
@@ -82,15 +83,22 @@ public class AbstractUdfStreamOperatorLifecycleTest {
 			"OPERATOR::dispose",
 			"UDF::close");
 
-	private static final String ALL_METHODS_STREAM_OPERATOR = "[close[], dispose[], getChainingStrategy[], " +
-			"getCurrentKey[], getMetricGroup[], getOperatorID[], initializeState[], " +
-			"notifyCheckpointComplete[long], open[], setChainingStrategy[class " +
-			"org.apache.flink.streaming.api.operators.ChainingStrategy], setCurrentKey[class java.lang.Object], " +
+	private static final String ALL_METHODS_STREAM_OPERATOR = "[" +
+			"close[], " +
+			"dispose[], " +
+			"getChainingStrategy[], " +
+			"getCurrentKey[], " +
+			"getMetricGroup[], " +
+			"getOperatorID[], " +
+			"initializeState[], " +
+			"notifyCheckpointComplete[long], " +
+			"open[], " +
+			"prepareSnapshotPreBarrier[long], " +
+			"setChainingStrategy[class org.apache.flink.streaming.api.operators.ChainingStrategy], " +
+			"setCurrentKey[class java.lang.Object], " +
 			"setKeyContextElement1[class org.apache.flink.streaming.runtime.streamrecord.StreamRecord], " +
 			"setKeyContextElement2[class org.apache.flink.streaming.runtime.streamrecord.StreamRecord], " +
-			"setup[class org.apache.flink.streaming.runtime.tasks.StreamTask, class " +
-			"org.apache.flink.streaming.api.graph.StreamConfig, interface " +
-			"org.apache.flink.streaming.api.operators.Output], " +
+			"setup[class org.apache.flink.streaming.runtime.tasks.StreamTask, class org.apache.flink.streaming.api.graph.StreamConfig, interface org.apache.flink.streaming.api.operators.Output], " +
 			"snapshotState[long, long, class org.apache.flink.runtime.checkpoint.CheckpointOptions, interface org.apache.flink.runtime.state.CheckpointStreamFactory]]";
 
 	private static final String ALL_METHODS_RICH_FUNCTION = "[close[], getIterationRuntimeContext[], getRuntimeContext[]" +
@@ -129,7 +137,7 @@ public class AbstractUdfStreamOperatorLifecycleTest {
 		StreamConfig cfg = new StreamConfig(new Configuration());
 		MockSourceFunction srcFun = new MockSourceFunction();
 
-		cfg.setStreamOperator(new LifecycleTrackingStreamSource(srcFun, true));
+		cfg.setStreamOperator(new LifecycleTrackingStreamSource<>(srcFun, true));
 		cfg.setOperatorID(new OperatorID());
 		cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
@@ -152,7 +160,7 @@ public class AbstractUdfStreamOperatorLifecycleTest {
 		Configuration taskManagerConfig = new Configuration();
 		StreamConfig cfg = new StreamConfig(new Configuration());
 		MockSourceFunction srcFun = new MockSourceFunction();
-		cfg.setStreamOperator(new LifecycleTrackingStreamSource(srcFun, false));
+		cfg.setStreamOperator(new LifecycleTrackingStreamSource<>(srcFun, false));
 		cfg.setOperatorID(new OperatorID());
 		cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
@@ -269,6 +277,12 @@ public class AbstractUdfStreamOperatorLifecycleTest {
 		}
 
 		@Override
+		public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+			ACTUAL_ORDER_TRACKING.add("OPERATOR::prepareSnapshotPreBarrier");
+			super.prepareSnapshotPreBarrier(checkpointId);
+		}
+
+		@Override
 		public void open() throws Exception {
 			ACTUAL_ORDER_TRACKING.add("OPERATOR::open");
 			super.open();

http://git-wip-us.apache.org/repos/asf/flink/blob/e1d12344/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java
new file mode 100644
index 0000000..be51aa6
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
+import org.apache.flink.streaming.runtime.operators.StreamOperatorChainingTest;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
+import org.apache.flink.streaming.runtime.tasks.OperatorChain.BroadcastingOutputCollector;
+import org.apache.flink.streaming.runtime.tasks.OperatorChain.ChainingOutput;
+import org.apache.flink.streaming.runtime.tasks.OperatorChain.WatermarkGaugeExposingOutput;
+
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+/**
+ * This class test the {@link OperatorChain}.
+ *
+ * <p>It takes a different (simpler) approach at testing the operator chain than
+ * {@link StreamOperatorChainingTest}.
+ */
+public class OperatorChainTest {
+
+	@Test
+	public void testPrepareCheckpointPreBarrier() throws Exception {
+		final AtomicInteger intRef = new AtomicInteger();
+
+		final OneInputStreamOperator<String, String> one = new ValidatingOperator(intRef, 0);
+		final OneInputStreamOperator<String, String> two = new ValidatingOperator(intRef, 1);
+		final OneInputStreamOperator<String, String> three = new ValidatingOperator(intRef, 2);
+
+		final OperatorChain<?, ?> chain = setupOperatorChain(one, two, three);
+		chain.prepareSnapshotPreBarrier(ValidatingOperator.CHECKPOINT_ID);
+
+		assertEquals(3, intRef.get());
+	}
+
+	// ------------------------------------------------------------------------
+	//  Operator Chain Setup Utils
+	// ------------------------------------------------------------------------
+
+	@SafeVarargs
+	private static <T, OP extends StreamOperator<T>> OperatorChain<T, OP> setupOperatorChain(
+			OneInputStreamOperator<T, T>... operators) {
+
+		checkNotNull(operators);
+		checkArgument(operators.length > 0);
+
+		try (MockEnvironment env = MockEnvironment.builder().build()) {
+
+		final StreamTask<?, ?> containingTask = new OneInputStreamTask<T, OneInputStreamOperator<T, T>>(env);
+
+			final StreamStatusProvider statusProvider = mock(StreamStatusProvider.class);
+			final StreamConfig cfg = new StreamConfig(new Configuration());
+
+			final StreamOperator<?>[] ops = new StreamOperator<?>[operators.length];
+
+			// initial output goes to nowhere
+			@SuppressWarnings({"unchecked", "rawtypes"})
+			WatermarkGaugeExposingOutput<StreamRecord<T>> lastWriter = new BroadcastingOutputCollector<>(
+					new Output[0], statusProvider);
+
+			// build the reverse operators array
+			for (int i = 0; i < ops.length; i++) {
+				OneInputStreamOperator<T, T> op = operators[ops.length - i - 1];
+				op.setup(containingTask, cfg, lastWriter);
+				lastWriter = new ChainingOutput<>(op, statusProvider, null);
+				ops[i] = op;
+			}
+
+			@SuppressWarnings("unchecked")
+			final OP head = (OP) operators[0];
+
+			return new OperatorChain<>(
+					ops,
+					new RecordWriterOutput<?>[0],
+					lastWriter,
+					head);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Test Operator Implementations
+	// ------------------------------------------------------------------------
+
+	private static class ValidatingOperator
+			extends AbstractStreamOperator<String>
+			implements OneInputStreamOperator<String, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		static final long CHECKPOINT_ID = 5765167L;
+
+		final AtomicInteger toUpdate;
+		final int expected;
+
+		public ValidatingOperator(AtomicInteger toUpdate, int expected) {
+			this.toUpdate = toUpdate;
+			this.expected = expected;
+		}
+
+		@Override
+		public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+			assertEquals("wrong checkpointId", CHECKPOINT_ID, checkpointId);
+			assertEquals("wrong order", expected, toUpdate.getAndIncrement());
+		}
+
+		@Override
+		public void processElement(StreamRecord<String> element) throws Exception {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public OperatorID getOperatorID() {
+			return new OperatorID();
+		}
+	}
+}