You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/12/03 16:55:44 UTC

[flink] 05/08: [FLINK-24687][table-planner] Remove planner dependency on FileSystemConnectorOptions

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d12fd3d729f772d5c545d58987049bb9ce9f1da8
Author: slinkydeveloper <fr...@gmail.com>
AuthorDate: Mon Nov 8 10:46:10 2021 +0100

    [FLINK-24687][table-planner] Remove planner dependency on FileSystemConnectorOptions
    
    Signed-off-by: slinkydeveloper <fr...@gmail.com>
---
 .../rules/physical/batch/BatchPhysicalLegacySinkRule.scala    | 11 ++++++-----
 .../plan/rules/physical/batch/BatchPhysicalSinkRule.scala     | 11 ++++++-----
 .../rules/physical/stream/StreamPhysicalLegacySinkRule.scala  | 11 ++++++-----
 .../plan/rules/physical/stream/StreamPhysicalSinkRule.scala   |  7 ++++---
 4 files changed, 22 insertions(+), 18 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLegacySinkRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLegacySinkRule.scala
index edebe6c..72d8afd 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLegacySinkRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLegacySinkRule.scala
@@ -28,7 +28,6 @@ import org.apache.flink.table.sinks.PartitionableTableSink
 import org.apache.calcite.plan.RelOptRule
 import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.calcite.rel.{RelCollations, RelNode}
-import org.apache.flink.table.filesystem.FileSystemConnectorOptions
 
 import scala.collection.JavaConversions._
 
@@ -53,12 +52,14 @@ class BatchPhysicalLegacySinkRule extends ConverterRule(
             val dynamicPartIndices =
               dynamicPartFields.map(partitionSink.getTableSchema.getFieldNames.indexOf(_))
 
+            // TODO This option is hardcoded to remove the planner's dependency on
+            //  flink-connector-files. We should move this option out of FileSystemConnectorOptions
             val shuffleEnable = sink
-                .catalogTable
-                .getOptions
-                .get(FileSystemConnectorOptions.SINK_SHUFFLE_BY_PARTITION.key())
+              .catalogTable
+              .getOptions
+              .getOrDefault("sink.shuffle-by-partition.enable", "false")
 
-            if (shuffleEnable != null && shuffleEnable.toBoolean) {
+            if (shuffleEnable.toBoolean) {
               requiredTraitSet = requiredTraitSet.plus(
                 FlinkRelDistribution.hash(dynamicPartIndices
                     .map(Integer.valueOf), requireStrict = false))
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSinkRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSinkRule.scala
index 5a00b51..b9c9f8f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSinkRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSinkRule.scala
@@ -30,7 +30,6 @@ import org.apache.flink.table.types.logical.RowType
 import org.apache.calcite.plan.RelOptRule
 import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.calcite.rel.{RelCollationTraitDef, RelCollations, RelNode}
-import org.apache.flink.table.filesystem.FileSystemConnectorOptions
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
@@ -68,12 +67,14 @@ class BatchPhysicalSinkRule extends ConverterRule(
             val dynamicPartIndices =
               dynamicPartFields.map(fieldNames.indexOf(_))
 
+            // TODO This option is hardcoded to remove the planner's dependency on
+            //  flink-connector-files. We should move this option out of FileSystemConnectorOptions
             val shuffleEnable = sink
-                .catalogTable
-                .getOptions
-                .get(FileSystemConnectorOptions.SINK_SHUFFLE_BY_PARTITION.key())
+              .catalogTable
+              .getOptions
+              .getOrDefault("sink.shuffle-by-partition.enable", "false")
 
-            if (shuffleEnable != null && shuffleEnable.toBoolean) {
+            if (shuffleEnable.toBoolean) {
               requiredTraitSet = requiredTraitSet.plus(
                 FlinkRelDistribution.hash(dynamicPartIndices
                     .map(Integer.valueOf), requireStrict = false))
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLegacySinkRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLegacySinkRule.scala
index 82af4cd..5286b81 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLegacySinkRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalLegacySinkRule.scala
@@ -27,7 +27,6 @@ import org.apache.flink.table.sinks.PartitionableTableSink
 import org.apache.calcite.plan.RelOptRule
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.flink.table.filesystem.FileSystemConnectorOptions
 
 import scala.collection.JavaConversions._
 
@@ -52,12 +51,14 @@ class StreamPhysicalLegacySinkRule extends ConverterRule(
             val dynamicPartIndices =
               dynamicPartFields.map(partitionSink.getTableSchema.getFieldNames.indexOf(_))
 
+            // TODO This option is hardcoded to remove the planner's dependency on
+            //  flink-connector-files. We should move this option out of FileSystemConnectorOptions
             val shuffleEnable = sink
-                .catalogTable
-                .getOptions
-                .get(FileSystemConnectorOptions.SINK_SHUFFLE_BY_PARTITION.key())
+              .catalogTable
+              .getOptions
+              .getOrDefault("sink.shuffle-by-partition.enable", "false")
 
-            if (shuffleEnable != null && shuffleEnable.toBoolean) {
+            if (shuffleEnable.toBoolean) {
               requiredTraitSet = requiredTraitSet.plus(
                 FlinkRelDistribution.hash(dynamicPartIndices
                     .map(Integer.valueOf), requireStrict = false))
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSinkRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSinkRule.scala
index 13645e5..b5867e4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSinkRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalSinkRule.scala
@@ -29,7 +29,6 @@ import org.apache.flink.table.types.logical.RowType
 import org.apache.calcite.plan.RelOptRule
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.flink.table.filesystem.FileSystemConnectorOptions
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
@@ -67,12 +66,14 @@ class StreamPhysicalSinkRule extends ConverterRule(
             val dynamicPartIndices =
               dynamicPartFields.map(fieldNames.indexOf(_))
 
+            // TODO This option is hardcoded to remove the planner's dependency on
+            //  flink-connector-files. We should move this option out of FileSystemConnectorOptions
             val shuffleEnable = sink
                 .catalogTable
                 .getOptions
-                .get(FileSystemConnectorOptions.SINK_SHUFFLE_BY_PARTITION.key())
+                .getOrDefault("sink.shuffle-by-partition.enable", "false")
 
-            if (shuffleEnable != null && shuffleEnable.toBoolean) {
+            if (shuffleEnable.toBoolean) {
               requiredTraitSet = requiredTraitSet.plus(
                 FlinkRelDistribution.hash(dynamicPartIndices
                     .map(Integer.valueOf), requireStrict = false))