Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/11/25 01:34:36 UTC

[GitHub] [spark] ueshin commented on a change in pull request #30242: [SPARK-33277][PYSPARK][SQL] Use ContextAwareIterator to stop consuming after the task ends.

ueshin commented on a change in pull request #30242:
URL: https://github.com/apache/spark/pull/30242#discussion_r530044246



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
##########
@@ -137,3 +139,47 @@ trait EvalPythonExec extends UnaryExecNode {
     }
   }
 }
+
+/**
+ * A TaskContext aware iterator.
+ *
+ * As the Python evaluation consumes the parent iterator in a separate thread,
+ * it could consume more data from the parent even after the task ends and the parent is closed.
+ * Thus, we should use ContextAwareIterator to stop consuming after the task ends.
+ */
+class ContextAwareIterator[IN](iter: Iterator[IN], context: TaskContext) extends Iterator[IN] {
+
+  val thread = new AtomicReference[Thread]()
+
+  if (iter.hasNext) {
+    val failed = new AtomicBoolean(false)
+
+    context.addTaskFailureListener { (_, _) =>
+      failed.set(true)
+    }
+
+    context.addTaskCompletionListener[Unit] { _ =>
+      var thread = this.thread.get()
+
+      while (thread == null && !failed.get()) {
+        // Wait for a while since the writer thread might not have started consuming the iterator yet.
+        context.wait(10)

Review comment:
       I do mean `wait`. This will run within `synchronized(context)`, and we should release the lock so the writer thread can make progress while waiting.
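
       A minimal sketch of the handshake, for illustration; the `synchronized` wrapper and the writer-thread side are assumptions about code outside the quoted diff, not necessarily the exact PR code:

           // Listener side: runs inside synchronized(context). Object.wait
           // releases the monitor on `context` while blocked, so the writer
           // thread can still enter its own synchronized(context) block.
           context.synchronized {
             var thread = this.thread.get()
             while (thread == null && !failed.get()) {
               context.wait(10)
               thread = this.thread.get()
             }
           }

           // Writer-thread side (hypothetical wiring): publish the thread
           // and wake the waiting listener.
           context.synchronized {
             thread.set(Thread.currentThread())
             context.notifyAll()
           }

       If the listener held the monitor for the whole loop without `wait`, the writer thread could never enter `synchronized(context)` to publish itself, and the listener would block forever.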

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
##########
@@ -137,3 +139,47 @@ trait EvalPythonExec extends UnaryExecNode {
     }
   }
 }
+
+/**
+ * A TaskContext aware iterator.
+ *
+ * As the Python evaluation consumes the parent iterator in a separate thread,
+ * it could consume more data from the parent even after the task ends and the parent is closed.
+ * Thus, we should use ContextAwareIterator to stop consuming after the task ends.
+ */
+class ContextAwareIterator[IN](iter: Iterator[IN], context: TaskContext) extends Iterator[IN] {
+
+  val thread = new AtomicReference[Thread]()

Review comment:
       sure. 👍 

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
##########
@@ -137,3 +139,47 @@ trait EvalPythonExec extends UnaryExecNode {
     }
   }
 }
+
+/**
+ * A TaskContext aware iterator.
+ *
+ * As the Python evaluation consumes the parent iterator in a separate thread,
+ * it could consume more data from the parent even after the task ends and the parent is closed.
+ * Thus, we should use ContextAwareIterator to stop consuming after the task ends.
+ */
+class ContextAwareIterator[IN](iter: Iterator[IN], context: TaskContext) extends Iterator[IN] {
+
+  val thread = new AtomicReference[Thread]()
+
+  if (iter.hasNext) {

Review comment:
       Actually, this is to make sure the upstream iterator is initialized. The upstream iterator must be initialized first because it might register another completion listener, and that listener should run later than this one.
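
       A self-contained toy illustrating the ordering, assuming Spark invokes completion listeners in reverse registration order (last registered, first run); forcing `iter.hasNext` first lets the upstream register before this class does:

           import scala.collection.mutable.ArrayBuffer

           val listeners = ArrayBuffer.empty[() => Unit]
           // Upstream registers first (triggered by iter.hasNext):
           listeners += (() => println("upstream: close parent iterator"))
           // ContextAwareIterator registers second:
           listeners += (() => println("ContextAwareIterator: wait for writer"))
           // Invoke in reverse registration order, as on task completion:
           listeners.reverse.foreach(_.apply())

       This prints the writer wait first and the parent close second, so the parent is still open while we wait for the writer thread to stop.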

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
##########
@@ -137,3 +139,47 @@ trait EvalPythonExec extends UnaryExecNode {
     }
   }
 }
+
+/**
+ * A TaskContext aware iterator.
+ *
+ * As the Python evaluation consumes the parent iterator in a separate thread,
+ * it could consume more data from the parent even after the task ends and the parent is closed.
+ * Thus, we should use ContextAwareIterator to stop consuming after the task ends.
+ */
+class ContextAwareIterator[IN](iter: Iterator[IN], context: TaskContext) extends Iterator[IN] {
+
+  val thread = new AtomicReference[Thread]()
+
+  if (iter.hasNext) {
+    val failed = new AtomicBoolean(false)
+
+    context.addTaskFailureListener { (_, _) =>
+      failed.set(true)
+    }
+
+    context.addTaskCompletionListener[Unit] { _ =>

Review comment:
       The task completion listener will wait for the `thread` to stop within this listener, and the `thread` will stop soon, since it checks `!context.isCompleted() && !context.isInterrupted()`.
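
       A sketch of the consuming side, assuming `hasNext` is where the liveness check happens as described; the method body is outside the quoted diff:

           override def hasNext: Boolean =
             // Stop consuming as soon as the task completes or is
             // interrupted, even if the parent still has rows.
             !context.isCompleted() && !context.isInterrupted() && iter.hasNext

       Once the task ends, `hasNext` returns false, the writer thread's loop exits, and the waiting completion listener can proceed.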




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org