You are viewing a plain text version of this content. (The hyperlink to the canonical version was not preserved in this plain-text copy.)
Posted to commits@flink.apache.org by ma...@apache.org on 2023/01/09 12:26:47 UTC
[flink] 02/08: Revert "[FLINK-30166][Hadoop Compress] Refactor deprecated StreamingFileSink usage with target FileSink"
This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch revert-21371-30166
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5dbe3b12e898bcf5772807e0a25945fa2b839cef
Author: Matthias Pohl <ma...@aiven.io>
AuthorDate: Mon Jan 9 13:26:23 2023 +0100
Revert "[FLINK-30166][Hadoop Compress] Refactor deprecated StreamingFileSink usage with target FileSink"
This reverts commit cd54dfef4fc2c6f97f6e1ff3c0580a47641158a0.
---
flink-formats/flink-compress/pom.xml | 8 +-------
.../flink/formats/compress/CompressionFactoryITCase.java | 10 +++++-----
2 files changed, 6 insertions(+), 12 deletions(-)
diff --git a/flink-formats/flink-compress/pom.xml b/flink-formats/flink-compress/pom.xml
index 588af51f8e1..f70fb464b87 100644
--- a/flink-formats/flink-compress/pom.xml
+++ b/flink-formats/flink-compress/pom.xml
@@ -87,13 +87,7 @@ under the License.
<scope>test</scope>
<type>test-jar</type>
</dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-files</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
+ </dependencies>
<build>
<plugins>
diff --git a/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressionFactoryITCase.java b/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressionFactoryITCase.java
index ba6311ea59c..c2d08112f67 100644
--- a/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressionFactoryITCase.java
+++ b/flink-formats/flink-compress/src/test/java/org/apache/flink/formats/compress/CompressionFactoryITCase.java
@@ -19,11 +19,11 @@
package org.apache.flink.formats.compress;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.compress.extractor.DefaultExtractor;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.UniqueBucketAssigner;
import org.apache.flink.streaming.util.FiniteTestSource;
import org.apache.flink.test.junit5.MiniClusterExtension;
@@ -46,8 +46,8 @@ import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
/**
- * Integration test case for writing bulk encoded files with the {@link FileSink} and Hadoop
- * Compression Codecs.
+ * Integration test case for writing bulk encoded files with the {@link StreamingFileSink} and
+ * Hadoop Compression Codecs.
*/
@ExtendWith(MiniClusterExtension.class)
class CompressionFactoryITCase {
@@ -71,8 +71,8 @@ class CompressionFactoryITCase {
env.addSource(new FiniteTestSource<>(testData), TypeInformation.of(String.class));
stream.map(str -> str)
- .sinkTo(
- FileSink.forBulkFormat(
+ .addSink(
+ StreamingFileSink.forBulkFormat(
testPath,
CompressWriters.forExtractor(new DefaultExtractor<String>())
.withHadoopCompression(TEST_CODEC_NAME))