Posted to commits@spark.apache.org by gu...@apache.org on 2021/07/30 13:30:54 UTC
[spark] branch branch-3.2 updated: [SPARK-36338][PYTHON][SQL] Move distributed-sequence implementation to Scala side
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new fee87f1 [SPARK-36338][PYTHON][SQL] Move distributed-sequence implementation to Scala side
fee87f1 is described below
commit fee87f13d1be2fb018ff200d4f6dbfdb30f03a99
Author: Hyukjin Kwon <gu...@apache.org>
AuthorDate: Fri Jul 30 22:29:23 2021 +0900
[SPARK-36338][PYTHON][SQL] Move distributed-sequence implementation to Scala side
### What changes were proposed in this pull request?
This PR proposes to implement the `distributed-sequence` index on the Scala side.
### Why are the changes needed?
- Avoid unnecessary (de)serialization
- Keep the nullability of the input DataFrame when `distributed-sequence` is enabled. During serialization, all fields currently become nullable (see https://github.com/apache/spark/pull/32775#discussion_r645882104); the sketch after this list illustrates the loss.
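For context, the nullability loss can be reproduced outside this patch by round-tripping a DataFrame through an RDD, which is effectively what the old Python-side implementation did. A minimal sketch (not part of the patch), assuming a running SparkSession:
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.range(1)
df.printSchema()             # id: long (nullable = false)

# Re-creating the DataFrame from its RDD re-infers the schema
# and marks every field nullable, losing the original metadata.
df.rdd.toDF().printSchema()  # id: long (nullable = true)
```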
### Does this PR introduce _any_ user-facing change?
No to end users, since pandas API on Spark has not been released yet.
```python
import pyspark.pandas as ps
ps.set_option('compute.default_index_type', 'distributed-sequence')
ps.range(1).spark.print_schema()
```
Before:
```
root
|-- id: long (nullable = true)
```
After:
```
root
|-- id: long (nullable = false)
```
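The index type can also be set for a single scope with `ps.option_context` (used by the updated `test_explain_hint` below). A minimal sketch:
```python
import pyspark.pandas as ps

with ps.option_context("compute.default_index_type", "distributed-sequence"):
    # nullable = false is preserved now that the sequence is attached in Scala.
    ps.range(1).spark.print_schema()
```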
### How was this patch tested?
Manually tested; existing tests should also cover the changes.
Closes #33570 from HyukjinKwon/SPARK-36338.
Authored-by: Hyukjin Kwon <gu...@apache.org>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
(cherry picked from commit c6140d4d0af7f34152c1b681110a077be36f4068)
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
python/pyspark/pandas/accessors.py | 24 +---
python/pyspark/pandas/config.py | 2 +-
python/pyspark/pandas/indexes/base.py | 16 +--
python/pyspark/pandas/indexes/multi.py | 2 +-
python/pyspark/pandas/indexing.py | 16 +--
python/pyspark/pandas/internal.py | 150 +++------------------
python/pyspark/pandas/series.py | 12 +-
python/pyspark/pandas/tests/test_dataframe.py | 37 ++---
.../main/scala/org/apache/spark/sql/Dataset.scala | 25 ++++
.../org/apache/spark/sql/DataFrameSuite.scala | 6 +
10 files changed, 89 insertions(+), 201 deletions(-)
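The central API change across the Python files below: the `attach_*` helpers in `internal.py` now return only the DataFrame instead of a `(DataFrame, force_nullable)` tuple. Per the doctest in the diff, usage now looks like this (a sketch, assuming a pandas-on-Spark session):
```python
import pyspark.pandas as ps
from pyspark.pandas.internal import InternalFrame

sdf = ps.DataFrame(['a', 'b', 'c']).to_spark()
# Previously returned (sdf, force_nullable); now returns just the DataFrame.
sdf = InternalFrame.attach_distributed_sequence_column(sdf, column_name="sequence")
sdf.show()
# +--------+---+
# |sequence|  0|
# +--------+---+
# |       0|  a|
# |       1|  b|
# |       2|  c|
# +--------+---+
```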
diff --git a/python/pyspark/pandas/accessors.py b/python/pyspark/pandas/accessors.py
index 6454938..d71ded1 100644
--- a/python/pyspark/pandas/accessors.py
+++ b/python/pyspark/pandas/accessors.py
@@ -169,7 +169,7 @@ class PandasOnSparkFrameMethods(object):
for scol, label in zip(internal.data_spark_columns, internal.column_labels)
]
)
- sdf, force_nullable = attach_func(sdf, name_like_string(column))
+ sdf = attach_func(sdf, name_like_string(column))
return DataFrame(
InternalFrame(
@@ -178,28 +178,18 @@ class PandasOnSparkFrameMethods(object):
scol_for(sdf, SPARK_INDEX_NAME_FORMAT(i)) for i in range(internal.index_level)
],
index_names=internal.index_names,
- index_fields=(
- [field.copy(nullable=True) for field in internal.index_fields]
- if force_nullable
- else internal.index_fields
- ),
+ index_fields=internal.index_fields,
column_labels=internal.column_labels + [column],
data_spark_columns=(
[scol_for(sdf, name_like_string(label)) for label in internal.column_labels]
+ [scol_for(sdf, name_like_string(column))]
),
- data_fields=(
- (
- [field.copy(nullable=True) for field in internal.data_fields]
- if force_nullable
- else internal.data_fields
+ data_fields=internal.data_fields
+ + [
+ InternalField.from_struct_field(
+ StructField(name_like_string(column), LongType(), nullable=False)
)
- + [
- InternalField.from_struct_field(
- StructField(name_like_string(column), LongType(), nullable=False)
- )
- ]
- ),
+ ],
column_label_names=internal.column_label_names,
).resolved_copy
)
diff --git a/python/pyspark/pandas/config.py b/python/pyspark/pandas/config.py
index b03f2e1..10acc8c 100644
--- a/python/pyspark/pandas/config.py
+++ b/python/pyspark/pandas/config.py
@@ -175,7 +175,7 @@ _options = [
Option(
key="compute.default_index_type",
doc=("This sets the default index type: sequence, distributed and distributed-sequence."),
- default="sequence",
+ default="distributed-sequence",
types=str,
check_func=(
lambda v: v in ("sequence", "distributed", "distributed-sequence"),
diff --git a/python/pyspark/pandas/indexes/base.py b/python/pyspark/pandas/indexes/base.py
index 4158d61..6c842bc 100644
--- a/python/pyspark/pandas/indexes/base.py
+++ b/python/pyspark/pandas/indexes/base.py
@@ -1692,9 +1692,7 @@ class Index(IndexOpsMixin):
]
sdf = sdf.select(index_value_columns)
- sdf, force_nullable = InternalFrame.attach_default_index(
- sdf, default_index_type="distributed-sequence"
- )
+ sdf = InternalFrame.attach_default_index(sdf, default_index_type="distributed-sequence")
# sdf here looks as below
# +-----------------+-----------------+-----------------+-----------------+
# |__index_level_0__|__index_value_0__|__index_value_1__|__index_value_2__|
@@ -1727,11 +1725,7 @@ class Index(IndexOpsMixin):
scol_for(sdf, col) for col in self._internal.index_spark_column_names
],
index_names=self._internal.index_names,
- index_fields=(
- [field.copy(nullable=True) for field in self._internal.index_fields]
- if force_nullable
- else self._internal.index_fields
- ),
+ index_fields=self._internal.index_fields,
)
return DataFrame(internal).index
@@ -1829,7 +1823,7 @@ class Index(IndexOpsMixin):
"""
sdf = self._internal.spark_frame.select(self.spark.column)
sequence_col = verify_temp_column_name(sdf, "__distributed_sequence_column__")
- sdf, _ = InternalFrame.attach_distributed_sequence_column(sdf, column_name=sequence_col)
+ sdf = InternalFrame.attach_distributed_sequence_column(sdf, column_name=sequence_col)
# spark_frame here looks like below
# +-----------------+---------------+
# |__index_level_0__|__index_value__|
@@ -1877,7 +1871,7 @@ class Index(IndexOpsMixin):
"""
sdf = self._internal.spark_frame.select(self.spark.column)
sequence_col = verify_temp_column_name(sdf, "__distributed_sequence_column__")
- sdf, _ = InternalFrame.attach_distributed_sequence_column(sdf, column_name=sequence_col)
+ sdf = InternalFrame.attach_distributed_sequence_column(sdf, column_name=sequence_col)
return (
sdf.orderBy(
@@ -2475,7 +2469,7 @@ class Index(IndexOpsMixin):
scol_for(sdf, col) for col in self._internal.index_spark_column_names
],
index_names=self._internal.index_names,
- index_fields=[field.copy(nullable=True) for field in self._internal.index_fields],
+ index_fields=[InternalField(field.dtype) for field in self._internal.index_fields],
)
return DataFrame(internal).index
diff --git a/python/pyspark/pandas/indexes/multi.py b/python/pyspark/pandas/indexes/multi.py
index 1355917..4b5ec04 100644
--- a/python/pyspark/pandas/indexes/multi.py
+++ b/python/pyspark/pandas/indexes/multi.py
@@ -1054,7 +1054,7 @@ class MultiIndex(Index):
scol_for(sdf, col) for col in self._internal.index_spark_column_names
],
index_names=self._internal.index_names,
- index_fields=[field.copy(nullable=True) for field in self._internal.index_fields],
+ index_fields=[InternalField(field.dtype) for field in self._internal.index_fields],
)
return DataFrame(internal).index
diff --git a/python/pyspark/pandas/indexing.py b/python/pyspark/pandas/indexing.py
index d36c453..bf74f06 100644
--- a/python/pyspark/pandas/indexing.py
+++ b/python/pyspark/pandas/indexing.py
@@ -1536,22 +1536,10 @@ class iLocIndexer(LocIndexerLike):
def _internal(self) -> "InternalFrame":
# Use resolved_copy to fix the natural order.
internal = super()._internal.resolved_copy
- sdf, force_nullable = InternalFrame.attach_distributed_sequence_column(
+ sdf = InternalFrame.attach_distributed_sequence_column(
internal.spark_frame, column_name=self._sequence_col
)
- return internal.with_new_sdf(
- spark_frame=sdf.orderBy(NATURAL_ORDER_COLUMN_NAME),
- index_fields=(
- [field.copy(nullable=True) for field in internal.index_fields]
- if force_nullable
- else internal.index_fields
- ),
- data_fields=(
- [field.copy(nullable=True) for field in internal.data_fields]
- if force_nullable
- else internal.data_fields
- ),
- )
+ return internal.with_new_sdf(spark_frame=sdf.orderBy(NATURAL_ORDER_COLUMN_NAME))
@lazy_property
def _sequence_col(self) -> str:
diff --git a/python/pyspark/pandas/internal.py b/python/pyspark/pandas/internal.py
index 0c7d9d5..53bb964 100644
--- a/python/pyspark/pandas/internal.py
+++ b/python/pyspark/pandas/internal.py
@@ -20,15 +20,12 @@ An internal immutable DataFrame with some metadata to manage indexes.
"""
import re
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union, TYPE_CHECKING, cast
-from itertools import accumulate
-import py4j
import numpy as np
import pandas as pd
from pandas.api.types import CategoricalDtype # noqa: F401
from pyspark._globals import _NoValue, _NoValueType
from pyspark.sql import functions as F, Column, DataFrame as SparkDataFrame, Window
-from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ( # noqa: F401
BooleanType,
DataType,
@@ -64,7 +61,6 @@ from pyspark.pandas.utils import (
name_like_string,
scol_for,
spark_column_equals,
- verify_temp_column_name,
)
@@ -636,7 +632,7 @@ class InternalFrame(object):
)
# Create default index.
- spark_frame, force_nullable = InternalFrame.attach_default_index(spark_frame)
+ spark_frame = InternalFrame.attach_default_index(spark_frame)
index_spark_columns = [scol_for(spark_frame, SPARK_DEFAULT_INDEX_NAME)]
index_fields = [
@@ -658,7 +654,6 @@ class InternalFrame(object):
data_fields = [
field.copy(
name=name_like_string(struct_field.name),
- nullable=(force_nullable or field.nullable),
)
for field, struct_field in zip(data_fields, data_struct_fields)
]
@@ -836,7 +831,7 @@ class InternalFrame(object):
@staticmethod
def attach_default_index(
sdf: SparkDataFrame, default_index_type: Optional[str] = None
- ) -> Tuple[SparkDataFrame, bool]:
+ ) -> SparkDataFrame:
"""
This method attaches a default index to Spark DataFrame. Spark does not have the index
notion so corresponding column should be generated.
@@ -848,13 +843,13 @@ class InternalFrame(object):
It adds the default index column '__index_level_0__'.
- >>> spark_frame = InternalFrame.attach_default_index(spark_frame)[0]
+ >>> spark_frame = InternalFrame.attach_default_index(spark_frame)
>>> spark_frame
DataFrame[__index_level_0__: bigint, id: bigint]
It throws an exception if the given column name already exists.
- >>> InternalFrame.attach_default_index(spark_frame)[0]
+ >>> InternalFrame.attach_default_index(spark_frame)
... # doctest: +ELLIPSIS
Traceback (most recent call last):
...
@@ -881,34 +876,26 @@ class InternalFrame(object):
)
@staticmethod
- def attach_sequence_column(
- sdf: SparkDataFrame, column_name: str
- ) -> Tuple[SparkDataFrame, bool]:
+ def attach_sequence_column(sdf: SparkDataFrame, column_name: str) -> SparkDataFrame:
scols = [scol_for(sdf, column) for column in sdf.columns]
sequential_index = (
F.row_number().over(Window.orderBy(F.monotonically_increasing_id())).cast("long") - 1
)
- return sdf.select(sequential_index.alias(column_name), *scols), False
+ return sdf.select(sequential_index.alias(column_name), *scols)
@staticmethod
- def attach_distributed_column(
- sdf: SparkDataFrame, column_name: str
- ) -> Tuple[SparkDataFrame, bool]:
+ def attach_distributed_column(sdf: SparkDataFrame, column_name: str) -> SparkDataFrame:
scols = [scol_for(sdf, column) for column in sdf.columns]
- return sdf.select(F.monotonically_increasing_id().alias(column_name), *scols), False
+ return sdf.select(F.monotonically_increasing_id().alias(column_name), *scols)
@staticmethod
- def attach_distributed_sequence_column(
- sdf: SparkDataFrame, column_name: str
- ) -> Tuple[SparkDataFrame, bool]:
+ def attach_distributed_sequence_column(sdf: SparkDataFrame, column_name: str) -> SparkDataFrame:
"""
This method attaches a Spark column that has a sequence in a distributed manner.
This is equivalent to the column assigned when default index type 'distributed-sequence'.
>>> sdf = ps.DataFrame(['a', 'b', 'c']).to_spark()
- >>> sdf, force_nullable = (
- ... InternalFrame.attach_distributed_sequence_column(sdf, column_name="sequence")
- ... )
+ >>> sdf = InternalFrame.attach_distributed_sequence_column(sdf, column_name="sequence")
>>> sdf.show() # doctest: +NORMALIZE_WHITESPACE
+--------+---+
|sequence| 0|
@@ -917,124 +904,21 @@ class InternalFrame(object):
| 1| b|
| 2| c|
+--------+---+
- >>> force_nullable
- True
"""
if len(sdf.columns) > 0:
- try:
- jdf = sdf._jdf.toDF() # type: ignore
-
- sql_ctx = sdf.sql_ctx
- encoders = sql_ctx._jvm.org.apache.spark.sql.Encoders # type: ignore
- encoder = encoders.tuple(jdf.exprEnc(), encoders.scalaLong())
-
- jrdd = jdf.localCheckpoint(False).rdd().zipWithIndex()
-
- df = SparkDataFrame(
- sql_ctx.sparkSession._jsparkSession.createDataset( # type: ignore
- jrdd, encoder
- ).toDF(),
- sql_ctx,
- )
- columns = df.columns
- return (
- df.selectExpr(
- "`{}` as `{}`".format(columns[1], column_name), "`{}`.*".format(columns[0])
- ),
- True,
- )
- except py4j.protocol.Py4JError:
- if is_testing():
- raise
- return InternalFrame._attach_distributed_sequence_column(sdf, column_name)
+ return SparkDataFrame(
+ sdf._jdf.toDF().withSequenceColumn(column_name), # type: ignore
+ sdf.sql_ctx,
+ )
else:
cnt = sdf.count()
if cnt > 0:
- return default_session().range(cnt).toDF(column_name), False
+ return default_session().range(cnt).toDF(column_name)
else:
- return (
- default_session().createDataFrame(
- [],
- schema=StructType().add(column_name, data_type=LongType(), nullable=False),
- ),
- False,
+ return default_session().createDataFrame(
+ [], schema=StructType().add(column_name, data_type=LongType(), nullable=False)
)
- @staticmethod
- def _attach_distributed_sequence_column(
- sdf: SparkDataFrame, column_name: str
- ) -> Tuple[SparkDataFrame, bool]:
- """
- >>> sdf = ps.DataFrame(['a', 'b', 'c']).to_spark()
- >>> sdf, force_nullable = (
- ... InternalFrame._attach_distributed_sequence_column(sdf, column_name="sequence")
- ... )
- >>> sdf.sort("sequence").show() # doctest: +NORMALIZE_WHITESPACE
- +--------+---+
- |sequence| 0|
- +--------+---+
- | 0| a|
- | 1| b|
- | 2| c|
- +--------+---+
- >>> force_nullable
- False
- """
- scols = [scol_for(sdf, column) for column in sdf.columns]
-
- spark_partition_column = verify_temp_column_name(sdf, "__spark_partition_id__")
- offset_column = verify_temp_column_name(sdf, "__offset__")
- row_number_column = verify_temp_column_name(sdf, "__row_number__")
-
- # 1. Calculates counts per each partition ID. `counts` here is, for instance,
- # {
- # 1: 83,
- # 6: 83,
- # 3: 83,
- # ...
- # }
- sdf = sdf.withColumn(spark_partition_column, F.spark_partition_id())
-
- # Checkpoint the DataFrame to fix the partition ID.
- sdf = sdf.localCheckpoint(eager=False)
-
- counts = map(
- lambda x: (x["key"], x["count"]),
- sdf.groupby(sdf[spark_partition_column].alias("key")).count().collect(),
- )
-
- # 2. Calculates cumulative sum in an order of partition id.
- # Note that it does not matter if partition id guarantees its order or not.
- # We just need a one-by-one sequential id.
-
- # sort by partition key.
- sorted_counts = sorted(counts, key=lambda x: x[0])
- # get cumulative sum in an order of partition key.
- cumulative_counts = [0] + list(accumulate(map(lambda count: count[1], sorted_counts)))
- # zip it with partition key.
- sums = dict(zip(map(lambda count: count[0], sorted_counts), cumulative_counts))
-
- # 3. Attach offset for each partition.
- @pandas_udf(returnType=LongType()) # type: ignore
- def offset(id: pd.Series) -> pd.Series:
- current_partition_offset = sums[id.iloc[0]]
- return pd.Series(current_partition_offset).repeat(len(id))
-
- sdf = sdf.withColumn(offset_column, offset(spark_partition_column))
-
- # 4. Calculate row_number in each partition.
- w = Window.partitionBy(spark_partition_column).orderBy(F.monotonically_increasing_id())
- row_number = F.row_number().over(w)
- sdf = sdf.withColumn(row_number_column, row_number)
-
- # 5. Calculate the index.
- return (
- sdf.select(
- (sdf[offset_column] + sdf[row_number_column] - 1).alias(column_name), *scols
- ),
- False,
- )
-
def spark_column_for(self, label: Label) -> Column:
"""Return Spark Column for the given column label."""
column_labels_to_scol = dict(zip(self.column_labels, self.data_spark_columns))
diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py
index 70f6a96..2d309b6 100644
--- a/python/pyspark/pandas/series.py
+++ b/python/pyspark/pandas/series.py
@@ -5533,7 +5533,7 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
sdf_for_index = notnull._internal.spark_frame.select(notnull._internal.index_spark_columns)
tmp_join_key = verify_temp_column_name(sdf_for_index, "__tmp_join_key__")
- sdf_for_index, _ = InternalFrame.attach_distributed_sequence_column(
+ sdf_for_index = InternalFrame.attach_distributed_sequence_column(
sdf_for_index, tmp_join_key
)
# sdf_for_index:
@@ -5550,7 +5550,7 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
sdf_for_data = notnull._internal.spark_frame.select(
notnull.spark.column.alias("values"), NATURAL_ORDER_COLUMN_NAME
)
- sdf_for_data, _ = InternalFrame.attach_distributed_sequence_column(
+ sdf_for_data = InternalFrame.attach_distributed_sequence_column(
sdf_for_data, SPARK_DEFAULT_SERIES_NAME
)
# sdf_for_data:
@@ -5569,9 +5569,7 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
).drop("values", NATURAL_ORDER_COLUMN_NAME)
tmp_join_key = verify_temp_column_name(sdf_for_data, "__tmp_join_key__")
- sdf_for_data, _ = InternalFrame.attach_distributed_sequence_column(
- sdf_for_data, tmp_join_key
- )
+ sdf_for_data = InternalFrame.attach_distributed_sequence_column(sdf_for_data, tmp_join_key)
# sdf_for_index: sdf_for_data:
# +----------------+-----------------+ +----------------+---+
# |__tmp_join_key__|__index_level_0__| |__tmp_join_key__| 0|
@@ -5639,7 +5637,7 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
return -1
# We should remember the natural sequence started from 0
seq_col_name = verify_temp_column_name(sdf, "__distributed_sequence_column__")
- sdf, _ = InternalFrame.attach_distributed_sequence_column(
+ sdf = InternalFrame.attach_distributed_sequence_column(
sdf.drop(NATURAL_ORDER_COLUMN_NAME), seq_col_name
)
# If the maximum is achieved in multiple locations, the first row position is returned.
@@ -5686,7 +5684,7 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
return -1
# We should remember the natural sequence started from 0
seq_col_name = verify_temp_column_name(sdf, "__distributed_sequence_column__")
- sdf, _ = InternalFrame.attach_distributed_sequence_column(
+ sdf = InternalFrame.attach_distributed_sequence_column(
sdf.drop(NATURAL_ORDER_COLUMN_NAME), seq_col_name
)
# If the minimum is achieved in multiple locations, the first row position is returned.
diff --git a/python/pyspark/pandas/tests/test_dataframe.py b/python/pyspark/pandas/tests/test_dataframe.py
index 6192541..6b24611 100644
--- a/python/pyspark/pandas/tests/test_dataframe.py
+++ b/python/pyspark/pandas/tests/test_dataframe.py
@@ -5146,23 +5146,26 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils):
sys.stdout = prev
def test_explain_hint(self):
- psdf1 = ps.DataFrame(
- {"lkey": ["foo", "bar", "baz", "foo"], "value": [1, 2, 3, 5]}, columns=["lkey", "value"]
- )
- psdf2 = ps.DataFrame(
- {"rkey": ["foo", "bar", "baz", "foo"], "value": [5, 6, 7, 8]}, columns=["rkey", "value"]
- )
- merged = psdf1.merge(psdf2.spark.hint("broadcast"), left_on="lkey", right_on="rkey")
- prev = sys.stdout
- try:
- out = StringIO()
- sys.stdout = out
- merged.spark.explain()
- actual = out.getvalue().strip()
-
- self.assertTrue("Broadcast" in actual, actual)
- finally:
- sys.stdout = prev
+ with ps.option_context("compute.default_index_type", "sequence"):
+ psdf1 = ps.DataFrame(
+ {"lkey": ["foo", "bar", "baz", "foo"], "value": [1, 2, 3, 5]},
+ columns=["lkey", "value"],
+ )
+ psdf2 = ps.DataFrame(
+ {"rkey": ["foo", "bar", "baz", "foo"], "value": [5, 6, 7, 8]},
+ columns=["rkey", "value"],
+ )
+ merged = psdf1.merge(psdf2.spark.hint("broadcast"), left_on="lkey", right_on="rkey")
+ prev = sys.stdout
+ try:
+ out = StringIO()
+ sys.stdout = out
+ merged.spark.explain()
+ actual = out.getvalue().strip()
+
+ self.assertTrue("Broadcast" in actual, actual)
+ finally:
+ sys.stdout = prev
def test_mad(self):
pdf = pd.DataFrame(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index e98912b..fb8620c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -3510,6 +3510,31 @@ class Dataset[T] private[sql](
////////////////////////////////////////////////////////////////////////////
/**
+ * It adds a new long column with the name `name` that increases one by one.
+ * This is for 'distributed-sequence' default index in pandas API on Spark.
+ */
+ private[sql] def withSequenceColumn(name: String) = {
+ val rdd: RDD[InternalRow] =
+ // Checkpoint the DataFrame to fix the partition ID.
+ localCheckpoint(false)
+ .queryExecution.toRdd.zipWithIndex().mapPartitions { iter =>
+ val joinedRow = new JoinedRow
+ val unsafeRowWriter =
+ new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1)
+
+ iter.map { case (row, id) =>
+ // Writes to an UnsafeRow directly
+ unsafeRowWriter.reset()
+ unsafeRowWriter.write(0, id)
+ joinedRow(unsafeRowWriter.getRow, row)
+ }
+ }
+
+ sparkSession.internalCreateDataFrame(
+ rdd, StructType(StructField(name, LongType, nullable = false) +: schema), isStreaming)
+ }
+
+ /**
* Converts a JavaRDD to a PythonRDD.
*/
private[sql] def javaToPython: JavaRDD[Array[Byte]] = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 1e3d2192..2fd5993 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -2937,6 +2937,12 @@ class DataFrameSuite extends QueryTest
checkAnswer(sql("SELECT sum(c1 * c3) + sum(c2 * c3) FROM tbl"), Row(2.00000000000) :: Nil)
}
}
+
+ test("SPARK-36338: DataFrame.withSequenceColumn should append unique sequence IDs") {
+ val ids = spark.range(10).repartition(5)
+ .withSequenceColumn("default_index").collect().map(_.getLong(0))
+ assert(ids.toSet === Range(0, 10).toSet)
+ }
}
case class GroupByKey(a: Int, b: Int)
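For completeness, the behavior asserted by the new `DataFrameSuite` test (unique, contiguous IDs across partitions) can also be exercised from Python through the internal helper; a minimal sketch, not part of the patch:
```python
from pyspark.sql import SparkSession
from pyspark.pandas.internal import InternalFrame

spark = SparkSession.builder.getOrCreate()

# Mirrors the Scala test: IDs stay unique and cover 0..9
# even when the input is spread across several partitions.
sdf = InternalFrame.attach_distributed_sequence_column(
    spark.range(10).repartition(5), column_name="default_index"
)
assert sorted(row["default_index"] for row in sdf.collect()) == list(range(10))
```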