You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2020/11/05 07:24:19 UTC

[spark] branch branch-3.0 updated: Revert "[SPARK-33277][PYSPARK][SQL][3.0] Use ContextAwareIterator to stop consuming after the task ends"

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 74d8eac  Revert "[SPARK-33277][PYSPARK][SQL][3.0] Use ContextAwareIterator to stop consuming after the task ends"
74d8eac is described below

commit 74d8eacbe9cdc0b25a177543eb48ac54bd065cbb
Author: HyukjinKwon <gu...@apache.org>
AuthorDate: Thu Nov 5 16:15:44 2020 +0900

    Revert "[SPARK-33277][PYSPARK][SQL][3.0] Use ContextAwareIterator to stop consuming after the task ends"
    
    This reverts commit 92ba08d42d7317db58eb6fefe1e117d0ad84c78a.
---
 python/pyspark/sql/tests/test_pandas_map.py        | 22 ----------------------
 python/pyspark/sql/tests/test_pandas_udf_scalar.py | 19 -------------------
 python/pyspark/sql/tests/test_udf.py               | 20 --------------------
 .../sql/execution/python/EvalPythonExec.scala      | 18 +-----------------
 .../sql/execution/python/MapInPandasExec.scala     |  7 +++----
 5 files changed, 4 insertions(+), 82 deletions(-)

diff --git a/python/pyspark/sql/tests/test_pandas_map.py b/python/pyspark/sql/tests/test_pandas_map.py
index 3f82dfa..f1956a2 100644
--- a/python/pyspark/sql/tests/test_pandas_map.py
+++ b/python/pyspark/sql/tests/test_pandas_map.py
@@ -16,15 +16,12 @@
 #
 import os
 import sys
-import shutil
-import tempfile
 import time
 import unittest
 
 if sys.version >= '3':
     unicode = str
 
-from pyspark.sql import Row
 from pyspark.sql.functions import pandas_udf, PandasUDFType
 from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas, have_pyarrow, \
     pandas_requirement_message, pyarrow_requirement_message
@@ -120,25 +117,6 @@ class MapInPandasTests(ReusedSQLTestCase):
         expected = df.collect()
         self.assertEquals(actual, expected)
 
-    # SPARK-33277
-    def test_map_in_pandas_with_column_vector(self):
-        path = tempfile.mkdtemp()
-        shutil.rmtree(path)
-
-        try:
-            self.spark.range(0, 200000, 1, 1).write.parquet(path)
-
-            def func(iterator):
-                for pdf in iterator:
-                    yield pd.DataFrame({'id': [0] * len(pdf)})
-
-            for offheap in ["true", "false"]:
-                with self.sql_conf({"spark.sql.columnVector.offheap.enabled": offheap}):
-                    self.assertEquals(
-                        self.spark.read.parquet(path).mapInPandas(func, 'id long').head(), Row(0))
-        finally:
-            shutil.rmtree(path)
-
 
 if __name__ == "__main__":
     from pyspark.sql.tests.test_pandas_map import *
diff --git a/python/pyspark/sql/tests/test_pandas_udf_scalar.py b/python/pyspark/sql/tests/test_pandas_udf_scalar.py
index 375bb26..7260e80 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_scalar.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_scalar.py
@@ -1133,25 +1133,6 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
         finally:
             shutil.rmtree(path)
 
-    # SPARK-33277
-    def test_pandas_udf_with_column_vector(self):
-        path = tempfile.mkdtemp()
-        shutil.rmtree(path)
-
-        try:
-            self.spark.range(0, 200000, 1, 1).write.parquet(path)
-
-            @pandas_udf(LongType())
-            def udf(x):
-                return pd.Series([0] * len(x))
-
-            for offheap in ["true", "false"]:
-                with self.sql_conf({"spark.sql.columnVector.offheap.enabled": offheap}):
-                    self.assertEquals(
-                        self.spark.read.parquet(path).select(udf('id')).head(), Row(0))
-        finally:
-            shutil.rmtree(path)
-
 
 if __name__ == "__main__":
     from pyspark.sql.tests.test_pandas_udf_scalar import *
diff --git a/python/pyspark/sql/tests/test_udf.py b/python/pyspark/sql/tests/test_udf.py
index c6e6682..ea7ec9f 100644
--- a/python/pyspark/sql/tests/test_udf.py
+++ b/python/pyspark/sql/tests/test_udf.py
@@ -670,26 +670,6 @@ class UDFTests(ReusedSQLTestCase):
         r = df.select(fUdf(*df.columns))
         self.assertEqual(r.first()[0], "success")
 
-    # SPARK-33277
-    def test_udf_with_column_vector(self):
-        path = tempfile.mkdtemp()
-        shutil.rmtree(path)
-
-        try:
-            self.spark.range(0, 100000, 1, 1).write.parquet(path)
-
-            def f(x):
-                return 0
-
-            fUdf = udf(f, LongType())
-
-            for offheap in ["true", "false"]:
-                with self.sql_conf({"spark.sql.columnVector.offheap.enabled": offheap}):
-                    self.assertEquals(
-                        self.spark.read.parquet(path).select(fUdf('id')).head(), Row(0))
-        finally:
-            shutil.rmtree(path)
-
 
 class UDFInitializationTests(unittest.TestCase):
     def tearDown(self):
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
index d49df0f..a0f23e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
@@ -88,7 +88,6 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute],
 
     inputRDD.mapPartitions { iter =>
       val context = TaskContext.get()
-      val contextAwareIterator = new ContextAwareIterator(iter, context)
 
       // The queue used to buffer input rows so we can drain it to
       // combine input with output from Python.
@@ -120,7 +119,7 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute],
       })
 
       // Add rows to queue to join later with the result.
-      val projectedRowIter = contextAwareIterator.map { inputRow =>
+      val projectedRowIter = iter.map { inputRow =>
         queue.add(inputRow.asInstanceOf[UnsafeRow])
         projection(inputRow)
       }
@@ -137,18 +136,3 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute],
     }
   }
 }
-
-/**
- * A TaskContext aware iterator.
- *
- * As the Python evaluation consumes the parent iterator in a separate thread,
- * it could consume more data from the parent even after the task ends and the parent is closed.
- * Thus, we should use ContextAwareIterator to stop consuming after the task ends.
- */
-class ContextAwareIterator[IN](iter: Iterator[IN], context: TaskContext) extends Iterator[IN] {
-
-  override def hasNext: Boolean =
-    !context.isCompleted() && !context.isInterrupted() && iter.hasNext
-
-  override def next(): IN = iter.next()
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInPandasExec.scala
index 7fc18f8..2bb8081 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInPandasExec.scala
@@ -61,17 +61,16 @@ case class MapInPandasExec(
       val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf)
       val outputTypes = child.schema
 
-      val context = TaskContext.get()
-      val contextAwareIterator = new ContextAwareIterator(inputIter, context)
-
       // Here we wrap it via another row so that Python sides understand it
       // as a DataFrame.
-      val wrappedIter = contextAwareIterator.map(InternalRow(_))
+      val wrappedIter = inputIter.map(InternalRow(_))
 
       // DO NOT use iter.grouped(). See BatchIterator.
       val batchIter =
         if (batchSize > 0) new BatchIterator(wrappedIter, batchSize) else Iterator(wrappedIter)
 
+      val context = TaskContext.get()
+
       val columnarBatchIter = new ArrowPythonRunner(
         chainedFunc,
         PythonEvalType.SQL_MAP_PANDAS_ITER_UDF,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org