You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/05/03 11:52:00 UTC
[04/11] flink git commit: [FLINK-5969] Add OperatorSnapshotUtil
[FLINK-5969] Add OperatorSnapshotUtil
This has methods for storing/reading OperatorStateHandles, as returned
from stream operator test harnesses. This can be used to write binary
snapshots for use in state migration tests.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/611434c6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/611434c6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/611434c6
Branch: refs/heads/release-1.2
Commit: 611434c6fa8e53cac25dd93f568d2670ec4ead72
Parents: 52fb578
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Apr 24 12:31:53 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed May 3 13:50:04 2017 +0200
----------------------------------------------------------------------
.../savepoint/SavepointV1Serializer.java | 23 ++-
.../streaming/util/OperatorSnapshotUtil.java | 156 +++++++++++++++++++
2 files changed, 172 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/611434c6/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
index ba1949a..a9fa3c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.checkpoint.savepoint;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.checkpoint.TaskState;
@@ -47,7 +49,8 @@ import java.util.Map;
* that no default Java serialization is used for serialization. Therefore, we
* don't rely on any involved Java classes to stay the same.
*/
-class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
+@Internal
+public class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
private static final byte NULL_HANDLE = 0;
private static final byte BYTE_STREAM_STATE_HANDLE = 1;
@@ -209,7 +212,8 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
keyedStateStream);
}
- private static void serializeKeyGroupStateHandle(
+ @VisibleForTesting
+ public static void serializeKeyGroupStateHandle(
KeyGroupsStateHandle stateHandle, DataOutputStream dos) throws IOException {
if (stateHandle != null) {
@@ -225,7 +229,8 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
}
}
- private static KeyGroupsStateHandle deserializeKeyGroupStateHandle(DataInputStream dis) throws IOException {
+ @VisibleForTesting
+ public static KeyGroupsStateHandle deserializeKeyGroupStateHandle(DataInputStream dis) throws IOException {
final int type = dis.readByte();
if (NULL_HANDLE == type) {
return null;
@@ -245,7 +250,8 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
}
}
- private static void serializeOperatorStateHandle(
+ @VisibleForTesting
+ public static void serializeOperatorStateHandle(
OperatorStateHandle stateHandle, DataOutputStream dos) throws IOException {
if (stateHandle != null) {
@@ -273,7 +279,8 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
}
}
- private static OperatorStateHandle deserializeOperatorStateHandle(
+ @VisibleForTesting
+ public static OperatorStateHandle deserializeOperatorStateHandle(
DataInputStream dis) throws IOException {
final int type = dis.readByte();
@@ -304,7 +311,8 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
}
}
- private static void serializeStreamStateHandle(
+ @VisibleForTesting
+ public static void serializeStreamStateHandle(
StreamStateHandle stateHandle, DataOutputStream dos) throws IOException {
if (stateHandle == null) {
@@ -331,7 +339,8 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
dos.flush();
}
- private static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException {
+ @VisibleForTesting
+ public static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException {
int type = dis.read();
if (NULL_HANDLE == type) {
return null;
http://git-wip-us.apache.org/repos/asf/flink/blob/611434c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
new file mode 100644
index 0000000..1a53598
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+
+/**
+ * Util for writing/reading {@link org.apache.flink.streaming.runtime.tasks.OperatorStateHandles},
+ * for use in tests.
+ */
+public class OperatorSnapshotUtil {
+
+ public static String getResourceFilename(String filename) {
+ ClassLoader cl = OperatorSnapshotUtil.class.getClassLoader();
+ URL resource = cl.getResource(filename);
+ return resource.getFile();
+ }
+
+ public static void writeStateHandle(OperatorStateHandles state, String path) throws IOException {
+ FileOutputStream out = new FileOutputStream(path);
+ DataOutputStream dos = new DataOutputStream(out);
+
+ dos.writeInt(state.getOperatorChainIndex());
+
+ SavepointV1Serializer.serializeStreamStateHandle(state.getLegacyOperatorState(), dos);
+
+ Collection<OperatorStateHandle> rawOperatorState = state.getRawOperatorState();
+ if (rawOperatorState != null) {
+ dos.writeInt(rawOperatorState.size());
+ for (OperatorStateHandle operatorStateHandle : rawOperatorState) {
+ SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+ }
+ } else {
+ // this means no states, not even an empty list
+ dos.writeInt(-1);
+ }
+
+ Collection<OperatorStateHandle> managedOperatorState = state.getManagedOperatorState();
+ if (managedOperatorState != null) {
+ dos.writeInt(managedOperatorState.size());
+ for (OperatorStateHandle operatorStateHandle : managedOperatorState) {
+ SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+ }
+ } else {
+ // this means no states, not even an empty list
+ dos.writeInt(-1);
+ }
+
+ Collection<KeyGroupsStateHandle> rawKeyedState = state.getRawKeyedState();
+ if (rawKeyedState != null) {
+ dos.writeInt(rawKeyedState.size());
+ for (KeyGroupsStateHandle keyedStateHandle : rawKeyedState) {
+ SavepointV1Serializer.serializeKeyGroupStateHandle(keyedStateHandle, dos);
+ }
+ } else {
+ // this means no operator states, not even an empty list
+ dos.writeInt(-1);
+ }
+
+ Collection<KeyGroupsStateHandle> managedKeyedState = state.getManagedKeyedState();
+ if (managedKeyedState != null) {
+ dos.writeInt(managedKeyedState.size());
+ for (KeyGroupsStateHandle keyedStateHandle : managedKeyedState) {
+ SavepointV1Serializer.serializeKeyGroupStateHandle(keyedStateHandle, dos);
+ }
+ } else {
+ // this means no operator states, not even an empty list
+ dos.writeInt(-1);
+ }
+
+ dos.flush();
+ out.close();
+ }
+
+ public static OperatorStateHandles readStateHandle(String path) throws IOException, ClassNotFoundException {
+ FileInputStream in = new FileInputStream(path);
+ DataInputStream dis = new DataInputStream(in);
+ int index = dis.readInt();
+
+ StreamStateHandle legacyState = SavepointV1Serializer.deserializeStreamStateHandle(dis);
+
+ List<OperatorStateHandle> rawOperatorState = null;
+ int numRawOperatorStates = dis.readInt();
+ if (numRawOperatorStates >= 0) {
+ rawOperatorState = new ArrayList<>();
+ for (int i = 0; i < numRawOperatorStates; i++) {
+ OperatorStateHandle operatorState = SavepointV1Serializer.deserializeOperatorStateHandle(
+ dis);
+ rawOperatorState.add(operatorState);
+ }
+ }
+
+ List<OperatorStateHandle> managedOperatorState = null;
+ int numManagedOperatorStates = dis.readInt();
+ if (numManagedOperatorStates >= 0) {
+ managedOperatorState = new ArrayList<>();
+ for (int i = 0; i < numManagedOperatorStates; i++) {
+ OperatorStateHandle operatorState = SavepointV1Serializer.deserializeOperatorStateHandle(
+ dis);
+ managedOperatorState.add(operatorState);
+ }
+ }
+
+ List<KeyGroupsStateHandle> rawKeyedState = null;
+ int numRawKeyedStates = dis.readInt();
+ if (numRawKeyedStates >= 0) {
+ rawKeyedState = new ArrayList<>();
+ for (int i = 0; i < numRawKeyedStates; i++) {
+ KeyGroupsStateHandle keyedState = SavepointV1Serializer.deserializeKeyGroupStateHandle(
+ dis);
+ rawKeyedState.add(keyedState);
+ }
+ }
+
+ List<KeyGroupsStateHandle> managedKeyedState = null;
+ int numManagedKeyedStates = dis.readInt();
+ if (numManagedKeyedStates >= 0) {
+ managedKeyedState = new ArrayList<>();
+ for (int i = 0; i < numManagedKeyedStates; i++) {
+ KeyGroupsStateHandle keyedState = SavepointV1Serializer.deserializeKeyGroupStateHandle(
+ dis);
+ managedKeyedState.add(keyedState);
+ }
+ }
+
+ return new OperatorStateHandles(index, legacyState, managedKeyedState, rawKeyedState, managedOperatorState, rawOperatorState);
+ }
+}