You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/07/31 08:55:47 UTC
[flink] branch master updated: [FLINK-13396] Deprecate the
BucketingSink
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 45bddfb [FLINK-13396] Deprecate the BucketingSink
45bddfb is described below
commit 45bddfb807708a5dcb6c4a484409feaee244ac40
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Fri Jul 26 16:56:41 2019 +0200
[FLINK-13396] Deprecate the BucketingSink
---
docs/dev/connectors/filesystem_sink.md | 5 +++++
.../flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java | 1 +
.../main/java/org/apache/flink/streaming/connectors/fs/Clock.java | 1 +
.../apache/flink/streaming/connectors/fs/SequenceFileWriter.java | 1 +
.../org/apache/flink/streaming/connectors/fs/StreamWriterBase.java | 1 +
.../java/org/apache/flink/streaming/connectors/fs/StringWriter.java | 1 +
.../java/org/apache/flink/streaming/connectors/fs/SystemClock.java | 1 +
.../main/java/org/apache/flink/streaming/connectors/fs/Writer.java | 1 +
.../flink/streaming/connectors/fs/bucketing/BasePathBucketer.java | 1 +
.../apache/flink/streaming/connectors/fs/bucketing/Bucketer.java | 1 +
.../flink/streaming/connectors/fs/bucketing/BucketingSink.java | 6 ++++++
.../flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java | 1 +
.../streaming/api/functions/sink/OutputFormatSinkFunction.java | 3 ++-
.../org/apache/flink/streaming/api/functions/sink/WriteFormat.java | 3 ++-
.../apache/flink/streaming/api/functions/sink/WriteFormatAsCsv.java | 3 ++-
.../flink/streaming/api/functions/sink/WriteFormatAsText.java | 3 ++-
.../flink/streaming/api/functions/sink/WriteSinkFunction.java | 3 ++-
.../streaming/api/functions/sink/WriteSinkFunctionByMillis.java | 3 ++-
18 files changed, 33 insertions(+), 6 deletions(-)
diff --git a/docs/dev/connectors/filesystem_sink.md b/docs/dev/connectors/filesystem_sink.md
index 874ffbd..f24478f 100644
--- a/docs/dev/connectors/filesystem_sink.md
+++ b/docs/dev/connectors/filesystem_sink.md
@@ -23,6 +23,11 @@ specific language governing permissions and limitations
under the License.
-->
+<div class="alert alert-info" markdown="span">
+The `BucketingSink` has been **deprecated since Flink 1.9** and will be removed in subsequent releases.
+Please use the [__StreamingFileSink__]({{site.baseurl}}/dev/connectors/streamfile_sink.html) instead.
+</div>
+
This connector provides a Sink that writes partitioned files to any filesystem supported by
[Hadoop FileSystem](http://hadoop.apache.org). To use this connector, add the
following dependency to your project:
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
index 3294b55..59800a3 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
@@ -64,6 +64,7 @@ Usage:
}
</pre>
*/
+@Deprecated
public class AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, V>> implements Writer<Tuple2<K, V>>, InputTypeConfigurable {
private static final long serialVersionUID = 1L;
public static final String CONF_OUTPUT_KEY_SCHEMA = "avro.schema.output.key";
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java
index eb864c2..284e652 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java
@@ -24,6 +24,7 @@ package org.apache.flink.streaming.connectors.fs;
*
* <p>Normally this would be system time, but for testing a custom {@code Clock} can be provided.
*/
+@Deprecated
public interface Clock {
/**
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
index 17b16dd..2d2502c 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
@@ -44,6 +44,7 @@ import java.io.IOException;
* @param <K> The type of the first tuple field.
* @param <V> The type of the second tuple field.
*/
+@Deprecated
public class SequenceFileWriter<K extends Writable, V extends Writable> extends StreamWriterBase<Tuple2<K, V>> implements InputTypeConfigurable {
private static final long serialVersionUID = 1L;
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
index d3035a5..8fe02b5 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
@@ -27,6 +27,7 @@ import java.io.IOException;
/**
* Base class for {@link Writer Writers} that write to a {@link FSDataOutputStream}.
*/
+@Deprecated
public abstract class StreamWriterBase<T> implements Writer<T> {
private static final long serialVersionUID = 2L;
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
index 122bc7f..84c961e 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
@@ -33,6 +33,7 @@ import java.nio.charset.UnsupportedCharsetException;
*
* @param <T> The type of the elements that are being written by the sink.
*/
+@Deprecated
public class StringWriter<T> extends StreamWriterBase<T> {
private static final long serialVersionUID = 1L;
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java
index eedb370..9fb1fbc 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.fs;
/**
* A {@link Clock} that uses {@code System.currentTimeMillis()} to determine the system time.
*/
+@Deprecated
public class SystemClock implements Clock {
@Override
public long currentTimeMillis() {
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java
index ab896c8..ee78bb1 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java
@@ -33,6 +33,7 @@ import java.io.Serializable;
*
* @param <T> The type of the elements that are being written by the sink.
*/
+@Deprecated
public interface Writer<T> extends Serializable {
/**
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BasePathBucketer.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BasePathBucketer.java
index 2f325f6..c317de4 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BasePathBucketer.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BasePathBucketer.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path;
* A {@link Bucketer} that does not perform any
* bucketing of files. All files are written to the base path.
*/
+@Deprecated
public class BasePathBucketer<T> implements Bucketer<T> {
private static final long serialVersionUID = 1L;
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/Bucketer.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/Bucketer.java
index e712b2e..dea7631 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/Bucketer.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/Bucketer.java
@@ -34,6 +34,7 @@ import java.io.Serializable;
* path the element should fall in. The {@code Bucketer} can, for example, determine buckets based on
* system time.
*/
+@Deprecated
public interface Bucketer<T> extends Serializable {
/**
* Returns the {@link Path} of a bucket file.
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index a857a35..4cc49d6 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -159,7 +159,13 @@ import java.util.UUID;
* @see SequenceFileWriter
*
* @param <T> Type of the elements emitted by this sink
+ *
+ * @deprecated Please use the
+ * {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink StreamingFileSink}
+ * instead.
+ *
*/
+@Deprecated
public class BucketingSink<T>
extends RichSinkFunction<T>
implements InputTypeConfigurable, CheckpointedFunction, CheckpointListener, ProcessingTimeCallback {
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java
index edabb6b..52599ae 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java
@@ -56,6 +56,7 @@ import java.time.format.DateTimeFormatter;
* {@code /base/1976-12-31-14/}
*
*/
+@Deprecated
public class DateTimeBucketer<T> implements Bucketer<T> {
private static final long serialVersionUID = 1L;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunction.java
index 4130a50..8090e26 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunction.java
@@ -38,7 +38,8 @@ import java.io.IOException;
*
* @param <IN> Input type
*
- * @deprecated Please use the {@code BucketingSink} for writing to files from a streaming program.
+ * @deprecated Please use the {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink StreamingFileSink}
+ * for writing to files from a streaming program.
*/
@PublicEvolving
@Deprecated
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormat.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormat.java
index 6622ba5..c9e97c4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormat.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormat.java
@@ -29,7 +29,8 @@ import java.util.ArrayList;
* @param <IN>
* Input tuple type
*
- * @deprecated Please use the {@code BucketingSink} for writing to files from a streaming program.
+ * @deprecated Please use the {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink StreamingFileSink}
+ * for writing to files from a streaming program.
*/
@PublicEvolving
@Deprecated
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsCsv.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsCsv.java
index 8ac7a97..fc95455 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsCsv.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsCsv.java
@@ -31,7 +31,8 @@ import java.util.ArrayList;
* @param <IN>
* Input tuple type
*
- * @deprecated Please use the {@code BucketingSink} for writing to files from a streaming program.
+ * @deprecated Please use the {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink StreamingFileSink}
+ * for writing to files from a streaming program.
*/
@PublicEvolving
@Deprecated
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsText.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsText.java
index 1c64662..57c79a8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsText.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsText.java
@@ -31,7 +31,8 @@ import java.util.ArrayList;
* @param <IN>
* Input tuple type
*
- * @deprecated Please use the {@code BucketingSink} for writing to files from a streaming program.
+ * @deprecated Please use the {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink StreamingFileSink}
+ * for writing to files from a streaming program.
*/
@PublicEvolving
@Deprecated
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunction.java
index efd15cb..1484aca 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunction.java
@@ -32,7 +32,8 @@ import java.util.ArrayList;
* @param <IN>
* Input tuple type
*
- * @deprecated Please use the {@code BucketingSink} for writing to files from a streaming program.
+ * @deprecated Please use the {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink StreamingFileSink}
+ * for writing to files from a streaming program.
*/
@PublicEvolving
@Deprecated
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunctionByMillis.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunctionByMillis.java
index 354b086..c71beee 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunctionByMillis.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunctionByMillis.java
@@ -26,7 +26,8 @@ import org.apache.flink.annotation.PublicEvolving;
* @param <IN>
* Input tuple type
*
- * @deprecated Please use the {@code BucketingSink} for writing to files from a streaming program.
+ * @deprecated Please use the {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink StreamingFileSink}
+ * for writing to files from a streaming program.
*/
@PublicEvolving
@Deprecated