You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/05/03 14:28:13 UTC
[07/13] flink git commit: [FLINK-5969] Add savepoint IT case that
checks restore from 1.2
[FLINK-5969] Add savepoint IT case that checks restore from 1.2
The binary savepoints in this were created on the Flink 1.2.0 release
commit.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9ed98f2e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9ed98f2e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9ed98f2e
Branch: refs/heads/master
Commit: 9ed98f2e5a32fb14de03b9a8ea1dd45851cc3a7e
Parents: 1882c90
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Apr 20 10:46:10 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed May 3 16:24:26 2017 +0200
----------------------------------------------------------------------
.../utils/SavepointMigrationTestBase.java | 24 +-
...atefulJobSavepointFrom11MigrationITCase.java | 562 ++++++++++++++
...atefulJobSavepointFrom12MigrationITCase.java | 769 +++++++++++++++++++
.../StatefulUDFSavepointMigrationITCase.java | 562 --------------
...eful-udf-migration-itcase-flink1.2-savepoint | Bin 0 -> 25245 bytes
...-migration-itcase-flink1.2-savepoint-rocksdb | Bin 0 -> 25256 bytes
6 files changed, 1332 insertions(+), 585 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9ed98f2e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index 301fc72..c5672a2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -155,15 +155,10 @@ public class SavepointMigrationTestBase extends TestBaseUtils {
}
LOG.info("Triggering savepoint.");
- // Flink 1.2
+
final Future<Object> savepointResultFuture =
jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobSubmissionResult.getJobID(), Option.<String>empty()), DEADLINE.timeLeft());
- // Flink 1.1
-// final Future<Object> savepointResultFuture =
-// jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobSubmissionResult.getJobID()), DEADLINE.timeLeft());
-
-
Object savepointResult = Await.result(savepointResultFuture, DEADLINE.timeLeft());
if (savepointResult instanceof JobManagerMessages.TriggerSavepointFailure) {
@@ -174,24 +169,7 @@ public class SavepointMigrationTestBase extends TestBaseUtils {
final String jobmanagerSavepointPath = ((JobManagerMessages.TriggerSavepointSuccess) savepointResult).savepointPath();
LOG.info("Saved savepoint: " + jobmanagerSavepointPath);
- // Flink 1.2
FileUtils.moveFile(new File(new URI(jobmanagerSavepointPath).getPath()), new File(savepointPath));
-
- // Flink 1.1
- // Retrieve the savepoint from the testing job manager
-// LOG.info("Requesting the savepoint.");
-// Future<Object> savepointFuture = jobManager.ask(new TestingJobManagerMessages.RequestSavepoint(jobmanagerSavepointPath), DEADLINE.timeLeft());
-//
-// Savepoint savepoint = ((TestingJobManagerMessages.ResponseSavepoint) Await.result(savepointFuture, DEADLINE.timeLeft())).savepoint();
-// LOG.info("Retrieved savepoint: " + jobmanagerSavepointPath + ".");
-//
-// LOG.info("Storing savepoint to file.");
-// Configuration config = new Configuration();
-// config.setString(org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem");
-// config.setString(org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, "file:///Users/aljoscha/Downloads");
-// String path = org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory.createFromConfig(config).storeSavepoint(savepoint);
-//
-// FileUtils.moveFile(new File(new URI(path).getPath()), new File(savepointPath));
}
@SafeVarargs
http://git-wip-us.apache.org/repos/asf/flink/blob/9ed98f2e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase.java
new file mode 100644
index 0000000..4d94d25
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom11MigrationITCase.java
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * This verifies that we can restore a complete job from a Flink 1.1 savepoint.
+ *
+ * <p>The test pipeline contains both "Checkpointed" state and keyed user state.
+ */
+public class StatefulJobSavepointFrom11MigrationITCase extends SavepointMigrationTestBase {
+ private static final int NUM_SOURCE_ELEMENTS = 4;
+ private static final String EXPECTED_ELEMENTS_ACCUMULATOR = "NUM_EXPECTED_ELEMENTS";
+ private static final String SUCCESSFUL_CHECK_ACCUMULATOR = "SUCCESSFUL_CHECKS";
+
+ /**
+ * Creates the savepoint resource that {@link #testSavepointRestoreFromFlink11()} loads; has to be executed manually on Flink 1.1.
+ */
+ @Test
+ @Ignore
+ public void testCreateSavepointOnFlink11() throws Exception {
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ // this variant only exercises the memory state backend
+ env.setStateBackend(new MemoryStateBackend());
+ env.enableCheckpointing(500);
+ env.setParallelism(4);
+ env.setMaxParallelism(4);
+
+ // build the pipeline; the uids are the same ones the restore tests assign to their operators
+ env
+ .addSource(new LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
+ .flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
+ .keyBy(0)
+ .flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
+ .keyBy(0)
+ .flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
+ .keyBy(0)
+ .transform(
+ "custom_operator",
+ new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+ new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
+ .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR));
+
+ executeAndSavepoint(
+ env,
+ "src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint",
+ new Tuple2<>(EXPECTED_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
+ }
+
+ /**
+ * Creates the RocksDB savepoint resource that {@link #testSavepointRestoreFromFlink11FromRocksDB()} loads; has to be executed manually on Flink 1.1.
+ */
+ @Test
+ @Ignore
+ public void testCreateSavepointOnFlink11WithRocksDB() throws Exception {
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ RocksDBStateBackend rocksBackend =
+ new RocksDBStateBackend(new MemoryStateBackend());
+// rocksBackend.enableFullyAsyncSnapshots();
+ env.setStateBackend(rocksBackend);
+ env.enableCheckpointing(500);
+ env.setParallelism(4);
+ env.setMaxParallelism(4);
+
+ // build the pipeline; the uids are the same ones the restore tests assign to their operators
+ env
+ .addSource(new LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
+ .flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
+ .keyBy(0)
+ .flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
+ .keyBy(0)
+ .flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
+ .keyBy(0)
+ .transform(
+ "custom_operator",
+ new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+ new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
+ .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR));
+
+ executeAndSavepoint(
+ env,
+ "src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb",
+ new Tuple2<>(EXPECTED_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
+ }
+
+
+ @Test
+ public void testSavepointRestoreFromFlink11() throws Exception {
+ // handed to restoreAndExecute below together with SUCCESSFUL_CHECK_ACCUMULATOR (presumably the accumulator value to wait for)
+ final int EXPECTED_SUCCESSFUL_CHECKS = 21;
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ // this variant only exercises the memory state backend
+ env.setStateBackend(new MemoryStateBackend());
+ env.enableCheckpointing(500);
+ env.setParallelism(4);
+ env.setMaxParallelism(4);
+
+ // same topology and uids as the savepoint-creating job, but built from the Restoring*/Checking* functions
+ env
+ .addSource(new RestoringCheckingSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
+ .flatMap(new RestoringCheckingFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
+ .keyBy(0)
+ .flatMap(new RestoringCheckingFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
+ .keyBy(0)
+ .flatMap(new KeyedStateCheckingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
+ .keyBy(0)
+ .transform(
+ "custom_operator",
+ new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+ new RestoringCheckingUdfOperator(new RestoringCheckingFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
+ .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR));
+
+ restoreAndExecute(
+ env,
+ getResourceFilename("stateful-udf-migration-itcase-flink1.1-savepoint"),
+ new Tuple2<>(SUCCESSFUL_CHECK_ACCUMULATOR, EXPECTED_SUCCESSFUL_CHECKS));
+ }
+
+ @Test
+ public void testSavepointRestoreFromFlink11FromRocksDB() throws Exception {
+ // handed to restoreAndExecute below together with SUCCESSFUL_CHECK_ACCUMULATOR (presumably the accumulator value to wait for)
+ final int EXPECTED_SUCCESSFUL_CHECKS = 21;
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ // this variant restores with the RocksDB state backend, constructed around a memory state backend
+ env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
+ env.enableCheckpointing(500);
+ env.setParallelism(4);
+ env.setMaxParallelism(4);
+
+ // same topology and uids as the savepoint-creating job, but built from the Restoring*/Checking* functions
+ env
+ .addSource(new RestoringCheckingSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
+ .flatMap(new RestoringCheckingFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
+ .keyBy(0)
+ .flatMap(new RestoringCheckingFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
+ .keyBy(0)
+ .flatMap(new KeyedStateCheckingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
+ .keyBy(0)
+ .transform(
+ "custom_operator",
+ new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+ new RestoringCheckingUdfOperator(new RestoringCheckingFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
+ .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR));
+
+ restoreAndExecute(
+ env,
+ getResourceFilename("stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb"),
+ new Tuple2<>(SUCCESSFUL_CHECK_ACCUMULATOR, EXPECTED_SUCCESSFUL_CHECKS));
+ }
+
+ private static class LegacyCheckpointedSource
+ implements SourceFunction<Tuple2<Long, Long>>, Checkpointed<String> {
+ // Legacy-state source: emits (i, i) for i in [0, numElements) and snapshots a fixed string. The constant is final so assertions cannot be skewed by reassignment.
+ public static final String CHECKPOINTED_STRING = "Here be dragons!";
+
+ private static final long serialVersionUID = 1L;
+
+ private volatile boolean isRunning = true;
+
+ private final int numElements;
+
+ public LegacyCheckpointedSource(int numElements) {
+ this.numElements = numElements;
+ }
+
+ @Override
+ public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
+ // emit all elements while holding the checkpoint lock
+ synchronized (ctx.getCheckpointLock()) {
+ for (long i = 0; i < numElements; i++) {
+ ctx.collect(new Tuple2<>(i, i));
+ }
+ }
+ while (isRunning) {
+ Thread.sleep(20);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ isRunning = false;
+ }
+
+ @Override
+ public void restoreState(String state) throws Exception {
+ assertEquals(CHECKPOINTED_STRING, state);
+ }
+
+ @Override
+ public String snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+ return CHECKPOINTED_STRING;
+ }
+ }
+
+ private static class RestoringCheckingSource
+ extends RichSourceFunction<Tuple2<Long, Long>>
+ implements CheckpointedRestoring<String> {
+ // Restore-side counterpart of LegacyCheckpointedSource: checks the restored string once in run() and counts the check.
+ private static final long serialVersionUID = 1L;
+
+ private volatile boolean isRunning = true;
+
+ private final int numElements;
+ // assigned in restoreState(String), compared in run()
+ private String restoredState;
+
+ public RestoringCheckingSource(int numElements) {
+ this.numElements = numElements;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+
+ getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter());
+ }
+
+ @Override
+ public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
+ assertEquals(LegacyCheckpointedSource.CHECKPOINTED_STRING, restoredState);
+ getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
+ // emit (i, i) for i in [0, numElements) while holding the checkpoint lock
+ synchronized (ctx.getCheckpointLock()) {
+ for (long i = 0; i < numElements; i++) {
+ ctx.collect(new Tuple2<>(i, i));
+ }
+ }
+ // then idle in 20 ms steps until cancel() clears the flag
+ while (isRunning) {
+ Thread.sleep(20);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ isRunning = false;
+ }
+
+ @Override
+ public void restoreState(String state) throws Exception {
+ restoredState = state;
+ }
+ }
+
+ public static class LegacyCheckpointedFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+ implements Checkpointed<Tuple2<String, Long>> {
+ // Pass-through flat map whose legacy (Checkpointed) snapshot is always CHECKPOINTED_TUPLE.
+ private static final long serialVersionUID = 1L;
+ // final: the restoring functions assert against this reference, so it must not be reassignable
+ public static final Tuple2<String, Long> CHECKPOINTED_TUPLE =
+ new Tuple2<>("hello", 42L);
+
+ @Override
+ public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+ out.collect(value);
+ }
+
+ @Override
+ public void restoreState(Tuple2<String, Long> state) throws Exception {
+ }
+
+ @Override
+ public Tuple2<String, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+ return CHECKPOINTED_TUPLE;
+ }
+ }
+
+ public static class RestoringCheckingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+ implements CheckpointedRestoring<Tuple2<String, Long>> {
+ // Restore-side counterpart of LegacyCheckpointedFlatMap: forwards each element and checks the restored tuple.
+ private static final long serialVersionUID = 1L;
+ // assigned in restoreState(...), compared on every flatMap call
+ private transient Tuple2<String, Long> restoredState;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+
+ getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter());
+ }
+
+ @Override
+ public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+ out.collect(value);
+ // one successful check is counted per element once the restored tuple matches
+ assertEquals(LegacyCheckpointedFlatMap.CHECKPOINTED_TUPLE, restoredState);
+ getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
+
+ }
+
+ @Override
+ public void restoreState(Tuple2<String, Long> state) throws Exception {
+ restoredState = state;
+ }
+ }
+
+ public static class LegacyCheckpointedFlatMapWithKeyedState
+ extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+ implements Checkpointed<Tuple2<String, Long>> {
+ // Pass-through flat map with both legacy (Checkpointed) state and keyed ValueState "state-name".
+ private static final long serialVersionUID = 1L;
+ // final: the restoring functions assert against this reference, so it must not be reassignable
+ public static final Tuple2<String, Long> CHECKPOINTED_TUPLE =
+ new Tuple2<>("hello", 42L);
+
+ private final ValueStateDescriptor<Long> stateDescriptor =
+ new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
+
+ @Override
+ public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+ out.collect(value);
+ // remember the element's second field in the keyed state
+ getRuntimeContext().getState(stateDescriptor).update(value.f1);
+ }
+
+ @Override
+ public void restoreState(Tuple2<String, Long> state) throws Exception {
+ }
+
+ @Override
+ public Tuple2<String, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+ return CHECKPOINTED_TUPLE;
+ }
+ }
+
+ public static class RestoringCheckingFlatMapWithKeyedState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+ implements CheckpointedRestoring<Tuple2<String, Long>> {
+ // Restore-side counterpart of LegacyCheckpointedFlatMapWithKeyedState: checks keyed state and the restored tuple.
+ private static final long serialVersionUID = 1L;
+ // assigned in restoreState(...), compared on every flatMap call
+ private transient Tuple2<String, Long> restoredState;
+
+ private final ValueStateDescriptor<Long> stateDescriptor =
+ new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+
+ getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter());
+ }
+
+ @Override
+ public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+ out.collect(value);
+
+ ValueState<Long> state = getRuntimeContext().getState(stateDescriptor);
+ if (state == null) {
+ throw new RuntimeException("Missing key value state for " + value);
+ }
+ // compare against the constant of the function that actually wrote this state, not its sibling's
+ assertEquals(value.f1, state.value());
+ assertEquals(LegacyCheckpointedFlatMapWithKeyedState.CHECKPOINTED_TUPLE, restoredState);
+ getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
+ }
+
+ @Override
+ public void restoreState(Tuple2<String, Long> state) throws Exception {
+ restoredState = state;
+ }
+ }
+
+ public static class KeyedStateSettingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+ // Pass-through flat map that stores each element's second field in the keyed ValueState "state-name".
+ private static final long serialVersionUID = 1L;
+
+ private final ValueStateDescriptor<Long> stateDescriptor =
+ new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
+
+ @Override
+ public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+ out.collect(value);
+ // KeyedStateCheckingFlatMap later expects to read this value back for the same element
+ getRuntimeContext().getState(stateDescriptor).update(value.f1);
+ }
+ }
+
+ public static class KeyedStateCheckingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+ // Restore-side counterpart of KeyedStateSettingFlatMap: checks that "state-name" holds the element's second field.
+ private static final long serialVersionUID = 1L;
+
+ private final ValueStateDescriptor<Long> stateDescriptor =
+ new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+
+ getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter());
+ }
+
+ @Override
+ public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+ out.collect(value);
+
+ ValueState<Long> state = getRuntimeContext().getState(stateDescriptor);
+ if (state == null) {
+ throw new RuntimeException("Missing key value state for " + value);
+ }
+ // one successful check is counted per element whose keyed state matches
+ assertEquals(value.f1, state.value());
+ getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
+ }
+ }
+
+ public static class CheckpointedUdfOperator
+ extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>>
+ implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+ private static final long serialVersionUID = 1L;
+ // written by the commented-out Flink 1.1 snapshot code below; RestoringCheckingUdfOperator asserts against it
+ private static final String CHECKPOINTED_STRING = "Oh my, that's nice!";
+
+ public CheckpointedUdfOperator(FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> userFunction) {
+ super(userFunction);
+ }
+ // forwards the record as-is; userFunction is not invoked by this operator
+ @Override
+ public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
+ output.collect(element);
+ }
+
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ output.emitWatermark(mark);
+ }
+
+ // Flink 1.1 only: snapshot code kept commented out (presumably this API is unavailable on the current code base; needed when re-creating the savepoint)
+// @Override
+// public StreamTaskState snapshotOperatorState(
+// long checkpointId, long timestamp) throws Exception {
+// StreamTaskState result = super.snapshotOperatorState(checkpointId, timestamp);
+//
+// AbstractStateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView(
+// checkpointId,
+// timestamp);
+//
+// out.writeUTF(CHECKPOINTED_STRING);
+//
+// result.setOperatorState(out.closeAndGetHandle());
+//
+// return result;
+// }
+ }
+
+ public static class RestoringCheckingUdfOperator
+ extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>>
+ implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+ private static final long serialVersionUID = 1L;
+ // assigned in restoreState(FSDataInputStream), compared on every processElement call
+ private String restoredState;
+
+ public RestoringCheckingUdfOperator(FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> userFunction) {
+ super(userFunction);
+ }
+ // only delegates to super; SUCCESSFUL_CHECK_ACCUMULATOR is not registered here (presumably by the wrapped function's open())
+ @Override
+ public void open() throws Exception {
+ super.open();
+ }
+
+ @Override
+ public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
+ userFunction.flatMap(element.getValue(), new TimestampedCollector<>(output));
+ // unlike CheckpointedUdfOperator, the user function runs first; then the operator's own restored string is checked
+ assertEquals(CheckpointedUdfOperator.CHECKPOINTED_STRING, restoredState);
+ getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
+ }
+
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ output.emitWatermark(mark);
+ }
+
+ @Override
+ public void restoreState(FSDataInputStream in) throws Exception {
+ super.restoreState(in);
+ // read the UTF string that follows whatever super.restoreState(in) consumed (presumably the one CheckpointedUdfOperator wrote on Flink 1.1)
+ DataInputViewStreamWrapper streamWrapper = new DataInputViewStreamWrapper(in);
+
+ restoredState = streamWrapper.readUTF();
+ }
+ }
+
+ public static class AccumulatorCountingSink<T> extends RichSinkFunction<T> {
+ private static final long serialVersionUID = 1L;
+ // name of the IntCounter accumulator that is registered in open() and incremented once per received element
+ private final String accumulatorName;
+ // incremented in invoke(); not read anywhere in this class
+ int count = 0;
+
+ public AccumulatorCountingSink(String accumulatorName) {
+ this.accumulatorName = accumulatorName;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+
+ getRuntimeContext().addAccumulator(accumulatorName, new IntCounter());
+ }
+
+ @Override
+ public void invoke(T value) throws Exception {
+ count++;
+ getRuntimeContext().getAccumulator(accumulatorName).add(1);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9ed98f2e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java
new file mode 100644
index 0000000..e60cb5d
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointFrom12MigrationITCase.java
@@ -0,0 +1,769 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * This verifies that we can restore a complete job from a Flink 1.2 savepoint.
+ *
+ * <p>The test pipeline contains both "Checkpointed" state and keyed user state.
+ *
+ * <p>The tests will time out if they don't see the required number of successful checks within
+ * a time limit.
+ */
+public class StatefulJobSavepointFrom12MigrationITCase extends SavepointMigrationTestBase {
+ private static final int NUM_SOURCE_ELEMENTS = 4;
+
+ /**
+ * Builds the "legacy" job on the memory state backend and hands it to
+ * {@code executeAndSavepoint} together with the path
+ * {@code src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint}.
+ *
+ * <p>This has to be manually executed to create the savepoint on Flink 1.2, which is why
+ * the test carries {@code @Ignore}.
+ */
+ @Test
+ @Ignore
+ public void testCreateSavepointOnFlink12() throws Exception {
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ // this variant uses the memory state backend; see the WithRocksDB variant for RocksDB
+ env.setStateBackend(new MemoryStateBackend());
+ env.enableCheckpointing(500);
+ env.setParallelism(4);
+ env.setMaxParallelism(4);
+
+ // the uid() values used here are repeated by the operators in testSavepointRestoreFromFlink12()
+ env
+ .addSource(new LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
+ .flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
+ .keyBy(0)
+ .flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
+ .keyBy(0)
+ .flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
+ .keyBy(0)
+ .transform(
+ "custom_operator",
+ new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+ new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
+ .keyBy(0)
+ .transform(
+ "timely_stateful_operator",
+ new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+ new TimelyStatefulOperator()).uid("TimelyStatefulOperator")
+ .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>());
+
+ // NOTE(review): the tuple presumably names the accumulator value to wait for before the
+ // savepoint is taken -- confirm in SavepointMigrationTestBase
+ executeAndSavepoint(
+ env,
+ "src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint",
+ new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
+ }
+
+ /**
+ * Builds the same "legacy" job as {@code testCreateSavepointOnFlink12()} but on a
+ * {@link RocksDBStateBackend}, and hands it to {@code executeAndSavepoint} together with the
+ * path {@code src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint-rocksdb}.
+ *
+ * <p>This has to be manually executed to create the savepoint on Flink 1.2, which is why
+ * the test carries {@code @Ignore}.
+ */
+ @Test
+ @Ignore
+ public void testCreateSavepointOnFlink12WithRocksDB() throws Exception {
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ // RocksDB state backend constructed around a MemoryStateBackend
+ RocksDBStateBackend rocksBackend =
+ new RocksDBStateBackend(new MemoryStateBackend());
+ env.setStateBackend(rocksBackend);
+ env.enableCheckpointing(500);
+ env.setParallelism(4);
+ env.setMaxParallelism(4);
+
+ // the uid() values used here are repeated by the operators in
+ // testSavepointRestoreFromFlink12FromRocksDB()
+ env
+ .addSource(new LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
+ .flatMap(new LegacyCheckpointedFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
+ .keyBy(0)
+ .flatMap(new LegacyCheckpointedFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
+ .keyBy(0)
+ .flatMap(new KeyedStateSettingFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
+ .keyBy(0)
+ .transform(
+ "custom_operator",
+ new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+ new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState())).uid("LegacyCheckpointedOperator")
+ .keyBy(0)
+ .transform(
+ "timely_stateful_operator",
+ new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+ new TimelyStatefulOperator()).uid("TimelyStatefulOperator")
+ .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>());
+
+ executeAndSavepoint(
+ env,
+ "src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint-rocksdb",
+ new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
+ }
+
+
+ /**
+ * Builds the "checking" job on the memory state backend and hands it to
+ * {@code restoreAndExecute} together with the
+ * {@code stateful-udf-migration-itcase-flink1.2-savepoint} resource and one
+ * (accumulator name, value) pair per check performed by the operators.
+ */
+ @Test
+ public void testSavepointRestoreFromFlink12() throws Exception {
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ // this variant uses the memory state backend; see the FromRocksDB variant for RocksDB
+ env.setStateBackend(new MemoryStateBackend());
+ env.enableCheckpointing(500);
+ env.setParallelism(4);
+ env.setMaxParallelism(4);
+
+ // every operator reuses the uid() of its counterpart in testCreateSavepointOnFlink12()
+ env
+ .addSource(new CheckingRestoringSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
+ .flatMap(new CheckingRestoringFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
+ .keyBy(0)
+ .flatMap(new CheckingRestoringFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
+ .keyBy(0)
+ .flatMap(new CheckingKeyedStateFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
+ .keyBy(0)
+ .transform(
+ "custom_operator",
+ new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+ new CheckingRestoringUdfOperator(new CheckingRestoringFlatMapWithKeyedStateInOperator())).uid("LegacyCheckpointedOperator")
+ .keyBy(0)
+ .transform(
+ "timely_stateful_operator",
+ new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+ new CheckingTimelyStatefulOperator()).uid("TimelyStatefulOperator")
+ .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>());
+
+ // the source check is counted once (in run()); all other checks are counted per element
+ restoreAndExecute(
+ env,
+ getResourceFilename("stateful-udf-migration-itcase-flink1.2-savepoint"),
+ new Tuple2<>(CheckingRestoringSource.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 1),
+ new Tuple2<>(CheckingRestoringFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+ new Tuple2<>(CheckingRestoringFlatMapWithKeyedState.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+ new Tuple2<>(CheckingKeyedStateFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+ new Tuple2<>(CheckingRestoringUdfOperator.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+ new Tuple2<>(CheckingRestoringFlatMapWithKeyedStateInOperator.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+ new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+ new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+ new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+ new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
+ }
+
+ /**
+ * Builds the "checking" job on a {@link RocksDBStateBackend} and hands it to
+ * {@code restoreAndExecute} together with the
+ * {@code stateful-udf-migration-itcase-flink1.2-savepoint-rocksdb} resource and one
+ * (accumulator name, value) pair per check performed by the operators.
+ */
+ @Test
+ public void testSavepointRestoreFromFlink12FromRocksDB() throws Exception {
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ // RocksDB state backend constructed around a MemoryStateBackend
+ env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
+ env.enableCheckpointing(500);
+ env.setParallelism(4);
+ env.setMaxParallelism(4);
+
+ // every operator reuses the uid() of its counterpart in
+ // testCreateSavepointOnFlink12WithRocksDB()
+ env
+ .addSource(new CheckingRestoringSource(NUM_SOURCE_ELEMENTS)).setMaxParallelism(1).uid("LegacyCheckpointedSource")
+ .flatMap(new CheckingRestoringFlatMap()).startNewChain().uid("LegacyCheckpointedFlatMap")
+ .keyBy(0)
+ .flatMap(new CheckingRestoringFlatMapWithKeyedState()).startNewChain().uid("LegacyCheckpointedFlatMapWithKeyedState")
+ .keyBy(0)
+ .flatMap(new CheckingKeyedStateFlatMap()).startNewChain().uid("KeyedStateSettingFlatMap")
+ .keyBy(0)
+ .transform(
+ "custom_operator",
+ new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+ new CheckingRestoringUdfOperator(new CheckingRestoringFlatMapWithKeyedStateInOperator())).uid("LegacyCheckpointedOperator")
+ .keyBy(0)
+ .transform(
+ "timely_stateful_operator",
+ new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+ new CheckingTimelyStatefulOperator()).uid("TimelyStatefulOperator")
+ .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>());
+
+ // the source check is counted once (in run()); all other checks are counted per element
+ restoreAndExecute(
+ env,
+ getResourceFilename("stateful-udf-migration-itcase-flink1.2-savepoint-rocksdb"),
+ new Tuple2<>(CheckingRestoringSource.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 1),
+ new Tuple2<>(CheckingRestoringFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+ new Tuple2<>(CheckingRestoringFlatMapWithKeyedState.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+ new Tuple2<>(CheckingKeyedStateFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+ new Tuple2<>(CheckingRestoringUdfOperator.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+ new Tuple2<>(CheckingRestoringFlatMapWithKeyedStateInOperator.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+ new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+ new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+ new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS),
+ new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
+ }
+
+ /**
+ * Source used when creating the savepoint: emits {@code numElements} tuples {@code (i, i)}
+ * and snapshots a fixed string through the {@link Checkpointed} interface.
+ */
+ private static class LegacyCheckpointedSource
+ implements SourceFunction<Tuple2<Long, Long>>, Checkpointed<String> {
+
+ // expected value in the restore assertions, so it must not be reassignable
+ public static final String CHECKPOINTED_STRING = "Here be dragons!";
+
+ private static final long serialVersionUID = 1L;
+
+ private volatile boolean isRunning = true;
+
+ private final int numElements;
+
+ public LegacyCheckpointedSource(int numElements) {
+ this.numElements = numElements;
+ }
+
+ /**
+ * Emits watermark 0, then all elements while holding the checkpoint lock, and afterwards
+ * idles until {@link #cancel()} is called.
+ */
+ @Override
+ public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
+
+ ctx.emitWatermark(new Watermark(0));
+
+ synchronized (ctx.getCheckpointLock()) {
+ for (long i = 0; i < numElements; i++) {
+ ctx.collect(new Tuple2<>(i, i));
+ }
+ }
+
+ // don't emit a final watermark so that we don't trigger the registered event-time
+ // timers
+ while (isRunning) {
+ Thread.sleep(20);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ isRunning = false;
+ }
+
+ /**
+ * Verifies that the restored string is the one written by {@code snapshotState}.
+ */
+ @Override
+ public void restoreState(String state) throws Exception {
+ assertEquals(CHECKPOINTED_STRING, state);
+ }
+
+ /**
+ * Always snapshots {@link #CHECKPOINTED_STRING}, independent of the checkpoint id.
+ */
+ @Override
+ public String snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+ return CHECKPOINTED_STRING;
+ }
+ }
+
+ private static class CheckingRestoringSource
+ extends RichSourceFunction<Tuple2<Long, Long>>
+ implements CheckpointedRestoring<String> {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingRestoringSource.class + "_RESTORE_CHECK";
+
+ private volatile boolean isRunning = true;
+
+ private final int numElements;
+
+ private String restoredState;
+
+ public CheckingRestoringSource(int numElements) {
+ this.numElements = numElements;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+
+ getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
+ }
+
+ @Override
+ public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
+ assertEquals(LegacyCheckpointedSource.CHECKPOINTED_STRING, restoredState);
+ getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
+
+ // immediately trigger any set timers
+ ctx.emitWatermark(new Watermark(1000));
+
+ synchronized (ctx.getCheckpointLock()) {
+ for (long i = 0; i < numElements; i++) {
+ ctx.collect(new Tuple2<>(i, i));
+ }
+ }
+
+ while (isRunning) {
+ Thread.sleep(20);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ isRunning = false;
+ }
+
+ @Override
+ public void restoreState(String state) throws Exception {
+ restoredState = state;
+ }
+ }
+
+ /**
+ * Pass-through flat map used when creating the savepoint; it snapshots a fixed tuple
+ * through the {@link Checkpointed} interface.
+ */
+ public static class LegacyCheckpointedFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+ implements Checkpointed<Tuple2<String, Long>> {
+
+ private static final long serialVersionUID = 1L;
+
+ // NOTE(review): not final although it is used as the expected value in restore checks
+ public static Tuple2<String, Long> CHECKPOINTED_TUPLE =
+ new Tuple2<>("hello", 42L);
+
+ @Override
+ public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+ out.collect(value);
+ }
+
+ // the restored value is ignored here; verification is done by CheckingRestoringFlatMap
+ @Override
+ public void restoreState(Tuple2<String, Long> state) throws Exception {
+ }
+
+ @Override
+ public Tuple2<String, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+ return CHECKPOINTED_TUPLE;
+ }
+ }
+
+ /**
+ * Pass-through flat map used when restoring: for every element it checks that the state
+ * received in {@code restoreState} equals {@code LegacyCheckpointedFlatMap.CHECKPOINTED_TUPLE}
+ * and counts the successful check in an accumulator.
+ */
+ public static class CheckingRestoringFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+ implements CheckpointedRestoring<Tuple2<String, Long>> {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingRestoringFlatMap.class + "_RESTORE_CHECK";
+
+ // set by restoreState(); stays null if restoreState() is never called
+ private transient Tuple2<String, Long> restoredState;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+
+ getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
+ }
+
+ @Override
+ public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+ out.collect(value);
+
+ // the accumulator is only incremented when the assertion did not throw
+ assertEquals(LegacyCheckpointedFlatMap.CHECKPOINTED_TUPLE, restoredState);
+ getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
+
+ }
+
+ @Override
+ public void restoreState(Tuple2<String, Long> state) throws Exception {
+ restoredState = state;
+ }
+ }
+
+ /**
+ * Flat map used when creating the savepoint: forwards every element, stores its second
+ * field in the keyed {@code "state-name"} value state and additionally snapshots a fixed
+ * tuple through the {@link Checkpointed} interface.
+ */
+ public static class LegacyCheckpointedFlatMapWithKeyedState
+ extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+ implements Checkpointed<Tuple2<String, Long>> {
+
+ private static final long serialVersionUID = 1L;
+
+ public static Tuple2<String, Long> CHECKPOINTED_TUPLE =
+ new Tuple2<>("hello", 42L);
+
+ private final ValueStateDescriptor<Long> stateDescriptor =
+ new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
+
+ @Override
+ public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+ out.collect(value);
+
+ // write the value and read it straight back through the same state handle
+ final ValueState<Long> keyedState = getRuntimeContext().getState(stateDescriptor);
+ final Long written = value.f1;
+ keyedState.update(written);
+
+ assertEquals(written, keyedState.value());
+ }
+
+ @Override
+ public void restoreState(Tuple2<String, Long> state) throws Exception {
+ // nothing to verify when creating the savepoint
+ }
+
+ @Override
+ public Tuple2<String, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+ return CHECKPOINTED_TUPLE;
+ }
+ }
+
+ /**
+ * Flat map used when restoring under the uid {@code "LegacyCheckpointedFlatMapWithKeyedState"}:
+ * for every element it checks the keyed {@code "state-name"} value state and the state
+ * received in {@code restoreState}, and counts the successful check in an accumulator.
+ */
+ public static class CheckingRestoringFlatMapWithKeyedState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+ implements CheckpointedRestoring<Tuple2<String, Long>> {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingRestoringFlatMapWithKeyedState.class + "_RESTORE_CHECK";
+
+ // set by restoreState(); stays null if restoreState() is never called
+ private transient Tuple2<String, Long> restoredState;
+
+ private final ValueStateDescriptor<Long> stateDescriptor =
+ new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+
+ getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
+ }
+
+ @Override
+ public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+ out.collect(value);
+
+ ValueState<Long> state = getRuntimeContext().getState(stateDescriptor);
+ if (state == null) {
+ throw new RuntimeException("Missing key value state for " + value);
+ }
+
+ assertEquals(value.f1, state.value());
+ // compare against the constant of the function that actually wrote this state
+ assertEquals(LegacyCheckpointedFlatMapWithKeyedState.CHECKPOINTED_TUPLE, restoredState);
+ getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
+ }
+
+ @Override
+ public void restoreState(Tuple2<String, Long> state) throws Exception {
+ restoredState = state;
+ }
+ }
+
+ /**
+ * Flat map that is wrapped by {@code CheckingRestoringUdfOperator} when restoring: for every
+ * element it checks the keyed {@code "state-name"} value state and the state received in
+ * {@code restoreState}, and counts the successful check in an accumulator.
+ */
+ public static class CheckingRestoringFlatMapWithKeyedStateInOperator extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
+ implements CheckpointedRestoring<Tuple2<String, Long>> {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingRestoringFlatMapWithKeyedStateInOperator.class + "_RESTORE_CHECK";
+
+ // set by restoreState(); stays null if restoreState() is never called
+ private transient Tuple2<String, Long> restoredState;
+
+ private final ValueStateDescriptor<Long> stateDescriptor =
+ new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+
+ getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
+ }
+
+ @Override
+ public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+ out.collect(value);
+
+ ValueState<Long> state = getRuntimeContext().getState(stateDescriptor);
+ if (state == null) {
+ throw new RuntimeException("Missing key value state for " + value);
+ }
+
+ assertEquals(value.f1, state.value());
+ // the savepoint job wraps a LegacyCheckpointedFlatMapWithKeyedState in the operator,
+ // so compare against that function's constant
+ assertEquals(LegacyCheckpointedFlatMapWithKeyedState.CHECKPOINTED_TUPLE, restoredState);
+ getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
+ }
+
+ @Override
+ public void restoreState(Tuple2<String, Long> state) throws Exception {
+ restoredState = state;
+ }
+ }
+
+ /**
+ * Flat map used when creating the savepoint: forwards every element and stores its second
+ * field in the keyed {@code "state-name"} value state.
+ */
+ public static class KeyedStateSettingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final ValueStateDescriptor<Long> stateDescriptor =
+ new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
+
+ @Override
+ public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+ out.collect(value);
+
+ final ValueState<Long> keyedState = getRuntimeContext().getState(stateDescriptor);
+ keyedState.update(value.f1);
+ }
+ }
+
+ /**
+ * Flat map used when restoring under the uid {@code "KeyedStateSettingFlatMap"}: for every
+ * element it checks that the keyed {@code "state-name"} value state holds the element's
+ * second field and counts the successful check in an accumulator.
+ */
+ public static class CheckingKeyedStateFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingKeyedStateFlatMap.class + "_RESTORE_CHECK";
+
+ private final ValueStateDescriptor<Long> stateDescriptor =
+ new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+
+ getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
+ }
+
+ @Override
+ public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+ out.collect(value);
+
+ ValueState<Long> state = getRuntimeContext().getState(stateDescriptor);
+ if (state == null) {
+ throw new RuntimeException("Missing key value state for " + value);
+ }
+
+ // the accumulator is only incremented when the assertion did not throw
+ assertEquals(value.f1, state.value());
+ getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
+ }
+ }
+
+ /**
+ * Operator used when creating the savepoint: delegates every element to the wrapped flat
+ * map function and appends a fixed UTF string to the operator snapshot stream.
+ */
+ public static class CheckpointedUdfOperator
+ extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>>
+ implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+ private static final long serialVersionUID = 1L;
+
+ // value that CheckingRestoringUdfOperator expects to read back
+ private static final String CHECKPOINTED_STRING = "Oh my, that's nice!";
+
+ public CheckpointedUdfOperator(FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> userFunction) {
+ super(userFunction);
+ }
+
+ @Override
+ public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
+ userFunction.flatMap(element.getValue(), new TimestampedCollector<>(output));
+ }
+
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ output.emitWatermark(mark);
+ }
+
+ @Override
+ public void snapshotState(
+ FSDataOutputStream out, long checkpointId, long timestamp) throws Exception {
+ // the superclass writes first; the string is appended after whatever it wrote
+ super.snapshotState(out, checkpointId, timestamp);
+
+ DataOutputViewStreamWrapper streamWrapper = new DataOutputViewStreamWrapper(out);
+
+ streamWrapper.writeUTF(CHECKPOINTED_STRING);
+ streamWrapper.flush();
+ }
+ }
+
+ /**
+ * Operator used when restoring: delegates every element to the wrapped flat map function,
+ * then checks that the string read in {@code restoreState} equals
+ * {@code CheckpointedUdfOperator.CHECKPOINTED_STRING} and counts the successful check in an
+ * accumulator.
+ */
+ public static class CheckingRestoringUdfOperator
+ extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>>
+ implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingRestoringUdfOperator.class + "_RESTORE_CHECK";
+
+ // set by restoreState(); stays null if restoreState() is never called
+ private String restoredState;
+
+ public CheckingRestoringUdfOperator(FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> userFunction) {
+ super(userFunction);
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+
+ getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
+ }
+
+ @Override
+ public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
+ userFunction.flatMap(element.getValue(), new TimestampedCollector<>(output));
+
+ // the accumulator is only incremented when the assertion did not throw
+ assertEquals(CheckpointedUdfOperator.CHECKPOINTED_STRING, restoredState);
+ getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
+ }
+
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ output.emitWatermark(mark);
+ }
+
+ @Override
+ public void restoreState(FSDataInputStream in) throws Exception {
+ // the superclass reads first, mirroring the write order in CheckpointedUdfOperator
+ super.restoreState(in);
+
+ DataInputViewStreamWrapper streamWrapper = new DataInputViewStreamWrapper(in);
+
+ restoredState = streamWrapper.readUTF();
+ }
+ }
+
+ /**
+ * Operator used when creating the savepoint: for every element it stores the second field
+ * in the {@code "state-name"} value state under the namespace given by the first field and
+ * registers one event-time and one processing-time timer for that namespace.
+ */
+ public static class TimelyStatefulOperator
+ extends AbstractStreamOperator<Tuple2<Long, Long>>
+ implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>>, Triggerable<Long, Long> {
+ private static final long serialVersionUID = 1L;
+
+ private final ValueStateDescriptor<Long> stateDescriptor =
+ new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
+
+ private transient InternalTimerService<Long> timerService;
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+
+ timerService = getInternalTimerService("timer", LongSerializer.INSTANCE, this);
+ }
+
+ @Override
+ public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
+ final Tuple2<Long, Long> tuple = element.getValue();
+ final Long namespace = tuple.f0;
+
+ final ValueState<Long> namespacedState = getKeyedStateBackend().getPartitionedState(
+ namespace,
+ LongSerializer.INSTANCE,
+ stateDescriptor);
+ namespacedState.update(tuple.f1);
+
+ // event-time timer 10 past the current watermark
+ final long eventTimeTimestamp = timerService.currentWatermark() + 10;
+ timerService.registerEventTimeTimer(namespace, eventTimeTimestamp);
+
+ // processing-time timer 30_000 past the current processing time
+ final long processingTimeTimestamp = timerService.currentProcessingTime() + 30_000;
+ timerService.registerProcessingTimeTimer(namespace, processingTimeTimestamp);
+
+ output.collect(element);
+ }
+
+ @Override
+ public void onEventTime(InternalTimer<Long, Long> timer) throws Exception {
+ // intentionally empty: nothing is checked when creating the savepoint
+ }
+
+ @Override
+ public void onProcessingTime(InternalTimer<Long, Long> timer) throws Exception {
+ // intentionally empty: nothing is checked when creating the savepoint
+ }
+
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ output.emitWatermark(mark);
+ }
+ }
+
+ /**
+ * Operator used when restoring: checks the {@code "state-name"} value state when an element
+ * is processed and when an event-time or processing-time timer fires, and counts each kind
+ * of successful check in its own accumulator.
+ */
+ public static class CheckingTimelyStatefulOperator
+ extends AbstractStreamOperator<Tuple2<Long, Long>>
+ implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>>, Triggerable<Long, Long> {
+ private static final long serialVersionUID = 1L;
+
+ public static final String SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR = CheckingTimelyStatefulOperator.class + "_PROCESS_CHECKS";
+ public static final String SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR = CheckingTimelyStatefulOperator.class + "_ET_CHECKS";
+ public static final String SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR = CheckingTimelyStatefulOperator.class + "_PT_CHECKS";
+
+ private final ValueStateDescriptor<Long> stateDescriptor =
+ new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
+
+ private transient InternalTimerService<Long> timerService;
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+
+ // same timer service name and namespace serializer as TimelyStatefulOperator
+ timerService = getInternalTimerService(
+ "timer",
+ LongSerializer.INSTANCE,
+ this);
+
+ getRuntimeContext().addAccumulator(SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR, new IntCounter());
+ getRuntimeContext().addAccumulator(SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR, new IntCounter());
+ getRuntimeContext().addAccumulator(SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR, new IntCounter());
+ }
+
+ /**
+ * Checks that the state stored under the element's first field equals its second field,
+ * then forwards the element.
+ */
+ @Override
+ public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
+ ValueState<Long> state = getKeyedStateBackend().getPartitionedState(
+ element.getValue().f0,
+ LongSerializer.INSTANCE,
+ stateDescriptor);
+
+ // expected value first, restored value second, so failure messages read correctly
+ assertEquals(element.getValue().f1, state.value());
+ getRuntimeContext().getAccumulator(SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR).add(1);
+
+ output.collect(element);
+ }
+
+ /**
+ * Checks that the state stored under the timer's namespace equals that namespace.
+ */
+ @Override
+ public void onEventTime(InternalTimer<Long, Long> timer) throws Exception {
+ ValueState<Long> state = getKeyedStateBackend().getPartitionedState(
+ timer.getNamespace(),
+ LongSerializer.INSTANCE,
+ stateDescriptor);
+
+ assertEquals(timer.getNamespace(), state.value());
+ getRuntimeContext().getAccumulator(SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR).add(1);
+ }
+
+ /**
+ * Checks that the state stored under the timer's namespace equals that namespace.
+ */
+ @Override
+ public void onProcessingTime(InternalTimer<Long, Long> timer) throws Exception {
+ ValueState<Long> state = getKeyedStateBackend().getPartitionedState(
+ timer.getNamespace(),
+ LongSerializer.INSTANCE,
+ stateDescriptor);
+
+ assertEquals(timer.getNamespace(), state.value());
+ getRuntimeContext().getAccumulator(SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR).add(1);
+ }
+ }
+
+ /**
+ * Sink that counts the records it receives in an {@link IntCounter} accumulator registered
+ * under {@link #NUM_ELEMENTS_ACCUMULATOR}.
+ */
+ public static class AccumulatorCountingSink<T> extends RichSinkFunction<T> {
+ private static final long serialVersionUID = 1L;
+
+ public static final String NUM_ELEMENTS_ACCUMULATOR = AccumulatorCountingSink.class + "_NUM_ELEMENTS";
+
+ // incremented per record in invoke(); not read anywhere inside this class
+ int count = 0;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+
+ // register the counter before the first record arrives so invoke() can look it up
+ getRuntimeContext().addAccumulator(NUM_ELEMENTS_ACCUMULATOR, new IntCounter());
+ }
+
+ @Override
+ public void invoke(T value) throws Exception {
+ count++;
+ getRuntimeContext().getAccumulator(NUM_ELEMENTS_ACCUMULATOR).add(1);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9ed98f2e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointMigrationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointMigrationITCase.java
deleted file mode 100644
index 10a8998..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulUDFSavepointMigrationITCase.java
+++ /dev/null
@@ -1,562 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing.utils;
-
-import org.apache.flink.api.common.accumulators.IntCounter;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * This verifies that we can restore a complete job from a Flink 1.1 savepoint.
- *
- * <p>The test pipeline contains both "Checkpointed" state and keyed user state.
- */
-public class StatefulUDFSavepointMigrationITCase extends SavepointMigrationTestBase {
- private static final int NUM_SOURCE_ELEMENTS = 4;
- private static final String EXPECTED_ELEMENTS_ACCUMULATOR = "NUM_EXPECTED_ELEMENTS";
- private static final String SUCCESSFUL_CHECK_ACCUMULATOR = "SUCCESSFUL_CHECKS";
-
- /**
-  * Creates the savepoint that {@link #testSavepointRestoreFromFlink11()} restores from,
-  * using the memory state backend.
-  *
-  * <p>Ignored in regular runs: it has to be executed manually on Flink 1.1.
-  */
- @Test
- @Ignore
- public void testCreateSavepointOnFlink11() throws Exception {
-     final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
-
-     // this variant only covers the memory state backend
-     environment.setStateBackend(new MemoryStateBackend());
-     environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-     environment.enableCheckpointing(500);
-     environment.setParallelism(4);
-     environment.setMaxParallelism(4);
-
-     // legacy source -> flat maps -> custom operator -> counting sink; each stage has a fixed uid
-     environment
-         .addSource(new LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS))
-             .setMaxParallelism(1)
-             .uid("LegacyCheckpointedSource")
-         .flatMap(new LegacyCheckpointedFlatMap())
-             .startNewChain()
-             .uid("LegacyCheckpointedFlatMap")
-         .keyBy(0)
-         .flatMap(new LegacyCheckpointedFlatMapWithKeyedState())
-             .startNewChain()
-             .uid("LegacyCheckpointedFlatMapWithKeyedState")
-         .keyBy(0)
-         .flatMap(new KeyedStateSettingFlatMap())
-             .startNewChain()
-             .uid("KeyedStateSettingFlatMap")
-         .keyBy(0)
-         .transform(
-             "custom_operator",
-             new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
-             new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState()))
-             .uid("LegacyCheckpointedOperator")
-         .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR));
-
-     // run the job and write the savepoint into the test resources
-     executeAndSavepoint(
-         environment,
-         "src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint",
-         new Tuple2<>(EXPECTED_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
- }
-
- /**
-  * Creates the savepoint that {@link #testSavepointRestoreFromFlink11FromRocksDB()} restores
-  * from, using a RocksDB state backend that wraps a memory state backend.
-  *
-  * <p>Ignored in regular runs: it has to be executed manually on Flink 1.1.
-  */
- @Test
- @Ignore
- public void testCreateSavepointOnFlink11WithRocksDB() throws Exception {
-     final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
-     environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
-     RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(new MemoryStateBackend());
-     // not enabled: rocksDbBackend.enableFullyAsyncSnapshots();
-     environment.setStateBackend(rocksDbBackend);
-
-     environment.enableCheckpointing(500);
-     environment.setParallelism(4);
-     environment.setMaxParallelism(4);
-
-     // legacy source -> flat maps -> custom operator -> counting sink; each stage has a fixed uid
-     environment
-         .addSource(new LegacyCheckpointedSource(NUM_SOURCE_ELEMENTS))
-             .setMaxParallelism(1)
-             .uid("LegacyCheckpointedSource")
-         .flatMap(new LegacyCheckpointedFlatMap())
-             .startNewChain()
-             .uid("LegacyCheckpointedFlatMap")
-         .keyBy(0)
-         .flatMap(new LegacyCheckpointedFlatMapWithKeyedState())
-             .startNewChain()
-             .uid("LegacyCheckpointedFlatMapWithKeyedState")
-         .keyBy(0)
-         .flatMap(new KeyedStateSettingFlatMap())
-             .startNewChain()
-             .uid("KeyedStateSettingFlatMap")
-         .keyBy(0)
-         .transform(
-             "custom_operator",
-             new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
-             new CheckpointedUdfOperator(new LegacyCheckpointedFlatMapWithKeyedState()))
-             .uid("LegacyCheckpointedOperator")
-         .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR));
-
-     // run the job and write the savepoint into the test resources
-     executeAndSavepoint(
-         environment,
-         "src/test/resources/stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb",
-         new Tuple2<>(EXPECTED_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS));
- }
-
-
-
- /**
-  * Restores the job from the Flink 1.1 savepoint resource on the memory state backend and
-  * waits for the success accumulator to reach the expected number of checks.
-  */
- @Test
- public void testSavepointRestoreFromFlink11() throws Exception {
-     final int expectedSuccessfulChecks = 21;
-
-     final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
-
-     // this variant only covers the memory state backend
-     environment.setStateBackend(new MemoryStateBackend());
-     environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-     environment.enableCheckpointing(500);
-     environment.setParallelism(4);
-     environment.setMaxParallelism(4);
-
-     // same topology and uids as the savepoint-creating job, with checking counterparts plugged in
-     environment
-         .addSource(new RestoringCheckingSource(NUM_SOURCE_ELEMENTS))
-             .setMaxParallelism(1)
-             .uid("LegacyCheckpointedSource")
-         .flatMap(new RestoringCheckingFlatMap())
-             .startNewChain()
-             .uid("LegacyCheckpointedFlatMap")
-         .keyBy(0)
-         .flatMap(new RestoringCheckingFlatMapWithKeyedState())
-             .startNewChain()
-             .uid("LegacyCheckpointedFlatMapWithKeyedState")
-         .keyBy(0)
-         .flatMap(new KeyedStateCheckingFlatMap())
-             .startNewChain()
-             .uid("KeyedStateSettingFlatMap")
-         .keyBy(0)
-         .transform(
-             "custom_operator",
-             new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
-             new RestoringCheckingUdfOperator(new RestoringCheckingFlatMapWithKeyedState()))
-             .uid("LegacyCheckpointedOperator")
-         .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR));
-
-     restoreAndExecute(
-         environment,
-         getResourceFilename("stateful-udf-migration-itcase-flink1.1-savepoint"),
-         new Tuple2<>(SUCCESSFUL_CHECK_ACCUMULATOR, expectedSuccessfulChecks));
- }
-
- /**
-  * Restores the job from the Flink 1.1 RocksDB savepoint resource and waits for the success
-  * accumulator to reach the expected number of checks.
-  */
- @Test
- public void testSavepointRestoreFromFlink11FromRocksDB() throws Exception {
-     final int expectedSuccessfulChecks = 21;
-
-     final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
-     environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
-     // RocksDB state backend wrapping a memory state backend
-     environment.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
-     environment.enableCheckpointing(500);
-     environment.setParallelism(4);
-     environment.setMaxParallelism(4);
-
-     // same topology and uids as the savepoint-creating job, with checking counterparts plugged in
-     environment
-         .addSource(new RestoringCheckingSource(NUM_SOURCE_ELEMENTS))
-             .setMaxParallelism(1)
-             .uid("LegacyCheckpointedSource")
-         .flatMap(new RestoringCheckingFlatMap())
-             .startNewChain()
-             .uid("LegacyCheckpointedFlatMap")
-         .keyBy(0)
-         .flatMap(new RestoringCheckingFlatMapWithKeyedState())
-             .startNewChain()
-             .uid("LegacyCheckpointedFlatMapWithKeyedState")
-         .keyBy(0)
-         .flatMap(new KeyedStateCheckingFlatMap())
-             .startNewChain()
-             .uid("KeyedStateSettingFlatMap")
-         .keyBy(0)
-         .transform(
-             "custom_operator",
-             new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
-             new RestoringCheckingUdfOperator(new RestoringCheckingFlatMapWithKeyedState()))
-             .uid("LegacyCheckpointedOperator")
-         .addSink(new AccumulatorCountingSink<Tuple2<Long, Long>>(EXPECTED_ELEMENTS_ACCUMULATOR));
-
-     restoreAndExecute(
-         environment,
-         getResourceFilename("stateful-udf-migration-itcase-flink1.1-savepoint-rocksdb"),
-         new Tuple2<>(SUCCESSFUL_CHECK_ACCUMULATOR, expectedSuccessfulChecks));
- }
-
- /**
-  * Source used by the savepoint-creating jobs: emits {@code numElements} tuples
-  * {@code (i, i)} and snapshots a fixed string through the legacy {@link Checkpointed}
-  * interface.
-  */
- private static class LegacyCheckpointedSource
-         implements SourceFunction<Tuple2<Long, Long>>, Checkpointed<String> {
-
-     /** Value returned from every snapshot; final so that it cannot be reassigned. */
-     public static final String CHECKPOINTED_STRING = "Here be dragons!";
-
-     private static final long serialVersionUID = 1L;
-
-     /** Cleared by {@link #cancel()} to end the idle loop in {@link #run}. */
-     private volatile boolean isRunning = true;
-
-     private final int numElements;
-
-     public LegacyCheckpointedSource(int numElements) {
-         this.numElements = numElements;
-     }
-
-     /** Emits all elements under the checkpoint lock, then idles until cancelled. */
-     @Override
-     public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
-         synchronized (ctx.getCheckpointLock()) {
-             for (long i = 0; i < numElements; i++) {
-                 ctx.collect(new Tuple2<>(i, i));
-             }
-         }
-
-         // keep the source alive after emitting; cancel() flips the flag
-         while (isRunning) {
-             Thread.sleep(20);
-         }
-     }
-
-     @Override
-     public void cancel() {
-         isRunning = false;
-     }
-
-     /** Verifies that a restored snapshot carries the expected marker string. */
-     @Override
-     public void restoreState(String state) throws Exception {
-         assertEquals(CHECKPOINTED_STRING, state);
-     }
-
-     /** Always snapshots {@link #CHECKPOINTED_STRING}; both arguments are ignored. */
-     @Override
-     public String snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-         return CHECKPOINTED_STRING;
-     }
- }
-
- private static class RestoringCheckingSource
- extends RichSourceFunction<Tuple2<Long, Long>>
- implements CheckpointedRestoring<String> {
-
- private static final long serialVersionUID = 1L;
-
- private volatile boolean isRunning = true;
-
- private final int numElements;
-
- private String restoredState;
-
- public RestoringCheckingSource(int numElements) {
- this.numElements = numElements;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
-
- getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter());
- }
-
- @Override
- public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
- assertEquals(LegacyCheckpointedSource.CHECKPOINTED_STRING, restoredState);
- getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
-
- synchronized (ctx.getCheckpointLock()) {
- for (long i = 0; i < numElements; i++) {
- ctx.collect(new Tuple2<>(i, i));
- }
- }
-
- while (isRunning) {
- Thread.sleep(20);
- }
- }
-
- @Override
- public void cancel() {
- isRunning = false;
- }
-
- @Override
- public void restoreState(String state) throws Exception {
- restoredState = state;
- }
- }
-
- /**
-  * Pass-through flat map used by the savepoint-creating jobs; it snapshots a fixed tuple
-  * through the legacy {@link Checkpointed} interface.
-  */
- public static class LegacyCheckpointedFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
-         implements Checkpointed<Tuple2<String, Long>> {
-
-     private static final long serialVersionUID = 1L;
-
-     /** Value returned from every snapshot; final so that the reference cannot be reassigned. */
-     public static final Tuple2<String, Long> CHECKPOINTED_TUPLE =
-         new Tuple2<>("hello", 42L);
-
-     /** Forwards the input unchanged. */
-     @Override
-     public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
-         out.collect(value);
-     }
-
-     /** Restored state is ignored by this function. */
-     @Override
-     public void restoreState(Tuple2<String, Long> state) throws Exception {
-     }
-
-     /** Always snapshots {@link #CHECKPOINTED_TUPLE}; both arguments are ignored. */
-     @Override
-     public Tuple2<String, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-         return CHECKPOINTED_TUPLE;
-     }
- }
-
- /**
-  * Pass-through flat map used by the restore tests: for every element it verifies that the
-  * restored legacy state equals {@link LegacyCheckpointedFlatMap#CHECKPOINTED_TUPLE} and
-  * counts the successful check.
-  */
- public static class RestoringCheckingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
-         implements CheckpointedRestoring<Tuple2<String, Long>> {
-
-     private static final long serialVersionUID = 1L;
-
-     /** State handed to {@link #restoreState(Tuple2)}. */
-     private transient Tuple2<String, Long> restoredState;
-
-     /** Registers the accumulator that counts successful checks. */
-     @Override
-     public void open(Configuration parameters) throws Exception {
-         super.open(parameters);
-
-         getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter());
-     }
-
-     @Override
-     public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
-         // forward first, then verify the restored state
-         out.collect(value);
-
-         assertEquals(LegacyCheckpointedFlatMap.CHECKPOINTED_TUPLE, restoredState);
-         getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
-     }
-
-     /** Stores the restored tuple so that {@link #flatMap} can verify it. */
-     @Override
-     public void restoreState(Tuple2<String, Long> state) throws Exception {
-         this.restoredState = state;
-     }
- }
-
- /**
-  * Pass-through flat map used by the savepoint-creating jobs. It snapshots a fixed tuple
-  * through the legacy {@link Checkpointed} interface and additionally writes each element's
-  * second field into keyed value state named {@code "state-name"}.
-  */
- public static class LegacyCheckpointedFlatMapWithKeyedState
-         extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
-         implements Checkpointed<Tuple2<String, Long>> {
-
-     private static final long serialVersionUID = 1L;
-
-     /** Value returned from every snapshot; final so that the reference cannot be reassigned. */
-     public static final Tuple2<String, Long> CHECKPOINTED_TUPLE =
-         new Tuple2<>("hello", 42L);
-
-     private final ValueStateDescriptor<Long> stateDescriptor =
-         new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
-
-     /** Forwards the input unchanged and stores {@code value.f1} in the keyed state. */
-     @Override
-     public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
-         out.collect(value);
-
-         getRuntimeContext().getState(stateDescriptor).update(value.f1);
-     }
-
-     /** Restored state is ignored by this function. */
-     @Override
-     public void restoreState(Tuple2<String, Long> state) throws Exception {
-     }
-
-     /** Always snapshots {@link #CHECKPOINTED_TUPLE}; both arguments are ignored. */
-     @Override
-     public Tuple2<String, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-         return CHECKPOINTED_TUPLE;
-     }
- }
-
- /**
-  * Pass-through flat map used by the restore tests. For every element it verifies both the
-  * keyed value state named {@code "state-name"} and the restored legacy state, and counts
-  * the successful check.
-  */
- public static class RestoringCheckingFlatMapWithKeyedState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
-         implements CheckpointedRestoring<Tuple2<String, Long>> {
-
-     private static final long serialVersionUID = 1L;
-
-     /** State handed to {@link #restoreState(Tuple2)}. */
-     private transient Tuple2<String, Long> restoredState;
-
-     private final ValueStateDescriptor<Long> stateDescriptor =
-         new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
-
-     /** Registers the accumulator that counts successful checks. */
-     @Override
-     public void open(Configuration parameters) throws Exception {
-         super.open(parameters);
-
-         getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter());
-     }
-
-     @Override
-     public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
-         out.collect(value);
-
-         ValueState<Long> state = getRuntimeContext().getState(stateDescriptor);
-         if (state == null) {
-             throw new RuntimeException("Missing key value state for " + value);
-         }
-
-         // keyed state must hold the element's second field
-         assertEquals(value.f1, state.value());
-         // Compare against the constant of this function's own savepoint counterpart. It has
-         // the same value as LegacyCheckpointedFlatMap.CHECKPOINTED_TUPLE.
-         assertEquals(LegacyCheckpointedFlatMapWithKeyedState.CHECKPOINTED_TUPLE, restoredState);
-         getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
-     }
-
-     /** Stores the restored tuple so that {@link #flatMap} can verify it. */
-     @Override
-     public void restoreState(Tuple2<String, Long> state) throws Exception {
-         restoredState = state;
-     }
- }
-
- /**
-  * Pass-through flat map that writes each element's second field into keyed value state
-  * named {@code "state-name"}.
-  */
- public static class KeyedStateSettingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
-
-     private static final long serialVersionUID = 1L;
-
-     private final ValueStateDescriptor<Long> stateDescriptor =
-         new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
-
-     @Override
-     public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
-         // forward first, then remember f1 for the current key
-         out.collect(value);
-
-         final ValueState<Long> keyedState = getRuntimeContext().getState(stateDescriptor);
-         keyedState.update(value.f1);
-     }
- }
-
- /**
-  * Pass-through flat map that verifies, for every element, that the keyed value state named
-  * {@code "state-name"} holds the element's second field, and counts the successful check.
-  */
- public static class KeyedStateCheckingFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
-
-     private static final long serialVersionUID = 1L;
-
-     private final ValueStateDescriptor<Long> stateDescriptor =
-         new ValueStateDescriptor<Long>("state-name", LongSerializer.INSTANCE);
-
-     /** Registers the accumulator that counts successful checks. */
-     @Override
-     public void open(Configuration parameters) throws Exception {
-         super.open(parameters);
-
-         getRuntimeContext().addAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR, new IntCounter());
-     }
-
-     @Override
-     public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
-         out.collect(value);
-
-         final ValueState<Long> keyedState = getRuntimeContext().getState(stateDescriptor);
-         if (keyedState == null) {
-             throw new RuntimeException("Missing key value state for " + value);
-         }
-
-         // a failing assertion throws before the accumulator is incremented
-         assertEquals(value.f1, keyedState.value());
-         getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
-     }
- }
-
- /**
-  * Custom operator used by the savepoint-creating jobs. It forwards records and watermarks
-  * unchanged. The snapshot logic that wrote {@code CHECKPOINTED_STRING} only compiles
-  * against Flink 1.1 and is therefore kept as a comment at the end of the class.
-  */
- public static class CheckpointedUdfOperator
-         extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>>
-         implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> {
-     private static final long serialVersionUID = 1L;
-
-     /** String that the restoring operator expects to read back. */
-     private static final String CHECKPOINTED_STRING = "Oh my, that's nice!";
-
-     public CheckpointedUdfOperator(FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> userFunction) {
-         super(userFunction);
-     }
-
-     /** Forwards the record as is; the wrapped user function is not invoked here. */
-     @Override
-     public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
-         output.collect(element);
-     }
-
-     /** Forwards the watermark unchanged. */
-     @Override
-     public void processWatermark(Watermark mark) throws Exception {
-         output.emitWatermark(mark);
-     }
-
-     // Flink 1.1
-//     @Override
-//     public StreamTaskState snapshotOperatorState(
-//             long checkpointId, long timestamp) throws Exception {
-//         StreamTaskState result = super.snapshotOperatorState(checkpointId, timestamp);
-//
-//         AbstractStateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView(
-//                 checkpointId,
-//                 timestamp);
-//
-//         out.writeUTF(CHECKPOINTED_STRING);
-//
-//         result.setOperatorState(out.closeAndGetHandle());
-//
-//         return result;
-//     }
- }
-
- /**
-  * Custom operator used by the restore tests. It reads a UTF string from the restored
-  * operator state, runs the wrapped flat map for every record and verifies that the string
-  * equals the one declared by {@link CheckpointedUdfOperator}.
-  */
- public static class RestoringCheckingUdfOperator
-         extends AbstractUdfStreamOperator<Tuple2<Long, Long>, FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>>
-         implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> {
-     private static final long serialVersionUID = 1L;
-
-     /** String read in {@link #restoreState(FSDataInputStream)}. */
-     private String restoredState;
-
-     public RestoringCheckingUdfOperator(FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> userFunction) {
-         super(userFunction);
-     }
-
-     /** Delegates to the superclass; no additional setup is done here. */
-     @Override
-     public void open() throws Exception {
-         super.open();
-     }
-
-     @Override
-     public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
-         // let the wrapped function process (and check) the record first
-         userFunction.flatMap(element.getValue(), new TimestampedCollector<>(output));
-
-         // then verify the operator-level state and count the successful check
-         assertEquals(CheckpointedUdfOperator.CHECKPOINTED_STRING, restoredState);
-         getRuntimeContext().getAccumulator(SUCCESSFUL_CHECK_ACCUMULATOR).add(1);
-     }
-
-     /** Forwards the watermark unchanged. */
-     @Override
-     public void processWatermark(Watermark mark) throws Exception {
-         output.emitWatermark(mark);
-     }
-
-     /** Lets the superclass restore first, then reads this operator's UTF string from the same stream. */
-     @Override
-     public void restoreState(FSDataInputStream in) throws Exception {
-         super.restoreState(in);
-
-         final DataInputViewStreamWrapper view = new DataInputViewStreamWrapper(in);
-         restoredState = view.readUTF();
-     }
- }
-
- /**
-  * Sink that counts the records it receives, both in a local field and in an
-  * {@link IntCounter} accumulator registered under the name passed to the constructor.
-  */
- public static class AccumulatorCountingSink<T> extends RichSinkFunction<T> {
-     private static final long serialVersionUID = 1L;
-
-     /** Name under which the element counter is registered. */
-     private final String accumulatorName;
-
-     /** Number of records seen by this sink instance. */
-     int count = 0;
-
-     public AccumulatorCountingSink(String accumulatorName) {
-         this.accumulatorName = accumulatorName;
-     }
-
-     /** Registers the element counter once the runtime context is available. */
-     @Override
-     public void open(Configuration parameters) throws Exception {
-         super.open(parameters);
-
-         getRuntimeContext().addAccumulator(accumulatorName, new IntCounter());
-     }
-
-     /** Counts the record; the record itself is not inspected. */
-     @Override
-     public void invoke(T value) throws Exception {
-         this.count++;
-         getRuntimeContext().getAccumulator(accumulatorName).add(1);
-     }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9ed98f2e/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint
new file mode 100644
index 0000000..a8d19f2
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint differ
http://git-wip-us.apache.org/repos/asf/flink/blob/9ed98f2e/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint-rocksdb
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint-rocksdb b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint-rocksdb
new file mode 100644
index 0000000..548993f
Binary files /dev/null and b/flink-tests/src/test/resources/stateful-udf-migration-itcase-flink1.2-savepoint-rocksdb differ