You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/05/03 11:52:00 UTC

[04/11] flink git commit: [FLINK-5969] Add OperatorSnapshotUtil

[FLINK-5969] Add OperatorSnapshotUtil

This has methods for storing/reading OperatorStateHandles, as returned
from stream operator test harnesses. This can be used to write binary
snapshots for use in state migration tests.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/611434c6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/611434c6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/611434c6

Branch: refs/heads/release-1.2
Commit: 611434c6fa8e53cac25dd93f568d2670ec4ead72
Parents: 52fb578
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Apr 24 12:31:53 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed May 3 13:50:04 2017 +0200

----------------------------------------------------------------------
 .../savepoint/SavepointV1Serializer.java        |  23 ++-
 .../streaming/util/OperatorSnapshotUtil.java    | 156 +++++++++++++++++++
 2 files changed, 172 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/611434c6/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
index ba1949a..a9fa3c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.checkpoint.savepoint;
 
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskState;
@@ -47,7 +49,8 @@ import java.util.Map;
  * that no default Java serialization is used for serialization. Therefore, we
  * don't rely on any involved Java classes to stay the same.
  */
-class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
+@Internal
+public class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
 
 	private static final byte NULL_HANDLE = 0;
 	private static final byte BYTE_STREAM_STATE_HANDLE = 1;
@@ -209,7 +212,8 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
 				keyedStateStream);
 	}
 
-	private static void serializeKeyGroupStateHandle(
+	@VisibleForTesting
+	public static void serializeKeyGroupStateHandle(
 			KeyGroupsStateHandle stateHandle, DataOutputStream dos) throws IOException {
 
 		if (stateHandle != null) {
@@ -225,7 +229,8 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
 		}
 	}
 
-	private static KeyGroupsStateHandle deserializeKeyGroupStateHandle(DataInputStream dis) throws IOException {
+	@VisibleForTesting
+	public static KeyGroupsStateHandle deserializeKeyGroupStateHandle(DataInputStream dis) throws IOException {
 		final int type = dis.readByte();
 		if (NULL_HANDLE == type) {
 			return null;
@@ -245,7 +250,8 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
 		}
 	}
 
-	private static void serializeOperatorStateHandle(
+	@VisibleForTesting
+	public static void serializeOperatorStateHandle(
 			OperatorStateHandle stateHandle, DataOutputStream dos) throws IOException {
 
 		if (stateHandle != null) {
@@ -273,7 +279,8 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
 		}
 	}
 
-	private static OperatorStateHandle deserializeOperatorStateHandle(
+	@VisibleForTesting
+	public static OperatorStateHandle deserializeOperatorStateHandle(
 			DataInputStream dis) throws IOException {
 
 		final int type = dis.readByte();
@@ -304,7 +311,8 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
 		}
 	}
 
-	private static void serializeStreamStateHandle(
+	@VisibleForTesting
+	public static void serializeStreamStateHandle(
 			StreamStateHandle stateHandle, DataOutputStream dos) throws IOException {
 
 		if (stateHandle == null) {
@@ -331,7 +339,8 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
 		dos.flush();
 	}
 
-	private static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException {
+	@VisibleForTesting
+	public static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException {
 		int type = dis.read();
 		if (NULL_HANDLE == type) {
 			return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/611434c6/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
new file mode 100644
index 0000000..1a53598
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+
+/**
+ * Util for writing/reading {@link org.apache.flink.streaming.runtime.tasks.OperatorStateHandles},
+ * for use in tests.
+ */
+public class OperatorSnapshotUtil {
+
+	/** Size marker written in place of a collection that is {@code null} rather than empty. */
+	private static final int NULL_COLLECTION = -1;
+
+	/**
+	 * Looks up {@code filename} via this class's class loader and returns the file part of its URL.
+	 *
+	 * @param filename name of the resource to look up
+	 * @return the result of {@link URL#getFile()} for the resource
+	 * @throws NullPointerException if the class loader cannot find the resource
+	 */
+	public static String getResourceFilename(String filename) {
+		ClassLoader cl = OperatorSnapshotUtil.class.getClassLoader();
+		URL resource = cl.getResource(filename);
+		return resource.getFile();
+	}
+
+	/**
+	 * Writes the given state handles to the file at {@code path}.
+	 *
+	 * <p>Layout: the operator chain index, the legacy operator state handle, then the raw
+	 * operator, managed operator, raw keyed and managed keyed state collections. Each
+	 * collection is preceded by its size, or by {@code -1} when the collection is {@code null}.
+	 *
+	 * @param state the handles to write
+	 * @param path the file to write to
+	 * @throws IOException if opening or writing the file fails
+	 */
+	public static void writeStateHandle(OperatorStateHandles state, String path) throws IOException {
+		// try-with-resources closes the file even when serialization fails half-way
+		try (DataOutputStream dos = new DataOutputStream(new FileOutputStream(path))) {
+			dos.writeInt(state.getOperatorChainIndex());
+			SavepointV1Serializer.serializeStreamStateHandle(state.getLegacyOperatorState(), dos);
+
+			writeOperatorStates(state.getRawOperatorState(), dos);
+			writeOperatorStates(state.getManagedOperatorState(), dos);
+			writeKeyedStates(state.getRawKeyedState(), dos);
+			writeKeyedStates(state.getManagedKeyedState(), dos);
+
+			dos.flush();
+		}
+	}
+
+	/**
+	 * Reads state handles from a file written by {@link #writeStateHandle}.
+	 *
+	 * @param path the file to read from
+	 * @return the stored handles; collections stored with a negative size are {@code null}
+	 * @throws IOException if opening or reading the file fails
+	 * @throws ClassNotFoundException declared for callers; not thrown by the code in this method
+	 */
+	public static OperatorStateHandles readStateHandle(String path) throws IOException, ClassNotFoundException {
+		// try-with-resources closes the file on both normal and exceptional exit
+		try (DataInputStream dis = new DataInputStream(new FileInputStream(path))) {
+			int index = dis.readInt();
+			StreamStateHandle legacyState = SavepointV1Serializer.deserializeStreamStateHandle(dis);
+
+			// the read order must mirror the write order in writeStateHandle
+			List<OperatorStateHandle> rawOperatorState = readOperatorStates(dis);
+			List<OperatorStateHandle> managedOperatorState = readOperatorStates(dis);
+			List<KeyGroupsStateHandle> rawKeyedState = readKeyedStates(dis);
+			List<KeyGroupsStateHandle> managedKeyedState = readKeyedStates(dis);
+
+			return new OperatorStateHandles(index, legacyState, managedKeyedState, rawKeyedState, managedOperatorState, rawOperatorState);
+		}
+	}
+
+	/** Writes the size of {@code states} (or the null marker) followed by each operator state handle. */
+	private static void writeOperatorStates(Collection<OperatorStateHandle> states, DataOutputStream dos) throws IOException {
+		if (states == null) {
+			dos.writeInt(NULL_COLLECTION);
+			return;
+		}
+		dos.writeInt(states.size());
+		for (OperatorStateHandle handle : states) {
+			SavepointV1Serializer.serializeOperatorStateHandle(handle, dos);
+		}
+	}
+
+	/** Writes the size of {@code states} (or the null marker) followed by each keyed state handle. */
+	private static void writeKeyedStates(Collection<KeyGroupsStateHandle> states, DataOutputStream dos) throws IOException {
+		if (states == null) {
+			dos.writeInt(NULL_COLLECTION);
+			return;
+		}
+		dos.writeInt(states.size());
+		for (KeyGroupsStateHandle handle : states) {
+			SavepointV1Serializer.serializeKeyGroupStateHandle(handle, dos);
+		}
+	}
+
+	/** Reads a size-prefixed list of operator state handles; a negative size yields {@code null}. */
+	private static List<OperatorStateHandle> readOperatorStates(DataInputStream dis) throws IOException {
+		int numStates = dis.readInt();
+		if (numStates < 0) {
+			return null;
+		}
+		List<OperatorStateHandle> states = new ArrayList<>();
+		for (int i = 0; i < numStates; i++) {
+			states.add(SavepointV1Serializer.deserializeOperatorStateHandle(dis));
+		}
+		return states;
+	}
+
+	/** Reads a size-prefixed list of keyed state handles; a negative size yields {@code null}. */
+	private static List<KeyGroupsStateHandle> readKeyedStates(DataInputStream dis) throws IOException {
+		int numStates = dis.readInt();
+		if (numStates < 0) {
+			return null;
+		}
+		List<KeyGroupsStateHandle> states = new ArrayList<>();
+		for (int i = 0; i < numStates; i++) {
+			states.add(SavepointV1Serializer.deserializeKeyGroupStateHandle(dis));
+		}
+		return states;
+	}
+}