Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/04/24 14:52:16 UTC

[GitHub] [flink] leonardBang commented on a change in pull request #11853: [FLINK-15006][table-planner] Add option to shuffle-by-partition when dynamic inserting

leonardBang commented on a change in pull request #11853:
URL: https://github.com/apache/flink/pull/11853#discussion_r414634883



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableFactory.java
##########
@@ -71,6 +71,12 @@
 			.withDescription("The default partition name in case the dynamic partition" +
 					" column value is null/empty string");
 
+	public static final ConfigOption<Boolean> SINK_SHUFFLE_BY_PARTITION = key("sink.shuffle-by-partition.enable")
+			.booleanType()
+			.defaultValue(false)
+			.withDescription("Before sink, can shuffle by dynamic partition fields to sink parallelisms," +
+					" this can greatly reduce the number of files. But will lead to data skew too.");
+

Review comment:
       How about this? "The option to enable shuffling data by dynamic partition fields in the sink phase. This can greatly reduce the number of files for the filesystem sink, but may lead to data skew. The default value is disabled."
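To show the trade-off the description names (fewer files, possible skew), here is a small standalone simulation. It is a hypothetical sketch, not Flink code. It assumes each sink subtask writes one file per dynamic partition value it receives. Without a shuffle, records are assigned round-robin. With the shuffle, each partition value is hashed to exactly one subtask. The class name, method names, and sample data are illustrative only.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical simulation (not Flink code). Assumption: each sink subtask
 * writes one file per dynamic partition value it receives.
 */
public class ShuffleByPartitionSketch {

    /** No shuffle: record i goes to subtask i % parallelism (round-robin). */
    static int filesWithoutShuffle(List<String> partitions, int parallelism) {
        Set<String> files = new HashSet<>();
        for (int i = 0; i < partitions.size(); i++) {
            files.add((i % parallelism) + "/" + partitions.get(i));
        }
        return files.size();
    }

    /** Shuffle by partition: each partition value is hashed to a single subtask. */
    static int filesWithShuffle(List<String> partitions, int parallelism) {
        Set<String> files = new HashSet<>();
        for (String p : partitions) {
            files.add(Math.floorMod(p.hashCode(), parallelism) + "/" + p);
        }
        return files.size();
    }

    /** Records per subtask under the shuffle; a hot partition lands on one subtask. */
    static Map<Integer, Integer> loadWithShuffle(List<String> partitions, int parallelism) {
        Map<Integer, Integer> load = new HashMap<>();
        for (String p : partitions) {
            load.merge(Math.floorMod(p.hashCode(), parallelism), 1, Integer::sum);
        }
        return load;
    }

    public static void main(String[] args) {
        // 8 records over 2 partition values; "2020-04-24" holds 6 of them.
        List<String> parts = List.of(
                "2020-04-24", "2020-04-23", "2020-04-24", "2020-04-23",
                "2020-04-24", "2020-04-24", "2020-04-24", "2020-04-24");
        int parallelism = 4;
        System.out.println("files without shuffle: " + filesWithoutShuffle(parts, parallelism)); // 6
        System.out.println("files with shuffle: " + filesWithShuffle(parts, parallelism));       // 2
        System.out.println("load with shuffle: " + loadWithShuffle(parts, parallelism));
    }
}
```

With 4 subtasks the file count drops from 6 to 2. However, the subtask that owns "2020-04-24" receives at least 6 of the 8 records, which is the data skew the description warns about.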

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecSinkRule.scala
##########
@@ -53,15 +53,20 @@ class StreamExecSinkRule extends ConverterRule(
             val dynamicPartIndices =
               dynamicPartFields.map(partitionSink.getTableSchema.getFieldNames.indexOf(_))
 
-            if (partitionSink.configurePartitionGrouping(false)) {
-              throw new TableException("Partition grouping in stream mode is not supported yet!")
-            }
+            val shuffleEnable = sinkNode
+                .catalogTable
+                .getProperties
+                .get(FileSystemTableFactory.SINK_SHUFFLE_BY_PARTITION.key())
 
-            if (!partitionSink.isInstanceOf[DataStreamTableSink[_]]) {

Review comment:
        `DataStreamTableSink` is a special type. Will this change give all stream sink nodes the chance to use the shuffle-enable config?
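In the Scala lines under review, `getProperties.get(...)` returns the raw property value rather than a typed `Boolean`. Assuming the catalog table's properties are a `Map<String, String>`, that value is a `String`, or `null` when the user never set the option. It would therefore still need parsing and the option's `false` default applied. The excerpt ends before the value is used, so the following is only a hypothetical helper sketching that step. The class and method names are illustrative and not Flink API.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper (not part of Flink): interpret the raw
 * "sink.shuffle-by-partition.enable" table property, applying the default.
 */
public class ShuffleOptionLookupSketch {

    static final String SHUFFLE_KEY = "sink.shuffle-by-partition.enable";
    static final boolean SHUFFLE_DEFAULT = false; // mirrors defaultValue(false) in the diff

    /** Returns the configured flag, or the default when the key is absent. */
    static boolean isShuffleEnabled(Map<String, String> tableProperties) {
        String raw = tableProperties.get(SHUFFLE_KEY); // null when the option is unset
        return raw == null ? SHUFFLE_DEFAULT : Boolean.parseBoolean(raw.trim());
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        System.out.println(isShuffleEnabled(props)); // false: key absent, default applies
        props.put(SHUFFLE_KEY, "true");
        System.out.println(isShuffleEnabled(props)); // true
    }
}
```

Because `Boolean.parseBoolean` is case-insensitive and returns `false` for anything other than "true", a misspelled value silently falls back to disabled. A stricter variant could reject such values instead.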




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org