You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2023/12/16 05:59:40 UTC

(spark) branch master updated: [MINOR][SQL][PYTHON] Rename Python plan node names

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ed9a3a8b959 [MINOR][SQL][PYTHON] Rename Python plan node names
ed9a3a8b959 is described below

commit ed9a3a8b959322155b11a45650911c0e26a1a921
Author: Hyukjin Kwon <gu...@apache.org>
AuthorDate: Fri Dec 15 21:59:26 2023 -0800

    [MINOR][SQL][PYTHON] Rename Python plan node names
    
    ### What changes were proposed in this pull request?
    
    This PR is a followup of https://github.com/apache/spark/pull/38624 that proposes to rename the plan nodes for Python as below:
    
    From:
    
    ```
    package org.apache.spark.sql.execution.python
    
    MapInBatchExec
    ├── MapInPandasExec
    └── *PythonMapInArrowExec* (and *PythonMapInArrow*)
    
    *FlatMapCoGroupsInPythonExec*
    ├── FlatMapCoGroupsInArrowExec
    └── FlatMapCoGroupsInPandasExec
    
    *FlatMapGroupsInPythonExec*
    ├── FlatMapGroupsInArrowExec
    └── FlatMapGroupsInPandasExec
    ```
    
    To:
    
    ```
    package org.apache.spark.sql.execution.python
    
    MapInBatchExec
    ├── MapInPandasExec
    └── *MapInArrowExec* (and *MapInArrow*)
    
    *FlatMapCoGroupsInBatchExec*
    ├── FlatMapCoGroupsInArrowExec
    └── FlatMapCoGroupsInPandasExec
    
    *FlatMapGroupsInBatchExec*
    ├── FlatMapGroupsInArrowExec
    └── FlatMapGroupsInPandasExec
    ```
    
    ### Why are the changes needed?
    
    To have consistent names for Python-related execution nodes.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing CI should pass.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #44373 from HyukjinKwon/minor-arrow-rename.
    
    Authored-by: Hyukjin Kwon <gu...@apache.org>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .../org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala  | 2 +-
 python/pyspark/sql/pandas/map_ops.py                                | 2 +-
 .../org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala      | 2 +-
 .../apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala   | 6 +++---
 .../spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala   | 4 ++--
 .../org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala      | 2 +-
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala          | 4 ++--
 .../main/scala/org/apache/spark/sql/execution/SparkStrategies.scala | 4 ++--
 .../spark/sql/execution/python/FlatMapCoGroupsInArrowExec.scala     | 2 +-
 ...pCoGroupsInPythonExec.scala => FlatMapCoGroupsInBatchExec.scala} | 2 +-
 .../spark/sql/execution/python/FlatMapCoGroupsInPandasExec.scala    | 2 +-
 .../spark/sql/execution/python/FlatMapGroupsInArrowExec.scala       | 2 +-
 ...atMapGroupsInPythonExec.scala => FlatMapGroupsInBatchExec.scala} | 2 +-
 .../spark/sql/execution/python/FlatMapGroupsInPandasExec.scala      | 2 +-
 .../python/{PythonMapInArrowExec.scala => MapInArrowExec.scala}     | 4 ++--
 15 files changed, 21 insertions(+), 21 deletions(-)

diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 4fb03e54ae3..0cd6ee035f7 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -551,7 +551,7 @@ class SparkConnectPlanner(
               baseRel,
               isBarrier)
           case PythonEvalType.SQL_MAP_ARROW_ITER_UDF =>
-            logical.PythonMapInArrow(
+            logical.MapInArrow(
               pythonUdf,
               DataTypeUtils.toAttributes(pythonUdf.dataType.asInstanceOf[StructType]),
               baseRel,
diff --git a/python/pyspark/sql/pandas/map_ops.py b/python/pyspark/sql/pandas/map_ops.py
index 55aa3249530..60b36672ca5 100644
--- a/python/pyspark/sql/pandas/map_ops.py
+++ b/python/pyspark/sql/pandas/map_ops.py
@@ -220,7 +220,7 @@ class PandasMapOpsMixin:
             func, returnType=schema, functionType=PythonEvalType.SQL_MAP_ARROW_ITER_UDF
         )  # type: ignore[call-overload]
         udf_column = udf(*[self[col] for col in self.columns])
-        jdf = self._jdf.pythonMapInArrow(udf_column._jc.expr(), barrier)
+        jdf = self._jdf.mapInArrow(udf_column._jc.expr(), barrier)
         return DataFrame(jdf, self.sparkSession)
 
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 1ce984a39b2..abb2b4f1da5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -748,7 +748,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
             !o.isInstanceOf[Generate] &&
             !o.isInstanceOf[CreateVariable] &&
             !o.isInstanceOf[MapInPandas] &&
-            !o.isInstanceOf[PythonMapInArrow] &&
+            !o.isInstanceOf[MapInArrow] &&
             // Lateral join is checked in checkSubqueryExpression.
             !o.isInstanceOf[LateralJoin] =>
             // The rule above is used to check Aggregate operator.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
index 56ce3765836..2017b2e8eef 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
@@ -151,8 +151,8 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
         _.output.map(_.exprId.id),
         newMap => newMap.copy(output = newMap.output.map(_.newInstance())))
 
-    case p: PythonMapInArrow =>
-      deduplicateAndRenew[PythonMapInArrow](
+    case p: MapInArrow =>
+      deduplicateAndRenew[MapInArrow](
         existingRelations,
         p,
         _.output.map(_.exprId.id),
@@ -388,7 +388,7 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
         newVersion.copyTagsFrom(oldVersion)
         Seq((oldVersion, newVersion))
 
-      case oldVersion @ PythonMapInArrow(_, output, _, _)
+      case oldVersion @ MapInArrow(_, output, _, _)
         if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
         val newVersion = oldVersion.copy(output = output.map(_.newInstance()))
         newVersion.copyTagsFrom(oldVersion)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
index f5930c5272a..2664809d4b2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
@@ -89,7 +89,7 @@ case class MapInPandas(
  * Map partitions using a udf: iter(pyarrow.RecordBatch) -> iter(pyarrow.RecordBatch).
  * This is used by DataFrame.mapInArrow() in PySpark
  */
-case class PythonMapInArrow(
+case class MapInArrow(
     functionExpr: Expression,
     output: Seq[Attribute],
     child: LogicalPlan,
@@ -97,7 +97,7 @@ case class PythonMapInArrow(
 
   override val producedAttributes = AttributeSet(output)
 
-  override protected def withNewChildInternal(newChild: LogicalPlan): PythonMapInArrow =
+  override protected def withNewChildInternal(newChild: LogicalPlan): MapInArrow =
     copy(child = newChild)
 }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 93a2efbbf6d..40ce1a3133c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -741,7 +741,7 @@ class AnalysisSuite extends AnalysisTest with Matchers {
       false)
     val output = DataTypeUtils.toAttributes(pythonUdf.dataType.asInstanceOf[StructType])
     val project = Project(Seq(UnresolvedAttribute("a")), testRelation)
-    val mapInArrow = PythonMapInArrow(
+    val mapInArrow = MapInArrow(
       pythonUdf,
       output,
       project,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index e1fb87a9e1e..2256f9db8c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -3551,10 +3551,10 @@ class Dataset[T] private[sql](
    * defines a transformation: `iter(pyarrow.RecordBatch)` -> `iter(pyarrow.RecordBatch)`.
    * Each partition is each iterator consisting of `pyarrow.RecordBatch`s as batches.
    */
-  private[sql] def pythonMapInArrow(func: PythonUDF, isBarrier: Boolean = false): DataFrame = {
+  private[sql] def mapInArrow(func: PythonUDF, isBarrier: Boolean = false): DataFrame = {
     Dataset.ofRows(
       sparkSession,
-      PythonMapInArrow(
+      MapInArrow(
         func,
         toAttributes(func.dataType.asInstanceOf[StructType]),
         logicalPlan,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 35070ac1d56..88061425892 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -838,8 +838,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           func, output, planLater(left), planLater(right)) :: Nil
       case logical.MapInPandas(func, output, child, isBarrier) =>
         execution.python.MapInPandasExec(func, output, planLater(child), isBarrier) :: Nil
-      case logical.PythonMapInArrow(func, output, child, isBarrier) =>
-        execution.python.PythonMapInArrowExec(func, output, planLater(child), isBarrier) :: Nil
+      case logical.MapInArrow(func, output, child, isBarrier) =>
+        execution.python.MapInArrowExec(func, output, planLater(child), isBarrier) :: Nil
       case logical.AttachDistributedSequence(attr, child) =>
         execution.python.AttachDistributedSequenceExec(attr, planLater(child)) :: Nil
       case logical.MapElements(f, _, _, objAttr, child) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInArrowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInArrowExec.scala
index 17c68a86b75..e9114041473 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInArrowExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInArrowExec.scala
@@ -48,7 +48,7 @@ case class FlatMapCoGroupsInArrowExec(
     output: Seq[Attribute],
     left: SparkPlan,
     right: SparkPlan)
-  extends FlatMapCoGroupsInPythonExec {
+  extends FlatMapCoGroupsInBatchExec {
 
   protected val pythonEvalType: Int = PythonEvalType.SQL_COGROUPED_MAP_ARROW_UDF
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInBatchExec.scala
similarity index 97%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInPythonExec.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInBatchExec.scala
index f75b0019f10..97aa1495670 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInBatchExec.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.execution.python.PandasGroupUtils._
 /**
  * Base class for Python-based FlatMapCoGroupsIn*Exec.
  */
-trait FlatMapCoGroupsInPythonExec extends SparkPlan with BinaryExecNode with PythonSQLMetrics {
+trait FlatMapCoGroupsInBatchExec extends SparkPlan with BinaryExecNode with PythonSQLMetrics {
   val leftGroup: Seq[Attribute]
   val rightGroup: Seq[Attribute]
   val func: Expression
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInPandasExec.scala
index 32d7748bcaa..78bede3388c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInPandasExec.scala
@@ -48,7 +48,7 @@ case class FlatMapCoGroupsInPandasExec(
     output: Seq[Attribute],
     left: SparkPlan,
     right: SparkPlan)
-  extends FlatMapCoGroupsInPythonExec {
+  extends FlatMapCoGroupsInBatchExec {
 
   protected val pythonEvalType: Int = PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInArrowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInArrowExec.scala
index b0dd800af8f..942aaf6e44c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInArrowExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInArrowExec.scala
@@ -46,7 +46,7 @@ case class FlatMapGroupsInArrowExec(
     func: Expression,
     output: Seq[Attribute],
     child: SparkPlan)
-  extends FlatMapGroupsInPythonExec {
+  extends FlatMapGroupsInBatchExec {
 
   protected val pythonEvalType: Int = PythonEvalType.SQL_GROUPED_MAP_ARROW_UDF
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInBatchExec.scala
similarity index 97%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPythonExec.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInBatchExec.scala
index e5a00e2cc8e..5550ddf72a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInBatchExec.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.types.StructType
 /**
  * Base class for Python-based FlatMapGroupsIn*Exec.
  */
-trait FlatMapGroupsInPythonExec extends SparkPlan with UnaryExecNode with PythonSQLMetrics {
+trait FlatMapGroupsInBatchExec extends SparkPlan with UnaryExecNode with PythonSQLMetrics {
   val groupingAttributes: Seq[Attribute]
   val func: Expression
   val output: Seq[Attribute]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
index 88747899720..82c2f3fab20 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
@@ -44,7 +44,7 @@ case class FlatMapGroupsInPandasExec(
     func: Expression,
     output: Seq[Attribute],
     child: SparkPlan)
-  extends FlatMapGroupsInPythonExec {
+  extends FlatMapGroupsInBatchExec {
 
   protected val pythonEvalType: Int = PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonMapInArrowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInArrowExec.scala
similarity index 96%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonMapInArrowExec.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInArrowExec.scala
index e5a457035c6..2b1d1928ffd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonMapInArrowExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInArrowExec.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.SparkPlan
  * A relation produced by applying a function that takes an iterator of PyArrow's record batches
  * and outputs an iterator of PyArrow's record batches.
  */
-case class PythonMapInArrowExec(
+case class MapInArrowExec(
     func: Expression,
     output: Seq[Attribute],
     child: SparkPlan,
@@ -34,6 +34,6 @@ case class PythonMapInArrowExec(
 
   override protected val pythonEvalType: Int = PythonEvalType.SQL_MAP_ARROW_ITER_UDF
 
-  override protected def withNewChildInternal(newChild: SparkPlan): PythonMapInArrowExec =
+  override protected def withNewChildInternal(newChild: SparkPlan): MapInArrowExec =
     copy(child = newChild)
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org