You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/07/31 09:22:18 UTC

[flink] branch master updated: [FLINK-9976][streaming] Rework StreamingFileSink builders

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 23161ee  [FLINK-9976][streaming] Rework StreamingFileSink builders
23161ee is described below

commit 23161ee91391fae4734a6f2f56e95e00d5d298ec
Author: Chesnay <ch...@apache.org>
AuthorDate: Tue Jul 31 11:22:15 2018 +0200

    [FLINK-9976][streaming] Rework StreamingFileSink builders
---
 .../sink/filesystem/StreamingFileSink.java         | 70 ++++++++++++----------
 1 file changed, 40 insertions(+), 30 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
index 0ebcc4f..7daefc8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
@@ -186,52 +186,56 @@ public class StreamingFileSink<IN>
 
 		private static final long serialVersionUID = 1L;
 
-		private long bucketCheckInterval = 60L * 1000L;
+		private final long bucketCheckInterval;
 
 		private final Path basePath;
 
 		private final Encoder<IN> encoder;
 
-		private Bucketer<IN, BucketID> bucketer;
+		private final Bucketer<IN, BucketID> bucketer;
 
-		private RollingPolicy<IN, BucketID> rollingPolicy;
+		private final RollingPolicy<IN, BucketID> rollingPolicy;
 
-		private BucketFactory<IN, BucketID> bucketFactory = new DefaultBucketFactory<>();
+		private final BucketFactory<IN, BucketID> bucketFactory;
 
 		RowFormatBuilder(Path basePath, Encoder<IN> encoder, Bucketer<IN, BucketID> bucketer) {
+			this(basePath, encoder, bucketer, DefaultRollingPolicy.create().build(), 60L * 1000L, new DefaultBucketFactory<>());
+		}
+
+		private RowFormatBuilder(
+				Path basePath,
+				Encoder<IN> encoder,
+				Bucketer<IN, BucketID> bucketer,
+				RollingPolicy<IN, BucketID> rollingPolicy,
+				long bucketCheckInterval,
+				BucketFactory<IN, BucketID> bucketFactory) {
 			this.basePath = Preconditions.checkNotNull(basePath);
 			this.encoder = Preconditions.checkNotNull(encoder);
 			this.bucketer = Preconditions.checkNotNull(bucketer);
-			this.rollingPolicy = DefaultRollingPolicy.create().build();
+			this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
+			this.bucketCheckInterval = bucketCheckInterval;
+			this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
 		}
 
 		public StreamingFileSink.RowFormatBuilder<IN, BucketID> withBucketCheckInterval(final long interval) {
-			this.bucketCheckInterval = interval;
-			return this;
+			return new RowFormatBuilder<>(basePath, encoder, bucketer, rollingPolicy, interval, bucketFactory);
 		}
 
 		public StreamingFileSink.RowFormatBuilder<IN, BucketID> withBucketer(final Bucketer<IN, BucketID> bucketer) {
-			this.bucketer = Preconditions.checkNotNull(bucketer);
-			return this;
+			return new RowFormatBuilder<>(basePath, encoder, Preconditions.checkNotNull(bucketer), rollingPolicy, bucketCheckInterval, bucketFactory);
 		}
 
 		public StreamingFileSink.RowFormatBuilder<IN, BucketID> withRollingPolicy(final RollingPolicy<IN, BucketID> policy) {
-			this.rollingPolicy = Preconditions.checkNotNull(policy);
-			return this;
+			return new RowFormatBuilder<>(basePath, encoder, bucketer, Preconditions.checkNotNull(policy), bucketCheckInterval, bucketFactory);
 		}
 
 		public <ID> StreamingFileSink.RowFormatBuilder<IN, ID> withBucketerAndPolicy(final Bucketer<IN, ID> bucketer, final RollingPolicy<IN, ID> policy) {
-			@SuppressWarnings("unchecked")
-			StreamingFileSink.RowFormatBuilder<IN, ID> reInterpreted = (StreamingFileSink.RowFormatBuilder<IN, ID>) this;
-			reInterpreted.bucketer = Preconditions.checkNotNull(bucketer);
-			reInterpreted.rollingPolicy = Preconditions.checkNotNull(policy);
-			return reInterpreted;
+			return new RowFormatBuilder<>(basePath, encoder, Preconditions.checkNotNull(bucketer), Preconditions.checkNotNull(policy), bucketCheckInterval, new DefaultBucketFactory<>());
 		}
 
 		@VisibleForTesting
 		StreamingFileSink.RowFormatBuilder<IN, BucketID> withBucketFactory(final BucketFactory<IN, BucketID> factory) {
-			this.bucketFactory = Preconditions.checkNotNull(factory);
-			return this;
+			return new RowFormatBuilder<>(basePath, encoder, bucketer, rollingPolicy, bucketCheckInterval, Preconditions.checkNotNull(factory));
 		}
 
 		/** Creates the actual sink. */
@@ -259,38 +263,44 @@ public class StreamingFileSink<IN>
 
 		private static final long serialVersionUID = 1L;
 
-		private long bucketCheckInterval = 60L * 1000L;
+		private final long bucketCheckInterval;
 
 		private final Path basePath;
 
 		private final BulkWriter.Factory<IN> writerFactory;
 
-		private Bucketer<IN, BucketID> bucketer;
+		private final Bucketer<IN, BucketID> bucketer;
 
-		private BucketFactory<IN, BucketID> bucketFactory = new DefaultBucketFactory<>();
+		private final BucketFactory<IN, BucketID> bucketFactory;
 
 		BulkFormatBuilder(Path basePath, BulkWriter.Factory<IN> writerFactory, Bucketer<IN, BucketID> bucketer) {
+			this(basePath, writerFactory, bucketer, 60L * 1000L, new DefaultBucketFactory<>());
+		}
+
+		private BulkFormatBuilder(
+				Path basePath,
+				BulkWriter.Factory<IN> writerFactory,
+				Bucketer<IN, BucketID> bucketer,
+				long bucketCheckInterval,
+				BucketFactory<IN, BucketID> bucketFactory) {
 			this.basePath = Preconditions.checkNotNull(basePath);
-			this.writerFactory = Preconditions.checkNotNull(writerFactory);
+			this.writerFactory = writerFactory;
 			this.bucketer = Preconditions.checkNotNull(bucketer);
+			this.bucketCheckInterval = bucketCheckInterval;
+			this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
 		}
 
 		public StreamingFileSink.BulkFormatBuilder<IN, BucketID> withBucketCheckInterval(long interval) {
-			this.bucketCheckInterval = interval;
-			return this;
+			return new BulkFormatBuilder<>(basePath, writerFactory, bucketer, interval, bucketFactory);
 		}
 
 		public <ID> StreamingFileSink.BulkFormatBuilder<IN, ID> withBucketer(Bucketer<IN, ID> bucketer) {
-			@SuppressWarnings("unchecked")
-			StreamingFileSink.BulkFormatBuilder<IN, ID> reInterpreted = (StreamingFileSink.BulkFormatBuilder<IN, ID>) this;
-			reInterpreted.bucketer = Preconditions.checkNotNull(bucketer);
-			return reInterpreted;
+			return new BulkFormatBuilder<>(basePath, writerFactory, Preconditions.checkNotNull(bucketer), bucketCheckInterval, new DefaultBucketFactory<>());
 		}
 
 		@VisibleForTesting
 		StreamingFileSink.BulkFormatBuilder<IN, BucketID> withBucketFactory(final BucketFactory<IN, BucketID> factory) {
-			this.bucketFactory = Preconditions.checkNotNull(factory);
-			return this;
+			return new BulkFormatBuilder<>(basePath, writerFactory, bucketer, bucketCheckInterval, Preconditions.checkNotNull(factory));
 		}
 
 		/** Creates the actual sink. */