You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/02/09 09:11:42 UTC

[GitHub] [spark] zhengruifeng edited a comment on pull request #31468: [SPARK-34353][SQL] CollectLimitExec avoid shuffle if input rdd has single partition

zhengruifeng edited a comment on pull request #31468:
URL: https://github.com/apache/spark/pull/31468#issuecomment-775778416


   ```
   scala> spark.sql("CREATE TABLE t (key bigint, value string) USING parquet")
   res0: org.apache.spark.sql.DataFrame = []
   
   scala> spark.sql("SELECT COUNT(*) FROM t").head
   res1: org.apache.spark.sql.Row = [0]
   
   scala> val t = spark.table("t")
   t: org.apache.spark.sql.DataFrame = [key: bigint, value: string]
   
   scala> t.count
   res2: Long = 0
   
   scala> t.rdd.getNumPartitions
   res3: Int = 0
   
   scala> 
   
   scala> spark.sql(s"CACHE TABLE v1 AS SELECT * FROM t LIMIT 10")
   java.util.NoSuchElementException: next on empty iterator
     at scala.collection.Iterator$$anon$2.next(Iterator.scala:41)
     at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
     at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
     at scala.collection.IterableLike.head(IterableLike.scala:109)
     at scala.collection.IterableLike.head$(IterableLike.scala:108)
     at scala.collection.mutable.ArrayOps$ofRef.scala$collection$IndexedSeqOptimized$$super$head(ArrayOps.scala:198)
     at scala.collection.IndexedSeqOptimized.head(IndexedSeqOptimized.scala:129)
     at scala.collection.IndexedSeqOptimized.head$(IndexedSeqOptimized.scala:129)
     at scala.collection.mutable.ArrayOps$ofRef.head(ArrayOps.scala:198)
     at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:3024)
     at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:3023)
     at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3705)
     at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
     at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
     at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
     at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
     at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3703)
     at org.apache.spark.sql.Dataset.count(Dataset.scala:3023)
     at org.apache.spark.sql.execution.datasources.v2.BaseCacheTableExec.run(CacheTableExec.scala:65)
     at org.apache.spark.sql.execution.datasources.v2.BaseCacheTableExec.run$(CacheTableExec.scala:41)
     at org.apache.spark.sql.execution.datasources.v2.CacheTableAsSelectExec.run(CacheTableExec.scala:88)
     at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:42)
     at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:42)
     at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:48)
     at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:228)
     at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3705)
     at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
     at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
     at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
     at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
     at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3703)
     at org.apache.spark.sql.Dataset.<init>(Dataset.scala:228)
     at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
     at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
     at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:615)
     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
     at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:610)
     ... 47 elided
   
   scala> spark.sql("SELECT COUNT(*) FROM v1").head
   java.util.NoSuchElementException: next on empty iterator
     at scala.collection.Iterator$$anon$2.next(Iterator.scala:41)
     at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
     at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
     at scala.collection.IterableLike.head(IterableLike.scala:109)
     at scala.collection.IterableLike.head$(IterableLike.scala:108)
     at scala.collection.mutable.ArrayOps$ofRef.scala$collection$IndexedSeqOptimized$$super$head(ArrayOps.scala:198)
     at scala.collection.IndexedSeqOptimized.head(IndexedSeqOptimized.scala:129)
     at scala.collection.IndexedSeqOptimized.head$(IndexedSeqOptimized.scala:129)
     at scala.collection.mutable.ArrayOps$ofRef.head(ArrayOps.scala:198)
     at org.apache.spark.sql.Dataset.head(Dataset.scala:2747)
     ... 47 elided
   
   scala> val v1 = spark.table("v1")
   v1: org.apache.spark.sql.DataFrame = [key: bigint, value: string]
   
   scala> v1.count
   java.util.NoSuchElementException: next on empty iterator
     at scala.collection.Iterator$$anon$2.next(Iterator.scala:41)
     at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
     at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
     at scala.collection.IterableLike.head(IterableLike.scala:109)
     at scala.collection.IterableLike.head$(IterableLike.scala:108)
     at scala.collection.mutable.ArrayOps$ofRef.scala$collection$IndexedSeqOptimized$$super$head(ArrayOps.scala:198)
     at scala.collection.IndexedSeqOptimized.head(IndexedSeqOptimized.scala:129)
     at scala.collection.IndexedSeqOptimized.head$(IndexedSeqOptimized.scala:129)
     at scala.collection.mutable.ArrayOps$ofRef.head(ArrayOps.scala:198)
     at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:3024)
     at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:3023)
     at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3705)
     at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
     at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
     at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
     at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
     at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3703)
     at org.apache.spark.sql.Dataset.count(Dataset.scala:3023)
     ... 47 elided
   
   scala> v1.rdd.getNumPartitions
   res7: Int = 0
   
   scala> v1.repartition(3).count
   res8: Long = 0
   ```
   
   `childRDD` may have no partitions, in which case the number of output partitions is zero;
   whereas in the existing implementation, the `ShuffledRowRDD` always produces a `SinglePartition` output, so this issue is not triggered.
   
   
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org