Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/05/08 00:51:45 UTC
[GitHub] [iceberg] RussellSpitzer edited a comment on issue #2533: spark sql MERGE INTO There is an error Error: Error running query: java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions: List(1, 0) (state=,code=0)
RussellSpitzer edited a comment on issue #2533:
URL: https://github.com/apache/iceberg/issues/2533#issuecomment-834913139
OK, so here is what happens.
We build up a plan that contains a sort merge join.
The sort merge join is implemented as a ZippedPartitionsRDD:
```
result = {MapPartitionsRDD@21132} "MapPartitionsRDD[49] at sql at <console>:26"
isBarrier_ = false
prev = {ZippedPartitionsRDD2@21135} "ZippedPartitionsRDD2[48] at sql at <console>:26"
f = {SortMergeJoinExec$lambda@21156} "org.apache.spark.sql.execution.joins.SortMergeJoinExec$$Lambda$3615/2092599862@10710787"
arg = {SortMergeJoinExec@21170} "SortMergeJoin [user_name#39, start_time#32], [user_name#36, end_time#35], FullOuter\n:- *(2) Sort [user_name#39 ASC NULLS FIRST, start_time#32 ASC NULLS FIRST], false, 0\n: +- SortAggregate(key=[user_name#39], functions=[min(log_time#37), max(log_time#37)], output=[start_time#32, end_time#33, user_name#39, _row_from_source_#42])\n: +- SortAggregate(key=[user_name#39], functions=[partial_min(log_time#37), partial_max(log_time#37)], output=[user_name#39, min#60, max#61])\n: +- *(1) Sort [user_name#39 ASC NULLS FIRST], false, 0\n: +- *(1) Project [log_time#37, user_name#39]\n: +- *(1) Filter ((isnotnull(log_time#37) AND (log_time#37 >= 2021-05-06 12:05:00)) AND (log_time#37 < 2021-05-06 12:10:00))\n: +- BatchScan[log_time#37, user_name#39] iceberg.iceberg_db.user_logs [filters=log_time IS NOT NULL, log_time >= '2021-05-06 12:05:00', log_time < '2021-05-06 12:10:00']\n+- *(8) Sort [user_name#36 ASC NULLS FIRST, end_time#35 ASC NULLS FIRST], false,"
```
This happens because SortMergeJoinExec
https://github.com/apache/spark/blob/94cac5978cf33f99a9f28180c9c909d5c884c152/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala#L132
zips the partitions of its two children. That code is not built to handle zipping a 0-partition RDD with a 1-partition RDD, which is where the List(1, 0) in the error message comes from.
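To make the failure mode concrete, here is a toy model in plain Python (not Spark code). It assumes an "RDD" can be pictured as a list of partitions, and `zip_partitions` is a hypothetical stand-in for the partition-count check that zipping performs; only the error text is taken from the issue title.

```python
from typing import List, Sequence


def zip_partitions(*rdds: Sequence[list]) -> List[tuple]:
    """Toy stand-in for zipping RDDs: pair up partition i of every input."""
    counts = [len(rdd) for rdd in rdds]
    # Assumed model of the check behind the error: every input must have
    # the same number of partitions before they can be paired up.
    if len(set(counts)) != 1:
        raise ValueError(
            "Can't zip RDDs with unequal numbers of partitions: "
            f"List({', '.join(map(str, counts))})"
        )
    return list(zip(*rdds))


left = [[("alice", 1)]]  # one partition holding one row
right = []               # empty RDD: zero partitions

try:
    zip_partitions(left, right)
    error_message = None
except ValueError as exc:
    error_message = str(exc)
```

In this model the failure comes purely from the partition counts (1 versus 0), not from the rows themselves.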
I think we basically just need to preempt creating the JoinPlan when either side of the join is an empty RDD, and just bail out in the MergePlanning.
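A rough sketch of that guard, again in plain Python rather than actual Iceberg or Spark code. The names `plan_merge_join` and `build_join` are hypothetical, and what the caller does after the bail-out is deliberately left open, since the comment does not specify it.

```python
from typing import Callable, Optional, Sequence

Partitions = Sequence[list]  # toy model: an "RDD" is a list of partitions


def plan_merge_join(
    source: Partitions,
    target: Partitions,
    build_join: Callable[[Partitions, Partitions], list],
) -> Optional[list]:
    """Hypothetical guard: skip building the join when either side is empty."""
    if len(source) == 0 or len(target) == 0:
        # Bail out before any join plan exists, so nothing ever tries to
        # zip a 0-partition input with a non-empty one.
        return None
    return build_join(source, target)


calls = []


def fake_build_join(source: Partitions, target: Partitions) -> list:
    # Records that the join was built; stands in for the real join planning.
    calls.append((len(source), len(target)))
    return list(zip(source, target))


skipped = plan_merge_join([[("alice", 1)]], [], fake_build_join)
built = plan_merge_join([[("alice", 1)]], [[("alice", 2)]], fake_build_join)
```

The point of checking up front is that the join builder is never invoked for the empty case, so the unequal-partition check is never reached.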