You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/07/31 09:22:18 UTC
[flink] branch master updated: [FLINK-9976][streaming] Rework StreamingFileSink builders
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 23161ee [FLINK-9976][streaming] Rework StreamingFileSink builders
23161ee is described below
commit 23161ee91391fae4734a6f2f56e95e00d5d298ec
Author: Chesnay <ch...@apache.org>
AuthorDate: Tue Jul 31 11:22:15 2018 +0200
[FLINK-9976][streaming] Rework StreamingFileSink builders
---
.../sink/filesystem/StreamingFileSink.java | 70 ++++++++++++----------
1 file changed, 40 insertions(+), 30 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
index 0ebcc4f..7daefc8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
@@ -186,52 +186,56 @@ public class StreamingFileSink<IN>
private static final long serialVersionUID = 1L;
- private long bucketCheckInterval = 60L * 1000L;
+ private final long bucketCheckInterval;
private final Path basePath;
private final Encoder<IN> encoder;
- private Bucketer<IN, BucketID> bucketer;
+ private final Bucketer<IN, BucketID> bucketer;
- private RollingPolicy<IN, BucketID> rollingPolicy;
+ private final RollingPolicy<IN, BucketID> rollingPolicy;
- private BucketFactory<IN, BucketID> bucketFactory = new DefaultBucketFactory<>();
+ private final BucketFactory<IN, BucketID> bucketFactory;
RowFormatBuilder(Path basePath, Encoder<IN> encoder, Bucketer<IN, BucketID> bucketer) {
+ this(basePath, encoder, bucketer, DefaultRollingPolicy.create().build(), 60L * 1000L, new DefaultBucketFactory<>());
+ }
+
+ private RowFormatBuilder(
+ Path basePath,
+ Encoder<IN> encoder,
+ Bucketer<IN, BucketID> bucketer,
+ RollingPolicy<IN, BucketID> rollingPolicy,
+ long bucketCheckInterval,
+ BucketFactory<IN, BucketID> bucketFactory) {
this.basePath = Preconditions.checkNotNull(basePath);
this.encoder = Preconditions.checkNotNull(encoder);
this.bucketer = Preconditions.checkNotNull(bucketer);
- this.rollingPolicy = DefaultRollingPolicy.create().build();
+ this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
+ this.bucketCheckInterval = bucketCheckInterval;
+ this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
}
public StreamingFileSink.RowFormatBuilder<IN, BucketID> withBucketCheckInterval(final long interval) {
- this.bucketCheckInterval = interval;
- return this;
+ return new RowFormatBuilder<>(basePath, encoder, bucketer, rollingPolicy, interval, bucketFactory);
}
public StreamingFileSink.RowFormatBuilder<IN, BucketID> withBucketer(final Bucketer<IN, BucketID> bucketer) {
- this.bucketer = Preconditions.checkNotNull(bucketer);
- return this;
+ return new RowFormatBuilder<>(basePath, encoder, Preconditions.checkNotNull(bucketer), rollingPolicy, bucketCheckInterval, bucketFactory);
}
public StreamingFileSink.RowFormatBuilder<IN, BucketID> withRollingPolicy(final RollingPolicy<IN, BucketID> policy) {
- this.rollingPolicy = Preconditions.checkNotNull(policy);
- return this;
+ return new RowFormatBuilder<>(basePath, encoder, bucketer, Preconditions.checkNotNull(policy), bucketCheckInterval, bucketFactory);
}
public <ID> StreamingFileSink.RowFormatBuilder<IN, ID> withBucketerAndPolicy(final Bucketer<IN, ID> bucketer, final RollingPolicy<IN, ID> policy) {
- @SuppressWarnings("unchecked")
- StreamingFileSink.RowFormatBuilder<IN, ID> reInterpreted = (StreamingFileSink.RowFormatBuilder<IN, ID>) this;
- reInterpreted.bucketer = Preconditions.checkNotNull(bucketer);
- reInterpreted.rollingPolicy = Preconditions.checkNotNull(policy);
- return reInterpreted;
+ return new RowFormatBuilder<>(basePath, encoder, Preconditions.checkNotNull(bucketer), Preconditions.checkNotNull(policy), bucketCheckInterval, new DefaultBucketFactory<>());
}
@VisibleForTesting
StreamingFileSink.RowFormatBuilder<IN, BucketID> withBucketFactory(final BucketFactory<IN, BucketID> factory) {
- this.bucketFactory = Preconditions.checkNotNull(factory);
- return this;
+ return new RowFormatBuilder<>(basePath, encoder, bucketer, rollingPolicy, bucketCheckInterval, Preconditions.checkNotNull(factory));
}
/** Creates the actual sink. */
@@ -259,38 +263,44 @@ public class StreamingFileSink<IN>
private static final long serialVersionUID = 1L;
- private long bucketCheckInterval = 60L * 1000L;
+ private final long bucketCheckInterval;
private final Path basePath;
private final BulkWriter.Factory<IN> writerFactory;
- private Bucketer<IN, BucketID> bucketer;
+ private final Bucketer<IN, BucketID> bucketer;
- private BucketFactory<IN, BucketID> bucketFactory = new DefaultBucketFactory<>();
+ private final BucketFactory<IN, BucketID> bucketFactory;
BulkFormatBuilder(Path basePath, BulkWriter.Factory<IN> writerFactory, Bucketer<IN, BucketID> bucketer) {
+ this(basePath, writerFactory, bucketer, 60L * 1000L, new DefaultBucketFactory<>());
+ }
+
+ private BulkFormatBuilder(
+ Path basePath,
+ BulkWriter.Factory<IN> writerFactory,
+ Bucketer<IN, BucketID> bucketer,
+ long bucketCheckInterval,
+ BucketFactory<IN, BucketID> bucketFactory) {
this.basePath = Preconditions.checkNotNull(basePath);
- this.writerFactory = Preconditions.checkNotNull(writerFactory);
+ this.writerFactory = writerFactory;
this.bucketer = Preconditions.checkNotNull(bucketer);
+ this.bucketCheckInterval = bucketCheckInterval;
+ this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
}
public StreamingFileSink.BulkFormatBuilder<IN, BucketID> withBucketCheckInterval(long interval) {
- this.bucketCheckInterval = interval;
- return this;
+ return new BulkFormatBuilder<>(basePath, writerFactory, bucketer, interval, bucketFactory);
}
public <ID> StreamingFileSink.BulkFormatBuilder<IN, ID> withBucketer(Bucketer<IN, ID> bucketer) {
- @SuppressWarnings("unchecked")
- StreamingFileSink.BulkFormatBuilder<IN, ID> reInterpreted = (StreamingFileSink.BulkFormatBuilder<IN, ID>) this;
- reInterpreted.bucketer = Preconditions.checkNotNull(bucketer);
- return reInterpreted;
+ return new BulkFormatBuilder<>(basePath, writerFactory, Preconditions.checkNotNull(bucketer), bucketCheckInterval, new DefaultBucketFactory<>());
}
@VisibleForTesting
StreamingFileSink.BulkFormatBuilder<IN, BucketID> withBucketFactory(final BucketFactory<IN, BucketID> factory) {
- this.bucketFactory = Preconditions.checkNotNull(factory);
- return this;
+ return new BulkFormatBuilder<>(basePath, writerFactory, bucketer, bucketCheckInterval, Preconditions.checkNotNull(factory));
}
/** Creates the actual sink. */