Posted to commits@flink.apache.org by se...@apache.org on 2018/07/20 17:05:18 UTC
[2/5] flink git commit: [FLINK-9903] [DataStream API] Refactor StreamingFileSink / add bulk encoders
http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
index 0406afc..c208079 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
@@ -20,8 +20,8 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.api.common.serialization.Encoder;
-import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
@@ -30,29 +30,23 @@ import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerial
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.RecoverableWriter;
-import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.DateTimeBucketer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rolling.policies.DefaultRollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rolling.policies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
+import java.io.Serializable;
/**
* Sink that emits its input elements to {@link FileSystem} files within buckets. This is
@@ -69,7 +63,9 @@ import java.util.Map;
* be written to inside the base directory. The {@code Bucketer} can, for example, use time or
* a property of the element to determine the bucket directory. The default {@code Bucketer} is a
* {@link DateTimeBucketer} which will create one new bucket every hour. You can specify
- * a custom {@code Bucketer} using {@link #setBucketer(Bucketer)}.
+ * a custom {@code Bucketer} using the builder's {@code withBucketer(Bucketer)} method, after calling
+ * {@link StreamingFileSink#forRowFormat(Path, Encoder)} or
+ * {@link StreamingFileSink#forBulkFormat(Path, BulkWriter.Factory)}.
*
*
* <p>The filenames of the part files contain the part prefix, "part-", the parallel subtask index of the sink
@@ -94,19 +90,6 @@ import java.util.Map;
* state are transferred into the {@code finished} state while any {@code in-progress} files are rolled back, so that
* they do not contain data that arrived after the checkpoint from which we restore.
*
- * <p><b>NOTE:</b>
- * <ol>
- * <li>
- * If checkpointing is not enabled the pending files will never be moved to the finished state.
- * </li>
- * <li>
- * The part files are written using an instance of {@link Encoder}. By default, a
- * {@link SimpleStringEncoder} is used, which writes the result of {@code toString()} for
- * every element, separated by newlines. You can configure the writer using the
- * {@link #setEncoder(Encoder)}.
- * </li>
- * </ol>
- *
* @param <IN> Type of the elements emitted by this sink
*/
@PublicEvolving
@@ -116,8 +99,6 @@ public class StreamingFileSink<IN>
private static final long serialVersionUID = 1L;
- private static final Logger LOG = LoggerFactory.getLogger(StreamingFileSink.class);
-
// -------------------------- state descriptors ---------------------------
private static final ListStateDescriptor<byte[]> BUCKET_STATE_DESC =
@@ -128,298 +109,264 @@ public class StreamingFileSink<IN>
// ------------------------ configuration fields --------------------------
- private final Path basePath;
-
- private final BucketFactory<IN> bucketFactory;
-
- private long bucketCheckInterval = 60L * 1000L;
-
- private Bucketer<IN> bucketer;
+ private final long bucketCheckInterval;
- private Encoder<IN> encoder;
-
- private RollingPolicy rollingPolicy;
+ private final StreamingFileSink.BucketsBuilder<IN, ?> bucketsBuilder;
// --------------------------- runtime fields -----------------------------
- private transient BucketerContext bucketerContext;
-
- private transient RecoverableWriter fileSystemWriter;
+ private transient Buckets<IN, ?> buckets;
private transient ProcessingTimeService processingTimeService;
- private transient Map<String, Bucket<IN>> activeBuckets;
+ // --------------------------- State Related Fields -----------------------------
- ////////////////// State Related Fields /////////////////////
+ private transient ListState<byte[]> bucketStates;
- private transient BucketStateSerializer bucketStateSerializer;
+ private transient ListState<Long> maxPartCountersState;
- private transient ListState<byte[]> restoredBucketStates;
+ /**
+ * Creates a new {@code StreamingFileSink} that writes files to the given base directory.
+ */
+ private StreamingFileSink(
+ final StreamingFileSink.BucketsBuilder<IN, ?> bucketsBuilder,
+ final long bucketCheckInterval) {
+
+ Preconditions.checkArgument(bucketCheckInterval > 0L);
- private transient ListState<Long> restoredMaxCounters;
+ this.bucketsBuilder = Preconditions.checkNotNull(bucketsBuilder);
+ this.bucketCheckInterval = bucketCheckInterval;
+ }
- private transient long initMaxPartCounter;
+ // ------------------------------------------------------------------------
- private transient long maxPartCounterUsed;
+ // --------------------------- Sink Builders -----------------------------
/**
- * Creates a new {@code StreamingFileSink} that writes files to the given base directory.
- *
- * <p>This uses a {@link DateTimeBucketer} as {@link Bucketer} and a {@link SimpleStringEncoder} as a writer.
- *
- * @param basePath The directory to which to write the bucket files.
+ * Creates the builder for a {@code StreamingFileSink} with row-encoding format.
+ * @param basePath the base path where all the buckets are going to be created as sub-directories.
+ * @param encoder the {@link Encoder} to be used when writing elements in the buckets.
+ * @param <IN> the type of incoming elements
+ * @return The builder where the remaining configuration parameters for the sink can be specified.
+ * In order to instantiate the sink, call {@link RowFormatBuilder#build()} after specifying the desired parameters.
*/
- public StreamingFileSink(Path basePath) {
- this(basePath, new DefaultBucketFactory<>());
+ public static <IN> StreamingFileSink.RowFormatBuilder<IN, String> forRowFormat(
+ final Path basePath, final Encoder<IN> encoder) {
+ return new StreamingFileSink.RowFormatBuilder<>(basePath, encoder, new DateTimeBucketer<>());
}
- @VisibleForTesting
- StreamingFileSink(Path basePath, BucketFactory<IN> bucketFactory) {
- this.basePath = Preconditions.checkNotNull(basePath);
- this.bucketer = new DateTimeBucketer<>();
- this.encoder = new SimpleStringEncoder<>();
- this.rollingPolicy = DefaultRollingPolicy.create().build();
- this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
+ /**
+ * Creates the builder for a {@link StreamingFileSink} with bulk-encoding format.
+ * @param basePath the base path where all the buckets are going to be created as sub-directories.
+ * @param writerFactory the {@link BulkWriter.Factory} to be used when writing elements in the buckets.
+ * @param <IN> the type of incoming elements
+ * @return The builder where the remaining configuration parameters for the sink can be specified.
+ * In order to instantiate the sink, call {@link BulkFormatBuilder#build()} after specifying the desired parameters.
+ */
+ public static <IN> StreamingFileSink.BulkFormatBuilder<IN, String> forBulkFormat(
+ final Path basePath, final BulkWriter.Factory<IN> writerFactory) {
+ return new StreamingFileSink.BulkFormatBuilder<>(basePath, writerFactory, new DateTimeBucketer<>());
}
- public StreamingFileSink<IN> setEncoder(Encoder<IN> encoder) {
- this.encoder = Preconditions.checkNotNull(encoder);
- return this;
- }
+ /**
+ * The base abstract class for the {@link RowFormatBuilder} and {@link BulkFormatBuilder}.
+ */
+ private abstract static class BucketsBuilder<IN, BucketID> implements Serializable {
- public StreamingFileSink<IN> setBucketer(Bucketer<IN> bucketer) {
- this.bucketer = Preconditions.checkNotNull(bucketer);
- return this;
- }
+ private static final long serialVersionUID = 1L;
- public StreamingFileSink<IN> setBucketCheckInterval(long interval) {
- this.bucketCheckInterval = interval;
- return this;
+ abstract Buckets<IN, BucketID> createBuckets(final int subtaskIndex) throws IOException;
}
- public StreamingFileSink<IN> setRollingPolicy(RollingPolicy policy) {
- this.rollingPolicy = Preconditions.checkNotNull(policy);
- return this;
- }
+ /**
+ * A builder for configuring the sink for row-wise encoding formats.
+ */
+ @PublicEvolving
+ public static class RowFormatBuilder<IN, BucketID> extends StreamingFileSink.BucketsBuilder<IN, BucketID> {
- @Override
- public void notifyCheckpointComplete(long checkpointId) throws Exception {
- final Iterator<Map.Entry<String, Bucket<IN>>> activeBucketIt =
- activeBuckets.entrySet().iterator();
-
- while (activeBucketIt.hasNext()) {
- Bucket<IN> bucket = activeBucketIt.next().getValue();
- bucket.commitUpToCheckpoint(checkpointId);
-
- if (!bucket.isActive()) {
- // We've dealt with all the pending files and the writer for this bucket is not currently open.
- // Therefore this bucket is currently inactive and we can remove it from our state.
- activeBucketIt.remove();
- }
- }
- }
+ private static final long serialVersionUID = 1L;
- @Override
- public void snapshotState(FunctionSnapshotContext context) throws Exception {
- Preconditions.checkState(
- restoredBucketStates != null && fileSystemWriter != null && bucketStateSerializer != null,
- "sink has not been initialized");
+ private long bucketCheckInterval = 60L * 1000L;
+
+ private final Path basePath;
- restoredBucketStates.clear();
- for (Bucket<IN> bucket : activeBuckets.values()) {
+ private final Encoder<IN> encoder;
- final PartFileInfo info = bucket.getInProgressPartInfo();
- final long checkpointTimestamp = context.getCheckpointTimestamp();
+ private Bucketer<IN, BucketID> bucketer;
- if (info != null && rollingPolicy.shouldRoll(info, checkpointTimestamp)) {
- // we also check here so that we do not have to always
- // wait for the "next" element to arrive.
- bucket.closePartFile();
- }
+ private RollingPolicy<BucketID> rollingPolicy;
- final BucketState bucketState = bucket.snapshot(context.getCheckpointId());
- restoredBucketStates.add(SimpleVersionedSerialization.writeVersionAndSerialize(bucketStateSerializer, bucketState));
+ private BucketFactory<IN, BucketID> bucketFactory = new DefaultBucketFactory<>();
+
+ RowFormatBuilder(Path basePath, Encoder<IN> encoder, Bucketer<IN, BucketID> bucketer) {
+ this.basePath = Preconditions.checkNotNull(basePath);
+ this.encoder = Preconditions.checkNotNull(encoder);
+ this.bucketer = Preconditions.checkNotNull(bucketer);
+ this.rollingPolicy = DefaultRollingPolicy.create().build();
}
- restoredMaxCounters.clear();
- restoredMaxCounters.add(maxPartCounterUsed);
- }
+ public StreamingFileSink.RowFormatBuilder<IN, BucketID> withBucketCheckInterval(final long interval) {
+ this.bucketCheckInterval = interval;
+ return this;
+ }
- @Override
- public void initializeState(FunctionInitializationContext context) throws Exception {
- initFileSystemWriter();
+ public StreamingFileSink.RowFormatBuilder<IN, BucketID> withBucketer(final Bucketer<IN, BucketID> bucketer) {
+ this.bucketer = Preconditions.checkNotNull(bucketer);
+ return this;
+ }
- this.activeBuckets = new HashMap<>();
+ public StreamingFileSink.RowFormatBuilder<IN, BucketID> withRollingPolicy(final RollingPolicy<BucketID> policy) {
+ this.rollingPolicy = Preconditions.checkNotNull(policy);
+ return this;
+ }
- // When resuming after a failure:
- // 1) we get the max part counter used before in order to make sure that we do not overwrite valid data
- // 2) we commit any pending files for previous checkpoints (previous to the last successful one)
- // 3) we resume writing to the previous in-progress file of each bucket, and
- // 4) if we receive multiple states for the same bucket, we merge them.
+ public <ID> StreamingFileSink.RowFormatBuilder<IN, ID> withBucketerAndPolicy(final Bucketer<IN, ID> bucketer, final RollingPolicy<ID> policy) {
+ @SuppressWarnings("unchecked")
+ StreamingFileSink.RowFormatBuilder<IN, ID> reInterpreted = (StreamingFileSink.RowFormatBuilder<IN, ID>) this;
+ reInterpreted.bucketer = Preconditions.checkNotNull(bucketer);
+ reInterpreted.rollingPolicy = Preconditions.checkNotNull(policy);
+ return reInterpreted;
+ }
- final OperatorStateStore stateStore = context.getOperatorStateStore();
+ @VisibleForTesting
+ StreamingFileSink.RowFormatBuilder<IN, BucketID> withBucketFactory(final BucketFactory<IN, BucketID> factory) {
+ this.bucketFactory = Preconditions.checkNotNull(factory);
+ return this;
+ }
- restoredBucketStates = stateStore.getListState(BUCKET_STATE_DESC);
- restoredMaxCounters = stateStore.getUnionListState(MAX_PART_COUNTER_STATE_DESC);
+ /** Creates the actual sink. */
+ public StreamingFileSink<IN> build() {
+ return new StreamingFileSink<>(this, bucketCheckInterval);
+ }
- if (context.isRestored()) {
- final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
-
- LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);
-
- long maxCounter = 0L;
- for (long partCounter: restoredMaxCounters.get()) {
- maxCounter = Math.max(partCounter, maxCounter);
- }
- initMaxPartCounter = maxCounter;
-
- for (byte[] recoveredState : restoredBucketStates.get()) {
- final BucketState bucketState = SimpleVersionedSerialization.readVersionAndDeSerialize(
- bucketStateSerializer, recoveredState);
-
- final String bucketId = bucketState.getBucketId();
-
- LOG.info("Recovered bucket for {}", bucketId);
-
- final Bucket<IN> restoredBucket = bucketFactory.restoreBucket(
- fileSystemWriter,
- subtaskIndex,
- initMaxPartCounter,
- encoder,
- bucketState
- );
-
- final Bucket<IN> existingBucket = activeBuckets.get(bucketId);
- if (existingBucket == null) {
- activeBuckets.put(bucketId, restoredBucket);
- } else {
- existingBucket.merge(restoredBucket);
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("{} idx {} restored state for bucket {}", getClass().getSimpleName(),
- subtaskIndex, assembleBucketPath(bucketId));
- }
- }
+ @Override
+ Buckets<IN, BucketID> createBuckets(int subtaskIndex) throws IOException {
+ return new Buckets<>(
+ basePath,
+ bucketer,
+ bucketFactory,
+ new RowWisePartWriter.Factory<>(encoder),
+ rollingPolicy,
+ subtaskIndex);
}
}
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
+ /**
+ * A builder for configuring the sink for bulk-encoding formats, e.g. Parquet/ORC.
+ */
+ @PublicEvolving
+ public static class BulkFormatBuilder<IN, BucketID> extends StreamingFileSink.BucketsBuilder<IN, BucketID> {
- processingTimeService = ((StreamingRuntimeContext) getRuntimeContext()).getProcessingTimeService();
- long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
- processingTimeService.registerTimer(currentProcessingTime + bucketCheckInterval, this);
- this.bucketerContext = new BucketerContext();
- }
+ private static final long serialVersionUID = 1L;
- @Override
- public void onProcessingTime(long timestamp) throws Exception {
- final long currentTime = processingTimeService.getCurrentProcessingTime();
- for (Bucket<IN> bucket : activeBuckets.values()) {
- final PartFileInfo info = bucket.getInProgressPartInfo();
- if (info != null && rollingPolicy.shouldRoll(info, currentTime)) {
- bucket.closePartFile();
- }
- }
- processingTimeService.registerTimer(timestamp + bucketCheckInterval, this);
- }
+ private long bucketCheckInterval = 60L * 1000L;
- @Override
- public void invoke(IN value, Context context) throws Exception {
- final long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
- final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+ private final Path basePath;
+
+ private final BulkWriter.Factory<IN> writerFactory;
+
+ private Bucketer<IN, BucketID> bucketer;
- // setting the values in the bucketer context
- bucketerContext.update(context.timestamp(), context.currentWatermark(), currentProcessingTime);
-
- final String bucketId = bucketer.getBucketId(value, bucketerContext);
-
- Bucket<IN> bucket = activeBuckets.get(bucketId);
- if (bucket == null) {
- final Path bucketPath = assembleBucketPath(bucketId);
- bucket = bucketFactory.getNewBucket(
- fileSystemWriter,
- subtaskIndex,
- bucketId,
- bucketPath,
- initMaxPartCounter,
- encoder);
- activeBuckets.put(bucketId, bucket);
+ private BucketFactory<IN, BucketID> bucketFactory = new DefaultBucketFactory<>();
+
+ BulkFormatBuilder(Path basePath, BulkWriter.Factory<IN> writerFactory, Bucketer<IN, BucketID> bucketer) {
+ this.basePath = Preconditions.checkNotNull(basePath);
+ this.writerFactory = Preconditions.checkNotNull(writerFactory);
+ this.bucketer = Preconditions.checkNotNull(bucketer);
}
- final PartFileInfo info = bucket.getInProgressPartInfo();
- if (info == null || rollingPolicy.shouldRoll(info, currentProcessingTime)) {
- bucket.rollPartFile(currentProcessingTime);
+ public StreamingFileSink.BulkFormatBuilder<IN, BucketID> withBucketCheckInterval(long interval) {
+ this.bucketCheckInterval = interval;
+ return this;
}
- bucket.write(value, currentProcessingTime);
- // we update the counter here because as buckets become inactive and
- // get removed in the initializeState(), at the time we snapshot they
- // may not be there to take them into account during checkpointing.
- updateMaxPartCounter(bucket.getPartCounter());
- }
+ public <ID> StreamingFileSink.BulkFormatBuilder<IN, ID> withBucketer(Bucketer<IN, ID> bucketer) {
+ @SuppressWarnings("unchecked")
+ StreamingFileSink.BulkFormatBuilder<IN, ID> reInterpreted = (StreamingFileSink.BulkFormatBuilder<IN, ID>) this;
+ reInterpreted.bucketer = Preconditions.checkNotNull(bucketer);
+ return reInterpreted;
+ }
- @Override
- public void close() throws Exception {
- if (activeBuckets != null) {
- activeBuckets.values().forEach(Bucket::dispose);
+ @VisibleForTesting
+ StreamingFileSink.BulkFormatBuilder<IN, BucketID> withBucketFactory(final BucketFactory<IN, BucketID> factory) {
+ this.bucketFactory = Preconditions.checkNotNull(factory);
+ return this;
}
- }
- private void initFileSystemWriter() throws IOException {
- if (fileSystemWriter == null) {
- fileSystemWriter = FileSystem.get(basePath.toUri()).createRecoverableWriter();
- bucketStateSerializer = new BucketStateSerializer(
- fileSystemWriter.getResumeRecoverableSerializer(),
- fileSystemWriter.getCommitRecoverableSerializer()
- );
+ /** Creates the actual sink. */
+ public StreamingFileSink<IN> build() {
+ return new StreamingFileSink<>(this, bucketCheckInterval);
}
- }
- private void updateMaxPartCounter(long candidate) {
- maxPartCounterUsed = Math.max(maxPartCounterUsed, candidate);
+ @Override
+ Buckets<IN, BucketID> createBuckets(int subtaskIndex) throws IOException {
+ return new Buckets<>(
+ basePath,
+ bucketer,
+ bucketFactory,
+ new BulkPartWriter.Factory<>(writerFactory),
+ new OnCheckpointRollingPolicy<>(),
+ subtaskIndex);
+ }
}
- private Path assembleBucketPath(String bucketId) {
- return new Path(basePath, bucketId);
+ // --------------------------- Sink Methods -----------------------------
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+ this.buckets = bucketsBuilder.createBuckets(subtaskIndex);
+
+ final OperatorStateStore stateStore = context.getOperatorStateStore();
+ bucketStates = stateStore.getListState(BUCKET_STATE_DESC);
+ maxPartCountersState = stateStore.getUnionListState(MAX_PART_COUNTER_STATE_DESC);
+
+ if (context.isRestored()) {
+ buckets.initializeState(bucketStates, maxPartCountersState);
+ }
}
- /**
- * The {@link Bucketer.Context} exposed to the
- * {@link Bucketer#getBucketId(Object, Bucketer.Context)}
- * whenever a new incoming element arrives.
- */
- private static class BucketerContext implements Bucketer.Context {
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ buckets.publishUpToCheckpoint(checkpointId);
+ }
- @Nullable
- private Long elementTimestamp;
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ Preconditions.checkState(bucketStates != null && maxPartCountersState != null, "sink has not been initialized");
- private long currentWatermark;
+ bucketStates.clear();
+ maxPartCountersState.clear();
- private long currentProcessingTime;
+ buckets.snapshotState(
+ context.getCheckpointId(),
+ context.getCheckpointTimestamp(),
+ bucketStates,
+ maxPartCountersState);
+ }
- void update(@Nullable Long elementTimestamp, long currentWatermark, long currentProcessingTime) {
- this.elementTimestamp = elementTimestamp;
- this.currentWatermark = currentWatermark;
- this.currentProcessingTime = currentProcessingTime;
- }
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ this.processingTimeService = ((StreamingRuntimeContext) getRuntimeContext()).getProcessingTimeService();
+ long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
+ processingTimeService.registerTimer(currentProcessingTime + bucketCheckInterval, this);
+ }
- @Override
- public long currentProcessingTime() {
- return currentProcessingTime;
- }
+ @Override
+ public void onProcessingTime(long timestamp) throws Exception {
+ final long currentTime = processingTimeService.getCurrentProcessingTime();
+ buckets.onProcessingTime(currentTime);
+ processingTimeService.registerTimer(currentTime + bucketCheckInterval, this);
+ }
- @Override
- public long currentWatermark() {
- return currentWatermark;
- }
+ @Override
+ public void invoke(IN value, SinkFunction.Context context) throws Exception {
+ buckets.onElement(value, context);
+ }
- @Override
- @Nullable
- public Long timestamp() {
- return elementTimestamp;
- }
+ @Override
+ public void close() throws Exception {
+ buckets.close();
}
}
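As a quick orientation, a minimal usage sketch of the builder API introduced above. The DataStream `stream`, the element type `MyRecord`, and `myBulkWriterFactory` are assumptions for illustration; everything else is named in this diff:

    // Row-wise encoding: elements are written one by one via an Encoder.
    StreamingFileSink<String> rowSink = StreamingFileSink
        .forRowFormat(new Path("/base/path"), new SimpleStringEncoder<String>())
        .withBucketer(new DateTimeBucketer<>())   // the default; shown for clarity
        .build();
    stream.addSink(rowSink);                      // `stream` is an assumed DataStream<String>

    // Bulk encoding (e.g. Parquet/ORC): driven by a BulkWriter.Factory; part
    // files are rolled on every checkpoint (OnCheckpointRollingPolicy is hard-wired).
    StreamingFileSink<MyRecord> bulkSink = StreamingFileSink
        .forBulkFormat(new Path("/base/path"), myBulkWriterFactory) // factory is an assumption
        .build();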
http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java
index 5ffe152..d7b2013 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java
@@ -19,22 +19,29 @@
package org.apache.flink.streaming.api.functions.sink.filesystem.bucketers;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
/**
* A {@link Bucketer} that does not perform any
* bucketing of files. All files are written to the base path.
*/
@PublicEvolving
-public class BasePathBucketer<T> implements Bucketer<T> {
+public class BasePathBucketer<T> implements Bucketer<T, String> {
private static final long serialVersionUID = -6033643155550226022L;
@Override
- public String getBucketId(T element, Context context) {
+ public String getBucketId(T element, Bucketer.Context context) {
return "";
}
@Override
+ public SimpleVersionedSerializer<String> getSerializer() {
+ // in the future this could be optimized, as the bucket id is always the empty string.
+ return SimpleVersionedStringSerializer.INSTANCE;
+ }
+
+ @Override
public String toString() {
return "BasePathBucketer";
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/Bucketer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/Bucketer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/Bucketer.java
index 5c30927..503e361 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/Bucketer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/Bucketer.java
@@ -19,6 +19,8 @@
package org.apache.flink.streaming.api.functions.sink.filesystem.bucketers;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import javax.annotation.Nullable;
@@ -33,9 +35,16 @@ import java.io.Serializable;
* <p>The {@code StreamingFileSink} can be writing to many buckets at a time, and it is responsible for managing
* a set of active buckets. Whenever a new element arrives it will ask the {@code Bucketer} for the bucket the
* element should fall in. The {@code Bucketer} can, for example, determine buckets based on system time.
+ *
+ * @param <IN> The type of input elements.
+ * @param <BucketID> The type of the object returned by the {@link #getBucketId(Object, Bucketer.Context)}. This has to have
+ * a correct {@link #hashCode()} and {@link #equals(Object)} method. In addition, the {@link Path}
+ * to the created bucket will be the result of the {@link #toString()} of the returned object, appended to
+ * the {@code basePath} specified in the
+ * {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink StreamingFileSink}
*/
@PublicEvolving
-public interface Bucketer<T> extends Serializable {
+public interface Bucketer<IN, BucketID> extends Serializable {
/**
* Returns the identifier of the bucket the provided element should be put into.
@@ -48,13 +57,20 @@ public interface Bucketer<T> extends Serializable {
* and the {@code base path} provided during the initialization of the
* {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink sink}.
*/
- String getBucketId(T element, Context context);
+ BucketID getBucketId(IN element, Bucketer.Context context);
+
+ /**
+ * @return A {@link SimpleVersionedSerializer} capable of serializing/deserializing the elements
+ * of type {@code BucketID}. That is the type of the objects returned by the
+ * {@link #getBucketId(Object, Bucketer.Context)}.
+ */
+ SimpleVersionedSerializer<BucketID> getSerializer();
/**
* Context that the {@link Bucketer} can use for getting additional data about
* an input record.
*
- * <p>The context is only valid for the duration of a {@link Bucketer#getBucketId(Object, Context)} call.
+ * <p>The context is only valid for the duration of a {@link Bucketer#getBucketId(Object, Bucketer.Context)} call.
* Do not store the context and use afterwards!
*/
@PublicEvolving
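A sketch of a custom Bucketer against the new two-parameter interface. The `Event` type and its `getKey()` accessor are assumptions; the serializer is the one added later in this commit:

    public class KeyBucketer implements Bucketer<Event, String> {

        private static final long serialVersionUID = 1L;

        @Override
        public String getBucketId(Event element, Bucketer.Context context) {
            // the context also exposes timestamp(), currentWatermark() and
            // currentProcessingTime() for time-based bucketing decisions
            return element.getKey();
        }

        @Override
        public SimpleVersionedSerializer<String> getSerializer() {
            // bucket ids are plain strings here, so the string serializer
            // introduced by this commit is sufficient
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }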
http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java
index 515468c..eed0b79 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.functions.sink.filesystem.bucketers;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
import java.text.SimpleDateFormat;
import java.util.Date;
@@ -50,9 +51,9 @@ import java.util.Date;
*
*/
@PublicEvolving
-public class DateTimeBucketer<T> implements Bucketer<T> {
+public class DateTimeBucketer<IN> implements Bucketer<IN, String> {
- private static final long serialVersionUID = 3284420879277893804L;
+ private static final long serialVersionUID = 1L;
private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";
@@ -78,7 +79,7 @@ public class DateTimeBucketer<T> implements Bucketer<T> {
}
@Override
- public String getBucketId(T element, Context context) {
+ public String getBucketId(IN element, Bucketer.Context context) {
if (dateFormatter == null) {
dateFormatter = new SimpleDateFormat(formatString);
}
@@ -86,6 +87,11 @@ public class DateTimeBucketer<T> implements Bucketer<T> {
}
@Override
+ public SimpleVersionedSerializer<String> getSerializer() {
+ return SimpleVersionedStringSerializer.INSTANCE;
+ }
+
+ @Override
public String toString() {
return "DateTimeBucketer{" +
"formatString='" + formatString + '\'' +
http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/SimpleVersionedStringSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/SimpleVersionedStringSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/SimpleVersionedStringSerializer.java
new file mode 100644
index 0000000..d025af9
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/SimpleVersionedStringSerializer.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem.bucketers;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * A {@link SimpleVersionedSerializer} implementation for Strings.
+ */
+public final class SimpleVersionedStringSerializer implements SimpleVersionedSerializer<String> {
+
+ private static final Charset CHARSET = StandardCharsets.UTF_8;
+
+ public static final SimpleVersionedStringSerializer INSTANCE = new SimpleVersionedStringSerializer();
+
+ @Override
+ public int getVersion() {
+ return 1;
+ }
+
+ @Override
+ public byte[] serialize(String value) {
+ final byte[] serialized = value.getBytes(StandardCharsets.UTF_8);
+ final byte[] targetBytes = new byte[Integer.BYTES + serialized.length];
+
+ final ByteBuffer bb = ByteBuffer.wrap(targetBytes).order(ByteOrder.LITTLE_ENDIAN);
+ bb.putInt(serialized.length);
+ bb.put(serialized);
+ return targetBytes;
+ }
+
+ @Override
+ public String deserialize(int version, byte[] serialized) throws IOException {
+ switch (version) {
+ case 1:
+ return deserializeV1(serialized);
+ default:
+ throw new IOException("Unrecognized version or corrupt state: " + version);
+ }
+ }
+
+ private static String deserializeV1(byte[] serialized) {
+ final ByteBuffer bb = ByteBuffer.wrap(serialized).order(ByteOrder.LITTLE_ENDIAN);
+ final byte[] targetStringBytes = new byte[bb.getInt()];
+ bb.get(targetStringBytes);
+ return new String(targetStringBytes, CHARSET);
+ }
+
+ /**
+ * Private constructor to prevent instantiation.
+ * Access the serializer through the {@link #INSTANCE}.
+ */
+ private SimpleVersionedStringSerializer() {}
+}
+
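A round-trip sketch for the new serializer, using the same versioning helper the tests further down use for BucketState; the sample bucket id is arbitrary:

    byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(
            SimpleVersionedStringSerializer.INSTANCE, "2018-07-20--17");
    String restored = SimpleVersionedSerialization.readVersionAndDeSerialize(
            SimpleVersionedStringSerializer.INSTANCE, bytes);
    // restored now equals "2018-07-20--17"; the payload is the little-endian
    // length followed by the UTF-8 bytes, prefixed with the serializer version.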
http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/DefaultRollingPolicy.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/DefaultRollingPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/DefaultRollingPolicy.java
new file mode 100644
index 0000000..a9ff617
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/DefaultRollingPolicy.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem.rolling.policies;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * The default implementation of the {@link RollingPolicy}.
+ *
+ * <p>This policy rolls a part file if:
+ * <ol>
+ * <li>there is no open part file,</li>
+ * <li>the current file has reached the maximum part size (by default 128MB),</li>
+ * <li>the current file is older than the roll over interval (by default 60 sec), or</li>
+ * <li>the current file has not been written to for more than the allowed inactivityTime (by default 60 sec).</li>
+ * </ol>
+ */
+@PublicEvolving
+public final class DefaultRollingPolicy<BucketID> implements RollingPolicy<BucketID> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final long DEFAULT_INACTIVITY_INTERVAL = 60L * 1000L;
+
+ private static final long DEFAULT_ROLLOVER_INTERVAL = 60L * 1000L;
+
+ private static final long DEFAULT_MAX_PART_SIZE = 1024L * 1024L * 128L;
+
+ private final long partSize;
+
+ private final long rolloverInterval;
+
+ private final long inactivityInterval;
+
+ /**
+ * Private constructor to avoid direct instantiation.
+ */
+ private DefaultRollingPolicy(long partSize, long rolloverInterval, long inactivityInterval) {
+ Preconditions.checkArgument(partSize > 0L);
+ Preconditions.checkArgument(rolloverInterval > 0L);
+ Preconditions.checkArgument(inactivityInterval > 0L);
+
+ this.partSize = partSize;
+ this.rolloverInterval = rolloverInterval;
+ this.inactivityInterval = inactivityInterval;
+ }
+
+ @Override
+ public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) {
+ return false;
+ }
+
+ @Override
+ public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState) throws IOException {
+ if (partFileState == null) {
+ // this means that there is no currently open part file.
+ return true;
+ }
+
+ return partFileState.getSize() > partSize;
+ }
+
+ @Override
+ public boolean shouldRollOnProcessingTime(final PartFileInfo<BucketID> partFileState, final long currentTime) {
+ if (partFileState == null) {
+ // this means that there is no currently open part file.
+ return true;
+ }
+
+ if (currentTime - partFileState.getCreationTime() > rolloverInterval) {
+ return true;
+ }
+
+ return currentTime - partFileState.getLastUpdateTime() > inactivityInterval;
+ }
+
+ /**
+ * Initiates the instantiation of a {@code DefaultRollingPolicy}.
+ * To finalize it and get the actual policy, call {@code .build()} on the returned builder.
+ */
+ public static DefaultRollingPolicy.PolicyBuilder create() {
+ return new DefaultRollingPolicy.PolicyBuilder();
+ }
+
+ /**
+ * A helper class that holds the configuration properties for the {@link DefaultRollingPolicy}.
+ */
+ @PublicEvolving
+ public static final class PolicyBuilder {
+
+ private long partSize = DEFAULT_MAX_PART_SIZE;
+
+ private long rolloverInterval = DEFAULT_ROLLOVER_INTERVAL;
+
+ private long inactivityInterval = DEFAULT_INACTIVITY_INTERVAL;
+
+ private PolicyBuilder() {}
+
+ /**
+ * Sets the part size above which a part file will have to roll.
+ * @param size the allowed part size.
+ */
+ public DefaultRollingPolicy.PolicyBuilder withMaxPartSize(long size) {
+ Preconditions.checkState(size > 0L);
+ this.partSize = size;
+ return this;
+ }
+
+ /**
+ * Sets the interval of allowed inactivity after which a part file will have to roll.
+ * @param interval the allowed inactivity interval.
+ */
+ public DefaultRollingPolicy.PolicyBuilder withInactivityInterval(long interval) {
+ Preconditions.checkState(interval > 0L);
+ this.inactivityInterval = interval;
+ return this;
+ }
+
+ /**
+ * Sets the max time a part file can stay open before having to roll.
+ * @param interval the desired rollover interval.
+ */
+ public DefaultRollingPolicy.PolicyBuilder withRolloverInterval(long interval) {
+ Preconditions.checkState(interval > 0L);
+ this.rolloverInterval = interval;
+ return this;
+ }
+
+ /**
+ * Creates the actual policy.
+ */
+ public <BucketID> DefaultRollingPolicy<BucketID> build() {
+ return new DefaultRollingPolicy<>(partSize, rolloverInterval, inactivityInterval);
+ }
+ }
+}
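A configuration sketch for the policy builder defined above; the thresholds are illustrative:

    DefaultRollingPolicy<String> policy = DefaultRollingPolicy.create()
        .withMaxPartSize(1024L * 1024L * 128L)     // roll when a part file exceeds 128 MB
        .withRolloverInterval(15L * 60L * 1000L)   // ... or is older than 15 minutes
        .withInactivityInterval(5L * 60L * 1000L)  // ... or saw no writes for 5 minutes
        .build();

The resulting policy is what RowFormatBuilder#withRollingPolicy expects; the bulk builder does not expose a rolling policy and always uses OnCheckpointRollingPolicy (next file).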
http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/OnCheckpointRollingPolicy.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/OnCheckpointRollingPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/OnCheckpointRollingPolicy.java
new file mode 100644
index 0000000..4361941
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/OnCheckpointRollingPolicy.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem.rolling.policies;
+
+import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
+
+/**
+ * A {@link RollingPolicy} which rolls on every checkpoint.
+ */
+public class OnCheckpointRollingPolicy<BucketID> implements RollingPolicy<BucketID> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) {
+ return true;
+ }
+
+ @Override
+ public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState) {
+ return false;
+ }
+
+ @Override
+ public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) {
+ return false;
+ }
+}
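The two implementations above suggest how a custom policy combining their behaviors could look. This is a sketch against the RollingPolicy interface implied by those files, not part of the commit:

    public class SizeOrCheckpointRollingPolicy<BucketID> implements RollingPolicy<BucketID> {

        private static final long serialVersionUID = 1L;

        private final long maxPartSize;

        public SizeOrCheckpointRollingPolicy(long maxPartSize) {
            this.maxPartSize = maxPartSize;
        }

        @Override
        public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) {
            return true;  // keep part files aligned with checkpoints
        }

        @Override
        public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState) throws IOException {
            // no open part file yet, or the size threshold is exceeded
            return partFileState == null || partFileState.getSize() > maxPartSize;
        }

        @Override
        public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) {
            return false;  // no time-based rolling in this sketch
        }
    }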
http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
index 353ac00..3d5be63 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.SimpleVersionedStringSerializer;
import org.junit.Assert;
import org.junit.ClassRule;
@@ -60,20 +61,21 @@ public class BucketStateSerializerTest {
final Path testBucket = new Path(testFolder.getPath(), "test");
- final BucketState bucketState = new BucketState(
+ final BucketState<String> bucketState = new BucketState<>(
"test", testBucket, Long.MAX_VALUE, null, new HashMap<>());
- final SimpleVersionedSerializer<BucketState> serializer =
- new BucketStateSerializer(
+ final SimpleVersionedSerializer<BucketState<String>> serializer =
+ new BucketStateSerializer<>(
writer.getResumeRecoverableSerializer(),
- writer.getCommitRecoverableSerializer()
+ writer.getCommitRecoverableSerializer(),
+ SimpleVersionedStringSerializer.INSTANCE
);
byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(serializer, bucketState);
- final BucketState recoveredState = SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
+ final BucketState<String> recoveredState = SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
Assert.assertEquals(testBucket, recoveredState.getBucketPath());
- Assert.assertNull(recoveredState.getCurrentInProgress());
+ Assert.assertNull(recoveredState.getInProgress());
Assert.assertTrue(recoveredState.getPendingPerCheckpoint().isEmpty());
}
@@ -90,13 +92,14 @@ public class BucketStateSerializerTest {
final RecoverableWriter.ResumeRecoverable current = stream.persist();
- final BucketState bucketState = new BucketState(
+ final BucketState<String> bucketState = new BucketState<>(
"test", testBucket, Long.MAX_VALUE, current, new HashMap<>());
- final SimpleVersionedSerializer<BucketState> serializer =
- new BucketStateSerializer(
+ final SimpleVersionedSerializer<BucketState<String>> serializer =
+ new BucketStateSerializer<>(
writer.getResumeRecoverableSerializer(),
- writer.getCommitRecoverableSerializer()
+ writer.getCommitRecoverableSerializer(),
+ SimpleVersionedStringSerializer.INSTANCE
);
final byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(serializer, bucketState);
@@ -104,7 +107,7 @@ public class BucketStateSerializerTest {
// to simulate that everything is over for file.
stream.close();
- final BucketState recoveredState = SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
+ final BucketState<String> recoveredState = SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
Assert.assertEquals(testBucket, recoveredState.getBucketPath());
@@ -147,18 +150,19 @@ public class BucketStateSerializerTest {
final RecoverableWriter.ResumeRecoverable current = stream.persist();
- final BucketState bucketState = new BucketState(
+ final BucketState<String> bucketState = new BucketState<>(
"test-2", bucketPath, Long.MAX_VALUE, current, commitRecoverables);
- final SimpleVersionedSerializer<BucketState> serializer =
- new BucketStateSerializer(
+ final SimpleVersionedSerializer<BucketState<String>> serializer =
+ new BucketStateSerializer<>(
writer.getResumeRecoverableSerializer(),
- writer.getCommitRecoverableSerializer()
+ writer.getCommitRecoverableSerializer(),
+ SimpleVersionedStringSerializer.INSTANCE
);
stream.close();
byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(serializer, bucketState);
- final BucketState recoveredState = SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
+ final BucketState<String> recoveredState = SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
Assert.assertEquals(bucketPath, recoveredState.getBucketPath());
@@ -220,20 +224,21 @@ public class BucketStateSerializerTest {
final RecoverableWriter.ResumeRecoverable current = null;
- final BucketState bucketState = new BucketState(
+ final BucketState<String> bucketState = new BucketState<>(
"", bucketPath, Long.MAX_VALUE, current, commitRecoverables);
- final SimpleVersionedSerializer<BucketState> serializer = new BucketStateSerializer(
+ final SimpleVersionedSerializer<BucketState<String>> serializer = new BucketStateSerializer<>(
writer.getResumeRecoverableSerializer(),
- writer.getCommitRecoverableSerializer()
+ writer.getCommitRecoverableSerializer(),
+ SimpleVersionedStringSerializer.INSTANCE
);
byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(serializer, bucketState);
- final BucketState recoveredState = SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
+ final BucketState<String> recoveredState = SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
Assert.assertEquals(bucketPath, recoveredState.getBucketPath());
- Assert.assertNull(recoveredState.getCurrentInProgress());
+ Assert.assertNull(recoveredState.getInProgress());
final Map<Long, List<RecoverableWriter.CommitRecoverable>> recoveredRecoverables = recoveredState.getPendingPerCheckpoint();
Assert.assertEquals(5L, recoveredRecoverables.size());
http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
new file mode 100644
index 0000000..042ba4e
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.SimpleVersionedStringSerializer;
+
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+/**
+ * Tests for {@link Buckets}.
+ */
+public class BucketsTest {
+
+ @ClassRule
+ public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+ @Test
+ public void testContextPassingNormalExecution() throws Exception {
+ testCorrectPassingOfContext(1L, 2L, 3L);
+ }
+
+ @Test
+ public void testContextPassingNullTimestamp() throws Exception {
+ testCorrectPassingOfContext(null, 2L, 3L);
+ }
+
+ private void testCorrectPassingOfContext(Long timestamp, long watermark, long processingTime) throws Exception {
+ final File outDir = TEMP_FOLDER.newFolder();
+
+ final Long expectedTimestamp = timestamp;
+ final long expectedWatermark = watermark;
+ final long expectedProcessingTime = processingTime;
+
+ final Buckets<String, String> buckets = StreamingFileSink
+ .<String>forRowFormat(new Path(outDir.toURI()), new SimpleStringEncoder<>())
+ .withBucketer(new VerifyingBucketer(expectedTimestamp, expectedWatermark, expectedProcessingTime))
+ .createBuckets(2);
+
+ buckets.onElement("TEST", new SinkFunction.Context() {
+ @Override
+ public long currentProcessingTime() {
+ return expectedProcessingTime;
+ }
+
+ @Override
+ public long currentWatermark() {
+ return expectedWatermark;
+ }
+
+ @Override
+ public Long timestamp() {
+ return expectedTimestamp;
+ }
+ });
+ }
+
+ private static class VerifyingBucketer implements Bucketer<String, String> {
+
+ private static final long serialVersionUID = 7729086510972377578L;
+
+ private final Long expectedTimestamp;
+ private final long expectedWatermark;
+ private final long expectedProcessingTime;
+
+ VerifyingBucketer(
+ final Long expectedTimestamp,
+ final long expectedWatermark,
+ final long expectedProcessingTime
+ ) {
+ this.expectedTimestamp = expectedTimestamp;
+ this.expectedWatermark = expectedWatermark;
+ this.expectedProcessingTime = expectedProcessingTime;
+ }
+
+ @Override
+ public String getBucketId(String element, Context context) {
+ final Long elementTimestamp = context.timestamp();
+ final long watermark = context.currentWatermark();
+ final long processingTime = context.currentProcessingTime();
+
+ Assert.assertEquals(expectedTimestamp, elementTimestamp);
+ Assert.assertEquals(expectedProcessingTime, processingTime);
+ Assert.assertEquals(expectedWatermark, watermark);
+
+ return element;
+ }
+
+ @Override
+ public SimpleVersionedSerializer<String> getSerializer() {
+ return SimpleVersionedStringSerializer.INSTANCE;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java
new file mode 100644
index 0000000..7b6b82c
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+/**
+ * Tests for the {@link StreamingFileSink} with {@link BulkWriter}.
+ */
+public class BulkWriterTest extends TestLogger {
+
+ @ClassRule
+ public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+ @Test
+ public void testCustomBulkWriter() throws Exception {
+ final File outDir = TEMP_FOLDER.newFolder();
+
+ // we set the max bucket size to a small value so that we can tell when it rolls
+ try (
+ OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness =
+ TestUtils.createTestSinkWithBulkEncoder(
+ outDir,
+ 1,
+ 0,
+ 10L,
+ new TestUtils.TupleToStringBucketer(),
+ new TestBulkWriterFactory(),
+ new DefaultBucketFactory<>())
+ ) {
+
+ testHarness.setup();
+ testHarness.open();
+
+ // this creates a new bucket "test1" and part-0-0
+ testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 1), 1L));
+ TestUtils.checkLocalFs(outDir, 1, 0);
+
+ // we take a checkpoint so we roll.
+ testHarness.snapshot(1L, 1L);
+
+ // these will close part-0-0 and open part-0-1
+ testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 2), 2L));
+ testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 3), 3L));
+
+ // we take a checkpoint so we roll again.
+ testHarness.snapshot(2L, 2L);
+
+ TestUtils.checkLocalFs(outDir, 2, 0);
+
+ Map<File, String> contents = TestUtils.getFileContentByPath(outDir);
+ int fileCounter = 0;
+ for (Map.Entry<File, String> fileContents : contents.entrySet()) {
+ if (fileContents.getKey().getName().contains(".part-0-0.inprogress")) {
+ fileCounter++;
+ Assert.assertEquals("test1@1\n", fileContents.getValue());
+ } else if (fileContents.getKey().getName().contains(".part-0-1.inprogress")) {
+ fileCounter++;
+ Assert.assertEquals("test1@2\ntest1@3\n", fileContents.getValue());
+ }
+ }
+ Assert.assertEquals(2L, fileCounter);
+
+ // we acknowledge the latest checkpoint, so everything should be published.
+ testHarness.notifyOfCompletedCheckpoint(2L);
+ TestUtils.checkLocalFs(outDir, 0, 2);
+ }
+ }
+
+ /**
+ * A {@link BulkWriter} used for the tests.
+ */
+ private static class TestBulkWriter implements BulkWriter<Tuple2<String, Integer>> {
+
+ private static final Charset CHARSET = StandardCharsets.UTF_8;
+
+ private final FSDataOutputStream stream;
+
+ TestBulkWriter(final FSDataOutputStream stream) {
+ this.stream = Preconditions.checkNotNull(stream);
+ }
+
+ @Override
+ public void addElement(Tuple2<String, Integer> element) throws IOException {
+ stream.write((element.f0 + '@' + element.f1 + '\n').getBytes(CHARSET));
+ }
+
+ @Override
+ public void flush() throws IOException {
+ stream.flush();
+ }
+
+ @Override
+ public void finish() throws IOException {
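+ // flush buffered data; the output stream is owned and closed by the sink, not the writer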
+ flush();
+ }
+ }
+
+ /**
+ * A {@link BulkWriter.Factory} used for the tests.
+ */
+ private static class TestBulkWriterFactory implements BulkWriter.Factory<Tuple2<String, Integer>> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public BulkWriter<Tuple2<String, Integer>> create(FSDataOutputStream out) {
+ return new TestBulkWriter(out);
+ }
+ }
+}
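Outside the test harness, a factory like TestBulkWriterFactory is handed to the
sink directly; a minimal sketch, assuming the bulk-format builder introduced by
this refactoring and a hypothetical base path /base/dir:

    // given: DataStream<Tuple2<String, Integer>> stream
    StreamingFileSink<Tuple2<String, Integer>> sink = StreamingFileSink
            .forBulkFormat(new Path("/base/dir"), new TestBulkWriterFactory())
            .withBucketer(new TestUtils.TupleToStringBucketer())
            .build();

    stream.addSink(sink);

With a bulk format the sink rolls the part file on every checkpoint, which is
exactly the behavior asserted in testCustomBulkWriter above.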
http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
index b6f73ac..6e942e9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
@@ -18,20 +18,17 @@
package org.apache.flink.streaming.api.functions.sink.filesystem;
-import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer;
-import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rolling.policies.DefaultRollingPolicy;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.TestLogger;
-import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
@@ -39,9 +36,6 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.Collection;
-import java.util.HashMap;
import java.util.Map;
/**
@@ -56,12 +50,13 @@ public class LocalStreamingFileSinkTest extends TestLogger {
public void testClosingWithoutInput() throws Exception {
final File outDir = TEMP_FOLDER.newFolder();
- OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness =
- createRescalingTestSink(outDir, 1, 0, 100L, 124L);
- testHarness.setup();
- testHarness.open();
-
- testHarness.close();
+ try (
+ OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness =
+ TestUtils.createRescalingTestSink(outDir, 1, 0, 100L, 124L);
+ ) {
+ testHarness.setup();
+ testHarness.open();
+ }
}
@Test
@@ -70,7 +65,7 @@ public class LocalStreamingFileSinkTest extends TestLogger {
OperatorSubtaskState snapshot;
// we set the max bucket size to a small value so that we know when it rolls
- try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = createRescalingTestSink(
+ try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = TestUtils.createRescalingTestSink(
outDir, 1, 0, 100L, 10L)) {
testHarness.setup();
@@ -78,7 +73,7 @@ public class LocalStreamingFileSinkTest extends TestLogger {
// this creates a new bucket "test1" and part-0-0
testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 1), 1L));
- checkLocalFs(outDir, 1, 0);
+ TestUtils.checkLocalFs(outDir, 1, 0);
// we take a checkpoint so that we keep the in-progress file offset.
snapshot = testHarness.snapshot(1L, 1L);
@@ -87,9 +82,9 @@ public class LocalStreamingFileSinkTest extends TestLogger {
testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 2), 2L));
testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 3), 3L));
- checkLocalFs(outDir, 2, 0);
+ TestUtils.checkLocalFs(outDir, 2, 0);
- Map<File, String> contents = getFileContentByPath(outDir);
+ Map<File, String> contents = TestUtils.getFileContentByPath(outDir);
int fileCounter = 0;
for (Map.Entry<File, String> fileContents : contents.entrySet()) {
if (fileContents.getKey().getName().contains(".part-0-0.inprogress")) {
@@ -103,7 +98,7 @@ public class LocalStreamingFileSinkTest extends TestLogger {
Assert.assertEquals(2L, fileCounter);
}
- try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = createRescalingTestSink(
+ try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = TestUtils.createRescalingTestSink(
outDir, 1, 0, 100L, 10L)) {
testHarness.setup();
@@ -111,11 +106,11 @@ public class LocalStreamingFileSinkTest extends TestLogger {
testHarness.open();
// the in-progress file is the one that was not cleaned up, while the pending one is truncated and finalized
- checkLocalFs(outDir, 2, 0);
+ TestUtils.checkLocalFs(outDir, 2, 0);
// now we go back to the first checkpoint so it should truncate part-0-0 and restart part-0-1
int fileCounter = 0;
- for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) {
+ for (Map.Entry<File, String> fileContents : TestUtils.getFileContentByPath(outDir).entrySet()) {
if (fileContents.getKey().getName().contains(".part-0-0.inprogress")) {
// truncated
fileCounter++;
@@ -132,7 +127,7 @@ public class LocalStreamingFileSinkTest extends TestLogger {
testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 4), 4L));
fileCounter = 0;
- for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) {
+ for (Map.Entry<File, String> fileContents : TestUtils.getFileContentByPath(outDir).entrySet()) {
if (fileContents.getKey().getName().contains(".part-0-0.inprogress")) {
fileCounter++;
Assert.assertEquals("test1@1\ntest1@4\n", fileContents.getValue());
@@ -145,16 +140,16 @@ public class LocalStreamingFileSinkTest extends TestLogger {
Assert.assertEquals(2L, fileCounter);
testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 5), 5L));
- checkLocalFs(outDir, 3, 0); // the previous part-0-1 in progress is simply ignored (random extension)
+ TestUtils.checkLocalFs(outDir, 3, 0); // the previous part-0-1 in progress is simply ignored (random extension)
testHarness.snapshot(2L, 2L);
// this will close the new part-0-1
testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 6), 6L));
- checkLocalFs(outDir, 3, 0);
+ TestUtils.checkLocalFs(outDir, 3, 0);
fileCounter = 0;
- for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) {
+ for (Map.Entry<File, String> fileContents : TestUtils.getFileContentByPath(outDir).entrySet()) {
if (fileContents.getKey().getName().contains(".part-0-0.inprogress")) {
fileCounter++;
Assert.assertEquals("test1@1\ntest1@4\n", fileContents.getValue());
@@ -169,10 +164,10 @@ public class LocalStreamingFileSinkTest extends TestLogger {
// this will publish part-0-0
testHarness.notifyOfCompletedCheckpoint(2L);
- checkLocalFs(outDir, 2, 1);
+ TestUtils.checkLocalFs(outDir, 2, 1);
fileCounter = 0;
- for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) {
+ for (Map.Entry<File, String> fileContents : TestUtils.getFileContentByPath(outDir).entrySet()) {
if (fileContents.getKey().getName().equals("part-0-0")) {
fileCounter++;
Assert.assertEquals("test1@1\ntest1@4\n", fileContents.getValue());
@@ -192,7 +187,7 @@ public class LocalStreamingFileSinkTest extends TestLogger {
final File outDir = TEMP_FOLDER.newFolder();
// we set the max bucket size to a small value so that we know when it rolls
- try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = createRescalingTestSink(
+ try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = TestUtils.createRescalingTestSink(
outDir, 1, 0, 100L, 10L)) {
testHarness.setup();
@@ -203,11 +198,11 @@ public class LocalStreamingFileSinkTest extends TestLogger {
// these 2 create a new bucket "test1", with a .part-0-0.inprogress and also fill it
testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 1), 1L));
testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 2), 2L));
- checkLocalFs(outDir, 1, 0);
+ TestUtils.checkLocalFs(outDir, 1, 0);
// this will open .part-0-1.inprogress
testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 3), 3L));
- checkLocalFs(outDir, 2, 0);
+ TestUtils.checkLocalFs(outDir, 2, 0);
// we take a checkpoint so that we keep the in-progress file offset.
testHarness.snapshot(1L, 1L);
@@ -218,13 +213,13 @@ public class LocalStreamingFileSinkTest extends TestLogger {
// and open and fill .part-0-2.inprogress
testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 5), 5L));
testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 6), 6L));
- checkLocalFs(outDir, 3, 0); // nothing committed yet
+ TestUtils.checkLocalFs(outDir, 3, 0); // nothing committed yet
testHarness.snapshot(2L, 2L);
// open .part-0-3.inprogress
testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 7), 7L));
- checkLocalFs(outDir, 4, 0);
+ TestUtils.checkLocalFs(outDir, 4, 0);
// this will close the part file (time)
testHarness.setProcessingTime(101L);
@@ -232,10 +227,10 @@ public class LocalStreamingFileSinkTest extends TestLogger {
testHarness.snapshot(3L, 3L);
testHarness.notifyOfCompletedCheckpoint(1L); // the pending for checkpoint 1 are committed
- checkLocalFs(outDir, 3, 1);
+ TestUtils.checkLocalFs(outDir, 3, 1);
int fileCounter = 0;
- for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) {
+ for (Map.Entry<File, String> fileContents : TestUtils.getFileContentByPath(outDir).entrySet()) {
if (fileContents.getKey().getName().equals("part-0-0")) {
fileCounter++;
Assert.assertEquals("test1@1\ntest1@2\n", fileContents.getValue());
@@ -253,10 +248,10 @@ public class LocalStreamingFileSinkTest extends TestLogger {
Assert.assertEquals(4L, fileCounter);
testHarness.notifyOfCompletedCheckpoint(3L); // all the pending for checkpoint 2 and 3 are committed
- checkLocalFs(outDir, 0, 4);
+ TestUtils.checkLocalFs(outDir, 0, 4);
fileCounter = 0;
- for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) {
+ for (Map.Entry<File, String> fileContents : TestUtils.getFileContentByPath(outDir).entrySet()) {
if (fileContents.getKey().getName().equals("part-0-0")) {
fileCounter++;
Assert.assertEquals("test1@1\ntest1@2\n", fileContents.getValue());
@@ -280,7 +275,7 @@ public class LocalStreamingFileSinkTest extends TestLogger {
final File outDir = TEMP_FOLDER.newFolder();
// we set a large bucket size so that part files close on timers rather than on size.
- try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = createRescalingTestSink(
+ try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = TestUtils.createRescalingTestSink(
outDir, 1, 0, 100L, 124L)) {
testHarness.setup();
@@ -290,10 +285,10 @@ public class LocalStreamingFileSinkTest extends TestLogger {
testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 1), 1L));
testHarness.processElement(new StreamRecord<>(Tuple2.of("test2", 1), 1L));
- checkLocalFs(outDir, 2, 0);
+ TestUtils.checkLocalFs(outDir, 2, 0);
int bucketCounter = 0;
- for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) {
+ for (Map.Entry<File, String> fileContents : TestUtils.getFileContentByPath(outDir).entrySet()) {
if (fileContents.getKey().getParentFile().getName().equals("test1")) {
bucketCounter++;
} else if (fileContents.getKey().getParentFile().getName().equals("test2")) {
@@ -303,10 +298,10 @@ public class LocalStreamingFileSinkTest extends TestLogger {
Assert.assertEquals(2L, bucketCounter); // verifies that we have 2 buckets, "test1" and "test2"
testHarness.setProcessingTime(101L); // put them in pending
- checkLocalFs(outDir, 2, 0);
+ TestUtils.checkLocalFs(outDir, 2, 0);
testHarness.snapshot(0L, 0L); // put them in pending for 0
- checkLocalFs(outDir, 2, 0);
+ TestUtils.checkLocalFs(outDir, 2, 0);
// create another 2 buckets with 1 inprogress file each
testHarness.processElement(new StreamRecord<>(Tuple2.of("test3", 1), 1L));
@@ -315,13 +310,13 @@ public class LocalStreamingFileSinkTest extends TestLogger {
testHarness.setProcessingTime(202L); // put them in pending
testHarness.snapshot(1L, 0L); // put them in pending for 1
- checkLocalFs(outDir, 4, 0);
+ TestUtils.checkLocalFs(outDir, 4, 0);
testHarness.notifyOfCompletedCheckpoint(0L); // put the pending for 0 to the "committed" state
- checkLocalFs(outDir, 2, 2);
+ TestUtils.checkLocalFs(outDir, 2, 2);
bucketCounter = 0;
- for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) {
+ for (Map.Entry<File, String> fileContents : TestUtils.getFileContentByPath(outDir).entrySet()) {
if (fileContents.getKey().getParentFile().getName().equals("test1")) {
bucketCounter++;
Assert.assertEquals("part-0-0", fileContents.getKey().getName());
@@ -339,10 +334,10 @@ public class LocalStreamingFileSinkTest extends TestLogger {
Assert.assertEquals(4L, bucketCounter);
testHarness.notifyOfCompletedCheckpoint(1L); // put the pending for 1 to the "committed" state
- checkLocalFs(outDir, 0, 4);
+ TestUtils.checkLocalFs(outDir, 0, 4);
bucketCounter = 0;
- for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) {
+ for (Map.Entry<File, String> fileContents : TestUtils.getFileContentByPath(outDir).entrySet()) {
if (fileContents.getKey().getParentFile().getName().equals("test1")) {
bucketCounter++;
Assert.assertEquals("test1@1\n", fileContents.getValue());
@@ -367,9 +362,10 @@ public class LocalStreamingFileSinkTest extends TestLogger {
public void testClosingOnSnapshot() throws Exception {
final File outDir = TEMP_FOLDER.newFolder();
- try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness =
- createRescalingTestSink(outDir, 1, 0, 100L, 2L)) {
-
+ try (
+ OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness =
+ TestUtils.createRescalingTestSink(outDir, 1, 0, 100L, 2L)
+ ) {
testHarness.setup();
testHarness.open();
@@ -377,29 +373,29 @@ public class LocalStreamingFileSinkTest extends TestLogger {
testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 1), 1L));
testHarness.processElement(new StreamRecord<>(Tuple2.of("test2", 1), 1L));
- checkLocalFs(outDir, 2, 0);
+ TestUtils.checkLocalFs(outDir, 2, 0);
// this is to check the inactivity threshold
testHarness.setProcessingTime(101L);
- checkLocalFs(outDir, 2, 0);
+ TestUtils.checkLocalFs(outDir, 2, 0);
testHarness.processElement(new StreamRecord<>(Tuple2.of("test3", 1), 1L));
- checkLocalFs(outDir, 3, 0);
+ TestUtils.checkLocalFs(outDir, 3, 0);
testHarness.snapshot(0L, 1L);
- checkLocalFs(outDir, 3, 0);
+ TestUtils.checkLocalFs(outDir, 3, 0);
testHarness.notifyOfCompletedCheckpoint(0L);
- checkLocalFs(outDir, 0, 3);
+ TestUtils.checkLocalFs(outDir, 0, 3);
testHarness.snapshot(1L, 0L);
testHarness.processElement(new StreamRecord<>(Tuple2.of("test4", 10), 10L));
- checkLocalFs(outDir, 1, 3);
+ TestUtils.checkLocalFs(outDir, 1, 3);
}
// on close, the in-progress file is not moved to the finished state.
- checkLocalFs(outDir, 1, 3);
+ TestUtils.checkLocalFs(outDir, 1, 3);
}
@Test
@@ -408,11 +404,11 @@ public class LocalStreamingFileSinkTest extends TestLogger {
OperatorSubtaskState mergedSnapshot;
- // we set small file size so that the part file rolls.
+ // we set a small file size so that the part file rolls on every element.
try (
- OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness1 = createRescalingTestSink(
+ OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness1 = TestUtils.createRescalingTestSink(
outDir, 2, 0, 100L, 10L);
- OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness2 = createRescalingTestSink(
+ OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness2 = TestUtils.createRescalingTestSink(
outDir, 2, 1, 100L, 10L)
) {
testHarness1.setup();
@@ -422,16 +418,16 @@ public class LocalStreamingFileSinkTest extends TestLogger {
testHarness2.open();
testHarness1.processElement(new StreamRecord<>(Tuple2.of("test1", 0), 0L));
- checkLocalFs(outDir, 1, 0);
+ TestUtils.checkLocalFs(outDir, 1, 0);
testHarness2.processElement(new StreamRecord<>(Tuple2.of("test1", 1), 1L));
testHarness2.processElement(new StreamRecord<>(Tuple2.of("test2", 1), 1L));
// all the files are in-progress
- checkLocalFs(outDir, 3, 0);
+ TestUtils.checkLocalFs(outDir, 3, 0);
int counter = 0;
- for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) {
+ for (Map.Entry<File, String> fileContents : TestUtils.getFileContentByPath(outDir).entrySet()) {
final String parentFilename = fileContents.getKey().getParentFile().getName();
final String inProgressFilename = fileContents.getKey().getName();
@@ -456,7 +452,7 @@ public class LocalStreamingFileSinkTest extends TestLogger {
}
try (
- OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = createRescalingTestSink(
+ OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = TestUtils.createRescalingTestSink(
outDir, 1, 0, 100L, 10L)
) {
testHarness.setup();
@@ -464,13 +460,13 @@ public class LocalStreamingFileSinkTest extends TestLogger {
testHarness.open();
// everything is still in-progress, but the in-progress file of previous task 1 should now be put in pending
- checkLocalFs(outDir, 3, 0);
+ TestUtils.checkLocalFs(outDir, 3, 0);
testHarness.snapshot(2L, 2L);
testHarness.notifyOfCompletedCheckpoint(2L);
int counter = 0;
- for (Map.Entry<File, String> fileContents : getFileContentByPath(outDir).entrySet()) {
+ for (Map.Entry<File, String> fileContents : TestUtils.getFileContentByPath(outDir).entrySet()) {
final String parentFilename = fileContents.getKey().getParentFile().getName();
final String filename = fileContents.getKey().getName();
@@ -498,11 +494,18 @@ public class LocalStreamingFileSinkTest extends TestLogger {
final TestBucketFactory first = new TestBucketFactory();
final TestBucketFactory second = new TestBucketFactory();
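+
+ // roll when a part file exceeds 2 bytes (i.e. on every element), or when the
+ // 100ms rollover / inactivity intervals elapse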
+ final RollingPolicy<String> rollingPolicy = DefaultRollingPolicy
+ .create()
+ .withMaxPartSize(2L)
+ .withRolloverInterval(100L)
+ .withInactivityInterval(100L)
+ .build();
+
try (
- OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness1 = createCustomRescalingTestSink(
- outDir, 2, 0, 100L, 2L, first, new SimpleStringEncoder<>());
- OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness2 = createCustomRescalingTestSink(
- outDir, 2, 1, 100L, 2L, second, new SimpleStringEncoder<>())
+ OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness1 = TestUtils.createCustomRescalingTestSink(
+ outDir, 2, 0, 10L, new TestUtils.TupleToStringBucketer(), new SimpleStringEncoder<>(), rollingPolicy, first);
+ OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness2 = TestUtils.createCustomRescalingTestSink(
+ outDir, 2, 1, 10L, new TestUtils.TupleToStringBucketer(), new SimpleStringEncoder<>(), rollingPolicy, second)
) {
testHarness1.setup();
testHarness1.open();
@@ -514,7 +517,7 @@ public class LocalStreamingFileSinkTest extends TestLogger {
testHarness1.processElement(new StreamRecord<>(Tuple2.of("test1", 0), 0L));
testHarness1.processElement(new StreamRecord<>(Tuple2.of("test1", 0), 0L));
testHarness1.processElement(new StreamRecord<>(Tuple2.of("test1", 0), 0L));
- checkLocalFs(outDir, 3, 0);
+ TestUtils.checkLocalFs(outDir, 3, 0);
// intentionally we snapshot them in the reverse order so that the states are shuffled
mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState(
@@ -527,10 +530,10 @@ public class LocalStreamingFileSinkTest extends TestLogger {
final TestBucketFactory secondRecovered = new TestBucketFactory();
try (
- OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness1 = createCustomRescalingTestSink(
- outDir, 2, 0, 100L, 2L, firstRecovered, new SimpleStringEncoder<>());
- OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness2 = createCustomRescalingTestSink(
- outDir, 2, 1, 100L, 2L, secondRecovered, new SimpleStringEncoder<>())
+ OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness1 = TestUtils.createCustomRescalingTestSink(
+ outDir, 2, 0, 10L, new TestUtils.TupleToStringBucketer(), new SimpleStringEncoder<>(), rollingPolicy, firstRecovered);
+ OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness2 = TestUtils.createCustomRescalingTestSink(
+ outDir, 2, 1, 10L, new TestUtils.TupleToStringBucketer(), new SimpleStringEncoder<>(), rollingPolicy, secondRecovered)
) {
testHarness1.setup();
testHarness1.initializeState(mergedSnapshot);
@@ -540,7 +543,7 @@ public class LocalStreamingFileSinkTest extends TestLogger {
testHarness1.processElement(new StreamRecord<>(Tuple2.of("test4", 0), 0L));
Assert.assertEquals(3L, firstRecovered.getInitialCounter());
- checkLocalFs(outDir, 1, 3);
+ TestUtils.checkLocalFs(outDir, 1, 3);
testHarness2.setup();
testHarness2.initializeState(mergedSnapshot);
@@ -550,78 +553,26 @@ public class LocalStreamingFileSinkTest extends TestLogger {
testHarness2.processElement(new StreamRecord<>(Tuple2.of("test2", 0), 0L));
Assert.assertEquals(3L, secondRecovered.getInitialCounter());
- checkLocalFs(outDir, 2, 3);
+ TestUtils.checkLocalFs(outDir, 2, 3);
}
}
////////////////////// Helper Methods //////////////////////
- private OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> createRescalingTestSink(
- File outDir,
- int totalParallelism,
- int taskIdx,
- long inactivityInterval,
- long partMaxSize) throws Exception {
-
- return createCustomRescalingTestSink(
- outDir,
- totalParallelism,
- taskIdx,
- inactivityInterval,
- partMaxSize,
- new DefaultBucketFactory<>(),
- (Encoder<Tuple2<String, Integer>>) (element, stream) -> {
- stream.write((element.f0 + '@' + element.f1).getBytes(StandardCharsets.UTF_8));
- stream.write('\n');
- });
- }
-
- private OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> createCustomRescalingTestSink(
- File outDir,
- int totalParallelism,
- int taskIdx,
- long inactivityInterval,
- long partMaxSize,
- BucketFactory<Tuple2<String, Integer>> factory,
- Encoder<Tuple2<String, Integer>> writer) throws Exception {
-
- StreamingFileSink<Tuple2<String, Integer>> sink = new StreamingFileSink<>(new Path(outDir.toURI()), factory)
- .setBucketer(new Bucketer<Tuple2<String, Integer>>() {
-
- private static final long serialVersionUID = -3086487303018372007L;
-
- @Override
- public String getBucketId(Tuple2<String, Integer> element, Context context) {
- return element.f0;
- }
- })
- .setEncoder(writer)
- .setRollingPolicy(
- DefaultRollingPolicy
- .create()
- .withMaxPartSize(partMaxSize)
- .withRolloverInterval(inactivityInterval)
- .withInactivityInterval(inactivityInterval)
- .build())
- .setBucketCheckInterval(10L);
-
- return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), 10, totalParallelism, taskIdx);
- }
-
- static class TestBucketFactory extends DefaultBucketFactory<Tuple2<String, Integer>> {
+ static class TestBucketFactory extends DefaultBucketFactory<Tuple2<String, Integer>, String> {
private static final long serialVersionUID = 2794824980604027930L;
private long initialCounter = -1L;
@Override
- public Bucket<Tuple2<String, Integer>> getNewBucket(
- RecoverableWriter fsWriter,
- int subtaskIndex,
- String bucketId,
- Path bucketPath,
- long initialPartCounter,
- Encoder<Tuple2<String, Integer>> writer) throws IOException {
+ public Bucket<Tuple2<String, Integer>, String> getNewBucket(
+ final RecoverableWriter fsWriter,
+ final int subtaskIndex,
+ final String bucketId,
+ final Path bucketPath,
+ final long initialPartCounter,
+ final PartFileWriter.PartFileFactory<Tuple2<String, Integer>, String> partFileWriterFactory) {
this.initialCounter = initialPartCounter;
@@ -631,16 +582,16 @@ public class LocalStreamingFileSinkTest extends TestLogger {
bucketId,
bucketPath,
initialPartCounter,
- writer);
+ partFileWriterFactory);
}
@Override
- public Bucket<Tuple2<String, Integer>> restoreBucket(
- RecoverableWriter fsWriter,
- int subtaskIndex,
- long initialPartCounter,
- Encoder<Tuple2<String, Integer>> writer,
- BucketState bucketState) throws IOException {
+ public Bucket<Tuple2<String, Integer>, String> restoreBucket(
+ final RecoverableWriter fsWriter,
+ final int subtaskIndex,
+ final long initialPartCounter,
+ final PartFileWriter.PartFileFactory<Tuple2<String, Integer>, String> partFileWriterFactory,
+ final BucketState<String> bucketState) throws IOException {
this.initialCounter = initialPartCounter;
@@ -648,7 +599,7 @@ public class LocalStreamingFileSinkTest extends TestLogger {
fsWriter,
subtaskIndex,
initialPartCounter,
- writer,
+ partFileWriterFactory,
bucketState);
}
@@ -656,34 +607,4 @@ public class LocalStreamingFileSinkTest extends TestLogger {
return initialCounter;
}
}
-
- private static void checkLocalFs(File outDir, int expectedInProgress, int expectedCompleted) {
- int inProgress = 0;
- int finished = 0;
-
- for (File file: FileUtils.listFiles(outDir, null, true)) {
- if (file.getAbsolutePath().endsWith("crc")) {
- continue;
- }
-
- if (file.toPath().getFileName().toString().startsWith(".")) {
- inProgress++;
- } else {
- finished++;
- }
- }
-
- Assert.assertEquals(expectedInProgress, inProgress);
- Assert.assertEquals(expectedCompleted, finished);
- }
-
- private static Map<File, String> getFileContentByPath(File directory) throws IOException {
- Map<File, String> contents = new HashMap<>(4);
-
- final Collection<File> filesInBucket = FileUtils.listFiles(directory, null, true);
- for (File file : filesInBucket) {
- contents.put(file, FileUtils.readFileToString(file));
- }
- return contents;
- }
}
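The removed createRescalingTestSink / createCustomRescalingTestSink helpers now
live in TestUtils; outside the tests, wiring an Encoder into the sink would look
roughly like the following sketch, assuming the row-format builder that is the
counterpart of the bulk one above:

    StreamingFileSink<Tuple2<String, Integer>> sink = StreamingFileSink
            .forRowFormat(new Path("/base/dir"), new SimpleStringEncoder<Tuple2<String, Integer>>())
            .withBucketer(new DateTimeBucketer<>())
            .withRollingPolicy(DefaultRollingPolicy
                    .create()
                    .withMaxPartSize(1024L * 1024L)   // roll at 1 MiB ...
                    .withRolloverInterval(60_000L)    // ... or after one minute ...
                    .withInactivityInterval(60_000L)  // ... or one minute of inactivity
                    .build())
            .build();

Unlike bulk formats, which roll only on checkpoints, a row format can roll on
size and time, which is what the DefaultRollingPolicy above encodes.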