You are viewing a plain text version of this content. The canonical link to the original (hyperlinked) version is available in the HTML rendering of this archive page.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/06 02:25:39 UTC
[flink] branch master updated: [FLINK-28322][table-api] DataStreamScan(Sink)Provider's new method is not compatible
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 03a1a42d210 [FLINK-28322][table-api] DataStreamScan(Sink)Provider's new method is not compatible
03a1a42d210 is described below
commit 03a1a42d210bf30751a8a5fed295cd56b0ff34df
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Wed Jul 6 10:06:52 2022 +0800
[FLINK-28322][table-api] DataStreamScan(Sink)Provider's new method is not compatible
This closes #20119
---
.../connector/file/table/FileSystemTableSink.java | 9 ++-
.../flink/connectors/hive/HiveTableSink.java | 10 ++-
.../connectors/kafka/table/KafkaDynamicSink.java | 56 ++++++++---------
.../org/apache/flink/table/utils/TestingSinks.java | 13 +++-
.../connector/sink/DataStreamSinkProvider.java | 6 +-
.../connector/source/DataStreamScanProvider.java | 6 +-
.../planner/connectors/CollectDynamicSink.java | 71 +++++++++++-----------
.../nodes/exec/common/CommonExecSinkITCase.java | 38 +++++++++---
8 files changed, 127 insertions(+), 82 deletions(-)
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
index 7ac5b385c13..81eb49a5457 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
@@ -136,8 +136,13 @@ public class FileSystemTableSink extends AbstractFileSystemTable
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context sinkContext) {
- return (DataStreamSinkProvider)
- (providerContext, dataStream) -> consume(providerContext, dataStream, sinkContext);
+ return new DataStreamSinkProvider() {
+ @Override
+ public DataStreamSink<?> consumeDataStream(
+ ProviderContext providerContext, DataStream<RowData> dataStream) {
+ return consume(providerContext, dataStream, sinkContext);
+ }
+ };
}
private DataStreamSink<?> consume(
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index 43166709200..abbc2786994 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -163,9 +163,13 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
DataStructureConverter converter =
context.createDataStructureConverter(tableSchema.toRowDataType());
- return (DataStreamSinkProvider)
- (providerContext, dataStream) ->
- consume(providerContext, dataStream, context.isBounded(), converter);
+ return new DataStreamSinkProvider() {
+ @Override
+ public DataStreamSink<?> consumeDataStream(
+ ProviderContext providerContext, DataStream<RowData> dataStream) {
+ return consume(providerContext, dataStream, context.isBounded(), converter);
+ }
+ };
}
private DataStreamSink<?> consume(
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
index 8af1782ac95..041846dd78f 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
@@ -26,11 +26,13 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -215,34 +217,32 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
upsertMode))
.build();
if (flushMode.isEnabled() && upsertMode) {
- return (DataStreamSinkProvider)
- (providerContext, dataStream) -> {
- final boolean objectReuse =
- dataStream
- .getExecutionEnvironment()
- .getConfig()
- .isObjectReuseEnabled();
- final ReducingUpsertSink<?> sink =
- new ReducingUpsertSink<>(
- kafkaSink,
- physicalDataType,
- keyProjection,
- flushMode,
- objectReuse
- ? createRowDataTypeSerializer(
- context,
- dataStream.getExecutionConfig())
- ::copy
- : rowData -> rowData);
- final DataStreamSink<RowData> end = dataStream.sinkTo(sink);
- providerContext
- .generateUid(UPSERT_KAFKA_TRANSFORMATION)
- .ifPresent(end::uid);
- if (parallelism != null) {
- end.setParallelism(parallelism);
- }
- return end;
- };
+ return new DataStreamSinkProvider() {
+ @Override
+ public DataStreamSink<?> consumeDataStream(
+ ProviderContext providerContext, DataStream<RowData> dataStream) {
+ final boolean objectReuse =
+ dataStream.getExecutionEnvironment().getConfig().isObjectReuseEnabled();
+ final ReducingUpsertSink<?> sink =
+ new ReducingUpsertSink<>(
+ kafkaSink,
+ physicalDataType,
+ keyProjection,
+ flushMode,
+ objectReuse
+ ? createRowDataTypeSerializer(
+ context,
+ dataStream.getExecutionConfig())
+ ::copy
+ : rowData -> rowData);
+ final DataStreamSink<RowData> end = dataStream.sinkTo(sink);
+ providerContext.generateUid(UPSERT_KAFKA_TRANSFORMATION).ifPresent(end::uid);
+ if (parallelism != null) {
+ end.setParallelism(parallelism);
+ }
+ return end;
+ }
+ };
}
return SinkV2Provider.of(kafkaSink, parallelism);
}
diff --git a/flink-python/src/test/java/org/apache/flink/table/utils/TestingSinks.java b/flink-python/src/test/java/org/apache/flink/table/utils/TestingSinks.java
index b7645a67a03..9e6867d993b 100644
--- a/flink-python/src/test/java/org/apache/flink/table/utils/TestingSinks.java
+++ b/flink-python/src/test/java/org/apache/flink/table/utils/TestingSinks.java
@@ -19,8 +19,11 @@
package org.apache.flink.table.utils;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.data.RowData;
@@ -53,8 +56,14 @@ public class TestingSinks {
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
final DataStructureConverter converter =
context.createDataStructureConverter(rowDataType);
- return (DataStreamSinkProvider)
- (providerContext, dataStream) -> dataStream.addSink(new RowSink(converter));
+ return new DataStreamSinkProvider() {
+
+ @Override
+ public DataStreamSink<?> consumeDataStream(
+ ProviderContext providerContext, DataStream<RowData> dataStream) {
+ return dataStream.addSink(new RowSink(converter));
+ }
+ };
}
@Override
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/sink/DataStreamSinkProvider.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/sink/DataStreamSinkProvider.java
index da0344ef721..c492bddf96e 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/sink/DataStreamSinkProvider.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/sink/DataStreamSinkProvider.java
@@ -54,8 +54,10 @@ public interface DataStreamSinkProvider
*
* @see SingleOutputStreamOperator#uid(String)
*/
- DataStreamSink<?> consumeDataStream(
- ProviderContext providerContext, DataStream<RowData> dataStream);
+ default DataStreamSink<?> consumeDataStream(
+ ProviderContext providerContext, DataStream<RowData> dataStream) {
+ return consumeDataStream(dataStream);
+ }
/**
* Consumes the given Java {@link DataStream} and returns the sink transformation {@link
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/DataStreamScanProvider.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/DataStreamScanProvider.java
index 2e6f2d62170..7fc4687363f 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/DataStreamScanProvider.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/DataStreamScanProvider.java
@@ -48,8 +48,10 @@ public interface DataStreamScanProvider extends ScanTableSource.ScanRuntimeProvi
*
* @see SingleOutputStreamOperator#uid(String)
*/
- DataStream<RowData> produceDataStream(
- ProviderContext providerContext, StreamExecutionEnvironment execEnv);
+ default DataStream<RowData> produceDataStream(
+ ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
+ return produceDataStream(execEnv);
+ }
/** Creates a scan Java {@link DataStream} from a {@link StreamExecutionEnvironment}. */
@Deprecated
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java
index 96e3c950fee..cfdadfd07ec 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/CollectDynamicSink.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
@@ -32,6 +34,7 @@ import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.internal.ResultProvider;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.RuntimeConverter;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -95,40 +98,40 @@ public final class CollectDynamicSink implements DynamicTableSink {
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
- return (DataStreamSinkProvider)
- (providerContext, inputStream) -> {
- final CheckpointConfig checkpointConfig =
- inputStream.getExecutionEnvironment().getCheckpointConfig();
- final ExecutionConfig config = inputStream.getExecutionConfig();
-
- final TypeSerializer<RowData> externalSerializer =
- InternalTypeInfo.<RowData>of(consumedDataType.getLogicalType())
- .createSerializer(config);
- final String accumulatorName = tableIdentifier.getObjectName();
-
- final CollectSinkOperatorFactory<RowData> factory =
- new CollectSinkOperatorFactory<>(
- externalSerializer,
- accumulatorName,
- maxBatchSize,
- socketTimeout);
- final CollectSinkOperator<RowData> operator =
- (CollectSinkOperator<RowData>) factory.getOperator();
-
- iterator =
- new CollectResultIterator<>(
- operator.getOperatorIdFuture(),
- externalSerializer,
- accumulatorName,
- checkpointConfig);
- converter = context.createDataStructureConverter(consumedDataType);
- converter.open(RuntimeConverter.Context.create(classLoader));
-
- final CollectStreamSink<RowData> sink =
- new CollectStreamSink<>(inputStream, factory);
- providerContext.generateUid(COLLECT_TRANSFORMATION).ifPresent(sink::uid);
- return sink.name("Collect table sink");
- };
+ return new DataStreamSinkProvider() {
+ @Override
+ public DataStreamSink<?> consumeDataStream(
+ ProviderContext providerContext, DataStream<RowData> inputStream) {
+ final CheckpointConfig checkpointConfig =
+ inputStream.getExecutionEnvironment().getCheckpointConfig();
+ final ExecutionConfig config = inputStream.getExecutionConfig();
+
+ final TypeSerializer<RowData> externalSerializer =
+ InternalTypeInfo.<RowData>of(consumedDataType.getLogicalType())
+ .createSerializer(config);
+ final String accumulatorName = tableIdentifier.getObjectName();
+
+ final CollectSinkOperatorFactory<RowData> factory =
+ new CollectSinkOperatorFactory<>(
+ externalSerializer, accumulatorName, maxBatchSize, socketTimeout);
+ final CollectSinkOperator<RowData> operator =
+ (CollectSinkOperator<RowData>) factory.getOperator();
+
+ iterator =
+ new CollectResultIterator<>(
+ operator.getOperatorIdFuture(),
+ externalSerializer,
+ accumulatorName,
+ checkpointConfig);
+ converter = context.createDataStructureConverter(consumedDataType);
+ converter.open(RuntimeConverter.Context.create(classLoader));
+
+ final CollectStreamSink<RowData> sink =
+ new CollectStreamSink<>(inputStream, factory);
+ providerContext.generateUid(COLLECT_TRANSFORMATION).ifPresent(sink::uid);
+ return sink.name("Collect table sink");
+ }
+ };
}
@Override
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
index 5548fa7ecf8..b0b86eb9c72 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.java
@@ -18,6 +18,8 @@
package org.apache.flink.table.planner.plan.nodes.exec.common;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -33,6 +35,7 @@ import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkProvider;
@@ -149,8 +152,14 @@ public class CommonExecSinkITCase extends AbstractTestBase {
@Override
public DataStreamSinkProvider getSinkRuntimeProvider(
DynamicTableSink.Context context) {
- return (providerContext, dataStream) ->
- dataStream.addSink(sinkFunction);
+ return new DataStreamSinkProvider() {
+ @Override
+ public DataStreamSink<?> consumeDataStream(
+ ProviderContext providerContext,
+ DataStream<RowData> dataStream) {
+ return dataStream.addSink(sinkFunction);
+ }
+ };
}
})
.build();
@@ -443,8 +452,14 @@ public class CommonExecSinkITCase extends AbstractTestBase {
@Override
public DataStreamSinkProvider getSinkRuntimeProvider(
DynamicTableSink.Context context) {
- return (providerContext, dataStream) ->
- dataStream.addSink(sinkFunction);
+ return new DataStreamSinkProvider() {
+ @Override
+ public DataStreamSink<?> consumeDataStream(
+ ProviderContext providerContext,
+ DataStream<RowData> dataStream) {
+ return dataStream.addSink(sinkFunction);
+ }
+ };
}
})
.build();
@@ -494,12 +509,17 @@ public class CommonExecSinkITCase extends AbstractTestBase {
return new TableFactoryHarness.SinkBase() {
@Override
public DataStreamSinkProvider getSinkRuntimeProvider(Context context) {
- return (providerContext, dataStream) -> {
- TestSink<RowData> sink = buildRecordWriterTestSink(new RecordWriter(fetched));
- if (useSinkV2) {
- return dataStream.sinkTo(SinkV1Adapter.wrap(sink));
+ return new DataStreamSinkProvider() {
+ @Override
+ public DataStreamSink<?> consumeDataStream(
+ ProviderContext providerContext, DataStream<RowData> dataStream) {
+ TestSink<RowData> sink =
+ buildRecordWriterTestSink(new RecordWriter(fetched));
+ if (useSinkV2) {
+ return dataStream.sinkTo(SinkV1Adapter.wrap(sink));
+ }
+ return dataStream.sinkTo(sink);
}
- return dataStream.sinkTo(sink);
};
}
};