You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/07/31 08:55:47 UTC

[flink] branch master updated: [FLINK-13396] Deprecate the BucketingSink

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 45bddfb  [FLINK-13396] Deprecate the BucketingSink
45bddfb is described below

commit 45bddfb807708a5dcb6c4a484409feaee244ac40
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Fri Jul 26 16:56:41 2019 +0200

    [FLINK-13396] Deprecate the BucketingSink
---
 docs/dev/connectors/filesystem_sink.md                              | 5 +++++
 .../flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java       | 1 +
 .../main/java/org/apache/flink/streaming/connectors/fs/Clock.java   | 1 +
 .../apache/flink/streaming/connectors/fs/SequenceFileWriter.java    | 1 +
 .../org/apache/flink/streaming/connectors/fs/StreamWriterBase.java  | 1 +
 .../java/org/apache/flink/streaming/connectors/fs/StringWriter.java | 1 +
 .../java/org/apache/flink/streaming/connectors/fs/SystemClock.java  | 1 +
 .../main/java/org/apache/flink/streaming/connectors/fs/Writer.java  | 1 +
 .../flink/streaming/connectors/fs/bucketing/BasePathBucketer.java   | 1 +
 .../apache/flink/streaming/connectors/fs/bucketing/Bucketer.java    | 1 +
 .../flink/streaming/connectors/fs/bucketing/BucketingSink.java      | 6 ++++++
 .../flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java   | 1 +
 .../streaming/api/functions/sink/OutputFormatSinkFunction.java      | 3 ++-
 .../org/apache/flink/streaming/api/functions/sink/WriteFormat.java  | 3 ++-
 .../apache/flink/streaming/api/functions/sink/WriteFormatAsCsv.java | 3 ++-
 .../flink/streaming/api/functions/sink/WriteFormatAsText.java       | 3 ++-
 .../flink/streaming/api/functions/sink/WriteSinkFunction.java       | 3 ++-
 .../streaming/api/functions/sink/WriteSinkFunctionByMillis.java     | 3 ++-
 18 files changed, 33 insertions(+), 6 deletions(-)

diff --git a/docs/dev/connectors/filesystem_sink.md b/docs/dev/connectors/filesystem_sink.md
index 874ffbd..f24478f 100644
--- a/docs/dev/connectors/filesystem_sink.md
+++ b/docs/dev/connectors/filesystem_sink.md
@@ -23,6 +23,11 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+<div class="alert alert-info" markdown="span">
+The `BucketingSink` has been **deprecated since Flink 1.9** and will be removed in subsequent releases.
+Please use the [__StreamingFileSink__]({{site.baseurl}}/dev/connectors/streamfile_sink.html) instead.
+</div>
+
 This connector provides a Sink that writes partitioned files to any filesystem supported by
 [Hadoop FileSystem](http://hadoop.apache.org). To use this connector, add the
 following dependency to your project:
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
index 3294b55..59800a3 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
@@ -64,6 +64,7 @@ Usage:
 }
 </pre>
 */
+@Deprecated
 public class AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, V>> implements Writer<Tuple2<K, V>>, InputTypeConfigurable {
 	private static final long serialVersionUID = 1L;
 	public static final String CONF_OUTPUT_KEY_SCHEMA = "avro.schema.output.key";
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java
index eb864c2..284e652 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Clock.java
@@ -24,6 +24,7 @@ package org.apache.flink.streaming.connectors.fs;
  *
  * <p>Normally this would be system time, but for testing a custom {@code Clock} can be provided.
  */
+@Deprecated
 public interface Clock {
 
 	/**
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
index 17b16dd..2d2502c 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
@@ -44,6 +44,7 @@ import java.io.IOException;
  * @param <K> The type of the first tuple field.
  * @param <V> The type of the second tuple field.
  */
+@Deprecated
 public class SequenceFileWriter<K extends Writable, V extends Writable> extends StreamWriterBase<Tuple2<K, V>> implements InputTypeConfigurable {
 	private static final long serialVersionUID = 1L;
 
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
index d3035a5..8fe02b5 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
@@ -27,6 +27,7 @@ import java.io.IOException;
 /**
  * Base class for {@link Writer Writers} that write to a {@link FSDataOutputStream}.
  */
+@Deprecated
 public abstract class StreamWriterBase<T> implements Writer<T> {
 
 	private static final long serialVersionUID = 2L;
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
index 122bc7f..84c961e 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
@@ -33,6 +33,7 @@ import java.nio.charset.UnsupportedCharsetException;
  *
  * @param <T> The type of the elements that are being written by the sink.
  */
+@Deprecated
 public class StringWriter<T> extends StreamWriterBase<T> {
 	private static final long serialVersionUID = 1L;
 
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java
index eedb370..9fb1fbc 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.fs;
 /**
  * A {@link Clock} that uses {@code System.currentTimeMillis()} to determine the system time.
  */
+@Deprecated
 public class SystemClock implements Clock {
 	@Override
 	public long currentTimeMillis() {
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java
index ab896c8..ee78bb1 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java
@@ -33,6 +33,7 @@ import java.io.Serializable;
  *
  * @param <T> The type of the elements that are being written by the sink.
  */
+@Deprecated
 public interface Writer<T> extends Serializable {
 
 	/**
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BasePathBucketer.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BasePathBucketer.java
index 2f325f6..c317de4 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BasePathBucketer.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BasePathBucketer.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path;
  * A {@link Bucketer} that does not perform any
  * bucketing of files. All files are written to the base path.
  */
+@Deprecated
 public class BasePathBucketer<T> implements Bucketer<T> {
 	private static final long serialVersionUID = 1L;
 
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/Bucketer.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/Bucketer.java
index e712b2e..dea7631 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/Bucketer.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/Bucketer.java
@@ -34,6 +34,7 @@ import java.io.Serializable;
  * path the element should fall in. The {@code Bucketer} can, for example, determine buckets based on
  * system time.
  */
+@Deprecated
 public interface Bucketer<T> extends Serializable {
 	/**
 	 * Returns the {@link Path} of a bucket file.
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index a857a35..4cc49d6 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -159,7 +159,13 @@ import java.util.UUID;
  * @see SequenceFileWriter
  *
  * @param <T> Type of the elements emitted by this sink
+ *
+ * @deprecated Please use the
+ * {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink StreamingFileSink}
+ * instead.
+ *
  */
+@Deprecated
 public class BucketingSink<T>
 		extends RichSinkFunction<T>
 		implements InputTypeConfigurable, CheckpointedFunction, CheckpointListener, ProcessingTimeCallback {
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java
index edabb6b..52599ae 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java
@@ -56,6 +56,7 @@ import java.time.format.DateTimeFormatter;
  * {@code /base/1976-12-31-14/}
  *
  */
+@Deprecated
 public class DateTimeBucketer<T> implements Bucketer<T> {
 
 	private static final long serialVersionUID = 1L;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunction.java
index 4130a50..8090e26 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/OutputFormatSinkFunction.java
@@ -38,7 +38,8 @@ import java.io.IOException;
  *
  * @param <IN> Input type
  *
- * @deprecated Please use the {@code BucketingSink} for writing to files from a streaming program.
+ * @deprecated Please use the {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink StreamingFileSink}
+ * for writing to files from a streaming program.
  */
 @PublicEvolving
 @Deprecated
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormat.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormat.java
index 6622ba5..c9e97c4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormat.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormat.java
@@ -29,7 +29,8 @@ import java.util.ArrayList;
  * @param <IN>
  *            Input tuple type
  *
- * @deprecated Please use the {@code BucketingSink} for writing to files from a streaming program.
+ * @deprecated Please use the {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink StreamingFileSink}
+ * for writing to files from a streaming program.
  */
 @PublicEvolving
 @Deprecated
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsCsv.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsCsv.java
index 8ac7a97..fc95455 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsCsv.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsCsv.java
@@ -31,7 +31,8 @@ import java.util.ArrayList;
  * @param <IN>
  *            Input tuple type
  *
- * @deprecated Please use the {@code BucketingSink} for writing to files from a streaming program.
+ * @deprecated Please use the {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink StreamingFileSink}
+ * for writing to files from a streaming program.
  */
 @PublicEvolving
 @Deprecated
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsText.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsText.java
index 1c64662..57c79a8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsText.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormatAsText.java
@@ -31,7 +31,8 @@ import java.util.ArrayList;
  * @param <IN>
  *            Input tuple type
  *
- * @deprecated Please use the {@code BucketingSink} for writing to files from a streaming program.
+ * @deprecated Please use the {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink StreamingFileSink}
+ * for writing to files from a streaming program.
  */
 @PublicEvolving
 @Deprecated
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunction.java
index efd15cb..1484aca 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunction.java
@@ -32,7 +32,8 @@ import java.util.ArrayList;
  * @param <IN>
  *            Input tuple type
  *
- * @deprecated Please use the {@code BucketingSink} for writing to files from a streaming program.
+ * @deprecated Please use the {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink StreamingFileSink}
+ * for writing to files from a streaming program.
  */
 @PublicEvolving
 @Deprecated
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunctionByMillis.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunctionByMillis.java
index 354b086..c71beee 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunctionByMillis.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteSinkFunctionByMillis.java
@@ -26,7 +26,8 @@ import org.apache.flink.annotation.PublicEvolving;
  * @param <IN>
  *            Input tuple type
  *
- * @deprecated Please use the {@code BucketingSink} for writing to files from a streaming program.
+ * @deprecated Please use the {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink StreamingFileSink}
+ * for writing to files from a streaming program.
  */
 @PublicEvolving
 @Deprecated