Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/06 02:25:39 UTC

[flink] branch master updated: [FLINK-28322][table-api] DataStreamScan(Sink)Provider's new method is not compatible

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 03a1a42d210 [FLINK-28322][table-api] DataStreamScan(Sink)Provider's new method is not compatible
03a1a42d210 is described below

commit 03a1a42d210bf30751a8a5fed295cd56b0ff34df
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Wed Jul 6 10:06:52 2022 +0800

    [FLINK-28322][table-api] DataStreamScan(Sink)Provider's new method is not compatible
    
    This closes #20119
---
 .../connector/file/table/FileSystemTableSink.java  |  9 ++-
 .../flink/connectors/hive/HiveTableSink.java       | 10 ++-
 .../connectors/kafka/table/KafkaDynamicSink.java   | 56 ++++++++---------
 .../org/apache/flink/table/utils/TestingSinks.java | 13 +++-
 .../connector/sink/DataStreamSinkProvider.java     |  6 +-
 .../connector/source/DataStreamScanProvider.java   |  6 +-
 .../planner/connectors/CollectDynamicSink.java     | 71 +++++++++++-----------
 .../nodes/exec/common/CommonExecSinkITCase.java    | 38 +++++++++---
 8 files changed, 127 insertions(+), 82 deletions(-)
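
The ProviderContext-aware methods added to DataStreamSinkProvider and
DataStreamScanProvider in Flink 1.15 were abstract, so connector
implementations compiled against the older single-argument signatures no
longer satisfied the interfaces. This commit turns each new method into a
default method that delegates to the deprecated variant (see the two interface
hunks below), which restores compatibility. A minimal sketch of the restored
behavior; the LegacyPrintSinkProvider name is made up for illustration:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSink;
    import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
    import org.apache.flink.table.data.RowData;

    // A provider written against the pre-1.15 interface: it overrides only
    // the deprecated single-argument method.
    public class LegacyPrintSinkProvider implements DataStreamSinkProvider {
        @Override
        public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
            return dataStream.print();
        }
    }

    // After this commit, the planner's call to the two-argument
    // consumeDataStream resolves to the new default method, which forwards
    // to the override above, so the legacy class keeps working.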

diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
index 7ac5b385c13..81eb49a5457 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
@@ -136,8 +136,13 @@ public class FileSystemTableSink extends AbstractFileSystemTable
 
     @Override
     public SinkRuntimeProvider getSinkRuntimeProvider(Context sinkContext) {
-        return (DataStreamSinkProvider)
-                (providerContext, dataStream) -> consume(providerContext, dataStream, sinkContext);
+        return new DataStreamSinkProvider() {
+            @Override
+            public DataStreamSink<?> consumeDataStream(
+                    ProviderContext providerContext, DataStream<RowData> dataStream) {
+                return consume(providerContext, dataStream, sinkContext);
+            }
+        };
     }
 
     private DataStreamSink<?> consume(
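
The same mechanical change recurs in every connector hunk in this patch: the
(DataStreamSinkProvider) lambda cast is replaced by an anonymous class. Once
the two-argument consumeDataStream becomes a default method, the interface no
longer exposes an abstract method with that two-argument shape, so the Java
compiler rejects the old lambda. A self-contained illustration of the
language rule, using made-up stand-in types:

    public class LambdaVsDefault {
        interface Provider {
            // New signature, a default method delegating to the old one.
            default String consume(String ctx, String data) {
                return consume(data);
            }
            // Old signature.
            String consume(String data);
        }

        public static void main(String[] args) {
            // Provider p = (ctx, data) -> ctx + data;  // does not compile:
            // the single abstract method takes one argument, not two.
            Provider p = new Provider() {               // anonymous class works
                @Override
                public String consume(String ctx, String data) {
                    return ctx + ":" + data;
                }

                @Override
                public String consume(String data) {
                    return data;
                }
            };
            System.out.println(p.consume("uid", "row")); // prints uid:row
        }
    }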
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index 43166709200..abbc2786994 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -163,9 +163,13 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su
     public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
         DataStructureConverter converter =
                 context.createDataStructureConverter(tableSchema.toRowDataType());
-        return (DataStreamSinkProvider)
-                (providerContext, dataStream) ->
-                        consume(providerContext, dataStream, context.isBounded(), converter);
+        return new DataStreamSinkProvider() {
+            @Override
+            public DataStreamSink<?> consumeDataStream(
+                    ProviderContext providerContext, DataStream<RowData> dataStream) {
+                return consume(providerContext, dataStream, context.isBounded(), converter);
+            }
+        };
     }
 
     private DataStreamSink<?> consume(
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
index 8af1782ac95..041846dd78f 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
@@ -26,11 +26,13 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.kafka.sink.KafkaSink;
 import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.connector.ProviderContext;
 import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -215,34 +217,32 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
                                         upsertMode))
                         .build();
         if (flushMode.isEnabled() && upsertMode) {
-            return (DataStreamSinkProvider)
-                    (providerContext, dataStream) -> {
-                        final boolean objectReuse =
-                                dataStream
-                                        .getExecutionEnvironment()
-                                        .getConfig()
-                                        .isObjectReuseEnabled();
-                        final ReducingUpsertSink<?> sink =
-                                new ReducingUpsertSink<>(
-                                        kafkaSink,
-                                        physicalDataType,
-                                        keyProjection,
-                                        flushMode,
-                                        objectReuse
-                                                ? createRowDataTypeSerializer(
-                                                                context,
-                                                                dataStream.getExecutionConfig())
-                                                        ::copy
-                                                : rowData -> rowData);
-                        final DataStreamSink<RowData> end = dataStream.sinkTo(sink);
-                        providerContext
-                                .generateUid(UPSERT_KAFKA_TRANSFORMATION)
-                                .ifPresent(end::uid);
-                        if (parallelism != null) {
-                            end.setParallelism(parallelism);
-                        }
-                        return end;
-                    };
+            return new DataStreamSinkProvider() {
+                @Override
+                public DataStreamSink<?> consumeDataStream(
+                        ProviderContext providerContext, DataStream<RowData> dataStream) {
+                    final boolean objectReuse =
+                            dataStream.getExecutionEnvironment().getConfig().isObjectReuseEnabled();
+                    final ReducingUpsertSink<?> sink =
+                            new ReducingUpsertSink<>(
+                                    kafkaSink,
+                                    physicalDataType,
+                                    keyProjection,
+                                    flushMode,
+                                    objectReuse
+                                            ? createRowDataTypeSerializer(
+                                                            context,
+                                                            dataStream.getExecutionConfig())
+                                                    ::copy
+                                            : rowData -> rowData);
+                    final DataStreamSink<RowData> end = dataStream.sinkTo(sink);
+                    providerContext.generateUid(UPSERT_KAFKA_TRANSFORMATION).ifPresent(end::uid);
+                    if (parallelism != null) {
+                        end.setParallelism(parallelism);
+                    }
+                    return end;
+                }
+            };
         }
         return SinkV2Provider.of(kafkaSink, parallelism);
     }
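
One detail in the Kafka hunk deserves a note: the objectReuse branch. With
object reuse enabled, the runtime may hand the sink the same RowData instance
for every record, so a sink that buffers records across invocations (as
ReducingUpsertSink does for its upsert flush buffer) must deep-copy each one
before keeping a reference; with object reuse disabled, the identity function
is safe. A sketch of that selection, with a hypothetical recordCopier helper:

    import java.util.function.Function;

    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.table.data.RowData;

    final class CopyUtil {
        private CopyUtil() {}

        // TypeSerializer#copy produces a deep copy, so buffered references
        // survive the runtime mutating the original record object.
        static Function<RowData, RowData> recordCopier(
                boolean objectReuse, TypeSerializer<RowData> serializer) {
            return objectReuse ? serializer::copy : rowData -> rowData;
        }
    }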
diff --git a/flink-python/src/test/java/org/apache/flink/table/utils/TestingSinks.java b/flink-python/src/test/java/org/apache/flink/table/utils/TestingSinks.java
index b7645a67a03..9e6867d993b 100644
--- a/flink-python/src/test/java/org/apache/flink/table/utils/TestingSinks.java
+++ b/flink-python/src/test/java/org/apache/flink/table/utils/TestingSinks.java
@@ -19,8 +19,11 @@
 package org.apache.flink.table.utils;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.ProviderContext;
 import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.data.RowData;
@@ -53,8 +56,14 @@ public class TestingSinks {
         public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
             final DataStructureConverter converter =
                     context.createDataStructureConverter(rowDataType);
-            return (DataStreamSinkProvider)
-                    (providerContext, dataStream) -> dataStream.addSink(new RowSink(converter));
+            return new DataStreamSinkProvider() {
+
+                @Override
+                public DataStreamSink<?> consumeDataStream(
+                        ProviderContext providerContext, DataStream<RowData> dataStream) {
+                    return dataStream.addSink(new RowSink(converter));
+                }
+            };
         }
 
         @Override
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/sink/DataStreamSinkProvider.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/sink/DataStreamSinkProvider.java
index da0344ef721..c492bddf96e 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/sink/DataStreamSinkProvider.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/sink/DataStreamSinkProvider.java
@@ -54,8 +54,10 @@ public interface DataStreamSinkProvider
      *
      * @see SingleOutputStreamOperator#uid(String)
      */
-    DataStreamSink<?> consumeDataStream(
-            ProviderContext providerContext, DataStream<RowData> dataStream);
+    default DataStreamSink<?> consumeDataStream(
+            ProviderContext providerContext, DataStream<RowData> dataStream) {
+        return consumeDataStream(dataStream);
+    }
 
     /**
      * Consumes the given Java {@link DataStream} and returns the sink transformation {@link
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/DataStreamScanProvider.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/DataStreamScanProvider.java
index 2e6f2d62170..7fc4687363f 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/DataStreamScanProvider.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/DataStreamScanProvider.java
@@ -48,8 +48,10 @@ public interface DataStreamScanProvider extends ScanTableSource.ScanRuntimeProvi
      *
      * @see SingleOutputStreamOperator#uid(String)
      */
-    DataStream<RowData> produceDataStream(
-            ProviderContext providerContext, StreamExecutionEnvironment execEnv);
+    default DataStream<RowData> produceDataStream(
+            ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
+        return produceDataStream(execEnv);
+    }
 
     /** Creates a scan Java {@link DataStream} from a {@link StreamExecutionEnvironment}. */
     @Deprecated
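
The scan-side fix mirrors the sink side. New implementations should override
the two-argument variant and can use the ProviderContext to assign operator
uids. A sketch, assuming the deprecated single-argument variant keeps a
default body and so needs no override; the "my-source" uid key and the
createSourceStream helper are made up:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.connector.ProviderContext;
    import org.apache.flink.table.connector.source.DataStreamScanProvider;
    import org.apache.flink.table.data.RowData;

    new DataStreamScanProvider() {
        @Override
        public DataStream<RowData> produceDataStream(
                ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
            SingleOutputStreamOperator<RowData> stream = createSourceStream(execEnv);
            // generateUid returns Optional.empty() when no stable uid should
            // be set, hence ifPresent rather than an unconditional uid(...).
            providerContext.generateUid("my-source").ifPresent(stream::uid);
            return stream;
        }

        @Override
        public boolean isBounded() {
            return true;
        }
    };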
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java
index 96e3c950fee..cfdadfd07ec 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
 import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
@@ -32,6 +34,7 @@ import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.internal.ResultProvider;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.ProviderContext;
 import org.apache.flink.table.connector.RuntimeConverter;
 import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -95,40 +98,40 @@ public final class CollectDynamicSink implements DynamicTableSink {
 
     @Override
     public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
-        return (DataStreamSinkProvider)
-                (providerContext, inputStream) -> {
-                    final CheckpointConfig checkpointConfig =
-                            inputStream.getExecutionEnvironment().getCheckpointConfig();
-                    final ExecutionConfig config = inputStream.getExecutionConfig();
-
-                    final TypeSerializer<RowData> externalSerializer =
-                            InternalTypeInfo.<RowData>of(consumedDataType.getLogicalType())
-                                    .createSerializer(config);
-                    final String accumulatorName = tableIdentifier.getObjectName();
-
-                    final CollectSinkOperatorFactory<RowData> factory =
-                            new CollectSinkOperatorFactory<>(
-                                    externalSerializer,
-                                    accumulatorName,
-                                    maxBatchSize,
-                                    socketTimeout);
-                    final CollectSinkOperator<RowData> operator =
-                            (CollectSinkOperator<RowData>) factory.getOperator();
-
-                    iterator =
-                            new CollectResultIterator<>(
-                                    operator.getOperatorIdFuture(),
-                                    externalSerializer,
-                                    accumulatorName,
-                                    checkpointConfig);
-                    converter = context.createDataStructureConverter(consumedDataType);
-                    converter.open(RuntimeConverter.Context.create(classLoader));
-
-                    final CollectStreamSink<RowData> sink =
-                            new CollectStreamSink<>(inputStream, factory);
-                    providerContext.generateUid(COLLECT_TRANSFORMATION).ifPresent(sink::uid);
-                    return sink.name("Collect table sink");
-                };
+        return new DataStreamSinkProvider() {
+            @Override
+            public DataStreamSink<?> consumeDataStream(
+                    ProviderContext providerContext, DataStream<RowData> inputStream) {
+                final CheckpointConfig checkpointConfig =
+                        inputStream.getExecutionEnvironment().getCheckpointConfig();
+                final ExecutionConfig config = inputStream.getExecutionConfig();
+
+                final TypeSerializer<RowData> externalSerializer =
+                        InternalTypeInfo.<RowData>of(consumedDataType.getLogicalType())
+                                .createSerializer(config);
+                final String accumulatorName = tableIdentifier.getObjectName();
+
+                final CollectSinkOperatorFactory<RowData> factory =
+                        new CollectSinkOperatorFactory<>(
+                                externalSerializer, accumulatorName, maxBatchSize, socketTimeout);
+                final CollectSinkOperator<RowData> operator =
+                        (CollectSinkOperator<RowData>) factory.getOperator();
+
+                iterator =
+                        new CollectResultIterator<>(
+                                operator.getOperatorIdFuture(),
+                                externalSerializer,
+                                accumulatorName,
+                                checkpointConfig);
+                converter = context.createDataStructureConverter(consumedDataType);
+                converter.open(RuntimeConverter.Context.create(classLoader));
+
+                final CollectStreamSink<RowData> sink =
+                        new CollectStreamSink<>(inputStream, factory);
+                providerContext.generateUid(COLLECT_TRANSFORMATION).ifPresent(sink::uid);
+                return sink.name("Collect table sink");
+            }
+        };
     }
 
     @Override
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
index 5548fa7ecf8..b0b86eb9c72 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.common;
 
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -33,6 +35,7 @@ import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.connector.ProviderContext;
 import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.SinkProvider;
@@ -149,8 +152,14 @@ public class CommonExecSinkITCase extends AbstractTestBase {
                                     @Override
                                     public DataStreamSinkProvider getSinkRuntimeProvider(
                                             DynamicTableSink.Context context) {
-                                        return (providerContext, dataStream) ->
-                                                dataStream.addSink(sinkFunction);
+                                        return new DataStreamSinkProvider() {
+                                            @Override
+                                            public DataStreamSink<?> consumeDataStream(
+                                                    ProviderContext providerContext,
+                                                    DataStream<RowData> dataStream) {
+                                                return dataStream.addSink(sinkFunction);
+                                            }
+                                        };
                                     }
                                 })
                         .build();
@@ -443,8 +452,14 @@ public class CommonExecSinkITCase extends AbstractTestBase {
                                     @Override
                                     public DataStreamSinkProvider getSinkRuntimeProvider(
                                             DynamicTableSink.Context context) {
-                                        return (providerContext, dataStream) ->
-                                                dataStream.addSink(sinkFunction);
+                                        return new DataStreamSinkProvider() {
+                                            @Override
+                                            public DataStreamSink<?> consumeDataStream(
+                                                    ProviderContext providerContext,
+                                                    DataStream<RowData> dataStream) {
+                                                return dataStream.addSink(sinkFunction);
+                                            }
+                                        };
                                     }
                                 })
                         .build();
@@ -494,12 +509,17 @@ public class CommonExecSinkITCase extends AbstractTestBase {
         return new TableFactoryHarness.SinkBase() {
             @Override
             public DataStreamSinkProvider getSinkRuntimeProvider(Context context) {
-                return (providerContext, dataStream) -> {
-                    TestSink<RowData> sink = buildRecordWriterTestSink(new RecordWriter(fetched));
-                    if (useSinkV2) {
-                        return dataStream.sinkTo(SinkV1Adapter.wrap(sink));
+                return new DataStreamSinkProvider() {
+                    @Override
+                    public DataStreamSink<?> consumeDataStream(
+                            ProviderContext providerContext, DataStream<RowData> dataStream) {
+                        TestSink<RowData> sink =
+                                buildRecordWriterTestSink(new RecordWriter(fetched));
+                        if (useSinkV2) {
+                            return dataStream.sinkTo(SinkV1Adapter.wrap(sink));
+                        }
+                        return dataStream.sinkTo(sink);
                     }
-                    return dataStream.sinkTo(sink);
                 };
             }
         };
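
Taken together, the post-fix pattern for a sink-side call site is: implement
the two-argument consumeDataStream in an anonymous class and route uid
assignment through the ProviderContext, as the hunks above do. A closing
sketch; the "my-sink" uid key is made up, and PrintSinkFunction (a standard
Flink sink function) merely stands in for a real connector sink:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSink;
    import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
    import org.apache.flink.table.connector.ProviderContext;
    import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
    import org.apache.flink.table.data.RowData;

    new DataStreamSinkProvider() {
        @Override
        public DataStreamSink<?> consumeDataStream(
                ProviderContext providerContext, DataStream<RowData> dataStream) {
            DataStreamSink<RowData> sink = dataStream.addSink(new PrintSinkFunction<>());
            providerContext.generateUid("my-sink").ifPresent(sink::uid);
            return sink;
        }
    };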