You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/11/24 14:00:27 UTC

flink git commit: [FLINK-6294] Fix potential NPE in BucketingSink.close()

Repository: flink
Updated Branches:
  refs/heads/release-1.3 4ca1b3e7b -> 5aea49176


[FLINK-6294] Fix potential NPE in BucketingSink.close()


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5aea4917
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5aea4917
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5aea4917

Branch: refs/heads/release-1.3
Commit: 5aea4917662d80899d3adff0378b97ae58aa2afc
Parents: 4ca1b3e
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Nov 24 14:59:36 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Nov 24 15:00:12 2017 +0100

----------------------------------------------------------------------
 .../flink/streaming/connectors/fs/bucketing/BucketingSink.java | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5aea4917/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index 20e54b8..6a4549b 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -420,8 +420,10 @@ public class BucketingSink<T>
 
 	@Override
 	public void close() throws Exception {
-		for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
-			closeCurrentPartFile(entry.getValue());
+		if (state != null) {
+			for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
+				closeCurrentPartFile(entry.getValue());
+			}
 		}
 	}