You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2023/06/09 23:23:19 UTC
[spark] branch master updated: [SPARK-43610][CONNECT][PS] Enable `InternalFrame.attach_distributed_column` in Spark Connect
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 611908afb4f [SPARK-43610][CONNECT][PS] Enable `InternalFrame.attach_distributed_column` in Spark Connect
611908afb4f is described below
commit 611908afb4fdf9adde6de10531fb2ff9bf63029e
Author: itholic <ha...@databricks.com>
AuthorDate: Sat Jun 10 08:23:03 2023 +0900
[SPARK-43610][CONNECT][PS] Enable `InternalFrame.attach_distributed_column` in Spark Connect
### What changes were proposed in this pull request?
This PR proposes to enable `InternalFrame.attach_distributed_column` in Spark Connect.
### Why are the changes needed?
To improve API coverage for pandas API on Spark with Spark Connect.
### Does this PR introduce _any_ user-facing change?
Yes, the pandas-on-Spark APIs that depend on `InternalFrame.attach_distributed_column` are now available on Spark Connect as well.
### How was this patch tested?
Reused existing test cases and tested manually.
Closes #41528 from itholic/SPARK-43610.
Lead-authored-by: itholic <ha...@databricks.com>
Co-authored-by: Haejoon Lee <44...@users.noreply.github.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
python/pyspark/pandas/internal.py | 5 ++++-
.../tests/connect/computation/test_parity_apply_func.py | 6 ------
.../pandas/tests/connect/frame/test_parity_reindexing.py | 12 ------------
.../pandas/tests/connect/frame/test_parity_reshaping.py | 6 ------
.../pyspark/pandas/tests/connect/indexes/test_parity_base.py | 12 ------------
.../pandas/tests/connect/indexes/test_parity_datetime.py | 12 +-----------
.../pyspark/pandas/tests/connect/series/test_parity_as_of.py | 6 +-----
.../pandas/tests/connect/series/test_parity_compute.py | 12 ------------
.../pandas/tests/connect/test_parity_default_index.py | 6 ------
9 files changed, 6 insertions(+), 71 deletions(-)
diff --git a/python/pyspark/pandas/internal.py b/python/pyspark/pandas/internal.py
index 1daae6bf07b..e4d8b4dbe5a 100644
--- a/python/pyspark/pandas/internal.py
+++ b/python/pyspark/pandas/internal.py
@@ -935,10 +935,13 @@ class InternalFrame:
)
return sdf.select(sequential_index.alias(column_name), *scols)
- # TODO(SPARK-43610): Enable `InternalFrame.attach_distributed_column` in Spark Connect.
@staticmethod
def attach_distributed_column(sdf: PySparkDataFrame, column_name: str) -> PySparkDataFrame:
scols = [scol_for(sdf, column) for column in sdf.columns]
+ # Does not add an alias to avoid having some changes in protobuf definition for now.
+ # The alias is more for query strings in DataFrame.explain, and they are cosmetic changes.
+ if is_remote():
+ return sdf.select(F.monotonically_increasing_id().alias(column_name), *scols)
jvm = sdf.sparkSession._jvm
tag = jvm.org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FUNC_ALIAS()
jexpr = F.monotonically_increasing_id()._jc.expr()
diff --git a/python/pyspark/pandas/tests/connect/computation/test_parity_apply_func.py b/python/pyspark/pandas/tests/connect/computation/test_parity_apply_func.py
index e24dc96ff1a..3a720846a9f 100644
--- a/python/pyspark/pandas/tests/connect/computation/test_parity_apply_func.py
+++ b/python/pyspark/pandas/tests/connect/computation/test_parity_apply_func.py
@@ -29,12 +29,6 @@ class FrameParityApplyFunctionTests(
def psdf(self):
return ps.from_pandas(self.pdf)
- @unittest.skip(
- "TODO(SPARK-43610): Enable `InternalFrame.attach_distributed_column` in Spark Connect."
- )
- def test_aggregate(self):
- super().test_aggregate()
-
if __name__ == "__main__":
from pyspark.pandas.tests.connect.computation.test_parity_apply_func import * # noqa: F401
diff --git a/python/pyspark/pandas/tests/connect/frame/test_parity_reindexing.py b/python/pyspark/pandas/tests/connect/frame/test_parity_reindexing.py
index eebd902ae1f..0caa6c32cf8 100644
--- a/python/pyspark/pandas/tests/connect/frame/test_parity_reindexing.py
+++ b/python/pyspark/pandas/tests/connect/frame/test_parity_reindexing.py
@@ -29,18 +29,6 @@ class FrameParityReindexingTests(
def psdf(self):
return ps.from_pandas(self.pdf)
- @unittest.skip(
- "TODO(SPARK-43610): Enable `InternalFrame.attach_distributed_column` in Spark Connect."
- )
- def test_at_time(self):
- super().test_at_time()
-
- @unittest.skip(
- "TODO(SPARK-43610): Enable `InternalFrame.attach_distributed_column` in Spark Connect."
- )
- def test_between_time(self):
- super().test_between_time()
-
if __name__ == "__main__":
from pyspark.pandas.tests.connect.frame.test_parity_reindexing import * # noqa: F401
diff --git a/python/pyspark/pandas/tests/connect/frame/test_parity_reshaping.py b/python/pyspark/pandas/tests/connect/frame/test_parity_reshaping.py
index 92cd63de034..98ebf3ca44a 100644
--- a/python/pyspark/pandas/tests/connect/frame/test_parity_reshaping.py
+++ b/python/pyspark/pandas/tests/connect/frame/test_parity_reshaping.py
@@ -33,12 +33,6 @@ class FrameParityReshapingTests(FrameReshapingMixin, PandasOnSparkTestUtils, Reu
def test_transpose(self):
super().test_transpose()
- @unittest.skip(
- "TODO(SPARK-43610): Enable `InternalFrame.attach_distributed_column` in Spark Connect."
- )
- def test_unstack(self):
- super().test_unstack()
-
if __name__ == "__main__":
from pyspark.pandas.tests.connect.frame.test_parity_reshaping import * # noqa: F401
diff --git a/python/pyspark/pandas/tests/connect/indexes/test_parity_base.py b/python/pyspark/pandas/tests/connect/indexes/test_parity_base.py
index d5ae0a39b5b..d18a20d7290 100644
--- a/python/pyspark/pandas/tests/connect/indexes/test_parity_base.py
+++ b/python/pyspark/pandas/tests/connect/indexes/test_parity_base.py
@@ -35,22 +35,10 @@ class IndexesParityTests(
def test_append(self):
super().test_append()
- @unittest.skip(
- "TODO(SPARK-43610): Enable `InternalFrame.attach_distributed_column` in Spark Connect."
- )
- def test_drop_duplicates(self):
- super().test_drop_duplicates()
-
@unittest.skip("TODO(SPARK-43620): Support `Column` for SparkConnectColumn.__getitem__.")
def test_factorize(self):
super().test_factorize()
- @unittest.skip(
- "TODO(SPARK-43610): Enable `InternalFrame.attach_distributed_column` in Spark Connect."
- )
- def test_index_drop_duplicates(self):
- super().test_index_drop_duplicates()
-
@unittest.skip("TODO(SPARK-43703): Enable IndexesParityTests.test_monotonic.")
def test_monotonic(self):
super().test_monotonic()
diff --git a/python/pyspark/pandas/tests/connect/indexes/test_parity_datetime.py b/python/pyspark/pandas/tests/connect/indexes/test_parity_datetime.py
index aa035d1d608..48c3e490f4a 100644
--- a/python/pyspark/pandas/tests/connect/indexes/test_parity_datetime.py
+++ b/python/pyspark/pandas/tests/connect/indexes/test_parity_datetime.py
@@ -24,17 +24,7 @@ from pyspark.testing.pandasutils import PandasOnSparkTestUtils, TestUtils
class DatetimeIndexParityTests(
DatetimeIndexTestsMixin, PandasOnSparkTestUtils, TestUtils, ReusedConnectTestCase
):
- @unittest.skip(
- "TODO(SPARK-43610): Enable `InternalFrame.attach_distributed_column` in Spark Connect."
- )
- def test_indexer_at_time(self):
- super().test_indexer_at_time()
-
- @unittest.skip(
- "TODO(SPARK-43610): Enable `InternalFrame.attach_distributed_column` in Spark Connect."
- )
- def test_indexer_between_time(self):
- super().test_indexer_between_time()
+ pass
if __name__ == "__main__":
diff --git a/python/pyspark/pandas/tests/connect/series/test_parity_as_of.py b/python/pyspark/pandas/tests/connect/series/test_parity_as_of.py
index d7dde1f501b..ad4faed230c 100644
--- a/python/pyspark/pandas/tests/connect/series/test_parity_as_of.py
+++ b/python/pyspark/pandas/tests/connect/series/test_parity_as_of.py
@@ -22,11 +22,7 @@ from pyspark.testing.pandasutils import PandasOnSparkTestUtils
class SeriesParityArgOpsTests(SeriesAsOfMixin, PandasOnSparkTestUtils, ReusedConnectTestCase):
- @unittest.skip(
- "TODO(SPARK-43610): Enable `InternalFrame.attach_distributed_column` in Spark Connect."
- )
- def test_asof(self):
- super().test_asof()
+ pass
if __name__ == "__main__":
diff --git a/python/pyspark/pandas/tests/connect/series/test_parity_compute.py b/python/pyspark/pandas/tests/connect/series/test_parity_compute.py
index 09fb258b515..24f5f39c9b1 100644
--- a/python/pyspark/pandas/tests/connect/series/test_parity_compute.py
+++ b/python/pyspark/pandas/tests/connect/series/test_parity_compute.py
@@ -22,18 +22,6 @@ from pyspark.testing.pandasutils import PandasOnSparkTestUtils
class SeriesParityComputeTests(SeriesComputeMixin, PandasOnSparkTestUtils, ReusedConnectTestCase):
- @unittest.skip(
- "TODO(SPARK-43610): Enable `InternalFrame.attach_distributed_column` in Spark Connect."
- )
- def test_at_time(self):
- super().test_at_time()
-
- @unittest.skip(
- "TODO(SPARK-43610): Enable `InternalFrame.attach_distributed_column` in Spark Connect."
- )
- def test_between_time(self):
- super().test_between_time()
-
@unittest.skip("TODO(SPARK-43663): Enable SeriesParityTests.test_compare.")
def test_compare(self):
super().test_compare()
diff --git a/python/pyspark/pandas/tests/connect/test_parity_default_index.py b/python/pyspark/pandas/tests/connect/test_parity_default_index.py
index 9d8ebc5eb33..c5410e6dd58 100644
--- a/python/pyspark/pandas/tests/connect/test_parity_default_index.py
+++ b/python/pyspark/pandas/tests/connect/test_parity_default_index.py
@@ -24,12 +24,6 @@ from pyspark.testing.pandasutils import PandasOnSparkTestUtils
class DefaultIndexParityTests(
DefaultIndexTestsMixin, PandasOnSparkTestUtils, ReusedConnectTestCase
):
- @unittest.skip(
- "TODO(SPARK-43610): Enable `InternalFrame.attach_distributed_column` in Spark Connect."
- )
- def test_default_index_distributed(self):
- super().test_default_index_distributed()
-
@unittest.skip(
"TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client."
)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org