You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/05/03 14:28:11 UTC

[05/13] flink git commit: [FLINK-5969] Add OperatorSnapshotUtil

[FLINK-5969] Add OperatorSnapshotUtil

This has methods for storing/reading OperatorStateHandles, as returned
from stream operator test harnesses. This can be used to write binary
snapshots for use in state migration tests.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2779197f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2779197f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2779197f

Branch: refs/heads/master
Commit: 2779197f237446e3bff4e9e15f90c24d721c8ab4
Parents: 9ed98f2
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Apr 24 12:31:53 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed May 3 16:24:26 2017 +0200

----------------------------------------------------------------------
 .../savepoint/SavepointV1Serializer.java        |  27 ++--
 .../streaming/util/OperatorSnapshotUtil.java    | 156 +++++++++++++++++++
 2 files changed, 174 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2779197f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
index aaa8cdd..f67d54c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.checkpoint.savepoint;
 
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskState;
@@ -43,12 +45,13 @@ import java.util.Map;
 
 /**
  * Deserializer for checkpoints written in format {@code 1} (Flink 1.2.x format)
- * 
+ *
  * <p>In contrast to the previous versions, this serializer makes sure that no Java
- * serialization is used for serialization. Therefore, we don't rely on any involved 
+ * serialization is used for serialization. Therefore, we don't rely on any involved
  * classes to stay the same.
  */
-class SavepointV1Serializer implements SavepointSerializer<SavepointV2> {
+@Internal
+public class SavepointV1Serializer implements SavepointSerializer<SavepointV2> {
 
 	private static final byte NULL_HANDLE = 0;
 	private static final byte BYTE_STREAM_STATE_HANDLE = 1;
@@ -210,7 +213,8 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV2> {
 				keyedStateStream);
 	}
 
-	private static void serializeKeyedStateHandle(
+	@VisibleForTesting
+	public static void serializeKeyedStateHandle(
 			KeyedStateHandle stateHandle, DataOutputStream dos) throws IOException {
 
 		if (stateHandle == null) {
@@ -230,7 +234,8 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV2> {
 		}
 	}
 
-	private static KeyedStateHandle deserializeKeyedStateHandle(DataInputStream dis) throws IOException {
+	@VisibleForTesting
+	public static KeyedStateHandle deserializeKeyedStateHandle(DataInputStream dis) throws IOException {
 		final int type = dis.readByte();
 		if (NULL_HANDLE == type) {
 			return null;
@@ -251,7 +256,8 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV2> {
 		}
 	}
 
-	private static void serializeOperatorStateHandle(
+	@VisibleForTesting
+	public static void serializeOperatorStateHandle(
 			OperatorStateHandle stateHandle, DataOutputStream dos) throws IOException {
 
 		if (stateHandle != null) {
@@ -279,7 +285,8 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV2> {
 		}
 	}
 
-	private static OperatorStateHandle deserializeOperatorStateHandle(
+	@VisibleForTesting
+	public static OperatorStateHandle deserializeOperatorStateHandle(
 			DataInputStream dis) throws IOException {
 
 		final int type = dis.readByte();
@@ -310,7 +317,8 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV2> {
 		}
 	}
 
-	private static void serializeStreamStateHandle(
+	@VisibleForTesting
+	public static void serializeStreamStateHandle(
 			StreamStateHandle stateHandle, DataOutputStream dos) throws IOException {
 
 		if (stateHandle == null) {
@@ -337,7 +345,8 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV2> {
 		dos.flush();
 	}
 
-	private static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException {
+	@VisibleForTesting
+	public static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException {
 		int type = dis.read();
 		if (NULL_HANDLE == type) {
 			return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/2779197f/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
new file mode 100644
index 0000000..92a9452
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+
+/**
+ * Util for writing/reading {@link org.apache.flink.streaming.runtime.tasks.OperatorStateHandles},
+ * for use in tests.
+ */
+public class OperatorSnapshotUtil {
+
+	public static String getResourceFilename(String filename) {
+		ClassLoader cl = OperatorSnapshotUtil.class.getClassLoader();
+		URL resource = cl.getResource(filename);
+		return resource.getFile();
+	}
+
+	public static void writeStateHandle(OperatorStateHandles state, String path) throws IOException {
+		FileOutputStream out = new FileOutputStream(path);
+		DataOutputStream dos = new DataOutputStream(out);
+
+		dos.writeInt(state.getOperatorChainIndex());
+
+		SavepointV1Serializer.serializeStreamStateHandle(state.getLegacyOperatorState(), dos);
+
+		Collection<OperatorStateHandle> rawOperatorState = state.getRawOperatorState();
+		if (rawOperatorState != null) {
+			dos.writeInt(rawOperatorState.size());
+			for (OperatorStateHandle operatorStateHandle : rawOperatorState) {
+				SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+			}
+		} else {
+			// this means no states, not even an empty list
+			dos.writeInt(-1);
+		}
+
+		Collection<OperatorStateHandle> managedOperatorState = state.getManagedOperatorState();
+		if (managedOperatorState != null) {
+			dos.writeInt(managedOperatorState.size());
+			for (OperatorStateHandle operatorStateHandle : managedOperatorState) {
+				SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+			}
+		} else {
+			// this means no states, not even an empty list
+			dos.writeInt(-1);
+		}
+
+		Collection<KeyedStateHandle> rawKeyedState = state.getRawKeyedState();
+		if (rawKeyedState != null) {
+			dos.writeInt(rawKeyedState.size());
+			for (KeyedStateHandle keyedStateHandle : rawKeyedState) {
+				SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
+			}
+		} else {
+			// this means no operator states, not even an empty list
+			dos.writeInt(-1);
+		}
+
+		Collection<KeyedStateHandle> managedKeyedState = state.getManagedKeyedState();
+		if (managedKeyedState != null) {
+			dos.writeInt(managedKeyedState.size());
+			for (KeyedStateHandle keyedStateHandle : managedKeyedState) {
+				SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
+			}
+		} else {
+			// this means no operator states, not even an empty list
+			dos.writeInt(-1);
+		}
+
+		dos.flush();
+		out.close();
+	}
+
+	public static OperatorStateHandles readStateHandle(String path) throws IOException, ClassNotFoundException {
+		FileInputStream in = new FileInputStream(path);
+		DataInputStream dis = new DataInputStream(in);
+		int index = dis.readInt();
+
+		StreamStateHandle legacyState = SavepointV1Serializer.deserializeStreamStateHandle(dis);
+
+		List<OperatorStateHandle> rawOperatorState = null;
+		int numRawOperatorStates = dis.readInt();
+		if (numRawOperatorStates >= 0) {
+			rawOperatorState = new ArrayList<>();
+			for (int i = 0; i < numRawOperatorStates; i++) {
+				OperatorStateHandle operatorState = SavepointV1Serializer.deserializeOperatorStateHandle(
+						dis);
+				rawOperatorState.add(operatorState);
+			}
+		}
+
+		List<OperatorStateHandle> managedOperatorState = null;
+		int numManagedOperatorStates = dis.readInt();
+		if (numManagedOperatorStates >= 0) {
+			managedOperatorState = new ArrayList<>();
+			for (int i = 0; i < numManagedOperatorStates; i++) {
+				OperatorStateHandle operatorState = SavepointV1Serializer.deserializeOperatorStateHandle(
+						dis);
+				managedOperatorState.add(operatorState);
+			}
+		}
+
+		List<KeyedStateHandle> rawKeyedState = null;
+		int numRawKeyedStates = dis.readInt();
+		if (numRawKeyedStates >= 0) {
+			rawKeyedState = new ArrayList<>();
+			for (int i = 0; i < numRawKeyedStates; i++) {
+				KeyedStateHandle keyedState = SavepointV1Serializer.deserializeKeyedStateHandle(
+						dis);
+				rawKeyedState.add(keyedState);
+			}
+		}
+
+		List<KeyedStateHandle> managedKeyedState = null;
+		int numManagedKeyedStates = dis.readInt();
+		if (numManagedKeyedStates >= 0) {
+			managedKeyedState = new ArrayList<>();
+			for (int i = 0; i < numManagedKeyedStates; i++) {
+				KeyedStateHandle keyedState = SavepointV1Serializer.deserializeKeyedStateHandle(
+						dis);
+				managedKeyedState.add(keyedState);
+			}
+		}
+
+		return new OperatorStateHandles(index, legacyState, managedKeyedState, rawKeyedState, managedOperatorState, rawOperatorState);
+	}
+}