You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/12/09 10:47:20 UTC

[GitHub] [spark] HyukjinKwon commented on a change in pull request #26809: [SPARK-30185][SQL] Implement Dataset.tail API

HyukjinKwon commented on a change in pull request #26809: [SPARK-30185][SQL] Implement Dataset.tail API
URL: https://github.com/apache/spark/pull/26809#discussion_r355370827
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
 ##########
 @@ -65,6 +65,46 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends LimitExec {
   }
 }
 
+/**
+ * Take the last `limit` elements and collect them to a single partition.
+ *
+ * This operator will be used when a logical `Tail` operation is the final operator in an
+ * logical plan, which happens when the user is collecting results back to the driver.
+ */
+case class CollectTailExec(limit: Int, child: SparkPlan) extends LimitExec {
+  override def output: Seq[Attribute] = child.output
+  override def outputPartitioning: Partitioning = SinglePartition
+  override def executeCollect(): Array[InternalRow] = child.executeTail(limit)
+  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
+  private lazy val writeMetrics =
+    SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+  private lazy val readMetrics =
+    SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
+  override lazy val metrics = readMetrics ++ writeMetrics
+  protected override def doExecute(): RDD[InternalRow] = {
+    val locallyLimited = child.execute().mapPartitionsInternal { iter =>
+      val slidingIter = iter.sliding(limit)
 
 Review comment:
   This sliding Scala API - I manually tested after writing it manually (e.g., having a finite queue and loop once via `while`). There wasn't notable performance diff so I just decided to use `sliding` as it does what I want.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org