You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/05/10 07:23:59 UTC
[flink] branch release-1.13 updated: [FLINK-22559][table-planner]
The consumed DataType of ExecSink should only consider physical columns
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new cfa9a39 [FLINK-22559][table-planner] The consumed DataType of ExecSink should only consider physical columns
cfa9a39 is described below
commit cfa9a398ccb73ce38f9df0478bcbe547013c756b
Author: Leonard Xu <xb...@gmail.com>
AuthorDate: Sat May 8 15:40:30 2021 +0800
[FLINK-22559][table-planner] The consumed DataType of ExecSink should only consider physical columns
This closes #15864.
---
.../plan/nodes/exec/common/CommonExecSink.java | 22 +++++++++++-----------
1 file changed, 11 insertions(+), 11 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index 90a0960..7a0c3a3 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -117,8 +117,8 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
final DynamicTableSink.SinkRuntimeProvider runtimeProvider =
tableSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(isBounded));
final ResolvedSchema schema = tableSinkSpec.getCatalogTable().getResolvedSchema();
- final RowType consumedRowType = getConsumedRowType(schema);
- inputTransform = applyNotNullEnforcer(tableConfig, consumedRowType, inputTransform);
+ final RowType physicalRowType = getPhysicalRowType(schema);
+ inputTransform = applyNotNullEnforcer(tableConfig, physicalRowType, inputTransform);
if (runtimeProvider instanceof DataStreamSinkProvider) {
if (runtimeProvider instanceof ParallelismProvider) {
@@ -150,7 +150,7 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
// apply keyBy partition transformation if needed
inputTransform =
applyKeyByForDifferentParallelism(
- consumedRowType,
+ physicalRowType,
schema.getPrimaryKey().orElse(null),
inputTransform,
inputParallelism,
@@ -186,12 +186,12 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
* Apply an operator to filter or report error to process not-null values for not-null fields.
*/
private Transformation<RowData> applyNotNullEnforcer(
- TableConfig config, RowType consumedRowType, Transformation<RowData> inputTransform) {
+ TableConfig config, RowType physicalRowType, Transformation<RowData> inputTransform) {
final ExecutionConfigOptions.NotNullEnforcer notNullEnforcer =
config.getConfiguration()
.get(ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER);
- final int[] notNullFieldIndices = getNotNullFieldIndices(consumedRowType);
- final String[] fieldNames = consumedRowType.getFieldNames().toArray(new String[0]);
+ final int[] notNullFieldIndices = getNotNullFieldIndices(physicalRowType);
+ final String[] fieldNames = physicalRowType.getFieldNames().toArray(new String[0]);
if (notNullFieldIndices.length > 0) {
final SinkNotNullEnforcer enforcer =
@@ -215,9 +215,9 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
}
}
- private int[] getNotNullFieldIndices(RowType consumedType) {
- return IntStream.range(0, consumedType.getFieldCount())
- .filter(pos -> !consumedType.getTypeAt(pos).isNullable())
+ private int[] getNotNullFieldIndices(RowType physicalType) {
+ return IntStream.range(0, physicalType.getFieldCount())
+ .filter(pos -> !physicalType.getTypeAt(pos).isNullable())
.toArray();
}
@@ -318,7 +318,7 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
return InternalTypeInfo.of(getInputEdges().get(0).getOutputType());
}
- private RowType getConsumedRowType(ResolvedSchema schema) {
- return (RowType) schema.toSinkRowDataType().getLogicalType();
+ private RowType getPhysicalRowType(ResolvedSchema schema) {
+ return (RowType) schema.toPhysicalRowDataType().getLogicalType();
}
}