You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/07/11 10:05:31 UTC
[9/9] flink git commit: [FLINK-9584][connector] Properly close output
streams in Bucketing-/RollingSink
[FLINK-9584][connector] Properly close output streams in Bucketing-/RollingSink
This closes #6164.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/04a7cd4d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/04a7cd4d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/04a7cd4d
Branch: refs/heads/master
Commit: 04a7cd4d9232e757560e156a684e224d54b71176
Parents: 5dbb6dd
Author: sihuazhou <su...@163.com>
Authored: Thu Jun 14 18:12:20 2018 +0800
Committer: zentol <ch...@apache.org>
Committed: Wed Jul 11 12:05:10 2018 +0200
----------------------------------------------------------------------
.../flink/streaming/connectors/fs/RollingSink.java | 11 ++++-------
.../streaming/connectors/fs/bucketing/BucketingSink.java | 11 ++++-------
2 files changed, 8 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/04a7cd4d/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
index 709a7c9..9ec97b7 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
@@ -533,12 +533,9 @@ public class RollingSink<T> extends RichSinkFunction<T>
}
// verify that truncate actually works
- FSDataOutputStream outputStream;
Path testPath = new Path(UUID.randomUUID().toString());
- try {
- outputStream = fs.create(testPath);
+ try (FSDataOutputStream outputStream = fs.create(testPath)) {
outputStream.writeUTF("hello");
- outputStream.close();
} catch (IOException e) {
LOG.error("Could not create file for checking if truncate works.", e);
throw new RuntimeException("Could not create file for checking if truncate works.", e);
@@ -702,9 +699,9 @@ public class RollingSink<T> extends RichSinkFunction<T>
Path validLengthFilePath = getValidLengthPathFor(partPath);
if (!fs.exists(validLengthFilePath) && fs.exists(partPath)) {
LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, bucketState.currentFileValidLength);
- FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath);
- lengthFileOut.writeUTF(Long.toString(bucketState.currentFileValidLength));
- lengthFileOut.close();
+ try (FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath)) {
+ lengthFileOut.writeUTF(Long.toString(bucketState.currentFileValidLength));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/04a7cd4d/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index 34fb1b7..e55aff5 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -631,12 +631,9 @@ public class BucketingSink<T>
}
// verify that truncate actually works
- FSDataOutputStream outputStream;
Path testPath = new Path(UUID.randomUUID().toString());
- try {
- outputStream = fs.create(testPath);
+ try (FSDataOutputStream outputStream = fs.create(testPath)) {
outputStream.writeUTF("hello");
- outputStream.close();
} catch (IOException e) {
LOG.error("Could not create file for checking if truncate works.", e);
throw new RuntimeException("Could not create file for checking if truncate works. " +
@@ -880,9 +877,9 @@ public class BucketingSink<T>
Path validLengthFilePath = getValidLengthPathFor(partPath);
if (!fs.exists(validLengthFilePath) && fs.exists(partPath)) {
LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, validLength);
- FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath);
- lengthFileOut.writeUTF(Long.toString(validLength));
- lengthFileOut.close();
+ try (FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath)) {
+ lengthFileOut.writeUTF(Long.toString(validLength));
+ }
}
}