You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2021/07/30 13:30:54 UTC

[spark] branch branch-3.2 updated: [SPARK-36338][PYTHON][SQL] Move distributed-sequence implementation to Scala side

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new fee87f1  [SPARK-36338][PYTHON][SQL] Move distributed-sequence implementation to Scala side
fee87f1 is described below

commit fee87f13d1be2fb018ff200d4f6dbfdb30f03a99
Author: Hyukjin Kwon <gu...@apache.org>
AuthorDate: Fri Jul 30 22:29:23 2021 +0900

    [SPARK-36338][PYTHON][SQL] Move distributed-sequence implementation to Scala side
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to implement the `distributed-sequence` index on the Scala side.
    
    ### Why are the changes needed?
    
    - Avoid unnecessary (de)serialization
    - Keep the nullability of the input DataFrame when `distributed-sequence` is enabled. Currently, all fields become nullable during the serialization (see https://github.com/apache/spark/pull/32775#discussion_r645882104)
    
    ### Does this PR introduce _any_ user-facing change?
    
    No to end users since pandas API on Spark is not released yet.
    
    ```python
    import pyspark.pandas as ps
    ps.set_option('compute.default_index_type', 'distributed-sequence')
    ps.range(1).spark.print_schema()
    ```
    
    Before:
    
    ```
    root
     |-- id: long (nullable = true)
    ```
    
    After:
    
    ```
    root
     |-- id: long (nullable = false)
    ```
    
    ### How was this patch tested?
    
    Manually tested, and existing tests should cover them.
    
    Closes #33570 from HyukjinKwon/SPARK-36338.
    
    Authored-by: Hyukjin Kwon <gu...@apache.org>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
    (cherry picked from commit c6140d4d0af7f34152c1b681110a077be36f4068)
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 python/pyspark/pandas/accessors.py                 |  24 +---
 python/pyspark/pandas/config.py                    |   2 +-
 python/pyspark/pandas/indexes/base.py              |  16 +--
 python/pyspark/pandas/indexes/multi.py             |   2 +-
 python/pyspark/pandas/indexing.py                  |  16 +--
 python/pyspark/pandas/internal.py                  | 150 +++------------------
 python/pyspark/pandas/series.py                    |  12 +-
 python/pyspark/pandas/tests/test_dataframe.py      |  37 ++---
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  25 ++++
 .../org/apache/spark/sql/DataFrameSuite.scala      |   6 +
 10 files changed, 89 insertions(+), 201 deletions(-)

diff --git a/python/pyspark/pandas/accessors.py b/python/pyspark/pandas/accessors.py
index 6454938..d71ded1 100644
--- a/python/pyspark/pandas/accessors.py
+++ b/python/pyspark/pandas/accessors.py
@@ -169,7 +169,7 @@ class PandasOnSparkFrameMethods(object):
                 for scol, label in zip(internal.data_spark_columns, internal.column_labels)
             ]
         )
-        sdf, force_nullable = attach_func(sdf, name_like_string(column))
+        sdf = attach_func(sdf, name_like_string(column))
 
         return DataFrame(
             InternalFrame(
@@ -178,28 +178,18 @@ class PandasOnSparkFrameMethods(object):
                     scol_for(sdf, SPARK_INDEX_NAME_FORMAT(i)) for i in range(internal.index_level)
                 ],
                 index_names=internal.index_names,
-                index_fields=(
-                    [field.copy(nullable=True) for field in internal.index_fields]
-                    if force_nullable
-                    else internal.index_fields
-                ),
+                index_fields=internal.index_fields,
                 column_labels=internal.column_labels + [column],
                 data_spark_columns=(
                     [scol_for(sdf, name_like_string(label)) for label in internal.column_labels]
                     + [scol_for(sdf, name_like_string(column))]
                 ),
-                data_fields=(
-                    (
-                        [field.copy(nullable=True) for field in internal.data_fields]
-                        if force_nullable
-                        else internal.data_fields
+                data_fields=internal.data_fields
+                + [
+                    InternalField.from_struct_field(
+                        StructField(name_like_string(column), LongType(), nullable=False)
                     )
-                    + [
-                        InternalField.from_struct_field(
-                            StructField(name_like_string(column), LongType(), nullable=False)
-                        )
-                    ]
-                ),
+                ],
                 column_label_names=internal.column_label_names,
             ).resolved_copy
         )
diff --git a/python/pyspark/pandas/config.py b/python/pyspark/pandas/config.py
index b03f2e1..10acc8c 100644
--- a/python/pyspark/pandas/config.py
+++ b/python/pyspark/pandas/config.py
@@ -175,7 +175,7 @@ _options = [
     Option(
         key="compute.default_index_type",
         doc=("This sets the default index type: sequence, distributed and distributed-sequence."),
-        default="sequence",
+        default="distributed-sequence",
         types=str,
         check_func=(
             lambda v: v in ("sequence", "distributed", "distributed-sequence"),
diff --git a/python/pyspark/pandas/indexes/base.py b/python/pyspark/pandas/indexes/base.py
index 4158d61..6c842bc 100644
--- a/python/pyspark/pandas/indexes/base.py
+++ b/python/pyspark/pandas/indexes/base.py
@@ -1692,9 +1692,7 @@ class Index(IndexOpsMixin):
         ]
         sdf = sdf.select(index_value_columns)
 
-        sdf, force_nullable = InternalFrame.attach_default_index(
-            sdf, default_index_type="distributed-sequence"
-        )
+        sdf = InternalFrame.attach_default_index(sdf, default_index_type="distributed-sequence")
         # sdf here looks as below
         # +-----------------+-----------------+-----------------+-----------------+
         # |__index_level_0__|__index_value_0__|__index_value_1__|__index_value_2__|
@@ -1727,11 +1725,7 @@ class Index(IndexOpsMixin):
                 scol_for(sdf, col) for col in self._internal.index_spark_column_names
             ],
             index_names=self._internal.index_names,
-            index_fields=(
-                [field.copy(nullable=True) for field in self._internal.index_fields]
-                if force_nullable
-                else self._internal.index_fields
-            ),
+            index_fields=self._internal.index_fields,
         )
 
         return DataFrame(internal).index
@@ -1829,7 +1823,7 @@ class Index(IndexOpsMixin):
         """
         sdf = self._internal.spark_frame.select(self.spark.column)
         sequence_col = verify_temp_column_name(sdf, "__distributed_sequence_column__")
-        sdf, _ = InternalFrame.attach_distributed_sequence_column(sdf, column_name=sequence_col)
+        sdf = InternalFrame.attach_distributed_sequence_column(sdf, column_name=sequence_col)
         # spark_frame here looks like below
         # +-----------------+---------------+
         # |__index_level_0__|__index_value__|
@@ -1877,7 +1871,7 @@ class Index(IndexOpsMixin):
         """
         sdf = self._internal.spark_frame.select(self.spark.column)
         sequence_col = verify_temp_column_name(sdf, "__distributed_sequence_column__")
-        sdf, _ = InternalFrame.attach_distributed_sequence_column(sdf, column_name=sequence_col)
+        sdf = InternalFrame.attach_distributed_sequence_column(sdf, column_name=sequence_col)
 
         return (
             sdf.orderBy(
@@ -2475,7 +2469,7 @@ class Index(IndexOpsMixin):
                 scol_for(sdf, col) for col in self._internal.index_spark_column_names
             ],
             index_names=self._internal.index_names,
-            index_fields=[field.copy(nullable=True) for field in self._internal.index_fields],
+            index_fields=[InternalField(field.dtype) for field in self._internal.index_fields],
         )
         return DataFrame(internal).index
 
diff --git a/python/pyspark/pandas/indexes/multi.py b/python/pyspark/pandas/indexes/multi.py
index 1355917..4b5ec04 100644
--- a/python/pyspark/pandas/indexes/multi.py
+++ b/python/pyspark/pandas/indexes/multi.py
@@ -1054,7 +1054,7 @@ class MultiIndex(Index):
                 scol_for(sdf, col) for col in self._internal.index_spark_column_names
             ],
             index_names=self._internal.index_names,
-            index_fields=[field.copy(nullable=True) for field in self._internal.index_fields],
+            index_fields=[InternalField(field.dtype) for field in self._internal.index_fields],
         )
         return DataFrame(internal).index
 
diff --git a/python/pyspark/pandas/indexing.py b/python/pyspark/pandas/indexing.py
index d36c453..bf74f06 100644
--- a/python/pyspark/pandas/indexing.py
+++ b/python/pyspark/pandas/indexing.py
@@ -1536,22 +1536,10 @@ class iLocIndexer(LocIndexerLike):
     def _internal(self) -> "InternalFrame":
         # Use resolved_copy to fix the natural order.
         internal = super()._internal.resolved_copy
-        sdf, force_nullable = InternalFrame.attach_distributed_sequence_column(
+        sdf = InternalFrame.attach_distributed_sequence_column(
             internal.spark_frame, column_name=self._sequence_col
         )
-        return internal.with_new_sdf(
-            spark_frame=sdf.orderBy(NATURAL_ORDER_COLUMN_NAME),
-            index_fields=(
-                [field.copy(nullable=True) for field in internal.index_fields]
-                if force_nullable
-                else internal.index_fields
-            ),
-            data_fields=(
-                [field.copy(nullable=True) for field in internal.data_fields]
-                if force_nullable
-                else internal.data_fields
-            ),
-        )
+        return internal.with_new_sdf(spark_frame=sdf.orderBy(NATURAL_ORDER_COLUMN_NAME))
 
     @lazy_property
     def _sequence_col(self) -> str:
diff --git a/python/pyspark/pandas/internal.py b/python/pyspark/pandas/internal.py
index 0c7d9d5..53bb964 100644
--- a/python/pyspark/pandas/internal.py
+++ b/python/pyspark/pandas/internal.py
@@ -20,15 +20,12 @@ An internal immutable DataFrame with some metadata to manage indexes.
 """
 import re
 from typing import Any, Dict, List, Optional, Sequence, Tuple, Union, TYPE_CHECKING, cast
-from itertools import accumulate
-import py4j
 
 import numpy as np
 import pandas as pd
 from pandas.api.types import CategoricalDtype  # noqa: F401
 from pyspark._globals import _NoValue, _NoValueType
 from pyspark.sql import functions as F, Column, DataFrame as SparkDataFrame, Window
-from pyspark.sql.functions import pandas_udf
 from pyspark.sql.types import (  # noqa: F401
     BooleanType,
     DataType,
@@ -64,7 +61,6 @@ from pyspark.pandas.utils import (
     name_like_string,
     scol_for,
     spark_column_equals,
-    verify_temp_column_name,
 )
 
 
@@ -636,7 +632,7 @@ class InternalFrame(object):
             )
 
             # Create default index.
-            spark_frame, force_nullable = InternalFrame.attach_default_index(spark_frame)
+            spark_frame = InternalFrame.attach_default_index(spark_frame)
             index_spark_columns = [scol_for(spark_frame, SPARK_DEFAULT_INDEX_NAME)]
 
             index_fields = [
@@ -658,7 +654,6 @@ class InternalFrame(object):
                     data_fields = [
                         field.copy(
                             name=name_like_string(struct_field.name),
-                            nullable=(force_nullable or field.nullable),
                         )
                         for field, struct_field in zip(data_fields, data_struct_fields)
                     ]
@@ -836,7 +831,7 @@ class InternalFrame(object):
     @staticmethod
     def attach_default_index(
         sdf: SparkDataFrame, default_index_type: Optional[str] = None
-    ) -> Tuple[SparkDataFrame, bool]:
+    ) -> SparkDataFrame:
         """
         This method attaches a default index to Spark DataFrame. Spark does not have the index
         notion so corresponding column should be generated.
@@ -848,13 +843,13 @@ class InternalFrame(object):
 
         It adds the default index column '__index_level_0__'.
 
-        >>> spark_frame = InternalFrame.attach_default_index(spark_frame)[0]
+        >>> spark_frame = InternalFrame.attach_default_index(spark_frame)
         >>> spark_frame
         DataFrame[__index_level_0__: bigint, id: bigint]
 
         It throws an exception if the given column name already exists.
 
-        >>> InternalFrame.attach_default_index(spark_frame)[0]
+        >>> InternalFrame.attach_default_index(spark_frame)
         ... # doctest: +ELLIPSIS
         Traceback (most recent call last):
           ...
@@ -881,34 +876,26 @@ class InternalFrame(object):
             )
 
     @staticmethod
-    def attach_sequence_column(
-        sdf: SparkDataFrame, column_name: str
-    ) -> Tuple[SparkDataFrame, bool]:
+    def attach_sequence_column(sdf: SparkDataFrame, column_name: str) -> SparkDataFrame:
         scols = [scol_for(sdf, column) for column in sdf.columns]
         sequential_index = (
             F.row_number().over(Window.orderBy(F.monotonically_increasing_id())).cast("long") - 1
         )
-        return sdf.select(sequential_index.alias(column_name), *scols), False
+        return sdf.select(sequential_index.alias(column_name), *scols)
 
     @staticmethod
-    def attach_distributed_column(
-        sdf: SparkDataFrame, column_name: str
-    ) -> Tuple[SparkDataFrame, bool]:
+    def attach_distributed_column(sdf: SparkDataFrame, column_name: str) -> SparkDataFrame:
         scols = [scol_for(sdf, column) for column in sdf.columns]
-        return sdf.select(F.monotonically_increasing_id().alias(column_name), *scols), False
+        return sdf.select(F.monotonically_increasing_id().alias(column_name), *scols)
 
     @staticmethod
-    def attach_distributed_sequence_column(
-        sdf: SparkDataFrame, column_name: str
-    ) -> Tuple[SparkDataFrame, bool]:
+    def attach_distributed_sequence_column(sdf: SparkDataFrame, column_name: str) -> SparkDataFrame:
         """
         This method attaches a Spark column that has a sequence in a distributed manner.
         This is equivalent to the column assigned when default index type 'distributed-sequence'.
 
         >>> sdf = ps.DataFrame(['a', 'b', 'c']).to_spark()
-        >>> sdf, force_nullable = (
-        ...     InternalFrame.attach_distributed_sequence_column(sdf, column_name="sequence")
-        ... )
+        >>> sdf = InternalFrame.attach_distributed_sequence_column(sdf, column_name="sequence")
         >>> sdf.show()  # doctest: +NORMALIZE_WHITESPACE
         +--------+---+
         |sequence|  0|
@@ -917,124 +904,21 @@ class InternalFrame(object):
         |       1|  b|
         |       2|  c|
         +--------+---+
-        >>> force_nullable
-        True
         """
         if len(sdf.columns) > 0:
-            try:
-                jdf = sdf._jdf.toDF()  # type: ignore
-
-                sql_ctx = sdf.sql_ctx
-                encoders = sql_ctx._jvm.org.apache.spark.sql.Encoders  # type: ignore
-                encoder = encoders.tuple(jdf.exprEnc(), encoders.scalaLong())
-
-                jrdd = jdf.localCheckpoint(False).rdd().zipWithIndex()
-
-                df = SparkDataFrame(
-                    sql_ctx.sparkSession._jsparkSession.createDataset(  # type: ignore
-                        jrdd, encoder
-                    ).toDF(),
-                    sql_ctx,
-                )
-                columns = df.columns
-                return (
-                    df.selectExpr(
-                        "`{}` as `{}`".format(columns[1], column_name), "`{}`.*".format(columns[0])
-                    ),
-                    True,
-                )
-            except py4j.protocol.Py4JError:
-                if is_testing():
-                    raise
-                return InternalFrame._attach_distributed_sequence_column(sdf, column_name)
+            return SparkDataFrame(
+                sdf._jdf.toDF().withSequenceColumn(column_name),  # type: ignore
+                sdf.sql_ctx,
+            )
         else:
             cnt = sdf.count()
             if cnt > 0:
-                return default_session().range(cnt).toDF(column_name), False
+                return default_session().range(cnt).toDF(column_name)
             else:
-                return (
-                    default_session().createDataFrame(
-                        [],
-                        schema=StructType().add(column_name, data_type=LongType(), nullable=False),
-                    ),
-                    False,
+                return default_session().createDataFrame(
+                    [], schema=StructType().add(column_name, data_type=LongType(), nullable=False)
                 )
 
-    @staticmethod
-    def _attach_distributed_sequence_column(
-        sdf: SparkDataFrame, column_name: str
-    ) -> Tuple[SparkDataFrame, bool]:
-        """
-        >>> sdf = ps.DataFrame(['a', 'b', 'c']).to_spark()
-        >>> sdf, force_nullable = (
-        ...     InternalFrame._attach_distributed_sequence_column(sdf, column_name="sequence")
-        ... )
-        >>> sdf.sort("sequence").show()  # doctest: +NORMALIZE_WHITESPACE
-        +--------+---+
-        |sequence|  0|
-        +--------+---+
-        |       0|  a|
-        |       1|  b|
-        |       2|  c|
-        +--------+---+
-        >>> force_nullable
-        False
-        """
-        scols = [scol_for(sdf, column) for column in sdf.columns]
-
-        spark_partition_column = verify_temp_column_name(sdf, "__spark_partition_id__")
-        offset_column = verify_temp_column_name(sdf, "__offset__")
-        row_number_column = verify_temp_column_name(sdf, "__row_number__")
-
-        # 1. Calculates counts per each partition ID. `counts` here is, for instance,
-        #     {
-        #         1: 83,
-        #         6: 83,
-        #         3: 83,
-        #         ...
-        #     }
-        sdf = sdf.withColumn(spark_partition_column, F.spark_partition_id())
-
-        # Checkpoint the DataFrame to fix the partition ID.
-        sdf = sdf.localCheckpoint(eager=False)
-
-        counts = map(
-            lambda x: (x["key"], x["count"]),
-            sdf.groupby(sdf[spark_partition_column].alias("key")).count().collect(),
-        )
-
-        # 2. Calculates cumulative sum in an order of partition id.
-        #     Note that it does not matter if partition id guarantees its order or not.
-        #     We just need a one-by-one sequential id.
-
-        # sort by partition key.
-        sorted_counts = sorted(counts, key=lambda x: x[0])
-        # get cumulative sum in an order of partition key.
-        cumulative_counts = [0] + list(accumulate(map(lambda count: count[1], sorted_counts)))
-        # zip it with partition key.
-        sums = dict(zip(map(lambda count: count[0], sorted_counts), cumulative_counts))
-
-        # 3. Attach offset for each partition.
-        @pandas_udf(returnType=LongType())  # type: ignore
-        def offset(id: pd.Series) -> pd.Series:
-            current_partition_offset = sums[id.iloc[0]]
-            return pd.Series(current_partition_offset).repeat(len(id))
-
-        sdf = sdf.withColumn(offset_column, offset(spark_partition_column))
-
-        # 4. Calculate row_number in each partition.
-        w = Window.partitionBy(spark_partition_column).orderBy(F.monotonically_increasing_id())
-        row_number = F.row_number().over(w)
-        sdf = sdf.withColumn(row_number_column, row_number)
-
-        # 5. Calculate the index.
-        return (
-            sdf.select(
-                (sdf[offset_column] + sdf[row_number_column] - 1).alias(column_name), *scols
-            ),
-            False,
-        )
-
     def spark_column_for(self, label: Label) -> Column:
         """Return Spark Column for the given column label."""
         column_labels_to_scol = dict(zip(self.column_labels, self.data_spark_columns))
diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py
index 70f6a96..2d309b6 100644
--- a/python/pyspark/pandas/series.py
+++ b/python/pyspark/pandas/series.py
@@ -5533,7 +5533,7 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
         sdf_for_index = notnull._internal.spark_frame.select(notnull._internal.index_spark_columns)
 
         tmp_join_key = verify_temp_column_name(sdf_for_index, "__tmp_join_key__")
-        sdf_for_index, _ = InternalFrame.attach_distributed_sequence_column(
+        sdf_for_index = InternalFrame.attach_distributed_sequence_column(
             sdf_for_index, tmp_join_key
         )
         # sdf_for_index:
@@ -5550,7 +5550,7 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
         sdf_for_data = notnull._internal.spark_frame.select(
             notnull.spark.column.alias("values"), NATURAL_ORDER_COLUMN_NAME
         )
-        sdf_for_data, _ = InternalFrame.attach_distributed_sequence_column(
+        sdf_for_data = InternalFrame.attach_distributed_sequence_column(
             sdf_for_data, SPARK_DEFAULT_SERIES_NAME
         )
         # sdf_for_data:
@@ -5569,9 +5569,7 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
         ).drop("values", NATURAL_ORDER_COLUMN_NAME)
 
         tmp_join_key = verify_temp_column_name(sdf_for_data, "__tmp_join_key__")
-        sdf_for_data, _ = InternalFrame.attach_distributed_sequence_column(
-            sdf_for_data, tmp_join_key
-        )
+        sdf_for_data = InternalFrame.attach_distributed_sequence_column(sdf_for_data, tmp_join_key)
         # sdf_for_index:                         sdf_for_data:
         # +----------------+-----------------+   +----------------+---+
         # |__tmp_join_key__|__index_level_0__|   |__tmp_join_key__|  0|
@@ -5639,7 +5637,7 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
             return -1
         # We should remember the natural sequence started from 0
         seq_col_name = verify_temp_column_name(sdf, "__distributed_sequence_column__")
-        sdf, _ = InternalFrame.attach_distributed_sequence_column(
+        sdf = InternalFrame.attach_distributed_sequence_column(
             sdf.drop(NATURAL_ORDER_COLUMN_NAME), seq_col_name
         )
         # If the maximum is achieved in multiple locations, the first row position is returned.
@@ -5686,7 +5684,7 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
             return -1
         # We should remember the natural sequence started from 0
         seq_col_name = verify_temp_column_name(sdf, "__distributed_sequence_column__")
-        sdf, _ = InternalFrame.attach_distributed_sequence_column(
+        sdf = InternalFrame.attach_distributed_sequence_column(
             sdf.drop(NATURAL_ORDER_COLUMN_NAME), seq_col_name
         )
         # If the minimum is achieved in multiple locations, the first row position is returned.
diff --git a/python/pyspark/pandas/tests/test_dataframe.py b/python/pyspark/pandas/tests/test_dataframe.py
index 6192541..6b24611 100644
--- a/python/pyspark/pandas/tests/test_dataframe.py
+++ b/python/pyspark/pandas/tests/test_dataframe.py
@@ -5146,23 +5146,26 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils):
             sys.stdout = prev
 
     def test_explain_hint(self):
-        psdf1 = ps.DataFrame(
-            {"lkey": ["foo", "bar", "baz", "foo"], "value": [1, 2, 3, 5]}, columns=["lkey", "value"]
-        )
-        psdf2 = ps.DataFrame(
-            {"rkey": ["foo", "bar", "baz", "foo"], "value": [5, 6, 7, 8]}, columns=["rkey", "value"]
-        )
-        merged = psdf1.merge(psdf2.spark.hint("broadcast"), left_on="lkey", right_on="rkey")
-        prev = sys.stdout
-        try:
-            out = StringIO()
-            sys.stdout = out
-            merged.spark.explain()
-            actual = out.getvalue().strip()
-
-            self.assertTrue("Broadcast" in actual, actual)
-        finally:
-            sys.stdout = prev
+        with ps.option_context("compute.default_index_type", "sequence"):
+            psdf1 = ps.DataFrame(
+                {"lkey": ["foo", "bar", "baz", "foo"], "value": [1, 2, 3, 5]},
+                columns=["lkey", "value"],
+            )
+            psdf2 = ps.DataFrame(
+                {"rkey": ["foo", "bar", "baz", "foo"], "value": [5, 6, 7, 8]},
+                columns=["rkey", "value"],
+            )
+            merged = psdf1.merge(psdf2.spark.hint("broadcast"), left_on="lkey", right_on="rkey")
+            prev = sys.stdout
+            try:
+                out = StringIO()
+                sys.stdout = out
+                merged.spark.explain()
+                actual = out.getvalue().strip()
+
+                self.assertTrue("Broadcast" in actual, actual)
+            finally:
+                sys.stdout = prev
 
     def test_mad(self):
         pdf = pd.DataFrame(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index e98912b..fb8620c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -3510,6 +3510,31 @@ class Dataset[T] private[sql](
   ////////////////////////////////////////////////////////////////////////////
 
   /**
+   * Adds a non-nullable long column named `name` whose values increase one by one.
+   * This backs the 'distributed-sequence' default index in pandas API on Spark.
+   */
+  private[sql] def withSequenceColumn(name: String) = {
+    val rdd: RDD[InternalRow] =
+      // Checkpoint the DataFrame to fix the partition ID.
+      localCheckpoint(false)
+      .queryExecution.toRdd.zipWithIndex().mapPartitions { iter =>
+      val joinedRow = new JoinedRow
+      val unsafeRowWriter =
+        new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1)
+
+      iter.map { case (row, id) =>
+        // Write the index into the reused single-field UnsafeRow, then prepend it to the row.
+        unsafeRowWriter.reset()
+        unsafeRowWriter.write(0, id)
+        joinedRow(unsafeRowWriter.getRow, row)
+      }
+    }
+
+    sparkSession.internalCreateDataFrame(
+      rdd, StructType(StructField(name, LongType, nullable = false) +: schema), isStreaming)
+  }
+
+  /**
    * Converts a JavaRDD to a PythonRDD.
    */
   private[sql] def javaToPython: JavaRDD[Array[Byte]] = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 1e3d2192..2fd5993 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -2937,6 +2937,12 @@ class DataFrameSuite extends QueryTest
       checkAnswer(sql("SELECT sum(c1 * c3) + sum(c2 * c3) FROM tbl"), Row(2.00000000000) :: Nil)
     }
   }
+
+  test("SPARK-36338: DataFrame.withSequenceColumn should append unique sequence IDs") {
+    val ids = spark.range(10).repartition(5)
+      .withSequenceColumn("default_index").collect().map(_.getLong(0))
+    assert(ids.toSet === Range(0, 10).toSet)
+  }
 }
 
 case class GroupByKey(a: Int, b: Int)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org