Posted to commits@flink.apache.org by ma...@apache.org on 2023/01/09 12:26:45 UTC

[flink] branch revert-21371-30166 created (now f77032efb9f)

This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a change to branch revert-21371-30166
in repository https://gitbox.apache.org/repos/asf/flink.git


      at f77032efb9f Revert "[FLINK-30166][AVRO] Refactor deprecated StreamingFileSink usage with target FileSink"

This branch includes the following new commits:

     new cc12d4dad09 Revert "[FLINK-30166][ORC] Refactor deprecated StreamingFileSink usage with target FileSink"
     new 5dbe3b12e89 Revert "[FLINK-30166][Hadoop Compress] Refactor deprecated StreamingFileSink usage with target FileSink"
     new dd4d4444fd1 Revert "[FLINK-30166][Hadoop Sequence Format] Refactor deprecated StreamingFileSink usage with target FileSink"
     new 889626d4960 Revert "[FLINK-30166][SQL E2E Test] Refactor deprecated StreamingFileSink usage with target FileSink"
     new a9a23ab5552 Revert "[FLINK-30166][Parquet] Refactor deprecated StreamingFileSink usage with target FileSink"
     new 37258fbe630 Revert "[FLINK-30166][Tests] Remove StreamingFileSink as option for tests"
     new 93c7505d313 Revert "[FLINK-30166][Tests] Remove no longer necessary test"
     new f77032efb9f Revert "[FLINK-30166][AVRO] Refactor deprecated StreamingFileSink usage with target FileSink"

The 8 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
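
For context on the series being reverted: FLINK-30166 migrated these tests and test programs from the deprecated StreamingFileSink (attached to a stream with addSink) to the unified FileSink (attached with sinkTo). The sketch below illustrates that API difference for bulk-encoded output. It is not code from this repository; the DataStream, Path, and BulkWriter.Factory arguments are placeholders for the concrete values used in the diffs that follow.

    import org.apache.flink.api.common.serialization.BulkWriter;
    import org.apache.flink.connector.file.sink.FileSink;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

    class BulkSinkContrast {
        static <T> void wire(DataStream<T> stream, Path outputPath, BulkWriter.Factory<T> writerFactory) {
            // Unified API introduced by FLINK-30166: FileSink is attached with sinkTo(...).
            stream.sinkTo(FileSink.forBulkFormat(outputPath, writerFactory).build());

            // Deprecated API that this revert branch restores: StreamingFileSink is attached with addSink(...).
            stream.addSink(StreamingFileSink.forBulkFormat(outputPath, writerFactory).build());
        }
    }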



[flink] 03/08: Revert "[FLINK-30166][Hadoop Sequence Format] Refactor deprecated StreamingFileSink usage with target FileSink"

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch revert-21371-30166
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dd4d4444fd1e07fcb14dcf4b3e33e11c851809cb
Author: Matthias Pohl <ma...@aiven.io>
AuthorDate: Mon Jan 9 13:26:23 2023 +0100

    Revert "[FLINK-30166][Hadoop Sequence Format] Refactor deprecated StreamingFileSink usage with target FileSink"
    
    This reverts commit 1a7a83b9639a07a7dd1dd324f669fb04a522c3e1.
---
 flink-formats/flink-sequence-file/pom.xml                        | 6 ------
 .../formats/sequencefile/SequenceStreamingFileSinkITCase.java    | 9 +++++----
 2 files changed, 5 insertions(+), 10 deletions(-)

diff --git a/flink-formats/flink-sequence-file/pom.xml b/flink-formats/flink-sequence-file/pom.xml
index 83595049e87..059539e108e 100644
--- a/flink-formats/flink-sequence-file/pom.xml
+++ b/flink-formats/flink-sequence-file/pom.xml
@@ -93,12 +93,6 @@ under the License.
             <scope>test</scope>
             <type>test-jar</type>
         </dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-files</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
 	</dependencies>
 
 
diff --git a/flink-formats/flink-sequence-file/src/test/java/org/apache/flink/formats/sequencefile/SequenceStreamingFileSinkITCase.java b/flink-formats/flink-sequence-file/src/test/java/org/apache/flink/formats/sequencefile/SequenceStreamingFileSinkITCase.java
index 33740ce87bd..8b8ba2f3ef3 100644
--- a/flink-formats/flink-sequence-file/src/test/java/org/apache/flink/formats/sequencefile/SequenceStreamingFileSinkITCase.java
+++ b/flink-formats/flink-sequence-file/src/test/java/org/apache/flink/formats/sequencefile/SequenceStreamingFileSinkITCase.java
@@ -22,10 +22,10 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.connector.file.sink.FileSink;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
 import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.UniqueBucketAssigner;
 import org.apache.flink.streaming.util.FiniteTestSource;
 import org.apache.flink.test.junit5.MiniClusterExtension;
@@ -47,7 +47,8 @@ import java.util.List;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /**
- * Integration test case for writing bulk encoded files with the {@link FileSink} with SequenceFile.
+ * Integration test case for writing bulk encoded files with the {@link StreamingFileSink} with
+ * SequenceFile.
  */
 @ExtendWith(MiniClusterExtension.class)
 class SequenceStreamingFileSinkITCase {
@@ -78,8 +79,8 @@ class SequenceStreamingFileSinkITCase {
                                 return new Tuple2<>(new LongWritable(value.f0), new Text(value.f1));
                             }
                         })
-                .sinkTo(
-                        FileSink.forBulkFormat(
+                .addSink(
+                        StreamingFileSink.forBulkFormat(
                                         testPath,
                                         new SequenceFileWriterFactory<>(
                                                 configuration,


[flink] 08/08: Revert "[FLINK-30166][AVRO] Refactor deprecated StreamingFileSink usage with target FileSink"

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch revert-21371-30166
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f77032efb9ff804ffe47bffd3b3d7fa7ea8741af
Author: Matthias Pohl <ma...@aiven.io>
AuthorDate: Mon Jan 9 13:26:23 2023 +0100

    Revert "[FLINK-30166][AVRO] Refactor deprecated StreamingFileSink usage with target FileSink"
    
    This reverts commit 24a133c7bda0b07bbae166db6e92435117071f84.
---
 ...TCase.java => AvroStreamingFileSinkITCase.java} | 30 ++++++++++------------
 1 file changed, 13 insertions(+), 17 deletions(-)

diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFileSinkITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroStreamingFileSinkITCase.java
similarity index 91%
rename from flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFileSinkITCase.java
rename to flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroStreamingFileSinkITCase.java
index b36db4d13d3..e2752aa75e5 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFileSinkITCase.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroStreamingFileSinkITCase.java
@@ -19,12 +19,12 @@
 package org.apache.flink.formats.avro;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.connector.file.sink.FileSink;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.formats.avro.generated.Address;
 import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
 import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.UniqueBucketAssigner;
 import org.apache.flink.streaming.util.FiniteTestSource;
 import org.apache.flink.test.util.AbstractTestBase;
@@ -54,9 +54,10 @@ import java.util.List;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /**
- * Simple integration test case for writing bulk encoded files with the {@link FileSink} with Avro.
+ * Simple integration test case for writing bulk encoded files with the {@link StreamingFileSink}
+ * with Avro.
  */
-public class AvroFileSinkITCase extends AbstractTestBase {
+public class AvroStreamingFileSinkITCase extends AbstractTestBase {
 
     @Rule public final Timeout timeoutPerTest = Timeout.seconds(20);
 
@@ -77,11 +78,10 @@ public class AvroFileSinkITCase extends AbstractTestBase {
         AvroWriterFactory<Address> avroWriterFactory = AvroWriters.forSpecificRecord(Address.class);
         DataStream<Address> stream =
                 env.addSource(new FiniteTestSource<>(data), TypeInformation.of(Address.class));
-        FileSink<Address> sink =
-                FileSink.forBulkFormat(Path.fromLocalFile(folder), avroWriterFactory)
+        stream.addSink(
+                StreamingFileSink.forBulkFormat(Path.fromLocalFile(folder), avroWriterFactory)
                         .withBucketAssigner(new UniqueBucketAssigner<>("test"))
-                        .build();
-        stream.sinkTo(sink);
+                        .build());
         env.execute();
 
         validateResults(folder, new SpecificDatumReader<>(Address.class), data);
@@ -101,13 +101,10 @@ public class AvroFileSinkITCase extends AbstractTestBase {
         AvroWriterFactory<GenericRecord> avroWriterFactory = AvroWriters.forGenericRecord(schema);
         DataStream<GenericRecord> stream =
                 env.addSource(new FiniteTestSource<>(data), new GenericRecordAvroTypeInfo(schema));
-
-        FileSink<GenericRecord> sink =
-                FileSink.forBulkFormat(Path.fromLocalFile(folder), avroWriterFactory)
+        stream.addSink(
+                StreamingFileSink.forBulkFormat(Path.fromLocalFile(folder), avroWriterFactory)
                         .withBucketAssigner(new UniqueBucketAssigner<>("test"))
-                        .build();
-
-        stream.sinkTo(sink);
+                        .build());
         env.execute();
 
         validateResults(folder, new GenericDatumReader<>(schema), new ArrayList<>(data));
@@ -126,11 +123,10 @@ public class AvroFileSinkITCase extends AbstractTestBase {
         AvroWriterFactory<Datum> avroWriterFactory = AvroWriters.forReflectRecord(Datum.class);
         DataStream<Datum> stream =
                 env.addSource(new FiniteTestSource<>(data), TypeInformation.of(Datum.class));
-        FileSink<Datum> sink =
-                FileSink.forBulkFormat(Path.fromLocalFile(folder), avroWriterFactory)
+        stream.addSink(
+                StreamingFileSink.forBulkFormat(Path.fromLocalFile(folder), avroWriterFactory)
                         .withBucketAssigner(new UniqueBucketAssigner<>("test"))
-                        .build();
-        stream.sinkTo(sink);
+                        .build());
         env.execute();
 
         validateResults(folder, new ReflectDatumReader<>(Datum.class), data);


[flink] 01/08: Revert "[FLINK-30166][ORC] Refactor deprecated StreamingFileSink usage with target FileSink"

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch revert-21371-30166
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cc12d4dad0904f6a367c0e35db2d1f63bae9b03d
Author: Matthias Pohl <ma...@aiven.io>
AuthorDate: Mon Jan 9 13:26:23 2023 +0100

    Revert "[FLINK-30166][ORC] Refactor deprecated StreamingFileSink usage with target FileSink"
    
    This reverts commit 1ac202dd8c21a9362611f45b6de2773699790acf.
---
 .../java/org/apache/flink/orc/writer/OrcBulkWriterITCase.java     | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/writer/OrcBulkWriterITCase.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/writer/OrcBulkWriterITCase.java
index b8152ff7dde..aefcf188b23 100644
--- a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/writer/OrcBulkWriterITCase.java
+++ b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/writer/OrcBulkWriterITCase.java
@@ -19,13 +19,13 @@
 package org.apache.flink.orc.writer;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.connector.file.sink.FileSink;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.orc.data.Record;
 import org.apache.flink.orc.util.OrcBulkWriterTestUtil;
 import org.apache.flink.orc.vector.RecordVectorizer;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
 import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.UniqueBucketAssigner;
 import org.apache.flink.streaming.util.FiniteTestSource;
 
@@ -38,7 +38,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
 
-/** Integration test for writing data in ORC bulk format using FileSink. */
+/** Integration test for writing data in ORC bulk format using StreamingFileSink. */
 class OrcBulkWriterITCase {
 
     private final String schema = "struct<_col0:string,_col1:int>";
@@ -61,8 +61,8 @@ class OrcBulkWriterITCase {
         DataStream<Record> stream =
                 env.addSource(new FiniteTestSource<>(testData), TypeInformation.of(Record.class));
         stream.map(str -> str)
-                .sinkTo(
-                        FileSink.forBulkFormat(new Path(outDir.toURI()), factory)
+                .addSink(
+                        StreamingFileSink.forBulkFormat(new Path(outDir.toURI()), factory)
                                 .withBucketAssigner(new UniqueBucketAssigner<>("test"))
                                 .build());
 


[flink] 05/08: Revert "[FLINK-30166][Parquet] Refactor deprecated StreamingFileSink usage with target FileSink"

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch revert-21371-30166
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a9a23ab55522730968965af37c6a165d4b0b0cf6
Author: Matthias Pohl <ma...@aiven.io>
AuthorDate: Mon Jan 9 13:26:23 2023 +0100

    Revert "[FLINK-30166][Parquet] Refactor deprecated StreamingFileSink usage with target FileSink"
    
    This reverts commit 3ef68ae2d3b0be58d9c015a260c668a3c4723d97.
---
 ...ava => AvroParquetStreamingFileSinkITCase.java} | 32 +++++++++++-----------
 ...va => ParquetProtoStreamingFileSinkITCase.java} | 16 +++++------
 2 files changed, 24 insertions(+), 24 deletions(-)

diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetFileSinkITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetStreamingFileSinkITCase.java
similarity index 93%
rename from flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetFileSinkITCase.java
rename to flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetStreamingFileSinkITCase.java
index a49305b501c..91de5fbc4fe 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetFileSinkITCase.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetStreamingFileSinkITCase.java
@@ -19,12 +19,12 @@
 package org.apache.flink.formats.parquet.avro;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.connector.file.sink.FileSink;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
 import org.apache.flink.formats.parquet.generated.Address;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
 import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.UniqueBucketAssigner;
 import org.apache.flink.streaming.util.FiniteTestSource;
 import org.apache.flink.test.junit5.MiniClusterExtension;
@@ -56,11 +56,11 @@ import java.util.List;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /**
- * Simple integration test case for writing bulk encoded files with the {@link FileSink} with
- * Parquet.
+ * Simple integration test case for writing bulk encoded files with the {@link StreamingFileSink}
+ * with Parquet.
  */
 @ExtendWith(MiniClusterExtension.class)
-class AvroParquetFileSinkITCase {
+class AvroParquetStreamingFileSinkITCase {
 
     @Test
     void testWriteParquetAvroSpecific(@TempDir File folder) throws Exception {
@@ -78,13 +78,13 @@ class AvroParquetFileSinkITCase {
         DataStream<Address> stream =
                 env.addSource(new FiniteTestSource<>(data), TypeInformation.of(Address.class));
 
-        FileSink<Address> sink =
-                FileSink.forBulkFormat(
+        stream.addSink(
+                StreamingFileSink.forBulkFormat(
                                 Path.fromLocalFile(folder),
                                 AvroParquetWriters.forSpecificRecord(Address.class))
                         .withBucketAssigner(new UniqueBucketAssigner<>("test"))
-                        .build();
-        stream.sinkTo(sink);
+                        .build());
+
         env.execute();
 
         validateResults(folder, SpecificData.get(), data);
@@ -104,13 +104,13 @@ class AvroParquetFileSinkITCase {
         DataStream<GenericRecord> stream =
                 env.addSource(new FiniteTestSource<>(data), new GenericRecordAvroTypeInfo(schema));
 
-        FileSink<GenericRecord> sink =
-                FileSink.forBulkFormat(
+        stream.addSink(
+                StreamingFileSink.forBulkFormat(
                                 Path.fromLocalFile(folder),
                                 AvroParquetWriters.forGenericRecord(schema))
                         .withBucketAssigner(new UniqueBucketAssigner<>("test"))
-                        .build();
-        stream.sinkTo(sink);
+                        .build());
+
         env.execute();
 
         List<Address> expected =
@@ -134,13 +134,13 @@ class AvroParquetFileSinkITCase {
         DataStream<Datum> stream =
                 env.addSource(new FiniteTestSource<>(data), TypeInformation.of(Datum.class));
 
-        FileSink<Datum> sink =
-                FileSink.forBulkFormat(
+        stream.addSink(
+                StreamingFileSink.forBulkFormat(
                                 Path.fromLocalFile(folder),
                                 AvroParquetWriters.forReflectRecord(Datum.class))
                         .withBucketAssigner(new UniqueBucketAssigner<>("test"))
-                        .build();
-        stream.sinkTo(sink);
+                        .build());
+
         env.execute();
 
         validateResults(folder, ReflectData.get(), data);
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoFileSinkITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoStreamingFileSinkITCase.java
similarity index 94%
rename from flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoFileSinkITCase.java
rename to flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoStreamingFileSinkITCase.java
index 10d215a1b6a..3b70aa158c3 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoFileSinkITCase.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoStreamingFileSinkITCase.java
@@ -19,10 +19,10 @@
 package org.apache.flink.formats.parquet.protobuf;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.connector.file.sink.FileSink;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
 import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.UniqueBucketAssigner;
 import org.apache.flink.streaming.util.FiniteTestSource;
 import org.apache.flink.test.junit5.MiniClusterExtension;
@@ -46,11 +46,11 @@ import static org.apache.flink.formats.parquet.protobuf.SimpleRecord.SimpleProto
 import static org.assertj.core.api.Assertions.assertThat;
 
 /**
- * Simple integration test case for writing bulk encoded files with the {@link FileSink} with
- * Parquet.
+ * Simple integration test case for writing bulk encoded files with the {@link StreamingFileSink}
+ * with Parquet.
  */
 @ExtendWith(MiniClusterExtension.class)
-class ParquetProtoFileSinkITCase {
+class ParquetProtoStreamingFileSinkITCase {
 
     @Test
     void testParquetProtoWriters(@TempDir File folder) throws Exception {
@@ -69,13 +69,13 @@ class ParquetProtoFileSinkITCase {
                 env.addSource(
                         new FiniteTestSource<>(data), TypeInformation.of(SimpleProtoRecord.class));
 
-        FileSink<SimpleProtoRecord> sink =
-                FileSink.forBulkFormat(
+        stream.addSink(
+                StreamingFileSink.forBulkFormat(
                                 Path.fromLocalFile(folder),
                                 ParquetProtoWriters.forType(SimpleProtoRecord.class))
                         .withBucketAssigner(new UniqueBucketAssigner<>("test"))
-                        .build();
-        stream.sinkTo(sink);
+                        .build());
+
         env.execute();
 
         validateResults(folder, data);


[flink] 06/08: Revert "[FLINK-30166][Tests] Remove StreamingFileSink as option for tests"

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch revert-21371-30166
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 37258fbe6305d10a62a977e398018f79cb51d080
Author: Matthias Pohl <ma...@aiven.io>
AuthorDate: Mon Jan 9 13:26:23 2023 +0100

    Revert "[FLINK-30166][Tests] Remove StreamingFileSink as option for tests"
    
    This reverts commit 440a275cad1092be7245f0438b7e5b5b1691f708.
---
 .../flink/connector/file/sink/FileSinkProgram.java  | 21 ++++++++++++++++++---
 1 file changed, 18 insertions(+), 3 deletions(-)

diff --git a/flink-end-to-end-tests/flink-file-sink-test/src/main/java/org/apache/flink/connector/file/sink/FileSinkProgram.java b/flink-end-to-end-tests/flink-file-sink-test/src/main/java/org/apache/flink/connector/file/sink/FileSinkProgram.java
index 85064adb6ce..8ec6d013dfa 100644
--- a/flink-end-to-end-tests/flink-file-sink-test/src/main/java/org/apache/flink/connector/file/sink/FileSinkProgram.java
+++ b/flink-end-to-end-tests/flink-file-sink-test/src/main/java/org/apache/flink/connector/file/sink/FileSinkProgram.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
 import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -42,7 +43,7 @@ import java.io.PrintStream;
 import java.util.concurrent.TimeUnit;
 
 /**
- * Test program for the {@link FileSink}.
+ * Test program for the {@link StreamingFileSink} and {@link FileSink}.
  *
  * <p>Uses a source that steadily emits a deterministic set of records over 60 seconds, after which
  * it idles and waits for job cancellation. Every record has a unique index that is written to the
@@ -71,7 +72,21 @@ public enum FileSinkProgram {
         // generate data, shuffle, sink
         DataStream<Tuple2<Integer, Integer>> source = env.addSource(new Generator(10, 10, 60));
 
-        if (sinkToTest.equalsIgnoreCase("FileSink")) {
+        if (sinkToTest.equalsIgnoreCase("StreamingFileSink")) {
+            final StreamingFileSink<Tuple2<Integer, Integer>> sink =
+                    StreamingFileSink.forRowFormat(
+                                    new Path(outputPath),
+                                    (Encoder<Tuple2<Integer, Integer>>)
+                                            (element, stream) -> {
+                                                PrintStream out = new PrintStream(stream);
+                                                out.println(element.f1);
+                                            })
+                            .withBucketAssigner(new KeyBucketAssigner())
+                            .withRollingPolicy(OnCheckpointRollingPolicy.build())
+                            .build();
+
+            source.keyBy(0).addSink(sink);
+        } else if (sinkToTest.equalsIgnoreCase("FileSink")) {
             FileSink<Tuple2<Integer, Integer>> sink =
                     FileSink.forRowFormat(
                                     new Path(outputPath),
@@ -88,7 +103,7 @@ public enum FileSinkProgram {
             throw new UnsupportedOperationException("Unsupported sink type: " + sinkToTest);
         }
 
-        env.execute("FileSinkProgram");
+        env.execute("StreamingFileSinkProgram");
     }
 
     /** Use first field for buckets. */
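
As a side note on the two row-encoded variants that FileSinkProgram now selects between, the sketch below contrasts them outside the test harness. It is illustrative only; outputPath and the Encoder lambda are placeholders mirroring the diff above, and the bucket assigner used by the actual program is omitted.

    import org.apache.flink.api.common.serialization.Encoder;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.connector.file.sink.FileSink;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

    import java.io.PrintStream;

    class RowSinkContrast {
        // Writes only the second tuple field per record, as in the test program above.
        private static final Encoder<Tuple2<Integer, Integer>> ENCODER =
                (element, stream) -> new PrintStream(stream).println(element.f1);

        // Deprecated row-format sink restored by this revert; rolls part files on every checkpoint.
        static StreamingFileSink<Tuple2<Integer, Integer>> legacySink(String outputPath) {
            return StreamingFileSink.forRowFormat(new Path(outputPath), ENCODER)
                    .withRollingPolicy(OnCheckpointRollingPolicy.build())
                    .build();
        }

        // Unified row-format sink from FLINK-30166; same builder shape, but attached with sinkTo(...).
        static FileSink<Tuple2<Integer, Integer>> unifiedSink(String outputPath) {
            return FileSink.forRowFormat(new Path(outputPath), ENCODER)
                    .withRollingPolicy(OnCheckpointRollingPolicy.build())
                    .build();
        }
    }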


[flink] 07/08: Revert "[FLINK-30166][Tests] Remove no longer necessary test"

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch revert-21371-30166
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 93c7505d3133848686c3704d7bfc2f59af83e92e
Author: Matthias Pohl <ma...@aiven.io>
AuthorDate: Mon Jan 9 13:26:23 2023 +0100

    Revert "[FLINK-30166][Tests] Remove no longer necessary test"
    
    This reverts commit 21c44c0863b52090a49da46ed228d3de18799506.
---
 .../f5e3e868-8d92-4258-9654-a605dc9c550f           |   6 +
 .../file/sink/writer/FileSinkMigrationITCase.java  | 291 +++++++++++++++++++++
 2 files changed, 297 insertions(+)

diff --git a/flink-connectors/flink-connector-files/archunit-violations/f5e3e868-8d92-4258-9654-a605dc9c550f b/flink-connectors/flink-connector-files/archunit-violations/f5e3e868-8d92-4258-9654-a605dc9c550f
index a8d59151016..ff88812bba2 100644
--- a/flink-connectors/flink-connector-files/archunit-violations/f5e3e868-8d92-4258-9654-a605dc9c550f
+++ b/flink-connectors/flink-connector-files/archunit-violations/f5e3e868-8d92-4258-9654-a605dc9c550f
@@ -10,3 +10,9 @@ org.apache.flink.connector.file.sink.StreamingExecutionFileSinkITCase does not s
 * reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
 * reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
  or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
+org.apache.flink.connector.file.sink.writer.FileSinkMigrationITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileSinkMigrationITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileSinkMigrationITCase.java
new file mode 100644
index 00000000000..90dde5462c4
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileSinkMigrationITCase.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.writer;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.testutils.junit.SharedObjectsExtension;
+import org.apache.flink.testutils.junit.SharedReference;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests migrating from {@link StreamingFileSink} to {@link FileSink}. It triggers a savepoint for
+ * the {@link StreamingFileSink} job and restores the {@link FileSink} job from the savepoint taken.
+ */
+class FileSinkMigrationITCase {
+
+    @RegisterExtension
+    private final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
+
+    private static final String SOURCE_UID = "source";
+
+    private static final String SINK_UID = "sink";
+
+    private static final int NUM_SOURCES = 4;
+
+    private static final int NUM_SINKS = 3;
+
+    private static final int NUM_RECORDS = 10000;
+
+    private static final int NUM_BUCKETS = 4;
+
+    private SharedReference<CountDownLatch> finalCheckpointLatch;
+
+    @BeforeEach
+    void setup() {
+        // We wait for two successful checkpoints in sources before shutting down. This ensures that
+        // the sink can commit its data.
+        // We need to keep a "static" latch here because all sources need to be kept running
+        // while we're waiting for the required number of checkpoints. Otherwise, we would lock up
+        // because we can only do checkpoints while all operators are running.
+        finalCheckpointLatch = sharedObjects.add(new CountDownLatch(NUM_SOURCES * 2));
+    }
+
+    @Test
+    void test() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        SharedReference<Collection<Long>> list = sharedObjects.add(new ArrayList<>());
+        int n = 10000;
+        env.setParallelism(100);
+        env.fromSequence(0, n).map(i -> list.applySync(l -> l.add(i)));
+        env.execute();
+        assertThat(list.get()).hasSize(n + 1);
+        assertThat(LongStream.rangeClosed(0, n).boxed().collect(Collectors.toList()))
+                .isEqualTo(list.get().stream().sorted().collect(Collectors.toList()));
+    }
+
+    @Test
+    void testMigration(
+            @TempDir java.nio.file.Path tmpOutputDir, @TempDir java.nio.file.Path tmpSavepointDir)
+            throws Exception {
+        String outputPath = tmpOutputDir.toString();
+        String savepointBasePath = tmpSavepointDir.toString();
+
+        final MiniClusterConfiguration cfg =
+                new MiniClusterConfiguration.Builder()
+                        .withRandomPorts()
+                        .setNumTaskManagers(1)
+                        .setNumSlotsPerTaskManager(4)
+                        .build();
+
+        JobGraph streamingFileSinkJobGraph = createStreamingFileSinkJobGraph(outputPath);
+        String savepointPath =
+                executeAndTakeSavepoint(cfg, streamingFileSinkJobGraph, savepointBasePath);
+
+        JobGraph fileSinkJobGraph = createFileSinkJobGraph(outputPath);
+        loadSavepointAndExecute(cfg, fileSinkJobGraph, savepointPath);
+
+        IntegerFileSinkTestDataUtils.checkIntegerSequenceSinkOutput(
+                outputPath, NUM_RECORDS, NUM_BUCKETS, NUM_SOURCES);
+    }
+
+    private JobGraph createStreamingFileSinkJobGraph(String outputPath) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
+
+        StreamingFileSink<Integer> sink =
+                StreamingFileSink.forRowFormat(
+                                new Path(outputPath), new IntegerFileSinkTestDataUtils.IntEncoder())
+                        .withBucketAssigner(
+                                new IntegerFileSinkTestDataUtils.ModuloBucketAssigner(NUM_BUCKETS))
+                        .withRollingPolicy(OnCheckpointRollingPolicy.build())
+                        .build();
+
+        env.addSource(new StatefulSource(true, finalCheckpointLatch))
+                .uid(SOURCE_UID)
+                .setParallelism(NUM_SOURCES)
+                .addSink(sink)
+                .setParallelism(NUM_SINKS)
+                .uid(SINK_UID);
+        return env.getStreamGraph().getJobGraph();
+    }
+
+    private JobGraph createFileSinkJobGraph(String outputPath) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
+
+        FileSink<Integer> sink =
+                FileSink.forRowFormat(
+                                new Path(outputPath), new IntegerFileSinkTestDataUtils.IntEncoder())
+                        .withBucketAssigner(
+                                new IntegerFileSinkTestDataUtils.ModuloBucketAssigner(NUM_BUCKETS))
+                        .withRollingPolicy(OnCheckpointRollingPolicy.build())
+                        .build();
+
+        env.addSource(new StatefulSource(false, finalCheckpointLatch))
+                .uid(SOURCE_UID)
+                .setParallelism(NUM_SOURCES)
+                .sinkTo(sink)
+                .setParallelism(NUM_SINKS)
+                .uid(SINK_UID);
+        return env.getStreamGraph().getJobGraph();
+    }
+
+    private String executeAndTakeSavepoint(
+            MiniClusterConfiguration cfg, JobGraph jobGraph, String savepointBasePath)
+            throws Exception {
+        try (MiniCluster miniCluster = new MiniCluster(cfg)) {
+            miniCluster.start();
+            CompletableFuture<JobSubmissionResult> jobSubmissionResultFuture =
+                    miniCluster.submitJob(jobGraph);
+            JobID jobId = jobSubmissionResultFuture.get().getJobID();
+
+            waitForAllTaskRunning(miniCluster, jobId, false);
+
+            CompletableFuture<String> savepointResultFuture =
+                    miniCluster.triggerSavepoint(
+                            jobId, savepointBasePath, true, SavepointFormatType.CANONICAL);
+            return savepointResultFuture.get();
+        }
+    }
+
+    private void loadSavepointAndExecute(
+            MiniClusterConfiguration cfg, JobGraph jobGraph, String savepointPath)
+            throws Exception {
+        jobGraph.setSavepointRestoreSettings(
+                SavepointRestoreSettings.forPath(savepointPath, false));
+
+        try (MiniCluster miniCluster = new MiniCluster(cfg)) {
+            miniCluster.start();
+            miniCluster.executeJobBlocking(jobGraph);
+        }
+    }
+
+    private static class StatefulSource extends RichParallelSourceFunction<Integer>
+            implements CheckpointedFunction, CheckpointListener {
+
+        private final boolean takingSavepointMode;
+
+        private SharedReference<CountDownLatch> finalCheckpointLatch;
+
+        private ListState<Integer> nextValueState;
+
+        private int nextValue;
+
+        private volatile boolean snapshottedAfterAllRecordsOutput;
+
+        private volatile boolean isWaitingCheckpointComplete;
+
+        private volatile boolean isCanceled;
+
+        public StatefulSource(
+                boolean takingSavepointMode, SharedReference<CountDownLatch> finalCheckpointLatch) {
+            this.takingSavepointMode = takingSavepointMode;
+            this.finalCheckpointLatch = finalCheckpointLatch;
+        }
+
+        @Override
+        public void initializeState(FunctionInitializationContext context) throws Exception {
+            nextValueState =
+                    context.getOperatorStateStore()
+                            .getListState(new ListStateDescriptor<>("nextValue", Integer.class));
+
+            if (nextValueState.get() != null && nextValueState.get().iterator().hasNext()) {
+                nextValue = nextValueState.get().iterator().next();
+            }
+        }
+
+        @Override
+        public void run(SourceContext<Integer> ctx) throws Exception {
+            if (takingSavepointMode) {
+                sendRecordsUntil(NUM_RECORDS / 3, 0, ctx);
+                sendRecordsUntil(NUM_RECORDS / 2, 100, ctx);
+
+                while (true) {
+                    Thread.sleep(5000);
+                }
+            } else {
+                sendRecordsUntil(NUM_RECORDS, 0, ctx);
+
+                // Wait for the last checkpoint to commit all the pending records.
+                isWaitingCheckpointComplete = true;
+                finalCheckpointLatch.get().await();
+            }
+        }
+
+        private void sendRecordsUntil(
+                int targetNumber, int sleepInMillis, SourceContext<Integer> ctx)
+                throws InterruptedException {
+            while (!isCanceled && nextValue < targetNumber) {
+                synchronized (ctx.getCheckpointLock()) {
+                    ctx.collect(nextValue++);
+                }
+
+                if (sleepInMillis > 0) {
+                    Thread.sleep(sleepInMillis);
+                }
+            }
+        }
+
+        @Override
+        public void snapshotState(FunctionSnapshotContext context) throws Exception {
+            nextValueState.update(Collections.singletonList(nextValue));
+
+            if (isWaitingCheckpointComplete) {
+                snapshottedAfterAllRecordsOutput = true;
+            }
+        }
+
+        @Override
+        public void notifyCheckpointComplete(long checkpointId) throws Exception {
+            if (isWaitingCheckpointComplete && snapshottedAfterAllRecordsOutput) {
+                finalCheckpointLatch.get().countDown();
+            }
+        }
+
+        @Override
+        public void cancel() {
+            isCanceled = true;
+        }
+    }
+}


[flink] 04/08: Revert "[FLINK-30166][SQL E2E Test] Refactor deprecated StreamingFileSink usage with target FileSink"

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch revert-21371-30166
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 889626d4960304028e46e87ff4f7f879b781e337
Author: Matthias Pohl <ma...@aiven.io>
AuthorDate: Mon Jan 9 13:26:23 2023 +0100

    Revert "[FLINK-30166][SQL E2E Test] Refactor deprecated StreamingFileSink usage with target FileSink"
    
    This reverts commit 88c450b458e2f53f132d7f51cfd7771cc54fc072.
---
 flink-end-to-end-tests/flink-stream-sql-test/pom.xml              | 8 +-------
 .../java/org/apache/flink/sql/tests/StreamSQLTestProgram.java     | 8 ++++----
 2 files changed, 5 insertions(+), 11 deletions(-)

diff --git a/flink-end-to-end-tests/flink-stream-sql-test/pom.xml b/flink-end-to-end-tests/flink-stream-sql-test/pom.xml
index 270ac1b3cbc..0afff45234b 100644
--- a/flink-end-to-end-tests/flink-stream-sql-test/pom.xml
+++ b/flink-end-to-end-tests/flink-stream-sql-test/pom.xml
@@ -41,13 +41,7 @@
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-connector-files</artifactId>
-            <version>${project.version}</version>
-            <scope>provided</scope>
-        </dependency>
-    </dependencies>
+	</dependencies>
 
 	<build>
 		<plugins>
diff --git a/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java b/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java
index a9553f9c69a..7abee30c072 100644
--- a/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java
+++ b/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java
@@ -30,7 +30,6 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.connector.file.sink.FileSink;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -39,6 +38,7 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
 import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -148,8 +148,8 @@ public class StreamSQLTestProgram {
         DataStream<Row> resultStream =
                 tEnv.toAppendStream(result, Types.ROW(Types.INT, Types.SQL_TIMESTAMP));
 
-        final FileSink<Row> sink =
-                FileSink.forRowFormat(
+        final StreamingFileSink<Row> sink =
+                StreamingFileSink.forRowFormat(
                                 new Path(outputPath),
                                 (Encoder<Row>)
                                         (element, stream) -> {
@@ -166,7 +166,7 @@ public class StreamSQLTestProgram {
                 .map(new KillMapper())
                 .setParallelism(1)
                 // add sink function
-                .sinkTo(sink)
+                .addSink(sink)
                 .setParallelism(1);
 
         sEnv.execute();


[flink] 02/08: Revert "[FLINK-30166][Hadoop Compress] Refactor deprecated StreamingFileSink usage with target FileSink"

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch revert-21371-30166
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5dbe3b12e898bcf5772807e0a25945fa2b839cef
Author: Matthias Pohl <ma...@aiven.io>
AuthorDate: Mon Jan 9 13:26:23 2023 +0100

    Revert "[FLINK-30166][Hadoop Compress] Refactor deprecated StreamingFileSink usage with target FileSink"
    
    This reverts commit cd54dfef4fc2c6f97f6e1ff3c0580a47641158a0.
---
 flink-formats/flink-compress/pom.xml                           |  8 +-------
 .../flink/formats/compress/CompressionFactoryITCase.java       | 10 +++++-----
 2 files changed, 6 insertions(+), 12 deletions(-)

diff --git a/flink-formats/flink-compress/pom.xml b/flink-formats/flink-compress/pom.xml
index 588af51f8e1..f70fb464b87 100644
--- a/flink-formats/flink-compress/pom.xml
+++ b/flink-formats/flink-compress/pom.xml
@@ -87,13 +87,7 @@ under the License.
 			<scope>test</scope>
 			<type>test-jar</type>
 		</dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-connector-files</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
+	</dependencies>
 
 	<build>
 		<plugins>
diff --git a/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressionFactoryITCase.java b/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressionFactoryITCase.java
index ba6311ea59c..c2d08112f67 100644
--- a/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressionFactoryITCase.java
+++ b/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressionFactoryITCase.java
@@ -19,11 +19,11 @@
 package org.apache.flink.formats.compress;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.connector.file.sink.FileSink;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.formats.compress.extractor.DefaultExtractor;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
 import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.UniqueBucketAssigner;
 import org.apache.flink.streaming.util.FiniteTestSource;
 import org.apache.flink.test.junit5.MiniClusterExtension;
@@ -46,8 +46,8 @@ import java.util.stream.Collectors;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /**
- * Integration test case for writing bulk encoded files with the {@link FileSink} and Hadoop
- * Compression Codecs.
+ * Integration test case for writing bulk encoded files with the {@link StreamingFileSink} and
+ * Hadoop Compression Codecs.
  */
 @ExtendWith(MiniClusterExtension.class)
 class CompressionFactoryITCase {
@@ -71,8 +71,8 @@ class CompressionFactoryITCase {
                 env.addSource(new FiniteTestSource<>(testData), TypeInformation.of(String.class));
 
         stream.map(str -> str)
-                .sinkTo(
-                        FileSink.forBulkFormat(
+                .addSink(
+                        StreamingFileSink.forBulkFormat(
                                         testPath,
                                         CompressWriters.forExtractor(new DefaultExtractor<String>())
                                                 .withHadoopCompression(TEST_CODEC_NAME))