You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/07/20 17:05:19 UTC
[3/5] flink git commit: [FLINK-9903] [DataStream API] Refactor
StreamingFileSink / add bulk encoders
[FLINK-9903] [DataStream API] Refactor StreamingFileSink / add bulk encoders
* Add support for bulk encoders.
* Expose more options in the rolling policy.
* Allow returning any object as bucket id from the bucketer.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b56c75ca
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b56c75ca
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b56c75ca
Branch: refs/heads/master
Commit: b56c75ca375049b1d2c80d2d0945ae1ae04eb39e
Parents: d309e61
Author: kkloudas <kk...@gmail.com>
Authored: Tue Jul 17 11:52:02 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Jul 20 16:12:29 2018 +0200
----------------------------------------------------------------------
.../api/common/serialization/BulkWriter.java | 95 ++++
.../core/fs/SafetyNetWrapperFileSystem.java | 5 +
.../api/functions/sink/filesystem/Bucket.java | 48 +-
.../sink/filesystem/BucketFactory.java | 29 +-
.../functions/sink/filesystem/BucketState.java | 12 +-
.../sink/filesystem/BucketStateSerializer.java | 39 +-
.../api/functions/sink/filesystem/Buckets.java | 339 ++++++++++++++
.../sink/filesystem/BulkPartWriter.java | 110 +++++
.../sink/filesystem/DefaultBucketFactory.java | 35 +-
.../sink/filesystem/DefaultRollingPolicy.java | 142 ------
.../sink/filesystem/PartFileHandler.java | 122 -----
.../functions/sink/filesystem/PartFileInfo.java | 4 +-
.../sink/filesystem/PartFileWriter.java | 141 ++++++
.../sink/filesystem/RollingPolicy.java | 20 +-
.../sink/filesystem/RowWisePartWriter.java | 96 ++++
.../sink/filesystem/StreamingFileSink.java | 457 ++++++++-----------
.../filesystem/bucketers/BasePathBucketer.java | 11 +-
.../sink/filesystem/bucketers/Bucketer.java | 22 +-
.../filesystem/bucketers/DateTimeBucketer.java | 12 +-
.../SimpleVersionedStringSerializer.java | 77 ++++
.../rolling/policies/DefaultRollingPolicy.java | 157 +++++++
.../policies/OnCheckpointRollingPolicy.java | 45 ++
.../filesystem/BucketStateSerializerTest.java | 47 +-
.../functions/sink/filesystem/BucketsTest.java | 119 +++++
.../sink/filesystem/BulkWriterTest.java | 144 ++++++
.../filesystem/LocalStreamingFileSinkTest.java | 267 ++++-------
.../sink/filesystem/RollingPolicyTest.java | 225 +++++++++
.../functions/sink/filesystem/TestUtils.java | 164 +++++++
28 files changed, 2177 insertions(+), 807 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-core/src/main/java/org/apache/flink/api/common/serialization/BulkWriter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/serialization/BulkWriter.java b/flink-core/src/main/java/org/apache/flink/api/common/serialization/BulkWriter.java
new file mode 100644
index 0000000..44f5fbe
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/serialization/BulkWriter.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.fs.FSDataOutputStream;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * An encoder that encodes data in a bulk fashion, encoding many records together at a time.
+ *
+ * <p>Examples of bulk encodings are most compressed formats, including formats like
+ * Parquet and ORC, which encode batches of records into blocks of column vectors.
+ *
+ * <p>The bulk encoder may be stateful and is bound to a single stream during its
+ * lifetime.
+ *
+ * @param <T> The type of the elements encoded through this encoder.
+ */
+@PublicEvolving
+public interface BulkWriter<T> {
+
+ /**
+ * Adds an element to the encoder. The encoder may temporarily buffer the element,
+ * or immediately write it to the stream.
+ *
+ * <p>It may be that adding this element fills up an internal buffer and causes the
+ * encoding and flushing of a batch of internally buffered elements.
+ *
+ * @param element The element to add.
+ * @throws IOException Thrown, if the element cannot be added to the encoder,
+ * or if the output stream throws an exception.
+ */
+ void addElement(T element) throws IOException;
+
+ /**
+ * Flushes all intermediate buffered data to the output stream.
+ * Note that flushing frequently may reduce the efficiency of the encoding.
+ *
+ * @throws IOException Thrown if the encoder cannot be flushed, or if the output
+ * stream throws an exception.
+ */
+ void flush() throws IOException;
+
+ /**
+ * Finishes the writing. This must flush all internal buffers, finish encoding, and write
+ * footers.
+ *
+ * <p>The writer is not expected to handle any more records via {@link #addElement(Object)} after
+ * this method is called.
+ *
+ * <p><b>Important:</b> This method MUST NOT close the stream that the writer writes to.
+ * Closing the stream is expected to happen through the invoker of this method afterwards.
+ *
+ * @throws IOException Thrown if the finalization fails.
+ */
+ void finish() throws IOException;
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * A factory that creates a {@link BulkWriter}.
+ * @param <T> The type of record to write.
+ */
+ @FunctionalInterface
+ interface Factory<T> extends Serializable {
+
+ /**
+ * Creates a writer that writes to the given stream.
+ *
+ * @param out The output stream to write the encoded data to.
+ * @return A writer that writes to the given stream.
+ * @throws IOException Thrown if the writer cannot be opened, or if the output stream fails.
+ */
+ BulkWriter<T> create(FSDataOutputStream out) throws IOException;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
index 92b3a74c..04e6315 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
@@ -65,6 +65,11 @@ public class SafetyNetWrapperFileSystem extends FileSystem implements WrappingPr
}
@Override
+ public RecoverableWriter createRecoverableWriter() throws IOException {
+ return unsafeFileSystem.createRecoverableWriter();
+ }
+
+ @Override
public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
return unsafeFileSystem.getFileBlockLocations(file, start, len);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index d9a6d75..3e2d22c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -19,7 +19,6 @@
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.util.Preconditions;
@@ -30,6 +29,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
/**
* A bucket is the directory organization of the output of the {@link StreamingFileSink}.
@@ -39,17 +39,17 @@ import java.util.Map;
* queried to see in which bucket this element should be written to.
*/
@PublicEvolving
-public class Bucket<IN> {
+public class Bucket<IN, BucketID> {
private static final String PART_PREFIX = "part";
- private final String bucketId;
+ private final BucketID bucketId;
private final Path bucketPath;
private final int subtaskIndex;
- private final Encoder<IN> encoder;
+ private final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory;
private final RecoverableWriter fsWriter;
@@ -57,7 +57,7 @@ public class Bucket<IN> {
private long partCounter;
- private PartFileHandler<IN> currentPart;
+ private PartFileWriter<IN, BucketID> currentPart;
private List<RecoverableWriter.CommitRecoverable> pending;
@@ -68,10 +68,10 @@ public class Bucket<IN> {
RecoverableWriter fsWriter,
int subtaskIndex,
long initialPartCounter,
- Encoder<IN> writer,
- BucketState bucketstate) throws IOException {
+ PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
+ BucketState<BucketID> bucketState) throws IOException {
- this(fsWriter, subtaskIndex, bucketstate.getBucketId(), bucketstate.getBucketPath(), initialPartCounter, writer);
+ this(fsWriter, subtaskIndex, bucketState.getBucketId(), bucketState.getBucketPath(), initialPartCounter, partFileFactory);
// the constructor must have already initialized the filesystem writer
Preconditions.checkState(fsWriter != null);
@@ -79,15 +79,15 @@ public class Bucket<IN> {
// we try to resume the previous in-progress file, if the filesystem
// supports such operation. If not, we just commit the file and start fresh.
- final RecoverableWriter.ResumeRecoverable resumable = bucketstate.getCurrentInProgress();
+ final RecoverableWriter.ResumeRecoverable resumable = bucketState.getInProgress();
if (resumable != null) {
- currentPart = PartFileHandler.resumeFrom(
- bucketId, fsWriter, resumable, bucketstate.getCreationTime());
+ currentPart = partFileFactory.resumeFrom(
+ bucketId, fsWriter, resumable, bucketState.getCreationTime());
}
// we commit pending files for previous checkpoints to the last successful one
// (from which we are recovering from)
- for (List<RecoverableWriter.CommitRecoverable> commitables: bucketstate.getPendingPerCheckpoint().values()) {
+ for (List<RecoverableWriter.CommitRecoverable> commitables: bucketState.getPendingPerCheckpoint().values()) {
for (RecoverableWriter.CommitRecoverable commitable: commitables) {
fsWriter.recoverForCommit(commitable).commitAfterRecovery();
}
@@ -100,26 +100,26 @@ public class Bucket<IN> {
public Bucket(
RecoverableWriter fsWriter,
int subtaskIndex,
- String bucketId,
+ BucketID bucketId,
Path bucketPath,
long initialPartCounter,
- Encoder<IN> writer) {
+ PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory) {
this.fsWriter = Preconditions.checkNotNull(fsWriter);
this.subtaskIndex = subtaskIndex;
this.bucketId = Preconditions.checkNotNull(bucketId);
this.bucketPath = Preconditions.checkNotNull(bucketPath);
this.partCounter = initialPartCounter;
- this.encoder = Preconditions.checkNotNull(writer);
+ this.partFileFactory = Preconditions.checkNotNull(partFileFactory);
this.pending = new ArrayList<>();
}
- public PartFileInfo getInProgressPartInfo() {
+ public PartFileInfo<BucketID> getInProgressPartInfo() {
return currentPart;
}
- public String getBucketId() {
+ public BucketID getBucketId() {
return bucketId;
}
@@ -137,18 +137,18 @@ public class Bucket<IN> {
void write(IN element, long currentTime) throws IOException {
Preconditions.checkState(currentPart != null, "bucket has been closed");
- currentPart.write(element, encoder, currentTime);
+ currentPart.write(element, currentTime);
}
void rollPartFile(final long currentTime) throws IOException {
closePartFile();
- currentPart = PartFileHandler.openNew(bucketId, fsWriter, getNewPartPath(), currentTime);
+ currentPart = partFileFactory.openNew(bucketId, fsWriter, getNewPartPath(), currentTime);
partCounter++;
}
- void merge(final Bucket<IN> bucket) throws IOException {
+ void merge(final Bucket<IN, BucketID> bucket) throws IOException {
Preconditions.checkNotNull(bucket);
- Preconditions.checkState(bucket.getBucketPath().equals(getBucketPath()));
+ Preconditions.checkState(Objects.equals(bucket.getBucketPath(), bucketPath));
// there should be no pending files in the "to-merge" states.
Preconditions.checkState(bucket.pending.isEmpty());
@@ -176,7 +176,7 @@ public class Bucket<IN> {
}
}
- public void commitUpToCheckpoint(long checkpointId) throws IOException {
+ public void onCheckpointAcknowledgment(long checkpointId) throws IOException {
Preconditions.checkNotNull(fsWriter);
Iterator<Map.Entry<Long, List<RecoverableWriter.CommitRecoverable>>> it =
@@ -193,7 +193,7 @@ public class Bucket<IN> {
}
}
- public BucketState snapshot(long checkpointId) throws IOException {
+ public BucketState<BucketID> onCheckpoint(long checkpointId) throws IOException {
RecoverableWriter.ResumeRecoverable resumable = null;
long creationTime = Long.MAX_VALUE;
@@ -206,7 +206,7 @@ public class Bucket<IN> {
pendingPerCheckpoint.put(checkpointId, pending);
pending = new ArrayList<>();
}
- return new BucketState(bucketId, bucketPath, creationTime, resumable, pendingPerCheckpoint);
+ return new BucketState<>(bucketId, bucketPath, creationTime, resumable, pendingPerCheckpoint);
}
private Path getNewPartPath() {
http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
index 88f3c1a..0c6b587 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
@@ -19,7 +19,6 @@
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableWriter;
@@ -30,20 +29,20 @@ import java.io.Serializable;
* A factory able to create {@link Bucket buckets} for the {@link StreamingFileSink}.
*/
@Internal
-public interface BucketFactory<IN> extends Serializable {
+interface BucketFactory<IN, BucketID> extends Serializable {
- Bucket<IN> getNewBucket(
- RecoverableWriter fsWriter,
- int subtaskIndex,
- String bucketId,
- Path bucketPath,
- long initialPartCounter,
- Encoder<IN> writer) throws IOException;
+ Bucket<IN, BucketID> getNewBucket(
+ final RecoverableWriter fsWriter,
+ final int subtaskIndex,
+ final BucketID bucketId,
+ final Path bucketPath,
+ final long initialPartCounter,
+ final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory) throws IOException;
- Bucket<IN> restoreBucket(
- RecoverableWriter fsWriter,
- int subtaskIndex,
- long initialPartCounter,
- Encoder<IN> writer,
- BucketState bucketstate) throws IOException;
+ Bucket<IN, BucketID> restoreBucket(
+ final RecoverableWriter fsWriter,
+ final int subtaskIndex,
+ final long initialPartCounter,
+ final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
+ final BucketState<BucketID> bucketState) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
index 5ebc46c..bb49e3a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
@@ -32,9 +32,9 @@ import java.util.Map;
* The state of the {@link Bucket} that is to be checkpointed.
*/
@Internal
-public class BucketState {
+public class BucketState<BucketID> {
- private final String bucketId;
+ private final BucketID bucketId;
/**
* The base path for the bucket, i.e. the directory where all the part files are stored.
@@ -59,10 +59,10 @@ public class BucketState {
private final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingPerCheckpoint;
public BucketState(
- final String bucketId,
+ final BucketID bucketId,
final Path bucketPath,
final long creationTime,
- final @Nullable RecoverableWriter.ResumeRecoverable inProgress,
+ @Nullable final RecoverableWriter.ResumeRecoverable inProgress,
final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingPerCheckpoint
) {
this.bucketId = Preconditions.checkNotNull(bucketId);
@@ -72,7 +72,7 @@ public class BucketState {
this.pendingPerCheckpoint = Preconditions.checkNotNull(pendingPerCheckpoint);
}
- public String getBucketId() {
+ public BucketID getBucketId() {
return bucketId;
}
@@ -85,7 +85,7 @@ public class BucketState {
}
@Nullable
- public RecoverableWriter.ResumeRecoverable getCurrentInProgress() {
+ public RecoverableWriter.ResumeRecoverable getInProgress() {
return inProgress;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java
index a167ec9..cf9b805 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableWriter;
-import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
@@ -42,7 +41,7 @@ import java.util.Map.Entry;
* A {@code SimpleVersionedSerializer} used to serialize the {@link BucketState BucketState}.
*/
@Internal
-class BucketStateSerializer implements SimpleVersionedSerializer<BucketState> {
+class BucketStateSerializer<BucketID> implements SimpleVersionedSerializer<BucketState<BucketID>> {
private static final int MAGIC_NUMBER = 0x1e764b79;
@@ -50,12 +49,16 @@ class BucketStateSerializer implements SimpleVersionedSerializer<BucketState> {
private final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitableSerializer;
- public BucketStateSerializer(
- final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumableSerializer,
- final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitableSerializer) {
+ private final SimpleVersionedSerializer<BucketID> bucketIdSerializer;
+ BucketStateSerializer(
+ final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumableSerializer,
+ final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitableSerializer,
+ final SimpleVersionedSerializer<BucketID> bucketIdSerializer
+ ) {
this.resumableSerializer = Preconditions.checkNotNull(resumableSerializer);
this.commitableSerializer = Preconditions.checkNotNull(commitableSerializer);
+ this.bucketIdSerializer = Preconditions.checkNotNull(bucketIdSerializer);
}
@Override
@@ -64,7 +67,7 @@ class BucketStateSerializer implements SimpleVersionedSerializer<BucketState> {
}
@Override
- public byte[] serialize(BucketState state) throws IOException {
+ public byte[] serialize(BucketState<BucketID> state) throws IOException {
DataOutputSerializer out = new DataOutputSerializer(256);
out.writeInt(MAGIC_NUMBER);
serializeV1(state, out);
@@ -72,7 +75,7 @@ class BucketStateSerializer implements SimpleVersionedSerializer<BucketState> {
}
@Override
- public BucketState deserialize(int version, byte[] serialized) throws IOException {
+ public BucketState<BucketID> deserialize(int version, byte[] serialized) throws IOException {
switch (version) {
case 1:
DataInputDeserializer in = new DataInputDeserializer(serialized);
@@ -84,13 +87,13 @@ class BucketStateSerializer implements SimpleVersionedSerializer<BucketState> {
}
@VisibleForTesting
- void serializeV1(BucketState state, DataOutputView out) throws IOException {
- out.writeUTF(state.getBucketId());
+ void serializeV1(BucketState<BucketID> state, DataOutputView out) throws IOException {
+ SimpleVersionedSerialization.writeVersionAndSerialize(bucketIdSerializer, state.getBucketId(), out);
out.writeUTF(state.getBucketPath().toString());
out.writeLong(state.getCreationTime());
// put the current open part file
- final RecoverableWriter.ResumeRecoverable currentPart = state.getCurrentInProgress();
+ final RecoverableWriter.ResumeRecoverable currentPart = state.getInProgress();
if (currentPart != null) {
out.writeBoolean(true);
SimpleVersionedSerialization.writeVersionAndSerialize(resumableSerializer, currentPart, out);
@@ -100,19 +103,19 @@ class BucketStateSerializer implements SimpleVersionedSerializer<BucketState> {
}
// put the map of pending files per checkpoint
- final Map<Long, List<CommitRecoverable>> pendingCommitters = state.getPendingPerCheckpoint();
+ final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingCommitters = state.getPendingPerCheckpoint();
// manually keep the version here to safe some bytes
out.writeInt(commitableSerializer.getVersion());
out.writeInt(pendingCommitters.size());
- for (Entry<Long, List<CommitRecoverable>> resumablesForCheckpoint : pendingCommitters.entrySet()) {
- List<CommitRecoverable> resumables = resumablesForCheckpoint.getValue();
+ for (Entry<Long, List<RecoverableWriter.CommitRecoverable>> resumablesForCheckpoint : pendingCommitters.entrySet()) {
+ List<RecoverableWriter.CommitRecoverable> resumables = resumablesForCheckpoint.getValue();
out.writeLong(resumablesForCheckpoint.getKey());
out.writeInt(resumables.size());
- for (CommitRecoverable resumable : resumables) {
+ for (RecoverableWriter.CommitRecoverable resumable : resumables) {
byte[] serialized = commitableSerializer.serialize(resumable);
out.writeInt(serialized.length);
out.write(serialized);
@@ -121,8 +124,8 @@ class BucketStateSerializer implements SimpleVersionedSerializer<BucketState> {
}
@VisibleForTesting
- BucketState deserializeV1(DataInputView in) throws IOException {
- final String bucketId = in.readUTF();
+ BucketState<BucketID> deserializeV1(DataInputView in) throws IOException {
+ final BucketID bucketId = SimpleVersionedSerialization.readVersionAndDeSerialize(bucketIdSerializer, in);
final String bucketPathStr = in.readUTF();
final long creationTime = in.readLong();
@@ -140,7 +143,7 @@ class BucketStateSerializer implements SimpleVersionedSerializer<BucketState> {
final long checkpointId = in.readLong();
final int noOfResumables = in.readInt();
- final ArrayList<RecoverableWriter.CommitRecoverable> resumables = new ArrayList<>(noOfResumables);
+ final List<RecoverableWriter.CommitRecoverable> resumables = new ArrayList<>(noOfResumables);
for (int j = 0; j < noOfResumables; j++) {
final byte[] bytes = new byte[in.readInt()];
in.readFully(bytes);
@@ -149,7 +152,7 @@ class BucketStateSerializer implements SimpleVersionedSerializer<BucketState> {
resumablesPerCheckpoint.put(checkpointId, resumables);
}
- return new BucketState(
+ return new BucketState<>(
bucketId,
new Path(bucketPathStr),
creationTime,
http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
new file mode 100644
index 0000000..6afba17
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * The manager of the different active buckets in the {@link StreamingFileSink}.
+ *
+ * <p>This class is responsible for all bucket-related operations and the actual
+ * {@link StreamingFileSink} is just plugging in the functionality offered by
+ * this class to the lifecycle of the operator.
+ *
+ * @param <IN> The type of input elements.
+ * @param <BucketID> The type of ids for the buckets, as returned by the {@link Bucketer}.
+ */
+public class Buckets<IN, BucketID> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Buckets.class);
+
+ // ------------------------ configuration fields --------------------------
+
+ private final Path basePath;
+
+ private final BucketFactory<IN, BucketID> bucketFactory;
+
+ private final Bucketer<IN, BucketID> bucketer;
+
+ private final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory;
+
+ private final RollingPolicy<BucketID> rollingPolicy;
+
+ // --------------------------- runtime fields -----------------------------
+
+ private final int subtaskIndex;
+
+ private final Buckets.BucketerContext bucketerContext;
+
+ private final Map<BucketID, Bucket<IN, BucketID>> activeBuckets;
+
+ private long initMaxPartCounter;
+
+ private long maxPartCounterUsed;
+
+ private final RecoverableWriter fileSystemWriter;
+
+ // --------------------------- State Related Fields -----------------------------
+
+ private final BucketStateSerializer<BucketID> bucketStateSerializer;
+
+ /**
+ * A package-private constructor creating a new, empty bucket manager.
+ *
+ * @param basePath The base path for our buckets.
+ * @param bucketer The {@link Bucketer} provided by the user.
+ * @param bucketFactory The {@link BucketFactory} to be used to create buckets.
+ * @param partFileWriterFactory The {@link PartFileWriter.PartFileFactory} to be used when writing data.
+ * @param rollingPolicy The {@link RollingPolicy} as specified by the user.
+ */
+ Buckets(
+ final Path basePath,
+ final Bucketer<IN, BucketID> bucketer,
+ final BucketFactory<IN, BucketID> bucketFactory,
+ final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
+ final RollingPolicy<BucketID> rollingPolicy,
+ final int subtaskIndex) throws IOException {
+
+ this.basePath = Preconditions.checkNotNull(basePath);
+ this.bucketer = Preconditions.checkNotNull(bucketer);
+ this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
+ this.partFileWriterFactory = Preconditions.checkNotNull(partFileWriterFactory);
+ this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
+ this.subtaskIndex = subtaskIndex;
+
+ this.activeBuckets = new HashMap<>();
+ this.bucketerContext = new Buckets.BucketerContext();
+
+ this.fileSystemWriter = FileSystem.get(basePath.toUri()).createRecoverableWriter();
+ this.bucketStateSerializer = new BucketStateSerializer<>(
+ fileSystemWriter.getResumeRecoverableSerializer(),
+ fileSystemWriter.getCommitRecoverableSerializer(),
+ bucketer.getSerializer()
+ );
+
+ this.initMaxPartCounter = 0L;
+ this.maxPartCounterUsed = 0L;
+ }
+
+ /**
+ * Initializes the state after recovery from a failure.
+ * @param bucketStates the state holding recovered state about active buckets.
+ * @param partCounterState the state holding the max previously used part counters.
+ * @throws Exception if the restored state cannot be read or deserialized, or if a bucket cannot be restored.
+ */
+ void initializeState(final ListState<byte[]> bucketStates, final ListState<Long> partCounterState) throws Exception {
+
+ // When resuming after a failure:
+ // 1) we get the max part counter used before in order to make sure that we do not overwrite valid data
+ // 2) we commit any pending files for previous checkpoints (previous to the last successful one)
+ // 3) we resume writing to the previous in-progress file of each bucket, and
+ // 4) if we receive multiple states for the same bucket, we merge them.
+
+ // get the max counter
+ long maxCounter = 0L;
+ for (long partCounter: partCounterState.get()) {
+ maxCounter = Math.max(partCounter, maxCounter);
+ }
+ initMaxPartCounter = maxCounter;
+
+ // get the restored buckets
+ for (byte[] recoveredState : bucketStates.get()) {
+ final BucketState<BucketID> bucketState = SimpleVersionedSerialization.readVersionAndDeSerialize(
+ bucketStateSerializer, recoveredState);
+
+ final BucketID bucketId = bucketState.getBucketId();
+
+ LOG.info("Recovered bucket for {}", bucketId);
+
+ final Bucket<IN, BucketID> restoredBucket = bucketFactory.restoreBucket(
+ fileSystemWriter,
+ subtaskIndex,
+ initMaxPartCounter,
+ partFileWriterFactory,
+ bucketState
+ );
+
+ final Bucket<IN, BucketID> existingBucket = activeBuckets.get(bucketId);
+ if (existingBucket == null) {
+ activeBuckets.put(bucketId, restoredBucket);
+ } else {
+ existingBucket.merge(restoredBucket);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} idx {} restored state for bucket {}", getClass().getSimpleName(),
+ subtaskIndex, assembleBucketPath(bucketId));
+ }
+ }
+ }
+
+ void publishUpToCheckpoint(long checkpointId) throws IOException {
+ final Iterator<Map.Entry<BucketID, Bucket<IN, BucketID>>> activeBucketIt =
+ activeBuckets.entrySet().iterator();
+
+ while (activeBucketIt.hasNext()) {
+ Bucket<IN, BucketID> bucket = activeBucketIt.next().getValue();
+ bucket.onCheckpointAcknowledgment(checkpointId);
+
+ if (!bucket.isActive()) {
+ // We've dealt with all the pending files and the writer for this bucket is not currently open.
+ // Therefore this bucket is currently inactive and we can remove it from our state.
+ activeBucketIt.remove();
+ }
+ }
+ }
+
+ void snapshotState(
+ final long checkpointId,
+ final long checkpointTimestamp,
+ final ListState<byte[]> bucketStates,
+ final ListState<Long> partCounterState) throws Exception {
+
+ Preconditions.checkState(
+ fileSystemWriter != null && bucketStateSerializer != null,
+ "sink has not been initialized"
+ );
+
+ for (Bucket<IN, BucketID> bucket : activeBuckets.values()) {
+ final PartFileInfo<BucketID> info = bucket.getInProgressPartInfo();
+
+ if (info != null &&
+ (rollingPolicy.shouldRollOnCheckpoint(info) ||
+ rollingPolicy.shouldRollOnEvent(info) ||
+ rollingPolicy.shouldRollOnProcessingTime(info, checkpointTimestamp))
+ ) {
+ // we also check here so that we do not have to always
+ // wait for the "next" element to arrive.
+ bucket.closePartFile();
+ }
+
+ final BucketState<BucketID> bucketState = bucket.onCheckpoint(checkpointId);
+ bucketStates.add(SimpleVersionedSerialization.writeVersionAndSerialize(bucketStateSerializer, bucketState));
+ }
+
+ partCounterState.add(maxPartCounterUsed);
+ }
+
+ /**
+ * Called on every incoming element to write it to its final location.
+ * @param value the element itself.
+ * @param context the {@link SinkFunction.Context context} available to the sink function.
+ * @throws Exception
+ */
+ void onElement(IN value, SinkFunction.Context context) throws Exception {
+ final long currentProcessingTime = context.currentProcessingTime();
+
+ // setting the values in the bucketer context
+ bucketerContext.update(
+ context.timestamp(),
+ context.currentWatermark(),
+ currentProcessingTime);
+
+ final BucketID bucketId = bucketer.getBucketId(value, bucketerContext);
+
+ Bucket<IN, BucketID> bucket = activeBuckets.get(bucketId);
+ if (bucket == null) {
+ final Path bucketPath = assembleBucketPath(bucketId);
+ bucket = bucketFactory.getNewBucket(
+ fileSystemWriter,
+ subtaskIndex,
+ bucketId,
+ bucketPath,
+ initMaxPartCounter,
+ partFileWriterFactory);
+ activeBuckets.put(bucketId, bucket);
+ }
+
+ final PartFileInfo<BucketID> info = bucket.getInProgressPartInfo();
+ if (info == null || rollingPolicy.shouldRollOnEvent(info)) {
+ bucket.rollPartFile(currentProcessingTime);
+ }
+ bucket.write(value, currentProcessingTime);
+
+ // we update the counter here because buckets that become inactive
+ // get removed in publishUpToCheckpoint(), so at the time we snapshot they
+ // may no longer be there to be taken into account during checkpointing.
+ updateMaxPartCounter(bucket.getPartCounter());
+ }
+
+ void onProcessingTime(long timestamp) throws Exception {
+ for (Bucket<IN, BucketID> bucket : activeBuckets.values()) {
+ final PartFileInfo<BucketID> info = bucket.getInProgressPartInfo();
+ if (info != null && rollingPolicy.shouldRollOnProcessingTime(info, timestamp)) {
+ bucket.closePartFile();
+ }
+ }
+ }
+
+ void close() {
+ if (activeBuckets != null) {
+ activeBuckets.values().forEach(Bucket::dispose);
+ }
+ }
+
+ /**
+ * Assembles the final bucket {@link Path} that will be used for the provided bucket in the
+ * underlying filesystem.
+ * @param bucketId the id of the bucket as returned by the {@link Bucketer}.
+ * @return The resulting path.
+ */
+ private Path assembleBucketPath(BucketID bucketId) {
+ return new Path(basePath, bucketId.toString());
+ }
+
+ /**
+ * Updates the state keeping track of the maximum used part
+ * counter across all local active buckets.
+ * @param candidate the part counter that will potentially replace the current {@link #maxPartCounterUsed}.
+ */
+ private void updateMaxPartCounter(long candidate) {
+ maxPartCounterUsed = Math.max(maxPartCounterUsed, candidate);
+ }
+
+ /**
+ * The {@link Bucketer.Context} exposed to the
+ * {@link Bucketer#getBucketId(Object, Bucketer.Context)}
+ * whenever a new incoming element arrives.
+ */
+ private static final class BucketerContext implements Bucketer.Context {
+
+ @Nullable
+ private Long elementTimestamp;
+
+ private long currentWatermark;
+
+ private long currentProcessingTime;
+
+ private BucketerContext() {
+ this.elementTimestamp = null;
+ this.currentWatermark = Long.MIN_VALUE;
+ this.currentProcessingTime = Long.MIN_VALUE;
+ }
+
+ void update(@Nullable Long element, long watermark, long processingTime) {
+ this.elementTimestamp = element;
+ this.currentWatermark = watermark;
+ this.currentProcessingTime = processingTime;
+ }
+
+ @Override
+ public long currentProcessingTime() {
+ return currentProcessingTime;
+ }
+
+ @Override
+ public long currentWatermark() {
+ return currentWatermark;
+ }
+
+ @Override
+ @Nullable
+ public Long timestamp() {
+ return elementTimestamp;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
new file mode 100644
index 0000000..558b1bf
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * A {@link PartFileWriter} for bulk-encoding formats that use a {@link BulkWriter}.
+ * This also implements the {@link PartFileInfo}.
+ */
+@Internal
+final class BulkPartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID> {
+
+ private final BulkWriter<IN> writer;
+
+ private BulkPartWriter(
+ final BucketID bucketId,
+ final RecoverableFsDataOutputStream currentPartStream,
+ final BulkWriter<IN> writer,
+ final long creationTime) {
+ super(bucketId, currentPartStream, creationTime);
+ this.writer = Preconditions.checkNotNull(writer);
+ }
+
+ @Override
+ void write(IN element, long currentTime) throws IOException {
+ writer.addElement(element);
+ markWrite(currentTime);
+ }
+
+ @Override
+ RecoverableWriter.ResumeRecoverable persist() {
+ throw new UnsupportedOperationException("Bulk Part Writers do not support \"pause and resume\" operations.");
+ }
+
+ @Override
+ RecoverableWriter.CommitRecoverable closeForCommit() throws IOException {
+ writer.flush();
+ writer.finish();
+ return super.closeForCommit();
+ }
+
+ /**
+ * A factory that creates {@link BulkPartWriter BulkPartWriters}.
+ * @param <IN> The type of input elements.
+ * @param <BucketID> The type of ids for the buckets, as returned by the {@link Bucketer}.
+ */
+ static class Factory<IN, BucketID> implements PartFileWriter.PartFileFactory<IN, BucketID> {
+
+ private final BulkWriter.Factory<IN> writerFactory;
+
+ Factory(BulkWriter.Factory<IN> writerFactory) {
+ this.writerFactory = writerFactory;
+ }
+
+ @Override
+ public PartFileWriter<IN, BucketID> resumeFrom(
+ final BucketID bucketId,
+ final RecoverableWriter fileSystemWriter,
+ final RecoverableWriter.ResumeRecoverable resumable,
+ final long creationTime) throws IOException {
+
+ Preconditions.checkNotNull(fileSystemWriter);
+ Preconditions.checkNotNull(resumable);
+
+ final RecoverableFsDataOutputStream stream = fileSystemWriter.recover(resumable);
+ final BulkWriter<IN> writer = writerFactory.create(stream);
+ return new BulkPartWriter<>(bucketId, stream, writer, creationTime);
+ }
+
+ @Override
+ public PartFileWriter<IN, BucketID> openNew(
+ final BucketID bucketId,
+ final RecoverableWriter fileSystemWriter,
+ final Path path,
+ final long creationTime) throws IOException {
+
+ Preconditions.checkNotNull(fileSystemWriter);
+ Preconditions.checkNotNull(path);
+
+ final RecoverableFsDataOutputStream stream = fileSystemWriter.open(path);
+ final BulkWriter<IN> writer = writerFactory.create(stream);
+ return new BulkPartWriter<>(bucketId, stream, writer, creationTime);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java
index 795ba74..532138f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java
@@ -19,7 +19,6 @@
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableWriter;
@@ -29,18 +28,18 @@ import java.io.IOException;
* A factory returning {@link Bucket buckets}.
*/
@Internal
-public class DefaultBucketFactory<IN> implements BucketFactory<IN> {
+class DefaultBucketFactory<IN, BucketID> implements BucketFactory<IN, BucketID> {
- private static final long serialVersionUID = 3372881359208513357L;
+ private static final long serialVersionUID = 1L;
@Override
- public Bucket<IN> getNewBucket(
- RecoverableWriter fsWriter,
- int subtaskIndex,
- String bucketId,
- Path bucketPath,
- long initialPartCounter,
- Encoder<IN> writer) throws IOException {
+ public Bucket<IN, BucketID> getNewBucket(
+ final RecoverableWriter fsWriter,
+ final int subtaskIndex,
+ final BucketID bucketId,
+ final Path bucketPath,
+ final long initialPartCounter,
+ final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory) {
return new Bucket<>(
fsWriter,
@@ -48,22 +47,22 @@ public class DefaultBucketFactory<IN> implements BucketFactory<IN> {
bucketId,
bucketPath,
initialPartCounter,
- writer);
+ partFileWriterFactory);
}
@Override
- public Bucket<IN> restoreBucket(
- RecoverableWriter fsWriter,
- int subtaskIndex,
- long initialPartCounter,
- Encoder<IN> writer,
- BucketState bucketState) throws IOException {
+ public Bucket<IN, BucketID> restoreBucket(
+ final RecoverableWriter fsWriter,
+ final int subtaskIndex,
+ final long initialPartCounter,
+ final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
+ final BucketState<BucketID> bucketState) throws IOException {
return new Bucket<>(
fsWriter,
subtaskIndex,
initialPartCounter,
- writer,
+ partFileWriterFactory,
bucketState);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultRollingPolicy.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultRollingPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultRollingPolicy.java
deleted file mode 100644
index 026ac70..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultRollingPolicy.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.sink.filesystem;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-
-/**
- * The default implementation of the {@link RollingPolicy}.
- *
- * <p>This policy rolls a part file if:
- * <ol>
- * <li>there is no open part file,</li>
- * <li>the current file has reached the maximum bucket size (by default 128MB),</li>
- * <li>the current file is older than the roll over interval (by default 60 sec), or</li>
- * <li>the current file has not been written to for more than the allowed inactivityTime (by default 60 sec).</li>
- * </ol>
- */
-@PublicEvolving
-public final class DefaultRollingPolicy implements RollingPolicy {
-
- private static final long serialVersionUID = 1318929857047767030L;
-
- private static final long DEFAULT_INACTIVITY_INTERVAL = 60L * 1000L;
-
- private static final long DEFAULT_ROLLOVER_INTERVAL = 60L * 1000L;
-
- private static final long DEFAULT_MAX_PART_SIZE = 1024L * 1024L * 128L;
-
- private final long partSize;
-
- private final long rolloverInterval;
-
- private final long inactivityInterval;
-
- /**
- * Private constructor to avoid direct instantiation.
- */
- private DefaultRollingPolicy(long partSize, long rolloverInterval, long inactivityInterval) {
- Preconditions.checkArgument(partSize > 0L);
- Preconditions.checkArgument(rolloverInterval > 0L);
- Preconditions.checkArgument(inactivityInterval > 0L);
-
- this.partSize = partSize;
- this.rolloverInterval = rolloverInterval;
- this.inactivityInterval = inactivityInterval;
- }
-
- @Override
- public boolean shouldRoll(final PartFileInfo state, final long currentTime) throws IOException {
- if (state == null) {
- // this means that there is no currently open part file.
- return true;
- }
-
- if (state.getSize() > partSize) {
- return true;
- }
-
- if (currentTime - state.getCreationTime() > rolloverInterval) {
- return true;
- }
-
- return currentTime - state.getLastUpdateTime() > inactivityInterval;
- }
-
- /**
- * Initiates the instantiation of a {@link DefaultRollingPolicy}.
- * To finalize it and have the actual policy, call {@code .create()}.
- */
- public static PolicyBuilder create() {
- return new PolicyBuilder();
- }
-
- /**
- * A helper class that holds the configuration properties for the {@link DefaultRollingPolicy}.
- */
- @PublicEvolving
- public static class PolicyBuilder {
-
- private long partSize = DEFAULT_MAX_PART_SIZE;
-
- private long rolloverInterval = DEFAULT_ROLLOVER_INTERVAL;
-
- private long inactivityInterval = DEFAULT_INACTIVITY_INTERVAL;
-
- /**
- * Sets the part size above which a part file will have to roll.
- * @param size the allowed part size.
- */
- public PolicyBuilder withMaxPartSize(long size) {
- Preconditions.checkState(size > 0L);
- this.partSize = size;
- return this;
- }
-
- /**
- * Sets the interval of allowed inactivity after which a part file will have to roll.
- * @param interval the allowed inactivity interval.
- */
- public PolicyBuilder withInactivityInterval(long interval) {
- Preconditions.checkState(interval > 0L);
- this.inactivityInterval = interval;
- return this;
- }
-
- /**
- * Sets the max time a part file can stay open before having to roll.
- * @param interval the desired rollover interval.
- */
- public PolicyBuilder withRolloverInterval(long interval) {
- Preconditions.checkState(interval > 0L);
- this.rolloverInterval = interval;
- return this;
- }
-
- /**
- * Creates the actual policy.
- */
- public DefaultRollingPolicy build() {
- return new DefaultRollingPolicy(partSize, rolloverInterval, inactivityInterval);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileHandler.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileHandler.java
deleted file mode 100644
index 10fd12b..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileHandler.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.sink.filesystem;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.serialization.Encoder;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
-import org.apache.flink.core.fs.RecoverableWriter;
-import org.apache.flink.util.IOUtils;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-
-/**
- * A handler for the currently open part file in a specific {@link Bucket}.
- * This also implements the {@link PartFileInfo}.
- */
-@Internal
-class PartFileHandler<IN> implements PartFileInfo {
-
- private final String bucketId;
-
- private final long creationTime;
-
- private final RecoverableFsDataOutputStream currentPartStream;
-
- private long lastUpdateTime;
-
- private PartFileHandler(
- final String bucketId,
- final RecoverableFsDataOutputStream currentPartStream,
- final long creationTime) {
-
- Preconditions.checkArgument(creationTime >= 0L);
- this.bucketId = Preconditions.checkNotNull(bucketId);
- this.currentPartStream = Preconditions.checkNotNull(currentPartStream);
- this.creationTime = creationTime;
- this.lastUpdateTime = creationTime;
- }
-
- public static <IN> PartFileHandler<IN> resumeFrom(
- final String bucketId,
- final RecoverableWriter fileSystemWriter,
- final RecoverableWriter.ResumeRecoverable resumable,
- final long creationTime) throws IOException {
- Preconditions.checkNotNull(bucketId);
- Preconditions.checkNotNull(fileSystemWriter);
- Preconditions.checkNotNull(resumable);
-
- final RecoverableFsDataOutputStream stream = fileSystemWriter.recover(resumable);
- return new PartFileHandler<>(bucketId, stream, creationTime);
- }
-
- public static <IN> PartFileHandler<IN> openNew(
- final String bucketId,
- final RecoverableWriter fileSystemWriter,
- final Path path,
- final long creationTime) throws IOException {
- Preconditions.checkNotNull(bucketId);
- Preconditions.checkNotNull(fileSystemWriter);
- Preconditions.checkNotNull(path);
-
- final RecoverableFsDataOutputStream stream = fileSystemWriter.open(path);
- return new PartFileHandler<>(bucketId, stream, creationTime);
- }
-
- void write(IN element, Encoder<IN> encoder, long currentTime) throws IOException {
- encoder.encode(element, currentPartStream);
- this.lastUpdateTime = currentTime;
- }
-
- RecoverableWriter.ResumeRecoverable persist() throws IOException {
- return currentPartStream.persist();
- }
-
- RecoverableWriter.CommitRecoverable closeForCommit() throws IOException {
- return currentPartStream.closeForCommit().getRecoverable();
- }
-
- void dispose() {
- // we can suppress exceptions here, because we do not rely on close() to
- // flush or persist any data
- IOUtils.closeQuietly(currentPartStream);
- }
-
- @Override
- public String getBucketId() {
- return bucketId;
- }
-
- @Override
- public long getCreationTime() {
- return creationTime;
- }
-
- @Override
- public long getSize() throws IOException {
- return currentPartStream.getPos();
- }
-
- @Override
- public long getLastUpdateTime() {
- return lastUpdateTime;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java
index 9c3d047..5e72ea0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java
@@ -29,13 +29,13 @@ import java.io.IOException;
* should roll the part file or not.
*/
@PublicEvolving
-public interface PartFileInfo {
+public interface PartFileInfo<BucketID> {
/**
* @return The bucket identifier of the current buffer, as returned by the
* {@link Bucketer#getBucketId(Object, Bucketer.Context)}.
*/
- String getBucketId();
+ BucketID getBucketId();
/**
* @return The creation time (in ms) of the currently open part file.
http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java
new file mode 100644
index 0000000..662454b
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * Base class for the writer of the part file that is currently open in a given {@link Bucket}.
+ *
+ * <p>At the moment there are two subclasses of this class:
+ * <ol>
+ *     <li>The {@link RowWisePartWriter}, used for row-wise formats.</li>
+ *     <li>The {@link BulkPartWriter}, used for bulk encoding formats.</li>
+ * </ol>
+ *
+ * <p>A writer doubles as the {@link PartFileInfo} of the part file it is writing.
+ *
+ * @param <IN> The type of the elements written to the part file.
+ * @param <BucketID> The type of the bucket identifier.
+ */
+@Internal
+abstract class PartFileWriter<IN, BucketID> implements PartFileInfo<BucketID> {
+
+    /** Identifier of the bucket this part file belongs to. */
+    private final BucketID bucketId;
+
+    /** Creation time of the part file, as handed to the constructor. */
+    private final long creationTime;
+
+    /** Stream of the part file; accessible to subclasses so they can write to it. */
+    protected final RecoverableFsDataOutputStream currentPartStream;
+
+    /** Time passed to the latest {@link #markWrite(long)}; starts out as the creation time. */
+    private long lastUpdateTime;
+
+    /**
+     * Creates a writer on top of an already opened part file stream.
+     *
+     * @param bucketId the id of the bucket this writer is writing to; must not be {@code null}.
+     * @param currentPartStream the stream of the part file; must not be {@code null}.
+     * @param creationTime the creation time of the part file; must not be negative.
+     */
+    protected PartFileWriter(
+            final BucketID bucketId,
+            final RecoverableFsDataOutputStream currentPartStream,
+            final long creationTime) {
+
+        // validate first (same order as always), assign afterwards
+        Preconditions.checkArgument(creationTime >= 0L);
+        Preconditions.checkNotNull(bucketId);
+        Preconditions.checkNotNull(currentPartStream);
+
+        this.bucketId = bucketId;
+        this.currentPartStream = currentPartStream;
+        this.creationTime = creationTime;
+        this.lastUpdateTime = creationTime;
+    }
+
+    /**
+     * Writes the given element to the part file.
+     *
+     * @param element the element to write.
+     * @param currentTime the time of the write.
+     * @throws IOException if the element cannot be written.
+     */
+    abstract void write(IN element, long currentTime) throws IOException;
+
+    /**
+     * Persists the part file stream.
+     *
+     * @return the recoverable handed back by the stream's {@code persist()}.
+     * @throws IOException if the stream fails to persist.
+     */
+    RecoverableWriter.ResumeRecoverable persist() throws IOException {
+        final RecoverableWriter.ResumeRecoverable resumable = currentPartStream.persist();
+        return resumable;
+    }
+
+    /**
+     * Closes the part file stream for commit.
+     *
+     * @return the recoverable of the committer obtained from the stream.
+     * @throws IOException if the stream fails to close for commit.
+     */
+    RecoverableWriter.CommitRecoverable closeForCommit() throws IOException {
+        final RecoverableFsDataOutputStream.Committer committer = currentPartStream.closeForCommit();
+        return committer.getRecoverable();
+    }
+
+    /**
+     * Closes the part file stream, ignoring any failure while doing so.
+     */
+    void dispose() {
+        // close() is not what flushes or persists data for us,
+        // so it is safe to swallow whatever it throws
+        IOUtils.closeQuietly(currentPartStream);
+    }
+
+    /**
+     * Records the given time as the last update time of the part file.
+     *
+     * @param now the time to record.
+     */
+    void markWrite(long now) {
+        lastUpdateTime = now;
+    }
+
+    // ------------------------------------------------------------------------
+    //  PartFileInfo
+    // ------------------------------------------------------------------------
+
+    @Override
+    public BucketID getBucketId() {
+        return this.bucketId;
+    }
+
+    @Override
+    public long getCreationTime() {
+        return this.creationTime;
+    }
+
+    @Override
+    public long getSize() throws IOException {
+        // the size is reported as the current position of the part file stream
+        final long position = currentPartStream.getPos();
+        return position;
+    }
+
+    @Override
+    public long getLastUpdateTime() {
+        return this.lastUpdateTime;
+    }
+
+    // ------------------------------------------------------------------------
+
+    /**
+     * Factory interface for creating the different {@link PartFileWriter writers}.
+     *
+     * @param <IN> The type of the elements the created writers accept.
+     * @param <BucketID> The type of the bucket identifier.
+     */
+    interface PartFileFactory<IN, BucketID> {
+
+        /**
+         * Used upon recovery from a failure to recover a {@link PartFileWriter writer}.
+         *
+         * @param bucketId the id of the bucket this writer is writing to.
+         * @param fileSystemWriter the filesystem-specific writer to use when writing to the filesystem.
+         * @param resumable the state of the stream we are resurrecting.
+         * @param creationTime the creation time of the stream.
+         * @return the recovered {@link PartFileWriter writer}.
+         * @throws IOException if the writer cannot be created, e.g. because the stream cannot be recovered.
+         */
+        PartFileWriter<IN, BucketID> resumeFrom(
+                final BucketID bucketId,
+                final RecoverableWriter fileSystemWriter,
+                final RecoverableWriter.ResumeRecoverable resumable,
+                final long creationTime) throws IOException;
+
+        /**
+         * Used to create a new {@link PartFileWriter writer}.
+         *
+         * @param bucketId the id of the bucket this writer is writing to.
+         * @param fileSystemWriter the filesystem-specific writer to use when writing to the filesystem.
+         * @param path the part this writer will write to.
+         * @param creationTime the creation time of the stream.
+         * @return the new {@link PartFileWriter writer}.
+         * @throws IOException if the writer cannot be created, e.g. because the stream cannot be opened.
+         */
+        PartFileWriter<IN, BucketID> openNew(
+                final BucketID bucketId,
+                final RecoverableWriter fileSystemWriter,
+                final Path path,
+                final long creationTime) throws IOException;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java
index 936377e..24c38aa 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java
@@ -28,13 +28,27 @@ import java.io.Serializable;
* rolls its currently open part file and opens a new one.
*/
@PublicEvolving
-public interface RollingPolicy extends Serializable {
+public interface RollingPolicy<BucketID> extends Serializable {
/**
- * Determines if the in-progress part file for a bucket should roll.
+ * Determines if the in-progress part file for a bucket should roll on every checkpoint.
+ * @param partFileState the state of the currently open part file of the bucket.
+ * @return {@code true} if the part file should roll, {@code false} otherwise.
+ */
+ boolean shouldRollOnCheckpoint(final PartFileInfo<BucketID> partFileState) throws IOException;
+
+ /**
+ * Determines if the in-progress part file for a bucket should roll based on its current state, e.g. its size.
+ * @param partFileState the state of the currently open part file of the bucket.
+ * @return {@code true} if the part file should roll, {@code false} otherwise.
+ */
+ boolean shouldRollOnEvent(final PartFileInfo<BucketID> partFileState) throws IOException;
+
+ /**
+ * Determines if the in-progress part file for a bucket should roll based on a time condition.
* @param partFileState the state of the currently open part file of the bucket.
* @param currentTime the current processing time.
* @return {@code True} if the part file should roll, {@link false} otherwise.
*/
- boolean shouldRoll(final PartFileInfo partFileState, final long currentTime) throws IOException;
+ boolean shouldRollOnProcessingTime(final PartFileInfo<BucketID> partFileState, final long currentTime) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b56c75ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
new file mode 100644
index 0000000..0b00b43
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * A {@link PartFileWriter} for row-wise formats that use an {@link Encoder}.
+ * This also implements the {@link PartFileInfo}.
+ *
+ * @param <IN> The type of input elements.
+ * @param <BucketID> The type of ids for the buckets, as returned by the {@link Bucketer}.
+ */
+@Internal
+final class RowWisePartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID> {
+
+    /** Encodes each element into the stream of the part file. */
+    private final Encoder<IN> encoder;
+
+    /**
+     * Creates a writer on top of an already opened part file stream.
+     *
+     * @param bucketId the id of the bucket this writer is writing to.
+     * @param currentPartStream the stream of the part file.
+     * @param encoder the encoder used to write the elements; must not be {@code null}.
+     * @param creationTime the creation time of the part file.
+     */
+    private RowWisePartWriter(
+            final BucketID bucketId,
+            final RecoverableFsDataOutputStream currentPartStream,
+            final Encoder<IN> encoder,
+            final long creationTime) {
+        super(bucketId, currentPartStream, creationTime);
+        this.encoder = Preconditions.checkNotNull(encoder);
+    }
+
+    /**
+     * Encodes the element into the part file stream and, once that succeeded,
+     * records {@code currentTime} as the last update time of the part file.
+     *
+     * @param element the element to write.
+     * @param currentTime the time of the write.
+     * @throws IOException if the encoder fails to write the element.
+     */
+    @Override
+    void write(IN element, long currentTime) throws IOException {
+        encoder.encode(element, currentPartStream);
+        markWrite(currentTime);
+    }
+
+    /**
+     * A factory that creates {@link RowWisePartWriter RowWisePartWriters}.
+     * @param <IN> The type of input elements.
+     * @param <BucketID> The type of ids for the buckets, as returned by the {@link Bucketer}.
+     */
+    static class Factory<IN, BucketID> implements PartFileWriter.PartFileFactory<IN, BucketID> {
+
+        /** The encoder handed to every writer created by this factory. */
+        private final Encoder<IN> encoder;
+
+        /**
+         * @param encoder the encoder the created writers use; must not be {@code null}.
+         */
+        Factory(Encoder<IN> encoder) {
+            // fail fast: a null encoder would otherwise only be rejected by the writer
+            // constructor, i.e. after a part file stream has already been opened
+            this.encoder = Preconditions.checkNotNull(encoder);
+        }
+
+        /**
+         * Recovers the part file stream from {@code resumable} and wraps it in a new writer.
+         *
+         * @param bucketId the id of the bucket the writer is writing to; must not be {@code null}.
+         * @param fileSystemWriter the filesystem-specific writer used to recover the stream.
+         * @param resumable the state of the stream to recover.
+         * @param creationTime the creation time of the stream; must not be negative.
+         * @return the recovered writer.
+         * @throws IOException if the stream cannot be recovered.
+         */
+        @Override
+        public PartFileWriter<IN, BucketID> resumeFrom(
+                final BucketID bucketId,
+                final RecoverableWriter fileSystemWriter,
+                final RecoverableWriter.ResumeRecoverable resumable,
+                final long creationTime) throws IOException {
+
+            // validate every argument before touching the filesystem, so that a rejected
+            // argument can never leave behind a recovered stream that nobody closes
+            Preconditions.checkArgument(creationTime >= 0L);
+            Preconditions.checkNotNull(bucketId);
+            Preconditions.checkNotNull(fileSystemWriter);
+            Preconditions.checkNotNull(resumable);
+
+            final RecoverableFsDataOutputStream stream = fileSystemWriter.recover(resumable);
+            return new RowWisePartWriter<>(bucketId, stream, encoder, creationTime);
+        }
+
+        /**
+         * Opens a new part file stream at {@code path} and wraps it in a new writer.
+         *
+         * @param bucketId the id of the bucket the writer is writing to; must not be {@code null}.
+         * @param fileSystemWriter the filesystem-specific writer used to open the stream.
+         * @param path the part the writer will write to.
+         * @param creationTime the creation time of the stream; must not be negative.
+         * @return the new writer.
+         * @throws IOException if the stream cannot be opened.
+         */
+        @Override
+        public PartFileWriter<IN, BucketID> openNew(
+                final BucketID bucketId,
+                final RecoverableWriter fileSystemWriter,
+                final Path path,
+                final long creationTime) throws IOException {
+
+            // validate every argument before touching the filesystem, so that a rejected
+            // argument can never leave behind an opened stream that nobody closes
+            Preconditions.checkArgument(creationTime >= 0L);
+            Preconditions.checkNotNull(bucketId);
+            Preconditions.checkNotNull(fileSystemWriter);
+            Preconditions.checkNotNull(path);
+
+            final RecoverableFsDataOutputStream stream = fileSystemWriter.open(path);
+            return new RowWisePartWriter<>(bucketId, stream, encoder, creationTime);
+        }
+    }
+}