Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/02/09 09:11:42 UTC
[GitHub] [spark] zhengruifeng edited a comment on pull request #31468: [SPARK-34353][SQL] CollectLimitExec avoid shuffle if input rdd has single partition
zhengruifeng edited a comment on pull request #31468:
URL: https://github.com/apache/spark/pull/31468#issuecomment-775778416
```
scala> spark.sql("CREATE TABLE t (key bigint, value string) USING parquet")
res0: org.apache.spark.sql.DataFrame = []
scala> spark.sql("SELECT COUNT(*) FROM t").head
res1: org.apache.spark.sql.Row = [0]
scala> val t = spark.table("t")
t: org.apache.spark.sql.DataFrame = [key: bigint, value: string]
scala> t.count
res2: Long = 0
scala> t.rdd.getNumPartitions
res3: Int = 0
scala>
scala> spark.sql(s"CACHE TABLE v1 AS SELECT * FROM t LIMIT 10")
java.util.NoSuchElementException: next on empty iterator
at scala.collection.Iterator$$anon$2.next(Iterator.scala:41)
at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
at scala.collection.IterableLike.head(IterableLike.scala:109)
at scala.collection.IterableLike.head$(IterableLike.scala:108)
at scala.collection.mutable.ArrayOps$ofRef.scala$collection$IndexedSeqOptimized$$super$head(ArrayOps.scala:198)
at scala.collection.IndexedSeqOptimized.head(IndexedSeqOptimized.scala:129)
at scala.collection.IndexedSeqOptimized.head$(IndexedSeqOptimized.scala:129)
at scala.collection.mutable.ArrayOps$ofRef.head(ArrayOps.scala:198)
at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:3024)
at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:3023)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3705)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3703)
at org.apache.spark.sql.Dataset.count(Dataset.scala:3023)
at org.apache.spark.sql.execution.datasources.v2.BaseCacheTableExec.run(CacheTableExec.scala:65)
at org.apache.spark.sql.execution.datasources.v2.BaseCacheTableExec.run$(CacheTableExec.scala:41)
at org.apache.spark.sql.execution.datasources.v2.CacheTableAsSelectExec.run(CacheTableExec.scala:88)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:42)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:42)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:48)
at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:228)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3705)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3703)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:228)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:615)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:610)
... 47 elided
scala> spark.sql("SELECT COUNT(*) FROM v1").head
java.util.NoSuchElementException: next on empty iterator
at scala.collection.Iterator$$anon$2.next(Iterator.scala:41)
at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
at scala.collection.IterableLike.head(IterableLike.scala:109)
at scala.collection.IterableLike.head$(IterableLike.scala:108)
at scala.collection.mutable.ArrayOps$ofRef.scala$collection$IndexedSeqOptimized$$super$head(ArrayOps.scala:198)
at scala.collection.IndexedSeqOptimized.head(IndexedSeqOptimized.scala:129)
at scala.collection.IndexedSeqOptimized.head$(IndexedSeqOptimized.scala:129)
at scala.collection.mutable.ArrayOps$ofRef.head(ArrayOps.scala:198)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2747)
... 47 elided
scala> val v1 = spark.table("v1")
v1: org.apache.spark.sql.DataFrame = [key: bigint, value: string]
scala> v1.count
java.util.NoSuchElementException: next on empty iterator
at scala.collection.Iterator$$anon$2.next(Iterator.scala:41)
at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
at scala.collection.IterableLike.head(IterableLike.scala:109)
at scala.collection.IterableLike.head$(IterableLike.scala:108)
at scala.collection.mutable.ArrayOps$ofRef.scala$collection$IndexedSeqOptimized$$super$head(ArrayOps.scala:198)
at scala.collection.IndexedSeqOptimized.head(IndexedSeqOptimized.scala:129)
at scala.collection.IndexedSeqOptimized.head$(IndexedSeqOptimized.scala:129)
at scala.collection.mutable.ArrayOps$ofRef.head(ArrayOps.scala:198)
at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:3024)
at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:3023)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3705)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3703)
at org.apache.spark.sql.Dataset.count(Dataset.scala:3023)
... 47 elided
scala> v1.rdd.getNumPartitions
res7: Int = 0
scala> v1.repartition(3).count
res8: Long = 0
```
`childRDD` may have no partitions at all, in which case the output also has zero partitions;
in the existing implementation, the `ShuffledRowRDD` guarantees a `SinglePartition` output, so this issue is never triggered.
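The failure mode can be reproduced with the plain RDD API as well. A minimal sketch (`zeroPart`, `counts`, and `onePart` are hypothetical names for illustration, not the PR's actual code):
```
// A zero-partition RDD yields zero per-partition results, so collect()
// returns an empty array and .head throws the same NoSuchElementException
// seen in the stack traces above.
val zeroPart = spark.sparkContext.emptyRDD[Long]   // 0 partitions, like t.rdd
zeroPart.getNumPartitions                          // Int = 0

// one count per partition, over zero partitions, gives Array():
val counts = zeroPart.mapPartitions(it => Iterator.single(it.size.toLong)).collect()
counts.head                                        // java.util.NoSuchElementException

// A shuffle materializes the requested number of partitions even when the
// input has none, which is why the existing ShuffledRowRDD path is safe:
val onePart = zeroPart.repartition(1)              // shuffle down to one partition
onePart.getNumPartitions                           // Int = 1
onePart.mapPartitions(it => Iterator.single(it.size.toLong)).collect().head  // Long = 0
```
This is also why `v1.repartition(3).count` succeeds in the transcript above: the shuffle restores a non-empty set of partitions.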