You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "ueshin (via GitHub)" <gi...@apache.org> on 2023/07/24 22:37:34 UTC

[GitHub] [spark] ueshin opened a new pull request, #42135: [SPARK-44533][PYTHON] Add support for accumulator, broadcast, and Spark files in Python UDTF's analyze

ueshin opened a new pull request, #42135:
URL: https://github.com/apache/spark/pull/42135

   ### What changes were proposed in this pull request?
   
   Adds support for `accumulator`, `broadcast` for vanilla PySpark, and Spark files for both vanilla PySpark and Spark Connect Python client, in Python UDTF's analyze.
   
   For example, in vanilla PySpark:
   
   ```py
   >>> colname = sc.broadcast("col1")
   >>> test_accum = sc.accumulator(0)
   
   >>> @udtf
   ... class TestUDTF:
   ...     @staticmethod
   ...     def analyze(a: AnalyzeArgument) -> AnalyzeResult:
   ...         test_accum.add(1)
   ...         return AnalyzeResult(StructType().add(colname.value, a.data_type))
   ...     def eval(self, a):
   ...         test_accum.add(1)
   ...         yield a,
   ...
   >>> df = TestUDTF(lit(10))
   >>> df.printSchema()
   root
    |-- col1: integer (nullable = true)
   
   >>> df.show()
   +----+
   |col1|
   +----+
   |  10|
   +----+
   
   >>> test_accum.value
   2
   ```
   
   or
   
   ```py
   >>> pyfile_path = "my_pyfile.py"
   >>> with open(pyfile_path, "w") as f:
   ...     f.write("my_func = lambda: 'col1'")
   ...
   24
   >>> sc.addPyFile(pyfile_path)
   >>> # or spark.addArtifacts(pyfile_path, pyfile=True)
   >>>
   >>> @udtf
   ... class TestUDTF:
   ...     @staticmethod
   ...     def analyze(a: AnalyzeArgument) -> AnalyzeResult:
   ...         import my_pyfile
   ...         return AnalyzeResult(StructType().add(my_pyfile.my_func(), a.data_type))
   ...     def eval(self, a):
   ...         yield a,
   ...
   >>> df = TestUDTF(lit(10))
   >>> df.printSchema()
   root
    |-- col1: integer (nullable = true)
   
   >>> df.show()
   +----+
   |col1|
   +----+
   |  10|
   +----+
   ```
   
   ### Why are the changes needed?
   
   To support missing features: `accumulator`, `broadcast`, and Spark files in Python UDTF's analyze.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes, accumulator, broadcast in vanilla PySpark, and Spark files in both vanilla PySpark and Spark Connect Python client will be available.
   
   ### How was this patch tested?
   
   Added related tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ueshin commented on pull request #42135: [SPARK-44533][PYTHON] Add support for accumulator, broadcast, and Spark files in Python UDTF's analyze

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on PR #42135:
URL: https://github.com/apache/spark/pull/42135#issuecomment-1648719066

   cc @allisonwang-db @HyukjinKwon 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ueshin closed pull request #42135: [SPARK-44533][PYTHON] Add support for accumulator, broadcast, and Spark files in Python UDTF's analyze

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin closed pull request #42135: [SPARK-44533][PYTHON] Add support for accumulator, broadcast, and Spark files in Python UDTF's analyze
URL: https://github.com/apache/spark/pull/42135


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ueshin commented on pull request #42135: [SPARK-44533][PYTHON] Add support for accumulator, broadcast, and Spark files in Python UDTF's analyze

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on PR #42135:
URL: https://github.com/apache/spark/pull/42135#issuecomment-1652875714

   Thanks! merging to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ueshin commented on a diff in pull request #42135: [SPARK-44533][PYTHON] Add support for accumulator, broadcast, and Spark files in Python UDTF's analyze

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #42135:
URL: https://github.com/apache/spark/pull/42135#discussion_r1273976305


##########
python/pyspark/sql/tests/test_udtf.py:
##########
@@ -1214,6 +1217,153 @@ def eval(self, **kwargs):
         ):
             self.spark.sql("SELECT * FROM test_udtf(1, 'x')").collect()
 
+    def test_udtf_with_analyze_using_broadcast(self):
+        colname = self.sc.broadcast("col1")
+
+        @udtf
+        class TestUDTF:
+            @staticmethod
+            def analyze(a: AnalyzeArgument) -> AnalyzeResult:
+                return AnalyzeResult(StructType().add(colname.value, a.data_type))
+
+            def eval(self, a):
+                yield a,
+
+        df = TestUDTF(lit(10))
+        assertSchemaEqual(df.schema, StructType().add("col1", IntegerType()))
+        assertDataFrameEqual(df, [Row(col=10)])
+
+    def test_udtf_with_analyze_using_accumulator(self):
+        test_accum = self.sc.accumulator(0)
+
+        @udtf
+        class TestUDTF:
+            @staticmethod
+            def analyze(a: AnalyzeArgument) -> AnalyzeResult:
+                test_accum.add(1)
+                return AnalyzeResult(StructType().add("col1", a.data_type))
+
+            def eval(self, a):
+                test_accum.add(1)
+                yield a,
+
+        df = TestUDTF(lit(10))

Review Comment:
   Yes, let me update the test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] allisonwang-db commented on a diff in pull request #42135: [SPARK-44533][PYTHON] Add support for accumulator, broadcast, and Spark files in Python UDTF's analyze

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #42135:
URL: https://github.com/apache/spark/pull/42135#discussion_r1273829387


##########
python/pyspark/sql/tests/test_udtf.py:
##########
@@ -1214,6 +1217,153 @@ def eval(self, **kwargs):
         ):
             self.spark.sql("SELECT * FROM test_udtf(1, 'x')").collect()
 
+    def test_udtf_with_analyze_using_broadcast(self):
+        colname = self.sc.broadcast("col1")
+
+        @udtf
+        class TestUDTF:
+            @staticmethod
+            def analyze(a: AnalyzeArgument) -> AnalyzeResult:
+                return AnalyzeResult(StructType().add(colname.value, a.data_type))
+
+            def eval(self, a):
+                yield a,
+
+        df = TestUDTF(lit(10))
+        assertSchemaEqual(df.schema, StructType().add("col1", IntegerType()))
+        assertDataFrameEqual(df, [Row(col=10)])
+
+    def test_udtf_with_analyze_using_accumulator(self):
+        test_accum = self.sc.accumulator(0)
+
+        @udtf
+        class TestUDTF:
+            @staticmethod
+            def analyze(a: AnalyzeArgument) -> AnalyzeResult:
+                test_accum.add(1)
+                return AnalyzeResult(StructType().add("col1", a.data_type))
+
+            def eval(self, a):
+                test_accum.add(1)
+                yield a,
+
+        df = TestUDTF(lit(10))
+        assertSchemaEqual(df.schema, StructType().add("col1", IntegerType()))
+        assertDataFrameEqual(df, [Row(col=10)])
+        self.assertEqual(test_accum.value, 2)
+
+    def _add_pyfile(self, path):
+        self.sc.addPyFile(path)
+
+    def test_udtf_with_analyze_using_pyfile(self):

Review Comment:
   This is really cool. Can we add/modify this test case to use pyfile in 1) eval 2) terminate?



##########
python/pyspark/sql/tests/test_udtf.py:
##########
@@ -1214,6 +1217,153 @@ def eval(self, **kwargs):
         ):
             self.spark.sql("SELECT * FROM test_udtf(1, 'x')").collect()
 
+    def test_udtf_with_analyze_using_broadcast(self):
+        colname = self.sc.broadcast("col1")
+
+        @udtf
+        class TestUDTF:
+            @staticmethod
+            def analyze(a: AnalyzeArgument) -> AnalyzeResult:
+                return AnalyzeResult(StructType().add(colname.value, a.data_type))
+
+            def eval(self, a):
+                yield a,
+
+        df = TestUDTF(lit(10))
+        assertSchemaEqual(df.schema, StructType().add("col1", IntegerType()))
+        assertDataFrameEqual(df, [Row(col=10)])
+
+    def test_udtf_with_analyze_using_accumulator(self):
+        test_accum = self.sc.accumulator(0)
+
+        @udtf
+        class TestUDTF:
+            @staticmethod
+            def analyze(a: AnalyzeArgument) -> AnalyzeResult:
+                test_accum.add(1)
+                return AnalyzeResult(StructType().add("col1", a.data_type))
+
+            def eval(self, a):
+                test_accum.add(1)
+                yield a,
+
+        df = TestUDTF(lit(10))

Review Comment:
   Just curious, does the accumulator work if we use the UDTF in a SQL query?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42135: [SPARK-44533][PYTHON] Add support for accumulator, broadcast, and Spark files in Python UDTF's analyze

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42135:
URL: https://github.com/apache/spark/pull/42135#discussion_r1272931830


##########
python/pyspark/sql/tests/connect/test_parity_udtf.py:
##########
@@ -104,6 +106,23 @@ def terminate(self):
         with self.assertRaisesRegex(SparkConnectGrpcException, err_msg):
             TestUDTF(lit(1)).show()
 
+    @unittest.skip("Spark Connect does not support broadcast but the test depends on it.")
+    def test_udtf_with_analyze_using_broadcast(self):
+        super().test_udtf_with_analyze_using_broadcast()
+
+    @unittest.skip("Spark Connect does not support accumulator but the test depends on it.")
+    def test_udtf_with_analyze_using_accumulator(self):
+        super().test_udtf_with_analyze_using_accumulator()
+
+    def _add_pyfile(self, path):

Review Comment:
   Nice, I like it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org