You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2023/01/09 12:26:53 UTC

[flink] 08/08: Revert "[FLINK-30166][AVRO] Refactor deprecated StreamingFileSink usage with target FileSink"

This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch revert-21371-30166
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f77032efb9ff804ffe47bffd3b3d7fa7ea8741af
Author: Matthias Pohl <ma...@aiven.io>
AuthorDate: Mon Jan 9 13:26:23 2023 +0100

    Revert "[FLINK-30166][AVRO] Refactor deprecated StreamingFileSink usage with target FileSink"
    
    This reverts commit 24a133c7bda0b07bbae166db6e92435117071f84.
---
 ...TCase.java => AvroStreamingFileSinkITCase.java} | 30 ++++++++++------------
 1 file changed, 13 insertions(+), 17 deletions(-)

diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFileSinkITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroStreamingFileSinkITCase.java
similarity index 91%
rename from flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFileSinkITCase.java
rename to flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroStreamingFileSinkITCase.java
index b36db4d13d3..e2752aa75e5 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFileSinkITCase.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroStreamingFileSinkITCase.java
@@ -19,12 +19,12 @@
 package org.apache.flink.formats.avro;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.connector.file.sink.FileSink;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.formats.avro.generated.Address;
 import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
 import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.UniqueBucketAssigner;
 import org.apache.flink.streaming.util.FiniteTestSource;
 import org.apache.flink.test.util.AbstractTestBase;
@@ -54,9 +54,10 @@ import java.util.List;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /**
- * Simple integration test case for writing bulk encoded files with the {@link FileSink} with Avro.
+ * Simple integration test case for writing bulk encoded files with the {@link StreamingFileSink}
+ * with Avro.
  */
-public class AvroFileSinkITCase extends AbstractTestBase {
+public class AvroStreamingFileSinkITCase extends AbstractTestBase {
 
     @Rule public final Timeout timeoutPerTest = Timeout.seconds(20);
 
@@ -77,11 +78,10 @@ public class AvroFileSinkITCase extends AbstractTestBase {
         AvroWriterFactory<Address> avroWriterFactory = AvroWriters.forSpecificRecord(Address.class);
         DataStream<Address> stream =
                 env.addSource(new FiniteTestSource<>(data), TypeInformation.of(Address.class));
-        FileSink<Address> sink =
-                FileSink.forBulkFormat(Path.fromLocalFile(folder), avroWriterFactory)
+        stream.addSink(
+                StreamingFileSink.forBulkFormat(Path.fromLocalFile(folder), avroWriterFactory)
                         .withBucketAssigner(new UniqueBucketAssigner<>("test"))
-                        .build();
-        stream.sinkTo(sink);
+                        .build());
         env.execute();
 
         validateResults(folder, new SpecificDatumReader<>(Address.class), data);
@@ -101,13 +101,10 @@ public class AvroFileSinkITCase extends AbstractTestBase {
         AvroWriterFactory<GenericRecord> avroWriterFactory = AvroWriters.forGenericRecord(schema);
         DataStream<GenericRecord> stream =
                 env.addSource(new FiniteTestSource<>(data), new GenericRecordAvroTypeInfo(schema));
-
-        FileSink<GenericRecord> sink =
-                FileSink.forBulkFormat(Path.fromLocalFile(folder), avroWriterFactory)
+        stream.addSink(
+                StreamingFileSink.forBulkFormat(Path.fromLocalFile(folder), avroWriterFactory)
                         .withBucketAssigner(new UniqueBucketAssigner<>("test"))
-                        .build();
-
-        stream.sinkTo(sink);
+                        .build());
         env.execute();
 
         validateResults(folder, new GenericDatumReader<>(schema), new ArrayList<>(data));
@@ -126,11 +123,10 @@ public class AvroFileSinkITCase extends AbstractTestBase {
         AvroWriterFactory<Datum> avroWriterFactory = AvroWriters.forReflectRecord(Datum.class);
         DataStream<Datum> stream =
                 env.addSource(new FiniteTestSource<>(data), TypeInformation.of(Datum.class));
-        FileSink<Datum> sink =
-                FileSink.forBulkFormat(Path.fromLocalFile(folder), avroWriterFactory)
+        stream.addSink(
+                StreamingFileSink.forBulkFormat(Path.fromLocalFile(folder), avroWriterFactory)
                         .withBucketAssigner(new UniqueBucketAssigner<>("test"))
-                        .build();
-        stream.sinkTo(sink);
+                        .build());
         env.execute();
 
         validateResults(folder, new ReflectDatumReader<>(Datum.class), data);