Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/18 11:28:24 UTC

[GitHub] [flink] wenlong88 commented on a change in pull request #14637: [FLINK-20949][table-planner-blink] Separate the implementation of sink nodes

wenlong88 commented on a change in pull request #14637:
URL: https://github.com/apache/flink/pull/14637#discussion_r559471925



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.common;
+
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.ParallelismProvider;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.OutputFormatProvider;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.sinks.TableSinkUtils;
+import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.sink.SinkNotNullEnforcer;
+import org.apache.flink.table.runtime.operators.sink.SinkOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.types.RowKind;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Base {@link ExecNode} to write data to an external sink defined by a {@link DynamicTableSink}.
+ */
+public abstract class CommonExecSink extends ExecNodeBase<Object> {
+    protected final List<String> qualifiedName;
+    private final CatalogTable catalogTable;
+    private final DynamicTableSink tableSink;
+    private final ChangelogMode changelogMode;
+    private final boolean isBounded;
+
+    public CommonExecSink(
+            List<String> qualifiedName,
+            CatalogTable catalogTable,
+            DynamicTableSink tableSink,
+            ChangelogMode changelogMode,

Review comment:
       It may be better to use inputChangeLogMode here, because we may need to tell the table sink the changelog mode again when rebuilding the exec node.
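       A minimal sketch of the idea (the field name inputChangelogMode and the helper are illustrative, not part of the PR):

```java
// Sketch only: the node keeps the changelog mode of its *input*, so the
// sink can be asked for its own changelog mode again when the exec node
// is rebuilt (e.g. restored from a persisted plan).
private final ChangelogMode inputChangelogMode;

private ChangelogMode sinkChangelogMode() {
    // DynamicTableSink#getChangelogMode takes the requested (input) mode
    // and returns the mode the sink will actually consume.
    return tableSink.getChangelogMode(inputChangelogMode);
}
```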

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
##########
@@ -0,0 +1,198 @@
+/**
+ * Base {@link ExecNode} to write data to an external sink defined by a {@link DynamicTableSink}.
+ */
+public abstract class CommonExecSink extends ExecNodeBase<Object> {
+    protected final List<String> qualifiedName;
+    private final CatalogTable catalogTable;

Review comment:
       The catalog table here is not necessary; isn't just the TableSchema needed? tableSchema + tableSink is enough to describe a table sink.
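       A sketch of the slimmer field set, assuming a TableSchema-based variant of TableSinkUtils.getNotNullFieldIndices is available (an assumption, not confirmed by this diff):

```java
// Sketch only: tableSchema + tableSink describe the sink; the full
// CatalogTable is not needed. (Requires org.apache.flink.table.api.TableSchema.)
private final TableSchema tableSchema;
private final DynamicTableSink tableSink;

// The two usages of catalogTable in createSinkTransformation only need
// the schema (assuming a TableSchema-based getNotNullFieldIndices):
int[] notNullFieldIndices = TableSinkUtils.getNotNullFieldIndices(tableSchema);
String[] fieldNames =
        ((RowType) tableSchema.toPhysicalRowDataType().getLogicalType())
                .getFieldNames()
                .toArray(new String[0]);
```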

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
##########
@@ -0,0 +1,198 @@
+/**
+ * Base {@link ExecNode} to write data to an external sink defined by a {@link DynamicTableSink}.
+ */
+public abstract class CommonExecSink extends ExecNodeBase<Object> {
+    protected final List<String> qualifiedName;
+    private final CatalogTable catalogTable;
+    private final DynamicTableSink tableSink;
+    private final ChangelogMode changelogMode;
+    private final boolean isBounded;
+
+    public CommonExecSink(
+            List<String> qualifiedName,
+            CatalogTable catalogTable,
+            DynamicTableSink tableSink,
+            ChangelogMode changelogMode,
+            boolean isBounded,
+            ExecEdge inputEdge,
+            LogicalType outputType,
+            String description) {
+        super(Collections.singletonList(inputEdge), outputType, description);
+        this.tableSink = tableSink;
+        this.qualifiedName = qualifiedName;
+        this.catalogTable = catalogTable;
+        this.changelogMode = changelogMode;
+        this.isBounded = isBounded;
+    }
+
+    protected Transformation<Object> createSinkTransformation(
+            StreamExecutionEnvironment env,
+            TableConfig tableConfig,
+            Transformation<RowData> inputTransform,
+            int rowtimeFieldIndex) {
+        final DynamicTableSink.SinkRuntimeProvider runtimeProvider =
+                tableSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(isBounded));
+
+        final ExecutionConfigOptions.NotNullEnforcer notNullEnforcer =
+                tableConfig
+                        .getConfiguration()
+                        .get(ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER);
+        final int[] notNullFieldIndices = TableSinkUtils.getNotNullFieldIndices(catalogTable);
+        final String[] fieldNames =
+                ((RowType) catalogTable.getSchema().toPhysicalRowDataType().getLogicalType())
+                        .getFieldNames()
+                        .toArray(new String[0]);
+        final SinkNotNullEnforcer enforcer =
+                new SinkNotNullEnforcer(notNullEnforcer, notNullFieldIndices, fieldNames);
+        final InternalTypeInfo<RowData> inputTypeInfo =
+                InternalTypeInfo.of(getInputNodes().get(0).getOutputType());
+
+        if (runtimeProvider instanceof DataStreamSinkProvider) {
+            if (runtimeProvider instanceof ParallelismProvider) {
+                throw new TableException(
+                        "`DataStreamSinkProvider` is not allowed to work with"
+                                + " `ParallelismProvider`, "
+                                + "please see document of `ParallelismProvider`");
+            } else {
+                final DataStream<RowData> dataStream =
+                        new DataStream<>(env, inputTransform).filter(enforcer);
+                final DataStreamSinkProvider provider = (DataStreamSinkProvider) runtimeProvider;
+                return provider.consumeDataStream(dataStream).getTransformation();
+            }
+        } else {
+            final SinkFunction<RowData> sinkFunction;
+            if (runtimeProvider instanceof SinkFunctionProvider) {
+                sinkFunction = ((SinkFunctionProvider) runtimeProvider).createSinkFunction();
+            } else if (runtimeProvider instanceof OutputFormatProvider) {
+                OutputFormat<RowData> outputFormat =
+                        ((OutputFormatProvider) runtimeProvider).createOutputFormat();
+                sinkFunction = new OutputFormatSinkFunction<>(outputFormat);
+            } else {
+                throw new TableException("This should not happen.");
+            }
+
+            if (sinkFunction instanceof InputTypeConfigurable) {
+                ((InputTypeConfigurable) sinkFunction).setInputType(inputTypeInfo, env.getConfig());
+            }
+
+            final SinkOperator operator =
+                    new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer);

Review comment:
       Add a check here that runtimeProvider implements ParallelismProvider; the old Scala implementation asserts this.
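       A sketch of such a guard, mirroring the assert from the old Scala implementation (exception wording illustrative):

```java
// Sketch only: in this branch (SinkFunctionProvider / OutputFormatProvider)
// the runtime provider is expected to also implement ParallelismProvider;
// fail fast if it does not, like the assert in the old Scala code.
if (!(runtimeProvider instanceof ParallelismProvider)) {
    throw new TableException(
            String.format(
                    "%s is expected to implement ParallelismProvider.",
                    runtimeProvider.getClass().getName()));
}
```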




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org