You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/15 14:12:49 UTC

[2/2] flink git commit: [FLINK-5800] [checkpointing] Create CheckpointSteamFactory only once per operator

[FLINK-5800] [checkpointing] Create CheckpointSteamFactory only once per operator

Previously, the factory was created once per checkpoint, and its repeated initialization logic
(like ensuring existence of base paths) caused heavy load on some filesystems at very large scale.

This closes #3312


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/04e6758a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/04e6758a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/04e6758a

Branch: refs/heads/master
Commit: 04e6758abbadf39a12848a925e6e087e060bbe3a
Parents: d062448
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Feb 14 18:35:59 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 15 15:08:21 2017 +0100

----------------------------------------------------------------------
 .../api/operators/AbstractStreamOperator.java   | 44 ++++++++++--
 .../operators/StreamCheckpointedOperator.java   |  4 ++
 .../streaming/api/operators/StreamOperator.java | 26 ++++---
 .../streaming/runtime/tasks/StreamTask.java     | 48 ++-----------
 .../operators/AbstractStreamOperatorTest.java   | 18 +++--
 .../AbstractUdfStreamOperatorLifecycleTest.java | 15 +++-
 .../streaming/runtime/tasks/StreamTaskTest.java | 73 ++++++++------------
 .../util/AbstractStreamOperatorTestHarness.java | 16 ++---
 .../streaming/runtime/StateBackendITCase.java   |  5 +-
 9 files changed, 122 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/04e6758a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 7a3e2ce..144247f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -114,6 +114,10 @@ public abstract class AbstractStreamOperator<OUT>
 	/** The runtime context for UDFs */
 	private transient StreamingRuntimeContext runtimeContext;
 
+	// ----------------- general state -------------------
+
+	/** The factory that give this operator access to checkpoint storage */
+	private transient CheckpointStreamFactory checkpointStreamFactory;
 
 	// ---------------- key/value state ------------------
 
@@ -127,10 +131,11 @@ public abstract class AbstractStreamOperator<OUT>
 	/** Keyed state store view on the keyed backend */
 	private transient DefaultKeyedStateStore keyedStateStore;
 
+	// ---------------- operator state ------------------
+
 	/** Operator state backend / store */
 	private transient OperatorStateBackend operatorStateBackend;
 
-
 	// --------------- Metrics ---------------------------
 
 	/** Metric group for the operator */
@@ -212,6 +217,8 @@ public abstract class AbstractStreamOperator<OUT>
 			}
 		}
 
+		checkpointStreamFactory = container.createCheckpointStreamFactory(this);
+
 		initOperatorState(operatorStateHandlesBackend);
 
 		StateInitializationContext initializationContext = new StateInitializationContextImpl(
@@ -333,8 +340,7 @@ public abstract class AbstractStreamOperator<OUT>
 	}
 
 	@Override
-	public final OperatorSnapshotResult snapshotState(
-			long checkpointId, long timestamp, CheckpointStreamFactory streamFactory) throws Exception {
+	public final OperatorSnapshotResult snapshotState(long checkpointId, long timestamp) throws Exception {
 
 		KeyGroupRange keyGroupRange = null != keyedStateBackend ?
 				keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
@@ -344,7 +350,7 @@ public abstract class AbstractStreamOperator<OUT>
 		try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
 				checkpointId,
 				timestamp,
-				streamFactory,
+				checkpointStreamFactory,
 				keyGroupRange,
 				getContainingTask().getCancelables())) {
 
@@ -355,14 +361,14 @@ public abstract class AbstractStreamOperator<OUT>
 
 			if (null != operatorStateBackend) {
 				snapshotInProgress.setOperatorStateManagedFuture(
-					operatorStateBackend.snapshot(checkpointId, timestamp, streamFactory));
+					operatorStateBackend.snapshot(checkpointId, timestamp, checkpointStreamFactory));
 			}
 
 			if (null != keyedStateBackend) {
 				snapshotInProgress.setKeyedStateManagedFuture(
-					keyedStateBackend.snapshot(checkpointId, timestamp, streamFactory));
+					keyedStateBackend.snapshot(checkpointId, timestamp, checkpointStreamFactory));
 			}
-		}  catch (Exception snapshotException) {
+		} catch (Exception snapshotException) {
 			try {
 				snapshotInProgress.cancel();
 			} catch (Exception e) {
@@ -422,6 +428,30 @@ public abstract class AbstractStreamOperator<OUT>
 		}
 	}
 
+	@SuppressWarnings("deprecation")
+	@Deprecated
+	@Override
+	public StreamStateHandle snapshotLegacyOperatorState(long checkpointId, long timestamp) throws Exception {
+		if (this instanceof StreamCheckpointedOperator) {
+
+			final CheckpointStreamFactory.CheckpointStateOutputStream outStream =
+					checkpointStreamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
+
+			getContainingTask().getCancelables().registerClosable(outStream);
+
+			try {
+				((StreamCheckpointedOperator) this).snapshotState(outStream, checkpointId, timestamp);
+				return outStream.closeAndGetHandle();
+			}
+			finally {
+				getContainingTask().getCancelables().unregisterClosable(outStream);
+				outStream.close();
+			}
+		} else {
+			return null;
+		}
+	}
+
 	/**
 	 * Stream operators with state which can be restored need to override this hook method.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/04e6758a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
index f93e7ea..a28cdc4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
@@ -20,6 +20,10 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.core.fs.FSDataOutputStream;
 
+/**
+ * This interface is deprecated without replacement.
+ * All operators are now checkpointed.
+ */
 @Deprecated
 public interface StreamCheckpointedOperator extends CheckpointedRestoringOperator {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/04e6758a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index f6e5472..d8e4d08 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
@@ -93,18 +93,24 @@ public interface StreamOperator<OUT> extends Serializable {
 	/**
 	 * Called to draw a state snapshot from the operator.
 	 *
-	 * @throws Exception Forwards exceptions that occur while preparing for the snapshot
-	 */
-
-	/**
-	 * Called to draw a state snapshot from the operator.
-	 *
 	 * @return a runnable future to the state handle that points to the snapshotted state. For synchronous implementations,
 	 * the runnable might already be finished.
+	 * 
 	 * @throws Exception exception that happened during snapshotting.
 	 */
-	OperatorSnapshotResult snapshotState(
-			long checkpointId, long timestamp, CheckpointStreamFactory streamFactory) throws Exception;
+	OperatorSnapshotResult snapshotState(long checkpointId, long timestamp) throws Exception;
+
+	/**
+	 * Takes a snapshot of the legacy operator state defined via {@link StreamCheckpointedOperator}.
+	 * 
+	 * @return The handle to the legacy operator state, or null, if no state was snapshotted.
+	 * @throws Exception This method should forward any type of exception that happens during snapshotting.
+	 * 
+	 * @deprecated This method will be removed as soon as no more operators use the legacy state code paths
+	 */
+	@SuppressWarnings("deprecation")
+	@Deprecated
+	StreamStateHandle snapshotLegacyOperatorState(long checkpointId, long timestamp) throws Exception;
 
 	/**
 	 * Provides state handles to restore the operator state.
@@ -134,6 +140,6 @@ public interface StreamOperator<OUT> extends Serializable {
 	ChainingStrategy getChainingStrategy();
 
 	void setChainingStrategy(ChainingStrategy strategy);
-	
+
 	MetricGroup getMetricGroup();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/04e6758a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 3781cb6..6b33d12 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -53,7 +53,6 @@ import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
 import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -1045,8 +1044,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 		// ------------------------
 
-		private CheckpointStreamFactory streamFactory;
-
 		private final List<StreamStateHandle> nonPartitionedStates;
 		private final List<OperatorSnapshotResult> snapshotInProgressList;
 
@@ -1125,15 +1122,17 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			}
 		}
 
+		@SuppressWarnings("deprecation")
 		private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
 			if (null != op) {
-				createStreamFactory(op);
-				snapshotNonPartitionableState(op);
+				// first call the legacy checkpoint code paths 
+				nonPartitionedStates.add(op.snapshotLegacyOperatorState(
+						checkpointMetaData.getCheckpointId(),
+						checkpointMetaData.getTimestamp()));
 
 				OperatorSnapshotResult snapshotInProgress = op.snapshotState(
 						checkpointMetaData.getCheckpointId(),
-						checkpointMetaData.getTimestamp(),
-						streamFactory);
+						checkpointMetaData.getTimestamp());
 
 				snapshotInProgressList.add(snapshotInProgress);
 			} else {
@@ -1143,41 +1142,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			}
 		}
 
-		private void createStreamFactory(StreamOperator<?> operator) throws IOException {
-			String operatorId = owner.createOperatorIdentifier(operator, owner.configuration.getVertexID());
-			this.streamFactory = owner.stateBackend.createStreamFactory(owner.getEnvironment().getJobID(), operatorId);
-		}
-
-		//TODO deprecated code path
-		private void snapshotNonPartitionableState(StreamOperator<?> operator) throws Exception {
-
-			StreamStateHandle stateHandle = null;
-
-			if (operator instanceof StreamCheckpointedOperator) {
-
-				CheckpointStreamFactory.CheckpointStateOutputStream outStream =
-						streamFactory.createCheckpointStateOutputStream(
-								checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp());
-
-				owner.cancelables.registerClosable(outStream);
-
-				try {
-					((StreamCheckpointedOperator) operator).
-							snapshotState(
-									outStream,
-									checkpointMetaData.getCheckpointId(),
-									checkpointMetaData.getTimestamp());
-
-					stateHandle = outStream.closeAndGetHandle();
-				} finally {
-					owner.cancelables.unregisterClosable(outStream);
-					outStream.close();
-				}
-			}
-
-			nonPartitionedStates.add(stateHandle);
-		}
-
 		public void runAsyncCheckpointingAndAcknowledge() throws IOException {
 
 			AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(

http://git-wip-us.apache.org/repos/asf/flink/blob/04e6758a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index 409a732..274611a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -55,7 +55,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.eq;
@@ -491,16 +490,15 @@ public class AbstractStreamOperatorTest {
 		StateSnapshotContextSynchronousImpl context = mock(StateSnapshotContextSynchronousImpl.class);
 
 		whenNew(StateSnapshotContextSynchronousImpl.class).withAnyArguments().thenReturn(context);
-
-		CheckpointStreamFactory streamFactory = mock(CheckpointStreamFactory.class);
+		
 		StreamTask<Void, AbstractStreamOperator<Void>> containingTask = mock(StreamTask.class);
 		when(containingTask.getCancelables()).thenReturn(closeableRegistry);
 
 		AbstractStreamOperator<Void> operator = mock(AbstractStreamOperator.class);
-		when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenCallRealMethod();
+		when(operator.snapshotState(anyLong(), anyLong())).thenCallRealMethod();
 		doReturn(containingTask).when(operator).getContainingTask();
 
-		operator.snapshotState(checkpointId, timestamp, streamFactory);
+		operator.snapshotState(checkpointId, timestamp);
 
 		verify(context).close();
 	}
@@ -522,19 +520,18 @@ public class AbstractStreamOperatorTest {
 
 		whenNew(StateSnapshotContextSynchronousImpl.class).withAnyArguments().thenReturn(context);
 
-		CheckpointStreamFactory streamFactory = mock(CheckpointStreamFactory.class);
 		StreamTask<Void, AbstractStreamOperator<Void>> containingTask = mock(StreamTask.class);
 		when(containingTask.getCancelables()).thenReturn(closeableRegistry);
 
 		AbstractStreamOperator<Void> operator = mock(AbstractStreamOperator.class);
-		when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenCallRealMethod();
+		when(operator.snapshotState(anyLong(), anyLong())).thenCallRealMethod();
 		doReturn(containingTask).when(operator).getContainingTask();
 
 		// lets fail when calling the actual snapshotState method
 		doThrow(failingException).when(operator).snapshotState(eq(context));
 
 		try {
-			operator.snapshotState(checkpointId, timestamp, streamFactory);
+			operator.snapshotState(checkpointId, timestamp);
 			fail("Exception expected.");
 		} catch (Exception e) {
 			assertEquals(failingException, e.getCause());
@@ -574,7 +571,7 @@ public class AbstractStreamOperatorTest {
 		when(containingTask.getCancelables()).thenReturn(closeableRegistry);
 
 		AbstractStreamOperator<Void> operator = mock(AbstractStreamOperator.class);
-		when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenCallRealMethod();
+		when(operator.snapshotState(anyLong(), anyLong())).thenCallRealMethod();
 		doReturn(containingTask).when(operator).getContainingTask();
 
 		RunnableFuture<OperatorStateHandle> futureManagedOperatorStateHandle = mock(RunnableFuture.class);
@@ -587,9 +584,10 @@ public class AbstractStreamOperatorTest {
 
 		Whitebox.setInternalState(operator, "operatorStateBackend", operatorStateBackend);
 		Whitebox.setInternalState(operator, "keyedStateBackend", keyedStateBackend);
+		Whitebox.setInternalState(operator, "checkpointStreamFactory", streamFactory);
 
 		try {
-			operator.snapshotState(checkpointId, timestamp, streamFactory);
+			operator.snapshotState(checkpointId, timestamp);
 			fail("Exception expected.");
 		} catch (Exception e) {
 			assertEquals(failingException, e.getCause());

http://git-wip-us.apache.org/repos/asf/flink/blob/04e6758a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
index 965aec6..357163c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
@@ -60,6 +61,7 @@ public class AbstractUdfStreamOperatorLifecycleTest {
 			"UDF::open",
 			"OPERATOR::run",
 			"UDF::run",
+			"OPERATOR::snapshotLegacyOperatorState",
 			"OPERATOR::snapshotState",
 			"OPERATOR::close",
 			"UDF::close",
@@ -86,8 +88,9 @@ public class AbstractUdfStreamOperatorLifecycleTest {
 			"setKeyContextElement2[class org.apache.flink.streaming.runtime.streamrecord.StreamRecord], " +
 			"setup[class org.apache.flink.streaming.runtime.tasks.StreamTask, class " +
 			"org.apache.flink.streaming.api.graph.StreamConfig, interface " +
-			"org.apache.flink.streaming.api.operators.Output], snapshotState[long, long, " +
-			"interface org.apache.flink.runtime.state.CheckpointStreamFactory]]";
+			"org.apache.flink.streaming.api.operators.Output], " +
+			"snapshotLegacyOperatorState[long, long], " +
+			"snapshotState[long, long]]";
 
 	private static final String ALL_METHODS_RICH_FUNCTION = "[close[], getIterationRuntimeContext[], getRuntimeContext[]" +
 			", open[class org.apache.flink.configuration.Configuration], setRuntimeContext[interface " +
@@ -198,7 +201,7 @@ public class AbstractUdfStreamOperatorLifecycleTest {
 	}
 
 	private static class LifecycleTrackingStreamSource<OUT, SRC extends SourceFunction<OUT>>
-			extends StreamSource<OUT, SRC> implements Serializable {
+			extends StreamSource<OUT, SRC> implements Serializable, StreamCheckpointedOperator {
 
 		private static final long serialVersionUID = 2431488948886850562L;
 		private transient Thread testCheckpointer;
@@ -254,6 +257,12 @@ public class AbstractUdfStreamOperatorLifecycleTest {
 		}
 
 		@Override
+		public StreamStateHandle snapshotLegacyOperatorState(long checkpointId, long timestamp) throws Exception {
+			ACTUAL_ORDER_TRACKING.add("OPERATOR::snapshotLegacyOperatorState");
+			return super.snapshotLegacyOperatorState(checkpointId, timestamp);
+		}
+
+		@Override
 		public void initializeState(StateInitializationContext context) throws Exception {
 			ACTUAL_ORDER_TRACKING.add("OPERATOR::initializeState");
 			super.initializeState(context);

http://git-wip-us.apache.org/repos/asf/flink/blob/04e6758a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index cb9850f..6a63ee8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -95,6 +95,7 @@ import org.mockito.Mockito;
 import org.mockito.internal.util.reflection.Whitebox;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 import scala.concurrent.Await;
@@ -135,6 +136,8 @@ import static org.powermock.api.mockito.PowerMockito.whenNew;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(StreamTask.class)
+@PowerMockIgnore("org.apache.log4j.*")
+@SuppressWarnings("deprecation")
 public class StreamTaskTest extends TestLogger {
 
 	private static OneShotLatch SYNC_LATCH;
@@ -289,49 +292,42 @@ public class StreamTaskTest extends TestLogger {
 		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
 		streamTask.setEnvironment(mockEnvironment);
 
+		// mock the operators
 		StreamOperator<?> streamOperator1 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
 		StreamOperator<?> streamOperator2 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
 		StreamOperator<?> streamOperator3 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
 
+		// mock the returned snapshots
 		OperatorSnapshotResult operatorSnapshotResult1 = mock(OperatorSnapshotResult.class);
 		OperatorSnapshotResult operatorSnapshotResult2 = mock(OperatorSnapshotResult.class);
 
 		final Exception testException = new Exception("Test exception");
 
-		when(streamOperator1.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult1);
-		when(streamOperator2.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult2);
-		when(streamOperator3.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenThrow(testException);
-
-		StreamOperator<?>[] streamOperators = {streamOperator1, streamOperator2, streamOperator3};
-
-		OperatorChain<Void, AbstractStreamOperator<Void>> operatorChain = mock(OperatorChain.class);
-		when(operatorChain.getAllOperators()).thenReturn(streamOperators);
+		when(streamOperator1.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult1);
+		when(streamOperator2.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult2);
+		when(streamOperator3.snapshotState(anyLong(), anyLong())).thenThrow(testException);
 
+		// mock the returned legacy snapshots
 		StreamStateHandle streamStateHandle1 = mock(StreamStateHandle.class);
 		StreamStateHandle streamStateHandle2 = mock(StreamStateHandle.class);
 		StreamStateHandle streamStateHandle3 = mock(StreamStateHandle.class);
 
-		CheckpointStreamFactory.CheckpointStateOutputStream outStream1 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
-		CheckpointStreamFactory.CheckpointStateOutputStream outStream2 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
-		CheckpointStreamFactory.CheckpointStateOutputStream outStream3 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
+		when(streamOperator1.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle1);
+		when(streamOperator2.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle2);
+		when(streamOperator3.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle3);
 
-		when(outStream1.closeAndGetHandle()).thenReturn(streamStateHandle1);
-		when(outStream2.closeAndGetHandle()).thenReturn(streamStateHandle2);
-		when(outStream3.closeAndGetHandle()).thenReturn(streamStateHandle3);
+		// set up the task
 
-		CheckpointStreamFactory mockStreamFactory = mock(CheckpointStreamFactory.class);
-		when(mockStreamFactory.createCheckpointStateOutputStream(anyLong(), anyLong())).thenReturn(
-			outStream1, outStream2, outStream3);
+		StreamOperator<?>[] streamOperators = {streamOperator1, streamOperator2, streamOperator3};
 
-		AbstractStateBackend mockStateBackend = mock(AbstractStateBackend.class);
-		when(mockStateBackend.createStreamFactory(any(JobID.class), anyString())).thenReturn(mockStreamFactory);
+		OperatorChain<Void, AbstractStreamOperator<Void>> operatorChain = mock(OperatorChain.class);
+		when(operatorChain.getAllOperators()).thenReturn(streamOperators);
 
 		Whitebox.setInternalState(streamTask, "isRunning", true);
 		Whitebox.setInternalState(streamTask, "lock", new Object());
 		Whitebox.setInternalState(streamTask, "operatorChain", operatorChain);
 		Whitebox.setInternalState(streamTask, "cancelables", new CloseableRegistry());
 		Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
-		Whitebox.setInternalState(streamTask, "stateBackend", mockStateBackend);
 
 		try {
 			streamTask.triggerCheckpoint(checkpointMetaData);
@@ -371,6 +367,8 @@ public class StreamTaskTest extends TestLogger {
 		StreamOperator<?> streamOperator2 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
 		StreamOperator<?> streamOperator3 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class));
 
+		// mock the new state handles / futures
+
 		OperatorSnapshotResult operatorSnapshotResult1 = mock(OperatorSnapshotResult.class);
 		OperatorSnapshotResult operatorSnapshotResult2 = mock(OperatorSnapshotResult.class);
 		OperatorSnapshotResult operatorSnapshotResult3 = mock(OperatorSnapshotResult.class);
@@ -380,33 +378,23 @@ public class StreamTaskTest extends TestLogger {
 
 		when(operatorSnapshotResult3.getOperatorStateRawFuture()).thenReturn(failingFuture);
 
-		when(streamOperator1.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult1);
-		when(streamOperator2.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult2);
-		when(streamOperator3.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult3);
-
-		StreamOperator<?>[] streamOperators = {streamOperator1, streamOperator2, streamOperator3};
-
-		OperatorChain<Void, AbstractStreamOperator<Void>> operatorChain = mock(OperatorChain.class);
-		when(operatorChain.getAllOperators()).thenReturn(streamOperators);
+		when(streamOperator1.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult1);
+		when(streamOperator2.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult2);
+		when(streamOperator3.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult3);
 
+		// mock the legacy state snapshot
 		StreamStateHandle streamStateHandle1 = mock(StreamStateHandle.class);
 		StreamStateHandle streamStateHandle2 = mock(StreamStateHandle.class);
 		StreamStateHandle streamStateHandle3 = mock(StreamStateHandle.class);
 
-		CheckpointStreamFactory.CheckpointStateOutputStream outStream1 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
-		CheckpointStreamFactory.CheckpointStateOutputStream outStream2 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
-		CheckpointStreamFactory.CheckpointStateOutputStream outStream3 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
-
-		when(outStream1.closeAndGetHandle()).thenReturn(streamStateHandle1);
-		when(outStream2.closeAndGetHandle()).thenReturn(streamStateHandle2);
-		when(outStream3.closeAndGetHandle()).thenReturn(streamStateHandle3);
+		when(streamOperator1.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle1);
+		when(streamOperator2.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle2);
+		when(streamOperator3.snapshotLegacyOperatorState(anyLong(), anyLong())).thenReturn(streamStateHandle3);
 
-		CheckpointStreamFactory mockStreamFactory = mock(CheckpointStreamFactory.class);
-		when(mockStreamFactory.createCheckpointStateOutputStream(anyLong(), anyLong())).thenReturn(
-			outStream1, outStream2, outStream3);
+		StreamOperator<?>[] streamOperators = {streamOperator1, streamOperator2, streamOperator3};
 
-		AbstractStateBackend mockStateBackend = mock(AbstractStateBackend.class);
-		when(mockStateBackend.createStreamFactory(any(JobID.class), anyString())).thenReturn(mockStreamFactory);
+		OperatorChain<Void, AbstractStreamOperator<Void>> operatorChain = mock(OperatorChain.class);
+		when(operatorChain.getAllOperators()).thenReturn(streamOperators);
 
 		Whitebox.setInternalState(streamTask, "isRunning", true);
 		Whitebox.setInternalState(streamTask, "lock", new Object());
@@ -414,7 +402,6 @@ public class StreamTaskTest extends TestLogger {
 		Whitebox.setInternalState(streamTask, "cancelables", new CloseableRegistry());
 		Whitebox.setInternalState(streamTask, "asyncOperationsThreadPool", new DirectExecutorService());
 		Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
-		Whitebox.setInternalState(streamTask, "stateBackend", mockStateBackend);
 
 		streamTask.triggerCheckpoint(checkpointMetaData);
 
@@ -479,7 +466,7 @@ public class StreamTaskTest extends TestLogger {
 			new DoneFuture<>(managedOperatorStateHandle),
 			new DoneFuture<>(rawOperatorStateHandle));
 
-		when(streamOperator.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult);
+		when(streamOperator.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult);
 
 		StreamOperator<?>[] streamOperators = {streamOperator};
 
@@ -595,7 +582,7 @@ public class StreamTaskTest extends TestLogger {
 			new DoneFuture<>(managedOperatorStateHandle),
 			new DoneFuture<>(rawOperatorStateHandle));
 
-		when(streamOperator.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult);
+		when(streamOperator.snapshotState(anyLong(), anyLong())).thenReturn(operatorSnapshotResult);
 
 		StreamOperator<?>[] streamOperators = {streamOperator};
 

http://git-wip-us.apache.org/repos/asf/flink/blob/04e6758a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 568410a..2df4efd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -459,15 +459,11 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 	}
 
 	/**
-	 * Calls {@link StreamOperator#snapshotState(long, long, CheckpointStreamFactory)}.
+	 * Calls {@link StreamOperator#snapshotState(long, long)}.
 	 */
 	public OperatorStateHandles snapshot(long checkpointId, long timestamp) throws Exception {
 
-		CheckpointStreamFactory streamFactory = stateBackend.createStreamFactory(
-				new JobID(),
-				"test_op");
-
-		OperatorSnapshotResult operatorStateResult = operator.snapshotState(checkpointId, timestamp, streamFactory);
+		OperatorSnapshotResult operatorStateResult = operator.snapshotState(checkpointId, timestamp);
 
 		KeyGroupsStateHandle keyedManaged = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateManagedFuture());
 		KeyGroupsStateHandle keyedRaw = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateRawFuture());
@@ -475,14 +471,13 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 		OperatorStateHandle opManaged = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateManagedFuture());
 		OperatorStateHandle opRaw = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateRawFuture());
 
-		OperatorStateHandles handles = new OperatorStateHandles(
+		return new OperatorStateHandles(
 			0,
 			null,
 			keyedManaged != null ? Collections.singletonList(keyedManaged) : null,
 			keyedRaw != null ? Collections.singletonList(keyedRaw) : null,
 			opManaged != null ? Collections.singletonList(opManaged) : null,
 			opRaw != null ? Collections.singletonList(opRaw) : null);
-		return handles;
 	}
 
 	/**
@@ -490,6 +485,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 	 * the operator implements this interface.
 	 */
 	@Deprecated
+	@SuppressWarnings("deprecation")
 	public StreamStateHandle snapshotLegacy(long checkpointId, long timestamp) throws Exception {
 
 		CheckpointStreamFactory.CheckpointStateOutputStream outStream = stateBackend.createStreamFactory(
@@ -513,7 +509,9 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 	/**
 	 * Calls {@link StreamCheckpointedOperator#restoreState(FSDataInputStream)} if
 	 * the operator implements this interface.
-	 */	@Deprecated
+	 */
+	@Deprecated
+	@SuppressWarnings("deprecation")
 	public void restore(StreamStateHandle snapshot) throws Exception {
 		if(operator instanceof StreamCheckpointedOperator) {
 			try (FSDataInputStream in = snapshot.openInputStream()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/04e6758a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
index 0e62fbb..ec6a8f5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
@@ -38,6 +38,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
@@ -81,9 +82,7 @@ public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
 		}
 		catch (JobExecutionException e) {
 			Throwable t = e.getCause();
-			if (!(t != null && t.getCause() instanceof SuccessException)) {
-				throw e;
-			}
+			assertTrue("wrong exception", t instanceof SuccessException);
 		}
 	}