Posted to commits@spark.apache.org by gu...@apache.org on 2020/11/02 00:15:10 UTC

[spark] branch branch-2.4 updated: [SPARK-33277][PYSPARK][SQL][2.4] Use ContextAwareIterator to stop consumin…

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new cabf957  [SPARK-33277][PYSPARK][SQL][2.4] Use ContextAwareIterator to stop consumin…
cabf957 is described below

commit cabf9571cd5bc620b5cddf5a4d003f29ed5b5459
Author: Takuya UESHIN <ue...@databricks.com>
AuthorDate: Mon Nov 2 09:07:44 2020 +0900

    [SPARK-33277][PYSPARK][SQL][2.4] Use ContextAwareIterator to stop consumin…
    
    ### What changes were proposed in this pull request?
    
    This is a backport of #30177.
    
    Because the Python evaluation consumes the parent iterator in a separate thread, it can keep consuming data from the parent even after the task ends and the parent is closed. Thus, we should wrap the parent with a `ContextAwareIterator` so that consumption stops once the task ends, as sketched below.
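
    For reference, a minimal sketch of the guard, assuming Spark's `TaskContext` is on the classpath (the committed version, together with the `EvalPythonExec` wiring, is in the diff below):

    ```scala
    import org.apache.spark.TaskContext

    // Stop yielding rows from the parent iterator once the owning task has completed or
    // been interrupted, so a background consumer cannot keep reading from a closed parent.
    class ContextAwareIterator[IN](iter: Iterator[IN], context: TaskContext) extends Iterator[IN] {

      override def hasNext: Boolean =
        !context.isCompleted() && !context.isInterrupted() && iter.hasNext

      override def next(): IN = iter.next()
    }
    ```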
    
    ### Why are the changes needed?
    
    A Python/Pandas UDF applied right after an off-heap vectorized reader could crash the executor.
    
    For example:
    
    ```py
    from pyspark.sql.functions import udf
    from pyspark.sql.types import LongType

    # 'path' is any writable location, e.g. a temporary directory.
    spark.range(0, 100000, 1, 1).write.parquet(path)

    # Enable the off-heap vectorized Parquet reader.
    spark.conf.set("spark.sql.columnVector.offheap.enabled", True)

    def f(x):
        return 0

    fUdf = udf(f, LongType())

    # The UDF right after the off-heap vectorized reader could crash the executor.
    spark.read.parquet(path).select(fUdf('id')).head()
    ```
    
    This happens because the Python evaluation consumes the parent iterator in a separate thread, and that thread can keep consuming data from the parent even after the task ends and the parent is closed. If an off-heap column vector exists in the parent iterator, reading from it after the task ends can cause a segmentation fault that crashes the executor.
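
    The fix wraps the parent iterator before any rows are handed to the Python writer thread. A simplified sketch of that wiring, using a hypothetical `wrapParent` helper (the actual change lives in `EvalPythonExec`'s `mapPartitions` closure, shown in the diff below):

    ```scala
    import org.apache.spark.TaskContext

    // Wrap a parent iterator so that consumption stops as soon as the owning task is
    // completed or interrupted, even if a separate thread keeps pulling rows from it.
    def wrapParent[T](parent: Iterator[T]): Iterator[T] =
      new ContextAwareIterator(parent, TaskContext.get())
    ```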
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added tests, and verified manually.
    
    Closes #30218 from ueshin/issues/SPARK-33277/2.4/python_pandas_udf.
    
    Authored-by: Takuya UESHIN <ue...@databricks.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 python/pyspark/sql/tests.py                        | 42 ++++++++++++++++++++++
 .../sql/execution/python/EvalPythonExec.scala      | 18 +++++++++-
 2 files changed, 59 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index b995227..8a25311 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -3628,6 +3628,26 @@ class SQLTests(ReusedSQLTestCase):
         finally:
             self.spark.catalog.dropTempView("v")
 
+    # SPARK-33277
+    def test_udf_with_column_vector(self):
+        path = tempfile.mkdtemp()
+        shutil.rmtree(path)
+
+        try:
+            self.spark.range(0, 100000, 1, 1).write.parquet(path)
+
+            def f(x):
+                return 0
+
+            fUdf = udf(f, LongType())
+
+            for offheap in ["true", "false"]:
+                with self.sql_conf({"spark.sql.columnVector.offheap.enabled": offheap}):
+                    self.assertEquals(
+                        self.spark.read.parquet(path).select(fUdf('id')).head(), Row(0))
+        finally:
+            shutil.rmtree(path)
+
 
 class HiveSparkSubmitTests(SparkSubmitTests):
 
@@ -5575,6 +5595,28 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
         finally:
             shutil.rmtree(path)
 
+    # SPARK-33277
+    def test_pandas_udf_with_column_vector(self):
+        import pandas as pd
+        from pyspark.sql.functions import pandas_udf
+
+        path = tempfile.mkdtemp()
+        shutil.rmtree(path)
+
+        try:
+            self.spark.range(0, 200000, 1, 1).write.parquet(path)
+
+            @pandas_udf(LongType())
+            def udf(x):
+                return pd.Series([0] * len(x))
+
+            for offheap in ["true", "false"]:
+                with self.sql_conf({"spark.sql.columnVector.offheap.enabled": offheap}):
+                    self.assertEquals(
+                        self.spark.read.parquet(path).select(udf('id')).head(), Row(0))
+        finally:
+            shutil.rmtree(path)
+
 
 @unittest.skipIf(
     not _have_pandas or not _have_pyarrow,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
index 942a6db..293a7c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
@@ -88,6 +88,7 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil
 
     inputRDD.mapPartitions { iter =>
       val context = TaskContext.get()
+      val contextAwareIterator = new ContextAwareIterator(iter, context)
 
       // The queue used to buffer input rows so we can drain it to
       // combine input with output from Python.
@@ -119,7 +120,7 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil
       })
 
       // Add rows to queue to join later with the result.
-      val projectedRowIter = iter.map { inputRow =>
+      val projectedRowIter = contextAwareIterator.map { inputRow =>
         queue.add(inputRow.asInstanceOf[UnsafeRow])
         projection(inputRow)
       }
@@ -136,3 +137,18 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil
     }
   }
 }
+
+/**
+ * A TaskContext aware iterator.
+ *
+ * As the Python evaluation consumes the parent iterator in a separate thread,
+ * it could consume more data from the parent even after the task ends and the parent is closed.
+ * Thus, we should use ContextAwareIterator to stop consuming after the task ends.
+ */
+class ContextAwareIterator[IN](iter: Iterator[IN], context: TaskContext) extends Iterator[IN] {
+
+  override def hasNext: Boolean =
+    !context.isCompleted() && !context.isInterrupted() && iter.hasNext
+
+  override def next(): IN = iter.next()
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org