You are viewing a plain-text version of this content. The canonical link for it was not preserved in this text-only copy.
Posted to commits@flink.apache.org by ma...@apache.org on 2023/01/09 12:26:49 UTC
[flink] 04/08: Revert "[FLINK-30166][SQL E2E Test] Refactor deprecated StreamingFileSink usage with target FileSink"
This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch revert-21371-30166
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 889626d4960304028e46e87ff4f7f879b781e337
Author: Matthias Pohl <ma...@aiven.io>
AuthorDate: Mon Jan 9 13:26:23 2023 +0100
Revert "[FLINK-30166][SQL E2E Test] Refactor deprecated StreamingFileSink usage with target FileSink"
This reverts commit 88c450b458e2f53f132d7f51cfd7771cc54fc072.
---
flink-end-to-end-tests/flink-stream-sql-test/pom.xml | 8 +-------
.../java/org/apache/flink/sql/tests/StreamSQLTestProgram.java | 8 ++++----
2 files changed, 5 insertions(+), 11 deletions(-)
diff --git a/flink-end-to-end-tests/flink-stream-sql-test/pom.xml b/flink-end-to-end-tests/flink-stream-sql-test/pom.xml
index 270ac1b3cbc..0afff45234b 100644
--- a/flink-end-to-end-tests/flink-stream-sql-test/pom.xml
+++ b/flink-end-to-end-tests/flink-stream-sql-test/pom.xml
@@ -41,13 +41,7 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-files</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- </dependencies>
+ </dependencies>
<build>
<plugins>
diff --git a/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java b/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java
index a9553f9c69a..7abee30c072 100644
--- a/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java
+++ b/flink-end-to-end-tests/flink-stream-sql-test/src/main/java/org/apache/flink/sql/tests/StreamSQLTestProgram.java
@@ -30,7 +30,6 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -39,6 +38,7 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -148,8 +148,8 @@ public class StreamSQLTestProgram {
DataStream<Row> resultStream =
tEnv.toAppendStream(result, Types.ROW(Types.INT, Types.SQL_TIMESTAMP));
- final FileSink<Row> sink =
- FileSink.forRowFormat(
+ final StreamingFileSink<Row> sink =
+ StreamingFileSink.forRowFormat(
new Path(outputPath),
(Encoder<Row>)
(element, stream) -> {
@@ -166,7 +166,7 @@ public class StreamSQLTestProgram {
.map(new KillMapper())
.setParallelism(1)
// add sink function
- .sinkTo(sink)
+ .addSink(sink)
.setParallelism(1);
sEnv.execute();