Posted to commits@flink.apache.org by gy...@apache.org on 2015/07/07 12:45:23 UTC

flink git commit: [FLINK-2323] [api-breaking] Rename OperatorState interface methods to value() and update(..)

Repository: flink
Updated Branches:
  refs/heads/master 0e21941e3 -> 0d2c49005


[FLINK-2323] [api-breaking] Rename OperatorState interface methods to value() and update(..)

Closes #890
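
For user code moving to the renamed interface, here is a minimal migration sketch (the CountingMapper class is illustrative, modeled on the CounterSum example in the updated streaming guide below; only the accessor names change, the getOperatorState(..) call itself is untouched):

    import java.io.IOException;

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.common.state.OperatorState;
    import org.apache.flink.configuration.Configuration;

    public class CountingMapper extends RichMapFunction<Long, Long> {

        private OperatorState<Long> counter;

        @Override
        public void open(Configuration config) throws IOException {
            // name, default value, partitioned flag -- unchanged by this commit
            counter = getRuntimeContext().getOperatorState("counter", 0L, false);
        }

        @Override
        public Long map(Long input) throws Exception {
            // was: counter.updateState(counter.getState() + 1);
            counter.update(counter.value() + 1);
            return input;
        }
    }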


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0d2c4900
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0d2c4900
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0d2c4900

Branch: refs/heads/master
Commit: 0d2c49005449d6e05bbf53446edac928bb7ecbb6
Parents: 0e21941
Author: Gyula Fora <gy...@apache.org>
Authored: Tue Jul 7 11:07:05 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Tue Jul 7 12:42:29 2015 +0200

----------------------------------------------------------------------
 docs/apis/streaming_guide.md                    |  6 +++---
 .../api/common/functions/RuntimeContext.java    |  8 +++----
 .../flink/api/common/state/OperatorState.java   | 22 ++++++++++----------
 .../api/persistent/PersistentKafkaSource.java   | 16 +++++++-------
 .../source/StatefulSequenceSource.java          |  6 +++---
 .../state/PartitionedStreamOperatorState.java   |  4 ++--
 .../api/state/StreamOperatorState.java          | 12 +++++------
 .../api/state/StatefulOperatorTest.java         | 22 ++++++++++----------
 .../StreamCheckpointingITCase.java              | 16 +++++++-------
 .../ProcessFailureStreamingRecoveryITCase.java  |  6 +++---
 10 files changed, 59 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index c612b69..7d8ab6d 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -1220,7 +1220,7 @@ Flink supports the checkpointing and persistence of user defined operator states
 
 Flink supports two types of operator states: partitioned and non-partitioned states.
 
-In case of non-partitioned operator state, an operator state is maintained for each parallel instance of a given operator. When `OperatorState.getState()` is called, a separate state is returned in each parallel instance. In practice this means if we keep a counter for the received inputs in a mapper, `getState()` will return number of inputs processed by each parallel mapper.
+In case of non-partitioned operator state, an operator state is maintained for each parallel instance of a given operator. When `OperatorState.value()` is called, a separate state is returned in each parallel instance. In practice this means that if we keep a counter for the received inputs in a mapper, `value()` will return the number of inputs processed by each parallel mapper.
 
 In case of partitioned operator state, a separate state is maintained for each received key. This can be used, for instance, to count received inputs by different keys, or to store and update summary statistics of different sub-streams.
 
@@ -1244,7 +1244,7 @@ public class CounterSum implements RichReduceFunction<Long> {
 
     @Override
     public Long reduce(Long value1, Long value2) throws Exception {
-        counter.updateState(counter.getState() + 1);
+        counter.update(counter.value() + 1);
         return value1 + value2;
     }
 
@@ -1275,7 +1275,7 @@ public static class CounterSource implements RichParallelSourceFunction<Long> {
             // output and state update are atomic
             synchronized (lock){
                 ctx.collect(offset);
-                offset.updateState(offset.getState() + 1);
+                offset.update(offset.value() + 1);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index 4c8e924..eb84d1c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -184,9 +184,9 @@ public interface RuntimeContext {
 	 *            can be used by the same operator.
 	 * @param defaultState
 	 *            Default value for the operator state. This will be returned
-	 *            the first time {@link OperatorState#getState()} (for every
+	 *            the first time {@link OperatorState#value()} (for every
 	 *            state partition) is called before
-	 *            {@link OperatorState#updateState(Object)}.
+	 *            {@link OperatorState#update(Object)}.
 	 * @param partitioned
 	 *            Sets whether partitioning should be applied for the given
 	 *            state. If true a partitioner key must be used.
@@ -216,9 +216,9 @@ public interface RuntimeContext {
 	 *            can be used by the same operator.
 	 * @param defaultState
 	 *            Default value for the operator state. This will be returned
-	 *            the first time {@link OperatorState#getState()} (for every
+	 *            the first time {@link OperatorState#value()} (for every
 	 *            state partition) is called before
-	 *            {@link OperatorState#updateState(Object)}.
+	 *            {@link OperatorState#update(Object)}.
 	 * @param partitioned
 	 *            Sets whether partitioning should be applied for the given
 	 *            state. If true a partitioner key must be used.

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
index 4198a50..955b35b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
@@ -27,8 +27,8 @@ import org.apache.flink.api.common.functions.MapFunction;
  * partitioned (when state partitioning is defined in the program) or
  * non-partitioned user states.
  * 
- * State can be accessed and manipulated using the {@link #getState()} and
- * {@link #updateState(T)} methods. These calls are only safe in the
+ * State can be accessed and manipulated using the {@link #value()} and
+ * {@link #update(T)} methods. These calls are only safe in the
  * transformation call the operator represents, for instance inside
  * {@link MapFunction#map()} and can lead to unexpected behavior in the
  * {@link #open(org.apache.flink.configuration.Configuration)} or
@@ -40,28 +40,28 @@ import org.apache.flink.api.common.functions.MapFunction;
 public interface OperatorState<T> {
 
 	/**
-	 * Gets the current state for the operator. When the state is not
-	 * partitioned the returned state is the same for all inputs in a given
-	 * operator instance. If state partitioning is applied, the state returned
+	 * Returns the current value for the state. When the state is not
+	 * partitioned the returned value is the same for all inputs in a given
+	 * operator instance. If state partitioning is applied, the value returned
 	 * depends on the current operator input, as the operator maintains an
 	 * independent state for each partition.
 	 * 
-	 * @return The operator state corresponding to the current input.
+	 * @return The operator state value corresponding to the current input.
 	 * 
 	 * @throws IOException Thrown if the system cannot access the state.
 	 */
-	T getState() throws IOException;
+	T value() throws IOException;
 
 	/**
-	 * Updates the operator state accessible by {@link #getState()} to the given
-	 * value. The next time {@link #getState()} is called (for the same state
+	 * Updates the operator state accessible by {@link #value()} to the given
+	 * value. The next time {@link #value()} is called (for the same state
 	 * partition) the returned state will represent the updated value.
 	 * 
-	 * @param state
-	 *            The new state.
+	 * @param value
+	 *            The new value for the state.
 	 *            
 	 * @throws IOException Thrown if the system cannot access the state.
 	 */
-	void updateState(T state) throws IOException;
+	void update(T value) throws IOException;
 	
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
index befbef6..6758f2c 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
@@ -148,17 +148,17 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
 		this.lastOffsets = getRuntimeContext().getOperatorState("offset", new long[numPartitions], false);
 		this.commitedOffsets = new long[numPartitions];
 		// check if there are offsets to restore
-		if (!Arrays.equals(lastOffsets.getState(), new long[numPartitions])) {
-			if (lastOffsets.getState().length != numPartitions) {
-				throw new IllegalStateException("There are "+lastOffsets.getState().length+" offsets to restore for topic "+topicName+" but " +
+		if (!Arrays.equals(lastOffsets.value(), new long[numPartitions])) {
+			if (lastOffsets.value().length != numPartitions) {
+				throw new IllegalStateException("There are "+lastOffsets.value().length+" offsets to restore for topic "+topicName+" but " +
 						"there are only "+numPartitions+" in the topic");
 			}
 
-			LOG.info("Setting restored offsets {} in ZooKeeper", Arrays.toString(lastOffsets.getState()));
-			setOffsetsInZooKeeper(lastOffsets.getState());
+			LOG.info("Setting restored offsets {} in ZooKeeper", Arrays.toString(lastOffsets.value()));
+			setOffsetsInZooKeeper(lastOffsets.value());
 		} else {
 			// initialize empty offsets
-			Arrays.fill(this.lastOffsets.getState(), -1);
+			Arrays.fill(this.lastOffsets.value(), -1);
 		}
 		Arrays.fill(this.commitedOffsets, 0); // just to make it clear
 		
@@ -175,7 +175,7 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
 		
 		while (running && iteratorToRead.hasNext()) {
 			MessageAndMetadata<byte[], byte[]> message = iteratorToRead.next();
-			if(lastOffsets.getState()[message.partition()] >= message.offset()) {
+			if(lastOffsets.value()[message.partition()] >= message.offset()) {
 				LOG.info("Skipping message with offset {} from partition {}", message.offset(), message.partition());
 				continue;
 			}
@@ -188,7 +188,7 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
 
 			// make the state update and the element emission atomic
 			synchronized (checkpointLock) {
-				lastOffsets.getState()[message.partition()] = message.offset();
+				lastOffsets.value()[message.partition()] = message.offset();
 				ctx.collect(next);
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
index 9a2ba4c..2d74e38 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
@@ -63,14 +63,14 @@ public class StatefulSequenceSource extends RichParallelSourceFunction<Long> {
 					((end - start + 1) / stepSize + 1) :
 					((end - start + 1) / stepSize);
 					
-		Long currentCollected = collected.getState();
+		Long currentCollected = collected.value();
 
 		while (isRunning && currentCollected < toCollect) {
 			synchronized (checkpointLock) {
 				ctx.collect(currentCollected * stepSize + congruence);
-				collected.updateState(currentCollected + 1);
+				collected.update(currentCollected + 1);
 			}
-			currentCollected = collected.getState();
+			currentCollected = collected.value();
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
index 808b7c8..bfc160f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
@@ -69,7 +69,7 @@ public class PartitionedStreamOperatorState<IN, S, C extends Serializable> exten
 	}
 
 	@Override
-	public S getState() throws IOException{
+	public S value() throws IOException{
 		if (currentInput == null) {
 			throw new IllegalStateException("Need a valid input for accessing the state.");
 		} else {
@@ -87,7 +87,7 @@ public class PartitionedStreamOperatorState<IN, S, C extends Serializable> exten
 	}
 
 	@Override
-	public void updateState(S state) throws IOException {
+	public void update(S state) throws IOException {
 		if (state == null) {
 			throw new RuntimeException("Cannot set state to null.");
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
index 1699c27..a80d730 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
@@ -59,12 +59,12 @@ public class StreamOperatorState<S, C extends Serializable> implements OperatorS
 	}
 
 	@Override
-	public S getState() throws IOException {
+	public S value() throws IOException {
 		return state;
 	}
 
 	@Override
-	public void updateState(S state) throws IOException {
+	public void update(S state) throws IOException {
 		if (state == null) {
 			throw new RuntimeException("Cannot set state to null.");
 		}
@@ -72,8 +72,8 @@ public class StreamOperatorState<S, C extends Serializable> implements OperatorS
 	}
 	
 	public void setDefaultState(S defaultState) throws IOException {
-		if (getState() == null) {
-			updateState(defaultState);
+		if (value() == null) {
+			update(defaultState);
 		}
 	}
 
@@ -92,12 +92,12 @@ public class StreamOperatorState<S, C extends Serializable> implements OperatorS
 	public Map<Serializable, StateHandle<C>> snapshotState(long checkpointId,
 			long checkpointTimestamp) throws Exception {
 		return ImmutableMap.of(DEFAULTKEY, provider.createStateHandle(checkpointer.snapshotState(
-				getState(), checkpointId, checkpointTimestamp)));
+				value(), checkpointId, checkpointTimestamp)));
 
 	}
 
 	public void restoreState(Map<Serializable, StateHandle<C>> snapshots) throws Exception {
-		updateState(checkpointer.restoreState(snapshots.get(DEFAULTKEY).getState()));
+		update(checkpointer.restoreState(snapshots.get(DEFAULTKEY).getState()));
 	}
 
 	public Map<Serializable, S> getPartitionedState() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
index 774b431..8c7ffeb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
@@ -71,9 +71,9 @@ public class StatefulOperatorTest {
 		processInputs(map, Arrays.asList(1, 2, 3, 4, 5));
 
 		assertEquals(Arrays.asList("1", "2", "3", "4", "5"), out);
-		assertEquals((Integer) 5, context.getOperatorState("counter", 0, false).getState());
+		assertEquals((Integer) 5, context.getOperatorState("counter", 0, false).value());
 		assertEquals(ImmutableMap.of(0, 2, 1, 3), context.getOperatorStates().get("groupCounter").getPartitionedState());
-		assertEquals("12345", context.getOperatorState("concat", "", false).getState());
+		assertEquals("12345", context.getOperatorState("concat", "", false).value());
 		assertEquals((Integer) 5, ((StatefulMapper) map.getUserFunction()).checkpointedCounter);
 
 		byte[] serializedState = InstantiationUtil.serializeObject(map.getStateSnapshotFromFunction(1, 1));
@@ -81,19 +81,19 @@ public class StatefulOperatorTest {
 		StreamMap<Integer, String> restoredMap = createOperatorWithContext(out, new ModKey(2), serializedState);
 		StreamingRuntimeContext restoredContext = restoredMap.getRuntimeContext();
 
-		assertEquals((Integer) 5, restoredContext.getOperatorState("counter", 0, false).getState());
+		assertEquals((Integer) 5, restoredContext.getOperatorState("counter", 0, false).value());
 		assertEquals(ImmutableMap.of(0, 2, 1, 3), context.getOperatorStates().get("groupCounter").getPartitionedState());
-		assertEquals("12345", restoredContext.getOperatorState("concat", "", false).getState());
+		assertEquals("12345", restoredContext.getOperatorState("concat", "", false).value());
 		assertEquals((Integer) 5, ((StatefulMapper) restoredMap.getUserFunction()).checkpointedCounter);
 		out.clear();
 
 		processInputs(restoredMap, Arrays.asList(7, 8));
 
 		assertEquals(Arrays.asList("7", "8"), out);
-		assertEquals((Integer) 7, restoredContext.getOperatorState("counter", 0, false).getState());
+		assertEquals((Integer) 7, restoredContext.getOperatorState("counter", 0, false).value());
 		assertEquals(ImmutableMap.of(0, 3, 1, 4), restoredContext.getOperatorStates().get("groupCounter")
 				.getPartitionedState());
-		assertEquals("1234578", restoredContext.getOperatorState("concat", "", false).getState());
+		assertEquals("1234578", restoredContext.getOperatorState("concat", "", false).value());
 		assertEquals((Integer) 7, ((StatefulMapper) restoredMap.getUserFunction()).checkpointedCounter);
 
 	}
@@ -176,12 +176,12 @@ public class StatefulOperatorTest {
 
 		@Override
 		public String map(Integer value) throws Exception {
-			counter.updateState(counter.getState() + 1);
-			groupCounter.updateState(groupCounter.getState() + 1);
-			concat.updateState(concat.getState() + value.toString());
+			counter.update(counter.value() + 1);
+			groupCounter.update(groupCounter.value() + 1);
+			concat.update(concat.value() + value.toString());
 			checkpointedCounter++;
 			try {
-				counter.updateState(null);
+				counter.update(null);
 				fail();
 			} catch (RuntimeException e){
 			}
@@ -235,7 +235,7 @@ public class StatefulOperatorTest {
 		
 		@Override
 		public String map(Integer value) throws Exception {
-			groupCounter.updateState(groupCounter.getState() + 1);
+			groupCounter.update(groupCounter.value() + 1);
 			
 			return value.toString();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index e2430d6..a826eff 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -198,7 +198,7 @@ public class StreamCheckpointingITCase {
 		static final long[] counts = new long[PARALLELISM];
 		@Override
 		public void close() throws IOException {
-			counts[getRuntimeContext().getIndexOfThisSubtask()] = index.getState();
+			counts[getRuntimeContext().getIndexOfThisSubtask()] = index.value();
 		}
 
 
@@ -222,8 +222,8 @@ public class StreamCheckpointingITCase {
 		public void run(SourceContext<String> ctx) throws Exception {
 			final Object lockingObject = ctx.getCheckpointLock();
 
-			while (isRunning && index.getState() < numElements) {
-				char first = (char) ((index.getState() % 40) + 40);
+			while (isRunning && index.value() < numElements) {
+				char first = (char) ((index.value() % 40) + 40);
 
 				stringBuilder.setLength(0);
 				stringBuilder.append(first);
@@ -231,7 +231,7 @@ public class StreamCheckpointingITCase {
 				String result = randomString(stringBuilder, rnd);
 
 				synchronized (lockingObject) {
-					index.updateState(index.getState() + step);
+					index.update(index.value() + step);
 					ctx.collect(result);
 				}
 			}
@@ -261,7 +261,7 @@ public class StreamCheckpointingITCase {
 
 		@Override
 		public PrefixCount map(PrefixCount value) throws Exception {
-			count.updateState(count.getState() + 1);
+			count.update(count.value() + 1);
 			return value;
 		}
 
@@ -272,7 +272,7 @@ public class StreamCheckpointingITCase {
 
 		@Override
 		public void close() throws IOException {
-			counts[getRuntimeContext().getIndexOfThisSubtask()] = count.getState();
+			counts[getRuntimeContext().getIndexOfThisSubtask()] = count.value();
 		}
 		
 	}
@@ -370,7 +370,7 @@ public class StreamCheckpointingITCase {
 		
 		@Override
 		public PrefixCount map(String value) throws IOException {
-			count.updateState(count.getState() + 1);
+			count.update(count.value() + 1);
 			return new PrefixCount(value.substring(0, 1), value, 1L);
 		}
 		
@@ -381,7 +381,7 @@ public class StreamCheckpointingITCase {
 
 		@Override
 		public void close() throws IOException {
-			counts[getRuntimeContext().getIndexOfThisSubtask()] = count.getState();
+			counts[getRuntimeContext().getIndexOfThisSubtask()] = count.value();
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
index d8c925d..decc861 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
@@ -133,7 +133,7 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
 			final File proceedFile = new File(coordinateDir, PROCEED_MARKER_FILE);
 			boolean checkForProceedFile = true;
 
-			while (isRunning && collected.getState() < toCollect) {
+			while (isRunning && collected.value() < toCollect) {
 				// check if the proceed file exists (then we go full speed)
 				// if not, we always recheck and sleep
 				if (checkForProceedFile) {
@@ -146,8 +146,8 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
 				}
 
 				synchronized (checkpointLock) {
-					sourceCtx.collect(collected.getState() * stepSize + congruence);
-					collected.updateState(collected.getState() + 1);
+					sourceCtx.collect(collected.value() * stepSize + congruence);
+					collected.update(collected.value() + 1);
 				}
 			}
 		}
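
All of the stateful sources touched here (CounterSource in the guide, StatefulSequenceSource, and the checkpointing/recovery tests) follow the same pattern under the new names: read value(), emit, then update(), inside the checkpoint lock so that emission and state update stay atomic. A condensed sketch of that pattern (OffsetSource and the bound of 1000 elements are illustrative, not part of this patch):

    import java.io.IOException;

    import org.apache.flink.api.common.state.OperatorState;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    public class OffsetSource extends RichParallelSourceFunction<Long> {

        private volatile boolean isRunning = true;
        private OperatorState<Long> offset;

        @Override
        public void open(Configuration config) throws IOException {
            offset = getRuntimeContext().getOperatorState("offset", 0L, false);
        }

        @Override
        public void run(SourceFunction.SourceContext<Long> ctx) throws Exception {
            final Object lock = ctx.getCheckpointLock();
            while (isRunning && offset.value() < 1000L) {
                // element emission and state update are atomic w.r.t. checkpoints
                synchronized (lock) {
                    ctx.collect(offset.value());
                    offset.update(offset.value() + 1);
                }
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }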