You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2018/05/07 19:55:02 UTC
flink git commit: [FLINK-8237] [bucketSink] BucketingSink returns
error message if Writer.duplicate returns null.
Repository: flink
Updated Branches:
refs/heads/release-1.5 5c53dfacc -> 7d6c81d4b
[FLINK-8237] [bucketSink] BucketingSink returns error message if Writer.duplicate returns null.
This closes #5927.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7d6c81d4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7d6c81d4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7d6c81d4
Branch: refs/heads/release-1.5
Commit: 7d6c81d4b8b7340c53717e38725dbe1e3199dfd2
Parents: 5c53dfa
Author: Pavel Shvetsov <pa...@epam.com>
Authored: Fri Apr 27 10:28:58 2018 +0300
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon May 7 21:23:05 2018 +0200
----------------------------------------------------------------------
.../flink/streaming/connectors/fs/bucketing/BucketingSink.java | 6 ++++++
1 file changed, 6 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7d6c81d4/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index 594ff87..fe712ae 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -550,6 +550,12 @@ public class BucketingSink<T>
Path inProgressPath = getInProgressPathFor(partPath);
if (bucketState.writer == null) {
bucketState.writer = writerTemplate.duplicate();
+ if (bucketState.writer == null) {
+ throw new UnsupportedOperationException(
+ "Could not duplicate writer. " +
+ "Class '" + writerTemplate.getClass().getCanonicalName() + "' must implement the 'Writer.duplicate()' method."
+ );
+ }
}
bucketState.writer.open(fs, inProgressPath);