You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/07/07 12:45:23 UTC
flink git commit: [FLINK-2323] [api-breaking] Rename OperatorState
interface methods to value() and update(..)
Repository: flink
Updated Branches:
refs/heads/master 0e21941e3 -> 0d2c49005
[FLINK-2323] [api-breaking] Rename OperatorState interface methods to value() and update(..)
Closes #890
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0d2c4900
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0d2c4900
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0d2c4900
Branch: refs/heads/master
Commit: 0d2c49005449d6e05bbf53446edac928bb7ecbb6
Parents: 0e21941
Author: Gyula Fora <gy...@apache.org>
Authored: Tue Jul 7 11:07:05 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Tue Jul 7 12:42:29 2015 +0200
----------------------------------------------------------------------
docs/apis/streaming_guide.md | 6 +++---
.../api/common/functions/RuntimeContext.java | 8 +++----
.../flink/api/common/state/OperatorState.java | 22 ++++++++++----------
.../api/persistent/PersistentKafkaSource.java | 16 +++++++-------
.../source/StatefulSequenceSource.java | 6 +++---
.../state/PartitionedStreamOperatorState.java | 4 ++--
.../api/state/StreamOperatorState.java | 12 +++++------
.../api/state/StatefulOperatorTest.java | 22 ++++++++++----------
.../StreamCheckpointingITCase.java | 16 +++++++-------
.../ProcessFailureStreamingRecoveryITCase.java | 6 +++---
10 files changed, 59 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index c612b69..7d8ab6d 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -1220,7 +1220,7 @@ Flink supports the checkpointing and persistence of user defined operator states
Flink supports two types of operator states: partitioned and non-partitioned states.
-In case of non-partitioned operator state, an operator state is maintained for each parallel instance of a given operator. When `OperatorState.getState()` is called, a separate state is returned in each parallel instance. In practice this means if we keep a counter for the received inputs in a mapper, `getState()` will return number of inputs processed by each parallel mapper.
+In case of non-partitioned operator state, an operator state is maintained for each parallel instance of a given operator. When `OperatorState.value()` is called, a separate state is returned in each parallel instance. In practice this means if we keep a counter for the received inputs in a mapper, `value()` will return number of inputs processed by each parallel mapper.
In case of partitioned operator state, a separate state is maintained for each received key. This can be used, for instance, to count received inputs by different keys, or store and update summary statistics of different sub-streams.
@@ -1244,7 +1244,7 @@ public class CounterSum implements RichReduceFunction<Long> {
@Override
public Long reduce(Long value1, Long value2) throws Exception {
- counter.updateState(counter.getState() + 1);
+ counter.update(counter.value() + 1);
return value1 + value2;
}
@@ -1275,7 +1275,7 @@ public static class CounterSource implements RichParallelSourceFunction<Long> {
// output and state update are atomic
synchronized (lock){
ctx.collect(offset);
- offset.updateState(offset.getState() + 1);
+ offset.update(offset.value() + 1);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index 4c8e924..eb84d1c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -184,9 +184,9 @@ public interface RuntimeContext {
* can be used by the same operator.
* @param defaultState
* Default value for the operator state. This will be returned
- * the first time {@link OperatorState#getState()} (for every
+ * the first time {@link OperatorState#value()} (for every
* state partition) is called before
- * {@link OperatorState#updateState(Object)}.
+ * {@link OperatorState#update(Object)}.
* @param partitioned
* Sets whether partitioning should be applied for the given
* state. If true a partitioner key must be used.
@@ -216,9 +216,9 @@ public interface RuntimeContext {
* can be used by the same operator.
* @param defaultState
* Default value for the operator state. This will be returned
- * the first time {@link OperatorState#getState()} (for every
+ * the first time {@link OperatorState#value()} (for every
* state partition) is called before
- * {@link OperatorState#updateState(Object)}.
+ * {@link OperatorState#update(Object)}.
* @param partitioned
* Sets whether partitioning should be applied for the given
* state. If true a partitioner key must be used.
http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
index 4198a50..955b35b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
@@ -27,8 +27,8 @@ import org.apache.flink.api.common.functions.MapFunction;
* partitioned (when state partitioning is defined in the program) or
* non-partitioned user states.
*
- * State can be accessed and manipulated using the {@link #getState()} and
- * {@link #updateState(T)} methods. These calls are only safe in the
+ * State can be accessed and manipulated using the {@link #value()} and
+ * {@link #update(T)} methods. These calls are only safe in the
* transformation call the operator represents, for instance inside
* {@link MapFunction#map()} and can lead to unexpected behavior in the
* {@link #open(org.apache.flink.configuration.Configuration)} or
@@ -40,28 +40,28 @@ import org.apache.flink.api.common.functions.MapFunction;
public interface OperatorState<T> {
/**
- * Gets the current state for the operator. When the state is not
- * partitioned the returned state is the same for all inputs in a given
- * operator instance. If state partitioning is applied, the state returned
+ * Returns the current value for the state. When the state is not
+ * partitioned the returned value is the same for all inputs in a given
+ * operator instance. If state partitioning is applied, the value returned
* depends on the current operator input, as the operator maintains an
* independent state for each partition.
*
- * @return The operator state corresponding to the current input.
+ * @return The operator state value corresponding to the current input.
*
* @throws IOException Thrown if the system cannot access the state.
*/
- T getState() throws IOException;
+ T value() throws IOException;
/**
- * Updates the operator state accessible by {@link #getState()} to the given
- * value. The next time {@link #getState()} is called (for the same state
+ * Updates the operator state accessible by {@link #value()} to the given
+ * value. The next time {@link #value()} is called (for the same state
* partition) the returned state will represent the updated value.
*
* @param value
- * The new state.
+ * The new value for the state.
*
* @throws IOException Thrown if the system cannot access the state.
*/
- void updateState(T state) throws IOException;
+ void update(T value) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
index befbef6..6758f2c 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
@@ -148,17 +148,17 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
this.lastOffsets = getRuntimeContext().getOperatorState("offset", new long[numPartitions], false);
this.commitedOffsets = new long[numPartitions];
// check if there are offsets to restore
- if (!Arrays.equals(lastOffsets.getState(), new long[numPartitions])) {
- if (lastOffsets.getState().length != numPartitions) {
- throw new IllegalStateException("There are "+lastOffsets.getState().length+" offsets to restore for topic "+topicName+" but " +
+ if (!Arrays.equals(lastOffsets.value(), new long[numPartitions])) {
+ if (lastOffsets.value().length != numPartitions) {
+ throw new IllegalStateException("There are "+lastOffsets.value().length+" offsets to restore for topic "+topicName+" but " +
"there are only "+numPartitions+" in the topic");
}
- LOG.info("Setting restored offsets {} in ZooKeeper", Arrays.toString(lastOffsets.getState()));
- setOffsetsInZooKeeper(lastOffsets.getState());
+ LOG.info("Setting restored offsets {} in ZooKeeper", Arrays.toString(lastOffsets.value()));
+ setOffsetsInZooKeeper(lastOffsets.value());
} else {
// initialize empty offsets
- Arrays.fill(this.lastOffsets.getState(), -1);
+ Arrays.fill(this.lastOffsets.value(), -1);
}
Arrays.fill(this.commitedOffsets, 0); // just to make it clear
@@ -175,7 +175,7 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
while (running && iteratorToRead.hasNext()) {
MessageAndMetadata<byte[], byte[]> message = iteratorToRead.next();
- if(lastOffsets.getState()[message.partition()] >= message.offset()) {
+ if(lastOffsets.value()[message.partition()] >= message.offset()) {
LOG.info("Skipping message with offset {} from partition {}", message.offset(), message.partition());
continue;
}
@@ -188,7 +188,7 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
// make the state update and the element emission atomic
synchronized (checkpointLock) {
- lastOffsets.getState()[message.partition()] = message.offset();
+ lastOffsets.value()[message.partition()] = message.offset();
ctx.collect(next);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
index 9a2ba4c..2d74e38 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
@@ -63,14 +63,14 @@ public class StatefulSequenceSource extends RichParallelSourceFunction<Long> {
((end - start + 1) / stepSize + 1) :
((end - start + 1) / stepSize);
- Long currentCollected = collected.getState();
+ Long currentCollected = collected.value();
while (isRunning && currentCollected < toCollect) {
synchronized (checkpointLock) {
ctx.collect(currentCollected * stepSize + congruence);
- collected.updateState(currentCollected + 1);
+ collected.update(currentCollected + 1);
}
- currentCollected = collected.getState();
+ currentCollected = collected.value();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
index 808b7c8..bfc160f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
@@ -69,7 +69,7 @@ public class PartitionedStreamOperatorState<IN, S, C extends Serializable> exten
}
@Override
- public S getState() throws IOException{
+ public S value() throws IOException{
if (currentInput == null) {
throw new IllegalStateException("Need a valid input for accessing the state.");
} else {
@@ -87,7 +87,7 @@ public class PartitionedStreamOperatorState<IN, S, C extends Serializable> exten
}
@Override
- public void updateState(S state) throws IOException {
+ public void update(S state) throws IOException {
if (state == null) {
throw new RuntimeException("Cannot set state to null.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
index 1699c27..a80d730 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
@@ -59,12 +59,12 @@ public class StreamOperatorState<S, C extends Serializable> implements OperatorS
}
@Override
- public S getState() throws IOException {
+ public S value() throws IOException {
return state;
}
@Override
- public void updateState(S state) throws IOException {
+ public void update(S state) throws IOException {
if (state == null) {
throw new RuntimeException("Cannot set state to null.");
}
@@ -72,8 +72,8 @@ public class StreamOperatorState<S, C extends Serializable> implements OperatorS
}
public void setDefaultState(S defaultState) throws IOException {
- if (getState() == null) {
- updateState(defaultState);
+ if (value() == null) {
+ update(defaultState);
}
}
@@ -92,12 +92,12 @@ public class StreamOperatorState<S, C extends Serializable> implements OperatorS
public Map<Serializable, StateHandle<C>> snapshotState(long checkpointId,
long checkpointTimestamp) throws Exception {
return ImmutableMap.of(DEFAULTKEY, provider.createStateHandle(checkpointer.snapshotState(
- getState(), checkpointId, checkpointTimestamp)));
+ value(), checkpointId, checkpointTimestamp)));
}
public void restoreState(Map<Serializable, StateHandle<C>> snapshots) throws Exception {
- updateState(checkpointer.restoreState(snapshots.get(DEFAULTKEY).getState()));
+ update(checkpointer.restoreState(snapshots.get(DEFAULTKEY).getState()));
}
public Map<Serializable, S> getPartitionedState() throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
index 774b431..8c7ffeb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
@@ -71,9 +71,9 @@ public class StatefulOperatorTest {
processInputs(map, Arrays.asList(1, 2, 3, 4, 5));
assertEquals(Arrays.asList("1", "2", "3", "4", "5"), out);
- assertEquals((Integer) 5, context.getOperatorState("counter", 0, false).getState());
+ assertEquals((Integer) 5, context.getOperatorState("counter", 0, false).value());
assertEquals(ImmutableMap.of(0, 2, 1, 3), context.getOperatorStates().get("groupCounter").getPartitionedState());
- assertEquals("12345", context.getOperatorState("concat", "", false).getState());
+ assertEquals("12345", context.getOperatorState("concat", "", false).value());
assertEquals((Integer) 5, ((StatefulMapper) map.getUserFunction()).checkpointedCounter);
byte[] serializedState = InstantiationUtil.serializeObject(map.getStateSnapshotFromFunction(1, 1));
@@ -81,19 +81,19 @@ public class StatefulOperatorTest {
StreamMap<Integer, String> restoredMap = createOperatorWithContext(out, new ModKey(2), serializedState);
StreamingRuntimeContext restoredContext = restoredMap.getRuntimeContext();
- assertEquals((Integer) 5, restoredContext.getOperatorState("counter", 0, false).getState());
+ assertEquals((Integer) 5, restoredContext.getOperatorState("counter", 0, false).value());
assertEquals(ImmutableMap.of(0, 2, 1, 3), context.getOperatorStates().get("groupCounter").getPartitionedState());
- assertEquals("12345", restoredContext.getOperatorState("concat", "", false).getState());
+ assertEquals("12345", restoredContext.getOperatorState("concat", "", false).value());
assertEquals((Integer) 5, ((StatefulMapper) restoredMap.getUserFunction()).checkpointedCounter);
out.clear();
processInputs(restoredMap, Arrays.asList(7, 8));
assertEquals(Arrays.asList("7", "8"), out);
- assertEquals((Integer) 7, restoredContext.getOperatorState("counter", 0, false).getState());
+ assertEquals((Integer) 7, restoredContext.getOperatorState("counter", 0, false).value());
assertEquals(ImmutableMap.of(0, 3, 1, 4), restoredContext.getOperatorStates().get("groupCounter")
.getPartitionedState());
- assertEquals("1234578", restoredContext.getOperatorState("concat", "", false).getState());
+ assertEquals("1234578", restoredContext.getOperatorState("concat", "", false).value());
assertEquals((Integer) 7, ((StatefulMapper) restoredMap.getUserFunction()).checkpointedCounter);
}
@@ -176,12 +176,12 @@ public class StatefulOperatorTest {
@Override
public String map(Integer value) throws Exception {
- counter.updateState(counter.getState() + 1);
- groupCounter.updateState(groupCounter.getState() + 1);
- concat.updateState(concat.getState() + value.toString());
+ counter.update(counter.value() + 1);
+ groupCounter.update(groupCounter.value() + 1);
+ concat.update(concat.value() + value.toString());
checkpointedCounter++;
try {
- counter.updateState(null);
+ counter.update(null);
fail();
} catch (RuntimeException e){
}
@@ -235,7 +235,7 @@ public class StatefulOperatorTest {
@Override
public String map(Integer value) throws Exception {
- groupCounter.updateState(groupCounter.getState() + 1);
+ groupCounter.update(groupCounter.value() + 1);
return value.toString();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index e2430d6..a826eff 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -198,7 +198,7 @@ public class StreamCheckpointingITCase {
static final long[] counts = new long[PARALLELISM];
@Override
public void close() throws IOException {
- counts[getRuntimeContext().getIndexOfThisSubtask()] = index.getState();
+ counts[getRuntimeContext().getIndexOfThisSubtask()] = index.value();
}
@@ -222,8 +222,8 @@ public class StreamCheckpointingITCase {
public void run(SourceContext<String> ctx) throws Exception {
final Object lockingObject = ctx.getCheckpointLock();
- while (isRunning && index.getState() < numElements) {
- char first = (char) ((index.getState() % 40) + 40);
+ while (isRunning && index.value() < numElements) {
+ char first = (char) ((index.value() % 40) + 40);
stringBuilder.setLength(0);
stringBuilder.append(first);
@@ -231,7 +231,7 @@ public class StreamCheckpointingITCase {
String result = randomString(stringBuilder, rnd);
synchronized (lockingObject) {
- index.updateState(index.getState() + step);
+ index.update(index.value() + step);
ctx.collect(result);
}
}
@@ -261,7 +261,7 @@ public class StreamCheckpointingITCase {
@Override
public PrefixCount map(PrefixCount value) throws Exception {
- count.updateState(count.getState() + 1);
+ count.update(count.value() + 1);
return value;
}
@@ -272,7 +272,7 @@ public class StreamCheckpointingITCase {
@Override
public void close() throws IOException {
- counts[getRuntimeContext().getIndexOfThisSubtask()] = count.getState();
+ counts[getRuntimeContext().getIndexOfThisSubtask()] = count.value();
}
}
@@ -370,7 +370,7 @@ public class StreamCheckpointingITCase {
@Override
public PrefixCount map(String value) throws IOException {
- count.updateState(count.getState() + 1);
+ count.update(count.value() + 1);
return new PrefixCount(value.substring(0, 1), value, 1L);
}
@@ -381,7 +381,7 @@ public class StreamCheckpointingITCase {
@Override
public void close() throws IOException {
- counts[getRuntimeContext().getIndexOfThisSubtask()] = count.getState();
+ counts[getRuntimeContext().getIndexOfThisSubtask()] = count.value();
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0d2c4900/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
index d8c925d..decc861 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
@@ -133,7 +133,7 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
final File proceedFile = new File(coordinateDir, PROCEED_MARKER_FILE);
boolean checkForProceedFile = true;
- while (isRunning && collected.getState() < toCollect) {
+ while (isRunning && collected.value() < toCollect) {
// check if the proceed file exists (then we go full speed)
// if not, we always recheck and sleep
if (checkForProceedFile) {
@@ -146,8 +146,8 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
}
synchronized (checkpointLock) {
- sourceCtx.collect(collected.getState() * stepSize + congruence);
- collected.updateState(collected.getState() + 1);
+ sourceCtx.collect(collected.value() * stepSize + congruence);
+ collected.update(collected.value() + 1);
}
}
}