You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/16 18:08:30 UTC
[03/24] flink git commit: [FLINK-2808] [streaming] Refactor and
extend state backend abstraction
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
index e5a1c23..c503a1f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
@@ -21,7 +21,6 @@ package org.apache.flink.test.checkpointing;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
-import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -48,11 +47,6 @@ import static org.junit.Assert.assertTrue;
* this barriers are correctly forwarded.
*
* <p>
- * This uses a mixture of Operators with the {@link Checkpointed} interface and the new
- * {@link org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext#getOperatorState}
- * method.
- *
- * <p>
* The test triggers a failure after a while and verifies that, after completion, the
* state reflects the "exactly once" semantics.
*/
@@ -142,25 +136,21 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
// --------------------------------------------------------------------------------------------
private static class StringGeneratingSourceFunction extends RichSourceFunction<String>
- implements ParallelSourceFunction<String> {
+ implements ParallelSourceFunction<String>, Checkpointed<Integer> {
+ static final long[] counts = new long[PARALLELISM];
+
private final long numElements;
private Random rnd;
private StringBuilder stringBuilder;
- private OperatorState<Integer> index;
+ private int index;
private int step;
- private volatile boolean isRunning;
-
- static final long[] counts = new long[PARALLELISM];
- @Override
- public void close() throws IOException {
- counts[getRuntimeContext().getIndexOfThisSubtask()] = index.value();
- }
-
+ private volatile boolean isRunning = true;
+
StringGeneratingSourceFunction(long numElements) {
this.numElements = numElements;
}
@@ -169,20 +159,19 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
public void open(Configuration parameters) throws IOException {
rnd = new Random();
stringBuilder = new StringBuilder();
+
step = getRuntimeContext().getNumberOfParallelSubtasks();
-
-
- index = getRuntimeContext().getOperatorState("index", getRuntimeContext().getIndexOfThisSubtask(), false);
-
- isRunning = true;
+ if (index == 0) {
+ index = getRuntimeContext().getIndexOfThisSubtask();
+ }
}
@Override
public void run(SourceContext<String> ctx) throws Exception {
final Object lockingObject = ctx.getCheckpointLock();
- while (isRunning && index.value() < numElements) {
- char first = (char) ((index.value() % 40) + 40);
+ while (isRunning && index < numElements) {
+ char first = (char) ((index % 40) + 40);
stringBuilder.setLength(0);
stringBuilder.append(first);
@@ -190,13 +179,18 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
String result = randomString(stringBuilder, rnd);
synchronized (lockingObject) {
- index.update(index.value() + step);
+ index += step;
ctx.collect(result);
}
}
}
@Override
+ public void close() throws IOException {
+ counts[getRuntimeContext().getIndexOfThisSubtask()] = index;
+ }
+
+ @Override
public void cancel() {
isRunning = false;
}
@@ -211,29 +205,46 @@ public class CoStreamCheckpointingITCase extends StreamFaultToleranceTestBase {
return bld.toString();
}
+
+ @Override
+ public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+ return index;
+ }
+
+ @Override
+ public void restoreState(Integer state) {
+ index = state;
+ }
}
- private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> {
+ private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount>
+ implements Checkpointed<Long> {
- private OperatorState<Long> count;
static final long[] counts = new long[PARALLELISM];
+
+ private long count;
+
@Override
public PrefixCount map(PrefixCount value) throws Exception {
- count.update(count.value() + 1);
+ count++;
return value;
}
@Override
- public void open(Configuration conf) throws IOException {
- count = getRuntimeContext().getOperatorState("count", 0L, false);
+ public void close() throws IOException {
+ counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
}
@Override
- public void close() throws IOException {
- counts[getRuntimeContext().getIndexOfThisSubtask()] = count.value();
+ public Long snapshotState(long checkpointId, long checkpointTimestamp) {
+ return count;
}
+ @Override
+ public void restoreState(Long state) {
+ count = state;
+ }
}
private static class OnceFailingReducer extends RichReduceFunction<PrefixCount> {
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
index 108e1e6..0fcedda 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
@@ -29,10 +29,10 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.common.state.StateCheckpointer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -83,20 +83,21 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
// Custom Functions
// --------------------------------------------------------------------------------------------
- private static class IntGeneratingSourceFunction extends RichParallelSourceFunction<Integer> {
+ private static class IntGeneratingSourceFunction extends RichParallelSourceFunction<Integer>
+ implements Checkpointed<Integer> {
private final long numElements;
- private OperatorState<Integer> index;
+ private int index;
private int step;
- private volatile boolean isRunning;
+ private volatile boolean isRunning = true;
static final long[] counts = new long[PARALLELISM];
@Override
public void close() throws IOException {
- counts[getRuntimeContext().getIndexOfThisSubtask()] = index.value();
+ counts[getRuntimeContext().getIndexOfThisSubtask()] = index;
}
IntGeneratingSourceFunction(long numElements) {
@@ -106,22 +107,18 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
@Override
public void open(Configuration parameters) throws IOException {
step = getRuntimeContext().getNumberOfParallelSubtasks();
-
- index = getRuntimeContext().getOperatorState("index",
- getRuntimeContext().getIndexOfThisSubtask(), false);
-
- isRunning = true;
+ index = getRuntimeContext().getIndexOfThisSubtask();
}
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
final Object lockingObject = ctx.getCheckpointLock();
- while (isRunning && index.value() < numElements) {
+ while (isRunning && index < numElements) {
synchronized (lockingObject) {
- index.update(index.value() + step);
- ctx.collect(index.value() % 40);
+ index += step;
+ ctx.collect(index % 40);
}
}
}
@@ -130,11 +127,22 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
public void cancel() {
isRunning = false;
}
+
+ @Override
+ public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+ return index;
+ }
+
+ @Override
+ public void restoreState(Integer state) {
+ index = state;
+ }
}
private static class OnceFailingPartitionedSum extends RichMapFunction<Integer, Tuple2<Integer, Long>> {
private static Map<Integer, Long> allSums = new ConcurrentHashMap<Integer, Long>();
+
private static volatile boolean hasFailed = false;
private final long numElements;
@@ -157,7 +165,7 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
count = 0;
- sum = getRuntimeContext().getOperatorState("sum", 0L, true);
+ sum = getRuntimeContext().getKeyValueState(Long.class, 0L);
}
@Override
@@ -183,20 +191,7 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
@Override
public void open(Configuration parameters) throws IOException {
- counts = getRuntimeContext().getOperatorState("count", NonSerializableLong.of(0L), true,
- new StateCheckpointer<NonSerializableLong, String>() {
-
- @Override
- public String snapshotState(NonSerializableLong state, long id, long ts) {
- return state.value.toString();
- }
-
- @Override
- public NonSerializableLong restoreState(String stateSnapshot) {
- return NonSerializableLong.of(Long.parseLong(stateSnapshot));
- }
-
- });
+ counts = getRuntimeContext().getKeyValueState(NonSerializableLong.class, NonSerializableLong.of(0L));
}
@Override
@@ -204,7 +199,6 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
long currentCount = counts.value().value + 1;
counts.update(NonSerializableLong.of(currentCount));
allCounts.put(value.f0, currentCount);
-
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
index 8a75de5..f517f83 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
@@ -265,28 +265,33 @@ public class StateCheckpoinedITCase extends StreamFaultToleranceTestBase {
}
}
- private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> {
+ private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount>
+ implements Checkpointed<Long> {
static final long[] counts = new long[PARALLELISM];
- private OperatorState<Long> count;
+ private long count;
@Override
public PrefixCount map(PrefixCount value) throws Exception {
- count.update(count.value() + 1);
+ count++;
return value;
}
@Override
- public void open(Configuration conf) throws IOException {
- count = getRuntimeContext().getOperatorState("count", 0L, false);
+ public void close() throws IOException {
+ counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
}
@Override
- public void close() throws IOException {
- counts[getRuntimeContext().getIndexOfThisSubtask()] = count.value();
+ public Long snapshotState(long checkpointId, long checkpointTimestamp) {
+ return count;
+ }
+
+ @Override
+ public void restoreState(Long state) {
+ count = state;
}
-
}
private static class OnceFailingAggregator extends RichFlatMapFunction<PrefixCount, PrefixCount>
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
index 270cfaa..08af93a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
@@ -21,7 +21,6 @@ package org.apache.flink.test.checkpointing;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
-import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
@@ -101,14 +100,16 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
@Override
public void postSubmit() {
- List[][] checkList = new List[][]{ GeneratingSourceFunction.completedCheckpoints,
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ List<Long>[][] checkList = new List[][] {
+ GeneratingSourceFunction.completedCheckpoints,
IdentityMapFunction.completedCheckpoints,
LongRichFilterFunction.completedCheckpoints,
LeftIdentityCoRichFlatMapFunction.completedCheckpoints};
long failureCheckpointID = OnceFailingReducer.failureCheckpointID;
- for(List[] parallelNotifications : checkList) {
+ for(List<Long>[] parallelNotifications : checkList) {
for (int i = 0; i < PARALLELISM; i++){
List<Long> notifications = parallelNotifications[i];
assertTrue("No checkpoint notification was received.",
@@ -134,21 +135,23 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
* interface it stores all the checkpoint ids it has seen in a static list.
*/
private static class GeneratingSourceFunction extends RichSourceFunction<Long>
- implements ParallelSourceFunction<Long>, CheckpointNotifier {
+ implements ParallelSourceFunction<Long>, CheckpointNotifier, Checkpointed<Integer> {
- // operator life cycle
- private volatile boolean isRunning;
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ static List<Long>[] completedCheckpoints = new List[PARALLELISM];
+
// operator behaviour
private final long numElements;
private long result;
- private OperatorState<Integer> index;
+ private int index;
private int step;
// test behaviour
private int subtaskId;
- public static List[] completedCheckpoints = new List[PARALLELISM];
+
+ private volatile boolean isRunning = true;
GeneratingSourceFunction(long numElements) {
this.numElements = numElements;
@@ -158,26 +161,27 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
public void open(Configuration parameters) throws IOException {
step = getRuntimeContext().getNumberOfParallelSubtasks();
subtaskId = getRuntimeContext().getIndexOfThisSubtask();
- index = getRuntimeContext().getOperatorState("index", subtaskId, false);
+
+ if (index == 0) {
+ index = subtaskId;
+ }
// Create a collection on the first open
if (completedCheckpoints[subtaskId] == null) {
- completedCheckpoints[subtaskId] = new ArrayList();
+ completedCheckpoints[subtaskId] = new ArrayList<>();
}
-
- isRunning = true;
}
@Override
public void run(SourceContext<Long> ctx) throws Exception {
final Object lockingObject = ctx.getCheckpointLock();
- while (isRunning && index.value() < numElements) {
+ while (isRunning && index < numElements) {
- result = index.value() % 10;
+ result = index % 10;
synchronized (lockingObject) {
- index.update(index.value() + step);
+ index += step;
ctx.collect(result);
}
}
@@ -192,6 +196,16 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
public void notifyCheckpointComplete(long checkpointId) throws Exception {
completedCheckpoints[subtaskId].add(checkpointId);
}
+
+ @Override
+ public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+ return index;
+ }
+
+ @Override
+ public void restoreState(Integer state) {
+ index = state;
+ }
}
/**
@@ -201,7 +215,9 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
private static class IdentityMapFunction extends RichMapFunction<Long, Tuple1<Long>>
implements CheckpointNotifier {
- public static List[] completedCheckpoints = new List[PARALLELISM];
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public static List<Long>[] completedCheckpoints = new List[PARALLELISM];
+
private int subtaskId;
@Override
@@ -215,7 +231,7 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
// Create a collection on the first open
if (completedCheckpoints[subtaskId] == null) {
- completedCheckpoints[subtaskId] = new ArrayList();
+ completedCheckpoints[subtaskId] = new ArrayList<>();
}
}
@@ -283,7 +299,9 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
private static class LongRichFilterFunction extends RichFilterFunction<Long>
implements CheckpointNotifier {
- public static List[] completedCheckpoints = new List[PARALLELISM];
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ static List<Long>[] completedCheckpoints = new List[PARALLELISM];
+
private int subtaskId;
@Override
@@ -297,7 +315,7 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
// Create a collection on the first open
if (completedCheckpoints[subtaskId] == null) {
- completedCheckpoints[subtaskId] = new ArrayList();
+ completedCheckpoints[subtaskId] = new ArrayList<>();
}
}
@@ -315,7 +333,8 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
private static class LeftIdentityCoRichFlatMapFunction extends RichCoFlatMapFunction<Long, Long, Long>
implements CheckpointNotifier {
- public static List[] completedCheckpoints = new List[PARALLELISM];
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public static List<Long>[] completedCheckpoints = new List[PARALLELISM];
private int subtaskId;
@Override
@@ -324,7 +343,7 @@ public class StreamCheckpointNotifierITCase extends StreamFaultToleranceTestBase
// Create a collection on the first open
if (completedCheckpoints[subtaskId] == null) {
- completedCheckpoints[subtaskId] = new ArrayList();
+ completedCheckpoints[subtaskId] = new ArrayList<>();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index 0804d53..992a679 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -21,7 +21,6 @@ package org.apache.flink.test.checkpointing;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.common.state.StateCheckpointer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -36,7 +35,6 @@ import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
/**
* A simple test that runs a streaming topology with checkpointing enabled.
@@ -62,14 +60,15 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
stream
// -------------- first vertex, chained to the source ----------------
- .filter(new StringRichFilterFunction()).shuffle()
+ .filter(new StringRichFilterFunction())
+ .shuffle()
// -------------- seconds vertex - the stateful one that also fails ----------------
.map(new StringPrefixCountRichMapFunction())
.startNewChain()
.map(new StatefulCounterFunction())
- // -------------- third vertex - counter and the sink ----------------
+ // -------------- third vertex - counter and the sink ----------------
.keyBy("prefix")
.map(new OnceFailingPrefixCounter(NUM_STRINGS))
.addSink(new SinkFunction<PrefixCount>() {
@@ -118,22 +117,23 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
// --------------------------------------------------------------------------------------------
private static class StringGeneratingSourceFunction extends RichSourceFunction<String>
- implements ParallelSourceFunction<String> {
+ implements ParallelSourceFunction<String>, Checkpointed<Integer> {
private final long numElements;
- private Random rnd;
- private StringBuilder stringBuilder;
+ private final Random rnd = new Random();
+ private final StringBuilder stringBuilder = new StringBuilder();
- private OperatorState<Integer> index;
+ private int index;
private int step;
- private volatile boolean isRunning;
+ private volatile boolean isRunning = true;
static final long[] counts = new long[PARALLELISM];
+
@Override
public void close() throws IOException {
- counts[getRuntimeContext().getIndexOfThisSubtask()] = index.value();
+ counts[getRuntimeContext().getIndexOfThisSubtask()] = index;
}
@@ -143,22 +143,18 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
@Override
public void open(Configuration parameters) throws IOException {
- rnd = new Random();
- stringBuilder = new StringBuilder();
step = getRuntimeContext().getNumberOfParallelSubtasks();
-
-
- index = getRuntimeContext().getOperatorState("index", getRuntimeContext().getIndexOfThisSubtask(), false);
-
- isRunning = true;
+ if (index == 0) {
+ index = getRuntimeContext().getIndexOfThisSubtask();
+ }
}
@Override
public void run(SourceContext<String> ctx) throws Exception {
final Object lockingObject = ctx.getCheckpointLock();
- while (isRunning && index.value() < numElements) {
- char first = (char) ((index.value() % 40) + 40);
+ while (isRunning && index < numElements) {
+ char first = (char) ((index % 40) + 40);
stringBuilder.setLength(0);
stringBuilder.append(first);
@@ -166,7 +162,7 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
String result = randomString(stringBuilder, rnd);
synchronized (lockingObject) {
- index.update(index.value() + step);
+ index += step;
ctx.collect(result);
}
}
@@ -187,6 +183,16 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
return bld.toString();
}
+
+ @Override
+ public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+ return index;
+ }
+
+ @Override
+ public void restoreState(Integer state) {
+ index = state;
+ }
}
private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> implements Checkpointed<Long> {
@@ -215,8 +221,12 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
count = state;
}
}
-
- private static class OnceFailingPrefixCounter extends RichMapFunction<PrefixCount, PrefixCount> {
+
+ /**
+ * This function uses simultaneously the key/value state and is checkpointed.
+ */
+ private static class OnceFailingPrefixCounter extends RichMapFunction<PrefixCount, PrefixCount>
+ implements Checkpointed<Long> {
private static Map<String, Long> prefixCounts = new ConcurrentHashMap<String, Long>();
static final long[] counts = new long[PARALLELISM];
@@ -229,7 +239,7 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
private long count;
private OperatorState<Long> pCount;
- private OperatorState<Long> inputCount;
+ private long inputCount;
OnceFailingPrefixCounter(long numElements) {
this.numElements = numElements;
@@ -242,13 +252,12 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
count = 0;
- pCount = getRuntimeContext().getOperatorState("prefix-count", 0L, true);
- inputCount = getRuntimeContext().getOperatorState("input-count", 0L, false);
+ pCount = getRuntimeContext().getKeyValueState(Long.class, 0L);
}
@Override
public void close() throws IOException {
- counts[getRuntimeContext().getIndexOfThisSubtask()] = inputCount.value();
+ counts[getRuntimeContext().getIndexOfThisSubtask()] = inputCount;
}
@Override
@@ -256,9 +265,9 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
count++;
if (!hasFailed && count >= failurePos) {
hasFailed = true;
- throw new Exception("Test Failure");
+// throw new Exception("Test Failure");
}
- inputCount.update(inputCount.value() + 1);
+ inputCount++;
long currentPrefixCount = pCount.value() + value.count;
pCount.update(currentPrefixCount);
@@ -266,12 +275,23 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
value.count = currentPrefixCount;
return value;
}
+
+ @Override
+ public Long snapshotState(long checkpointId, long checkpointTimestamp) {
+ return inputCount;
+ }
+
+ @Override
+ public void restoreState(Long state) {
+ inputCount = state;
+ }
}
private static class StringRichFilterFunction extends RichFilterFunction<String> implements Checkpointed<Long> {
-
- Long count = 0L;
+
static final long[] counts = new long[PARALLELISM];
+
+ private long count;
@Override
public boolean filter(String value) {
@@ -285,7 +305,7 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
}
@Override
- public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+ public Long snapshotState(long checkpointId, long checkpointTimestamp) {
return count;
}
@@ -296,49 +316,31 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
}
private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount>
- implements Checkpointed<Integer> {
-
- OperatorState<Long> count;
+ implements Checkpointed<Long> {
+
static final long[] counts = new long[PARALLELISM];
+
+ private long count;
@Override
public PrefixCount map(String value) throws IOException {
- count.update(count.value() + 1);
+ count++;
return new PrefixCount(value.substring(0, 1), value, 1L);
}
-
- @Override
- public void open(Configuration conf) throws IOException {
- this.count = getRuntimeContext().getOperatorState("count", 0L, false,
- new StateCheckpointer<Long, String>() {
-
- @Override
- public String snapshotState(Long state, long id, long ts) {
- return state.toString();
- }
-
- @Override
- public Long restoreState(String stateSnapshot) {
- return Long.parseLong(stateSnapshot);
- }
-
- });
- }
@Override
public void close() throws IOException {
- counts[getRuntimeContext().getIndexOfThisSubtask()] = count.value();
+ counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
}
@Override
- public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
- return null;
+ public Long snapshotState(long checkpointId, long checkpointTimestamp) {
+ return count;
}
@Override
- public void restoreState(Integer state) {
- // verify that we never store/restore null state
- fail();
+ public void restoreState(Long state) {
+ count = state;
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
index cb02d2f..c12bcb9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
@@ -22,10 +22,10 @@ import com.google.common.collect.EvictingQueue;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -142,32 +142,36 @@ public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTe
* Produces a sequence multiple times for each parallelism instance of downstream operators,
* augmented by the designated parallel subtaskId. The source is not parallel to ensure order.
*/
- private static class StatefulMultipleSequence extends RichSourceFunction<Tuple2<Integer, Long>>{
+ private static class StatefulMultipleSequence extends RichSourceFunction<Tuple2<Integer, Long>>
+ implements Checkpointed<Long> {
- private transient OperatorState<Long> count;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- count = getRuntimeContext().getOperatorState("count", 0L, false);
- }
+ private long count;
@Override
public void run(SourceContext<Tuple2<Integer, Long>> ctx) throws Exception {
Object lock = ctx.getCheckpointLock();
- while (count.value() < NUM_INPUT){
+ while (count < NUM_INPUT){
synchronized (lock){
for (int i = 0; i < PARALLELISM; i++) {
- ctx.collect(Tuple2.of(i, count.value() + 1));
+ ctx.collect(Tuple2.of(i, count + 1));
}
- count.update(count.value() + 1);
+ count++;
}
}
}
@Override
- public void cancel() {
+ public void cancel() {}
+
+ @Override
+ public Long snapshotState(long checkpointId, long checkpointTimestamp) {
+ return count;
+ }
+
+ @Override
+ public void restoreState(Long state) {
+ count = state;
}
}
@@ -175,14 +179,15 @@ public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTe
* Mapper that causes one failure between seeing 40% to 70% of the records.
*/
private static class OnceFailingIdentityMapFunction
- extends RichMapFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>> {
+ extends RichMapFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>>
+ implements Checkpointed<Long> {
private static volatile boolean hasFailed = false;
private final long numElements;
private long failurePos;
- private OperatorState<Long> count;
+ private long count;
public OnceFailingIdentityMapFunction(long numElements) {
this.numElements = numElements;
@@ -194,19 +199,28 @@ public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTe
long failurePosMax = (long) (0.7 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
- count = getRuntimeContext().getOperatorState("count", 0L, false);
}
@Override
public Tuple2<Integer, Long> map(Tuple2<Integer, Long> value) throws Exception {
- if (!hasFailed && count.value() >= failurePos) {
+ if (!hasFailed && count >= failurePos) {
hasFailed = true;
throw new Exception("Test Failure");
}
- count.update(count.value() + 1);
+ count++;
return value;
}
+
+ @Override
+ public Long snapshotState(long checkpointId, long checkpointTimestamp) {
+ return count;
+ }
+
+ @Override
+ public void restoreState(Long state) {
+ count = state;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index dab6a6d..989db14 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -23,6 +23,7 @@ import java.io.File;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.state.filesystem.FsStateBackendFactory;
import org.apache.flink.test.testdata.KMeansData;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.junit.Rule;
@@ -55,7 +56,8 @@ public class ClassLoaderITCase {
// we need to use the "filesystem" state backend to ensure FLINK-2543 is not happening again.
config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
- config.setString(ConfigConstants.STATE_BACKEND_FS_DIR, "file://" + folder.newFolder().getAbsolutePath());
+ config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY,
+ folder.newFolder().getAbsoluteFile().toURI().toString());
ForkableFlinkMiniCluster testCluster = new ForkableFlinkMiniCluster(config, false);
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
index e7b1668..054b321 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
@@ -18,9 +18,6 @@
package org.apache.flink.test.recovery;
-
-import static org.junit.Assert.assertTrue;
-
import java.io.File;
import java.io.IOException;
import java.util.UUID;
@@ -29,17 +26,19 @@ import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.FileStateHandle;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.state.filesystem.FsStateBackend;
+
import org.junit.Assert;
+import static org.junit.Assert.assertTrue;
+
/**
* Test for streaming program behaviour in case of TaskManager failure
* based on {@link AbstractProcessFailureRecoveryTest}.
@@ -72,7 +71,8 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
env.getConfig().disableSysoutLogging();
env.setNumberOfExecutionRetries(1);
env.enableCheckpointing(200);
- env.setStateHandleProvider(FileStateHandle.createProvider(tempCheckpointDir.getAbsolutePath()));
+
+ env.setStateBackend(new FsStateBackend(tempCheckpointDir.getAbsoluteFile().toURI()));
DataStream<Long> result = env.addSource(new SleepyDurableGenerateSequence(coordinateDir, DATA_COUNT))
// add a non-chained no-op map to test the chain state restore logic
@@ -104,7 +104,8 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
}
}
- public static class SleepyDurableGenerateSequence extends RichParallelSourceFunction<Long> {
+ public static class SleepyDurableGenerateSequence extends RichParallelSourceFunction<Long>
+ implements Checkpointed<Long> {
private static final long SLEEP_TIME = 50;
@@ -113,7 +114,7 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
private volatile boolean isRunning = true;
- private OperatorState<Long> collected;
+ private long collected;
public SleepyDurableGenerateSequence(File coordinateDir, long end) {
this.coordinateDir = coordinateDir;
@@ -133,7 +134,7 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
final File proceedFile = new File(coordinateDir, PROCEED_MARKER_FILE);
boolean checkForProceedFile = true;
- while (isRunning && collected.value() < toCollect) {
+ while (isRunning && collected < toCollect) {
// check if the proceed file exists (then we go full speed)
// if not, we always recheck and sleep
if (checkForProceedFile) {
@@ -146,21 +147,26 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
}
synchronized (checkpointLock) {
- sourceCtx.collect(collected.value() * stepSize + congruence);
- collected.update(collected.value() + 1);
+ sourceCtx.collect(collected * stepSize + congruence);
+ collected++;
}
}
}
-
- @Override
- public void open(Configuration conf) throws IOException {
- collected = getRuntimeContext().getOperatorState("count", 0L, false);
- }
@Override
public void cancel() {
isRunning = false;
}
+
+ @Override
+ public Long snapshotState(long checkpointId, long checkpointTimestamp) {
+ return collected;
+ }
+
+ @Override
+ public void restoreState(Long state) {
+ collected = state;
+ }
}
public static class Mapper extends RichMapFunction<Long, Long> {