Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2018/12/10 18:41:41 UTC

[GitHub] icexelloss commented on a change in pull request #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)

URL: https://github.com/apache/spark/pull/22305#discussion_r240331939
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ##########
 @@ -144,24 +282,107 @@ case class WindowInPandasExec(
         queue.close()
       }
 
-      val inputProj = UnsafeProjection.create(allInputs, child.output)
-      val pythonInput = grouped.map { case (_, rows) =>
-        rows.map { row =>
-          queue.add(row.asInstanceOf[UnsafeRow])
-          inputProj(row)
+      val stream = iter.map { row =>
+        queue.add(row.asInstanceOf[UnsafeRow])
+        row
+      }
+
+      val pythonInput = new Iterator[Iterator[UnsafeRow]] {
+
+        // Manage the stream and the grouping.
+        var nextRow: UnsafeRow = null
+        var nextGroup: UnsafeRow = null
+        var nextRowAvailable: Boolean = false
+        private[this] def fetchNextRow() {
+          nextRowAvailable = stream.hasNext
+          if (nextRowAvailable) {
+            nextRow = stream.next().asInstanceOf[UnsafeRow]
+            nextGroup = grouping(nextRow)
+          } else {
+            nextRow = null
+            nextGroup = null
+          }
+        }
+        fetchNextRow()
+
+        // Manage the current partition.
+        val buffer: ExternalAppendOnlyUnsafeRowArray =
+          new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)
+        var bufferIterator: Iterator[UnsafeRow] = _
+
+        val indexRow = new SpecificInternalRow(Array.fill(numBoundIndices)(IntegerType))
+
+        val frames = factories.map(_(indexRow))
+
+        private[this] def fetchNextPartition() {
+          // Collect all the rows in the current partition.
+          // Before we start to fetch new input rows, make a copy of nextGroup.
+          val currentGroup = nextGroup.copy()
+
+          // clear last partition
+          buffer.clear()
+
+          while (nextRowAvailable && nextGroup == currentGroup) {
 
 Review comment:
   Sure. Here: https://github.com/apache/spark/pull/23279
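
For context, the diff above replaces an eager grouping with a lazy grouping iterator over a sorted stream: it keeps one look-ahead row, copies the current group key before consuming further input, and buffers rows until the key changes. Below is a minimal, self-contained sketch of that pattern. The names groupSorted, key, and pending are illustrative stand-ins, not Spark APIs; the real code buffers UnsafeRows in a spillable ExternalAppendOnlyUnsafeRowArray and compares grouping projections rather than plain values.

    import scala.collection.mutable.ArrayBuffer

    // Sketch: turn a stream that is already sorted by key into an
    // iterator of per-group iterators, buffering one group at a time.
    def groupSorted[T, K](stream: Iterator[T])(key: T => K): Iterator[Iterator[T]] =
      new Iterator[Iterator[T]] {
        // Look-ahead element, mirroring nextRow/fetchNextRow() in the diff.
        private var pending: Option[T] =
          if (stream.hasNext) Some(stream.next()) else None

        override def hasNext: Boolean = pending.isDefined

        override def next(): Iterator[T] = {
          // Capture the current key before consuming more input,
          // as the diff does with nextGroup.copy().
          val currentKey = key(pending.get)
          val buffer = ArrayBuffer.empty[T]
          // Collect rows until the key changes or the stream ends,
          // like the diff's `while (nextRowAvailable && nextGroup == currentGroup)`.
          while (pending.exists(e => key(e) == currentKey)) {
            buffer += pending.get
            pending = if (stream.hasNext) Some(stream.next()) else None
          }
          buffer.iterator
        }
      }

    // Usage: rows sorted by their first field come out as three groups.
    val rows = Iterator(("a", 1), ("a", 2), ("b", 3), ("c", 4))
    groupSorted(rows)(_._1).foreach(g => println(g.toList))
    // List((a,1), (a,2))
    // List((b,3))
    // List((c,4))

The single-pass, one-group-at-a-time buffering is what lets the operator stream partitions to the Python worker without materializing all groups up front.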

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org