You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/07/16 20:10:08 UTC
[3/3] flink git commit: [FLINK-9750] [DataStream API] Add new
StreamingFileSink on top of the ResumableWriter.
[FLINK-9750] [DataStream API] Add new StreamingFileSink on top of the ResumableWriter.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0bbc91eb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0bbc91eb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0bbc91eb
Branch: refs/heads/master
Commit: 0bbc91eb1b23fa535e28fa8403f8040db46ebf98
Parents: 975fdbe
Author: kkloudas <kk...@gmail.com>
Authored: Mon Jul 16 20:51:44 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jul 16 20:54:02 2018 +0200
----------------------------------------------------------------------
.../flink/api/common/serialization/Encoder.java | 43 ++
.../serialization/SimpleStringEncoder.java | 69 ++
.../core/io/SimpleVersionedSerialization.java | 159 +++++
.../core/io/SimpleVersionedSerializer.java | 19 +-
.../io/SimpleVersionedSerializationTest.java | 159 +++++
.../api/functions/sink/filesystem/Bucket.java | 215 ++++++
.../sink/filesystem/BucketFactory.java | 49 ++
.../functions/sink/filesystem/BucketState.java | 95 +++
.../sink/filesystem/BucketStateSerializer.java | 166 +++++
.../sink/filesystem/DefaultBucketFactory.java | 69 ++
.../sink/filesystem/DefaultRollingPolicy.java | 142 ++++
.../sink/filesystem/PartFileHandler.java | 122 ++++
.../functions/sink/filesystem/PartFileInfo.java | 54 ++
.../sink/filesystem/RollingPolicy.java | 40 ++
.../sink/filesystem/StreamingFileSink.java | 425 ++++++++++++
.../filesystem/bucketers/BasePathBucketer.java | 41 ++
.../sink/filesystem/bucketers/Bucketer.java | 80 +++
.../filesystem/bucketers/DateTimeBucketer.java | 94 +++
.../filesystem/BucketStateSerializerTest.java | 265 +++++++
.../filesystem/LocalStreamingFileSinkTest.java | 689 +++++++++++++++++++
20 files changed, 2985 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0bbc91eb/flink-core/src/main/java/org/apache/flink/api/common/serialization/Encoder.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/serialization/Encoder.java b/flink-core/src/main/java/org/apache/flink/api/common/serialization/Encoder.java
new file mode 100644
index 0000000..f60ddc9
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/serialization/Encoder.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * An {@link Encoder} is used by the streaming file sink to perform the actual writing
+ * of the incoming elements to the files in a bucket.
+ *
+ * @param <IN> The type of the elements that are being written by the sink.
+ */
+@PublicEvolving
+public interface Encoder<IN> extends Serializable {
+
+ /**
+ * Writes one element to the bucket file.
+ * @param element the element to be written.
+ * @param stream the stream to write the element to.
+ */
+ void encode(IN element, OutputStream stream) throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bbc91eb/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringEncoder.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringEncoder.java b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringEncoder.java
new file mode 100644
index 0000000..e50229b
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringEncoder.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+
+/**
+ * A simple {@link Encoder} that uses {@code toString()} on the input elements and
+ * writes them to the output bucket file separated by newline.
+ *
+ * @param <IN> The type of the elements that are being written by the sink.
+ */
+@PublicEvolving
+public class SimpleStringEncoder<IN> implements Encoder<IN> {
+
+ private static final long serialVersionUID = -6865107843734614452L;
+
+ private String charsetName;
+
+ private transient Charset charset;
+
+ /**
+ * Creates a new {@code SimpleStringEncoder} that uses the {@code "UTF-8"} charset to convert
+ * strings to bytes.
+ */
+ public SimpleStringEncoder() {
+ this("UTF-8");
+ }
+
+ /**
+ * Creates a new {@code SimpleStringEncoder} that uses the given charset to convert
+ * strings to bytes.
+ *
+ * @param charsetName Name of the charset to be used, must be valid input for {@code Charset.forName(charsetName)}
+ */
+ public SimpleStringEncoder(String charsetName) {
+ this.charsetName = charsetName;
+ }
+
+ @Override
+ public void encode(IN element, OutputStream stream) throws IOException {
+ if (charset == null) {
+ charset = Charset.forName(charsetName);
+ }
+
+ stream.write(element.toString().getBytes(charset));
+ stream.write('\n');
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bbc91eb/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java
new file mode 100644
index 0000000..8bead11
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerialization.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.io;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Simple serialization / deserialization methods for the {@link SimpleVersionedSerializer}.
+ */
+@PublicEvolving
+public class SimpleVersionedSerialization {
+
+ /**
+ * Serializes the version and datum into a stream.
+ *
+ * <p>Data serialized via this method can be deserialized via
+ * {@link #readVersionAndDeSerialize(SimpleVersionedSerializer, DataInputView)}.
+ *
+ * <p>The first four bytes will be occupied by the version, as returned by
+ * {@link SimpleVersionedSerializer#getVersion()}. The next four bytes will hold the length of the
+ * serialized datum, followed by the datum itself, as produced by {@link SimpleVersionedSerializer#serialize(Object)}.
+ * The written data will hence be eight bytes larger than the serialized datum.
+ *
+ * @param serializer The serializer to serialize the datum with.
+ * @param datum The datum to serialize.
+ * @param out The stream to serialize to.
+ */
+ public static <T> void writeVersionAndSerialize(SimpleVersionedSerializer<T> serializer, T datum, DataOutputView out) throws IOException {
+ checkNotNull(serializer, "serializer");
+ checkNotNull(datum, "datum");
+ checkNotNull(out, "out");
+
+ final byte[] data = serializer.serialize(datum);
+
+ out.writeInt(serializer.getVersion());
+ out.writeInt(data.length);
+ out.write(data);
+ }
+
+ /**
+ * Deserializes the version and datum from a stream.
+ *
+ * <p>This method deserializes data serialized via
+ * {@link #writeVersionAndSerialize(SimpleVersionedSerializer, Object, DataOutputView)}.
+ *
+ * <p>The first four bytes will be interpreted as the version. The next four bytes will be
+ * interpreted as the length of the datum bytes, then length-many bytes will be read.
+ * Finally, the datum is deserialized via the {@link SimpleVersionedSerializer#deserialize(int, byte[])}
+ * method.
+ *
+ * @param serializer The serializer to deserialize the datum with.
+ * @param in The stream to deserialize from.
+ */
+ public static <T> T readVersionAndDeSerialize(SimpleVersionedSerializer<T> serializer, DataInputView in) throws IOException {
+ checkNotNull(serializer, "serializer");
+ checkNotNull(in, "in");
+
+ final int version = in.readInt();
+ final int length = in.readInt();
+ final byte[] data = new byte[length];
+ in.readFully(data);
+
+ return serializer.deserialize(version, data);
+ }
+
+ /**
+ * Serializes the version and datum into a byte array. The first four bytes will be occupied by
+ * the version (as returned by {@link SimpleVersionedSerializer#getVersion()}),
+ * written in <i>big-endian</i> encoding. The remaining bytes will be the serialized
+ * datum, as produced by {@link SimpleVersionedSerializer#serialize(Object)}. The resulting array
+ * will hence be four bytes larger than the serialized datum.
+ *
+ * <p>Data serialized via this method can be deserialized via
+ * {@link #readVersionAndDeSerialize(SimpleVersionedSerializer, byte[])}.
+ *
+ * @param serializer The serializer to serialize the datum with.
+ * @param datum The datum to serialize.
+ *
+ * @return A byte array containing the serialized version and serialized datum.
+ *
+ * @throws IOException Exceptions from the {@link SimpleVersionedSerializer#serialize(Object)}
+ * method are forwarded.
+ */
+ public static <T> byte[] writeVersionAndSerialize(SimpleVersionedSerializer<T> serializer, T datum) throws IOException {
+ checkNotNull(serializer, "serializer");
+ checkNotNull(datum, "datum");
+
+ final byte[] data = serializer.serialize(datum);
+ final byte[] versionAndData = new byte[data.length + 4];
+
+ final int version = serializer.getVersion();
+ versionAndData[0] = (byte) (version >> 24);
+ versionAndData[1] = (byte) (version >> 16);
+ versionAndData[2] = (byte) (version >> 8);
+ versionAndData[3] = (byte) version;
+
+ // move the data to the array
+ System.arraycopy(data, 0, versionAndData, 4, data.length);
+
+ return versionAndData;
+ }
+
+ /**
+ * Deserializes the version and datum from a byte array. The first four bytes will be read as
+ * the version, in <i>big-endian</i> encoding. The remaining bytes will be passed to the serializer
+ * for deserialization, via {@link SimpleVersionedSerializer#deserialize(int, byte[])}.
+ *
+ * @param serializer The serializer to deserialize the datum with.
+ * @param bytes The bytes to deserialize from.
+ *
+ * @return The deserialized datum.
+ *
+ * @throws IOException Exceptions from the {@link SimpleVersionedSerializer#deserialize(int, byte[])}
+ * method are forwarded.
+ */
+ public static <T> T readVersionAndDeSerialize(SimpleVersionedSerializer<T> serializer, byte[] bytes) throws IOException {
+ checkNotNull(serializer, "serializer");
+ checkNotNull(bytes, "bytes");
+ checkArgument(bytes.length >= 4, "byte array below minimum length (4 bytes)");
+
+ final byte[] dataOnly = Arrays.copyOfRange(bytes, 4, bytes.length);
+ final int version =
+ ((bytes[0] & 0xff) << 24) |
+ ((bytes[1] & 0xff) << 16) |
+ ((bytes[2] & 0xff) << 8) |
+ (bytes[3] & 0xff);
+
+ return serializer.deserialize(version, dataOnly);
+ }
+
+ // ------------------------------------------------------------------------
+
+ /** Utility class, not meant to be instantiated. */
+ private SimpleVersionedSerialization() {}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bbc91eb/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java
index 4dfeea2..07ba12d 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java
@@ -24,11 +24,11 @@ import java.io.IOException;
/**
* A simple serializer interface for versioned serialization.
- *
+ *
* <p>The serializer has a version (returned by {@link #getVersion()}) which can be attached
* to the serialized data. When the serializer evolves, the version can be used to identify
* with which prior version the data was serialized.
- *
+ *
* <pre>{@code
* MyType someObject = ...;
* SimpleVersionedSerializer<MyType> serializer = ...;
@@ -37,13 +37,13 @@ import java.io.IOException;
* int version = serializer.getVersion();
*
* MyType deserialized = serializer.deserialize(version, serializedData);
- *
+ *
* byte[] someOldData = ...;
* int oldVersion = ...;
* MyType deserializedOldObject = serializer.deserialize(oldVersion, someOldData);
- *
+ *
* }</pre>
- *
+ *
* @param <E> The data type serialized / deserialized by this serializer.
*/
@Internal
@@ -51,7 +51,7 @@ public interface SimpleVersionedSerializer<E> extends Versioned {
/**
* Gets the version with which this serializer serializes.
- *
+ *
* @return The version of the serialization schema.
*/
@Override
@@ -61,10 +61,9 @@ public interface SimpleVersionedSerializer<E> extends Versioned {
* Serializes the given object. The serialization is assumed to correspond to the
* current serialization version (as returned by {@link #getVersion()}.
*
- *
* @param obj The object to serialize.
* @return The serialized data (bytes).
- *
+ *
* @throws IOException Thrown, if the serialization fails.
*/
byte[] serialize(E obj) throws IOException;
@@ -72,11 +71,11 @@ public interface SimpleVersionedSerializer<E> extends Versioned {
/**
* De-serializes the given data (bytes) which was serialized with the scheme of the
* indicated version.
- *
+ *
* @param version The version in which the data was serialized
* @param serialized The serialized data
* @return The deserialized object
- *
+ *
* @throws IOException Thrown, if the deserialization fails.
*/
E deserialize(int version, byte[] serialized) throws IOException;
http://git-wip-us.apache.org/repos/asf/flink/blob/0bbc91eb/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java b/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java
new file mode 100644
index 0000000..89a6b27
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/io/SimpleVersionedSerializationTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.io;
+
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests for the {@link SimpleVersionedSerialization} class.
+ */
+public class SimpleVersionedSerializationTest {
+
+ @Test
+ public void testStreamSerializationRoundTrip() throws IOException {
+ final SimpleVersionedSerializer<String> utfEncoder = new SimpleVersionedSerializer<String>() {
+
+ private static final int VERSION = Integer.MAX_VALUE / 2; // version should occupy many bytes
+
+ @Override
+ public int getVersion() {
+ return VERSION;
+ }
+
+ @Override
+ public byte[] serialize(String str) throws IOException {
+ return str.getBytes(StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public String deserialize(int version, byte[] serialized) throws IOException {
+ assertEquals(VERSION, version);
+ return new String(serialized, StandardCharsets.UTF_8);
+ }
+ };
+
+ final String testString = "dugfakgs";
+ final DataOutputSerializer out = new DataOutputSerializer(32);
+ SimpleVersionedSerialization.writeVersionAndSerialize(utfEncoder, testString, out);
+
+ final DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
+ final String deserialized = SimpleVersionedSerialization.readVersionAndDeSerialize(utfEncoder, in);
+ assertEquals(testString, deserialized);
+ }
+
+ @Test
+ public void testStreamSerializeEmpty() throws IOException {
+ final String testString = "beeeep!";
+
+ SimpleVersionedSerializer<String> emptySerializer = new SimpleVersionedSerializer<String>() {
+
+ @Override
+ public int getVersion() {
+ return 42;
+ }
+
+ @Override
+ public byte[] serialize(String obj) throws IOException {
+ return new byte[0];
+ }
+
+ @Override
+ public String deserialize(int version, byte[] serialized) throws IOException {
+ assertEquals(42, version);
+ assertEquals(0, serialized.length);
+ return testString;
+ }
+ };
+
+ final DataOutputSerializer out = new DataOutputSerializer(32);
+ SimpleVersionedSerialization.writeVersionAndSerialize(emptySerializer, "abc", out);
+
+ final DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
+ assertEquals(testString, SimpleVersionedSerialization.readVersionAndDeSerialize(emptySerializer, in));
+ }
+
+ @Test
+ public void testSerializationRoundTrip() throws IOException {
+ final SimpleVersionedSerializer<String> utfEncoder = new SimpleVersionedSerializer<String>() {
+
+ private static final int VERSION = Integer.MAX_VALUE / 2; // version should occupy many bytes
+
+ @Override
+ public int getVersion() {
+ return VERSION;
+ }
+
+ @Override
+ public byte[] serialize(String str) throws IOException {
+ return str.getBytes(StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public String deserialize(int version, byte[] serialized) throws IOException {
+ assertEquals(VERSION, version);
+ return new String(serialized, StandardCharsets.UTF_8);
+ }
+ };
+
+ final String testString = "dugfakgs";
+ byte[] serialized = SimpleVersionedSerialization.writeVersionAndSerialize(utfEncoder, testString);
+
+ final String deserialized = SimpleVersionedSerialization.readVersionAndDeSerialize(utfEncoder, serialized);
+ assertEquals(testString, deserialized);
+ }
+
+ @Test
+ public void testSerializeEmpty() throws IOException {
+ final String testString = "beeeep!";
+
+ SimpleVersionedSerializer<String> emptySerializer = new SimpleVersionedSerializer<String>() {
+
+ @Override
+ public int getVersion() {
+ return 42;
+ }
+
+ @Override
+ public byte[] serialize(String obj) throws IOException {
+ return new byte[0];
+ }
+
+ @Override
+ public String deserialize(int version, byte[] serialized) throws IOException {
+ assertEquals(42, version);
+ assertEquals(0, serialized.length);
+ return testString;
+ }
+ };
+
+ byte[] serialized = SimpleVersionedSerialization.writeVersionAndSerialize(emptySerializer, "abc");
+ assertNotNull(serialized);
+
+ assertEquals(testString, SimpleVersionedSerialization.readVersionAndDeSerialize(emptySerializer, serialized));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bbc91eb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
new file mode 100644
index 0000000..d9a6d75
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A bucket is the directory organization of the output of the {@link StreamingFileSink}.
+ *
+ * <p>For each incoming element in the {@code StreamingFileSink}, the user-specified
+ * {@link org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer Bucketer} is
+ * queried to see in which bucket this element should be written.
+ */
+@PublicEvolving
+public class Bucket<IN> {
+
+ private static final String PART_PREFIX = "part";
+
+ private final String bucketId;
+
+ private final Path bucketPath;
+
+ private final int subtaskIndex;
+
+ private final Encoder<IN> encoder;
+
+ private final RecoverableWriter fsWriter;
+
+ private final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingPerCheckpoint = new HashMap<>();
+
+ private long partCounter;
+
+ private PartFileHandler<IN> currentPart;
+
+ private List<RecoverableWriter.CommitRecoverable> pending;
+
+ /**
+ * Constructor to restore a bucket from checkpointed state.
+ */
+ public Bucket(
+ RecoverableWriter fsWriter,
+ int subtaskIndex,
+ long initialPartCounter,
+ Encoder<IN> writer,
+ BucketState bucketstate) throws IOException {
+
+ this(fsWriter, subtaskIndex, bucketstate.getBucketId(), bucketstate.getBucketPath(), initialPartCounter, writer);
+
+ // the constructor must have already initialized the filesystem writer
+ Preconditions.checkState(fsWriter != null);
+
+ // we try to resume the previous in-progress file, if the filesystem
+ // supports such operation. If not, we just commit the file and start fresh.
+
+ final RecoverableWriter.ResumeRecoverable resumable = bucketstate.getCurrentInProgress();
+ if (resumable != null) {
+ currentPart = PartFileHandler.resumeFrom(
+ bucketId, fsWriter, resumable, bucketstate.getCreationTime());
+ }
+
+ // we commit the pending files of all checkpoints up to the last successful one
+ // (the one we are recovering from)
+ for (List<RecoverableWriter.CommitRecoverable> commitables: bucketstate.getPendingPerCheckpoint().values()) {
+ for (RecoverableWriter.CommitRecoverable commitable: commitables) {
+ fsWriter.recoverForCommit(commitable).commitAfterRecovery();
+ }
+ }
+ }
+
+ /**
+ * Constructor to create a new empty bucket.
+ */
+ public Bucket(
+ RecoverableWriter fsWriter,
+ int subtaskIndex,
+ String bucketId,
+ Path bucketPath,
+ long initialPartCounter,
+ Encoder<IN> writer) {
+
+ this.fsWriter = Preconditions.checkNotNull(fsWriter);
+ this.subtaskIndex = subtaskIndex;
+ this.bucketId = Preconditions.checkNotNull(bucketId);
+ this.bucketPath = Preconditions.checkNotNull(bucketPath);
+ this.partCounter = initialPartCounter;
+ this.encoder = Preconditions.checkNotNull(writer);
+
+ this.pending = new ArrayList<>();
+ }
+
+ public PartFileInfo getInProgressPartInfo() {
+ return currentPart;
+ }
+
+ public String getBucketId() {
+ return bucketId;
+ }
+
+ public Path getBucketPath() {
+ return bucketPath;
+ }
+
+ public long getPartCounter() {
+ return partCounter;
+ }
+
+ public boolean isActive() {
+ return currentPart != null || !pending.isEmpty() || !pendingPerCheckpoint.isEmpty();
+ }
+
+ void write(IN element, long currentTime) throws IOException {
+ Preconditions.checkState(currentPart != null, "bucket has been closed");
+ currentPart.write(element, encoder, currentTime);
+ }
+
+ void rollPartFile(final long currentTime) throws IOException {
+ closePartFile();
+ currentPart = PartFileHandler.openNew(bucketId, fsWriter, getNewPartPath(), currentTime);
+ partCounter++;
+ }
+
+ void merge(final Bucket<IN> bucket) throws IOException {
+ Preconditions.checkNotNull(bucket);
+ Preconditions.checkState(bucket.getBucketPath().equals(getBucketPath()));
+
+ // there should be no pending files in the "to-merge" states.
+ Preconditions.checkState(bucket.pending.isEmpty());
+ Preconditions.checkState(bucket.pendingPerCheckpoint.isEmpty());
+
+ RecoverableWriter.CommitRecoverable commitable = bucket.closePartFile();
+ if (commitable != null) {
+ pending.add(commitable);
+ }
+ }
+
+ RecoverableWriter.CommitRecoverable closePartFile() throws IOException {
+ RecoverableWriter.CommitRecoverable commitable = null;
+ if (currentPart != null) {
+ commitable = currentPart.closeForCommit();
+ pending.add(commitable);
+ currentPart = null;
+ }
+ return commitable;
+ }
+
+ public void dispose() {
+ if (currentPart != null) {
+ currentPart.dispose();
+ }
+ }
+
+ public void commitUpToCheckpoint(long checkpointId) throws IOException {
+ Preconditions.checkNotNull(fsWriter);
+
+ Iterator<Map.Entry<Long, List<RecoverableWriter.CommitRecoverable>>> it =
+ pendingPerCheckpoint.entrySet().iterator();
+
+ while (it.hasNext()) {
+ Map.Entry<Long, List<RecoverableWriter.CommitRecoverable>> entry = it.next();
+ if (entry.getKey() <= checkpointId) {
+ for (RecoverableWriter.CommitRecoverable commitable : entry.getValue()) {
+ fsWriter.recoverForCommit(commitable).commit();
+ }
+ it.remove();
+ }
+ }
+ }
+
+ public BucketState snapshot(long checkpointId) throws IOException {
+ RecoverableWriter.ResumeRecoverable resumable = null;
+ long creationTime = Long.MAX_VALUE;
+
+ if (currentPart != null) {
+ resumable = currentPart.persist();
+ creationTime = currentPart.getCreationTime();
+ }
+
+ if (!pending.isEmpty()) {
+ pendingPerCheckpoint.put(checkpointId, pending);
+ pending = new ArrayList<>();
+ }
+ return new BucketState(bucketId, bucketPath, creationTime, resumable, pendingPerCheckpoint);
+ }
+
+ private Path getNewPartPath() {
+ return new Path(bucketPath, PART_PREFIX + '-' + subtaskIndex + '-' + partCounter);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bbc91eb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
new file mode 100644
index 0000000..88f3c1a
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableWriter;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * A factory able to create {@link Bucket buckets} for the {@link StreamingFileSink}.
+ */
+@Internal
+public interface BucketFactory<IN> extends Serializable {
+
+ Bucket<IN> getNewBucket(
+ RecoverableWriter fsWriter,
+ int subtaskIndex,
+ String bucketId,
+ Path bucketPath,
+ long initialPartCounter,
+ Encoder<IN> writer) throws IOException;
+
+ Bucket<IN> restoreBucket(
+ RecoverableWriter fsWriter,
+ int subtaskIndex,
+ long initialPartCounter,
+ Encoder<IN> writer,
+ BucketState bucketstate) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bbc91eb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
new file mode 100644
index 0000000..5ebc46c
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The state of the {@link Bucket} that is to be checkpointed.
+ */
+@Internal
+public class BucketState {
+
+ private final String bucketId;
+
+ /**
+ * The base path for the bucket, i.e. the directory where all the part files are stored.
+ */
+ private final Path bucketPath;
+
+ /**
+ * The creation time of the currently open part file, or {@code Long.MAX_VALUE} if there is no open part file.
+ */
+ private final long creationTime;
+
+ /**
+ * A {@link RecoverableWriter.ResumeRecoverable} for the currently open part file, or null
+ * if there is no currently open part file.
+ */
+ @Nullable
+ private final RecoverableWriter.ResumeRecoverable inProgress;
+
+ /**
+ * The {@link RecoverableWriter.CommitRecoverable files} pending to be committed, organized by checkpoint id.
+ */
+ private final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingPerCheckpoint;
+
+ public BucketState(
+ final String bucketId,
+ final Path bucketPath,
+ final long creationTime,
+ final @Nullable RecoverableWriter.ResumeRecoverable inProgress,
+ final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingPerCheckpoint
+ ) {
+ this.bucketId = Preconditions.checkNotNull(bucketId);
+ this.bucketPath = Preconditions.checkNotNull(bucketPath);
+ this.creationTime = creationTime;
+ this.inProgress = inProgress;
+ this.pendingPerCheckpoint = Preconditions.checkNotNull(pendingPerCheckpoint);
+ }
+
+ public String getBucketId() {
+ return bucketId;
+ }
+
+ public Path getBucketPath() {
+ return bucketPath;
+ }
+
+ public long getCreationTime() {
+ return creationTime;
+ }
+
+ @Nullable
+ public RecoverableWriter.ResumeRecoverable getCurrentInProgress() {
+ return inProgress;
+ }
+
+ public Map<Long, List<RecoverableWriter.CommitRecoverable>> getPendingPerCheckpoint() {
+ return pendingPerCheckpoint;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bbc91eb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java
new file mode 100644
index 0000000..a167ec9
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * A {@code SimpleVersionedSerializer} used to serialize the {@link BucketState BucketState}.
+ */
+@Internal
+class BucketStateSerializer implements SimpleVersionedSerializer<BucketState> {
+
+ private static final int MAGIC_NUMBER = 0x1e764b79;
+
+ private final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumableSerializer;
+
+ private final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitableSerializer;
+
+ public BucketStateSerializer(
+ final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumableSerializer,
+ final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitableSerializer) {
+
+ this.resumableSerializer = Preconditions.checkNotNull(resumableSerializer);
+ this.commitableSerializer = Preconditions.checkNotNull(commitableSerializer);
+ }
+
+ @Override
+ public int getVersion() {
+ return 1;
+ }
+
+ @Override
+ public byte[] serialize(BucketState state) throws IOException {
+ DataOutputSerializer out = new DataOutputSerializer(256);
+ out.writeInt(MAGIC_NUMBER);
+ serializeV1(state, out);
+ return out.getCopyOfBuffer();
+ }
+
+ @Override
+ public BucketState deserialize(int version, byte[] serialized) throws IOException {
+ switch (version) {
+ case 1:
+ DataInputDeserializer in = new DataInputDeserializer(serialized);
+ validateMagicNumber(in);
+ return deserializeV1(in);
+ default:
+ throw new IOException("Unrecognized version or corrupt state: " + version);
+ }
+ }
+
+ @VisibleForTesting
+ void serializeV1(BucketState state, DataOutputView out) throws IOException {
+ out.writeUTF(state.getBucketId());
+ out.writeUTF(state.getBucketPath().toString());
+ out.writeLong(state.getCreationTime());
+
+ // put the current open part file
+ final RecoverableWriter.ResumeRecoverable currentPart = state.getCurrentInProgress();
+ if (currentPart != null) {
+ out.writeBoolean(true);
+ SimpleVersionedSerialization.writeVersionAndSerialize(resumableSerializer, currentPart, out);
+ }
+ else {
+ out.writeBoolean(false);
+ }
+
+ // put the map of pending files per checkpoint
+ final Map<Long, List<CommitRecoverable>> pendingCommitters = state.getPendingPerCheckpoint();
+
+ // manually keep the version here to save some bytes
+ out.writeInt(commitableSerializer.getVersion());
+
+ out.writeInt(pendingCommitters.size());
+ for (Entry<Long, List<CommitRecoverable>> resumablesForCheckpoint : pendingCommitters.entrySet()) {
+ List<CommitRecoverable> resumables = resumablesForCheckpoint.getValue();
+
+ out.writeLong(resumablesForCheckpoint.getKey());
+ out.writeInt(resumables.size());
+
+ for (CommitRecoverable resumable : resumables) {
+ byte[] serialized = commitableSerializer.serialize(resumable);
+ out.writeInt(serialized.length);
+ out.write(serialized);
+ }
+ }
+ }
+
+ @VisibleForTesting
+ BucketState deserializeV1(DataInputView in) throws IOException {
+ final String bucketId = in.readUTF();
+ final String bucketPathStr = in.readUTF();
+ final long creationTime = in.readLong();
+
+ // then get the current resumable stream
+ RecoverableWriter.ResumeRecoverable current = null;
+ if (in.readBoolean()) {
+ current = SimpleVersionedSerialization.readVersionAndDeSerialize(resumableSerializer, in);
+ }
+
+ final int committableVersion = in.readInt();
+ final int numCheckpoints = in.readInt();
+ final HashMap<Long, List<RecoverableWriter.CommitRecoverable>> resumablesPerCheckpoint = new HashMap<>(numCheckpoints);
+
+ for (int i = 0; i < numCheckpoints; i++) {
+ final long checkpointId = in.readLong();
+ final int noOfResumables = in.readInt();
+
+ final ArrayList<RecoverableWriter.CommitRecoverable> resumables = new ArrayList<>(noOfResumables);
+ for (int j = 0; j < noOfResumables; j++) {
+ final byte[] bytes = new byte[in.readInt()];
+ in.readFully(bytes);
+ resumables.add(commitableSerializer.deserialize(committableVersion, bytes));
+ }
+ resumablesPerCheckpoint.put(checkpointId, resumables);
+ }
+
+ return new BucketState(
+ bucketId,
+ new Path(bucketPathStr),
+ creationTime,
+ current,
+ resumablesPerCheckpoint);
+ }
+
+ private static void validateMagicNumber(DataInputView in) throws IOException {
+ final int magicNumber = in.readInt();
+ if (magicNumber != MAGIC_NUMBER) {
+ throw new IOException(String.format("Corrupt data: Unexpected magic number %08X", magicNumber));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bbc91eb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java
new file mode 100644
index 0000000..795ba74
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableWriter;
+
+import java.io.IOException;
+
+/**
+ * A factory returning {@link Bucket buckets}.
+ */
+@Internal
+public class DefaultBucketFactory<IN> implements BucketFactory<IN> {
+
+ private static final long serialVersionUID = 3372881359208513357L;
+
+ @Override
+ public Bucket<IN> getNewBucket(
+ RecoverableWriter fsWriter,
+ int subtaskIndex,
+ String bucketId,
+ Path bucketPath,
+ long initialPartCounter,
+ Encoder<IN> writer) throws IOException {
+
+ return new Bucket<>(
+ fsWriter,
+ subtaskIndex,
+ bucketId,
+ bucketPath,
+ initialPartCounter,
+ writer);
+ }
+
+ @Override
+ public Bucket<IN> restoreBucket(
+ RecoverableWriter fsWriter,
+ int subtaskIndex,
+ long initialPartCounter,
+ Encoder<IN> writer,
+ BucketState bucketState) throws IOException {
+
+ return new Bucket<>(
+ fsWriter,
+ subtaskIndex,
+ initialPartCounter,
+ writer,
+ bucketState);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bbc91eb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultRollingPolicy.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultRollingPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultRollingPolicy.java
new file mode 100644
index 0000000..026ac70
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultRollingPolicy.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * The default implementation of the {@link RollingPolicy}.
+ *
+ * <p>This policy rolls a part file if:
+ * <ol>
+ * <li>there is no open part file,</li>
+ * <li>the current file has exceeded the maximum part size (by default 128MB),</li>
+ * <li>the current file is older than the roll over interval (by default 60 sec), or</li>
+ * <li>the current file has not been written to for more than the allowed inactivityTime (by default 60 sec).</li>
+ * </ol>
+ */
+@PublicEvolving
+public final class DefaultRollingPolicy implements RollingPolicy {
+
+ private static final long serialVersionUID = 1318929857047767030L;
+
+ private static final long DEFAULT_INACTIVITY_INTERVAL = 60L * 1000L;
+
+ private static final long DEFAULT_ROLLOVER_INTERVAL = 60L * 1000L;
+
+ private static final long DEFAULT_MAX_PART_SIZE = 1024L * 1024L * 128L;
+
+ private final long partSize;
+
+ private final long rolloverInterval;
+
+ private final long inactivityInterval;
+
+ /**
+ * Private constructor to avoid direct instantiation.
+ */
+ private DefaultRollingPolicy(long partSize, long rolloverInterval, long inactivityInterval) {
+ Preconditions.checkArgument(partSize > 0L);
+ Preconditions.checkArgument(rolloverInterval > 0L);
+ Preconditions.checkArgument(inactivityInterval > 0L);
+
+ this.partSize = partSize;
+ this.rolloverInterval = rolloverInterval;
+ this.inactivityInterval = inactivityInterval;
+ }
+
+ @Override
+ public boolean shouldRoll(final PartFileInfo state, final long currentTime) throws IOException {
+ if (state == null) {
+ // this means that there is no currently open part file.
+ return true;
+ }
+
+ if (state.getSize() > partSize) {
+ return true;
+ }
+
+ if (currentTime - state.getCreationTime() > rolloverInterval) {
+ return true;
+ }
+
+ return currentTime - state.getLastUpdateTime() > inactivityInterval;
+ }
+
+ /**
+ * Initiates the instantiation of a {@link DefaultRollingPolicy}.
+ * To finalize it and have the actual policy, call {@code .build()}.
+ */
+ public static PolicyBuilder create() {
+ return new PolicyBuilder();
+ }
+
+ /**
+ * A helper class that holds the configuration properties for the {@link DefaultRollingPolicy}.
+ */
+ @PublicEvolving
+ public static class PolicyBuilder {
+
+ private long partSize = DEFAULT_MAX_PART_SIZE;
+
+ private long rolloverInterval = DEFAULT_ROLLOVER_INTERVAL;
+
+ private long inactivityInterval = DEFAULT_INACTIVITY_INTERVAL;
+
+ /**
+ * Sets the part size above which a part file will have to roll.
+ * @param size the allowed part size.
+ */
+ public PolicyBuilder withMaxPartSize(long size) {
+ Preconditions.checkState(size > 0L);
+ this.partSize = size;
+ return this;
+ }
+
+ /**
+ * Sets the interval of allowed inactivity after which a part file will have to roll.
+ * @param interval the allowed inactivity interval.
+ */
+ public PolicyBuilder withInactivityInterval(long interval) {
+ Preconditions.checkState(interval > 0L);
+ this.inactivityInterval = interval;
+ return this;
+ }
+
+ /**
+ * Sets the max time a part file can stay open before having to roll.
+ * @param interval the desired rollover interval.
+ */
+ public PolicyBuilder withRolloverInterval(long interval) {
+ Preconditions.checkState(interval > 0L);
+ this.rolloverInterval = interval;
+ return this;
+ }
+
+ /**
+ * Creates the actual policy.
+ */
+ public DefaultRollingPolicy build() {
+ return new DefaultRollingPolicy(partSize, rolloverInterval, inactivityInterval);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bbc91eb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileHandler.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileHandler.java
new file mode 100644
index 0000000..10fd12b
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileHandler.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * A handler for the currently open part file in a specific {@link Bucket}.
+ * This also implements the {@link PartFileInfo}.
+ */
+@Internal
+class PartFileHandler<IN> implements PartFileInfo {
+
+ private final String bucketId;
+
+ private final long creationTime;
+
+ private final RecoverableFsDataOutputStream currentPartStream;
+
+ private long lastUpdateTime;
+
+ private PartFileHandler(
+ final String bucketId,
+ final RecoverableFsDataOutputStream currentPartStream,
+ final long creationTime) {
+
+ Preconditions.checkArgument(creationTime >= 0L);
+ this.bucketId = Preconditions.checkNotNull(bucketId);
+ this.currentPartStream = Preconditions.checkNotNull(currentPartStream);
+ this.creationTime = creationTime;
+ this.lastUpdateTime = creationTime;
+ }
+
+ public static <IN> PartFileHandler<IN> resumeFrom(
+ final String bucketId,
+ final RecoverableWriter fileSystemWriter,
+ final RecoverableWriter.ResumeRecoverable resumable,
+ final long creationTime) throws IOException {
+ Preconditions.checkNotNull(bucketId);
+ Preconditions.checkNotNull(fileSystemWriter);
+ Preconditions.checkNotNull(resumable);
+
+ final RecoverableFsDataOutputStream stream = fileSystemWriter.recover(resumable);
+ return new PartFileHandler<>(bucketId, stream, creationTime);
+ }
+
+ public static <IN> PartFileHandler<IN> openNew(
+ final String bucketId,
+ final RecoverableWriter fileSystemWriter,
+ final Path path,
+ final long creationTime) throws IOException {
+ Preconditions.checkNotNull(bucketId);
+ Preconditions.checkNotNull(fileSystemWriter);
+ Preconditions.checkNotNull(path);
+
+ final RecoverableFsDataOutputStream stream = fileSystemWriter.open(path);
+ return new PartFileHandler<>(bucketId, stream, creationTime);
+ }
+
+ void write(IN element, Encoder<IN> encoder, long currentTime) throws IOException {
+ encoder.encode(element, currentPartStream);
+ this.lastUpdateTime = currentTime;
+ }
+
+ RecoverableWriter.ResumeRecoverable persist() throws IOException {
+ return currentPartStream.persist();
+ }
+
+ RecoverableWriter.CommitRecoverable closeForCommit() throws IOException {
+ return currentPartStream.closeForCommit().getRecoverable();
+ }
+
+ void dispose() {
+ // we can suppress exceptions here, because we do not rely on close() to
+ // flush or persist any data
+ IOUtils.closeQuietly(currentPartStream);
+ }
+
+ @Override
+ public String getBucketId() {
+ return bucketId;
+ }
+
+ @Override
+ public long getCreationTime() {
+ return creationTime;
+ }
+
+ @Override
+ public long getSize() throws IOException {
+ return currentPartStream.getPos();
+ }
+
+ @Override
+ public long getLastUpdateTime() {
+ return lastUpdateTime;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bbc91eb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java
new file mode 100644
index 0000000..9c3d047
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileInfo.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer;
+
+import java.io.IOException;
+
+/**
+ * An interface exposing the information concerning the current (open) part file
+ * that is necessary to the {@link RollingPolicy} in order to determine if it
+ * should roll the part file or not.
+ */
+@PublicEvolving
+public interface PartFileInfo {
+
+ /**
+ * @return The identifier of the bucket containing the currently open part file, as returned by the
+ * {@link Bucketer#getBucketId(Object, Bucketer.Context)}.
+ */
+ String getBucketId();
+
+ /**
+ * @return The creation time (in ms) of the currently open part file.
+ */
+ long getCreationTime();
+
+ /**
+ * @return The size of the currently open part file.
+ */
+ long getSize() throws IOException;
+
+ /**
+ * @return The last time (in ms) the currently open part file was written to.
+ */
+ long getLastUpdateTime();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bbc91eb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java
new file mode 100644
index 0000000..936377e
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * The policy based on which a {@link Bucket} in the {@link StreamingFileSink}
+ * rolls its currently open part file and opens a new one.
+ */
+@PublicEvolving
+public interface RollingPolicy extends Serializable {
+
+ /**
+ * Determines if the in-progress part file for a bucket should roll.
+ * @param partFileState the state of the currently open part file of the bucket.
+ * @param currentTime the current processing time.
+ * @return {@code true} if the part file should roll, {@code false} otherwise.
+ */
+ boolean shouldRoll(final PartFileInfo partFileState, final long currentTime) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bbc91eb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
new file mode 100644
index 0000000..b6fff03
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.DateTimeBucketer;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Sink that emits its input elements to {@link FileSystem} files within buckets. This is
+ * integrated with the checkpointing mechanism to provide exactly once semantics.
+ *
+ *
+ * <p>When creating the sink a {@code basePath} must be specified. The base directory contains
+ * one directory for every bucket. The bucket directories themselves contain several part files,
+ * with at least one for each parallel subtask of the sink which is writing data to that bucket.
+ * These part files contain the actual output data.
+ *
+ *
+ * <p>The sink uses a {@link Bucketer} to determine in which bucket directory each element should
+ * be written to inside the base directory. The {@code Bucketer} can, for example, use time or
+ * a property of the element to determine the bucket directory. The default {@code Bucketer} is a
+ * {@link DateTimeBucketer} which will create one new bucket every hour. You can specify
+ * a custom {@code Bucketer} using {@link #setBucketer(Bucketer)}.
+ *
+ *
+ * <p>The filenames of the part files contain the part prefix, "part-", the parallel subtask index of the sink
+ * and a rolling counter. For example the file {@code "part-1-17"} contains the data from
+ * {@code subtask 1} of the sink and is the {@code 17th} part file created by that subtask.
+ * Part files roll based on the user-specified {@link RollingPolicy}. By default, a {@link DefaultRollingPolicy}
+ * is used.
+ *
+ * <p>In some scenarios, the open buckets are required to change based on time. In these cases, the user
+ * can specify a {@code bucketCheckInterval} (by default 1m) and the sink will check periodically and roll
+ * the part file if the specified rolling policy says so.
+ *
+ * <p>Part files can be in one of three states: {@code in-progress}, {@code pending} or {@code finished}.
+ * The reason for this is how the sink works together with the checkpointing mechanism to provide exactly-once
+ * semantics and fault-tolerance. The part file that is currently being written to is {@code in-progress}. Once
+ * a part file is closed for writing it becomes {@code pending}. When a checkpoint is successful the currently
+ * pending files will be moved to {@code finished}.
+ *
+ *
+ * <p>In case of a failure, and in order to guarantee exactly-once semantics, the sink should roll back to the state it
+ * had when that last successful checkpoint occurred. To this end, when restoring, the restored files in {@code pending}
+ * state are transferred into the {@code finished} state while any {@code in-progress} files are rolled back, so that
+ * they do not contain data that arrived after the checkpoint from which we restore.
+ *
+ * <p><b>NOTE:</b>
+ * <ol>
+ * <li>
+ * If checkpointing is not enabled the pending files will never be moved to the finished state.
+ * </li>
+ * <li>
+ * The part files are written using an instance of {@link Encoder}. By default, a
+ * {@link SimpleStringEncoder} is used, which writes the result of {@code toString()} for
+ * every element, separated by newlines. You can configure the writer using the
+ * {@link #setEncoder(Encoder)}.
+ * </li>
+ * </ol>
+ *
+ * @param <IN> Type of the elements emitted by this sink
+ */
+@PublicEvolving
+public class StreamingFileSink<IN>
+		extends RichSinkFunction<IN>
+		implements CheckpointedFunction, CheckpointListener, ProcessingTimeCallback {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(StreamingFileSink.class);
+
+	// -------------------------- state descriptors ---------------------------
+
+	private static final ListStateDescriptor<byte[]> BUCKET_STATE_DESC =
+			new ListStateDescriptor<>("bucket-states", BytePrimitiveArraySerializer.INSTANCE);
+
+	private static final ListStateDescriptor<Long> MAX_PART_COUNTER_STATE_DESC =
+			new ListStateDescriptor<>("max-part-counter", LongSerializer.INSTANCE);
+
+	// ------------------------ configuration fields --------------------------
+
+	private final Path basePath;
+
+	private final BucketFactory<IN> bucketFactory;
+
+	/** Period, in milliseconds, of the timer that re-evaluates the rolling policy (default: one minute). */
+	private long bucketCheckInterval = 60L * 1000L;
+
+	private Bucketer<IN> bucketer;
+
+	private Encoder<IN> encoder;
+
+	private RollingPolicy rollingPolicy;
+
+	// --------------------------- runtime fields -----------------------------
+
+	/** Single context instance that is re-filled for every element before the bucketer is asked. */
+	private transient BucketerContext bucketerContext;
+
+	private transient RecoverableWriter fileSystemWriter;
+
+	private transient ProcessingTimeService processingTimeService;
+
+	/** The buckets currently tracked by this subtask, keyed by bucket id. */
+	private transient Map<String, Bucket<IN>> activeBuckets;
+
+	////////////////// State Related Fields /////////////////////
+
+	private transient BucketStateSerializer bucketStateSerializer;
+
+	private transient ListState<byte[]> restoredBucketStates;
+
+	private transient ListState<Long> restoredMaxCounters;
+
+	/** Largest part counter found in the restored state; handed to every bucket this sink creates or restores. */
+	private transient long initMaxPartCounter;
+
+	/** Largest part counter seen so far (restored or reported by a bucket); this is the value that is checkpointed. */
+	private transient long maxPartCounterUsed;
+
+	/**
+	 * Creates a new {@code StreamingFileSink} that writes files to the given base directory.
+	 *
+	 * <p>This uses a {@link DateTimeBucketer} as {@link Bucketer}, a {@link SimpleStringEncoder} as a writer
+	 * and a {@link DefaultRollingPolicy} built with its default settings.
+	 *
+	 * @param basePath The directory to which to write the bucket files.
+	 */
+	public StreamingFileSink(Path basePath) {
+		this(basePath, new DefaultBucketFactory<>());
+	}
+
+	@VisibleForTesting
+	StreamingFileSink(Path basePath, BucketFactory<IN> bucketFactory) {
+		this.basePath = Preconditions.checkNotNull(basePath);
+		this.bucketer = new DateTimeBucketer<>();
+		this.encoder = new SimpleStringEncoder<>();
+		this.rollingPolicy = DefaultRollingPolicy.create().build();
+		this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
+	}
+
+	/**
+	 * Sets the {@link Encoder} used to write the elements to the part files.
+	 *
+	 * @param encoder the encoder to use, must not be {@code null}.
+	 * @return this sink, to allow chaining of setters.
+	 */
+	public StreamingFileSink<IN> setEncoder(Encoder<IN> encoder) {
+		this.encoder = Preconditions.checkNotNull(encoder);
+		return this;
+	}
+
+	/**
+	 * Sets the {@link Bucketer} that decides into which bucket each element is written.
+	 *
+	 * @param bucketer the bucketer to use, must not be {@code null}.
+	 * @return this sink, to allow chaining of setters.
+	 */
+	public StreamingFileSink<IN> setBucketer(Bucketer<IN> bucketer) {
+		this.bucketer = Preconditions.checkNotNull(bucketer);
+		return this;
+	}
+
+	/**
+	 * Sets how often the sink re-evaluates the {@link RollingPolicy} for the in-progress part files
+	 * independently of incoming elements.
+	 *
+	 * @param interval the check interval in milliseconds, must be positive.
+	 * @return this sink, to allow chaining of setters.
+	 * @throws IllegalArgumentException if {@code interval} is not positive.
+	 */
+	public StreamingFileSink<IN> setBucketCheckInterval(long interval) {
+		// A non-positive interval would make onProcessingTime() re-register its timer at or
+		// before the timestamp it has just fired for.
+		Preconditions.checkArgument(
+				interval > 0L,
+				"The bucket check interval must be positive, but was " + interval + ".");
+		this.bucketCheckInterval = interval;
+		return this;
+	}
+
+	/**
+	 * Sets the {@link RollingPolicy} that decides when an in-progress part file is rolled.
+	 *
+	 * @param policy the rolling policy to use, must not be {@code null}.
+	 * @return this sink, to allow chaining of setters.
+	 */
+	public StreamingFileSink<IN> setRollingPolicy(RollingPolicy policy) {
+		this.rollingPolicy = Preconditions.checkNotNull(policy);
+		return this;
+	}
+
+	/**
+	 * Asks every active bucket to commit up to the completed checkpoint and stops tracking the
+	 * buckets that report themselves as no longer active afterwards.
+	 */
+	@Override
+	public void notifyCheckpointComplete(long checkpointId) throws Exception {
+		final Iterator<Map.Entry<String, Bucket<IN>>> activeBucketIt =
+				activeBuckets.entrySet().iterator();
+
+		while (activeBucketIt.hasNext()) {
+			Bucket<IN> bucket = activeBucketIt.next().getValue();
+			bucket.commitUpToCheckpoint(checkpointId);
+
+			if (!bucket.isActive()) {
+				// We've dealt with all the pending files and the writer for this bucket is not currently open.
+				// Therefore this bucket is currently inactive and we can remove it from our state.
+				activeBucketIt.remove();
+			}
+		}
+	}
+
+	/**
+	 * Stores the serialized state of every active bucket and the largest part counter used so far.
+	 * Part files that the rolling policy wants rolled at the checkpoint timestamp are closed first.
+	 */
+	@Override
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+		Preconditions.checkState(
+				restoredBucketStates != null && fileSystemWriter != null && bucketStateSerializer != null,
+				"sink has not been initialized");
+
+		restoredBucketStates.clear();
+		for (Bucket<IN> bucket : activeBuckets.values()) {
+
+			final PartFileInfo info = bucket.getInProgressPartInfo();
+			final long checkpointTimestamp = context.getCheckpointTimestamp();
+
+			if (info != null && rollingPolicy.shouldRoll(info, checkpointTimestamp)) {
+				// we also check here so that we do not have to always
+				// wait for the "next" element to arrive.
+				bucket.closePartFile();
+			}
+
+			final BucketState bucketState = bucket.snapshot(context.getCheckpointId());
+			restoredBucketStates.add(SimpleVersionedSerialization.writeVersionAndSerialize(bucketStateSerializer, bucketState));
+		}
+
+		restoredMaxCounters.clear();
+		restoredMaxCounters.add(maxPartCounterUsed);
+	}
+
+	/**
+	 * Creates the file system writer and the state handles and, when restoring, rebuilds the
+	 * active buckets from the checkpointed bucket states.
+	 */
+	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {
+		initFileSystemWriter();
+
+		this.activeBuckets = new HashMap<>();
+
+		// When resuming after a failure:
+		// 1) we get the max part counter used before in order to make sure that we do not overwrite valid data
+		// 2) we commit any pending files for previous checkpoints (previous to the last successful one)
+		// 3) we resume writing to the previous in-progress file of each bucket, and
+		// 4) if we receive multiple states for the same bucket, we merge them.
+
+		final OperatorStateStore stateStore = context.getOperatorStateStore();
+
+		restoredBucketStates = stateStore.getListState(BUCKET_STATE_DESC);
+		restoredMaxCounters = stateStore.getUnionListState(MAX_PART_COUNTER_STATE_DESC);
+
+		if (context.isRestored()) {
+			final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+
+			LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);
+
+			long maxCounter = 0L;
+			for (long partCounter: restoredMaxCounters.get()) {
+				maxCounter = Math.max(partCounter, maxCounter);
+			}
+			initMaxPartCounter = maxCounter;
+
+			// Carry the restored maximum over into the value that gets checkpointed. Otherwise a
+			// checkpoint taken after recovery but before any element arrives would store 0 and
+			// forget the counters that were already used.
+			maxPartCounterUsed = maxCounter;
+
+			for (byte[] recoveredState : restoredBucketStates.get()) {
+				final BucketState bucketState = SimpleVersionedSerialization.readVersionAndDeSerialize(
+						bucketStateSerializer, recoveredState);
+
+				final String bucketId = bucketState.getBucketId();
+
+				LOG.info("Recovered bucket for {}", bucketId);
+
+				final Bucket<IN> restoredBucket = bucketFactory.restoreBucket(
+						fileSystemWriter,
+						subtaskIndex,
+						initMaxPartCounter,
+						encoder,
+						bucketState
+				);
+
+				final Bucket<IN> existingBucket = activeBuckets.get(bucketId);
+				if (existingBucket == null) {
+					activeBuckets.put(bucketId, restoredBucket);
+				} else {
+					existingBucket.merge(restoredBucket);
+				}
+
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("{} idx {} restored state for bucket {}", getClass().getSimpleName(),
+							subtaskIndex, assembleBucketPath(bucketId));
+				}
+			}
+		}
+	}
+
+	/**
+	 * Registers the first periodic bucket check with the processing time service and creates the
+	 * reusable bucketer context.
+	 */
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+
+		processingTimeService = ((StreamingRuntimeContext) getRuntimeContext()).getProcessingTimeService();
+		long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
+		processingTimeService.registerTimer(currentProcessingTime + bucketCheckInterval, this);
+		this.bucketerContext = new BucketerContext();
+	}
+
+	/**
+	 * Periodic bucket check: closes every in-progress part file that the rolling policy wants
+	 * rolled and schedules the next check {@code bucketCheckInterval} after {@code timestamp}.
+	 */
+	@Override
+	public void onProcessingTime(long timestamp) throws Exception {
+		final long currentTime = processingTimeService.getCurrentProcessingTime();
+		for (Bucket<IN> bucket : activeBuckets.values()) {
+			final PartFileInfo info = bucket.getInProgressPartInfo();
+			if (info != null && rollingPolicy.shouldRoll(info, currentTime)) {
+				bucket.closePartFile();
+			}
+		}
+		processingTimeService.registerTimer(timestamp + bucketCheckInterval, this);
+	}
+
+	/**
+	 * Writes the element to the bucket chosen by the {@link Bucketer}, creating the bucket and
+	 * rolling its part file first when necessary.
+	 */
+	@Override
+	public void invoke(IN value, Context context) throws Exception {
+		final long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
+		final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+
+		// setting the values in the bucketer context; the argument order has to match
+		// BucketerContext#update(elementTimestamp, currentWatermark, currentProcessingTime)
+		bucketerContext.update(context.timestamp(), context.currentWatermark(), currentProcessingTime);
+
+		final String bucketId = bucketer.getBucketId(value, bucketerContext);
+
+		Bucket<IN> bucket = activeBuckets.get(bucketId);
+		if (bucket == null) {
+			// NOTE(review): a bucket that was dropped as inactive and is re-created here starts again
+			// from initMaxPartCounter, not from maxPartCounterUsed - confirm against Bucket that this
+			// cannot re-use the name of an already committed part file.
+			final Path bucketPath = assembleBucketPath(bucketId);
+			bucket = bucketFactory.getNewBucket(
+					fileSystemWriter,
+					subtaskIndex,
+					bucketId,
+					bucketPath,
+					initMaxPartCounter,
+					encoder);
+			activeBuckets.put(bucketId, bucket);
+		}
+
+		final PartFileInfo info = bucket.getInProgressPartInfo();
+		if (info == null || rollingPolicy.shouldRoll(info, currentProcessingTime)) {
+			bucket.rollPartFile(currentProcessingTime);
+		}
+		bucket.write(value, currentProcessingTime);
+
+		// we update the counter here because as buckets become inactive and
+		// get removed in notifyCheckpointComplete(), at the time we snapshot they
+		// may not be there to take them into account during checkpointing.
+		updateMaxPartCounter(bucket.getPartCounter());
+	}
+
+	/**
+	 * Disposes all active buckets, if the sink got far enough to create them.
+	 */
+	@Override
+	public void close() throws Exception {
+		if (activeBuckets != null) {
+			activeBuckets.values().forEach(Bucket::dispose);
+		}
+	}
+
+	/**
+	 * Lazily creates the {@link RecoverableWriter} for the file system of the base path, together
+	 * with the bucket state serializer that depends on it.
+	 */
+	private void initFileSystemWriter() throws IOException {
+		if (fileSystemWriter == null) {
+			fileSystemWriter = FileSystem.get(basePath.toUri()).createRecoverableWriter();
+			bucketStateSerializer = new BucketStateSerializer(
+					fileSystemWriter.getResumeRecoverableSerializer(),
+					fileSystemWriter.getCommitRecoverableSerializer()
+			);
+		}
+	}
+
+	private void updateMaxPartCounter(long candidate) {
+		maxPartCounterUsed = Math.max(maxPartCounterUsed, candidate);
+	}
+
+	private Path assembleBucketPath(String bucketId) {
+		return new Path(basePath, bucketId);
+	}
+
+	/**
+	 * The {@link Bucketer.Context} exposed to the
+	 * {@link Bucketer#getBucketId(Object, Bucketer.Context)}
+	 * whenever a new incoming element arrives.
+	 *
+	 * <p>The sink keeps one instance and overwrites its values for every element.
+	 */
+	private static class BucketerContext implements Bucketer.Context {
+
+		@Nullable
+		private Long elementTimestamp;
+
+		private long currentWatermark;
+
+		private long currentProcessingTime;
+
+		/**
+		 * Replaces the values exposed by this context.
+		 *
+		 * @param elementTimestamp the timestamp of the current element, or {@code null} if it has none.
+		 * @param currentWatermark the current event-time watermark.
+		 * @param currentProcessingTime the current processing time.
+		 */
+		void update(@Nullable Long elementTimestamp, long currentWatermark, long currentProcessingTime) {
+			this.elementTimestamp = elementTimestamp;
+			this.currentWatermark = currentWatermark;
+			this.currentProcessingTime = currentProcessingTime;
+		}
+
+		@Override
+		public long currentProcessingTime() {
+			return currentProcessingTime;
+		}
+
+		@Override
+		public long currentWatermark() {
+			return currentWatermark;
+		}
+
+		@Override
+		@Nullable
+		public Long timestamp() {
+			return elementTimestamp;
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bbc91eb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java
new file mode 100644
index 0000000..5ffe152
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/BasePathBucketer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem.bucketers;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * A {@link Bucketer} that does not perform any
+ * bucketing of files. All files are written to the base path.
+ */
+@PublicEvolving
+public class BasePathBucketer<T> implements Bucketer<T> {
+
+	private static final long serialVersionUID = -6033643155550226022L;
+
+	/** The one bucket id handed out for every element: the empty string, i.e. no sub-directory name. */
+	private static final String BASE_PATH_BUCKET_ID = "";
+
+	/**
+	 * Assigns every element to the same bucket, regardless of the element and the context.
+	 *
+	 * @param element the element being processed (ignored).
+	 * @param context the context of the element (ignored).
+	 * @return always the empty string.
+	 */
+	@Override
+	public String getBucketId(T element, Context context) {
+		return BASE_PATH_BUCKET_ID;
+	}
+
+	@Override
+	public String toString() {
+		return "BasePathBucketer";
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bbc91eb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/Bucketer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/Bucketer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/Bucketer.java
new file mode 100644
index 0000000..5c30927
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/Bucketer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem.bucketers;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+
+/**
+ * A bucketer is used with a {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink}
+ * to determine the {@link org.apache.flink.streaming.api.functions.sink.filesystem.Bucket} each incoming element
+ * should be put into.
+ *
+ * <p>The {@code StreamingFileSink} can be writing to many buckets at a time, and it is responsible for managing
+ * a set of active buckets. Whenever a new element arrives it will ask the {@code Bucketer} for the bucket the
+ * element should fall in. The {@code Bucketer} can, for example, determine buckets based on system time.
+ */
+@PublicEvolving
+public interface Bucketer<T> extends Serializable {
+
+ /**
+ * Returns the identifier of the bucket the provided element should be put into.
+ *
+ * @param element The current element being processed.
+ * @param context The {@link Context} of the current element. The
+ * {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink sink}
+ * fills it from its own {@link SinkFunction.Context context} and the current processing time.
+ *
+ * @return A string representing the identifier of the bucket the element should be put into.
+ * The actual path to the bucket will result from the concatenation of the
+ * {@code base path} provided during the initialization of the
+ * {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink sink}
+ * and the returned string.
+ */
+ String getBucketId(T element, Context context);
+
+ /**
+ * Context that the {@link Bucketer} can use for getting additional data about
+ * an input record.
+ *
+ * <p>The context is only valid for the duration of a {@link Bucketer#getBucketId(Object, Context)} call.
+ * Do not store the context and use it afterwards: the
+ * {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink sink}
+ * re-uses the same instance and overwrites its values for every element.
+ */
+ @PublicEvolving
+ interface Context {
+
+ /**
+ * Returns the current processing time.
+ */
+ long currentProcessingTime();
+
+ /**
+ * Returns the current event-time watermark.
+ */
+ long currentWatermark();
+
+ /**
+ * Returns the timestamp of the current input record or
+ * {@code null} if the element does not have an assigned timestamp.
+ */
+ @Nullable
+ Long timestamp();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bbc91eb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java
new file mode 100644
index 0000000..515468c
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketers/DateTimeBucketer.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem.bucketers;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Objects;
+
+/**
+ * A {@link Bucketer} that assigns to buckets based on current system time.
+ *
+ *
+ * <p>The {@code DateTimeBucketer} will create directories of the following form:
+ * {@code /{basePath}/{dateTimePath}/}. The {@code basePath} is the path
+ * that was specified as a base path when creating the
+ * {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink}.
+ * The {@code dateTimePath} is determined based on the current system time and the
+ * user provided format string.
+ *
+ *
+ * <p>{@link SimpleDateFormat} is used to derive a date string from the current system time and
+ * the date format string. The default format string is {@code "yyyy-MM-dd--HH"} so the rolling
+ * files will have a granularity of hours.
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ * Bucketer buck = new DateTimeBucketer("yyyy-MM-dd--HH");
+ * }</pre>
+ *
+ * <p>This will create for example the following bucket path:
+ * {@code /base/1976-12-31--14/}
+ *
+ */
+@PublicEvolving
+public class DateTimeBucketer<T> implements Bucketer<T> {
+
+	private static final long serialVersionUID = 3284420879277893804L;
+
+	private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";
+
+	/** The {@code SimpleDateFormat} pattern from which the bucket ids are derived. */
+	private final String formatString;
+
+	/** Created on the first call to {@code getBucketId}; transient, so it is not part of the serialized bucketer. */
+	private transient SimpleDateFormat dateFormatter;
+
+	/**
+	 * Creates a new {@code DateTimeBucketer} with format string {@code "yyyy-MM-dd--HH"}.
+	 */
+	public DateTimeBucketer() {
+		this(DEFAULT_FORMAT_STRING);
+	}
+
+	/**
+	 * Creates a new {@code DateTimeBucketer} with the given date/time format string.
+	 *
+	 * @param formatString The format string that will be given to {@code SimpleDateFormat} to determine
+	 *                     the bucket path, must not be {@code null}.
+	 * @throws NullPointerException if {@code formatString} is {@code null}.
+	 */
+	public DateTimeBucketer(String formatString) {
+		// Fail when the bucketer is created rather than when the first element reaches the sink.
+		this.formatString = Objects.requireNonNull(formatString, "formatString");
+	}
+
+	/**
+	 * Formats the current processing time, as reported by the context, with the configured pattern.
+	 *
+	 * @param element the element being processed (not used for the bucket id).
+	 * @param context the context providing the current processing time.
+	 * @return the formatted processing time, used as the bucket id.
+	 */
+	@Override
+	public String getBucketId(T element, Context context) {
+		if (dateFormatter == null) {
+			dateFormatter = new SimpleDateFormat(formatString);
+		}
+		return dateFormatter.format(new Date(context.currentProcessingTime()));
+	}
+
+	@Override
+	public String toString() {
+		return "DateTimeBucketer{" +
+			"formatString='" + formatString + '\'' +
+			'}';
+	}
+}