Posted to commits@flink.apache.org by ma...@apache.org on 2023/01/09 12:26:50 UTC

[flink] 05/08: Revert "[FLINK-30166][Parquet] Refactor deprecated StreamingFileSink usage with target FileSink"

This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch revert-21371-30166
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a9a23ab55522730968965af37c6a165d4b0b0cf6
Author: Matthias Pohl <ma...@aiven.io>
AuthorDate: Mon Jan 9 13:26:23 2023 +0100

    Revert "[FLINK-30166][Parquet] Refactor deprecated StreamingFileSink usage with target FileSink"
    
    This reverts commit 3ef68ae2d3b0be58d9c015a260c668a3c4723d97.
---
 ...ava => AvroParquetStreamingFileSinkITCase.java} | 32 +++++++++++-----------
 ...va => ParquetProtoStreamingFileSinkITCase.java} | 16 +++++------
 2 files changed, 24 insertions(+), 24 deletions(-)
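
For context, the hunks below toggle between two ways of wiring a bulk Parquet writer into a DataStream job. The following minimal sketch condenses the two patterns side by side; the class and method names (SinkWiringSketch, wireFileSink, wireStreamingFileSink) and the generic-record helper signature are illustrative only, while the builder chains themselves are copied from the diff. It assumes a DataStream<GenericRecord> source and a local output directory, as in the generic-record test case being renamed.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.UniqueBucketAssigner;

import java.io.File;

class SinkWiringSketch {

    // Pattern removed by this revert: FileSink implements the unified Sink
    // interface and is attached with sinkTo(...).
    static void wireFileSink(DataStream<GenericRecord> stream, File folder, Schema schema) {
        FileSink<GenericRecord> sink =
                FileSink.forBulkFormat(
                                Path.fromLocalFile(folder),
                                AvroParquetWriters.forGenericRecord(schema))
                        .withBucketAssigner(new UniqueBucketAssigner<>("test"))
                        .build();
        stream.sinkTo(sink);
    }

    // Pattern restored by this revert: StreamingFileSink is a SinkFunction
    // and is attached with addSink(...); it is deprecated in current Flink.
    static void wireStreamingFileSink(DataStream<GenericRecord> stream, File folder, Schema schema) {
        stream.addSink(
                StreamingFileSink.forBulkFormat(
                                Path.fromLocalFile(folder),
                                AvroParquetWriters.forGenericRecord(schema))
                        .withBucketAssigner(new UniqueBucketAssigner<>("test"))
                        .build());
    }
}

Both variants write the same bucketed Parquet files; they differ only in which sink API they target and how the sink is attached to the stream.
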

diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetFileSinkITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetStreamingFileSinkITCase.java
similarity index 93%
rename from flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetFileSinkITCase.java
rename to flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetStreamingFileSinkITCase.java
index a49305b501c..91de5fbc4fe 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetFileSinkITCase.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetStreamingFileSinkITCase.java
@@ -19,12 +19,12 @@
 package org.apache.flink.formats.parquet.avro;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.connector.file.sink.FileSink;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
 import org.apache.flink.formats.parquet.generated.Address;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
 import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.UniqueBucketAssigner;
 import org.apache.flink.streaming.util.FiniteTestSource;
 import org.apache.flink.test.junit5.MiniClusterExtension;
@@ -56,11 +56,11 @@ import java.util.List;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /**
- * Simple integration test case for writing bulk encoded files with the {@link FileSink} with
- * Parquet.
+ * Simple integration test case for writing bulk encoded files with the {@link StreamingFileSink}
+ * with Parquet.
  */
 @ExtendWith(MiniClusterExtension.class)
-class AvroParquetFileSinkITCase {
+class AvroParquetStreamingFileSinkITCase {
 
     @Test
     void testWriteParquetAvroSpecific(@TempDir File folder) throws Exception {
@@ -78,13 +78,13 @@ class AvroParquetFileSinkITCase {
         DataStream<Address> stream =
                 env.addSource(new FiniteTestSource<>(data), TypeInformation.of(Address.class));
 
-        FileSink<Address> sink =
-                FileSink.forBulkFormat(
+        stream.addSink(
+                StreamingFileSink.forBulkFormat(
                                 Path.fromLocalFile(folder),
                                 AvroParquetWriters.forSpecificRecord(Address.class))
                         .withBucketAssigner(new UniqueBucketAssigner<>("test"))
-                        .build();
-        stream.sinkTo(sink);
+                        .build());
+
         env.execute();
 
         validateResults(folder, SpecificData.get(), data);
@@ -104,13 +104,13 @@ class AvroParquetFileSinkITCase {
         DataStream<GenericRecord> stream =
                 env.addSource(new FiniteTestSource<>(data), new GenericRecordAvroTypeInfo(schema));
 
-        FileSink<GenericRecord> sink =
-                FileSink.forBulkFormat(
+        stream.addSink(
+                StreamingFileSink.forBulkFormat(
                                 Path.fromLocalFile(folder),
                                 AvroParquetWriters.forGenericRecord(schema))
                         .withBucketAssigner(new UniqueBucketAssigner<>("test"))
-                        .build();
-        stream.sinkTo(sink);
+                        .build());
+
         env.execute();
 
         List<Address> expected =
@@ -134,13 +134,13 @@ class AvroParquetFileSinkITCase {
         DataStream<Datum> stream =
                 env.addSource(new FiniteTestSource<>(data), TypeInformation.of(Datum.class));
 
-        FileSink<Datum> sink =
-                FileSink.forBulkFormat(
+        stream.addSink(
+                StreamingFileSink.forBulkFormat(
                                 Path.fromLocalFile(folder),
                                 AvroParquetWriters.forReflectRecord(Datum.class))
                         .withBucketAssigner(new UniqueBucketAssigner<>("test"))
-                        .build();
-        stream.sinkTo(sink);
+                        .build());
+
         env.execute();
 
         validateResults(folder, ReflectData.get(), data);
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoFileSinkITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoStreamingFileSinkITCase.java
similarity index 94%
rename from flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoFileSinkITCase.java
rename to flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoStreamingFileSinkITCase.java
index 10d215a1b6a..3b70aa158c3 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoFileSinkITCase.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoStreamingFileSinkITCase.java
@@ -19,10 +19,10 @@
 package org.apache.flink.formats.parquet.protobuf;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.connector.file.sink.FileSink;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
 import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.UniqueBucketAssigner;
 import org.apache.flink.streaming.util.FiniteTestSource;
 import org.apache.flink.test.junit5.MiniClusterExtension;
@@ -46,11 +46,11 @@ import static org.apache.flink.formats.parquet.protobuf.SimpleRecord.SimpleProto
 import static org.assertj.core.api.Assertions.assertThat;
 
 /**
- * Simple integration test case for writing bulk encoded files with the {@link FileSink} with
- * Parquet.
+ * Simple integration test case for writing bulk encoded files with the {@link StreamingFileSink}
+ * with Parquet.
  */
 @ExtendWith(MiniClusterExtension.class)
-class ParquetProtoFileSinkITCase {
+class ParquetProtoStreamingFileSinkITCase {
 
     @Test
     void testParquetProtoWriters(@TempDir File folder) throws Exception {
@@ -69,13 +69,13 @@ class ParquetProtoFileSinkITCase {
                 env.addSource(
                         new FiniteTestSource<>(data), TypeInformation.of(SimpleProtoRecord.class));
 
-        FileSink<SimpleProtoRecord> sink =
-                FileSink.forBulkFormat(
+        stream.addSink(
+                StreamingFileSink.forBulkFormat(
                                 Path.fromLocalFile(folder),
                                 ParquetProtoWriters.forType(SimpleProtoRecord.class))
                         .withBucketAssigner(new UniqueBucketAssigner<>("test"))
-                        .build();
-        stream.sinkTo(sink);
+                        .build());
+
         env.execute();
 
         validateResults(folder, data);
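
Background on the two APIs, for readers outside the thread: StreamingFileSink (flink-streaming-java) predates the unified sink design and is deprecated, while FileSink (flink-connector-files) is its replacement built on the unified Sink API from FLIP-143 and, unlike StreamingFileSink, also works under BATCH execution. This revert restores the deprecated usage that the FLINK-30166 refactoring had replaced.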