You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/07/11 10:05:31 UTC

[9/9] flink git commit: [FLINK-9584][connector] Properly close output streams in Bucketing-/RollingSink

[FLINK-9584][connector] Properly close output streams in Bucketing-/RollingSink

This closes #6164.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/04a7cd4d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/04a7cd4d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/04a7cd4d

Branch: refs/heads/master
Commit: 04a7cd4d9232e757560e156a684e224d54b71176
Parents: 5dbb6dd
Author: sihuazhou <su...@163.com>
Authored: Thu Jun 14 18:12:20 2018 +0800
Committer: zentol <ch...@apache.org>
Committed: Wed Jul 11 12:05:10 2018 +0200

----------------------------------------------------------------------
 .../flink/streaming/connectors/fs/RollingSink.java       | 11 ++++-------
 .../streaming/connectors/fs/bucketing/BucketingSink.java | 11 ++++-------
 2 files changed, 8 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/04a7cd4d/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
index 709a7c9..9ec97b7 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
@@ -533,12 +533,9 @@ public class RollingSink<T> extends RichSinkFunction<T>
 			}
 
 			// verify that truncate actually works
-			FSDataOutputStream outputStream;
 			Path testPath = new Path(UUID.randomUUID().toString());
-			try {
-				outputStream = fs.create(testPath);
+			try (FSDataOutputStream outputStream = fs.create(testPath)) {
 				outputStream.writeUTF("hello");
-				outputStream.close();
 			} catch (IOException e) {
 				LOG.error("Could not create file for checking if truncate works.", e);
 				throw new RuntimeException("Could not create file for checking if truncate works.", e);
@@ -702,9 +699,9 @@ public class RollingSink<T> extends RichSinkFunction<T>
 					Path validLengthFilePath = getValidLengthPathFor(partPath);
 					if (!fs.exists(validLengthFilePath) && fs.exists(partPath)) {
 						LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, bucketState.currentFileValidLength);
-						FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath);
-						lengthFileOut.writeUTF(Long.toString(bucketState.currentFileValidLength));
-						lengthFileOut.close();
+						try (FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath)) {
+							lengthFileOut.writeUTF(Long.toString(bucketState.currentFileValidLength));
+						}
 					}
 				}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/04a7cd4d/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index 34fb1b7..e55aff5 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -631,12 +631,9 @@ public class BucketingSink<T>
 			}
 
 			// verify that truncate actually works
-			FSDataOutputStream outputStream;
 			Path testPath = new Path(UUID.randomUUID().toString());
-			try {
-				outputStream = fs.create(testPath);
+			try (FSDataOutputStream outputStream = fs.create(testPath)) {
 				outputStream.writeUTF("hello");
-				outputStream.close();
 			} catch (IOException e) {
 				LOG.error("Could not create file for checking if truncate works.", e);
 				throw new RuntimeException("Could not create file for checking if truncate works. " +
@@ -880,9 +877,9 @@ public class BucketingSink<T>
 					Path validLengthFilePath = getValidLengthPathFor(partPath);
 					if (!fs.exists(validLengthFilePath) && fs.exists(partPath)) {
 						LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, validLength);
-						FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath);
-						lengthFileOut.writeUTF(Long.toString(validLength));
-						lengthFileOut.close();
+						try (FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath)) {
+							lengthFileOut.writeUTF(Long.toString(validLength));
+						}
 					}
 				}