Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/05/12 03:37:39 UTC

[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2584: Spark: Fix MergeInto when Source and Target are SinglePartition Distr…

RussellSpitzer commented on a change in pull request #2584:
URL: https://github.com/apache/iceberg/pull/2584#discussion_r630701730



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DynamicFileFilterExec.scala
##########
@@ -48,8 +48,32 @@ abstract class DynamicFileFilterExecBase(
   override def outputOrdering: Seq[SortOrder] = scanExec.outputOrdering
   override def supportsColumnar: Boolean = scanExec.supportsColumnar
 
-  override protected def doExecute(): RDD[InternalRow] = scanExec.execute()
-  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = scanExec.executeColumnar()
+  /*
+  If both target and source have the same partitioning we can have a problem here if our filter exec actually
+  changes the partition. Currently this can only occur in the SinglePartition distribution is in use which only
+  happens if both the target and source have a single partition, but if it does we have the potential of eliminating
+  the only partition in the target. If there are no partitions in the target then we will throw an exception because
+  the partitioning was assumed to be the same 1 partition in source and target. We fix this by making sure that
+  we always return at least 1 empty partition, in the future we may need to handle more complicated partitioner
+  scenarios.
+   */
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val result = scanExec.execute()
+    if (result.partitions.length == 0) {

Review comment:
       This won't break the 0-task planning, because we only need to fix the case where no "exchange" is being proposed. If the original scan had 0 tasks, then an exchange would be planned, so the number of output partitions we generate isn't important.
   
   I think we could also fix it in SparkMergeScan, but I'm not sure that would be a clearer fix than the change here. What's the goal of moving the fix into SparkMergeScan? Just to make sure we don't change empty 0-partition RDDs into empty 1-partition RDDs?
   
   I think we are in the clear there, since an RDD is defined as empty if either its partitions list has size 0 or rdd.take(1) is empty. We are basically just swapping one empty representation for another that works in this use case.
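
A minimal sketch of the two "empty" representations, assuming a local Spark session. The `atLeastOnePartition` helper and the object name are hypothetical and only illustrate the idea; they are not the code in this PR:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object EmptyRddRepresentations {
  // Hypothetical helper (not the PR's code): if the RDD has no partitions,
  // substitute an RDD with exactly one partition that holds no rows.
  def atLeastOnePartition(rdd: RDD[Int]): RDD[Int] =
    if (rdd.partitions.length == 0) rdd.sparkContext.parallelize(Seq.empty[Int], 1)
    else rdd

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("empty-rdd-sketch")
      .getOrCreate()
    val sc = spark.sparkContext
    try {
      val zeroPartitions = sc.emptyRDD[Int]            // empty: no partitions at all
      val padded = atLeastOnePartition(zeroPartitions) // empty: one partition, no rows

      assert(zeroPartitions.partitions.length == 0)
      assert(padded.partitions.length == 1)
      // RDD.isEmpty() treats both representations as empty.
      assert(zeroPartitions.isEmpty() && padded.isEmpty())
    } finally {
      spark.stop()
    }
  }
}
```

Both RDDs report `isEmpty()` as true, so downstream consumers that only care about emptiness should see no difference. Only the partition count changes, which is what the SinglePartition case depends on.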



