You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/06/25 19:21:36 UTC

[03/12] flink git commit: [streaming] Initial rework of the operator state interfaces

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index c745e6c..0a26ebb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -18,17 +18,29 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.state.StateCheckpointer;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.collector.selector.BroadcastOutputSelectorWrapper;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.graph.StreamNode;
@@ -42,18 +54,6 @@ import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicLong;
-
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({Task.class, ResultPartitionWriter.class})
 public class SourceStreamTaskTest extends StreamTaskTestBase {
@@ -144,7 +144,7 @@ public class SourceStreamTaskTest extends StreamTaskTestBase {
 		Assert.assertEquals(NUM_ELEMENTS, outList.size());
 	}
 
-	private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, Checkpointed {
+	private static class MockSource extends RichSourceFunction<Tuple2<Long, Integer>> implements StateCheckpointer<Integer, Integer> {
 
 		private static final long serialVersionUID = 1;
 
@@ -156,6 +156,7 @@ public class SourceStreamTaskTest extends StreamTaskTestBase {
 		private volatile long lastCheckpointId = -1;
 
 		private Semaphore semaphore;
+		private OperatorState<Integer> state;
 
 		private volatile boolean isRunning = true;
 
@@ -164,7 +165,7 @@ public class SourceStreamTaskTest extends StreamTaskTestBase {
 			this.checkpointDelay = checkpointDelay;
 			this.readDelay = readDelay;
 			this.count = 0;
-			semaphore = new Semaphore(1);
+			this.semaphore = new Semaphore(1);
 		}
 
 		@Override
@@ -189,32 +190,33 @@ public class SourceStreamTaskTest extends StreamTaskTestBase {
 		public void cancel() {
 			isRunning = false;
 		}
+		
+		@Override
+		public void open(Configuration conf){
+			state = getRuntimeContext().getOperatorState(1, this);
+		}
+
 
 		@Override
-		public Serializable snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+		public Integer snapshotState(Integer state, long checkpointId, long checkpointTimestamp) {
 			if (!semaphore.tryAcquire()) {
 				Assert.fail("Concurrent invocation of snapshotState.");
-			}
-			int startCount = count;
-			lastCheckpointId = checkpointId;
-
-			long sum = 0;
-			for (int i = 0; i < checkpointDelay; i++) {
-				sum += new Random().nextLong();
-			}
-
-			if (startCount != count) {
+			} else {
+				int startCount = count;
+				
+				if (startCount != count) {
+					semaphore.release();
+					// This means that next() was invoked while the snapshot was ongoing
+					Assert.fail("Count is different at start end end of snapshot.");
+				}
 				semaphore.release();
-				// This means that next() was invoked while the snapshot was ongoing
-				Assert.fail("Count is different at start end end of snapshot.");
 			}
-			semaphore.release();
-			return sum;
+			return 0;
 		}
 
 		@Override
-		public void restoreState(Serializable state) {
-
+		public Integer restoreState(Integer stateSnapshot) {
+			return stateSnapshot;
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
index 7713994..ea0cb94 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
@@ -20,12 +20,10 @@ package org.apache.flink.streaming.util;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -157,8 +155,9 @@ public class MockCoContext<IN1, IN2, OUT> {
 	public static <IN1, IN2, OUT> List<OUT> createAndExecute(TwoInputStreamOperator<IN1, IN2, OUT> operator,
 			List<IN1> input1, List<IN2> input2) {
 		MockCoContext<IN1, IN2, OUT> mockContext = new MockCoContext<IN1, IN2, OUT>(input1, input2);
-		RuntimeContext runtimeContext =  new StreamingRuntimeContext("CoMockTask", new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), null,
-				new ExecutionConfig());
+		StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext("CoMockTask",
+				new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), null,
+				new ExecutionConfig(), null);
 
 		operator.setup(mockContext.collector, runtimeContext);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
index 8b5607f..adec338 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
@@ -20,12 +20,10 @@ package org.apache.flink.streaming.util;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
@@ -105,8 +103,9 @@ public class MockContext<IN, OUT> {
 	public static <IN, OUT> List<OUT> createAndExecute(OneInputStreamOperator<IN, OUT> operator,
 			List<IN> inputs) {
 		MockContext<IN, OUT> mockContext = new MockContext<IN, OUT>(inputs);
-		RuntimeContext runtimeContext =  new StreamingRuntimeContext("MockTask", new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), null,
-				new ExecutionConfig());
+		StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext("MockTask",
+				new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), null,
+				new ExecutionConfig(), null);
 
 		operator.setup(mockContext.output, runtimeContext);
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index f0eef9d..8c17eec 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -18,6 +18,10 @@
 
 package org.apache.flink.test.checkpointing;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
@@ -25,24 +29,19 @@ import java.util.Random;
 import org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 /**
  * A simple test that runs a streaming topology with checkpointing enabled.
  * 
@@ -182,14 +181,14 @@ public class StreamCheckpointingITCase {
 	// --------------------------------------------------------------------------------------------
 	
 	private static class StringGeneratingSourceFunction extends RichSourceFunction<String>
-			implements Checkpointed<Long>, ParallelSourceFunction<String> {
+			implements  ParallelSourceFunction<String> {
 
 		private final long numElements;
 		
 		private Random rnd;
 		private StringBuilder stringBuilder;
 
-		private long index;
+		private OperatorState<Integer> index;
 		private int step;
 
 		private volatile boolean isRunning;
@@ -197,7 +196,7 @@ public class StreamCheckpointingITCase {
 		static final long[] counts = new long[PARALLELISM];
 		@Override
 		public void close() {
-			counts[getRuntimeContext().getIndexOfThisSubtask()] = index;
+			counts[getRuntimeContext().getIndexOfThisSubtask()] = index.getState();
 		}
 
 
@@ -211,9 +210,9 @@ public class StreamCheckpointingITCase {
 			stringBuilder = new StringBuilder();
 			step = getRuntimeContext().getNumberOfParallelSubtasks();
 			
-			if (index == 0) {
-				index = getRuntimeContext().getIndexOfThisSubtask();
-			}
+			
+			index = getRuntimeContext().getOperatorState(getRuntimeContext().getIndexOfThisSubtask());
+			
 			isRunning = true;
 		}
 
@@ -221,8 +220,8 @@ public class StreamCheckpointingITCase {
 		public void run(SourceContext<String> ctx) throws Exception {
 			final Object lockingObject = ctx.getCheckpointLock();
 
-			while (isRunning && index < numElements) {
-				char first = (char) ((index % 40) + 40);
+			while (isRunning && index.getState() < numElements) {
+				char first = (char) ((index.getState() % 40) + 40);
 
 				stringBuilder.setLength(0);
 				stringBuilder.append(first);
@@ -230,7 +229,7 @@ public class StreamCheckpointingITCase {
 				String result = randomString(stringBuilder, rnd);
 
 				synchronized (lockingObject) {
-					index += step;
+					index.updateState(index.getState() + step);
 					ctx.collect(result);
 				}
 			}
@@ -241,16 +240,6 @@ public class StreamCheckpointingITCase {
 			isRunning = false;
 		}
 
-		@Override
-		public Long snapshotState(long checkpointId, long checkpointTimestamp) {
-			return this.index;
-		}
-
-		@Override
-		public void restoreState(Long state) {
-			this.index = state;
-		}
-
 		private static String randomString(StringBuilder bld, Random rnd) {
 			final int len = rnd.nextInt(10) + 5;
 
@@ -263,35 +252,27 @@ public class StreamCheckpointingITCase {
 		}
 	}
 	
-	private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> 
-			implements Checkpointed<Long> {
-
+	private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> {
 
+		private OperatorState<Long> count;
+		static final long[] counts = new long[PARALLELISM];
 
 		@Override
 		public PrefixCount map(PrefixCount value) throws Exception {
-			count++;
+			count.updateState(count.getState() + 1);
 			return value;
 		}
 
-		static final long[] counts = new long[PARALLELISM];
-
-		private long count = 0;
-
-		@Override
-		public void close() {
-			counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
-		}
-
 		@Override
-		public Long snapshotState(long checkpointId, long checkpointTimestamp) {
-			return count;
+		public void open(Configuration conf) {
+			count = getRuntimeContext().getOperatorState(0L);
 		}
 
 		@Override
-		public void restoreState(Long state) {
-			count = state;
+		public void close() {
+			counts[getRuntimeContext().getIndexOfThisSubtask()] = count.getState();
 		}
+		
 	}
 	
 	private static class OnceFailingReducer extends RichReduceFunction<PrefixCount> {
@@ -353,60 +334,47 @@ public class StreamCheckpointingITCase {
 		}
 	}
 
-	private static class StringRichFilterFunction extends RichFilterFunction<String> implements Checkpointed<Long> {
+	private static class StringRichFilterFunction extends RichFilterFunction<String> {
 
+		OperatorState<Long> count;
+		static final long[] counts = new long[PARALLELISM];
+		
 		@Override
 		public boolean filter(String value) {
-			count++;
+			count.updateState(count.getState() + 1);
 			return value.length() < 100;
 		}
-
-		static final long[] counts = new long[PARALLELISM];
-
-		private long count = 0;
-
-		@Override
-		public void close() {
-			counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
-		}
-
+		
 		@Override
-		public Long snapshotState(long checkpointId, long checkpointTimestamp) {
-			return count;
+		public void open(Configuration conf) {
+			this.count = getRuntimeContext().getOperatorState(0L);
 		}
 
 		@Override
-		public void restoreState(Long state) {
-			count = state;
+		public void close() {
+			counts[getRuntimeContext().getIndexOfThisSubtask()] = count.getState();
 		}
 	}
 
-	private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount> implements Checkpointed<Long> {
-
+	private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount> {
 
+		OperatorState<Long> count;
+		static final long[] counts = new long[PARALLELISM];
+		
 		@Override
 		public PrefixCount map(String value) {
-			count++;
+			count.updateState(count.getState() + 1);
 			return new PrefixCount(value.substring(0, 1), value, 1L);
 		}
-
-		static final long[] counts = new long[PARALLELISM];
-
-		private long count = 0;
-
-		@Override
-		public void close() {
-			counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
-		}
-
+		
 		@Override
-		public Long snapshotState(long checkpointId, long checkpointTimestamp) {
-			return count;
+		public void open(Configuration conf) {
+			this.count = getRuntimeContext().getOperatorState(0L);
 		}
 
 		@Override
-		public void restoreState(Long state) {
-			count = state;
+		public void close() {
+			counts[getRuntimeContext().getIndexOfThisSubtask()] = count.getState();
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
index f153eb7..6003205 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
@@ -20,7 +20,6 @@ package org.apache.flink.test.recovery;
 
 
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.io.IOException;
@@ -30,10 +29,10 @@ import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.FileStateHandle;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -84,7 +83,7 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
 					}
 				}).startNewChain()
 				// populate the coordinate directory so we can proceed to TaskManager failure
-				.map(new StatefulMapper(coordinateDir));
+				.map(new Mapper(coordinateDir));				
 
 		//write result to temporary file
 		result.addSink(new CheckpointedSink(DATA_COUNT));
@@ -105,18 +104,16 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
 		}
 	}
 
-	public static class SleepyDurableGenerateSequence extends RichParallelSourceFunction<Long>
-			implements Checkpointed<Long> {
-		private static final long serialVersionUID = 1L;
+	public static class SleepyDurableGenerateSequence extends RichParallelSourceFunction<Long> {
 
 		private static final long SLEEP_TIME = 50;
 
 		private final File coordinateDir;
 		private final long end;
 
-		private long collected;
-
 		private volatile boolean isRunning = true;
+		
+		private OperatorState<Long> collected;
 
 		public SleepyDurableGenerateSequence(File coordinateDir, long end) {
 			this.coordinateDir = coordinateDir;
@@ -136,7 +133,7 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
 			final File proceedFile = new File(coordinateDir, PROCEED_MARKER_FILE);
 			boolean checkForProceedFile = true;
 
-			while (isRunning && collected < toCollect) {
+			while (isRunning && collected.getState() < toCollect) {
 				// check if the proceed file exists (then we go full speed)
 				// if not, we always recheck and sleep
 				if (checkForProceedFile) {
@@ -149,34 +146,28 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
 				}
 
 				synchronized (checkpointLock) {
-					sourceCtx.collect(collected * stepSize + congruence);
-					collected++;
+					sourceCtx.collect(collected.getState() * stepSize + congruence);
+					collected.updateState(collected.getState() + 1);
 				}
 			}
 		}
-
-		@Override
-		public void cancel() {
-			isRunning = false;
-		}
-
+		
 		@Override
-		public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-			return collected;
+		public void open(Configuration conf) {
+			collected = getRuntimeContext().getOperatorState(0L);
 		}
 
 		@Override
-		public void restoreState(Long state) {
-			collected = state;
+		public void cancel() {
+			isRunning = false;
 		}
 	}
-	public static class StatefulMapper extends RichMapFunction<Long, Long> implements
-			Checkpointed<Integer> {
+	
+	public static class Mapper extends RichMapFunction<Long, Long> {
 		private boolean markerCreated = false;
 		private File coordinateDir;
-		private boolean restored = false;
 
-		public StatefulMapper(File coordinateDir) {
+		public Mapper(File coordinateDir) {
 			this.coordinateDir = coordinateDir;
 		}
 
@@ -189,31 +180,14 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
 			}
 			return value;
 		}
-
-		@Override
-		public void close() {
-			if (!restored) {
-				fail();
-			}
-		}
-
-		@Override
-		public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-			return 1;
-		}
-
-		@Override
-		public void restoreState(Integer state) {
-			restored = true;
-		}
 	}
 
-	private static class CheckpointedSink extends RichSinkFunction<Long> implements Checkpointed<Long> {
+	private static class CheckpointedSink extends RichSinkFunction<Long> {
 
 		private long stepSize;
 		private long congruence;
 		private long toCollect;
-		private long collected = 0L;
+		private OperatorState<Long> collected;
 		private long end;
 
 		public CheckpointedSink(long end) {
@@ -225,30 +199,21 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
 			stepSize = getRuntimeContext().getNumberOfParallelSubtasks();
 			congruence = getRuntimeContext().getIndexOfThisSubtask();
 			toCollect = (end % stepSize > congruence) ? (end / stepSize + 1) : (end / stepSize);
+			collected = getRuntimeContext().getOperatorState(0L);
 		}
 
 		@Override
 		public void invoke(Long value) throws Exception {
-			long expected = collected * stepSize + congruence;
+			long expected = collected.getState() * stepSize + congruence;
 
 			Assert.assertTrue("Value did not match expected value. " + expected + " != " + value, value.equals(expected));
 
-			collected++;
+			collected.updateState(collected.getState() + 1);
 
-			if (collected > toCollect) {
+			if (collected.getState() > toCollect) {
 				Assert.fail("Collected <= toCollect: " + collected + " > " + toCollect);
 			}
 
 		}
-
-		@Override
-		public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-			return collected;
-		}
-
-		@Override
-		public void restoreState(Long state) {
-			collected = state;
-		}
 	}
 }