You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2018/05/07 19:55:02 UTC

flink git commit: [FLINK-8237] [bucketSink] BucketingSink returns error message if Writer.duplicate returns null.

Repository: flink
Updated Branches:
  refs/heads/release-1.5 5c53dfacc -> 7d6c81d4b


[FLINK-8237] [bucketSink] BucketingSink returns error message if Writer.duplicate returns null.

This closes #5927.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7d6c81d4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7d6c81d4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7d6c81d4

Branch: refs/heads/release-1.5
Commit: 7d6c81d4b8b7340c53717e38725dbe1e3199dfd2
Parents: 5c53dfa
Author: Pavel Shvetsov <pa...@epam.com>
Authored: Fri Apr 27 10:28:58 2018 +0300
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon May 7 21:23:05 2018 +0200

----------------------------------------------------------------------
 .../flink/streaming/connectors/fs/bucketing/BucketingSink.java | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7d6c81d4/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index 594ff87..fe712ae 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -550,6 +550,12 @@ public class BucketingSink<T>
 		Path inProgressPath = getInProgressPathFor(partPath);
 		if (bucketState.writer == null) {
 			bucketState.writer = writerTemplate.duplicate();
+			if (bucketState.writer == null) {
+				throw new UnsupportedOperationException(
+					"Could not duplicate writer. " +
+						"Class '" + writerTemplate.getClass().getCanonicalName() + "' must implement the 'Writer.duplicate()' method."
+				);
+			}
 		}
 
 		bucketState.writer.open(fs, inProgressPath);