Posted to reviews@spark.apache.org by "wbo4958 (via GitHub)" <gi...@apache.org> on 2024/01/23 10:59:36 UTC

[PR] Make mapInPandas / mapInArrow support ResourceProfile [spark]

wbo4958 opened a new pull request, #44852:
URL: https://github.com/apache/spark/pull/44852

   ### What changes were proposed in this pull request?
   
   Make mapInPandas / mapInArrow support ResourceProfile
   
   ### Why are the changes needed?
   
   Machine learning use cases need ResourceProfile (stage-level scheduling) support in `mapInPandas` / `mapInArrow`.
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   
   Manual tests.
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46812][SQL][CONNECT][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]

Posted by "wbo4958 (via GitHub)" <gi...@apache.org>.
wbo4958 commented on PR #44852:
URL: https://github.com/apache/spark/pull/44852#issuecomment-1911749884

   Hi @tgravescs, Thx for your comment. 
   
   I just updated the description and pushed a new commit that adds unit tests covering the resource profile for both the SQL DataFrame and Connect DataFrame APIs.
   
   @tgravescs @WeichenXu123 @zhengruifeng Could you help review this PR? Thx. 




Re: [PR] [SPARK-46812][SQL][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]

Posted by "wbo4958 (via GitHub)" <gi...@apache.org>.
wbo4958 commented on code in PR #44852:
URL: https://github.com/apache/spark/pull/44852#discussion_r1475544579


##########
python/pyspark/sql/tests/test_resources.py:
##########
@@ -0,0 +1,104 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import unittest
+
+from pyspark import SparkContext, TaskContext
+from pyspark.resource import TaskResourceRequests, ResourceProfileBuilder
+from pyspark.sql import SparkSession
+from pyspark.testing.sqlutils import (
+    have_pandas,
+    have_pyarrow,
+    pandas_requirement_message,
+    pyarrow_requirement_message,
+)
+from pyspark.testing.utils import ReusedPySparkTestCase
+
+
+@unittest.skipIf(
+    not have_pandas or not have_pyarrow,
+    pandas_requirement_message or pyarrow_requirement_message,
+)
+class ResourceProfileTestsMixin(object):
+    def test_map_in_arrow_without_profile(self):
+        def func(iterator):
+            tc = TaskContext.get()
+            assert tc.cpus() == 1
+            for batch in iterator:
+                yield batch
+
+        df = self.spark.range(10)
+        df.mapInArrow(func, "id long").collect()
+
+    def test_map_in_arrow_with_profile(self):
+        def func(iterator):
+            tc = TaskContext.get()
+            assert tc.cpus() == 3
+            for batch in iterator:
+                yield batch
+
+        df = self.spark.range(10)
+
+        treqs = TaskResourceRequests().cpus(3)
+        rp = ResourceProfileBuilder().require(treqs).build
+        df.mapInArrow(func, "id long", False, rp).collect()
+
+    def test_map_in_pandas_without_profile(self):
+        def func(iterator):
+            tc = TaskContext.get()
+            assert tc.cpus() == 1
+            for batch in iterator:
+                yield batch
+
+        df = self.spark.range(10)
+        df.mapInPandas(func, "id long").collect()
+
+    def test_map_in_pandas_with_profile(self):
+        def func(iterator):
+            tc = TaskContext.get()
+            assert tc.cpus() == 3
+            for batch in iterator:
+                yield batch
+
+        df = self.spark.range(10)
+
+        treqs = TaskResourceRequests().cpus(3)
+        rp = ResourceProfileBuilder().require(treqs).build
+        df.mapInPandas(func, "id long", False, rp).collect()
+
+
+class ResourceProfileTests(ResourceProfileTestsMixin, ReusedPySparkTestCase):
+    @classmethod
+    def setUpClass(cls):
+        cls.sc = SparkContext("local-cluster[1, 4, 1024]", cls.__name__, conf=cls.conf())
+        cls.spark = SparkSession(cls.sc)
+
+    @classmethod
+    def tearDownClass(cls):
+        super(ResourceProfileTests, cls).tearDownClass()
+        cls.spark.stop()
+
+
+if __name__ == "__main__":
+    from pyspark.sql.tests.test_resources import *  # noqa: F401

Review Comment:
   Thx for the review. Done





Re: [PR] [SPARK-46812][SQL][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #44852:
URL: https://github.com/apache/spark/pull/44852#discussion_r1480797674


##########
python/pyspark/sql/pandas/map_ops.py:
##########
@@ -15,9 +15,14 @@
 # limitations under the License.
 #
 import sys
-from typing import Union, TYPE_CHECKING
+from typing import Union, TYPE_CHECKING, Optional
+
+from py4j.java_gateway import JavaObject
+
+from pyspark.resource.requests import ExecutorResourceRequests, TaskResourceRequests
 

Review Comment:
   ```suggestion
   ```





Re: [PR] [SPARK-46812][SQL][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #44852:
URL: https://github.com/apache/spark/pull/44852#discussion_r1475419041


##########
python/pyspark/sql/pandas/map_ops.py:
##########
@@ -65,6 +74,12 @@ def mapInPandas(
 
             .. versionadded: 3.5.0
 
+        profile : :class:`pyspark.resource.ResourceProfile`. The optional ResourceProfile
+            to be used for mapInPandas.
+
+            .. versionadded: 3.5.1

Review Comment:
   We don't add new features in maintenance releases.



##########
python/pyspark/sql/tests/test_resources.py:
##########
@@ -0,0 +1,104 @@
+#

Review Comment:
   Please add this test to `dev/sparktestsupport/modules.py`; otherwise it is skipped in GitHub Actions.



##########
python/pyspark/sql/pandas/map_ops.py:
##########
@@ -175,6 +196,11 @@ def mapInArrow(
 
             .. versionadded: 3.5.0
 
+        profile : :class:`pyspark.resource.ResourceProfile`. The optional ResourceProfile
+            to be used for mapInPandas.
+
+            .. versionadded: 3.5.1

Review Comment:
   ```suggestion
               .. versionadded: 4.0.0
   ```



##########
python/pyspark/sql/pandas/map_ops.py:
##########
@@ -65,6 +74,12 @@ def mapInPandas(
 
             .. versionadded: 3.5.0
 
+        profile : :class:`pyspark.resource.ResourceProfile`. The optional ResourceProfile
+            to be used for mapInPandas.
+
+            .. versionadded: 3.5.1

Review Comment:
   ```suggestion
               .. versionadded: 4.0.0
   ```





Re: [PR] [SPARK-46812][SQL][CONNECT][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]

Posted by "wbo4958 (via GitHub)" <gi...@apache.org>.
wbo4958 commented on code in PR #44852:
URL: https://github.com/apache/spark/pull/44852#discussion_r1467421025


##########
python/pyspark/resource/profile.py:
##########
@@ -106,6 +106,13 @@ def __init__(
             self._executor_resource_requests = _exec_req or {}
             self._task_resource_requests = _task_req or {}
 
+            from pyspark.sql import is_remote

Review Comment:
   I'm wondering whether I should move the part that builds the remote resource profile into `def id(self)`.
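
   One way to read this question is as a choice between doing the work eagerly in `__init__` and doing it lazily on first access of `id`. The sketch below shows the lazy variant in plain Python. `LazyProfile` and `_register_remote` are hypothetical stand-ins chosen for illustration and do not reflect the actual PySpark implementation.

```python
from typing import Optional


class LazyProfile:
    """Defers a costly registration step until `id` is first read."""

    def __init__(self, task_cpus: int) -> None:
        self.task_cpus = task_cpus
        self._id: Optional[int] = None
        self.register_calls = 0  # only here to make the laziness observable

    def _register_remote(self) -> int:
        # Hypothetical placeholder for a round trip that assigns the profile id.
        self.register_calls += 1
        return 42  # arbitrary id for illustration

    @property
    def id(self) -> int:
        # Register on first access, then reuse the cached value.
        if self._id is None:
            self._id = self._register_remote()
        return self._id


rp = LazyProfile(task_cpus=3)
print(rp.register_calls)  # nothing has been registered at construction time
print(rp.id, rp.id)       # the second read reuses the cached id
print(rp.register_calls)  # registration ran exactly once
```

   The trade-off in this sketch is that constructing a profile stays cheap, while any failure of the registration step surfaces later, at the first use of `id`.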





Re: [PR] [SPARK-46812][SQL][CONNECT][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]

Posted by "wbo4958 (via GitHub)" <gi...@apache.org>.
wbo4958 commented on PR #44852:
URL: https://github.com/apache/spark/pull/44852#issuecomment-1908122812

   ## With dynamic allocation enabled.
   
   ``` bash
   pyspark --master spark://192.168.141.19:7077 --conf spark.executor.cores=4 --conf spark.task.cpus=1 \
     --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.maxExecutors=1
   ```
   
   The above command enables dynamic allocation and caps the number of executors at 1 for the purposes of this test.
   
   ### TaskResourceProfile without any specific executor request information
   
   Test code:
   
   ```python
   from pyspark.resource import ExecutorResourceRequests, TaskResourceRequests, ResourceProfileBuilder
   
   def filter_func(iterator):
       for pdf in iterator:
           yield pdf
   
   df = spark.range(0, 100, 1, 4)
   
   treqs = TaskResourceRequests().cpus(3)
   rp = ResourceProfileBuilder().require(treqs).build
   df.repartition(3).mapInArrow(lambda iter: iter, df.schema, False, rp).collect()
   ```
   
   `rp` is a TaskResourceProfile with no executor requests, so the executor settings fall back to the values of the default ResourceProfile (`executor.cores=4`).
   
   The above code requires an extra executor, which has the same `executor.cores/memory` as the default ResourceProfile.
   
   ![0](https://github.com/apache/spark/assets/1320706/127086db-ebad-4676-891c-d74d0a80bd2c)
   
   ![1](https://github.com/apache/spark/assets/1320706/20f01c64-a9cd-4118-bda6-4ff2aaf3d67a)
   
   ![2](https://github.com/apache/spark/assets/1320706/44f02999-b90d-4ae9-b890-45e2786841b1)
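
   The fallback described above can be pictured as a small lookup: a profile that carries only task requests borrows its executor settings from the default profile. This is a simplified model in plain Python, not Spark code. The `Profile` dataclass and `effective_executor_cores` are hypothetical names introduced for illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Profile:
    task_cpus: int
    executor_cores: Optional[int] = None  # None means "not specified"


def effective_executor_cores(profile: Profile, default: Profile) -> int:
    """Use the profile's own executor cores, else fall back to the default's."""
    if profile.executor_cores is not None:
        return profile.executor_cores
    if default.executor_cores is None:
        raise ValueError("the default profile must specify executor cores")
    return default.executor_cores


default_rp = Profile(task_cpus=1, executor_cores=4)
task_only_rp = Profile(task_cpus=3)  # mirrors a profile built from task requests only
print(effective_executor_cores(task_only_rp, default_rp))  # 4
```

   A profile that does set executor cores, such as `Profile(task_cpus=5, executor_cores=6)`, keeps its own value in this model.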
   
   
   ### Different executor request information 
   
   ```python
   from pyspark.resource import ExecutorResourceRequests, TaskResourceRequests, ResourceProfileBuilder
   
   def filter_func(iterator):
       for pdf in iterator:
           yield pdf
   
   df = spark.range(0, 100, 1, 4)
   
   ereqs = ExecutorResourceRequests().cores(6)
   treqs = TaskResourceRequests().cpus(5)
   rp = ResourceProfileBuilder().require(treqs).require(ereqs).build
   df.repartition(3).mapInArrow(lambda iter: iter, df.schema, False, rp).collect()
   ```
   ![5](https://github.com/apache/spark/assets/1320706/a375cc87-5f06-47f6-93f8-bddc85f5de67)
   
   ![6](https://github.com/apache/spark/assets/1320706/8bd4cf12-1ed1-4b25-9a89-b607e0b48cac)
   
   
   ![7](https://github.com/apache/spark/assets/1320706/36b6660d-206d-4b1f-86c6-bbef897c2b32)
   




Re: [PR] [SPARK-46812][SQL][CONNECT][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]

Posted by "wbo4958 (via GitHub)" <gi...@apache.org>.
wbo4958 commented on PR #44852:
URL: https://github.com/apache/spark/pull/44852#issuecomment-1908127484

   BTW, I will perform similar manual tests for Spark Connect. 




Re: [PR] [SPARK-46812][SQL][CONNECT][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]

Posted by "wbo4958 (via GitHub)" <gi...@apache.org>.
wbo4958 commented on PR #44852:
URL: https://github.com/apache/spark/pull/44852#issuecomment-1908119227

   # Manual tests
   
   The manual tests were conducted on a Spark standalone cluster with a single worker. 
   
   
   ## With dynamic allocation disabled.
   
   ``` bash
   pyspark --master spark://192.168.141.19:7077 --conf spark.executor.cores=4 --conf spark.task.cpus=1 \
      --conf spark.dynamicAllocation.enabled=false
   ```
   
   The above command requests 1 executor with 4 CPU cores. With the default `task.cpus=1`, up to 4 tasks run in parallel.
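
   The parallelism figure above follows from integer division of the executor's cores by the CPUs each task needs. A minimal sketch of that arithmetic in plain Python follows. The helper name `task_slots` is hypothetical and not a Spark API, and the sketch considers CPU only, ignoring other resources such as memory or GPUs.

```python
def task_slots(executor_cores: int, task_cpus: int) -> int:
    """Number of tasks one executor can run at once, considering CPU only."""
    if task_cpus <= 0:
        raise ValueError("task_cpus must be positive")
    return executor_cores // task_cpus


# spark.executor.cores=4 with spark.task.cpus=1, as in the command above
print(task_slots(4, 1))  # 4
```

   The same helper gives 2 slots for `task.cpus=2` and 1 slot for `task.cpus=3` on a 4-core executor.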
   
   1. `task.cpus=1`
   
   Test code:
   
   ``` python
   from pyspark.resource import ExecutorResourceRequests, TaskResourceRequests, ResourceProfileBuilder
   
   def filter_func(iterator):
       for pdf in iterator:
           yield pdf
   
   df = spark.range(0, 100, 1, 6)
   treqs = TaskResourceRequests().cpus(1)
   rp = ResourceProfileBuilder().require(treqs).build
   df.repartition(3).mapInArrow(lambda iter: iter, df.schema, False, rp).collect()
   ```
   
   With the required `task.cpus=1` and `executor.cores=4` (no executor resources are specified, so the default applies), up to 4 tasks can run for `rp` at the same time.
   
   The entire Spark application consists of a single Spark job that is divided into two stages. The first (shuffle) stage comprises 6 tasks: the first 4 run simultaneously, followed by the last 2.
   
   ![1](https://github.com/apache/spark/assets/1320706/6d587336-01c8-4646-aa4a-adaf0c78450e)
   
   
   The second stage (the ResultStage) comprises 3 tasks, all of which run simultaneously since the required `task.cpus` is 1.
   
   ![2](https://github.com/apache/spark/assets/1320706/a2040f5f-abc7-4b63-a6b0-f1984688a58c)
   
   
   2. `task.cpus=2`
   
   Test code:
   
   ``` python
   from pyspark.resource import ExecutorResourceRequests, TaskResourceRequests, ResourceProfileBuilder
   
   def filter_func(iterator):
       for pdf in iterator:
           yield pdf
   
   df = spark.range(0, 100, 1, 6)
   treqs = TaskResourceRequests().cpus(2)
   rp = ResourceProfileBuilder().require(treqs).build
   df.repartition(3).mapInArrow(lambda iter: iter, df.schema, False, rp).collect()
   ```
   
   With the required `task.cpus=2` and `executor.cores=4` (no executor resources are specified, so the default applies), at most 2 tasks run for `rp` at a time.
   
   The first (shuffle) stage behaves the same as in the first test. 
   
   The second stage (the ResultStage) comprises 3 tasks, so the first 2 tasks run together and the last task runs afterwards.
   
   ![3](https://github.com/apache/spark/assets/1320706/61f8c765-b3f8-4fa5-b074-80197ad0de59)
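
   The wave pattern described in the first two tests can be reproduced with a little arithmetic. The sketch below is plain Python, not Spark code. `waves` is a hypothetical helper that assumes all tasks take the same time and that only CPU limits concurrency.

```python
from typing import List


def waves(num_tasks: int, executor_cores: int, task_cpus: int) -> List[int]:
    """Split num_tasks into successive waves of concurrently running tasks."""
    slots = executor_cores // task_cpus  # tasks that fit on the executor at once
    if slots == 0:
        raise ValueError("a task needs more CPUs than the executor has cores")
    result = []
    remaining = num_tasks
    while remaining > 0:
        batch = min(slots, remaining)
        result.append(batch)
        remaining -= batch
    return result


print(waves(6, 4, 1))  # shuffle stage: [4, 2]
print(waves(3, 4, 2))  # result stage with task.cpus=2: [2, 1]
```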
   
   
   3. `task.cpus=3`
   
   Test code:
   
   ``` python
   from pyspark.resource import ExecutorResourceRequests, TaskResourceRequests, ResourceProfileBuilder
   
   def filter_func(iterator):
       for pdf in iterator:
           yield pdf
   
   df = spark.range(0, 100, 1, 6)
   treqs = TaskResourceRequests().cpus(3)
   rp = ResourceProfileBuilder().require(treqs).build
   df.repartition(3).mapInArrow(lambda iter: iter, df.schema, False, rp).collect()
   ```
   
   With the required `task.cpus=3` and `executor.cores=4` (no executor resources are specified, so the default applies), only 1 task runs for `rp` at a time.
   
   The first (shuffle) stage behaves the same as in the first test. 
   
   The second stage (the ResultStage) comprises 3 tasks, all of which run serially.
   
   ![4](https://github.com/apache/spark/assets/1320706/5f1694d5-d33c-482c-8b81-08c3cd70734d)
   
   
   4. `task.cpus=5`
   
   ``` python
   from pyspark.resource import ExecutorResourceRequests, TaskResourceRequests, ResourceProfileBuilder
   
   def filter_func(iterator):
       for pdf in iterator:
           yield pdf
   
   df = spark.range(0, 100, 1, 6)
   treqs = TaskResourceRequests().cpus(5)
   rp = ResourceProfileBuilder().require(treqs).build
   df.repartition(3).mapInArrow(lambda iter: iter, df.schema, False, rp).collect()
   ```
   
   This fails with an exception, because a single task requests more CPUs (5) than the executor has cores (4):
   ``` console
   py4j.protocol.Py4JJavaError: An error occurred while calling o160.collectToPython.
   : org.apache.spark.SparkException: The number of cores per executor (=4) has to be >= the number of cpus per task = 5.
   	at org.apache.spark.resource.ResourceUtils$.validateTaskCpusLargeEnough(ResourceUtils.scala:413)
   	at org.apache.spark.resource.ResourceProfile.calculateTasksAndLimitingResource(ResourceProfile.scala:193)
   	at org.apache.spark.resource.ResourceProfile.$anonfun$limitingResource$1(ResourceProfile.scala:163)
   	at scala.Option.getOrElse(Option.scala:201)
   	at org.apache.spark.resource.ResourceProfile.limitingResource(ResourceProfile.scala:162)
   	at org.apache.spark.resource.ResourceProfileManager.addResourceProfile(ResourceProfileManager.scala:142)
   	at org.apache.spark.rdd.RDD.withResources(RDD.scala:1844)
   	at org.apache.spark.sql.execution.python.MapInBatchExec.$anonfun$doExecute$3(MapInBatchExec.scala:86)
   	at scala.Option.map(Option.scala:242)
   	at org.apache.spark.sql.execution.python.MapInBatchExec.doExecute(MapInBatchExec.scala:86)
   	at org.apache.spark.sql.execution.python.MapInBatchExec.doExecute$(MapInBatchExec.scala:50)
   	at org.apache.spark.sql.execution.python.MapInArrowExec.doExecute(MapInArrowExec.scala:29)
   	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:195)
   	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
   	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
   	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
   	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:191)
   	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:364)
   	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:445)
   	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:390)
   	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:418)
   	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:390)
   	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:4265)
   	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4439)
   	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:560)
   	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4437)
   	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:150)
   	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:241)
   	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:116)
   	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:918)
   	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:72)
   	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:196)
   	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4437)
   	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:4262)
   	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
   	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
   	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
   	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
   	at py4j.Gateway.invoke(Gateway.java:282)
   	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
   	at py4j.commands.CallCommand.execute(CallCommand.java:79)
   	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
   	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
   	at java.base/java.lang.Thread.run(Thread.java:840)
   ```
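
   The message in the trace states the rule being enforced: a task cannot ask for more CPUs than one executor has cores. A rough Python analogue of that check follows. The function name and the use of `ValueError` are illustrative assumptions. The real validation lives in Spark's Scala code (`ResourceUtils.validateTaskCpusLargeEnough` in the trace above).

```python
def validate_task_cpus(executor_cores: int, task_cpus: int) -> None:
    """Raise if a single task would need more CPUs than an executor offers."""
    if executor_cores < task_cpus:
        raise ValueError(
            f"The number of cores per executor (={executor_cores}) has to be >= "
            f"the number of cpus per task = {task_cpus}."
        )


validate_task_cpus(4, 3)  # passes: 3 CPUs fit in a 4-core executor

try:
    validate_task_cpus(4, 5)  # the task.cpus=5 case above
except ValueError as err:
    print(err)
```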
   




Re: [PR] [SPARK-46812][SQL][CONNECT][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]

Posted by "tgravescs (via GitHub)" <gi...@apache.org>.
tgravescs commented on PR #44852:
URL: https://github.com/apache/spark/pull/44852#issuecomment-1908218024

   It would be nice to have more description here of what the challenge was (why it didn't just work) and how you went about solving it.
   
   > Does this PR introduce any user-facing change?
   > No
   
   Is this really true? Doesn't this now allow users to call these together? Does it affect the Spark Connect API, since you mention that?
   
   Please update the description; it should be clear to a reviewer what the proposed changes are without looking at the code.




Re: [PR] [SPARK-46812][SQL][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #44852:
URL: https://github.com/apache/spark/pull/44852#discussion_r1480797398


##########
python/pyspark/sql/tests/test_resources.py:
##########
@@ -0,0 +1,104 @@
+#

Review Comment:
   This has to be added into `dev/sparktestsupport/modules.py`





Re: [PR] [SPARK-46812][SQL][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]

Posted by "wbo4958 (via GitHub)" <gi...@apache.org>.
wbo4958 commented on code in PR #44852:
URL: https://github.com/apache/spark/pull/44852#discussion_r1480821808


##########
python/pyspark/sql/pandas/map_ops.py:
##########
@@ -15,9 +15,14 @@
 # limitations under the License.
 #
 import sys
-from typing import Union, TYPE_CHECKING
+from typing import Union, TYPE_CHECKING, Optional
+
+from py4j.java_gateway import JavaObject
+
+from pyspark.resource.requests import ExecutorResourceRequests, TaskResourceRequests
 

Review Comment:
   Thx



##########
python/pyspark/sql/pandas/map_ops.py:
##########
@@ -175,6 +196,11 @@ def mapInArrow(
 
             .. versionadded: 3.5.0
 
+        profile : :class:`pyspark.resource.ResourceProfile`. The optional ResourceProfile
+            to be used for mapInPandas.

Review Comment:
   My bad, Thx very much.





Re: [PR] [SPARK-46812][SQL][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]

Posted by "wbo4958 (via GitHub)" <gi...@apache.org>.
wbo4958 commented on PR #44852:
URL: https://github.com/apache/spark/pull/44852#issuecomment-1926343075

   Hi @HyukjinKwon, could you help review this PR? Thx




Re: [PR] [SPARK-46812][SQL][CONNECT][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]

Posted by "tgravescs (via GitHub)" <gi...@apache.org>.
tgravescs commented on PR #44852:
URL: https://github.com/apache/spark/pull/44852#issuecomment-1908222726

   ```
   def mapInArrow(
       self,
       func: "ArrowMapIterFunction",
       schema: Union[StructType, str],
       barrier: bool = False,
       profile: Optional[ResourceProfile] = None,
   ```
   
   This is definitely a user-facing API change.
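
A minimal sketch of why an added optional parameter still counts as user-facing: existing calls keep working, but the public signature grows a new keyword that callers can now pass. The function `map_in_arrow_sketch` and the string used in place of a ResourceProfile are hypothetical stand-ins for illustration, not PySpark code.

```python
import inspect
from typing import Optional


def map_in_arrow_sketch(
    func,
    schema: str,
    barrier: bool = False,
    profile: Optional[object] = None,  # hypothetical stand-in for ResourceProfile
) -> dict:
    """Toy function that mirrors the parameter order quoted above."""
    return {"schema": schema, "barrier": barrier, "profile": profile}


# A call written before the parameter existed still works unchanged.
old_call = map_in_arrow_sketch(lambda it: it, "id long")

# A new call can opt in by keyword.
new_call = map_in_arrow_sketch(lambda it: it, "id long", profile="cpus=2")

# The public signature now exposes the extra parameter.
params = inspect.signature(map_in_arrow_sketch).parameters
print(list(params))
print(params["profile"].default)
```

The default of `None` keeps old callers source-compatible, while the extra name in the signature is what users and documentation see.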




Re: [PR] [SPARK-46812][SQL][CONNECT][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]

Posted by "wbo4958 (via GitHub)" <gi...@apache.org>.
wbo4958 commented on PR #44852:
URL: https://github.com/apache/spark/pull/44852#issuecomment-1908127200

   Hi @WeichenXu123 @zhengruifeng @tgravescs, could you please help review this PR?




Re: [PR] [SPARK-46812][SQL][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #44852:
URL: https://github.com/apache/spark/pull/44852#discussion_r1480797018


##########
python/pyspark/sql/pandas/map_ops.py:
##########
@@ -175,6 +196,11 @@ def mapInArrow(
 
             .. versionadded: 3.5.0
 
+        profile : :class:`pyspark.resource.ResourceProfile`. The optional ResourceProfile
+            to be used for mapInPandas.

Review Comment:
   ```suggestion
               to be used for mapInArrow.
   ```





Re: [PR] [SPARK-46812][SQL][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]

Posted by "wbo4958 (via GitHub)" <gi...@apache.org>.
wbo4958 commented on PR #44852:
URL: https://github.com/apache/spark/pull/44852#issuecomment-1933236068

   Hi @HyukjinKwon, thx for your review and comments. CI passed on the newest commit; could you help review it again? Thx very much.




Re: [PR] [SPARK-46812][SQL][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]

Posted by "wbo4958 (via GitHub)" <gi...@apache.org>.
wbo4958 commented on PR #44852:
URL: https://github.com/apache/spark/pull/44852#issuecomment-1937357491

   Hi @HyukjinKwon, could you help merge?




Re: [PR] [SPARK-46812][SQL][CONNECT][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]

Posted by "wbo4958 (via GitHub)" <gi...@apache.org>.
wbo4958 commented on PR #44852:
URL: https://github.com/apache/spark/pull/44852#issuecomment-1907300220

   I tried to add unit tests that check whether the ResourceProfile is correctly applied to the underlying RDD generated in MapInPandasExec. Here is my test code:
   
   ``` python
           df1 = df.mapInPandas(lambda iter: iter, "id long")
           assert df1.rdd.getResourceProfile() is None
   
           treqs = TaskResourceRequests().cpus(2)
           expected_rp = ResourceProfileBuilder().require(treqs).build
   
           df2 = df.mapInPandas(lambda iter: iter, "id long", False, expected_rp)
           assert df2.rdd.getResourceProfile() is not None
   ```
   
   But `df2.rdd.getResourceProfile()` returns None. The reason is that `df2.rdd` adds extra MapPartitionsRDDs that don't have a ResourceProfile attached.
   
   I also tried to reach the correct parent RDD through the JVM RDD with the code below:
   
   ``` python
   df2.rdd._jrdd.firstParent()
   ```
   
   or
   
   ``` python
   df2.rdd._jrdd.parent(0)
   ```
   
   But neither worked; they failed with the error messages below:
   
   ``` console
   py4j.protocol.Py4JError: An error occurred while calling o45.parent. Trace:
   py4j.Py4JException: Method parent([class java.lang.Integer]) does not exist
   
   py4j.protocol.Py4JError: An error occurred while calling o45.firstParent. Trace:
   py4j.Py4JException: Method firstParent([]) does not exist
   ```
   
   I don't know how to add unit tests for this PR, so I will run manual tests instead.
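
The lineage problem described in this comment can be illustrated with a toy model. This is only a sketch under stated assumptions: `ToyRDD`, `get_resource_profile`, and `find_profile_in_lineage` are hypothetical names invented here, a plain string stands in for a ResourceProfile, and none of this is Spark's API. It shows why asking only the outermost node returns None when the profile is attached to an inner node, and how walking the parent links would find it.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ToyRDD:
    """Hypothetical stand-in for one node in an RDD lineage; not a Spark class."""

    name: str
    parent: Optional["ToyRDD"] = None
    profile: Optional[str] = None  # stands in for a ResourceProfile

    def get_resource_profile(self) -> Optional[str]:
        # Reports only this node's own profile; parents are not consulted.
        return self.profile


def find_profile_in_lineage(rdd: Optional[ToyRDD]) -> Optional[str]:
    """Walk parent links and return the first profile found, if any."""
    while rdd is not None:
        if rdd.profile is not None:
            return rdd.profile
        rdd = rdd.parent
    return None


# The profile sits on the inner node, wrapped by an extra node without one.
source = ToyRDD("source")
map_in_pandas = ToyRDD("mapInPandas", parent=source, profile="cpus=2")
outer = ToyRDD("wrapper", parent=map_in_pandas)

print(outer.get_resource_profile())
print(find_profile_in_lineage(outer))
```

In this toy setup the outer node reports no profile even though one exists further down the chain, which matches the behaviour observed with `df2.rdd` above.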




Re: [PR] [SPARK-46812][SQL][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]

Posted by "wbo4958 (via GitHub)" <gi...@apache.org>.
wbo4958 commented on PR #44852:
URL: https://github.com/apache/spark/pull/44852#issuecomment-1922510973

   Hi @WeichenXu123, @tgravescs, @zhengruifeng, @Ngone51, I've split the big PR into two PRs: one for SQL and the other for Connect. This PR covers only SQL. Could you help review it?




Re: [PR] [SPARK-46812][SQL][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]

Posted by "wbo4958 (via GitHub)" <gi...@apache.org>.
wbo4958 commented on PR #44852:
URL: https://github.com/apache/spark/pull/44852#issuecomment-1926216182

   @zhengruifeng Could you help review it again? Thx




Re: [PR] Make mapInPandas / mapInArrow support ResourceProfile [spark]

Posted by "wbo4958 (via GitHub)" <gi...@apache.org>.
wbo4958 commented on PR #44852:
URL: https://github.com/apache/spark/pull/44852#issuecomment-1905802760

   I will run manual tests for this PR and update it once they are done.




Re: [PR] [SPARK-46812][SQL][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]

Posted by "WeichenXu123 (via GitHub)" <gi...@apache.org>.
WeichenXu123 closed pull request #44852: [SPARK-46812][SQL][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile
URL: https://github.com/apache/spark/pull/44852




Re: [PR] [SPARK-46812][SQL][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]

Posted by "WeichenXu123 (via GitHub)" <gi...@apache.org>.
WeichenXu123 commented on PR #44852:
URL: https://github.com/apache/spark/pull/44852#issuecomment-1951856324

   Merged to master.

