You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/12/20 15:09:17 UTC
[02/15] flink git commit: [FLINK-5292] Expose some
SavepointV0Serializer methods for use in tests
[FLINK-5292] Expose some SavepointV0Serializer methods for use in tests
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/896fbaef
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/896fbaef
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/896fbaef
Branch: refs/heads/master
Commit: 896fbaefb3801302fe7b0e60215ad69b2457ddee
Parents: 32f300f
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Dec 16 10:56:20 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Dec 20 15:42:53 2016 +0100
----------------------------------------------------------------------
.../savepoint/SavepointV0Serializer.java | 18 +++++++++++++++---
1 file changed, 15 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/896fbaef/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
index 6c6a8f6..9e37dbb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
@@ -252,7 +252,11 @@ public class SavepointV0Serializer implements SavepointSerializer<SavepointV1> {
null);
}
- private StreamStateHandle convertOperatorAndFunctionState(StreamTaskState streamTaskState) throws Exception {
+ /**
+ * This is public so that we can use it when restoring a legacy snapshot
+ * in {@code AbstractStreamOperatorTestHarness}.
+ */
+ public static StreamStateHandle convertOperatorAndFunctionState(StreamTaskState streamTaskState) throws Exception {
List<StreamStateHandle> mergeStateHandles = new ArrayList<>(4);
@@ -273,7 +277,11 @@ public class SavepointV0Serializer implements SavepointSerializer<SavepointV1> {
return new MigrationStreamStateHandle(new MultiStreamStateHandle(mergeStateHandles));
}
- private KeyGroupsStateHandle convertKeyedBackendState(
+ /**
+ * This is public so that we can use it when restoring a legacy snapshot
+ * in {@code AbstractStreamOperatorTestHarness}.
+ */
+ public static KeyGroupsStateHandle convertKeyedBackendState(
HashMap<String, KvStateSnapshot<?, ?, ?, ?>> oldKeyedState,
int parallelInstanceIdx,
long checkpointID) throws Exception {
@@ -327,7 +335,11 @@ public class SavepointV0Serializer implements SavepointSerializer<SavepointV1> {
return 0;
}
- private static StreamStateHandle convertStateHandle(StateHandle<?> oldStateHandle) throws Exception {
+ /**
+ * This is public so that we can use it when restoring a legacy snapshot
+ * in {@code AbstractStreamOperatorTestHarness}.
+ */
+ public static StreamStateHandle convertStateHandle(StateHandle<?> oldStateHandle) throws Exception {
if (oldStateHandle instanceof AbstractFileStateHandle) {
Path path = ((AbstractFileStateHandle) oldStateHandle).getFilePath();
return new FileStateHandle(path, oldStateHandle.getStateSize());