You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/11/24 14:00:27 UTC
flink git commit: [FLINK-6294] Fix potential NPE in
BucketingSink.close()
Repository: flink
Updated Branches:
refs/heads/release-1.3 4ca1b3e7b -> 5aea49176
[FLINK-6294] Fix potential NPE in BucketingSink.close()
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5aea4917
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5aea4917
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5aea4917
Branch: refs/heads/release-1.3
Commit: 5aea4917662d80899d3adff0378b97ae58aa2afc
Parents: 4ca1b3e
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Nov 24 14:59:36 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Nov 24 15:00:12 2017 +0100
----------------------------------------------------------------------
.../flink/streaming/connectors/fs/bucketing/BucketingSink.java | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5aea4917/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index 20e54b8..6a4549b 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -420,8 +420,10 @@ public class BucketingSink<T>
@Override
public void close() throws Exception {
- for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
- closeCurrentPartFile(entry.getValue());
+ if (state != null) {
+ for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
+ closeCurrentPartFile(entry.getValue());
+ }
}
}