You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/05/03 14:28:11 UTC
[05/13] flink git commit: [FLINK-5969] Add OperatorSnapshotUtil
[FLINK-5969] Add OperatorSnapshotUtil
This has methods for storing/reading OperatorStateHandles, as returned
from stream operator test harnesses. This can be used to write binary
snapshots for use in state migration tests.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2779197f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2779197f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2779197f
Branch: refs/heads/master
Commit: 2779197f237446e3bff4e9e15f90c24d721c8ab4
Parents: 9ed98f2
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Apr 24 12:31:53 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed May 3 16:24:26 2017 +0200
----------------------------------------------------------------------
.../savepoint/SavepointV1Serializer.java | 27 ++--
.../streaming/util/OperatorSnapshotUtil.java | 156 +++++++++++++++++++
2 files changed, 174 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2779197f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
index aaa8cdd..f67d54c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.checkpoint.savepoint;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.checkpoint.TaskState;
@@ -43,12 +45,13 @@ import java.util.Map;
/**
* Deserializer for checkpoints written in format {@code 1} (Flink 1.2.x format)
- *
+ *
* <p>In contrast to the previous versions, this serializer makes sure that no Java
- * serialization is used for serialization. Therefore, we don't rely on any involved
+ * serialization is used for serialization. Therefore, we don't rely on any involved
* classes to stay the same.
*/
-class SavepointV1Serializer implements SavepointSerializer<SavepointV2> {
+@Internal
+public class SavepointV1Serializer implements SavepointSerializer<SavepointV2> {
private static final byte NULL_HANDLE = 0;
private static final byte BYTE_STREAM_STATE_HANDLE = 1;
@@ -210,7 +213,8 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV2> {
keyedStateStream);
}
- private static void serializeKeyedStateHandle(
+ @VisibleForTesting
+ public static void serializeKeyedStateHandle(
KeyedStateHandle stateHandle, DataOutputStream dos) throws IOException {
if (stateHandle == null) {
@@ -230,7 +234,8 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV2> {
}
}
- private static KeyedStateHandle deserializeKeyedStateHandle(DataInputStream dis) throws IOException {
+ @VisibleForTesting
+ public static KeyedStateHandle deserializeKeyedStateHandle(DataInputStream dis) throws IOException {
final int type = dis.readByte();
if (NULL_HANDLE == type) {
return null;
@@ -251,7 +256,8 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV2> {
}
}
- private static void serializeOperatorStateHandle(
+ @VisibleForTesting
+ public static void serializeOperatorStateHandle(
OperatorStateHandle stateHandle, DataOutputStream dos) throws IOException {
if (stateHandle != null) {
@@ -279,7 +285,8 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV2> {
}
}
- private static OperatorStateHandle deserializeOperatorStateHandle(
+ @VisibleForTesting
+ public static OperatorStateHandle deserializeOperatorStateHandle(
DataInputStream dis) throws IOException {
final int type = dis.readByte();
@@ -310,7 +317,8 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV2> {
}
}
- private static void serializeStreamStateHandle(
+ @VisibleForTesting
+ public static void serializeStreamStateHandle(
StreamStateHandle stateHandle, DataOutputStream dos) throws IOException {
if (stateHandle == null) {
@@ -337,7 +345,8 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV2> {
dos.flush();
}
- private static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException {
+ @VisibleForTesting
+ public static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException {
int type = dis.read();
if (NULL_HANDLE == type) {
return null;
http://git-wip-us.apache.org/repos/asf/flink/blob/2779197f/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
new file mode 100644
index 0000000..92a9452
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+
+/**
+ * Util for writing/reading {@link org.apache.flink.streaming.runtime.tasks.OperatorStateHandles},
+ * for use in tests.
+ */
+public class OperatorSnapshotUtil {
+
+ public static String getResourceFilename(String filename) {
+ ClassLoader cl = OperatorSnapshotUtil.class.getClassLoader();
+ URL resource = cl.getResource(filename);
+ return resource.getFile();
+ }
+
+ public static void writeStateHandle(OperatorStateHandles state, String path) throws IOException {
+ FileOutputStream out = new FileOutputStream(path);
+ DataOutputStream dos = new DataOutputStream(out);
+
+ dos.writeInt(state.getOperatorChainIndex());
+
+ SavepointV1Serializer.serializeStreamStateHandle(state.getLegacyOperatorState(), dos);
+
+ Collection<OperatorStateHandle> rawOperatorState = state.getRawOperatorState();
+ if (rawOperatorState != null) {
+ dos.writeInt(rawOperatorState.size());
+ for (OperatorStateHandle operatorStateHandle : rawOperatorState) {
+ SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+ }
+ } else {
+ // this means no states, not even an empty list
+ dos.writeInt(-1);
+ }
+
+ Collection<OperatorStateHandle> managedOperatorState = state.getManagedOperatorState();
+ if (managedOperatorState != null) {
+ dos.writeInt(managedOperatorState.size());
+ for (OperatorStateHandle operatorStateHandle : managedOperatorState) {
+ SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+ }
+ } else {
+ // this means no states, not even an empty list
+ dos.writeInt(-1);
+ }
+
+ Collection<KeyedStateHandle> rawKeyedState = state.getRawKeyedState();
+ if (rawKeyedState != null) {
+ dos.writeInt(rawKeyedState.size());
+ for (KeyedStateHandle keyedStateHandle : rawKeyedState) {
+ SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
+ }
+ } else {
+ // this means no operator states, not even an empty list
+ dos.writeInt(-1);
+ }
+
+ Collection<KeyedStateHandle> managedKeyedState = state.getManagedKeyedState();
+ if (managedKeyedState != null) {
+ dos.writeInt(managedKeyedState.size());
+ for (KeyedStateHandle keyedStateHandle : managedKeyedState) {
+ SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
+ }
+ } else {
+ // this means no operator states, not even an empty list
+ dos.writeInt(-1);
+ }
+
+ dos.flush();
+ out.close();
+ }
+
+ public static OperatorStateHandles readStateHandle(String path) throws IOException, ClassNotFoundException {
+ FileInputStream in = new FileInputStream(path);
+ DataInputStream dis = new DataInputStream(in);
+ int index = dis.readInt();
+
+ StreamStateHandle legacyState = SavepointV1Serializer.deserializeStreamStateHandle(dis);
+
+ List<OperatorStateHandle> rawOperatorState = null;
+ int numRawOperatorStates = dis.readInt();
+ if (numRawOperatorStates >= 0) {
+ rawOperatorState = new ArrayList<>();
+ for (int i = 0; i < numRawOperatorStates; i++) {
+ OperatorStateHandle operatorState = SavepointV1Serializer.deserializeOperatorStateHandle(
+ dis);
+ rawOperatorState.add(operatorState);
+ }
+ }
+
+ List<OperatorStateHandle> managedOperatorState = null;
+ int numManagedOperatorStates = dis.readInt();
+ if (numManagedOperatorStates >= 0) {
+ managedOperatorState = new ArrayList<>();
+ for (int i = 0; i < numManagedOperatorStates; i++) {
+ OperatorStateHandle operatorState = SavepointV1Serializer.deserializeOperatorStateHandle(
+ dis);
+ managedOperatorState.add(operatorState);
+ }
+ }
+
+ List<KeyedStateHandle> rawKeyedState = null;
+ int numRawKeyedStates = dis.readInt();
+ if (numRawKeyedStates >= 0) {
+ rawKeyedState = new ArrayList<>();
+ for (int i = 0; i < numRawKeyedStates; i++) {
+ KeyedStateHandle keyedState = SavepointV1Serializer.deserializeKeyedStateHandle(
+ dis);
+ rawKeyedState.add(keyedState);
+ }
+ }
+
+ List<KeyedStateHandle> managedKeyedState = null;
+ int numManagedKeyedStates = dis.readInt();
+ if (numManagedKeyedStates >= 0) {
+ managedKeyedState = new ArrayList<>();
+ for (int i = 0; i < numManagedKeyedStates; i++) {
+ KeyedStateHandle keyedState = SavepointV1Serializer.deserializeKeyedStateHandle(
+ dis);
+ managedKeyedState.add(keyedState);
+ }
+ }
+
+ return new OperatorStateHandles(index, legacyState, managedKeyedState, rawKeyedState, managedOperatorState, rawOperatorState);
+ }
+}