Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/05/08 00:51:45 UTC

[GitHub] [iceberg] RussellSpitzer edited a comment on issue #2533: spark sql MERGE INTO There is an error Error: Error running query: java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions: List(1, 0) (state=,code=0)

RussellSpitzer edited a comment on issue #2533:
URL: https://github.com/apache/iceberg/issues/2533#issuecomment-834913139


   OK, so here is what happens.
   
   We build up a plan that contains a sort merge join.
   
   The sort merge join is implemented as a ZippedPartitionsRDD2:
   ```
   result = {MapPartitionsRDD@21132} "MapPartitionsRDD[49] at sql at <console>:26"
    isBarrier_ = false
    prev = {ZippedPartitionsRDD2@21135} "ZippedPartitionsRDD2[48] at sql at <console>:26"
     f = {SortMergeJoinExec$lambda@21156} "org.apache.spark.sql.execution.joins.SortMergeJoinExec$$Lambda$3615/2092599862@10710787"
      arg = {SortMergeJoinExec@21170} "SortMergeJoin [user_name#39, start_time#32], [user_name#36, end_time#35], FullOuter\n:- *(2) Sort [user_name#39 ASC NULLS FIRST, start_time#32 ASC NULLS FIRST], false, 0\n:  +- SortAggregate(key=[user_name#39], functions=[min(log_time#37), max(log_time#37)], output=[start_time#32, end_time#33, user_name#39, _row_from_source_#42])\n:     +- SortAggregate(key=[user_name#39], functions=[partial_min(log_time#37), partial_max(log_time#37)], output=[user_name#39, min#60, max#61])\n:        +- *(1) Sort [user_name#39 ASC NULLS FIRST], false, 0\n:           +- *(1) Project [log_time#37, user_name#39]\n:              +- *(1) Filter ((isnotnull(log_time#37) AND (log_time#37 >= 2021-05-06 12:05:00)) AND (log_time#37 < 2021-05-06 12:10:00))\n:                 +- BatchScan[log_time#37, user_name#39] iceberg.iceberg_db.user_logs [filters=log_time IS NOT NULL, log_time >= '2021-05-06 12:05:00', log_time < '2021-05-06 12:10:00']\n+- *(8) Sort [user_name#36 ASC NULLS FIRST, end_time#35 ASC NULLS FIRST], false,"
   ```
      
   This is because SortMergeJoinExec
   
   https://github.com/apache/spark/blob/94cac5978cf33f99a9f28180c9c909d5c884c152/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala#L132
   
   does the zip partitions. That code is not built to handle zipping a 0-partition RDD with a 1-partition RDD, so the zip fails with "Can't zip RDDs with unequal numbers of partitions: List(1, 0)".
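   
   The same failure can probably be reproduced without MERGE INTO at all. This is a minimal sketch, assuming a local SparkSession; the object name `ZipRepro`, the helper `zipErrorMessage`, and the sample data are made up for illustration. It uses the public `RDD.zipPartitions` API, which builds the same kind of ZippedPartitionsRDD2 seen in the debugger output above.
   
   ```scala
   import org.apache.spark.sql.SparkSession
   
   object ZipRepro {
     // Hypothetical helper: zips a 1-partition RDD with a 0-partition RDD and
     // returns the error message if Spark rejects the zip.
     def zipErrorMessage(spark: SparkSession): Option[String] = {
       val sc = spark.sparkContext
       val onePartition = sc.parallelize(Seq(1, 2, 3), numSlices = 1) // 1 partition
       val noPartitions = sc.emptyRDD[Int]                            // 0 partitions
       try {
         // The partition counts are only compared once the partitions are
         // computed, which happens when the action runs.
         onePartition.zipPartitions(noPartitions) { (left, right) => left ++ right }.collect()
         None
       } catch {
         case e: IllegalArgumentException => Some(e.getMessage)
       }
     }
   
     def main(args: Array[String]): Unit = {
       val spark = SparkSession.builder().master("local[1]").appName("zip-repro").getOrCreate()
       try println(zipErrorMessage(spark))
       finally spark.stop()
     }
   }
   ```
   
   If this behaves as expected, the printed message should have the same form as the one in the issue title.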
      
   I think we just need to check, before we create a JoinPlan, whether either side of the join is an empty RDD, and bail out in the MergePlanning if it is.
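   
   To illustrate the kind of check meant here, this is a rough sketch at the RDD level rather than at the plan level where the real fix would live. `ZipGuard` and `zipIfNonEmpty` are hypothetical names, not Spark or Iceberg API; only `getNumPartitions` and `zipPartitions` are real RDD methods.
   
   ```scala
   import org.apache.spark.rdd.RDD
   import scala.reflect.ClassTag
   
   object ZipGuard {
     // Hypothetical guard: only zip when both sides have at least one partition.
     // Returns None so the caller can take a different path when a side is empty.
     def zipIfNonEmpty[A, B: ClassTag, V: ClassTag](left: RDD[A], right: RDD[B])(
         f: (Iterator[A], Iterator[B]) => Iterator[V]): Option[RDD[V]] = {
       if (left.getNumPartitions == 0 || right.getNumPartitions == 0) None
       else Some(left.zipPartitions(right)(f))
     }
   }
   ```
   
   Since the join in the plan above is a FullOuter join, bailing out cannot simply mean returning nothing: rows from the non-empty side still have to be produced, so the caller needs its own handling for the `None` case.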

