Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/12/28 23:38:13 UTC

[GitHub] [spark] rithwik-db opened a new pull request, #39267: [SPARK-41592] Pytorch file Distributed Training

rithwik-db opened a new pull request, #39267:
URL: https://github.com/apache/spark/pull/39267

   NOTE: This PR builds on the baseline changes in another WIP PR. To see only the diff that pertains to this ticket, look at the LAST COMMIT in this PR's commit history (titled "Running PyTorch Files Distributed-ly"). I am sending out several related PRs in parallel, so that commit is the one to review here.
   
   ### What changes were proposed in this pull request?
   
   This PR builds on https://github.com/apache/spark/pull/39188 and adds support for multi-node training with PyTorch files. Users would follow the second workflow in the [design document](https://docs.google.com/document/d/1QPO1Ly8WteL6aIPvVcR7Xne9qVtJiB3fdrRn7NwBcpA/edit#heading=h.8yvw9xq428fh) to run training on the executors. It adds some new utility functions and extends existing ones. This is still largely WIP; tests will be added soon.
   
   ### Why are the changes needed?
   
   See the [main ticket](https://issues.apache.org/jira/browse/SPARK-41589) for details.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Tested with a pseudo-integration test (it does not exercise PyTorch yet).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] rithwik-db commented on pull request #39267: [SPARK-41592][PYTHON][ML] Pytorch file Distributed Training

Posted by "rithwik-db (via GitHub)" <gi...@apache.org>.
rithwik-db commented on PR #39267:
URL: https://github.com/apache/spark/pull/39267#issuecomment-1503834517

   If we are using CPUs for training, the `num_processes` attribute refers to the number of Spark tasks that will be created for training, and each Spark task can use more than one CPU, depending on the value of `spark.task.cpus`. (This [function](https://github.com/apache/spark/blob/master/python/pyspark/ml/torch/distributor.py#L167) is where this logic is defined.)
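
As a rough illustration of the relationship described above, here is a minimal sketch. The helper name `total_training_cpus` is hypothetical and not part of the distributor; it only assumes one Spark task per training process, with each task holding `spark.task.cpus` cores.

```python
def total_training_cpus(num_processes: int, task_cpus: int = 1) -> int:
    """CPU cores reserved across the cluster for a CPU-only run.

    Assumes one Spark task per training process, each task holding
    `task_cpus` cores (the value of `spark.task.cpus`).
    """
    if num_processes < 1 or task_cpus < 1:
        raise ValueError("num_processes and task_cpus must be positive")
    return num_processes * task_cpus


# 4 training processes, each running in a task that holds 2 cores.
print(total_training_cpus(num_processes=4, task_cpus=2))
```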




[GitHub] [spark] rithwik-db commented on a diff in pull request #39267: [WIP][SPARK-41592][PYTHON][ML] Pytorch file Distributed Training

Posted by GitBox <gi...@apache.org>.
rithwik-db commented on code in PR #39267:
URL: https://github.com/apache/spark/pull/39267#discussion_r1069064636


##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -428,6 +432,84 @@ def _run_local_training(
 
         return output
 
+    def _get_spark_task_program(
+        self, framework_wrapper_fn: Optional[Callable], train_fn: Union[Callable, str], *args: Any
+    ) -> Callable:
+        num_processes = self.num_processes
+        num_tasks = self.num_tasks
+        use_gpu = self.use_gpu
+        input_params = self.input_params
+
+        # Spark task program
+        def wrapped_train_fn(_):  # type: ignore[no-untyped-def]
+            import os
+            from pyspark import BarrierTaskContext
+
+            CUDA_VISIBLE_DEVICES = "CUDA_VISIBLE_DEVICES"
+
+            # The idea of setting the random port to 0 doesn't seem to work?
+            def get_free_port(address: str) -> int:
+                import socket
+                import random
+
+                while True:
+                    port = random.randint(32768, 61000)
+                    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+                    if not (sock.connect_ex((address, port)) == 0):
+                        return port
+
+            def set_torch_config(context: "BarrierTaskContext") -> None:
+                addrs = [e.address.split(":")[0] for e in context.getTaskInfos()]
+
+                os.environ["MASTER_ADDR"] = str(addrs[0])
+                os.environ["MASTER_PORT"] = str(get_free_port(addrs[0]))
+                os.environ["WORLD_SIZE"] = str(num_processes)
+                os.environ["NODE_RANK"] = str(context.partitionId())
+                os.environ["RANK"] = str(context.partitionId())
+
+            def set_gpus(context: "BarrierTaskContext") -> None:
+                gpus_owned = get_gpus_owned(context)
+
+                my_num_gpus = (num_processes // num_tasks) + (
+                    context.partitionId() < (num_processes % num_tasks)
+                )
+                gpu_addresses = [str(e) for e in random.sample(gpus_owned, my_num_gpus)]
+                os.environ[CUDA_VISIBLE_DEVICES] = ",".join(gpu_addresses)
+
+            context = BarrierTaskContext.get()
+
+            if use_gpu:
+                set_gpus(context)
+            else:
+                os.environ[CUDA_VISIBLE_DEVICES] = ""

Review Comment:
   PyTorch Lightning will raise a `MisconfigurationException: No supported gpu backend found!`. This is what we expect to see if the user sets `use_gpu=False` and calls `pl.Trainer(accelerator="gpu")`. My understanding is that if a user runs this code on a local cluster with GPUs on each node without `os.environ[CUDA_VISIBLE_DEVICES] = ""`, then the task will still be able to use a GPU even when `use_gpu=False`.
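
To make the reasoning above concrete, here is a minimal sketch of the environment a task would see in each mode. The helper `device_env` is hypothetical and only mirrors the `if use_gpu ... else ...` branch in the diff; it relies on the fact that an empty `CUDA_VISIBLE_DEVICES` hides every GPU from CUDA-based libraries in that process.

```python
import os
from typing import Dict, List, Optional

CUDA_VISIBLE_DEVICES = "CUDA_VISIBLE_DEVICES"


def device_env(use_gpu: bool, gpu_addresses: Optional[List[str]] = None) -> Dict[str, str]:
    """Return a copy of the current environment with GPU visibility set.

    With use_gpu=False the variable is set to an empty string, so CUDA
    reports no devices even if the node physically has GPUs.
    """
    env = dict(os.environ)
    if use_gpu:
        # Expose only the GPUs assigned to this task.
        env[CUDA_VISIBLE_DEVICES] = ",".join(gpu_addresses or [])
    else:
        # Hide all GPUs so a CPU-only run cannot pick one up by accident.
        env[CUDA_VISIBLE_DEVICES] = ""
    return env


print(repr(device_env(False)[CUDA_VISIBLE_DEVICES]))
print(repr(device_env(True, ["0", "2"])[CUDA_VISIBLE_DEVICES]))
```

Leaving the variable unset is different from setting it to an empty string: when it is unset, CUDA exposes every GPU on the node.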





[GitHub] [spark] rithwik-db commented on a diff in pull request #39267: [WIP][SPARK-41592][PYTHON][ML] Pytorch file Distributed Training

Posted by GitBox <gi...@apache.org>.
rithwik-db commented on code in PR #39267:
URL: https://github.com/apache/spark/pull/39267#discussion_r1068751120


##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -428,6 +432,84 @@ def _run_local_training(
 
         return output
 
+    def _get_spark_task_program(
+        self, framework_wrapper_fn: Optional[Callable], train_fn: Union[Callable, str], *args: Any
+    ) -> Callable:

Review Comment:
   Sure





[GitHub] [spark] lu-wang-dl commented on a diff in pull request #39267: [WIP][SPARK-41592][PYTHON][ML] Pytorch file Distributed Training

Posted by GitBox <gi...@apache.org>.
lu-wang-dl commented on code in PR #39267:
URL: https://github.com/apache/spark/pull/39267#discussion_r1068711508


##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -428,6 +432,84 @@ def _run_local_training(
 
         return output
 
+    def _get_spark_task_program(
+        self, framework_wrapper_fn: Optional[Callable], train_fn: Union[Callable, str], *args: Any
+    ) -> Callable:
+        num_processes = self.num_processes
+        num_tasks = self.num_tasks
+        use_gpu = self.use_gpu
+        input_params = self.input_params
+
+        # Spark task program
+        def wrapped_train_fn(_):  # type: ignore[no-untyped-def]
+            import os
+            from pyspark import BarrierTaskContext
+
+            CUDA_VISIBLE_DEVICES = "CUDA_VISIBLE_DEVICES"
+
+            # The idea of setting the random port to 0 doesn't seem to work?
+            def get_free_port(address: str) -> int:
+                import socket
+                import random
+
+                while True:
+                    port = random.randint(32768, 61000)
+                    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+                    if not (sock.connect_ex((address, port)) == 0):
+                        return port
+
+            def set_torch_config(context: "BarrierTaskContext") -> None:
+                addrs = [e.address.split(":")[0] for e in context.getTaskInfos()]
+
+                os.environ["MASTER_ADDR"] = str(addrs[0])
+                os.environ["MASTER_PORT"] = str(get_free_port(addrs[0]))
+                os.environ["WORLD_SIZE"] = str(num_processes)
+                os.environ["NODE_RANK"] = str(context.partitionId())
+                os.environ["RANK"] = str(context.partitionId())
+
+            def set_gpus(context: "BarrierTaskContext") -> None:
+                gpus_owned = get_gpus_owned(context)
+
+                my_num_gpus = (num_processes // num_tasks) + (
+                    context.partitionId() < (num_processes % num_tasks)
+                )
+                gpu_addresses = [str(e) for e in random.sample(gpus_owned, my_num_gpus)]
+                os.environ[CUDA_VISIBLE_DEVICES] = ",".join(gpu_addresses)
+
+            context = BarrierTaskContext.get()
+
+            if use_gpu:
+                set_gpus(context)
+            else:
+                os.environ[CUDA_VISIBLE_DEVICES] = ""

Review Comment:
   Do we need to do this?





[GitHub] [spark] lu-wang-dl commented on a diff in pull request #39267: [WIP][SPARK-41592][PYTHON][ML] Pytorch file Distributed Training

Posted by GitBox <gi...@apache.org>.
lu-wang-dl commented on code in PR #39267:
URL: https://github.com/apache/spark/pull/39267#discussion_r1068709600


##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -428,6 +432,84 @@ def _run_local_training(
 
         return output
 
+    def _get_spark_task_program(
+        self, framework_wrapper_fn: Optional[Callable], train_fn: Union[Callable, str], *args: Any
+    ) -> Callable:
+        num_processes = self.num_processes
+        num_tasks = self.num_tasks
+        use_gpu = self.use_gpu
+        input_params = self.input_params
+
+        # Spark task program
+        def wrapped_train_fn(_):  # type: ignore[no-untyped-def]
+            import os
+            from pyspark import BarrierTaskContext
+
+            CUDA_VISIBLE_DEVICES = "CUDA_VISIBLE_DEVICES"
+
+            # The idea of setting the random port to 0 doesn't seem to work?

Review Comment:
   What does this mean?
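
For context, "setting the port to 0" most likely refers to the common socket idiom of binding to port 0 so that the operating system picks a free port. This reading is an assumption, since the thread does not spell it out. A minimal sketch of that idiom, with a hypothetical helper name `os_assigned_port`:

```python
import socket


def os_assigned_port(host: str = "127.0.0.1") -> int:
    """Bind to port 0 and return the port number the OS selected."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))  # port 0 asks the OS for any free port
        return sock.getsockname()[1]


print(os_assigned_port())
```

Note that the port is picked on the machine running this code and is released once the socket closes, so another process can claim it before the training process binds it.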





[GitHub] [spark] rithwik-db commented on a diff in pull request #39267: [WIP][SPARK-41592][PYTHON][ML] Pytorch file Distributed Training

Posted by GitBox <gi...@apache.org>.
rithwik-db commented on code in PR #39267:
URL: https://github.com/apache/spark/pull/39267#discussion_r1072943054


##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -428,6 +432,84 @@ def _run_local_training(
 
         return output
 
+    def _get_spark_task_program(
+        self, framework_wrapper_fn: Optional[Callable], train_fn: Union[Callable, str], *args: Any
+    ) -> Callable:
+        num_processes = self.num_processes
+        num_tasks = self.num_tasks
+        use_gpu = self.use_gpu
+        input_params = self.input_params
+
+        # Spark task program
+        def wrapped_train_fn(_):  # type: ignore[no-untyped-def]
+            import os
+            from pyspark import BarrierTaskContext
+
+            CUDA_VISIBLE_DEVICES = "CUDA_VISIBLE_DEVICES"
+
+            # The idea of setting the random port to 0 doesn't seem to work?
+            def get_free_port(address: str) -> int:
+                import socket
+                import random
+
+                while True:
+                    port = random.randint(32768, 61000)
+                    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+                    if not (sock.connect_ex((address, port)) == 0):
+                        return port
+
+            def set_torch_config(context: "BarrierTaskContext") -> None:
+                addrs = [e.address.split(":")[0] for e in context.getTaskInfos()]
+
+                os.environ["MASTER_ADDR"] = str(addrs[0])
+                os.environ["MASTER_PORT"] = str(get_free_port(addrs[0]))
+                os.environ["WORLD_SIZE"] = str(num_processes)
+                os.environ["NODE_RANK"] = str(context.partitionId())
+                os.environ["RANK"] = str(context.partitionId())
+
+            def set_gpus(context: "BarrierTaskContext") -> None:
+                gpus_owned = get_gpus_owned(context)
+
+                my_num_gpus = (num_processes // num_tasks) + (
+                    context.partitionId() < (num_processes % num_tasks)
+                )
+                gpu_addresses = [str(e) for e in random.sample(gpus_owned, my_num_gpus)]
+                os.environ[CUDA_VISIBLE_DEVICES] = ",".join(gpu_addresses)
+
+            context = BarrierTaskContext.get()
+
+            if use_gpu:
+                set_gpus(context)

Review Comment:
   https://github.com/apache/spark/pull/39267/files#diff-76c395a6b98138662faaec37460ccda966f5cc0df0bccd224dfefcd81b2a7a79R459 <- Is this what you were suggesting?





[GitHub] [spark] WeichenXu123 commented on a diff in pull request #39267: [WIP][SPARK-41592][PYTHON][ML] Pytorch file Distributed Training

Posted by GitBox <gi...@apache.org>.
WeichenXu123 commented on code in PR #39267:
URL: https://github.com/apache/spark/pull/39267#discussion_r1072267585


##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -428,6 +432,84 @@ def _run_local_training(
 
         return output
 
+    def _get_spark_task_program(
+        self, framework_wrapper_fn: Optional[Callable], train_fn: Union[Callable, str], *args: Any
+    ) -> Callable:
+        num_processes = self.num_processes
+        num_tasks = self.num_tasks
+        use_gpu = self.use_gpu
+        input_params = self.input_params
+
+        # Spark task program
+        def wrapped_train_fn(_):  # type: ignore[no-untyped-def]
+            import os
+            from pyspark import BarrierTaskContext
+
+            CUDA_VISIBLE_DEVICES = "CUDA_VISIBLE_DEVICES"
+
+            # The idea of setting the random port to 0 doesn't seem to work?
+            def get_free_port(address: str) -> int:
+                import socket
+                import random
+
+                while True:

Review Comment:
   I also recommend setting a maximum number of retries for the `get_free_port` loop, to avoid an infinite loop in unexpected cases.
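
One way the suggestion above could look is sketched below. It keeps the probing logic from the diff, replaces `while True` with a bounded loop, and closes each probe socket. The `max_retries` parameter and its default of 100 are assumptions for illustration, not values from the PR.

```python
import random
import socket


def get_free_port(address: str, max_retries: int = 100) -> int:
    """Probe random ports and return one that nothing is listening on.

    Raises RuntimeError after max_retries failed attempts instead of
    looping forever.
    """
    for _ in range(max_retries):
        port = random.randint(32768, 61000)
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            # connect_ex returns 0 when something accepts the connection,
            # which means the port is already in use.
            if sock.connect_ex((address, port)) != 0:
                return port
    raise RuntimeError(
        f"Could not find a free port on {address} after {max_retries} attempts"
    )


print(get_free_port("127.0.0.1"))
```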





[GitHub] [spark] WeichenXu123 commented on a diff in pull request #39267: [WIP][SPARK-41592][PYTHON][ML] Pytorch file Distributed Training

Posted by GitBox <gi...@apache.org>.
WeichenXu123 commented on code in PR #39267:
URL: https://github.com/apache/spark/pull/39267#discussion_r1072289202


##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -428,6 +432,84 @@ def _run_local_training(
 
         return output
 
+    def _get_spark_task_program(
+        self, framework_wrapper_fn: Optional[Callable], train_fn: Union[Callable, str], *args: Any
+    ) -> Callable:
+        num_processes = self.num_processes
+        num_tasks = self.num_tasks
+        use_gpu = self.use_gpu
+        input_params = self.input_params
+
+        # Spark task program
+        def wrapped_train_fn(_):  # type: ignore[no-untyped-def]
+            import os
+            from pyspark import BarrierTaskContext
+
+            CUDA_VISIBLE_DEVICES = "CUDA_VISIBLE_DEVICES"
+
+            # The idea of setting the random port to 0 doesn't seem to work?
+            def get_free_port(address: str) -> int:
+                import socket
+                import random
+
+                while True:
+                    port = random.randint(32768, 61000)
+                    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+                    if not (sock.connect_ex((address, port)) == 0):
+                        return port
+
+            def set_torch_config(context: "BarrierTaskContext") -> None:
+                addrs = [e.address.split(":")[0] for e in context.getTaskInfos()]
+
+                os.environ["MASTER_ADDR"] = str(addrs[0])
+                os.environ["MASTER_PORT"] = str(get_free_port(addrs[0]))
+                os.environ["WORLD_SIZE"] = str(num_processes)
+                os.environ["NODE_RANK"] = str(context.partitionId())
+                os.environ["RANK"] = str(context.partitionId())
+
+            def set_gpus(context: "BarrierTaskContext") -> None:
+                gpus_owned = get_gpus_owned(context)
+
+                my_num_gpus = (num_processes // num_tasks) + (
+                    context.partitionId() < (num_processes % num_tasks)
+                )
+                gpu_addresses = [str(e) for e in random.sample(gpus_owned, my_num_gpus)]
+                os.environ[CUDA_VISIBLE_DEVICES] = ",".join(gpu_addresses)
+
+            context = BarrierTaskContext.get()
+
+            if use_gpu:
+                set_gpus(context)
+            else:
+                os.environ[CUDA_VISIBLE_DEVICES] = ""
+            set_torch_config(context)
+
+            output = framework_wrapper_fn(input_params, train_fn, *args)
+
+            if context.partitionId() == 0:
+                return [output]
+            return [None]
+
+        return wrapped_train_fn
+
+    def _run_distributed_training(
+        self,
+        framework_wrapper_fn: Optional[Callable],
+        train_fn: Union[Callable, str],
+        *args: Any,
+    ) -> Optional[Any]:
+        if not framework_wrapper_fn:
+            raise RuntimeError("Unknown combination of parameters")
+        spark_task_program = self._get_spark_task_program(framework_wrapper_fn, train_fn, *args)
+        self._check_encryption()
+        result = (
+            self.sc.parallelize(range(self.num_tasks), self.num_tasks)
+            .barrier()
+            .mapPartitions(spark_task_program)

Review Comment:
   var rename: `spark_task_function`





[GitHub] [spark] lu-wang-dl commented on a diff in pull request #39267: [WIP][SPARK-41592][PYTHON][ML] Pytorch file Distributed Training

Posted by GitBox <gi...@apache.org>.
lu-wang-dl commented on code in PR #39267:
URL: https://github.com/apache/spark/pull/39267#discussion_r1068713107


##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -428,6 +432,84 @@ def _run_local_training(
 
         return output
 
+    def _get_spark_task_program(
+        self, framework_wrapper_fn: Optional[Callable], train_fn: Union[Callable, str], *args: Any
+    ) -> Callable:
+        num_processes = self.num_processes
+        num_tasks = self.num_tasks
+        use_gpu = self.use_gpu
+        input_params = self.input_params
+
+        # Spark task program
+        def wrapped_train_fn(_):  # type: ignore[no-untyped-def]
+            import os
+            from pyspark import BarrierTaskContext
+
+            CUDA_VISIBLE_DEVICES = "CUDA_VISIBLE_DEVICES"
+
+            # The idea of setting the random port to 0 doesn't seem to work?
+            def get_free_port(address: str) -> int:
+                import socket
+                import random
+
+                while True:
+                    port = random.randint(32768, 61000)
+                    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+                    if not (sock.connect_ex((address, port)) == 0):
+                        return port
+
+            def set_torch_config(context: "BarrierTaskContext") -> None:
+                addrs = [e.address.split(":")[0] for e in context.getTaskInfos()]
+
+                os.environ["MASTER_ADDR"] = str(addrs[0])
+                os.environ["MASTER_PORT"] = str(get_free_port(addrs[0]))
+                os.environ["WORLD_SIZE"] = str(num_processes)
+                os.environ["NODE_RANK"] = str(context.partitionId())
+                os.environ["RANK"] = str(context.partitionId())
+
+            def set_gpus(context: "BarrierTaskContext") -> None:
+                gpus_owned = get_gpus_owned(context)
+
+                my_num_gpus = (num_processes // num_tasks) + (
+                    context.partitionId() < (num_processes % num_tasks)
+                )
+                gpu_addresses = [str(e) for e in random.sample(gpus_owned, my_num_gpus)]
+                os.environ[CUDA_VISIBLE_DEVICES] = ",".join(gpu_addresses)
+
+            context = BarrierTaskContext.get()
+
+            if use_gpu:
+                set_gpus(context)
+            else:
+                os.environ[CUDA_VISIBLE_DEVICES] = ""
+            set_torch_config(context)
+
+            output = framework_wrapper_fn(input_params, train_fn, *args)
+
+            if context.partitionId() == 0:
+                return [output]
+            return [None]
+
+        return wrapped_train_fn
+
+    def _run_distributed_training(
+        self,
+        framework_wrapper_fn: Optional[Callable],
+        train_fn: Union[Callable, str],
+        *args: Any,
+    ) -> Optional[Any]:
+        if not framework_wrapper_fn:
+            raise RuntimeError("Unknown combination of parameters")
+        spark_task_program = self._get_spark_task_program(framework_wrapper_fn, train_fn, *args)

Review Comment:
   Why not just define the function here?





[GitHub] [spark] lu-wang-dl commented on a diff in pull request #39267: [WIP][SPARK-41592][PYTHON][ML] Pytorch file Distributed Training

Posted by GitBox <gi...@apache.org>.
lu-wang-dl commented on code in PR #39267:
URL: https://github.com/apache/spark/pull/39267#discussion_r1068708067


##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -428,6 +432,84 @@ def _run_local_training(
 
         return output
 
+    def _get_spark_task_program(
+        self, framework_wrapper_fn: Optional[Callable], train_fn: Union[Callable, str], *args: Any
+    ) -> Callable:

Review Comment:
   Add a docstring?





[GitHub] [spark] rithwik-db commented on a diff in pull request #39267: [WIP][SPARK-41592][PYTHON][ML] Pytorch file Distributed Training

Posted by GitBox <gi...@apache.org>.
rithwik-db commented on code in PR #39267:
URL: https://github.com/apache/spark/pull/39267#discussion_r1068750849


##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -428,6 +432,84 @@ def _run_local_training(
 
         return output
 
+    def _get_spark_task_program(
+        self, framework_wrapper_fn: Optional[Callable], train_fn: Union[Callable, str], *args: Any
+    ) -> Callable:
+        num_processes = self.num_processes
+        num_tasks = self.num_tasks
+        use_gpu = self.use_gpu
+        input_params = self.input_params
+
+        # Spark task program
+        def wrapped_train_fn(_):  # type: ignore[no-untyped-def]
+            import os
+            from pyspark import BarrierTaskContext
+
+            CUDA_VISIBLE_DEVICES = "CUDA_VISIBLE_DEVICES"
+
+            # The idea of setting the random port to 0 doesn't seem to work?

Review Comment:
   Something like the following seems to raise an error:
   ```
   import socket
   sock = socket.socket()
   sock.bind((master_address, 0))
   port = sock.getsockname()[1]
   ```
   So I just pick a port at random instead.
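
For reference, asking the OS for an ephemeral port by binding to port 0 generally works when the bind address belongs to the local machine. The sketch below assumes a loopback host, and `find_free_port` is a hypothetical helper that is not part of the PR. One plausible cause of the error above, not confirmed in this thread, is that `bind` only accepts an address owned by the current host, so binding to a remote `master_address` from another task would fail.

```python
import socket


def find_free_port(host: str = "127.0.0.1") -> int:
    """Ask the OS for an unused TCP port on a local address (hypothetical helper)."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # Port 0 tells the OS to pick any currently free ephemeral port.
        sock.bind((host, 0))
        return sock.getsockname()[1]


port = find_free_port()
```

Note that the port is only guaranteed free while the socket is open; another process could claim it between this call and the moment the training job binds to it.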





[GitHub] [spark] rithwik-db commented on a diff in pull request #39267: [WIP][SPARK-41592][PYTHON][ML] Pytorch file Distributed Training

Posted by GitBox <gi...@apache.org>.
rithwik-db commented on code in PR #39267:
URL: https://github.com/apache/spark/pull/39267#discussion_r1067576354


##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -407,13 +418,6 @@ def _run_local_training(
         try:
             if self.use_gpu:
                 gpus_owned = get_gpus_owned(self.sc)
-

Review Comment:
   This check is no longer needed: if `num_processes > len(gpus_owned)`, we now set `num_processes = len(gpus_owned)`.





[GitHub] [spark] WeichenXu123 commented on a diff in pull request #39267: [WIP][SPARK-41592][PYTHON][ML] Pytorch file Distributed Training

Posted by GitBox <gi...@apache.org>.
WeichenXu123 commented on code in PR #39267:
URL: https://github.com/apache/spark/pull/39267#discussion_r1072266412


##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -428,6 +432,84 @@ def _run_local_training(
 
         return output
 
+    def _get_spark_task_program(
+        self, framework_wrapper_fn: Optional[Callable], train_fn: Union[Callable, str], *args: Any
+    ) -> Callable:
+        num_processes = self.num_processes
+        num_tasks = self.num_tasks
+        use_gpu = self.use_gpu
+        input_params = self.input_params
+
+        # Spark task program
+        def wrapped_train_fn(_):  # type: ignore[no-untyped-def]
+            import os
+            from pyspark import BarrierTaskContext
+
+            CUDA_VISIBLE_DEVICES = "CUDA_VISIBLE_DEVICES"
+
+            # The idea of setting the random port to 0 doesn't seem to work?
+            def get_free_port(address: str) -> int:
+                import socket
+                import random
+
+                while True:

Review Comment:
   Shall we add a `sleep(0.1)` in the loop body?





[GitHub] [spark] WeichenXu123 commented on a diff in pull request #39267: [WIP][SPARK-41592][PYTHON][ML] Pytorch file Distributed Training

Posted by GitBox <gi...@apache.org>.
WeichenXu123 commented on code in PR #39267:
URL: https://github.com/apache/spark/pull/39267#discussion_r1072267585


##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -428,6 +432,84 @@ def _run_local_training(
 
         return output
 
+    def _get_spark_task_program(
+        self, framework_wrapper_fn: Optional[Callable], train_fn: Union[Callable, str], *args: Any
+    ) -> Callable:
+        num_processes = self.num_processes
+        num_tasks = self.num_tasks
+        use_gpu = self.use_gpu
+        input_params = self.input_params
+
+        # Spark task program
+        def wrapped_train_fn(_):  # type: ignore[no-untyped-def]
+            import os
+            from pyspark import BarrierTaskContext
+
+            CUDA_VISIBLE_DEVICES = "CUDA_VISIBLE_DEVICES"
+
+            # The idea of setting the random port to 0 doesn't seem to work?
+            def get_free_port(address: str) -> int:
+                import socket
+                import random
+
+                while True:

Review Comment:
   I also recommend setting a maximum retry count (e.g. 100) for the `get_free_port` loop, to avoid an infinite loop in unexpected cases.
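
A minimal sketch of what a bounded version of the loop might look like, combining the retry limit with the short sleep suggested in this thread. The parameter names `max_retries` and `delay` are assumptions made for illustration, and this is not the code that was eventually merged.

```python
import random
import socket
import time


def get_free_port(address: str, max_retries: int = 100, delay: float = 0.1) -> int:
    """Probe random ports on `address`, giving up after `max_retries` attempts."""
    for _ in range(max_retries):
        port = random.randint(32768, 61000)
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            # connect_ex returns 0 only when something already accepts
            # connections on the port, so a non-zero result is treated as free.
            if sock.connect_ex((address, port)) != 0:
                return port
        # Back off briefly before probing another port.
        time.sleep(delay)
    raise RuntimeError(f"No free port found on {address} after {max_retries} attempts")


port = get_free_port("127.0.0.1")
```

Using the socket as a context manager also closes each probe socket, which the unbounded loop in the diff does not do.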





[GitHub] [spark] WeichenXu123 commented on a diff in pull request #39267: [WIP][SPARK-41592][PYTHON][ML] Pytorch file Distributed Training

Posted by GitBox <gi...@apache.org>.
WeichenXu123 commented on code in PR #39267:
URL: https://github.com/apache/spark/pull/39267#discussion_r1072290872


##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -428,6 +432,84 @@ def _run_local_training(
 
         return output
 
+    def _get_spark_task_program(
+        self, framework_wrapper_fn: Optional[Callable], train_fn: Union[Callable, str], *args: Any
+    ) -> Callable:
+        num_processes = self.num_processes
+        num_tasks = self.num_tasks
+        use_gpu = self.use_gpu
+        input_params = self.input_params
+
+        # Spark task program
+        def wrapped_train_fn(_):  # type: ignore[no-untyped-def]
+            import os
+            from pyspark import BarrierTaskContext
+
+            CUDA_VISIBLE_DEVICES = "CUDA_VISIBLE_DEVICES"
+
+            # The idea of setting the random port to 0 doesn't seem to work?
+            def get_free_port(address: str) -> int:
+                import socket
+                import random
+
+                while True:
+                    port = random.randint(32768, 61000)
+                    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+                    if not (sock.connect_ex((address, port)) == 0):
+                        return port
+
+            def set_torch_config(context: "BarrierTaskContext") -> None:
+                addrs = [e.address.split(":")[0] for e in context.getTaskInfos()]
+
+                os.environ["MASTER_ADDR"] = str(addrs[0])
+                os.environ["MASTER_PORT"] = str(get_free_port(addrs[0]))
+                os.environ["WORLD_SIZE"] = str(num_processes)
+                os.environ["NODE_RANK"] = str(context.partitionId())
+                os.environ["RANK"] = str(context.partitionId())
+
+            def set_gpus(context: "BarrierTaskContext") -> None:
+                gpus_owned = get_gpus_owned(context)
+
+                my_num_gpus = (num_processes // num_tasks) + (
+                    context.partitionId() < (num_processes % num_tasks)
+                )
+                gpu_addresses = [str(e) for e in random.sample(gpus_owned, my_num_gpus)]
+                os.environ[CUDA_VISIBLE_DEVICES] = ",".join(gpu_addresses)
+
+            context = BarrierTaskContext.get()
+
+            if use_gpu:
+                set_gpus(context)
+            else:
+                os.environ[CUDA_VISIBLE_DEVICES] = ""
+            set_torch_config(context)
+
+            output = framework_wrapper_fn(input_params, train_fn, *args)
+
+            if context.partitionId() == 0:
+                return [output]
+            return [None]

Review Comment:
   We usually write 
   ```
   if context.partitionId() == 0:
     yield output
   ```
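
To illustrate the difference, here is a small sketch that runs without Spark. The `partition_id` argument is a hypothetical stand-in for `context.partitionId()`. With `yield`, tasks other than partition 0 contribute nothing to the collected result, instead of a `None` placeholder.

```python
from typing import Any, Iterator


def wrapped_train_fn(partition_id: int, output: Any) -> Iterator[Any]:
    """Generator-style task body: only partition 0 emits the training output."""
    if partition_id == 0:
        yield output


# Simulate collecting the results of three tasks.
collected = [item for pid in range(3) for item in wrapped_train_fn(pid, "model")]
```

The caller can then take the single collected element directly rather than filtering out `None` entries.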





[GitHub] [spark] WeichenXu123 commented on a diff in pull request #39267: [WIP][SPARK-41592][PYTHON][ML] Pytorch file Distributed Training

Posted by GitBox <gi...@apache.org>.
WeichenXu123 commented on code in PR #39267:
URL: https://github.com/apache/spark/pull/39267#discussion_r1072300627


##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -325,8 +329,15 @@ def _create_torchrun_command(
             torchrun_args = ["--standalone", "--nnodes=1"]
             processes_per_node = num_processes
         else:
-            pass
-            # TODO(SPARK-41592): Handle distributed training
+            master_addr, master_port = os.environ["MASTER_ADDR"], os.environ["MASTER_PORT"]
+            node_rank = os.environ["RANK"]
+            torchrun_args = [
+                f"--nnodes={num_processes}",
+                f"--node_rank={node_rank}",
+                f"--rdzv_endpoint={master_addr}:{master_port}",
+                "--rdzv_id=0",
+            ]  # TODO: setup random ID that is gleaned from env variables
+            processes_per_node = 1

Review Comment:
   We don't need to set `preexec_fn=sigterm_on_parent_death` when executing the `torch_run_process_wrapper` subprocess.





[GitHub] [spark] HyukjinKwon closed pull request #39267: [SPARK-41592][PYTHON][ML] Pytorch file Distributed Training

Posted by GitBox <gi...@apache.org>.
HyukjinKwon closed pull request #39267: [SPARK-41592][PYTHON][ML] Pytorch file Distributed Training
URL: https://github.com/apache/spark/pull/39267




[GitHub] [spark] rithwik-db commented on a diff in pull request #39267: [WIP][SPARK-41592][PYTHON][ML] Pytorch file Distributed Training

Posted by GitBox <gi...@apache.org>.
rithwik-db commented on code in PR #39267:
URL: https://github.com/apache/spark/pull/39267#discussion_r1069068934


##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -428,6 +432,84 @@ def _run_local_training(
 
         return output
 
+    def _get_spark_task_program(
+        self, framework_wrapper_fn: Optional[Callable], train_fn: Union[Callable, str], *args: Any
+    ) -> Callable:
+        num_processes = self.num_processes
+        num_tasks = self.num_tasks
+        use_gpu = self.use_gpu
+        input_params = self.input_params
+
+        # Spark task program
+        def wrapped_train_fn(_):  # type: ignore[no-untyped-def]
+            import os
+            from pyspark import BarrierTaskContext
+
+            CUDA_VISIBLE_DEVICES = "CUDA_VISIBLE_DEVICES"
+
+            # The idea of setting the random port to 0 doesn't seem to work?

Review Comment:
   I believe it will raise a `RuntimeError: Address already in use`.
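
The underlying collision can be reproduced at the socket level. The sketch below is not PyTorch code and only shows the OS-level error that such a `RuntimeError` would presumably wrap; `second_bind_errno` is a hypothetical helper name.

```python
import errno
import socket
from typing import Optional


def second_bind_errno(host: str = "127.0.0.1") -> Optional[int]:
    """Bind two sockets to the same port and return the errno of the second bind."""
    first = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    second = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        first.bind((host, 0))  # let the OS choose a free port
        first.listen(1)
        port = first.getsockname()[1]
        try:
            second.bind((host, port))  # same address and port: expected to fail
        except OSError as exc:
            return exc.errno
        return None
    finally:
        second.close()
        first.close()


code = second_bind_errno()
```

On a typical Linux host the returned value is `errno.EADDRINUSE`.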





[GitHub] [spark] rithwik-db commented on a diff in pull request #39267: [WIP][SPARK-41592][PYTHON][ML] Pytorch file Distributed Training

Posted by GitBox <gi...@apache.org>.
rithwik-db commented on code in PR #39267:
URL: https://github.com/apache/spark/pull/39267#discussion_r1068755416


##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -428,6 +432,84 @@ def _run_local_training(
 
         return output
 
+    def _get_spark_task_program(
+        self, framework_wrapper_fn: Optional[Callable], train_fn: Union[Callable, str], *args: Any
+    ) -> Callable:
+        num_processes = self.num_processes
+        num_tasks = self.num_tasks
+        use_gpu = self.use_gpu
+        input_params = self.input_params
+
+        # Spark task program
+        def wrapped_train_fn(_):  # type: ignore[no-untyped-def]
+            import os
+            from pyspark import BarrierTaskContext
+
+            CUDA_VISIBLE_DEVICES = "CUDA_VISIBLE_DEVICES"
+
+            # The idea of setting the random port to 0 doesn't seem to work?
+            def get_free_port(address: str) -> int:
+                import socket
+                import random
+
+                while True:
+                    port = random.randint(32768, 61000)
+                    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+                    if not (sock.connect_ex((address, port)) == 0):
+                        return port
+
+            def set_torch_config(context: "BarrierTaskContext") -> None:
+                addrs = [e.address.split(":")[0] for e in context.getTaskInfos()]
+
+                os.environ["MASTER_ADDR"] = str(addrs[0])
+                os.environ["MASTER_PORT"] = str(get_free_port(addrs[0]))
+                os.environ["WORLD_SIZE"] = str(num_processes)
+                os.environ["NODE_RANK"] = str(context.partitionId())
+                os.environ["RANK"] = str(context.partitionId())
+
+            def set_gpus(context: "BarrierTaskContext") -> None:
+                gpus_owned = get_gpus_owned(context)
+
+                my_num_gpus = (num_processes // num_tasks) + (
+                    context.partitionId() < (num_processes % num_tasks)
+                )
+                gpu_addresses = [str(e) for e in random.sample(gpus_owned, my_num_gpus)]
+                os.environ[CUDA_VISIBLE_DEVICES] = ",".join(gpu_addresses)
+
+            context = BarrierTaskContext.get()
+
+            if use_gpu:
+                set_gpus(context)
+            else:
+                os.environ[CUDA_VISIBLE_DEVICES] = ""

Review Comment:
   I think it should be added because if the user runs training with `TorchDistributor(use_gpu=False, **kwargs).run(train_fn)` but accidentally has some PyTorch Lightning code like `pl.Trainer(accelerator="gpu")` in their `train_fn`, an error should be raised saying that no CUDA devices are available even though a GPU accelerator was specified.
   
   We already have a check in `get_num_tasks` for the case where `use_gpu=True` but no GPUs are available; I think this code addresses the case where `use_gpu=False` but the user's code uses GPUs.
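
A minimal sketch of the two branches, assuming the variable is set before any CUDA library initializes. `configure_visible_gpus` is a hypothetical helper, and the GPU addresses are made-up example values. An empty `CUDA_VISIBLE_DEVICES` hides all GPUs from CUDA-based frameworks in the process.

```python
import os
from typing import List

CUDA_VISIBLE_DEVICES = "CUDA_VISIBLE_DEVICES"


def configure_visible_gpus(use_gpu: bool, gpu_addresses: List[str]) -> str:
    """Expose only the assigned GPUs, or none at all when use_gpu is False."""
    if use_gpu:
        os.environ[CUDA_VISIBLE_DEVICES] = ",".join(gpu_addresses)
    else:
        # An empty value makes every GPU invisible to CUDA in this process.
        os.environ[CUDA_VISIBLE_DEVICES] = ""
    return os.environ[CUDA_VISIBLE_DEVICES]


cpu_only = configure_visible_gpus(False, ["0", "1"])
with_gpus = configure_visible_gpus(True, ["0", "1"])
```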





[GitHub] [spark] AmplabJenkins commented on pull request #39267: [WIP][SPARK-41592][PYTHON][ML] Pytorch file Distributed Training

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on PR #39267:
URL: https://github.com/apache/spark/pull/39267#issuecomment-1368277978

   Can one of the admins verify this patch?




[GitHub] [spark] WeichenXu123 commented on a diff in pull request #39267: [WIP][SPARK-41592][PYTHON][ML] Pytorch file Distributed Training

Posted by GitBox <gi...@apache.org>.
WeichenXu123 commented on code in PR #39267:
URL: https://github.com/apache/spark/pull/39267#discussion_r1072294810


##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -428,6 +432,84 @@ def _run_local_training(
 
         return output
 
+    def _get_spark_task_program(
+        self, framework_wrapper_fn: Optional[Callable], train_fn: Union[Callable, str], *args: Any

Review Comment:
   `train_fn` --> `train_object`?





[GitHub] [spark] mattoh91 commented on pull request #39267: [SPARK-41592][PYTHON][ML] Pytorch file Distributed Training

Posted by "mattoh91 (via GitHub)" <gi...@apache.org>.
mattoh91 commented on PR #39267:
URL: https://github.com/apache/spark/pull/39267#issuecomment-1503329907

   @rithwik-db Can I clarify that the `num_processes` attribute of the TorchDistributor class refers to the number of `spark.executor.cores` used, and not the number of `spark.executor.instances`?
   Setting `num_processes` > 1 seems to take up more cores / slots on a single executor during training (using the Spark operator on k8s).




[GitHub] [spark] lu-wang-dl commented on a diff in pull request #39267: [WIP][SPARK-41592][PYTHON][ML] Pytorch file Distributed Training

Posted by GitBox <gi...@apache.org>.
lu-wang-dl commented on code in PR #39267:
URL: https://github.com/apache/spark/pull/39267#discussion_r1069049507


##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -428,6 +432,84 @@ def _run_local_training(
 
         return output
 
+    def _get_spark_task_program(
+        self, framework_wrapper_fn: Optional[Callable], train_fn: Union[Callable, str], *args: Any
+    ) -> Callable:
+        num_processes = self.num_processes
+        num_tasks = self.num_tasks
+        use_gpu = self.use_gpu
+        input_params = self.input_params
+
+        # Spark task program
+        def wrapped_train_fn(_):  # type: ignore[no-untyped-def]
+            import os
+            from pyspark import BarrierTaskContext
+
+            CUDA_VISIBLE_DEVICES = "CUDA_VISIBLE_DEVICES"
+
+            # The idea of setting the random port to 0 doesn't seem to work?

Review Comment:
   What happens if two processes choose the same port?






[GitHub] [spark] rithwik-db commented on a diff in pull request #39267: [WIP][SPARK-41592][PYTHON][ML] Pytorch file Distributed Training

Posted by GitBox <gi...@apache.org>.
rithwik-db commented on code in PR #39267:
URL: https://github.com/apache/spark/pull/39267#discussion_r1068751565


##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -428,6 +432,84 @@ def _run_local_training(
 
         return output
 
+    def _get_spark_task_program(
+        self, framework_wrapper_fn: Optional[Callable], train_fn: Union[Callable, str], *args: Any
+    ) -> Callable:
+        num_processes = self.num_processes
+        num_tasks = self.num_tasks
+        use_gpu = self.use_gpu
+        input_params = self.input_params
+
+        # Spark task program
+        def wrapped_train_fn(_):  # type: ignore[no-untyped-def]
+            import os
+            from pyspark import BarrierTaskContext
+
+            CUDA_VISIBLE_DEVICES = "CUDA_VISIBLE_DEVICES"
+
+            # The idea of setting the random port to 0 doesn't seem to work?
+            def get_free_port(address: str) -> int:
+                import socket
+                import random
+
+                while True:
+                    port = random.randint(32768, 61000)
+                    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+                    if not (sock.connect_ex((address, port)) == 0):
+                        return port
+
+            def set_torch_config(context: "BarrierTaskContext") -> None:
+                addrs = [e.address.split(":")[0] for e in context.getTaskInfos()]
+
+                os.environ["MASTER_ADDR"] = str(addrs[0])
+                os.environ["MASTER_PORT"] = str(get_free_port(addrs[0]))
+                os.environ["WORLD_SIZE"] = str(num_processes)
+                os.environ["NODE_RANK"] = str(context.partitionId())
+                os.environ["RANK"] = str(context.partitionId())
+
+            def set_gpus(context: "BarrierTaskContext") -> None:
+                gpus_owned = get_gpus_owned(context)
+
+                my_num_gpus = (num_processes // num_tasks) + (
+                    context.partitionId() < (num_processes % num_tasks)
+                )
+                gpu_addresses = [str(e) for e in random.sample(gpus_owned, my_num_gpus)]
+                os.environ[CUDA_VISIBLE_DEVICES] = ",".join(gpu_addresses)
+
+            context = BarrierTaskContext.get()
+
+            if use_gpu:
+                set_gpus(context)
+            else:
+                os.environ[CUDA_VISIBLE_DEVICES] = ""
+            set_torch_config(context)
+
+            output = framework_wrapper_fn(input_params, train_fn, *args)
+
+            if context.partitionId() == 0:
+                return [output]
+            return [None]
+
+        return wrapped_train_fn
+
+    def _run_distributed_training(
+        self,
+        framework_wrapper_fn: Optional[Callable],
+        train_fn: Union[Callable, str],
+        *args: Any,
+    ) -> Optional[Any]:
+        if not framework_wrapper_fn:
+            raise RuntimeError("Unknown combination of parameters")
+        spark_task_program = self._get_spark_task_program(framework_wrapper_fn, train_fn, *args)

Review Comment:
   I guess just for the sake of modularity. We could just define the function here.





[GitHub] [spark] HyukjinKwon commented on pull request #39267: [WIP][SPARK-41592][PYTHON][ML] Pytorch file Distributed Training

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on PR #39267:
URL: https://github.com/apache/spark/pull/39267#issuecomment-1396277228

   Merged to master.




[GitHub] [spark] WeichenXu123 commented on a diff in pull request #39267: [WIP][SPARK-41592][PYTHON][ML] Pytorch file Distributed Training

Posted by GitBox <gi...@apache.org>.
WeichenXu123 commented on code in PR #39267:
URL: https://github.com/apache/spark/pull/39267#discussion_r1072309036


##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -325,8 +329,15 @@ def _create_torchrun_command(
             torchrun_args = ["--standalone", "--nnodes=1"]
             processes_per_node = num_processes
         else:
-            pass
-            # TODO(SPARK-41592): Handle distributed training
+            master_addr, master_port = os.environ["MASTER_ADDR"], os.environ["MASTER_PORT"]
+            node_rank = os.environ["RANK"]
+            torchrun_args = [
+                f"--nnodes={num_processes}",
+                f"--node_rank={node_rank}",
+                f"--rdzv_endpoint={master_addr}:{master_port}",
+                "--rdzv_id=0",
+            ]  # TODO: setup random ID that is gleaned from env variables
+            processes_per_node = 1

Review Comment:
   In the `torch_run_process_wrapper` code, we don't need to set the subprocess `stdout`/`stderr` arguments; by default, the subprocess inherits the parent process's stdout/stderr.
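
A small, self-contained way to check this default with Python's standard `subprocess` module is sketched below. The names `GRANDCHILD`, `CHILD`, and `run_child_and_capture` are made up for this illustration and are unrelated to `torch_run_process_wrapper`.

```python
import subprocess
import sys

# Code run by the innermost process.
GRANDCHILD = "print('hello from grandchild')"

# The child launches the grandchild WITHOUT stdout/stderr arguments,
# so the grandchild inherits whatever stdout the child itself has.
CHILD = (
    "import subprocess, sys\n"
    f"subprocess.run([sys.executable, '-c', {GRANDCHILD!r}], check=True)\n"
)


def run_child_and_capture() -> str:
    """Run the child with its stdout captured and return what was written."""
    result = subprocess.run(
        [sys.executable, "-c", CHILD],
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout
```

Because the grandchild inherits the child's stdout, its output ends up in the pipe that the outermost process captured, even though the child never passed `stdout=` explicitly.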





[GitHub] [spark] rithwik-db commented on a diff in pull request #39267: [WIP][SPARK-41592][PYTHON][ML] Pytorch file Distributed Training

Posted by GitBox <gi...@apache.org>.
rithwik-db commented on code in PR #39267:
URL: https://github.com/apache/spark/pull/39267#discussion_r1068757070


##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -428,6 +432,84 @@ def _run_local_training(
 
         return output
 
+    def _get_spark_task_program(
+        self, framework_wrapper_fn: Optional[Callable], train_fn: Union[Callable, str], *args: Any
+    ) -> Callable:
+        num_processes = self.num_processes
+        num_tasks = self.num_tasks
+        use_gpu = self.use_gpu
+        input_params = self.input_params
+
+        # Spark task program
+        def wrapped_train_fn(_):  # type: ignore[no-untyped-def]
+            import os
+            from pyspark import BarrierTaskContext
+
+            CUDA_VISIBLE_DEVICES = "CUDA_VISIBLE_DEVICES"
+
+            # The idea of setting the random port to 0 doesn't seem to work?
+            def get_free_port(address: str) -> int:
+                import socket
+                import random
+
+                while True:
+                    port = random.randint(32768, 61000)
+                    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+                    if not (sock.connect_ex((address, port)) == 0):
+                        return port
+
+            def set_torch_config(context: "BarrierTaskContext") -> None:
+                addrs = [e.address.split(":")[0] for e in context.getTaskInfos()]
+
+                os.environ["MASTER_ADDR"] = str(addrs[0])
+                os.environ["MASTER_PORT"] = str(get_free_port(addrs[0]))
+                os.environ["WORLD_SIZE"] = str(num_processes)
+                os.environ["NODE_RANK"] = str(context.partitionId())
+                os.environ["RANK"] = str(context.partitionId())
+
+            def set_gpus(context: "BarrierTaskContext") -> None:
+                gpus_owned = get_gpus_owned(context)
+
+                my_num_gpus = (num_processes // num_tasks) + (
+                    context.partitionId() < (num_processes % num_tasks)
+                )
+                gpu_addresses = [str(e) for e in random.sample(gpus_owned, my_num_gpus)]
+                os.environ[CUDA_VISIBLE_DEVICES] = ",".join(gpu_addresses)
+
+            context = BarrierTaskContext.get()
+
+            if use_gpu:
+                set_gpus(context)
+            else:
+                os.environ[CUDA_VISIBLE_DEVICES] = ""

Review Comment:
   @WeichenXu123 @lu-wang-dl is my logic reasonable here or did I misunderstand anything?





[GitHub] [spark] rithwik-db commented on a diff in pull request #39267: [WIP][SPARK-41592][PYTHON][ML] Pytorch file Distributed Training

Posted by GitBox <gi...@apache.org>.
rithwik-db commented on code in PR #39267:
URL: https://github.com/apache/spark/pull/39267#discussion_r1069064636


##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -428,6 +432,84 @@ def _run_local_training(
 
         return output
 
+    def _get_spark_task_program(
+        self, framework_wrapper_fn: Optional[Callable], train_fn: Union[Callable, str], *args: Any
+    ) -> Callable:
+        num_processes = self.num_processes
+        num_tasks = self.num_tasks
+        use_gpu = self.use_gpu
+        input_params = self.input_params
+
+        # Spark task program
+        def wrapped_train_fn(_):  # type: ignore[no-untyped-def]
+            import os
+            from pyspark import BarrierTaskContext
+
+            CUDA_VISIBLE_DEVICES = "CUDA_VISIBLE_DEVICES"
+
+            # The idea of setting the random port to 0 doesn't seem to work?
+            def get_free_port(address: str) -> int:
+                import socket
+                import random
+
+                while True:
+                    port = random.randint(32768, 61000)
+                    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+                    if not (sock.connect_ex((address, port)) == 0):
+                        return port
+
+            def set_torch_config(context: "BarrierTaskContext") -> None:
+                addrs = [e.address.split(":")[0] for e in context.getTaskInfos()]
+
+                os.environ["MASTER_ADDR"] = str(addrs[0])
+                os.environ["MASTER_PORT"] = str(get_free_port(addrs[0]))
+                os.environ["WORLD_SIZE"] = str(num_processes)
+                os.environ["NODE_RANK"] = str(context.partitionId())
+                os.environ["RANK"] = str(context.partitionId())
+
+            def set_gpus(context: "BarrierTaskContext") -> None:
+                gpus_owned = get_gpus_owned(context)
+
+                my_num_gpus = (num_processes // num_tasks) + (
+                    context.partitionId() < (num_processes % num_tasks)
+                )
+                gpu_addresses = [str(e) for e in random.sample(gpus_owned, my_num_gpus)]
+                os.environ[CUDA_VISIBLE_DEVICES] = ",".join(gpu_addresses)
+
+            context = BarrierTaskContext.get()
+
+            if use_gpu:
+                set_gpus(context)
+            else:
+                os.environ[CUDA_VISIBLE_DEVICES] = ""

Review Comment:
   PyTorch Lightning will raise a `MisconfigurationException: No supported gpu backend found!`. This is what we expect to see if the user sets `use_gpu=False` and calls `pl.Trainer(accelerator="gpu")`. My understanding is that if a user runs this code on a local cluster with GPUs on each node without `os.environ[CUDA_VISIBLE_DEVICES] = ""`, then the task will be assigned a GPU even when `use_gpu=False`.





[GitHub] [spark] lu-wang-dl commented on a diff in pull request #39267: [WIP][SPARK-41592][PYTHON][ML] Pytorch file Distributed Training

Posted by GitBox <gi...@apache.org>.
lu-wang-dl commented on code in PR #39267:
URL: https://github.com/apache/spark/pull/39267#discussion_r1069056451


##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -428,6 +432,84 @@ def _run_local_training(
 
         return output
 
+    def _get_spark_task_program(
+        self, framework_wrapper_fn: Optional[Callable], train_fn: Union[Callable, str], *args: Any
+    ) -> Callable:
+        num_processes = self.num_processes
+        num_tasks = self.num_tasks
+        use_gpu = self.use_gpu
+        input_params = self.input_params
+
+        # Spark task program
+        def wrapped_train_fn(_):  # type: ignore[no-untyped-def]
+            import os
+            from pyspark import BarrierTaskContext
+
+            CUDA_VISIBLE_DEVICES = "CUDA_VISIBLE_DEVICES"
+
+            # The idea of setting the random port to 0 doesn't seem to work?
+            def get_free_port(address: str) -> int:
+                import socket
+                import random
+
+                while True:
+                    port = random.randint(32768, 61000)
+                    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+                    if not (sock.connect_ex((address, port)) == 0):
+                        return port
+
+            def set_torch_config(context: "BarrierTaskContext") -> None:
+                addrs = [e.address.split(":")[0] for e in context.getTaskInfos()]
+
+                os.environ["MASTER_ADDR"] = str(addrs[0])
+                os.environ["MASTER_PORT"] = str(get_free_port(addrs[0]))
+                os.environ["WORLD_SIZE"] = str(num_processes)
+                os.environ["NODE_RANK"] = str(context.partitionId())
+                os.environ["RANK"] = str(context.partitionId())
+
+            def set_gpus(context: "BarrierTaskContext") -> None:
+                gpus_owned = get_gpus_owned(context)
+
+                my_num_gpus = (num_processes // num_tasks) + (
+                    context.partitionId() < (num_processes % num_tasks)
+                )
+                gpu_addresses = [str(e) for e in random.sample(gpus_owned, my_num_gpus)]
+                os.environ[CUDA_VISIBLE_DEVICES] = ",".join(gpu_addresses)
+
+            context = BarrierTaskContext.get()
+
+            if use_gpu:
+                set_gpus(context)
+            else:
+                os.environ[CUDA_VISIBLE_DEVICES] = ""

Review Comment:
   I still don't understand. If the user runs something like `pl.Trainer(accelerator="gpu")` on a CPU cluster, what is the behavior of PyTorch Lightning?





[GitHub] [spark] WeichenXu123 commented on a diff in pull request #39267: [WIP][SPARK-41592][PYTHON][ML] Pytorch file Distributed Training

Posted by GitBox <gi...@apache.org>.
WeichenXu123 commented on code in PR #39267:
URL: https://github.com/apache/spark/pull/39267#discussion_r1072285042


##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -428,6 +432,84 @@ def _run_local_training(
 
         return output
 
+    def _get_spark_task_program(
+        self, framework_wrapper_fn: Optional[Callable], train_fn: Union[Callable, str], *args: Any
+    ) -> Callable:
+        num_processes = self.num_processes
+        num_tasks = self.num_tasks
+        use_gpu = self.use_gpu
+        input_params = self.input_params
+
+        # Spark task program
+        def wrapped_train_fn(_):  # type: ignore[no-untyped-def]
+            import os
+            from pyspark import BarrierTaskContext
+
+            CUDA_VISIBLE_DEVICES = "CUDA_VISIBLE_DEVICES"
+
+            # The idea of setting the random port to 0 doesn't seem to work?
+            def get_free_port(address: str) -> int:
+                import socket
+                import random
+
+                while True:
+                    port = random.randint(32768, 61000)
+                    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+                    if not (sock.connect_ex((address, port)) == 0):
+                        return port
+
+            def set_torch_config(context: "BarrierTaskContext") -> None:
+                addrs = [e.address.split(":")[0] for e in context.getTaskInfos()]
+
+                os.environ["MASTER_ADDR"] = str(addrs[0])
+                os.environ["MASTER_PORT"] = str(get_free_port(addrs[0]))
+                os.environ["WORLD_SIZE"] = str(num_processes)
+                os.environ["NODE_RANK"] = str(context.partitionId())
+                os.environ["RANK"] = str(context.partitionId())
+
+            def set_gpus(context: "BarrierTaskContext") -> None:
+                gpus_owned = get_gpus_owned(context)
+
+                my_num_gpus = (num_processes // num_tasks) + (
+                    context.partitionId() < (num_processes % num_tasks)
+                )
+                gpu_addresses = [str(e) for e in random.sample(gpus_owned, my_num_gpus)]
+                os.environ[CUDA_VISIBLE_DEVICES] = ",".join(gpu_addresses)
+
+            context = BarrierTaskContext.get()
+
+            if use_gpu:
+                set_gpus(context)

Review Comment:
   We can simplify the `set_gpus` function:
   
   if the `CUDA_VISIBLE_DEVICES` env var exists, do nothing (Spark has already set `CUDA_VISIBLE_DEVICES` properly);
   otherwise, generate `CUDA_VISIBLE_DEVICES` from `taskcontext.resources["gpu"].addresses`.
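
The simplification suggested above might look roughly like this. It is a sketch under assumptions, not the code that was merged: to stay runnable without a Spark cluster, the function takes the environment mapping and the GPU address list as parameters, whereas in a real barrier task the addresses would presumably come from the task context's `gpu` resource.

```python
from typing import List, MutableMapping

CUDA_VISIBLE_DEVICES = "CUDA_VISIBLE_DEVICES"


def set_gpus(environ: MutableMapping[str, str], gpu_addresses: List[str]) -> None:
    """Set CUDA_VISIBLE_DEVICES only if it has not been set already.

    `environ` and `gpu_addresses` are parameters chosen for this sketch;
    the reviewed code reads os.environ and the task context directly.
    """
    if CUDA_VISIBLE_DEVICES in environ:
        # Already configured (e.g. by Spark), so leave it untouched.
        return
    # Otherwise expose exactly the GPUs assigned to this task.
    environ[CUDA_VISIBLE_DEVICES] = ",".join(str(a) for a in gpu_addresses)
```

Called as `set_gpus(os.environ, addresses)`, this leaves an existing value alone and otherwise writes a comma-separated device list.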


