You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2023/01/09 12:26:50 UTC
[flink] 05/08: Revert "[FLINK-30166][Parquet] Refactor deprecated StreamingFileSink usage with target FileSink"
This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch revert-21371-30166
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a9a23ab55522730968965af37c6a165d4b0b0cf6
Author: Matthias Pohl <ma...@aiven.io>
AuthorDate: Mon Jan 9 13:26:23 2023 +0100
Revert "[FLINK-30166][Parquet] Refactor deprecated StreamingFileSink usage with target FileSink"
This reverts commit 3ef68ae2d3b0be58d9c015a260c668a3c4723d97.
---
...ava => AvroParquetStreamingFileSinkITCase.java} | 32 +++++++++++-----------
...va => ParquetProtoStreamingFileSinkITCase.java} | 16 +++++------
2 files changed, 24 insertions(+), 24 deletions(-)
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetFileSinkITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetStreamingFileSinkITCase.java
similarity index 93%
rename from flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetFileSinkITCase.java
rename to flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetStreamingFileSinkITCase.java
index a49305b501c..91de5fbc4fe 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetFileSinkITCase.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/AvroParquetStreamingFileSinkITCase.java
@@ -19,12 +19,12 @@
package org.apache.flink.formats.parquet.avro;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.formats.parquet.generated.Address;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.UniqueBucketAssigner;
import org.apache.flink.streaming.util.FiniteTestSource;
import org.apache.flink.test.junit5.MiniClusterExtension;
@@ -56,11 +56,11 @@ import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
/**
- * Simple integration test case for writing bulk encoded files with the {@link FileSink} with
- * Parquet.
+ * Simple integration test case for writing bulk encoded files with the {@link StreamingFileSink}
+ * with Parquet.
*/
@ExtendWith(MiniClusterExtension.class)
-class AvroParquetFileSinkITCase {
+class AvroParquetStreamingFileSinkITCase {
@Test
void testWriteParquetAvroSpecific(@TempDir File folder) throws Exception {
@@ -78,13 +78,13 @@ class AvroParquetFileSinkITCase {
DataStream<Address> stream =
env.addSource(new FiniteTestSource<>(data), TypeInformation.of(Address.class));
- FileSink<Address> sink =
- FileSink.forBulkFormat(
+ stream.addSink(
+ StreamingFileSink.forBulkFormat(
Path.fromLocalFile(folder),
AvroParquetWriters.forSpecificRecord(Address.class))
.withBucketAssigner(new UniqueBucketAssigner<>("test"))
- .build();
- stream.sinkTo(sink);
+ .build());
+
env.execute();
validateResults(folder, SpecificData.get(), data);
@@ -104,13 +104,13 @@ class AvroParquetFileSinkITCase {
DataStream<GenericRecord> stream =
env.addSource(new FiniteTestSource<>(data), new GenericRecordAvroTypeInfo(schema));
- FileSink<GenericRecord> sink =
- FileSink.forBulkFormat(
+ stream.addSink(
+ StreamingFileSink.forBulkFormat(
Path.fromLocalFile(folder),
AvroParquetWriters.forGenericRecord(schema))
.withBucketAssigner(new UniqueBucketAssigner<>("test"))
- .build();
- stream.sinkTo(sink);
+ .build());
+
env.execute();
List<Address> expected =
@@ -134,13 +134,13 @@ class AvroParquetFileSinkITCase {
DataStream<Datum> stream =
env.addSource(new FiniteTestSource<>(data), TypeInformation.of(Datum.class));
- FileSink<Datum> sink =
- FileSink.forBulkFormat(
+ stream.addSink(
+ StreamingFileSink.forBulkFormat(
Path.fromLocalFile(folder),
AvroParquetWriters.forReflectRecord(Datum.class))
.withBucketAssigner(new UniqueBucketAssigner<>("test"))
- .build();
- stream.sinkTo(sink);
+ .build());
+
env.execute();
validateResults(folder, ReflectData.get(), data);
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoFileSinkITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoStreamingFileSinkITCase.java
similarity index 94%
rename from flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoFileSinkITCase.java
rename to flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoStreamingFileSinkITCase.java
index 10d215a1b6a..3b70aa158c3 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoFileSinkITCase.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoStreamingFileSinkITCase.java
@@ -19,10 +19,10 @@
package org.apache.flink.formats.parquet.protobuf;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.UniqueBucketAssigner;
import org.apache.flink.streaming.util.FiniteTestSource;
import org.apache.flink.test.junit5.MiniClusterExtension;
@@ -46,11 +46,11 @@ import static org.apache.flink.formats.parquet.protobuf.SimpleRecord.SimpleProto
import static org.assertj.core.api.Assertions.assertThat;
/**
- * Simple integration test case for writing bulk encoded files with the {@link FileSink} with
- * Parquet.
+ * Simple integration test case for writing bulk encoded files with the {@link StreamingFileSink}
+ * with Parquet.
*/
@ExtendWith(MiniClusterExtension.class)
-class ParquetProtoFileSinkITCase {
+class ParquetProtoStreamingFileSinkITCase {
@Test
void testParquetProtoWriters(@TempDir File folder) throws Exception {
@@ -69,13 +69,13 @@ class ParquetProtoFileSinkITCase {
env.addSource(
new FiniteTestSource<>(data), TypeInformation.of(SimpleProtoRecord.class));
- FileSink<SimpleProtoRecord> sink =
- FileSink.forBulkFormat(
+ stream.addSink(
+ StreamingFileSink.forBulkFormat(
Path.fromLocalFile(folder),
ParquetProtoWriters.forType(SimpleProtoRecord.class))
.withBucketAssigner(new UniqueBucketAssigner<>("test"))
- .build();
- stream.sinkTo(sink);
+ .build());
+
env.execute();
validateResults(folder, data);