You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2023/12/16 05:59:40 UTC
(spark) branch master updated: [MINOR][SQL][PYTHON] Rename Python plan node names
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ed9a3a8b959 [MINOR][SQL][PYTHON] Rename Python plan node names
ed9a3a8b959 is described below
commit ed9a3a8b959322155b11a45650911c0e26a1a921
Author: Hyukjin Kwon <gu...@apache.org>
AuthorDate: Fri Dec 15 21:59:26 2023 -0800
[MINOR][SQL][PYTHON] Rename Python plan node names
### What changes were proposed in this pull request?
This PR is a followup of https://github.com/apache/spark/pull/38624 that proposes to rename the plan nodes for Python as below:
From:
```
package org.apache.spark.sql.execution.python
MapInBatchExec
├── MapInPandasExec
└── *PythonMapInArrowExec* (and *PythonMapInArrow*)
*FlatMapCoGroupsInPythonExec*
├── FlatMapCoGroupsInArrowExec
└── FlatMapCoGroupsInPandasExec
*FlatMapGroupsInPythonExec*
├── FlatMapGroupsInArrowExec
└── FlatMapGroupsInPandasExec
```
To:
```
package org.apache.spark.sql.execution.python
MapInBatchExec
├── MapInPandasExec
└── *MapInArrowExec* (and *MapInArrow*)
*FlatMapCoGroupsInBatchExec*
├── FlatMapCoGroupsInArrowExec
└── FlatMapCoGroupsInPandasExec
*FlatMapGroupsInBatchExec*
├── FlatMapGroupsInArrowExec
└── FlatMapGroupsInPandasExec
```
### Why are the changes needed?
To have consistent names for Python-related execution nodes.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing CI should pass.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #44373 from HyukjinKwon/minor-arrow-rename.
Authored-by: Hyukjin Kwon <gu...@apache.org>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
.../org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala | 2 +-
python/pyspark/sql/pandas/map_ops.py | 2 +-
.../org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala | 2 +-
.../apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala | 6 +++---
.../spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala | 4 ++--
.../org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala | 2 +-
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 4 ++--
.../main/scala/org/apache/spark/sql/execution/SparkStrategies.scala | 4 ++--
.../spark/sql/execution/python/FlatMapCoGroupsInArrowExec.scala | 2 +-
...pCoGroupsInPythonExec.scala => FlatMapCoGroupsInBatchExec.scala} | 2 +-
.../spark/sql/execution/python/FlatMapCoGroupsInPandasExec.scala | 2 +-
.../spark/sql/execution/python/FlatMapGroupsInArrowExec.scala | 2 +-
...atMapGroupsInPythonExec.scala => FlatMapGroupsInBatchExec.scala} | 2 +-
.../spark/sql/execution/python/FlatMapGroupsInPandasExec.scala | 2 +-
.../python/{PythonMapInArrowExec.scala => MapInArrowExec.scala} | 4 ++--
15 files changed, 21 insertions(+), 21 deletions(-)
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 4fb03e54ae3..0cd6ee035f7 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -551,7 +551,7 @@ class SparkConnectPlanner(
baseRel,
isBarrier)
case PythonEvalType.SQL_MAP_ARROW_ITER_UDF =>
- logical.PythonMapInArrow(
+ logical.MapInArrow(
pythonUdf,
DataTypeUtils.toAttributes(pythonUdf.dataType.asInstanceOf[StructType]),
baseRel,
diff --git a/python/pyspark/sql/pandas/map_ops.py b/python/pyspark/sql/pandas/map_ops.py
index 55aa3249530..60b36672ca5 100644
--- a/python/pyspark/sql/pandas/map_ops.py
+++ b/python/pyspark/sql/pandas/map_ops.py
@@ -220,7 +220,7 @@ class PandasMapOpsMixin:
func, returnType=schema, functionType=PythonEvalType.SQL_MAP_ARROW_ITER_UDF
) # type: ignore[call-overload]
udf_column = udf(*[self[col] for col in self.columns])
- jdf = self._jdf.pythonMapInArrow(udf_column._jc.expr(), barrier)
+ jdf = self._jdf.mapInArrow(udf_column._jc.expr(), barrier)
return DataFrame(jdf, self.sparkSession)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 1ce984a39b2..abb2b4f1da5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -748,7 +748,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
!o.isInstanceOf[Generate] &&
!o.isInstanceOf[CreateVariable] &&
!o.isInstanceOf[MapInPandas] &&
- !o.isInstanceOf[PythonMapInArrow] &&
+ !o.isInstanceOf[MapInArrow] &&
// Lateral join is checked in checkSubqueryExpression.
!o.isInstanceOf[LateralJoin] =>
// The rule above is used to check Aggregate operator.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
index 56ce3765836..2017b2e8eef 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
@@ -151,8 +151,8 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
_.output.map(_.exprId.id),
newMap => newMap.copy(output = newMap.output.map(_.newInstance())))
- case p: PythonMapInArrow =>
- deduplicateAndRenew[PythonMapInArrow](
+ case p: MapInArrow =>
+ deduplicateAndRenew[MapInArrow](
existingRelations,
p,
_.output.map(_.exprId.id),
@@ -388,7 +388,7 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
newVersion.copyTagsFrom(oldVersion)
Seq((oldVersion, newVersion))
- case oldVersion @ PythonMapInArrow(_, output, _, _)
+ case oldVersion @ MapInArrow(_, output, _, _)
if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
val newVersion = oldVersion.copy(output = output.map(_.newInstance()))
newVersion.copyTagsFrom(oldVersion)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
index f5930c5272a..2664809d4b2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
@@ -89,7 +89,7 @@ case class MapInPandas(
* Map partitions using a udf: iter(pyarrow.RecordBatch) -> iter(pyarrow.RecordBatch).
* This is used by DataFrame.mapInArrow() in PySpark
*/
-case class PythonMapInArrow(
+case class MapInArrow(
functionExpr: Expression,
output: Seq[Attribute],
child: LogicalPlan,
@@ -97,7 +97,7 @@ case class PythonMapInArrow(
override val producedAttributes = AttributeSet(output)
- override protected def withNewChildInternal(newChild: LogicalPlan): PythonMapInArrow =
+ override protected def withNewChildInternal(newChild: LogicalPlan): MapInArrow =
copy(child = newChild)
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 93a2efbbf6d..40ce1a3133c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -741,7 +741,7 @@ class AnalysisSuite extends AnalysisTest with Matchers {
false)
val output = DataTypeUtils.toAttributes(pythonUdf.dataType.asInstanceOf[StructType])
val project = Project(Seq(UnresolvedAttribute("a")), testRelation)
- val mapInArrow = PythonMapInArrow(
+ val mapInArrow = MapInArrow(
pythonUdf,
output,
project,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index e1fb87a9e1e..2256f9db8c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -3551,10 +3551,10 @@ class Dataset[T] private[sql](
* defines a transformation: `iter(pyarrow.RecordBatch)` -> `iter(pyarrow.RecordBatch)`.
* Each partition is each iterator consisting of `pyarrow.RecordBatch`s as batches.
*/
- private[sql] def pythonMapInArrow(func: PythonUDF, isBarrier: Boolean = false): DataFrame = {
+ private[sql] def mapInArrow(func: PythonUDF, isBarrier: Boolean = false): DataFrame = {
Dataset.ofRows(
sparkSession,
- PythonMapInArrow(
+ MapInArrow(
func,
toAttributes(func.dataType.asInstanceOf[StructType]),
logicalPlan,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 35070ac1d56..88061425892 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -838,8 +838,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
func, output, planLater(left), planLater(right)) :: Nil
case logical.MapInPandas(func, output, child, isBarrier) =>
execution.python.MapInPandasExec(func, output, planLater(child), isBarrier) :: Nil
- case logical.PythonMapInArrow(func, output, child, isBarrier) =>
- execution.python.PythonMapInArrowExec(func, output, planLater(child), isBarrier) :: Nil
+ case logical.MapInArrow(func, output, child, isBarrier) =>
+ execution.python.MapInArrowExec(func, output, planLater(child), isBarrier) :: Nil
case logical.AttachDistributedSequence(attr, child) =>
execution.python.AttachDistributedSequenceExec(attr, planLater(child)) :: Nil
case logical.MapElements(f, _, _, objAttr, child) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInArrowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInArrowExec.scala
index 17c68a86b75..e9114041473 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInArrowExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInArrowExec.scala
@@ -48,7 +48,7 @@ case class FlatMapCoGroupsInArrowExec(
output: Seq[Attribute],
left: SparkPlan,
right: SparkPlan)
- extends FlatMapCoGroupsInPythonExec {
+ extends FlatMapCoGroupsInBatchExec {
protected val pythonEvalType: Int = PythonEvalType.SQL_COGROUPED_MAP_ARROW_UDF
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInBatchExec.scala
similarity index 97%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInPythonExec.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInBatchExec.scala
index f75b0019f10..97aa1495670 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInBatchExec.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.execution.python.PandasGroupUtils._
/**
* Base class for Python-based FlatMapCoGroupsIn*Exec.
*/
-trait FlatMapCoGroupsInPythonExec extends SparkPlan with BinaryExecNode with PythonSQLMetrics {
+trait FlatMapCoGroupsInBatchExec extends SparkPlan with BinaryExecNode with PythonSQLMetrics {
val leftGroup: Seq[Attribute]
val rightGroup: Seq[Attribute]
val func: Expression
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInPandasExec.scala
index 32d7748bcaa..78bede3388c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInPandasExec.scala
@@ -48,7 +48,7 @@ case class FlatMapCoGroupsInPandasExec(
output: Seq[Attribute],
left: SparkPlan,
right: SparkPlan)
- extends FlatMapCoGroupsInPythonExec {
+ extends FlatMapCoGroupsInBatchExec {
protected val pythonEvalType: Int = PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInArrowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInArrowExec.scala
index b0dd800af8f..942aaf6e44c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInArrowExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInArrowExec.scala
@@ -46,7 +46,7 @@ case class FlatMapGroupsInArrowExec(
func: Expression,
output: Seq[Attribute],
child: SparkPlan)
- extends FlatMapGroupsInPythonExec {
+ extends FlatMapGroupsInBatchExec {
protected val pythonEvalType: Int = PythonEvalType.SQL_GROUPED_MAP_ARROW_UDF
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInBatchExec.scala
similarity index 97%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPythonExec.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInBatchExec.scala
index e5a00e2cc8e..5550ddf72a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInBatchExec.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.types.StructType
/**
* Base class for Python-based FlatMapGroupsIn*Exec.
*/
-trait FlatMapGroupsInPythonExec extends SparkPlan with UnaryExecNode with PythonSQLMetrics {
+trait FlatMapGroupsInBatchExec extends SparkPlan with UnaryExecNode with PythonSQLMetrics {
val groupingAttributes: Seq[Attribute]
val func: Expression
val output: Seq[Attribute]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
index 88747899720..82c2f3fab20 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
@@ -44,7 +44,7 @@ case class FlatMapGroupsInPandasExec(
func: Expression,
output: Seq[Attribute],
child: SparkPlan)
- extends FlatMapGroupsInPythonExec {
+ extends FlatMapGroupsInBatchExec {
protected val pythonEvalType: Int = PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonMapInArrowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInArrowExec.scala
similarity index 96%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonMapInArrowExec.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInArrowExec.scala
index e5a457035c6..2b1d1928ffd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonMapInArrowExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInArrowExec.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.SparkPlan
* A relation produced by applying a function that takes an iterator of PyArrow's record batches
* and outputs an iterator of PyArrow's record batches.
*/
-case class PythonMapInArrowExec(
+case class MapInArrowExec(
func: Expression,
output: Seq[Attribute],
child: SparkPlan,
@@ -34,6 +34,6 @@ case class PythonMapInArrowExec(
override protected val pythonEvalType: Int = PythonEvalType.SQL_MAP_ARROW_ITER_UDF
- override protected def withNewChildInternal(newChild: SparkPlan): PythonMapInArrowExec =
+ override protected def withNewChildInternal(newChild: SparkPlan): MapInArrowExec =
copy(child = newChild)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org