You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/19 21:10:05 UTC
[4/5] flink git commit: [FLINK-6439] Fix close OutputStream &&
InputStream in OperatorSnapshotUtil
[FLINK-6439] Fix close OutputStream && InputStream in OperatorSnapshotUtil
This closes #3904.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/65fdadac
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/65fdadac
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/65fdadac
Branch: refs/heads/master
Commit: 65fdadac805cb1efe30ff9a57605676b1b8e45b9
Parents: 17ec6f0
Author: zjureel <zj...@gmail.com>
Authored: Mon May 15 18:14:11 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Fri May 19 21:08:34 2017 +0200
----------------------------------------------------------------------
.../streaming/util/OperatorSnapshotUtil.java | 162 ++++++++++---------
1 file changed, 82 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/65fdadac/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
index 92a9452..8011279 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
@@ -46,111 +46,113 @@ public class OperatorSnapshotUtil {
public static void writeStateHandle(OperatorStateHandles state, String path) throws IOException {
FileOutputStream out = new FileOutputStream(path);
- DataOutputStream dos = new DataOutputStream(out);
-
- dos.writeInt(state.getOperatorChainIndex());
-
- SavepointV1Serializer.serializeStreamStateHandle(state.getLegacyOperatorState(), dos);
-
- Collection<OperatorStateHandle> rawOperatorState = state.getRawOperatorState();
- if (rawOperatorState != null) {
- dos.writeInt(rawOperatorState.size());
- for (OperatorStateHandle operatorStateHandle : rawOperatorState) {
- SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+
+ try (DataOutputStream dos = new DataOutputStream(out)) {
+
+ dos.writeInt(state.getOperatorChainIndex());
+
+ SavepointV1Serializer.serializeStreamStateHandle(state.getLegacyOperatorState(), dos);
+
+ Collection<OperatorStateHandle> rawOperatorState = state.getRawOperatorState();
+ if (rawOperatorState != null) {
+ dos.writeInt(rawOperatorState.size());
+ for (OperatorStateHandle operatorStateHandle : rawOperatorState) {
+ SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+ }
+ } else {
+ // this means no states, not even an empty list
+ dos.writeInt(-1);
}
- } else {
- // this means no states, not even an empty list
- dos.writeInt(-1);
- }
- Collection<OperatorStateHandle> managedOperatorState = state.getManagedOperatorState();
- if (managedOperatorState != null) {
- dos.writeInt(managedOperatorState.size());
- for (OperatorStateHandle operatorStateHandle : managedOperatorState) {
- SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+ Collection<OperatorStateHandle> managedOperatorState = state.getManagedOperatorState();
+ if (managedOperatorState != null) {
+ dos.writeInt(managedOperatorState.size());
+ for (OperatorStateHandle operatorStateHandle : managedOperatorState) {
+ SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+ }
+ } else {
+ // this means no states, not even an empty list
+ dos.writeInt(-1);
}
- } else {
- // this means no states, not even an empty list
- dos.writeInt(-1);
- }
- Collection<KeyedStateHandle> rawKeyedState = state.getRawKeyedState();
- if (rawKeyedState != null) {
- dos.writeInt(rawKeyedState.size());
- for (KeyedStateHandle keyedStateHandle : rawKeyedState) {
- SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
+ Collection<KeyedStateHandle> rawKeyedState = state.getRawKeyedState();
+ if (rawKeyedState != null) {
+ dos.writeInt(rawKeyedState.size());
+ for (KeyedStateHandle keyedStateHandle : rawKeyedState) {
+ SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
+ }
+ } else {
+ // this means no operator states, not even an empty list
+ dos.writeInt(-1);
}
- } else {
- // this means no operator states, not even an empty list
- dos.writeInt(-1);
- }
- Collection<KeyedStateHandle> managedKeyedState = state.getManagedKeyedState();
- if (managedKeyedState != null) {
- dos.writeInt(managedKeyedState.size());
- for (KeyedStateHandle keyedStateHandle : managedKeyedState) {
- SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
+ Collection<KeyedStateHandle> managedKeyedState = state.getManagedKeyedState();
+ if (managedKeyedState != null) {
+ dos.writeInt(managedKeyedState.size());
+ for (KeyedStateHandle keyedStateHandle : managedKeyedState) {
+ SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos);
+ }
+ } else {
+ // this means no operator states, not even an empty list
+ dos.writeInt(-1);
}
- } else {
- // this means no operator states, not even an empty list
- dos.writeInt(-1);
- }
- dos.flush();
- out.close();
+ dos.flush();
+ }
}
public static OperatorStateHandles readStateHandle(String path) throws IOException, ClassNotFoundException {
FileInputStream in = new FileInputStream(path);
- DataInputStream dis = new DataInputStream(in);
- int index = dis.readInt();
+ try (DataInputStream dis = new DataInputStream(in)) {
+ int index = dis.readInt();
- StreamStateHandle legacyState = SavepointV1Serializer.deserializeStreamStateHandle(dis);
+ StreamStateHandle legacyState = SavepointV1Serializer.deserializeStreamStateHandle(dis);
- List<OperatorStateHandle> rawOperatorState = null;
- int numRawOperatorStates = dis.readInt();
- if (numRawOperatorStates >= 0) {
- rawOperatorState = new ArrayList<>();
- for (int i = 0; i < numRawOperatorStates; i++) {
- OperatorStateHandle operatorState = SavepointV1Serializer.deserializeOperatorStateHandle(
+ List<OperatorStateHandle> rawOperatorState = null;
+ int numRawOperatorStates = dis.readInt();
+ if (numRawOperatorStates >= 0) {
+ rawOperatorState = new ArrayList<>();
+ for (int i = 0; i < numRawOperatorStates; i++) {
+ OperatorStateHandle operatorState = SavepointV1Serializer.deserializeOperatorStateHandle(
dis);
- rawOperatorState.add(operatorState);
+ rawOperatorState.add(operatorState);
+ }
}
- }
- List<OperatorStateHandle> managedOperatorState = null;
- int numManagedOperatorStates = dis.readInt();
- if (numManagedOperatorStates >= 0) {
- managedOperatorState = new ArrayList<>();
- for (int i = 0; i < numManagedOperatorStates; i++) {
- OperatorStateHandle operatorState = SavepointV1Serializer.deserializeOperatorStateHandle(
+ List<OperatorStateHandle> managedOperatorState = null;
+ int numManagedOperatorStates = dis.readInt();
+ if (numManagedOperatorStates >= 0) {
+ managedOperatorState = new ArrayList<>();
+ for (int i = 0; i < numManagedOperatorStates; i++) {
+ OperatorStateHandle operatorState = SavepointV1Serializer.deserializeOperatorStateHandle(
dis);
- managedOperatorState.add(operatorState);
+ managedOperatorState.add(operatorState);
+ }
}
- }
- List<KeyedStateHandle> rawKeyedState = null;
- int numRawKeyedStates = dis.readInt();
- if (numRawKeyedStates >= 0) {
- rawKeyedState = new ArrayList<>();
- for (int i = 0; i < numRawKeyedStates; i++) {
- KeyedStateHandle keyedState = SavepointV1Serializer.deserializeKeyedStateHandle(
+ List<KeyedStateHandle> rawKeyedState = null;
+ int numRawKeyedStates = dis.readInt();
+ if (numRawKeyedStates >= 0) {
+ rawKeyedState = new ArrayList<>();
+ for (int i = 0; i < numRawKeyedStates; i++) {
+ KeyedStateHandle keyedState = SavepointV1Serializer.deserializeKeyedStateHandle(
dis);
- rawKeyedState.add(keyedState);
+ rawKeyedState.add(keyedState);
+ }
}
- }
- List<KeyedStateHandle> managedKeyedState = null;
- int numManagedKeyedStates = dis.readInt();
- if (numManagedKeyedStates >= 0) {
- managedKeyedState = new ArrayList<>();
- for (int i = 0; i < numManagedKeyedStates; i++) {
- KeyedStateHandle keyedState = SavepointV1Serializer.deserializeKeyedStateHandle(
+ List<KeyedStateHandle> managedKeyedState = null;
+ int numManagedKeyedStates = dis.readInt();
+ if (numManagedKeyedStates >= 0) {
+ managedKeyedState = new ArrayList<>();
+ for (int i = 0; i < numManagedKeyedStates; i++) {
+ KeyedStateHandle keyedState = SavepointV1Serializer.deserializeKeyedStateHandle(
dis);
- managedKeyedState.add(keyedState);
+ managedKeyedState.add(keyedState);
+ }
}
- }
- return new OperatorStateHandles(index, legacyState, managedKeyedState, rawKeyedState, managedOperatorState, rawOperatorState);
+ return new OperatorStateHandles(index, legacyState, managedKeyedState, rawKeyedState, managedOperatorState, rawOperatorState);
+ }
}
}