You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2022/02/24 07:36:57 UTC
[flink] 01/05: [FLINK-26297][table-planner] Rename ShortcutUtils method to get TableConfig
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit bd46985d1ddfea02b398a744f0521094a019b487
Author: Marios Trivyzas <ma...@gmail.com>
AuthorDate: Mon Feb 21 11:47:03 2022 +0200
[FLINK-26297][table-planner] Rename ShortcutUtils method to get TableConfig
---
.../java/org/apache/flink/table/planner/utils/ShortcutUtils.java | 2 +-
.../planner/plan/nodes/physical/batch/BatchPhysicalExchange.scala | 6 ++++--
2 files changed, 5 insertions(+), 3 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java
index a33dee1..90914da7 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java
@@ -88,7 +88,7 @@ public final class ShortcutUtils {
return context.unwrap(FlinkContext.class);
}
- public static ReadableConfig unwrapConfig(RelNode relNode) {
+ public static ReadableConfig unwrapTableConfig(RelNode relNode) {
return unwrapContext(relNode).getTableConfig().getConfiguration();
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalExchange.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalExchange.scala
index f68a984..e58063f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalExchange.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalExchange.scala
@@ -23,7 +23,7 @@ import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange
import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalExchange
-import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapConfig
+import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig
import org.apache.flink.table.planner.utils.StreamExchangeModeUtils.getBatchStreamExchangeMode
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
@@ -59,7 +59,9 @@ class BatchPhysicalExchange(
throw new UnsupportedOperationException("Range sort is not supported.")
}
- val exchangeMode = getBatchStreamExchangeMode(unwrapConfig(this), StreamExchangeMode.UNDEFINED)
+ val exchangeMode = getBatchStreamExchangeMode(
+ unwrapTableConfig(this),
+ StreamExchangeMode.UNDEFINED)
val damBehavior = if (exchangeMode eq StreamExchangeMode.BATCH) {
InputProperty.DamBehavior.BLOCKING