You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2019/07/05 00:23:02 UTC
[spark] branch master updated: [SPARK-28198][PYTHON][FOLLOW-UP]
Rename mapPartitionsInPandas to mapInPandas with a separate evaluation type
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5c55812 [SPARK-28198][PYTHON][FOLLOW-UP] Rename mapPartitionsInPandas to mapInPandas with a separate evaluation type
5c55812 is described below
commit 5c55812400e1e0a8aaeb50a50be106e80c916c86
Author: HyukjinKwon <gu...@apache.org>
AuthorDate: Fri Jul 5 09:22:41 2019 +0900
[SPARK-28198][PYTHON][FOLLOW-UP] Rename mapPartitionsInPandas to mapInPandas with a separate evaluation type
## What changes were proposed in this pull request?
This PR proposes to rename `mapPartitionsInPandas` to `mapInPandas` with a separate evaluation type.
Had an offline discussion with rxin, mengxr and cloud-fan
The reason is basically:
1. `SCALAR_ITER` doesn't make sense with `mapPartitionsInPandas`.
2. It cannot share the same Pandas UDF, for instance, at `select` and `mapPartitionsInPandas` unlike `GROUPED_AGG` because iterator's return type is different.
3. `mapPartitionsInPandas` -> `mapInPandas` - see https://github.com/apache/spark/pull/25044#issuecomment-508298552 and https://github.com/apache/spark/pull/25044#issuecomment-508299764
Renaming `SCALAR_ITER` to `MAP_ITER` was abandoned due to reason 2.
For `XXX_ITER`, it might have to have a different interface in the future if we happen to add other versions of them. But this is an orthogonal topic to `mapPartitionsInPandas`.
## How was this patch tested?
Existing tests should cover.
Closes #25044 from HyukjinKwon/SPARK-28198.
Authored-by: HyukjinKwon <gu...@apache.org>
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
python/pyspark/sql/dataframe.py | 13 +++++-------
python/pyspark/sql/functions.py | 5 ++++-
python/pyspark/sql/tests/test_pandas_udf_iter.py | 24 +++++++++++-----------
python/pyspark/sql/udf.py | 20 +++++++++++++++---
python/pyspark/worker.py | 7 +++++--
.../plans/logical/pythonLogicalOperators.scala | 4 ++--
.../main/scala/org/apache/spark/sql/Dataset.scala | 10 ++++-----
.../spark/sql/execution/SparkStrategies.scala | 4 ++--
...onsInPandasExec.scala => MapInPandasExec.scala} | 2 +-
9 files changed, 52 insertions(+), 37 deletions(-)
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 3f5d1ff..e666973 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -2193,7 +2193,7 @@ class DataFrame(object):
_check_series_convert_timestamps_local_tz(pdf[field.name], timezone)
return pdf
- def mapPartitionsInPandas(self, udf):
+ def mapInPandas(self, udf):
"""
Maps each partition of the current :class:`DataFrame` using a pandas udf and returns
the result as a `DataFrame`.
@@ -2215,7 +2215,7 @@ class DataFrame(object):
... def filter_func(iterator):
... for pdf in iterator:
... yield pdf[pdf.id == 1]
- >>> df.mapPartitionsInPandas(filter_func).show() # doctest: +SKIP
+ >>> df.mapInPandas(filter_func).show() # doctest: +SKIP
+---+---+
| id|age|
+---+---+
@@ -2227,15 +2227,12 @@ class DataFrame(object):
"""
# Columns are special because hasattr always return True
if isinstance(udf, Column) or not hasattr(udf, 'func') \
- or udf.evalType != PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF:
+ or udf.evalType != PythonEvalType.SQL_MAP_PANDAS_ITER_UDF:
raise ValueError("Invalid udf: the udf argument must be a pandas_udf of type "
- "SCALAR_ITER.")
-
- if not isinstance(udf.returnType, StructType):
- raise ValueError("The returnType of the pandas_udf must be a StructType")
+ "MAP_ITER.")
udf_column = udf(*[self[col] for col in self.columns])
- jdf = self._jdf.mapPartitionsInPandas(udf_column._jc.expr())
+ jdf = self._jdf.mapInPandas(udf_column._jc.expr())
return DataFrame(jdf, self.sql_ctx)
def _collectAsArrow(self):
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 5d1e69e..bf33b9a 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2802,6 +2802,8 @@ class PandasUDFType(object):
GROUPED_AGG = PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF
+ MAP_ITER = PythonEvalType.SQL_MAP_PANDAS_ITER_UDF
+
@since(1.3)
def udf(f=None, returnType=StringType()):
@@ -3278,7 +3280,8 @@ def pandas_udf(f=None, returnType=None, functionType=None):
if eval_type not in [PythonEvalType.SQL_SCALAR_PANDAS_UDF,
PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
- PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF]:
+ PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF,
+ PythonEvalType.SQL_MAP_PANDAS_ITER_UDF]:
raise ValueError("Invalid functionType: "
"functionType must be one the values from PandasUDFType")
diff --git a/python/pyspark/sql/tests/test_pandas_udf_iter.py b/python/pyspark/sql/tests/test_pandas_udf_iter.py
index c27cc5a..2a5709e 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_iter.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_iter.py
@@ -57,7 +57,7 @@ class ScalarPandasIterUDFTests(ReusedSQLTestCase):
ReusedSQLTestCase.tearDownClass()
def test_map_partitions_in_pandas(self):
- @pandas_udf('id long', PandasUDFType.SCALAR_ITER)
+ @pandas_udf('id long', PandasUDFType.MAP_ITER)
def func(iterator):
for pdf in iterator:
assert isinstance(pdf, pd.DataFrame)
@@ -65,7 +65,7 @@ class ScalarPandasIterUDFTests(ReusedSQLTestCase):
yield pdf
df = self.spark.range(10)
- actual = df.mapPartitionsInPandas(func).collect()
+ actual = df.mapInPandas(func).collect()
expected = df.collect()
self.assertEquals(actual, expected)
@@ -73,45 +73,45 @@ class ScalarPandasIterUDFTests(ReusedSQLTestCase):
data = [(1, "foo"), (2, None), (3, "bar"), (4, "bar")]
df = self.spark.createDataFrame(data, "a int, b string")
- @pandas_udf(df.schema, PandasUDFType.SCALAR_ITER)
+ @pandas_udf(df.schema, PandasUDFType.MAP_ITER)
def func(iterator):
for pdf in iterator:
assert isinstance(pdf, pd.DataFrame)
assert [d.name for d in list(pdf.dtypes)] == ['int32', 'object']
yield pdf
- actual = df.mapPartitionsInPandas(func).collect()
+ actual = df.mapInPandas(func).collect()
expected = df.collect()
self.assertEquals(actual, expected)
def test_different_output_length(self):
- @pandas_udf('a long', PandasUDFType.SCALAR_ITER)
+ @pandas_udf('a long', PandasUDFType.MAP_ITER)
def func(iterator):
for _ in iterator:
yield pd.DataFrame({'a': list(range(100))})
df = self.spark.range(10)
- actual = df.repartition(1).mapPartitionsInPandas(func).collect()
+ actual = df.repartition(1).mapInPandas(func).collect()
self.assertEquals(set((r.a for r in actual)), set(range(100)))
def test_empty_iterator(self):
- @pandas_udf('a int, b string', PandasUDFType.SCALAR_ITER)
+ @pandas_udf('a int, b string', PandasUDFType.MAP_ITER)
def empty_iter(_):
return iter([])
self.assertEqual(
- self.spark.range(10).mapPartitionsInPandas(empty_iter).count(), 0)
+ self.spark.range(10).mapInPandas(empty_iter).count(), 0)
def test_empty_rows(self):
- @pandas_udf('a int', PandasUDFType.SCALAR_ITER)
+ @pandas_udf('a int', PandasUDFType.MAP_ITER)
def empty_rows(_):
return iter([pd.DataFrame({'a': []})])
self.assertEqual(
- self.spark.range(10).mapPartitionsInPandas(empty_rows).count(), 0)
+ self.spark.range(10).mapInPandas(empty_rows).count(), 0)
def test_chain_map_partitions_in_pandas(self):
- @pandas_udf('id long', PandasUDFType.SCALAR_ITER)
+ @pandas_udf('id long', PandasUDFType.MAP_ITER)
def func(iterator):
for pdf in iterator:
assert isinstance(pdf, pd.DataFrame)
@@ -119,7 +119,7 @@ class ScalarPandasIterUDFTests(ReusedSQLTestCase):
yield pdf
df = self.spark.range(10)
- actual = df.mapPartitionsInPandas(func).mapPartitionsInPandas(func).collect()
+ actual = df.mapInPandas(func).mapInPandas(func).collect()
expected = df.collect()
self.assertEquals(actual, expected)
diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
index 84be2d2..0944c87 100644
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -42,7 +42,8 @@ def _create_udf(f, returnType, evalType):
if evalType in (PythonEvalType.SQL_SCALAR_PANDAS_UDF,
PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
- PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF):
+ PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF,
+ PythonEvalType.SQL_MAP_PANDAS_ITER_UDF):
from pyspark.sql.utils import require_minimum_pyarrow_version
require_minimum_pyarrow_version()
@@ -135,6 +136,17 @@ class UserDefinedFunction(object):
else:
raise TypeError("Invalid returnType for grouped map Pandas "
"UDFs: returnType must be a StructType.")
+ elif self.evalType == PythonEvalType.SQL_MAP_PANDAS_ITER_UDF:
+ if isinstance(self._returnType_placeholder, StructType):
+ try:
+ to_arrow_type(self._returnType_placeholder)
+ except TypeError:
+ raise NotImplementedError(
+ "Invalid returnType with map iterator Pandas UDFs: "
+ "%s is not supported" % str(self._returnType_placeholder))
+ else:
+ raise TypeError("Invalid returnType for map iterator Pandas "
+ "UDFs: returnType must be a StructType.")
elif self.evalType == PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF:
try:
# StructType is not yet allowed as a return type, explicitly check here to fail fast
@@ -328,10 +340,12 @@ class UDFRegistration(object):
if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF,
PythonEvalType.SQL_SCALAR_PANDAS_UDF,
PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
- PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF]:
+ PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF,
+ PythonEvalType.SQL_MAP_PANDAS_ITER_UDF]:
raise ValueError(
"Invalid f: f must be SQL_BATCHED_UDF, SQL_SCALAR_PANDAS_UDF, "
- "SQL_SCALAR_PANDAS_ITER_UDF, or SQL_GROUPED_AGG_PANDAS_UDF")
+ "SQL_SCALAR_PANDAS_ITER_UDF, SQL_GROUPED_AGG_PANDAS_UDF or "
+ "SQL_MAP_PANDAS_ITER_UDF.")
register_udf = UserDefinedFunction(f.func, returnType=f.returnType, name=name,
evalType=f.evalType,
deterministic=f.deterministic)
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 568902f..b34abd0 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -295,7 +295,10 @@ def read_udfs(pickleSer, infile, eval_type):
is_map_iter = eval_type == PythonEvalType.SQL_MAP_PANDAS_ITER_UDF
if is_scalar_iter or is_map_iter:
- assert num_udfs == 1, "One SCALAR_ITER UDF expected here."
+ if is_scalar_iter:
+ assert num_udfs == 1, "One SCALAR_ITER UDF expected here."
+ if is_map_iter:
+ assert num_udfs == 1, "One MAP_ITER UDF expected here."
arg_offsets, udf = read_single_udf(
pickleSer, infile, eval_type, runner_conf, udf_index=0)
@@ -318,7 +321,7 @@ def read_udfs(pickleSer, infile, eval_type):
for result_batch, result_type in result_iter:
num_output_rows += len(result_batch)
assert is_map_iter or num_output_rows <= num_input_rows[0], \
- "Pandas SCALAR_ITER UDF outputted more rows than input rows."
+ "Pandas MAP_ITER UDF outputted more rows than input rows."
yield (result_batch, result_type)
if is_scalar_iter:
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
index 757e46a..83695e3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
@@ -41,9 +41,9 @@ case class FlatMapGroupsInPandas(
/**
* Map partitions using an udf: iter(pandas.Dataframe) -> iter(pandas.DataFrame).
- * This is used by DataFrame.mapPartitionsInPandas()
+ * This is used by DataFrame.mapInPandas()
*/
-case class MapPartitionsInPandas(
+case class MapInPandas(
functionExpr: Expression,
output: Seq[Attribute],
child: LogicalPlan) extends UnaryNode {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index fe5b15c..147222c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2651,14 +2651,12 @@ class Dataset[T] private[sql](
* This function uses Apache Arrow as serialization format between Java executors and Python
* workers.
*/
- private[sql] def mapPartitionsInPandas(f: PythonUDF): DataFrame = {
+ private[sql] def mapInPandas(func: PythonUDF): DataFrame = {
Dataset.ofRows(
sparkSession,
- MapPartitionsInPandas(
- // Here, the evalType is SQL_SCALAR_PANDAS_ITER_UDF since we share the
- // same Pandas type. To avoid conflicts, it sets SQL_MAP_PANDAS_ITER_UDF here.
- f.copy(evalType = PythonEvalType.SQL_MAP_PANDAS_ITER_UDF),
- f.dataType.asInstanceOf[StructType].toAttributes,
+ MapInPandas(
+ func,
+ func.dataType.asInstanceOf[StructType].toAttributes,
logicalPlan))
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index ea0c970..c4d5a2b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -682,8 +682,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
f, p, b, is, ot, planLater(child)) :: Nil
case logical.FlatMapGroupsInPandas(grouping, func, output, child) =>
execution.python.FlatMapGroupsInPandasExec(grouping, func, output, planLater(child)) :: Nil
- case logical.MapPartitionsInPandas(func, output, child) =>
- execution.python.MapPartitionsInPandasExec(func, output, planLater(child)) :: Nil
+ case logical.MapInPandas(func, output, child) =>
+ execution.python.MapInPandasExec(func, output, planLater(child)) :: Nil
case logical.MapElements(f, _, _, objAttr, child) =>
execution.MapElementsExec(f, objAttr, planLater(child)) :: Nil
case logical.AppendColumns(f, _, _, in, out, child) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapPartitionsInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInPandasExec.scala
similarity index 99%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapPartitionsInPandasExec.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInPandasExec.scala
index 814366c..2bb8081 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapPartitionsInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInPandasExec.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch}
* `org.apache.spark.sql.catalyst.plans.logical.MapPartitionsInRWithArrow`
*
*/
-case class MapPartitionsInPandasExec(
+case class MapInPandasExec(
func: Expression,
output: Seq[Attribute],
child: SparkPlan)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org