You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/19 21:10:05 UTC

[4/5] flink git commit: [FLINK-6439] Fix close OutputStream && InputStream in OperatorSnapshotUtil

[FLINK-6439] Fix close OutputStream && InputStream in OperatorSnapshotUtil

This closes #3904.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/65fdadac
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/65fdadac
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/65fdadac

Branch: refs/heads/master
Commit: 65fdadac805cb1efe30ff9a57605676b1b8e45b9
Parents: 17ec6f0
Author: zjureel <zj...@gmail.com>
Authored: Mon May 15 18:14:11 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Fri May 19 21:08:34 2017 +0200

----------------------------------------------------------------------
 .../streaming/util/OperatorSnapshotUtil.java    | 162 ++++++++++---------
 1 file changed, 82 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/65fdadac/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
index 92a9452..8011279 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
@@ -46,111 +46,113 @@ public class OperatorSnapshotUtil {
 
 	public static void writeStateHandle(OperatorStateHandles state, String path) throws IOException {
 		FileOutputStream out = new FileOutputStream(path);
-		DataOutputStream dos = new DataOutputStream(out);
-
-		dos.writeInt(state.getOperatorChainIndex());
-
-		SavepointV1Serializer.serializeStreamStateHandle(state.getLegacyOperatorState(), dos);
-
-		Collection<OperatorStateHandle> rawOperatorState = state.getRawOperatorState();
-		if (rawOperatorState != null) {
-			dos.writeInt(rawOperatorState.size());
-			for (OperatorStateHandle operatorStateHandle : rawOperatorState) {
-				SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+		
+		try (DataOutputStream dos = new DataOutputStream(out)) {
+
+			dos.writeInt(state.getOperatorChainIndex());
+
+			SavepointV1Serializer.serializeStreamStateHandle(state.getLegacyOperatorState(), dos);
+
+			Collection<OperatorStateHandle> rawOperatorState = state.getRawOperatorState();
+			if (rawOperatorState != null) {
+				dos.writeInt(rawOperatorState.size());
+				for (OperatorStateHandle operatorStateHandle : rawOperatorState) {
+					SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+				}
+			} else {
+				// this means no states, not even an empty list
+				dos.writeInt(-1);
 			}
-		} else {
-			// this means no states, not even an empty list
-			dos.writeInt(-1);
-		}
 
-		Collection<OperatorStateHandle> managedOperatorState = state.getManagedOperatorState();
-		if (managedOperatorState != null) {
-			dos.writeInt(managedOperatorState.size());
-			for (OperatorStateHandle operatorStateHandle : managedOperatorState) {
-				SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+			Collection<OperatorStateHandle> managedOperatorState = state.getManagedOperatorState();
+			if (managedOperatorState != null) {
+				dos.writeInt(managedOperatorState.size());
+				for (OperatorStateHandle operatorStateHandle : managedOperatorState) {
+					SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+				}
+			} else {
+				// this means no states, not even an empty list
+				dos.writeInt(-1);
 			}
-		} else {
-			// this means no states, not even an empty list
-			dos.writeInt(-1);
-		}
 
-		Collection<KeyedStateHandle> rawKeyedState = state.getRawKeyedState();
-		if (rawKeyedState != null) {
-			dos.writeInt(rawKeyedState.size());
-			for (KeyedStateHandle keyedStateHandle : rawKeyedState) {
-				SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
+			Collection<KeyedStateHandle> rawKeyedState = state.getRawKeyedState();
+			if (rawKeyedState != null) {
+				dos.writeInt(rawKeyedState.size());
+				for (KeyedStateHandle keyedStateHandle : rawKeyedState) {
+					SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
+				}
+			} else {
+				// this means no operator states, not even an empty list
+				dos.writeInt(-1);
 			}
-		} else {
-			// this means no operator states, not even an empty list
-			dos.writeInt(-1);
-		}
 
-		Collection<KeyedStateHandle> managedKeyedState = state.getManagedKeyedState();
-		if (managedKeyedState != null) {
-			dos.writeInt(managedKeyedState.size());
-			for (KeyedStateHandle keyedStateHandle : managedKeyedState) {
-				SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
+			Collection<KeyedStateHandle> managedKeyedState = state.getManagedKeyedState();
+			if (managedKeyedState != null) {
+				dos.writeInt(managedKeyedState.size());
+				for (KeyedStateHandle keyedStateHandle : managedKeyedState) {
+					SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
+				}
+			} else {
+				// this means no operator states, not even an empty list
+				dos.writeInt(-1);
 			}
-		} else {
-			// this means no operator states, not even an empty list
-			dos.writeInt(-1);
-		}
 
-		dos.flush();
-		out.close();
+			dos.flush();
+		}
 	}
 
 	public static OperatorStateHandles readStateHandle(String path) throws IOException, ClassNotFoundException {
 		FileInputStream in = new FileInputStream(path);
-		DataInputStream dis = new DataInputStream(in);
-		int index = dis.readInt();
+		try (DataInputStream dis = new DataInputStream(in)) {
+			int index = dis.readInt();
 
-		StreamStateHandle legacyState = SavepointV1Serializer.deserializeStreamStateHandle(dis);
+			StreamStateHandle legacyState = SavepointV1Serializer.deserializeStreamStateHandle(dis);
 
-		List<OperatorStateHandle> rawOperatorState = null;
-		int numRawOperatorStates = dis.readInt();
-		if (numRawOperatorStates >= 0) {
-			rawOperatorState = new ArrayList<>();
-			for (int i = 0; i < numRawOperatorStates; i++) {
-				OperatorStateHandle operatorState = SavepointV1Serializer.deserializeOperatorStateHandle(
+			List<OperatorStateHandle> rawOperatorState = null;
+			int numRawOperatorStates = dis.readInt();
+			if (numRawOperatorStates >= 0) {
+				rawOperatorState = new ArrayList<>();
+				for (int i = 0; i < numRawOperatorStates; i++) {
+					OperatorStateHandle operatorState = SavepointV1Serializer.deserializeOperatorStateHandle(
 						dis);
-				rawOperatorState.add(operatorState);
+					rawOperatorState.add(operatorState);
+				}
 			}
-		}
 
-		List<OperatorStateHandle> managedOperatorState = null;
-		int numManagedOperatorStates = dis.readInt();
-		if (numManagedOperatorStates >= 0) {
-			managedOperatorState = new ArrayList<>();
-			for (int i = 0; i < numManagedOperatorStates; i++) {
-				OperatorStateHandle operatorState = SavepointV1Serializer.deserializeOperatorStateHandle(
+			List<OperatorStateHandle> managedOperatorState = null;
+			int numManagedOperatorStates = dis.readInt();
+			if (numManagedOperatorStates >= 0) {
+				managedOperatorState = new ArrayList<>();
+				for (int i = 0; i < numManagedOperatorStates; i++) {
+					OperatorStateHandle operatorState = SavepointV1Serializer.deserializeOperatorStateHandle(
 						dis);
-				managedOperatorState.add(operatorState);
+					managedOperatorState.add(operatorState);
+				}
 			}
-		}
 
-		List<KeyedStateHandle> rawKeyedState = null;
-		int numRawKeyedStates = dis.readInt();
-		if (numRawKeyedStates >= 0) {
-			rawKeyedState = new ArrayList<>();
-			for (int i = 0; i < numRawKeyedStates; i++) {
-				KeyedStateHandle keyedState = SavepointV1Serializer.deserializeKeyedStateHandle(
+			List<KeyedStateHandle> rawKeyedState = null;
+			int numRawKeyedStates = dis.readInt();
+			if (numRawKeyedStates >= 0) {
+				rawKeyedState = new ArrayList<>();
+				for (int i = 0; i < numRawKeyedStates; i++) {
+					KeyedStateHandle keyedState = SavepointV1Serializer.deserializeKeyedStateHandle(
 						dis);
-				rawKeyedState.add(keyedState);
+					rawKeyedState.add(keyedState);
+				}
 			}
-		}
 
-		List<KeyedStateHandle> managedKeyedState = null;
-		int numManagedKeyedStates = dis.readInt();
-		if (numManagedKeyedStates >= 0) {
-			managedKeyedState = new ArrayList<>();
-			for (int i = 0; i < numManagedKeyedStates; i++) {
-				KeyedStateHandle keyedState = SavepointV1Serializer.deserializeKeyedStateHandle(
+			List<KeyedStateHandle> managedKeyedState = null;
+			int numManagedKeyedStates = dis.readInt();
+			if (numManagedKeyedStates >= 0) {
+				managedKeyedState = new ArrayList<>();
+				for (int i = 0; i < numManagedKeyedStates; i++) {
+					KeyedStateHandle keyedState = SavepointV1Serializer.deserializeKeyedStateHandle(
 						dis);
-				managedKeyedState.add(keyedState);
+					managedKeyedState.add(keyedState);
+				}
 			}
-		}
 
-		return new OperatorStateHandles(index, legacyState, managedKeyedState, rawKeyedState, managedOperatorState, rawOperatorState);
+			return new OperatorStateHandles(index, legacyState, managedKeyedState, rawKeyedState, managedOperatorState, rawOperatorState);
+		}
 	}
 }