You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2023/01/09 12:26:47 UTC

[flink] 02/08: Revert "[FLINK-30166][Hadoop Compress] Refactor deprecated StreamingFileSink usage with target FileSink"

This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch revert-21371-30166
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5dbe3b12e898bcf5772807e0a25945fa2b839cef
Author: Matthias Pohl <ma...@aiven.io>
AuthorDate: Mon Jan 9 13:26:23 2023 +0100

    Revert "[FLINK-30166][Hadoop Compress] Refactor deprecated StreamingFileSink usage with target FileSink"
    
    This reverts commit cd54dfef4fc2c6f97f6e1ff3c0580a47641158a0.
---
 flink-formats/flink-compress/pom.xml                           |  8 +-------
 .../flink/formats/compress/CompressionFactoryITCase.java       | 10 +++++-----
 2 files changed, 6 insertions(+), 12 deletions(-)

diff --git a/flink-formats/flink-compress/pom.xml b/flink-formats/flink-compress/pom.xml
index 588af51f8e1..f70fb464b87 100644
--- a/flink-formats/flink-compress/pom.xml
+++ b/flink-formats/flink-compress/pom.xml
@@ -87,13 +87,7 @@ under the License.
 			<scope>test</scope>
 			<type>test-jar</type>
 		</dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-connector-files</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
+	</dependencies>
 
 	<build>
 		<plugins>
diff --git a/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressionFactoryITCase.java b/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressionFactoryITCase.java
index ba6311ea59c..c2d08112f67 100644
--- a/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressionFactoryITCase.java
+++ b/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressionFactoryITCase.java
@@ -19,11 +19,11 @@
 package org.apache.flink.formats.compress;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.connector.file.sink.FileSink;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.formats.compress.extractor.DefaultExtractor;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
 import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.UniqueBucketAssigner;
 import org.apache.flink.streaming.util.FiniteTestSource;
 import org.apache.flink.test.junit5.MiniClusterExtension;
@@ -46,8 +46,8 @@ import java.util.stream.Collectors;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /**
- * Integration test case for writing bulk encoded files with the {@link FileSink} and Hadoop
- * Compression Codecs.
+ * Integration test case for writing bulk encoded files with the {@link StreamingFileSink} and
+ * Hadoop Compression Codecs.
  */
 @ExtendWith(MiniClusterExtension.class)
 class CompressionFactoryITCase {
@@ -71,8 +71,8 @@ class CompressionFactoryITCase {
                 env.addSource(new FiniteTestSource<>(testData), TypeInformation.of(String.class));
 
         stream.map(str -> str)
-                .sinkTo(
-                        FileSink.forBulkFormat(
+                .addSink(
+                        StreamingFileSink.forBulkFormat(
                                         testPath,
                                         CompressWriters.forExtractor(new DefaultExtractor<String>())
                                                 .withHadoopCompression(TEST_CODEC_NAME))