Posted to commits@flink.apache.org by tw...@apache.org on 2021/05/10 07:22:37 UTC

[flink] branch master updated: [FLINK-22559][table-planner] The consumed DataType of ExecSink should only consider physical columns

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ef7843  [FLINK-22559][table-planner] The consumed DataType of ExecSink should only consider physical columns
1ef7843 is described below

commit 1ef78435ddd08c7743bb065981a7f7fadde1795a
Author: Leonard Xu <xb...@gmail.com>
AuthorDate: Sat May 8 15:40:30 2021 +0800

    [FLINK-22559][table-planner] The consumed DataType of ExecSink should only consider physical columns
    
    This closes #15864.
---
 .../kafka/table/UpsertKafkaTableITCase.java        |  2 --
 .../plan/nodes/exec/common/CommonExecSink.java     | 22 +++++++++++-----------
 2 files changed, 11 insertions(+), 13 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java
index 1a26568..4f9d485 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaTableITCase.java
@@ -23,7 +23,6 @@ import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.table.utils.LegacyRowResource;
 import org.apache.flink.types.Row;
 
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -198,7 +197,6 @@ public class UpsertKafkaTableITCase extends KafkaTableTestBase {
     }
 
     @Test
-    @Ignore // FLINK-22559
     public void testKafkaSourceSinkWithKeyAndFullValue() throws Exception {
         // we always use a different topic name for each parameterized topic,
         // in order to make sure the topic can be created.
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index 90a0960..7a0c3a3 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -117,8 +117,8 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
         final DynamicTableSink.SinkRuntimeProvider runtimeProvider =
                 tableSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(isBounded));
         final ResolvedSchema schema = tableSinkSpec.getCatalogTable().getResolvedSchema();
-        final RowType consumedRowType = getConsumedRowType(schema);
-        inputTransform = applyNotNullEnforcer(tableConfig, consumedRowType, inputTransform);
+        final RowType physicalRowType = getPhysicalRowType(schema);
+        inputTransform = applyNotNullEnforcer(tableConfig, physicalRowType, inputTransform);
 
         if (runtimeProvider instanceof DataStreamSinkProvider) {
             if (runtimeProvider instanceof ParallelismProvider) {
@@ -150,7 +150,7 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
             // apply keyBy partition transformation if needed
             inputTransform =
                     applyKeyByForDifferentParallelism(
-                            consumedRowType,
+                            physicalRowType,
                             schema.getPrimaryKey().orElse(null),
                             inputTransform,
                             inputParallelism,
@@ -186,12 +186,12 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
      * Apply an operator to filter or report error to process not-null values for not-null fields.
      */
     private Transformation<RowData> applyNotNullEnforcer(
-            TableConfig config, RowType consumedRowType, Transformation<RowData> inputTransform) {
+            TableConfig config, RowType physicalRowType, Transformation<RowData> inputTransform) {
         final ExecutionConfigOptions.NotNullEnforcer notNullEnforcer =
                 config.getConfiguration()
                         .get(ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER);
-        final int[] notNullFieldIndices = getNotNullFieldIndices(consumedRowType);
-        final String[] fieldNames = consumedRowType.getFieldNames().toArray(new String[0]);
+        final int[] notNullFieldIndices = getNotNullFieldIndices(physicalRowType);
+        final String[] fieldNames = physicalRowType.getFieldNames().toArray(new String[0]);
 
         if (notNullFieldIndices.length > 0) {
             final SinkNotNullEnforcer enforcer =
@@ -215,9 +215,9 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
         }
     }
 
-    private int[] getNotNullFieldIndices(RowType consumedType) {
-        return IntStream.range(0, consumedType.getFieldCount())
-                .filter(pos -> !consumedType.getTypeAt(pos).isNullable())
+    private int[] getNotNullFieldIndices(RowType physicalType) {
+        return IntStream.range(0, physicalType.getFieldCount())
+                .filter(pos -> !physicalType.getTypeAt(pos).isNullable())
                 .toArray();
     }
 
@@ -318,7 +318,7 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
         return InternalTypeInfo.of(getInputEdges().get(0).getOutputType());
     }
 
-    private RowType getConsumedRowType(ResolvedSchema schema) {
-        return (RowType) schema.toSinkRowDataType().getLogicalType();
+    private RowType getPhysicalRowType(ResolvedSchema schema) {
+        return (RowType) schema.toPhysicalRowDataType().getLogicalType();
     }
 }
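Note that getNotNullFieldIndices is only renamed by this commit; its logic is unchanged. It scans the (now physical) row type and collects the positions of NOT NULL fields. A self-contained sketch of the same filtering, with a two-field row type invented for illustration:

    import java.util.Arrays;
    import java.util.stream.IntStream;

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.types.logical.RowType;

    public class NotNullIndicesSketch {
        public static void main(String[] args) {
            // Invented row type: `id` is NOT NULL, `name` is nullable.
            RowType rowType =
                    (RowType)
                            DataTypes.ROW(
                                            DataTypes.FIELD("id", DataTypes.BIGINT().notNull()),
                                            DataTypes.FIELD("name", DataTypes.STRING()))
                                    .getLogicalType();

            // Same filtering as getNotNullFieldIndices in CommonExecSink:
            // keep the positions whose type is not nullable.
            int[] notNullFieldIndices =
                    IntStream.range(0, rowType.getFieldCount())
                            .filter(pos -> !rowType.getTypeAt(pos).isNullable())
                            .toArray();

            System.out.println(Arrays.toString(notNullFieldIndices)); // [0]
        }
    }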