You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/01/20 05:40:09 UTC
[13/18] flink git commit: [FLINK-5113] Port functions in tests to new
CheckpointedFunction IF.
[FLINK-5113] Port functions in tests to new CheckpointedFunction IF.
This closes #2939.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/525edf1e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/525edf1e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/525edf1e
Branch: refs/heads/master
Commit: 525edf1e6925b55302d991ddf537a2f16caba21d
Parents: 570dbc8
Author: kl0u <kk...@gmail.com>
Authored: Tue Nov 22 19:23:33 2016 +0100
Committer: zentol <ch...@apache.org>
Committed: Thu Jan 19 23:57:22 2017 +0100
----------------------------------------------------------------------
.../CassandraTupleWriteAheadSinkExample.java | 22 ++-
.../fs/RollingSinkFaultToleranceITCase.java | 18 ++-
.../BucketingSinkFaultToleranceITCase.java | 18 ++-
.../connectors/kafka/KafkaConsumerTestBase.java | 22 ++-
.../kafka/testutils/FailingIdentityMapper.java | 18 ++-
.../testutils/ValidatingExactlyOnceSink.java | 26 ++--
.../org/apache/flink/util/CollectionUtil.java | 2 +-
.../streaming/api/checkpoint/Checkpointed.java | 3 +-
.../runtime/tasks/SourceStreamTaskTest.java | 12 +-
...tractEventTimeWindowCheckpointingITCase.java | 43 ++++--
.../CoStreamCheckpointingITCase.java | 74 ++++++----
.../EventTimeAllWindowCheckpointingITCase.java | 38 +++--
.../PartitionedStateCheckpointingITCase.java | 17 ++-
.../test/checkpointing/RescalingITCase.java | 27 ++--
.../test/checkpointing/SavepointITCase.java | 14 +-
.../checkpointing/StateCheckpointedITCase.java | 94 +++++++-----
.../StreamCheckpointNotifierITCase.java | 31 ++--
.../StreamCheckpointingITCase.java | 79 ++++++----
.../UdfStreamOperatorCheckpointingITCase.java | 14 +-
.../WindowCheckpointingITCase.java | 33 +++--
.../jar/CheckpointedStreamingProgram.java | 28 ++--
.../jar/CheckpointingCustomKvStateProgram.java | 21 +--
.../jar/LegacyCheckpointedStreamingProgram.java | 148 +++++++++++++++++++
.../flink/test/recovery/ChaosMonkeyITCase.java | 38 +++--
.../JobManagerHACheckpointRecoveryITCase.java | 41 +++--
...erProcessFailureStreamingRecoveryITCase.java | 30 ++--
26 files changed, 608 insertions(+), 303 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java
index 811c410..23de949 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java
@@ -21,12 +21,14 @@ import com.datastax.driver.core.Cluster;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import java.util.Collections;
+import java.util.List;
import java.util.UUID;
/**
@@ -50,6 +52,9 @@ public class CassandraTupleWriteAheadSinkExample {
.setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
.enableWriteAheadLog()
.setClusterBuilder(new ClusterBuilder() {
+
+ private static final long serialVersionUID = 2793938419775311824L;
+
@Override
public Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPoint("127.0.0.1").build();
@@ -62,7 +67,9 @@ public class CassandraTupleWriteAheadSinkExample {
env.execute();
}
- public static class MySource implements SourceFunction<Tuple2<String, Integer>>, Checkpointed<Integer> {
+ public static class MySource implements SourceFunction<Tuple2<String, Integer>>, ListCheckpointed<Integer> {
+ private static final long serialVersionUID = 4022367939215095610L;
+
private int counter = 0;
private boolean stop = false;
@@ -84,13 +91,16 @@ public class CassandraTupleWriteAheadSinkExample {
}
@Override
- public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
- return counter;
+ public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this.counter);
}
@Override
- public void restoreState(Integer state) throws Exception {
- this.counter = state;
+ public void restoreState(List<Integer> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ this.counter = state.get(0);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
index 36c0d03..2d8492f 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.connectors.fs;
import com.google.common.collect.Sets;
import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
@@ -42,7 +42,9 @@ import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.util.Collections;
import java.util.HashSet;
+import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.regex.Matcher;
@@ -236,7 +238,7 @@ public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBas
}
private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
- implements CheckpointedAsynchronously<Integer> {
+ implements ListCheckpointed<Integer> {
private static final long serialVersionUID = 1L;
@@ -246,7 +248,6 @@ public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBas
private volatile boolean isRunning = true;
-
StringGeneratingSourceFunction(long numElements) {
this.numElements = numElements;
}
@@ -288,13 +289,16 @@ public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBas
}
@Override
- public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
- return index;
+ public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this.index);
}
@Override
- public void restoreState(Integer state) {
- index = state;
+ public void restoreState(List<Integer> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ this.index = state.get(0);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
index 54703a3..85f23b6 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.connectors.fs.bucketing;
import com.google.common.collect.Sets;
import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
@@ -42,7 +42,9 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.BufferedReader;
+import java.util.Collections;
import java.util.HashSet;
+import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.regex.Matcher;
@@ -233,7 +235,7 @@ public class BucketingSinkFaultToleranceITCase extends StreamFaultToleranceTestB
}
private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
- implements CheckpointedAsynchronously<Integer> {
+ implements ListCheckpointed<Integer> {
private static final long serialVersionUID = 1L;
@@ -243,7 +245,6 @@ public class BucketingSinkFaultToleranceITCase extends StreamFaultToleranceTestB
private volatile boolean isRunning = true;
-
StringGeneratingSourceFunction(long numElements) {
this.numElements = numElements;
}
@@ -285,13 +286,16 @@ public class BucketingSinkFaultToleranceITCase extends StreamFaultToleranceTestB
}
@Override
- public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
- return index;
+ public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(index);
}
@Override
- public void restoreState(Integer state) {
- index = state;
+ public void restoreState(List<Integer> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ this.index = state.get(0);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index aa7ea49..d7fab88 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -51,21 +51,16 @@ import org.apache.flink.runtime.client.JobCancellationException;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
@@ -73,7 +68,6 @@ import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidating
import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper;
import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2Partitioner;
import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
@@ -1925,7 +1919,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
public static class BrokerKillingMapper<T> extends RichMapFunction<T,T>
- implements Checkpointed<Integer>, CheckpointListener {
+ implements ListCheckpointed<Integer>, CheckpointListener {
private static final long serialVersionUID = 6334389850158707313L;
@@ -1939,7 +1933,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
private boolean failer;
private boolean hasBeenCheckpointed;
-
public BrokerKillingMapper(int shutdownBrokerId, int failCount) {
this.shutdownBrokerId = shutdownBrokerId;
this.failCount = failCount;
@@ -1994,13 +1987,16 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
}
@Override
- public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
- return numElementsTotal;
+ public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this.numElementsTotal);
}
@Override
- public void restoreState(Integer state) {
- this.numElementsTotal = state;
+ public void restoreState(List<Integer> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ this.numElementsTotal = state.get(0);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
index 2bd400c..ec64b00 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
@@ -21,13 +21,16 @@ package org.apache.flink.streaming.connectors.kafka.testutils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.List;
+
public class FailingIdentityMapper<T> extends RichMapFunction<T,T> implements
- Checkpointed<Integer>, CheckpointListener, Runnable {
+ ListCheckpointed<Integer>, CheckpointListener, Runnable {
private static final Logger LOG = LoggerFactory.getLogger(FailingIdentityMapper.class);
@@ -89,13 +92,16 @@ public class FailingIdentityMapper<T> extends RichMapFunction<T,T> implements
}
@Override
- public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
- return numElementsTotal;
+ public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(numElementsTotal);
}
@Override
- public void restoreState(Integer state) {
- numElementsTotal = state;
+ public void restoreState(List<Integer> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ this.numElementsTotal = state.get(0);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
index 7813561..46e70fd 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ValidatingExactlyOnceSink.java
@@ -19,15 +19,17 @@
package org.apache.flink.streaming.connectors.kafka.testutils;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.test.util.SuccessException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.BitSet;
+import java.util.Collections;
+import java.util.List;
-public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer> implements Checkpointed<Tuple2<Integer, BitSet>> {
+public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer> implements ListCheckpointed<Tuple2<Integer, BitSet>> {
private static final Logger LOG = LoggerFactory.getLogger(ValidatingExactlyOnceSink.class);
@@ -39,7 +41,6 @@ public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer> impleme
private int numElements; // this is checkpointed
-
public ValidatingExactlyOnceSink(int numElementsTotal) {
this.numElementsTotal = numElementsTotal;
}
@@ -68,15 +69,20 @@ public class ValidatingExactlyOnceSink extends RichSinkFunction<Integer> impleme
}
@Override
- public Tuple2<Integer, BitSet> snapshotState(long checkpointId, long checkpointTimestamp) {
- LOG.info("Snapshot of counter "+numElements+" at checkpoint "+checkpointId);
- return new Tuple2<>(numElements, duplicateChecker);
+ public List<Tuple2<Integer, BitSet>> snapshotState(long checkpointId, long timestamp) throws Exception {
+ LOG.info("Snapshot of counter " + numElements + " at checkpoint " + checkpointId);
+ return Collections.singletonList(new Tuple2<>(numElements, duplicateChecker));
}
@Override
- public void restoreState(Tuple2<Integer, BitSet> state) {
- LOG.info("restoring num elements to {}", state.f0);
- this.numElements = state.f0;
- this.duplicateChecker = state.f1;
+ public void restoreState(List<Tuple2<Integer, BitSet>> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+
+ Tuple2<Integer, BitSet> s = state.get(0);
+ LOG.info("restoring num elements to {}", s.f0);
+ this.numElements = s.f0;
+ this.duplicateChecker = s.f1;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java b/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java
index 15d00ae..cd5c2e5 100644
--- a/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/CollectionUtil.java
@@ -34,4 +34,4 @@ public final class CollectionUtil {
public static boolean isNullOrEmpty(Map<?, ?> map) {
return map == null || map.isEmpty();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
index 7af5cea..fb67ea7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
@@ -31,8 +31,7 @@ import java.io.Serializable;
* state is written, the function is not called, so the function needs not return a
* copy of its state, but may return a reference to its state. Functions that can
* continue to work and mutate the state, even while the state snapshot is being accessed,
- * can implement the {@link org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously}
- * interface.</p>
+ * can implement the {@link CheckpointedAsynchronously} interface.</p>
*
* @param <T> The type of the operator state.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index b592fe8..dd1fe58 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -34,6 +34,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.io.Serializable;
+import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
@@ -135,7 +136,7 @@ public class SourceStreamTaskTest {
}
}
- private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, Checkpointed<Serializable> {
+ private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, ListCheckpointed<Serializable> {
private static final long serialVersionUID = 1;
private int maxElements;
@@ -181,7 +182,7 @@ public class SourceStreamTaskTest {
}
@Override
- public Serializable snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+ public List<Serializable> snapshotState(long checkpointId, long timestamp) throws Exception {
if (!semaphore.tryAcquire()) {
Assert.fail("Concurrent invocation of snapshotState.");
}
@@ -199,11 +200,12 @@ public class SourceStreamTaskTest {
Assert.fail("Count is different at start end end of snapshot.");
}
semaphore.release();
- return sum;
+ return Collections.<Serializable>singletonList(sum);
}
@Override
- public void restoreState(Serializable state) {}
+ public void restoreState(List<Serializable> state) throws Exception {
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index 583e42f..1911f44 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -34,7 +34,7 @@ import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
@@ -53,7 +53,9 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.IOException;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.flink.test.util.TestUtils.tryExecute;
@@ -495,7 +497,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
// ------------------------------------------------------------------------
private static class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>>
- implements Checkpointed<Integer>, CheckpointListener
+ implements ListCheckpointed<Integer>, CheckpointListener
{
private static volatile boolean failedBefore = false;
@@ -567,13 +569,16 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
}
@Override
- public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
- return numElementsEmitted;
+ public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this.numElementsEmitted);
}
@Override
- public void restoreState(Integer state) {
- numElementsEmitted = state;
+ public void restoreState(List<Integer> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ this.numElementsEmitted = state.get(0);
}
public static void reset() {
@@ -582,7 +587,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
}
private static class ValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
- implements Checkpointed<HashMap<Long, Integer>> {
+ implements ListCheckpointed<HashMap<Long, Integer>> {
private final HashMap<Long, Integer> windowCounts = new HashMap<>();
@@ -676,19 +681,22 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
}
@Override
- public HashMap<Long, Integer> snapshotState(long checkpointId, long checkpointTimestamp) {
- return this.windowCounts;
+ public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this.windowCounts);
}
@Override
- public void restoreState(HashMap<Long, Integer> state) {
- this.windowCounts.putAll(state);
+ public void restoreState(List<HashMap<Long, Integer>> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ windowCounts.putAll(state.get(0));
}
}
// Sink for validating the stateful window counts
private static class CountValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
- implements Checkpointed<HashMap<Long, Integer>> {
+ implements ListCheckpointed<HashMap<Long, Integer>> {
private final HashMap<Long, Integer> windowCounts = new HashMap<>();
@@ -757,13 +765,16 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
}
@Override
- public HashMap<Long, Integer> snapshotState(long checkpointId, long checkpointTimestamp) {
- return this.windowCounts;
+ public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this.windowCounts);
}
@Override
- public void restoreState(HashMap<Long, Integer> state) {
- this.windowCounts.putAll(state);
+ public void restoreState(List<HashMap<Long, Integer>> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ this.windowCounts.putAll(state.get(0));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
index 51a00b9..06d3ab0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
@@ -38,6 +38,8 @@ import org.junit.Test;
import java.io.IOException;
import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
import java.util.Random;
import static org.junit.Assert.assertEquals;
@@ -142,11 +144,11 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
/**
* A generating source that is slow before the first two checkpoints went through
* and will indefinitely stall at a certain point to allow the checkpoint to complete.
- *
+ *
* After the checkpoints are through, it continues with full speed.
*/
private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
- implements Checkpointed<Integer>, CheckpointListener {
+ implements ListCheckpointed<Integer>, CheckpointListener {
private static volatile int numCompletedCheckpoints = 0;
@@ -210,13 +212,16 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
}
@Override
- public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
- return index;
+ public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this.index);
}
@Override
- public void restoreState(Integer state) {
- index = state;
+ public void restoreState(List<Integer> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ this.index = state.get(0);
}
@Override
@@ -239,12 +244,11 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
}
private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount>
- implements Checkpointed<Long> {
+ implements ListCheckpointed<Long> {
static final long[] counts = new long[PARALLELISM];
private long count;
-
@Override
public PrefixCount map(PrefixCount value) throws Exception {
@@ -258,13 +262,16 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
}
@Override
- public Long snapshotState(long checkpointId, long checkpointTimestamp) {
- return count;
+ public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this.count);
}
@Override
- public void restoreState(Long state) {
- count = state;
+ public void restoreState(List<Long> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ this.count = state.get(0);
}
}
@@ -303,7 +310,7 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
}
}
- private static class StringRichFilterFunction extends RichFilterFunction<String> implements Checkpointed<Long> {
+ private static class StringRichFilterFunction extends RichFilterFunction<String> implements ListCheckpointed<Long> {
static final long[] counts = new long[PARALLELISM];
@@ -321,18 +328,21 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
}
@Override
- public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
- return count;
+ public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this.count);
}
@Override
- public void restoreState(Long state) {
- count = state;
+ public void restoreState(List<Long> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ this.count = state.get(0);
}
}
- private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount> implements Checkpointed<Long> {
-
+ private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount> implements ListCheckpointed<Long> {
+
static final long[] counts = new long[PARALLELISM];
private long count;
@@ -344,13 +354,16 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
}
@Override
- public Long snapshotState(long checkpointId, long checkpointTimestamp) {
- return count;
+ public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this.count);
}
@Override
- public void restoreState(Long state) {
- count = state;
+ public void restoreState(List<Long> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ this.count = state.get(0);
}
@Override
@@ -359,7 +372,7 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
}
}
- private static class LeftIdentityCoRichFlatMapFunction extends RichCoFlatMapFunction<String, String, String> implements Checkpointed<Long> {
+ private static class LeftIdentityCoRichFlatMapFunction extends RichCoFlatMapFunction<String, String, String> implements ListCheckpointed<Long> {
static final long[] counts = new long[PARALLELISM];
@@ -378,13 +391,16 @@ public class CoStreamCheckpointingITCase extends StreamingMultipleProgramsTestBa
}
@Override
- public Long snapshotState(long checkpointId, long checkpointTimestamp) {
- return count;
+ public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this.count);
}
@Override
- public void restoreState(Long state) {
- count = state;
+ public void restoreState(List<Long> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ this.count = state.get(0);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index b493e42..09c1437 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -28,7 +28,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
@@ -43,14 +43,16 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.flink.test.util.TestUtils.tryExecute;
import static org.junit.Assert.*;
/**
- * This verfies that checkpointing works correctly with event time windows.
+ * This verifies that checkpointing works correctly with event time windows.
*
* <p>
* This is a version of {@link AbstractEventTimeWindowCheckpointingITCase} for All-Windows.
@@ -432,7 +434,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
// ------------------------------------------------------------------------
private static class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>>
- implements Checkpointed<Integer>, CheckpointListener
+ implements ListCheckpointed<Integer>, CheckpointListener
{
private static volatile boolean failedBefore = false;
@@ -502,23 +504,26 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
numSuccessfulCheckpoints++;
}
- @Override
- public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
- return numElementsEmitted;
+ public static void reset() {
+ failedBefore = false;
}
@Override
- public void restoreState(Integer state) {
- numElementsEmitted = state;
+ public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this.numElementsEmitted);
}
- public static void reset() {
- failedBefore = false;
+ @Override
+ public void restoreState(List<Integer> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ this.numElementsEmitted = state.get(0);
}
}
private static class ValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
- implements Checkpointed<HashMap<Long, Integer>> {
+ implements ListCheckpointed<HashMap<Long, Integer>> {
private final HashMap<Long, Integer> windowCounts = new HashMap<>();
@@ -612,13 +617,16 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
}
@Override
- public HashMap<Long, Integer> snapshotState(long checkpointId, long checkpointTimestamp) {
- return this.windowCounts;
+ public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this.windowCounts);
}
@Override
- public void restoreState(HashMap<Long, Integer> state) {
- this.windowCounts.putAll(state);
+ public void restoreState(List<HashMap<Long, Integer>> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ this.windowCounts.putAll(state.get(0));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
index 0728b41..4761d70 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
@@ -33,7 +35,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -86,7 +88,7 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
// --------------------------------------------------------------------------------------------
private static class IntGeneratingSourceFunction extends RichParallelSourceFunction<Integer>
- implements Checkpointed<Integer> {
+ implements ListCheckpointed<Integer> {
private final long numElements;
@@ -133,13 +135,16 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes
}
@Override
- public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
- return index;
+ public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this.index);
}
@Override
- public void restoreState(Integer state) {
- index = state;
+ public void restoreState(List<Integer> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ this.index = state.get(0);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index bd1678e..8045d82 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -41,7 +41,6 @@ import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
@@ -66,6 +65,7 @@ import scala.concurrent.duration.FiniteDuration;
import java.io.File;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -763,7 +763,7 @@ public class RescalingITCase extends TestLogger {
}
}
- private static class SubtaskIndexNonPartitionedStateSource extends SubtaskIndexSource implements Checkpointed<Integer> {
+ private static class SubtaskIndexNonPartitionedStateSource extends SubtaskIndexSource implements ListCheckpointed<Integer> {
private static final long serialVersionUID = 8388073059042040203L;
@@ -772,13 +772,16 @@ public class RescalingITCase extends TestLogger {
}
@Override
- public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
- return counter;
+ public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this.counter);
}
@Override
- public void restoreState(Integer state) throws Exception {
- counter = state;
+ public void restoreState(List<Integer> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ this.counter = state.get(0);
}
}
@@ -879,18 +882,20 @@ public class RescalingITCase extends TestLogger {
}
}
- private static class NonPartitionedStateSource extends StateSourceBase implements Checkpointed<Integer> {
+ private static class NonPartitionedStateSource extends StateSourceBase implements ListCheckpointed<Integer> {
private static final long serialVersionUID = -8108185918123186841L;
@Override
- public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
- return counter;
+ public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this.counter);
}
@Override
- public void restoreState(Integer state) throws Exception {
- counter = state;
+ public void restoreState(List<Integer> state) throws Exception {
+ if (!state.isEmpty()) {
+ this.counter = state.get(0);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 9f957e5..77777d1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -63,7 +63,6 @@ import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ResponseS
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.ResponseSubmitTaskListener;
import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
@@ -527,7 +526,7 @@ public class SavepointITCase extends TestLogger {
private static class StatefulCounter
extends RichMapFunction<Integer, Integer>
- implements Checkpointed<byte[]>, CheckpointListener {
+ implements ListCheckpointed<byte[]>, CheckpointListener {
private static final Object checkpointLock = new Object();
private static int numCompleteCalls;
@@ -556,13 +555,16 @@ public class SavepointITCase extends TestLogger {
}
@Override
- public byte[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
- return data;
+ public List<byte[]> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(data);
}
@Override
- public void restoreState(byte[] data) throws Exception {
- this.data = data;
+ public void restoreState(List<byte[]> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ this.data = state.get(0);
synchronized (checkpointLock) {
if (++numRestoreCalls == getRuntimeContext().getNumberOfParallelSubtasks()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
index 9d37b59..32d9e23 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
@@ -24,8 +24,7 @@ import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -34,9 +33,10 @@ import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import java.io.IOException;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -47,7 +47,7 @@ import static org.junit.Assert.assertTrue;
* A simple test that runs a streaming topology with checkpointing enabled.
*
* The test triggers a failure after a while and verifies that, after completion, the
- * state defined with either the {@link ValueState} or the {@link Checkpointed}
+ * state defined with either the {@link ValueState} or the {@link ListCheckpointed}
* interface reflects the "exactly once" semantics.
*
* The test throttles the input until at least two checkpoints are completed, to make sure that
@@ -139,15 +139,14 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
// --------------------------------------------------------------------------------------------
private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
- implements CheckpointedAsynchronously<Integer>
+ implements ListCheckpointed<Integer>
{
private final long numElements;
private int index;
private volatile boolean isRunning = true;
-
-
+
StringGeneratingSourceFunction(long numElements) {
this.numElements = numElements;
}
@@ -197,23 +196,26 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
}
@Override
- public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
- return index;
+ public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this.index);
}
@Override
- public void restoreState(Integer state) {
- index = state;
+ public void restoreState(List<Integer> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ this.index = state.get(0);
}
}
private static class StringRichFilterFunction extends RichFilterFunction<String>
- implements Checkpointed<Long> {
+ implements ListCheckpointed<Long> {
static final long[] counts = new long[PARALLELISM];
private long count;
-
+
@Override
public boolean filter(String value) throws Exception {
count++;
@@ -226,23 +228,26 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
}
@Override
- public Long snapshotState(long checkpointId, long checkpointTimestamp) {
- return count;
+ public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this.count);
}
@Override
- public void restoreState(Long state) {
- count = state;
+ public void restoreState(List<Long> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ this.count = state.get(0);
}
}
private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount>
- implements CheckpointedAsynchronously<Long> {
+ implements ListCheckpointed<Long> {
static final long[] counts = new long[PARALLELISM];
private long count;
-
+
@Override
public PrefixCount map(String value) {
count++;
@@ -255,18 +260,21 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
}
@Override
- public Long snapshotState(long checkpointId, long checkpointTimestamp) {
- return count;
+ public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this.count);
}
@Override
- public void restoreState(Long state) {
- count = state;
+ public void restoreState(List<Long> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ this.count = state.get(0);
}
}
private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount>
- implements Checkpointed<Long> {
+ implements ListCheckpointed<Long> {
static final long[] counts = new long[PARALLELISM];
@@ -284,18 +292,21 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
}
@Override
- public Long snapshotState(long checkpointId, long checkpointTimestamp) {
- return count;
+ public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this.count);
}
@Override
- public void restoreState(Long state) {
- count = state;
+ public void restoreState(List<Long> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ this.count = state.get(0);
}
}
private static class OnceFailingAggregator extends RichFlatMapFunction<PrefixCount, PrefixCount>
- implements Checkpointed<HashMap<String, PrefixCount>>, CheckpointListener {
+ implements ListCheckpointed<HashMap<String, PrefixCount>>, CheckpointListener {
static boolean wasCheckpointedBeforeFailure = false;
@@ -307,7 +318,6 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
private long count;
private boolean wasCheckpointed;
-
OnceFailingAggregator(long failurePos) {
this.failurePos = failurePos;
@@ -339,13 +349,16 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
}
@Override
- public HashMap<String, PrefixCount> snapshotState(long checkpointId, long checkpointTimestamp) {
- return aggregationMap;
+ public List<HashMap<String, PrefixCount>> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this.aggregationMap);
}
@Override
- public void restoreState(HashMap<String, PrefixCount> state) {
- aggregationMap.putAll(state);
+ public void restoreState(List<HashMap<String, PrefixCount>> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ this.aggregationMap.putAll(state.get(0));
}
@Override
@@ -355,7 +368,7 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
}
private static class ValidatingSink extends RichSinkFunction<PrefixCount>
- implements Checkpointed<HashMap<Character, Long>> {
+ implements ListCheckpointed<HashMap<Character, Long>> {
@SuppressWarnings("unchecked")
private static Map<Character, Long>[] maps = (Map<Character, Long>[]) new Map<?, ?>[PARALLELISM];
@@ -379,13 +392,16 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
}
@Override
- public HashMap<Character, Long> snapshotState(long checkpointId, long checkpointTimestamp) {
- return counts;
+ public List<HashMap<Character, Long>> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this.counts);
}
@Override
- public void restoreState(HashMap<Character, Long> state) {
- counts.putAll(state);
+ public void restoreState(List<HashMap<Character, Long>> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ this.counts.putAll(state.get(0));
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
index 6bf511f..be3fac5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
@@ -26,7 +26,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
@@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
@@ -205,7 +206,7 @@ public class StreamCheckpointNotifierITCase extends TestLogger {
* interface it stores all the checkpoint ids it has seen in a static list.
*/
private static class GeneratingSourceFunction extends RichSourceFunction<Long>
- implements ParallelSourceFunction<Long>, CheckpointListener, Checkpointed<Integer> {
+ implements ParallelSourceFunction<Long>, CheckpointListener, ListCheckpointed<Integer> {
static final List<Long>[] completedCheckpoints = createCheckpointLists(PARALLELISM);
@@ -263,13 +264,16 @@ public class StreamCheckpointNotifierITCase extends TestLogger {
}
@Override
- public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
- return index;
+ public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this.index);
}
@Override
- public void restoreState(Integer state) {
- index = state;
+ public void restoreState(List<Integer> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ this.index = state.get(0);
}
@Override
@@ -390,7 +394,7 @@ public class StreamCheckpointNotifierITCase extends TestLogger {
* Reducer that causes one failure between seeing 40% to 70% of the records.
*/
private static class OnceFailingReducer extends RichReduceFunction<Tuple1<Long>>
- implements Checkpointed<Long>, CheckpointListener
+ implements ListCheckpointed<Long>, CheckpointListener
{
static volatile boolean hasFailed = false;
static volatile long failureCheckpointID;
@@ -402,7 +406,7 @@ public class StreamCheckpointNotifierITCase extends TestLogger {
private volatile long count;
private volatile boolean notificationAlready;
-
+
OnceFailingReducer(long numElements) {
this.failurePos = (long) (0.5 * numElements / PARALLELISM);
}
@@ -419,19 +423,22 @@ public class StreamCheckpointNotifierITCase extends TestLogger {
}
@Override
- public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+ public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
if (!hasFailed && count >= failurePos && getRuntimeContext().getIndexOfThisSubtask() == 0) {
LOG.info(">>>>>>>>>>>>>>>>> Throwing Exception <<<<<<<<<<<<<<<<<<<<<");
hasFailed = true;
failureCheckpointID = checkpointId;
throw new Exception("Test Failure");
}
- return count;
+ return Collections.singletonList(this.count);
}
@Override
- public void restoreState(Long state) {
- count = state;
+ public void restoreState(List<Long> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ this.count = state.get(0);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index b97e1f2..aae04c9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -31,6 +31,8 @@ import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
@@ -41,7 +43,7 @@ import static org.junit.Assert.assertEquals;
* A simple test that runs a streaming topology with checkpointing enabled.
*
* The test triggers a failure after a while and verifies that, after completion, the
- * state defined with the {@link Checkpointed} interface reflects the "exactly once" semantics.
+ * state defined with the {@link ListCheckpointed} interface reflects the "exactly once" semantics.
*/
@SuppressWarnings("serial")
public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
@@ -118,7 +120,7 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
// --------------------------------------------------------------------------------------------
private static class StringGeneratingSourceFunction extends RichSourceFunction<String>
- implements ParallelSourceFunction<String>, Checkpointed<Integer> {
+ implements ParallelSourceFunction<String>, ListCheckpointed<Integer> {
private final long numElements;
@@ -131,7 +133,7 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
private volatile boolean isRunning = true;
static final long[] counts = new long[PARALLELISM];
-
+
@Override
public void close() throws IOException {
counts[getRuntimeContext().getIndexOfThisSubtask()] = index;
@@ -186,17 +188,20 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
}
@Override
- public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
- return index;
+ public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this.index);
}
@Override
- public void restoreState(Integer state) {
- index = state;
+ public void restoreState(List<Integer> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ this.index = state.get(0);
}
}
- private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> implements Checkpointed<Long> {
+ private static class StatefulCounterFunction extends RichMapFunction<PrefixCount, PrefixCount> implements ListCheckpointed<Long> {
private long count;
static final long[] counts = new long[PARALLELISM];
@@ -213,13 +218,16 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
}
@Override
- public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
- return count;
+ public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this.count);
}
@Override
- public void restoreState(Long state) {
- count = state;
+ public void restoreState(List<Long> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ this.count = state.get(0);
}
}
@@ -227,7 +235,7 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
* This function uses simultaneously the key/value state and is checkpointed.
*/
private static class OnceFailingPrefixCounter extends RichMapFunction<PrefixCount, PrefixCount>
- implements Checkpointed<Long> {
+ implements ListCheckpointed<Long> {
private static Map<String, Long> prefixCounts = new ConcurrentHashMap<String, Long>();
static final long[] counts = new long[PARALLELISM];
@@ -238,7 +246,7 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
private long failurePos;
private long count;
-
+
private ValueState<Long> pCount;
private long inputCount;
@@ -279,22 +287,25 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
}
@Override
- public Long snapshotState(long checkpointId, long checkpointTimestamp) {
- return inputCount;
+ public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this.inputCount);
}
@Override
- public void restoreState(Long state) {
- inputCount = state;
+ public void restoreState(List<Long> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ this.inputCount = state.get(0);
}
}
- private static class StringRichFilterFunction extends RichFilterFunction<String> implements Checkpointed<Long> {
+ private static class StringRichFilterFunction extends RichFilterFunction<String> implements ListCheckpointed<Long> {
static final long[] counts = new long[PARALLELISM];
private long count;
-
+
@Override
public boolean filter(String value) {
count++;
@@ -307,23 +318,26 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
}
@Override
- public Long snapshotState(long checkpointId, long checkpointTimestamp) {
- return count;
+ public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this.count);
}
@Override
- public void restoreState(Long state) {
- count = state;
+ public void restoreState(List<Long> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ this.count = state.get(0);
}
}
private static class StringPrefixCountRichMapFunction extends RichMapFunction<String, PrefixCount>
- implements Checkpointed<Long> {
+ implements ListCheckpointed<Long> {
static final long[] counts = new long[PARALLELISM];
private long count;
-
+
@Override
public PrefixCount map(String value) throws IOException {
count++;
@@ -336,13 +350,16 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase {
}
@Override
- public Long snapshotState(long checkpointId, long checkpointTimestamp) {
- return count;
+ public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this.count);
}
@Override
- public void restoreState(Long state) {
- count = state;
+ public void restoreState(List<Long> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ this.count = state.get(0);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
index 5874f56..dae5cd9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UdfStreamOperatorCheckpointingITCase.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -146,7 +145,7 @@ public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTe
* augmented by the designated parallel subtaskId. The source is not parallel to ensure order.
*/
private static class StatefulMultipleSequence extends RichSourceFunction<Tuple2<Integer, Long>>
- implements Checkpointed<Long> {
+ implements ListCheckpointed<Long> {
private long count;
@@ -168,13 +167,16 @@ public class UdfStreamOperatorCheckpointingITCase extends StreamFaultToleranceTe
public void cancel() {}
@Override
- public Long snapshotState(long checkpointId, long checkpointTimestamp) {
- return count;
+ public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this.count);
}
@Override
- public void restoreState(Long state) {
- count = state;
+ public void restoreState(List<Long> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ this.count = state.get(0);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
index e424a8d..a45349d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
@@ -28,7 +28,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
@@ -46,7 +46,9 @@ import org.junit.runners.Parameterized;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -310,7 +312,7 @@ public class WindowCheckpointingITCase extends TestLogger {
// ------------------------------------------------------------------------
private static class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>>
- implements Checkpointed<Integer>, CheckpointListener
+ implements ListCheckpointed<Integer>, CheckpointListener
{
private static volatile boolean failedBefore = false;
@@ -373,13 +375,16 @@ public class WindowCheckpointingITCase extends TestLogger {
}
@Override
- public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
- return numElementsEmitted;
+ public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this.numElementsEmitted);
}
@Override
- public void restoreState(Integer state) {
- numElementsEmitted = state;
+ public void restoreState(List<Integer> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ this.numElementsEmitted = state.get(0);
}
public static void reset() {
@@ -388,7 +393,7 @@ public class WindowCheckpointingITCase extends TestLogger {
}
private static class ValidatingSink extends RichSinkFunction<Tuple2<Long, IntType>>
- implements Checkpointed<HashMap<Long, Integer>> {
+ implements ListCheckpointed<HashMap<Long, Integer>> {
private final HashMap<Long, Integer> counts = new HashMap<>();
@@ -439,18 +444,20 @@ public class WindowCheckpointingITCase extends TestLogger {
}
@Override
- public HashMap<Long, Integer> snapshotState(long checkpointId, long checkpointTimestamp) {
- return this.counts;
+ public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this.counts);
}
@Override
- public void restoreState(HashMap<Long, Integer> state) {
- this.counts.putAll(state);
+ public void restoreState(List<HashMap<Long, Integer>> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ this.counts.putAll(state.get(0));
- for (Integer i : state.values()) {
+ for (Integer i : state.get(0).values()) {
this.aggCount += i;
}
-
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
index f91582f..52a3ba8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
@@ -21,13 +21,15 @@ package org.apache.flink.test.classloading.jar;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.lang.RuntimeException;
+import java.util.Collections;
+import java.util.List;
/**
* A simple streaming program, which is using the state checkpointing of Flink.
@@ -57,7 +59,7 @@ public class CheckpointedStreamingProgram {
// with Checkpoining
- public static class SimpleStringGenerator implements SourceFunction<String>, Checkpointed<Integer> {
+ public static class SimpleStringGenerator implements SourceFunction<String>, ListCheckpointed<Integer> {
public boolean running = true;
@Override
@@ -74,32 +76,36 @@ public class CheckpointedStreamingProgram {
}
@Override
- public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
- return null;
+ public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.emptyList();
}
@Override
- public void restoreState(Integer state) {
+ public void restoreState(List<Integer> state) throws Exception {
}
}
- public static class StatefulMapper implements MapFunction<String, String>, Checkpointed<StatefulMapper>, CheckpointListener {
+ public static class StatefulMapper implements MapFunction<String, String>, ListCheckpointed<StatefulMapper>, CheckpointListener {
private String someState;
private boolean atLeastOneSnapshotComplete = false;
private boolean restored = false;
@Override
- public StatefulMapper snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
- return this;
+ public List<StatefulMapper> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this);
}
@Override
- public void restoreState(StatefulMapper state) {
+ public void restoreState(List<StatefulMapper> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
restored = true;
- this.someState = state.someState;
- this.atLeastOneSnapshotComplete = state.atLeastOneSnapshotComplete;
+ StatefulMapper s = state.get(0);
+ this.someState = s.someState;
+ this.atLeastOneSnapshotComplete = s.atLeastOneSnapshotComplete;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/525edf1e/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
index 6796cb0..d3baa7d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
@@ -34,7 +34,7 @@ import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
@@ -42,6 +42,8 @@ import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.Collector;
import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
public class CheckpointingCustomKvStateProgram {
@@ -84,7 +86,7 @@ public class CheckpointingCustomKvStateProgram {
env.execute();
}
- private static class InfiniteIntegerSource implements ParallelSourceFunction<Integer>, Checkpointed<Integer> {
+ private static class InfiniteIntegerSource implements ParallelSourceFunction<Integer>, ListCheckpointed<Integer> {
private static final long serialVersionUID = -7517574288730066280L;
private volatile boolean running = true;
@@ -104,17 +106,18 @@ public class CheckpointingCustomKvStateProgram {
}
@Override
- public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
- return 0;
+ public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(0);
}
@Override
- public void restoreState(Integer state) throws Exception {
+ public void restoreState(List<Integer> state) throws Exception {
}
}
- private static class ReducingStateFlatMap extends RichFlatMapFunction<Tuple2<Integer, Integer>, Integer> implements Checkpointed<ReducingStateFlatMap>, CheckpointListener {
+ private static class ReducingStateFlatMap extends RichFlatMapFunction<Tuple2<Integer, Integer>, Integer>
+ implements ListCheckpointed<ReducingStateFlatMap>, CheckpointListener {
private static final long serialVersionUID = -5939722892793950253L;
private transient ReducingState<Integer> kvState;
@@ -148,12 +151,12 @@ public class CheckpointingCustomKvStateProgram {
}
@Override
- public ReducingStateFlatMap snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
- return this;
+ public List<ReducingStateFlatMap> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this);
}
@Override
- public void restoreState(ReducingStateFlatMap state) throws Exception {
+ public void restoreState(List<ReducingStateFlatMap> state) throws Exception {
restored = true;
atLeastOneSnapshotComplete = true;
}