You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2023/06/09 23:23:19 UTC

[spark] branch master updated: [SPARK-43610][CONNECT][PS] Enable `InternalFrame.attach_distributed_column` in Spark Connect

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 611908afb4f [SPARK-43610][CONNECT][PS] Enable `InternalFrame.attach_distributed_column` in Spark Connect
611908afb4f is described below

commit 611908afb4fdf9adde6de10531fb2ff9bf63029e
Author: itholic <ha...@databricks.com>
AuthorDate: Sat Jun 10 08:23:03 2023 +0900

    [SPARK-43610][CONNECT][PS] Enable `InternalFrame.attach_distributed_column` in Spark Connect
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to enable `InternalFrame.attach_distributed_column` in Spark Connect.
    
    ### Why are the changes needed?
    
    To improve API coverage for pandas API on Spark with Spark Connect.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, the pandas-on-Spark APIs that depend on `InternalFrame.attach_distributed_column` are now available on Spark Connect as well.
    
    ### How was this patch tested?
    
    Reused the existing test cases (the previously skipped Spark Connect parity tests are re-enabled) and tested manually.
    
    Closes #41528 from itholic/SPARK-43610.
    
    Lead-authored-by: itholic <ha...@databricks.com>
    Co-authored-by: Haejoon Lee <44...@users.noreply.github.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 python/pyspark/pandas/internal.py                            |  5 ++++-
 .../tests/connect/computation/test_parity_apply_func.py      |  6 ------
 .../pandas/tests/connect/frame/test_parity_reindexing.py     | 12 ------------
 .../pandas/tests/connect/frame/test_parity_reshaping.py      |  6 ------
 .../pyspark/pandas/tests/connect/indexes/test_parity_base.py | 12 ------------
 .../pandas/tests/connect/indexes/test_parity_datetime.py     | 12 +-----------
 .../pyspark/pandas/tests/connect/series/test_parity_as_of.py |  6 +-----
 .../pandas/tests/connect/series/test_parity_compute.py       | 12 ------------
 .../pandas/tests/connect/test_parity_default_index.py        |  6 ------
 9 files changed, 6 insertions(+), 71 deletions(-)

diff --git a/python/pyspark/pandas/internal.py b/python/pyspark/pandas/internal.py
index 1daae6bf07b..e4d8b4dbe5a 100644
--- a/python/pyspark/pandas/internal.py
+++ b/python/pyspark/pandas/internal.py
@@ -935,10 +935,13 @@ class InternalFrame:
         )
         return sdf.select(sequential_index.alias(column_name), *scols)
 
-    # TODO(SPARK-43610): Enable `InternalFrame.attach_distributed_column` in Spark Connect.
     @staticmethod
     def attach_distributed_column(sdf: PySparkDataFrame, column_name: str) -> PySparkDataFrame:
         scols = [scol_for(sdf, column) for column in sdf.columns]
+        # Does not add an alias to avoid having some changes in protobuf definition for now.
+        # The alias is more for query strings in DataFrame.explain, and they are cosmetic changes.
+        if is_remote():
+            return sdf.select(F.monotonically_increasing_id().alias(column_name), *scols)
         jvm = sdf.sparkSession._jvm
         tag = jvm.org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FUNC_ALIAS()
         jexpr = F.monotonically_increasing_id()._jc.expr()
diff --git a/python/pyspark/pandas/tests/connect/computation/test_parity_apply_func.py b/python/pyspark/pandas/tests/connect/computation/test_parity_apply_func.py
index e24dc96ff1a..3a720846a9f 100644
--- a/python/pyspark/pandas/tests/connect/computation/test_parity_apply_func.py
+++ b/python/pyspark/pandas/tests/connect/computation/test_parity_apply_func.py
@@ -29,12 +29,6 @@ class FrameParityApplyFunctionTests(
     def psdf(self):
         return ps.from_pandas(self.pdf)
 
-    @unittest.skip(
-        "TODO(SPARK-43610): Enable `InternalFrame.attach_distributed_column` in Spark Connect."
-    )
-    def test_aggregate(self):
-        super().test_aggregate()
-
 
 if __name__ == "__main__":
     from pyspark.pandas.tests.connect.computation.test_parity_apply_func import *  # noqa: F401
diff --git a/python/pyspark/pandas/tests/connect/frame/test_parity_reindexing.py b/python/pyspark/pandas/tests/connect/frame/test_parity_reindexing.py
index eebd902ae1f..0caa6c32cf8 100644
--- a/python/pyspark/pandas/tests/connect/frame/test_parity_reindexing.py
+++ b/python/pyspark/pandas/tests/connect/frame/test_parity_reindexing.py
@@ -29,18 +29,6 @@ class FrameParityReindexingTests(
     def psdf(self):
         return ps.from_pandas(self.pdf)
 
-    @unittest.skip(
-        "TODO(SPARK-43610): Enable `InternalFrame.attach_distributed_column` in Spark Connect."
-    )
-    def test_at_time(self):
-        super().test_at_time()
-
-    @unittest.skip(
-        "TODO(SPARK-43610): Enable `InternalFrame.attach_distributed_column` in Spark Connect."
-    )
-    def test_between_time(self):
-        super().test_between_time()
-
 
 if __name__ == "__main__":
     from pyspark.pandas.tests.connect.frame.test_parity_reindexing import *  # noqa: F401
diff --git a/python/pyspark/pandas/tests/connect/frame/test_parity_reshaping.py b/python/pyspark/pandas/tests/connect/frame/test_parity_reshaping.py
index 92cd63de034..98ebf3ca44a 100644
--- a/python/pyspark/pandas/tests/connect/frame/test_parity_reshaping.py
+++ b/python/pyspark/pandas/tests/connect/frame/test_parity_reshaping.py
@@ -33,12 +33,6 @@ class FrameParityReshapingTests(FrameReshapingMixin, PandasOnSparkTestUtils, Reu
     def test_transpose(self):
         super().test_transpose()
 
-    @unittest.skip(
-        "TODO(SPARK-43610): Enable `InternalFrame.attach_distributed_column` in Spark Connect."
-    )
-    def test_unstack(self):
-        super().test_unstack()
-
 
 if __name__ == "__main__":
     from pyspark.pandas.tests.connect.frame.test_parity_reshaping import *  # noqa: F401
diff --git a/python/pyspark/pandas/tests/connect/indexes/test_parity_base.py b/python/pyspark/pandas/tests/connect/indexes/test_parity_base.py
index d5ae0a39b5b..d18a20d7290 100644
--- a/python/pyspark/pandas/tests/connect/indexes/test_parity_base.py
+++ b/python/pyspark/pandas/tests/connect/indexes/test_parity_base.py
@@ -35,22 +35,10 @@ class IndexesParityTests(
     def test_append(self):
         super().test_append()
 
-    @unittest.skip(
-        "TODO(SPARK-43610): Enable `InternalFrame.attach_distributed_column` in Spark Connect."
-    )
-    def test_drop_duplicates(self):
-        super().test_drop_duplicates()
-
     @unittest.skip("TODO(SPARK-43620): Support `Column` for SparkConnectColumn.__getitem__.")
     def test_factorize(self):
         super().test_factorize()
 
-    @unittest.skip(
-        "TODO(SPARK-43610): Enable `InternalFrame.attach_distributed_column` in Spark Connect."
-    )
-    def test_index_drop_duplicates(self):
-        super().test_index_drop_duplicates()
-
     @unittest.skip("TODO(SPARK-43703): Enable IndexesParityTests.test_monotonic.")
     def test_monotonic(self):
         super().test_monotonic()
diff --git a/python/pyspark/pandas/tests/connect/indexes/test_parity_datetime.py b/python/pyspark/pandas/tests/connect/indexes/test_parity_datetime.py
index aa035d1d608..48c3e490f4a 100644
--- a/python/pyspark/pandas/tests/connect/indexes/test_parity_datetime.py
+++ b/python/pyspark/pandas/tests/connect/indexes/test_parity_datetime.py
@@ -24,17 +24,7 @@ from pyspark.testing.pandasutils import PandasOnSparkTestUtils, TestUtils
 class DatetimeIndexParityTests(
     DatetimeIndexTestsMixin, PandasOnSparkTestUtils, TestUtils, ReusedConnectTestCase
 ):
-    @unittest.skip(
-        "TODO(SPARK-43610): Enable `InternalFrame.attach_distributed_column` in Spark Connect."
-    )
-    def test_indexer_at_time(self):
-        super().test_indexer_at_time()
-
-    @unittest.skip(
-        "TODO(SPARK-43610): Enable `InternalFrame.attach_distributed_column` in Spark Connect."
-    )
-    def test_indexer_between_time(self):
-        super().test_indexer_between_time()
+    pass
 
 
 if __name__ == "__main__":
diff --git a/python/pyspark/pandas/tests/connect/series/test_parity_as_of.py b/python/pyspark/pandas/tests/connect/series/test_parity_as_of.py
index d7dde1f501b..ad4faed230c 100644
--- a/python/pyspark/pandas/tests/connect/series/test_parity_as_of.py
+++ b/python/pyspark/pandas/tests/connect/series/test_parity_as_of.py
@@ -22,11 +22,7 @@ from pyspark.testing.pandasutils import PandasOnSparkTestUtils
 
 
 class SeriesParityArgOpsTests(SeriesAsOfMixin, PandasOnSparkTestUtils, ReusedConnectTestCase):
-    @unittest.skip(
-        "TODO(SPARK-43610): Enable `InternalFrame.attach_distributed_column` in Spark Connect."
-    )
-    def test_asof(self):
-        super().test_asof()
+    pass
 
 
 if __name__ == "__main__":
diff --git a/python/pyspark/pandas/tests/connect/series/test_parity_compute.py b/python/pyspark/pandas/tests/connect/series/test_parity_compute.py
index 09fb258b515..24f5f39c9b1 100644
--- a/python/pyspark/pandas/tests/connect/series/test_parity_compute.py
+++ b/python/pyspark/pandas/tests/connect/series/test_parity_compute.py
@@ -22,18 +22,6 @@ from pyspark.testing.pandasutils import PandasOnSparkTestUtils
 
 
 class SeriesParityComputeTests(SeriesComputeMixin, PandasOnSparkTestUtils, ReusedConnectTestCase):
-    @unittest.skip(
-        "TODO(SPARK-43610): Enable `InternalFrame.attach_distributed_column` in Spark Connect."
-    )
-    def test_at_time(self):
-        super().test_at_time()
-
-    @unittest.skip(
-        "TODO(SPARK-43610): Enable `InternalFrame.attach_distributed_column` in Spark Connect."
-    )
-    def test_between_time(self):
-        super().test_between_time()
-
     @unittest.skip("TODO(SPARK-43663): Enable SeriesParityTests.test_compare.")
     def test_compare(self):
         super().test_compare()
diff --git a/python/pyspark/pandas/tests/connect/test_parity_default_index.py b/python/pyspark/pandas/tests/connect/test_parity_default_index.py
index 9d8ebc5eb33..c5410e6dd58 100644
--- a/python/pyspark/pandas/tests/connect/test_parity_default_index.py
+++ b/python/pyspark/pandas/tests/connect/test_parity_default_index.py
@@ -24,12 +24,6 @@ from pyspark.testing.pandasutils import PandasOnSparkTestUtils
 class DefaultIndexParityTests(
     DefaultIndexTestsMixin, PandasOnSparkTestUtils, ReusedConnectTestCase
 ):
-    @unittest.skip(
-        "TODO(SPARK-43610): Enable `InternalFrame.attach_distributed_column` in Spark Connect."
-    )
-    def test_default_index_distributed(self):
-        super().test_default_index_distributed()
-
     @unittest.skip(
         "TODO(SPARK-43611): Fix unexpected `AnalysisException` from Spark Connect client."
     )


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org