You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2023/01/09 12:26:49 UTC

[flink] 04/08: Revert "[FLINK-30166][SQL E2E Test] Refactor deprecated StreamingFileSink usage with target FileSink"

This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch revert-21371-30166
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 889626d4960304028e46e87ff4f7f879b781e337
Author: Matthias Pohl <ma...@aiven.io>
AuthorDate: Mon Jan 9 13:26:23 2023 +0100

    Revert "[FLINK-30166][SQL E2E Test] Refactor deprecated StreamingFileSink usage with target FileSink"
    
    This reverts commit 88c450b458e2f53f132d7f51cfd7771cc54fc072.
---
 flink-end-to-end-tests/flink-stream-sql-test/pom.xml              | 8 +-------
 .../java/org/apache/flink/sql/tests/StreamSQLTestProgram.java     | 8 ++++----
 2 files changed, 5 insertions(+), 11 deletions(-)

diff --git a/flink-end-to-end-tests/flink-stream-sql-test/pom.xml b/flink-end-to-end-tests/flink-stream-sql-test/pom.xml
index 270ac1b3cbc..0afff45234b 100644
--- a/flink-end-to-end-tests/flink-stream-sql-test/pom.xml
+++ b/flink-end-to-end-tests/flink-stream-sql-test/pom.xml
@@ -41,13 +41,7 @@
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-connector-files</artifactId>
-            <version>${project.version}</version>
-            <scope>provided</scope>
-        </dependency>
-    </dependencies>
+	</dependencies>
 
 	<build>
 		<plugins>
diff --git a/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java b/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java
index a9553f9c69a..7abee30c072 100644
--- a/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java
+++ b/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java
@@ -30,7 +30,6 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.connector.file.sink.FileSink;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -39,6 +38,7 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
 import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -148,8 +148,8 @@ public class StreamSQLTestProgram {
         DataStream<Row> resultStream =
                 tEnv.toAppendStream(result, Types.ROW(Types.INT, Types.SQL_TIMESTAMP));
 
-        final FileSink<Row> sink =
-                FileSink.forRowFormat(
+        final StreamingFileSink<Row> sink =
+                StreamingFileSink.forRowFormat(
                                 new Path(outputPath),
                                 (Encoder<Row>)
                                         (element, stream) -> {
@@ -166,7 +166,7 @@ public class StreamSQLTestProgram {
                 .map(new KillMapper())
                 .setParallelism(1)
                 // add sink function
-                .sinkTo(sink)
+                .addSink(sink)
                 .setParallelism(1);
 
         sEnv.execute();