You are viewing a plain text version of this content; the canonical (HTML) version is available at the original mailing-list archive.
Posted to commits@flink.apache.org by fp...@apache.org on 2021/12/01 08:05:24 UTC

[flink] 01/03: [FLINK-24596][table] Allow using unified Sinks with the DataStreamSinkProvider

This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b7cd97b02d7f39b5190d27fbaf6e6287f127b9a9
Author: Fabian Paul <fa...@ververica.com>
AuthorDate: Thu Oct 21 10:20:25 2021 +0200

    [FLINK-24596][table] Allow using unified Sinks with the DataStreamSinkProvider
---
 .../connectors/cassandra/CassandraSink.java        |  2 +-
 .../flink/streaming/api/datastream/DataStream.java |  2 +-
 .../streaming/api/datastream/DataStreamSink.java   |  8 ++-
 .../streaming/api/datastream/KeyedStream.java      |  4 +-
 .../api/operators/collect/CollectStreamSink.java   |  8 ++-
 .../transformations/LegacySinkTransformation.java  |  5 +-
 .../LegacySinkTransformationTranslator.java        |  2 +-
 .../api/datastream/DataStreamSinkTest.java         | 10 ++-
 .../plan/nodes/exec/batch/BatchExecLegacySink.java |  3 +-
 .../nodes/exec/common/CommonExecLegacySink.java    | 18 +++---
 .../nodes/exec/stream/StreamExecLegacySink.java    |  3 +-
 .../nodes/exec/common/CommonExecSinkITCase.java    | 75 +++++++++++++++++++---
 12 files changed, 105 insertions(+), 35 deletions(-)

diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
index 89aa10a..bf44f0a 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
@@ -62,7 +62,7 @@ public class CassandraSink<IN> {
     }
 
     private LegacySinkTransformation<IN> getSinkTransformation() {
-        return sink1.getTransformation();
+        return sink1.getLegacyTransformation();
     }
 
     private Transformation<IN> getTransformation() {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 726728b..1c1bca6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -1244,7 +1244,7 @@ public class DataStream<T> {
 
         DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);
 
-        getExecutionEnvironment().addOperator(sink.getTransformation());
+        getExecutionEnvironment().addOperator(sink.getLegacyTransformation());
         return sink;
     }
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
index eea988e..3aca069 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
@@ -23,6 +23,7 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.operators.SlotSharingGroup;
 import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
@@ -64,7 +65,12 @@ public class DataStreamSink<T> {
 
     /** Returns the transformation that contains the actual sink operator of this sink. */
     @Internal
-    public LegacySinkTransformation<T> getTransformation() {
+    public Transformation<T> getTransformation() {
+        return transformation;
+    }
+
+    @Internal
+    public LegacySinkTransformation<T> getLegacyTransformation() {
         if (transformation instanceof LegacySinkTransformation) {
             return (LegacySinkTransformation<T>) transformation;
         } else {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index c299a68..ada3a82 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -299,8 +299,8 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
     @Override
     public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
         DataStreamSink<T> result = super.addSink(sinkFunction);
-        result.getTransformation().setStateKeySelector(keySelector);
-        result.getTransformation().setStateKeyType(keyType);
+        result.getLegacyTransformation().setStateKeySelector(keySelector);
+        result.getLegacyTransformation().setStateKeyType(keyType);
         return result;
     }
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectStreamSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectStreamSink.java
index d53c3e6..a68043d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectStreamSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectStreamSink.java
@@ -18,10 +18,12 @@
 package org.apache.flink.streaming.api.operators.collect;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
+import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
 
 /**
  * A {@link DataStreamSink} which is used to collect results of a data stream. It completely
@@ -30,17 +32,17 @@ import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
 @Internal
 public class CollectStreamSink<T> extends DataStreamSink<T> {
 
-    private final LegacySinkTransformation<T> transformation;
+    private final PhysicalTransformation<T> transformation;
 
     public CollectStreamSink(DataStream<T> inputStream, CollectSinkOperatorFactory<T> factory) {
         super(inputStream, (CollectSinkOperator<T>) factory.getOperator());
         this.transformation =
-                new LegacySinkTransformation<>(
+                new LegacySinkTransformation<T>(
                         inputStream.getTransformation(), "Collect Stream Sink", factory, 1);
     }
 
     @Override
-    public LegacySinkTransformation<T> getTransformation() {
+    public Transformation<T> getTransformation() {
         return transformation;
     }
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySinkTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySinkTransformation.java
index d240329..8977c70 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySinkTransformation.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySinkTransformation.java
@@ -23,7 +23,6 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
@@ -40,7 +39,7 @@ import java.util.List;
  * @param <T> The type of the elements in the input {@code LegacySinkTransformation}
  */
 @Internal
-public class LegacySinkTransformation<T> extends PhysicalTransformation<Object> {
+public class LegacySinkTransformation<T> extends PhysicalTransformation<T> {
 
     private final Transformation<T> input;
 
@@ -70,7 +69,7 @@ public class LegacySinkTransformation<T> extends PhysicalTransformation<Object>
             String name,
             StreamOperatorFactory<Object> operatorFactory,
             int parallelism) {
-        super(name, TypeExtractor.getForClass(Object.class), parallelism);
+        super(name, input.getOutputType(), parallelism);
         this.input = input;
         this.operatorFactory = operatorFactory;
     }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySinkTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySinkTransformationTranslator.java
index fa2b1d3..378a717 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySinkTransformationTranslator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/LegacySinkTransformationTranslator.java
@@ -42,7 +42,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  */
 @Internal
 public class LegacySinkTransformationTranslator<IN>
-        extends SimpleTransformationTranslator<Object, LegacySinkTransformation<IN>> {
+        extends SimpleTransformationTranslator<IN, LegacySinkTransformation<IN>> {
 
     @Override
     protected Collection<Integer> translateForBatchInternal(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/DataStreamSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/DataStreamSinkTest.java
index b8d39ad..b2de771 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/DataStreamSinkTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/DataStreamSinkTest.java
@@ -17,18 +17,24 @@
 
 package org.apache.flink.streaming.api.datastream;
 
+import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
 import org.apache.flink.streaming.runtime.operators.sink.TestSink;
 
 import org.junit.Test;
 
+import static org.junit.Assert.assertTrue;
+
 /** Unit test for {@link DataStreamSink}. */
 public class DataStreamSinkTest {
 
-    @Test(expected = IllegalStateException.class)
+    @Test
     public void throwExceptionWhenGettingTransformationWithNewSinkAPI() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.fromElements(1, 2).sinkTo(TestSink.newBuilder().build()).getTransformation();
+        final Transformation<Integer> transformation =
+                env.fromElements(1, 2).sinkTo(TestSink.newBuilder().build()).getTransformation();
+        assertTrue(transformation instanceof SinkTransformation);
     }
 
     @Test(expected = UnsupportedOperationException.class)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLegacySink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLegacySink.java
index 5aa1dea..049920c 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLegacySink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLegacySink.java
@@ -37,8 +37,7 @@ import java.lang.reflect.Modifier;
  *
  * @param <T> The return type of the {@link TableSink}.
  */
-public class BatchExecLegacySink<T> extends CommonExecLegacySink<T>
-        implements BatchExecNode<Object> {
+public class BatchExecLegacySink<T> extends CommonExecLegacySink<T> implements BatchExecNode<T> {
 
     public BatchExecLegacySink(
             TableSink<T> tableSink,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLegacySink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLegacySink.java
index 8cb2b66..ca29944 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLegacySink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLegacySink.java
@@ -58,8 +58,8 @@ import java.util.List;
  *
  * @param <T> The return type of the {@link TableSink}.
  */
-public abstract class CommonExecLegacySink<T> extends ExecNodeBase<Object>
-        implements MultipleTransformationTranslator<Object> {
+public abstract class CommonExecLegacySink<T> extends ExecNodeBase<T>
+        implements MultipleTransformationTranslator<T> {
     protected final TableSink<T> tableSink;
     protected final @Nullable String[] upsertKeys;
     protected final boolean needRetraction;
@@ -82,7 +82,7 @@ public abstract class CommonExecLegacySink<T> extends ExecNodeBase<Object>
 
     @SuppressWarnings("unchecked")
     @Override
-    protected Transformation<Object> translateToPlanInternal(PlannerBase planner) {
+    protected Transformation<T> translateToPlanInternal(PlannerBase planner) {
         if (tableSink instanceof StreamTableSink) {
             final Transformation<T> transform;
             if (tableSink instanceof RetractStreamTableSink) {
@@ -121,8 +121,9 @@ public abstract class CommonExecLegacySink<T> extends ExecNodeBase<Object>
             }
 
             final DataStream<T> dataStream = new DataStream<T>(planner.getExecEnv(), transform);
-            final DataStreamSink<?> dsSink =
-                    ((StreamTableSink<T>) tableSink).consumeDataStream(dataStream);
+            final DataStreamSink<T> dsSink =
+                    (DataStreamSink<T>)
+                            ((StreamTableSink<T>) tableSink).consumeDataStream(dataStream);
             if (dsSink == null) {
                 throw new TableException(
                         String.format(
@@ -131,15 +132,14 @@ public abstract class CommonExecLegacySink<T> extends ExecNodeBase<Object>
                                         + "However, %s doesn't implement this method.",
                                 tableSink.getClass().getCanonicalName()));
             }
-            return dsSink.getTransformation();
+            return dsSink.getLegacyTransformation();
         } else if (tableSink instanceof DataStreamTableSink) {
             // In case of table to DataStream through
             // StreamTableEnvironment#toAppendStream/toRetractStream,
             // we insert a DataStreamTableSink that wraps the given DataStream as a LogicalSink. It
             // is no real table sink, so we just need translate its input to Transformation.
-            return (Transformation<Object>)
-                    translateToTransformation(
-                            planner, ((DataStreamTableSink<T>) tableSink).withChangeFlag());
+            return translateToTransformation(
+                    planner, ((DataStreamTableSink<T>) tableSink).withChangeFlag());
         } else {
             throw new TableException(
                     String.format(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacySink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacySink.java
index af9ee1d..b1e4ee6 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacySink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacySink.java
@@ -40,8 +40,7 @@ import java.util.stream.Collectors;
  *
  * @param <T> The return type of the {@link TableSink}.
  */
-public class StreamExecLegacySink<T> extends CommonExecLegacySink<T>
-        implements StreamExecNode<Object> {
+public class StreamExecLegacySink<T> extends CommonExecLegacySink<T> implements StreamExecNode<T> {
 
     public StreamExecLegacySink(
             TableSink<T> tableSink,
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
index 744ced7..60c489b 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
@@ -50,7 +50,9 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 
+import static org.apache.flink.table.api.DataTypes.INT;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.not;
@@ -93,7 +95,8 @@ public class CommonExecSinkITCase extends AbstractTestBase {
                                                     DynamicTableSink.Context context) {
                                         return SinkProvider.of(
                                                 TestSink.newBuilder()
-                                                        .setWriter(new TestWriter(timestamps))
+                                                        .setWriter(
+                                                                new TestTimestampWriter(timestamps))
                                                         .setCommittableSerializer(
                                                                 TestSink.StringCommittableSerializer
                                                                         .INSTANCE)
@@ -136,7 +139,7 @@ public class CommonExecSinkITCase extends AbstractTestBase {
                                                             public void invoke(
                                                                     RowData value,
                                                                     Context context) {
-                                                                addTimestamp(
+                                                                addElement(
                                                                         timestamps,
                                                                         context.timestamp());
                                                             }
@@ -153,6 +156,35 @@ public class CommonExecSinkITCase extends AbstractTestBase {
     }
 
     @Test
+    public void testUnifiedSinksAreUsableWithDataStreamSinkProvider()
+            throws ExecutionException, InterruptedException {
+        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+        final SharedReference<List<RowData>> fetched = sharedObjects.add(new ArrayList<>());
+        final List<Row> rows = Arrays.asList(Row.of(1), Row.of(2));
+
+        final TableDescriptor sourceDescriptor =
+                TableFactoryHarness.newBuilder()
+                        .schema(Schema.newBuilder().column("a", INT()).build())
+                        .source(new TimestampTestSource(rows))
+                        .sink(
+                                new TableFactoryHarness.SinkBase() {
+                                    @Override
+                                    public DataStreamSinkProvider getSinkRuntimeProvider(
+                                            DynamicTableSink.Context context) {
+                                        return buildRecordTestSinkProvider(fetched);
+                                    }
+                                })
+                        .build();
+        tableEnv.createTable("T1", sourceDescriptor);
+        String sqlStmt = "INSERT INTO T1 SELECT * FROM T1";
+        tableEnv.executeSql(sqlStmt).await();
+        final List<Integer> fetchedRows =
+                fetched.get().stream().map(r -> r.getInt(0)).sorted().collect(Collectors.toList());
+        assertEquals(fetchedRows.get(0).intValue(), 1);
+        assertEquals(fetchedRows.get(1).intValue(), 2);
+    }
+
+    @Test
     public void testStreamRecordTimestampInserterNotApplied() {
         final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
         final SharedReference<List<Long>> timestamps = sharedObjects.add(new ArrayList<>());
@@ -175,7 +207,8 @@ public class CommonExecSinkITCase extends AbstractTestBase {
                                                     DynamicTableSink.Context context) {
                                         return SinkProvider.of(
                                                 TestSink.newBuilder()
-                                                        .setWriter(new TestWriter(timestamps))
+                                                        .setWriter(
+                                                                new TestTimestampWriter(timestamps))
                                                         .setCommittableSerializer(
                                                                 TestSink.StringCommittableSerializer
                                                                         .INSTANCE)
@@ -187,8 +220,19 @@ public class CommonExecSinkITCase extends AbstractTestBase {
         assertPlan(tableEnv, "INSERT INTO T1 SELECT * FROM T1", false);
     }
 
-    private static void addTimestamp(SharedReference<List<Long>> timestamps, Long timestamp) {
-        timestamps.applySync(l -> l.add(timestamp));
+    private static DataStreamSinkProvider buildRecordTestSinkProvider(
+            SharedReference<List<RowData>> fetched) {
+        return dataStream ->
+                dataStream.sinkTo(
+                        TestSink.newBuilder()
+                                .setWriter(new RecordWriter(fetched))
+                                .setCommittableSerializer(
+                                        TestSink.StringCommittableSerializer.INSTANCE)
+                                .build());
+    }
+
+    private static <T> void addElement(SharedReference<List<T>> elements, T element) {
+        elements.applySync(l -> l.add(element));
     }
 
     private static void assertPlan(
@@ -264,17 +308,32 @@ public class CommonExecSinkITCase extends AbstractTestBase {
         public void cancel() {}
     }
 
-    private static class TestWriter extends TestSink.DefaultSinkWriter<RowData> {
+    private static class TestTimestampWriter extends TestSink.DefaultSinkWriter<RowData> {
 
         private final SharedReference<List<Long>> timestamps;
 
-        private TestWriter(SharedReference<List<Long>> timestamps) {
+        private TestTimestampWriter(SharedReference<List<Long>> timestamps) {
             this.timestamps = timestamps;
         }
 
         @Override
         public void write(RowData element, Context context) {
-            addTimestamp(timestamps, context.timestamp());
+            addElement(timestamps, context.timestamp());
+            super.write(element, context);
+        }
+    }
+
+    private static class RecordWriter extends TestSink.DefaultSinkWriter<RowData> {
+
+        private final SharedReference<List<RowData>> rows;
+
+        private RecordWriter(SharedReference<List<RowData>> rows) {
+            this.rows = rows;
+        }
+
+        @Override
+        public void write(RowData element, Context context) {
+            addElement(rows, element);
             super.write(element, context);
         }
     }