You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/12/03 16:55:44 UTC
[flink] 05/08: [FLINK-24687][table-planner] Remove planner dependency on FileSystemConnectorOptions
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit d12fd3d729f772d5c545d58987049bb9ce9f1da8
Author: slinkydeveloper <fr...@gmail.com>
AuthorDate: Mon Nov 8 10:46:10 2021 +0100
[FLINK-24687][table-planner] Remove planner dependency on FileSystemConnectorOptions
Signed-off-by: slinkydeveloper <fr...@gmail.com>
---
.../rules/physical/batch/BatchPhysicalLegacySinkRule.scala | 11 ++++++-----
.../plan/rules/physical/batch/BatchPhysicalSinkRule.scala | 11 ++++++-----
.../rules/physical/stream/StreamPhysicalLegacySinkRule.scala | 11 ++++++-----
.../plan/rules/physical/stream/StreamPhysicalSinkRule.scala | 7 ++++---
4 files changed, 22 insertions(+), 18 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLegacySinkRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLegacySinkRule.scala
index edebe6c..72d8afd 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLegacySinkRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLegacySinkRule.scala
@@ -28,7 +28,6 @@ import org.apache.flink.table.sinks.PartitionableTableSink
import org.apache.calcite.plan.RelOptRule
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.{RelCollations, RelNode}
-import org.apache.flink.table.filesystem.FileSystemConnectorOptions
import scala.collection.JavaConversions._
@@ -53,12 +52,14 @@ class BatchPhysicalLegacySinkRule extends ConverterRule(
val dynamicPartIndices =
dynamicPartFields.map(partitionSink.getTableSchema.getFieldNames.indexOf(_))
+ // TODO: The option key is hardcoded here to remove the planner's dependency on
+ // flink-connector-files. We should move this option out of FileSystemConnectorOptions.
val shuffleEnable = sink
- .catalogTable
- .getOptions
- .get(FileSystemConnectorOptions.SINK_SHUFFLE_BY_PARTITION.key())
+ .catalogTable
+ .getOptions
+ .getOrDefault("sink.shuffle-by-partition.enable", "false")
- if (shuffleEnable != null && shuffleEnable.toBoolean) {
+ if (shuffleEnable.toBoolean) {
requiredTraitSet = requiredTraitSet.plus(
FlinkRelDistribution.hash(dynamicPartIndices
.map(Integer.valueOf), requireStrict = false))
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSinkRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSinkRule.scala
index 5a00b51..b9c9f8f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSinkRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSinkRule.scala
@@ -30,7 +30,6 @@ import org.apache.flink.table.types.logical.RowType
import org.apache.calcite.plan.RelOptRule
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.{RelCollationTraitDef, RelCollations, RelNode}
-import org.apache.flink.table.filesystem.FileSystemConnectorOptions
import scala.collection.JavaConversions._
import scala.collection.mutable
@@ -68,12 +67,14 @@ class BatchPhysicalSinkRule extends ConverterRule(
val dynamicPartIndices =
dynamicPartFields.map(fieldNames.indexOf(_))
+ // TODO: The option key is hardcoded here to remove the planner's dependency on
+ // flink-connector-files. We should move this option out of FileSystemConnectorOptions.
val shuffleEnable = sink
- .catalogTable
- .getOptions
- .get(FileSystemConnectorOptions.SINK_SHUFFLE_BY_PARTITION.key())
+ .catalogTable
+ .getOptions
+ .getOrDefault("sink.shuffle-by-partition.enable", "false")
- if (shuffleEnable != null && shuffleEnable.toBoolean) {
+ if (shuffleEnable.toBoolean) {
requiredTraitSet = requiredTraitSet.plus(
FlinkRelDistribution.hash(dynamicPartIndices
.map(Integer.valueOf), requireStrict = false))
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLegacySinkRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLegacySinkRule.scala
index 82af4cd..5286b81 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLegacySinkRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLegacySinkRule.scala
@@ -27,7 +27,6 @@ import org.apache.flink.table.sinks.PartitionableTableSink
import org.apache.calcite.plan.RelOptRule
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.flink.table.filesystem.FileSystemConnectorOptions
import scala.collection.JavaConversions._
@@ -52,12 +51,14 @@ class StreamPhysicalLegacySinkRule extends ConverterRule(
val dynamicPartIndices =
dynamicPartFields.map(partitionSink.getTableSchema.getFieldNames.indexOf(_))
+ // TODO: The option key is hardcoded here to remove the planner's dependency on
+ // flink-connector-files. We should move this option out of FileSystemConnectorOptions.
val shuffleEnable = sink
- .catalogTable
- .getOptions
- .get(FileSystemConnectorOptions.SINK_SHUFFLE_BY_PARTITION.key())
+ .catalogTable
+ .getOptions
+ .getOrDefault("sink.shuffle-by-partition.enable", "false")
- if (shuffleEnable != null && shuffleEnable.toBoolean) {
+ if (shuffleEnable.toBoolean) {
requiredTraitSet = requiredTraitSet.plus(
FlinkRelDistribution.hash(dynamicPartIndices
.map(Integer.valueOf), requireStrict = false))
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSinkRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSinkRule.scala
index 13645e5..b5867e4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSinkRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSinkRule.scala
@@ -29,7 +29,6 @@ import org.apache.flink.table.types.logical.RowType
import org.apache.calcite.plan.RelOptRule
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.flink.table.filesystem.FileSystemConnectorOptions
import scala.collection.JavaConversions._
import scala.collection.mutable
@@ -67,12 +66,14 @@ class StreamPhysicalSinkRule extends ConverterRule(
val dynamicPartIndices =
dynamicPartFields.map(fieldNames.indexOf(_))
+ // TODO: The option key is hardcoded here to remove the planner's dependency on
+ // flink-connector-files. We should move this option out of FileSystemConnectorOptions.
val shuffleEnable = sink
.catalogTable
.getOptions
- .get(FileSystemConnectorOptions.SINK_SHUFFLE_BY_PARTITION.key())
+ .getOrDefault("sink.shuffle-by-partition.enable", "false")
- if (shuffleEnable != null && shuffleEnable.toBoolean) {
+ if (shuffleEnable.toBoolean) {
requiredTraitSet = requiredTraitSet.plus(
FlinkRelDistribution.hash(dynamicPartIndices
.map(Integer.valueOf), requireStrict = false))