Posted to reviews@spark.apache.org by "es94129 (via GitHub)" <gi...@apache.org> on 2023/07/06 03:18:11 UTC

[GitHub] [spark] es94129 commented on a diff in pull request #41778: [WIP] DeepspeedDistributor Class That Will Utilize the Deepspeed Launcher Boilerplate

es94129 commented on code in PR #41778:
URL: https://github.com/apache/spark/pull/41778#discussion_r1253876215


##########
python/pyspark/ml/torch/deepspeed/deepspeed_distributer.py:
##########
@@ -0,0 +1,167 @@
+import json
+import os
+import subprocess
+import tempfile
+from typing import (
+    Union,
+    Callable,
+    List,
+    Dict,
+    Optional,
+    Any,
+)
+from pyspark.ml.torch.distributor import Distributor, TorchDistributor
+
+
+def write_to_location(location: str, content: str) -> None:
+    os.makedirs(os.path.dirname(location), exist_ok=True)
+    with open(location, "a") as f:
+        f.write(content)
+
+
+class DeepspeedDistributor(Distributor):
+    """The user must ensure that their cluster is ssh-keychained and that deepspeed is able to use ssh to coordinate among the nodes for the distributed training"""
+
+    HOME = os.path.expanduser("~")
+    HOSTFILE = f"/{HOME}/hostfile"
+
+    def __init__(
+        self,
+        num_processes: int = 1,
+        local_mode: bool = True,
+        use_gpu: bool = True,
+        deepspeed_config=None,
+    ):
+        super().__init__(num_processes, local_mode, use_gpu)
+        self.deepspeed_config = deepspeed_config
+        self.temp_deepspeed_fname = None
+        self.input_params = self._create_input_params()
+        self.worker_hosts = self._setup_hostfile_info()
+        self.setup_env()
+
+    def _get_gpus_on_node(self, executor_ip: str):
+        # TODO: ask Ricky, Lu, or Maddie if this is the best way to get the GPU information of a particular worker node
+        command = f"ssh {executor_ip} nvidia-smi -L | grep GPU | wc -l"  # pyspark doesn't support this out of the box for some reason, so sadge
+        proc_res = subprocess.run(command, capture_output=True, text=True, shell=True)
+        if proc_res.returncode:
+            raise RuntimeError(
+                f"something went wrong when running the command {command}. Is nvidia-smi installed?"
+            )
+        num_gpus_on_worker = proc_res.stdout
+        return int(num_gpus_on_worker)
+
+    def _assign_procs_to_worker(self, gpu_node_map: Dict[str, int]) -> Dict[str, int]:
+        procs_left = self.num_processes
+        workers_left_to_serve = len(gpu_node_map)
+        average_procs_per_node = procs_left // workers_left_to_serve
+        gpu_mapped_to_node = {}
+        # sorting allows us to just do a single pass, as filling the smallest capacity nodes first will allow for a single pass
+        sorted_buckets = sorted(gpu_node_map.items(), key=lambda x: x[1])
+        for worker, capacity in sorted_buckets:
+            average_procs_per_node = procs_left // workers_left_to_serve
+            gpu_mapped_to_node[worker] = min(average_procs_per_node, capacity)
+            procs_left -= gpu_mapped_to_node[worker]
+            workers_left_to_serve -= 1
+        if procs_left != 0:
+            self.logger.warning(
+                msg=f"There are not enough GPUS to fully assign processes to nodes; there are {procs_left} processes left over"
+            )
+        return gpu_mapped_to_node
+
+    def _setup_hostfile_info(self):
+        worker_hosts = [
+            executor.host()
+            for executor in self.spark.sparkContext._jsc.sc().statusTracker().getExecutorInfos()
+        ]  # we should check if this returns the driver or not
+        worker_count = len(worker_hosts)  # find out if this number includes the driver or not
+        rdd = spark.sparkContext.parallelize(range(worker_count), numSlices=worker_count)
+
+        # what do I do if the use_gpu flag is false?
+        slots_on_workers = {}
+        if self.use_gpu:
+            for worker_host in worker_hosts:
+                slots_on_workers[worker_host] = self._get_gpus_on_node(worker_host)
+        else:
+            raise RuntimeError("Deepspeed doesn't work with non-GPU clusters at this time")
+
+        assigned_slots = self._assign_procs_to_worker(slots_on_workers)

Review Comment:
   It doesn't seem intuitive for users to pass in a total `num_processes` and have `DeepspeedDistributor` assign those processes across the GPUs of each node. Can the hostfile line just be `f"{worker_host} slots={self._get_gpus_on_node(worker_host)}"`?
   https://www.deepspeed.ai/getting-started/#resource-configuration-multi-node
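   For context, a minimal sketch of what that would produce in `_setup_hostfile_info` (host names and GPU counts in the comment are only illustrative; it assumes the `worker_hosts` list and `_get_gpus_on_node` helper from this PR):
   ```python
   # Sketch: one hostfile line per worker, slots taken directly from the probed GPU count.
   with open(DeepspeedDistributor.HOSTFILE, "w") as f:
       for worker_host in worker_hosts:
           f.write(f"{worker_host} slots={self._get_gpus_on_node(worker_host)}\n")
   # Resulting hostfile, e.g.:
   #   worker-1 slots=8
   #   worker-2 slots=8
   ```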



##########
python/pyspark/ml/torch/deepspeed/deepspeed_distributer.py:
##########
@@ -0,0 +1,167 @@
+import json
+import os
+import subprocess
+import tempfile
+from typing import (
+    Union,
+    Callable,
+    List,
+    Dict,
+    Optional,
+    Any,
+)
+from pyspark.ml.torch.distributor import Distributor, TorchDistributor
+
+
+def write_to_location(location: str, content: str) -> None:
+    os.makedirs(os.path.dirname(location), exist_ok=True)
+    with open(location, "a") as f:
+        f.write(content)
+
+
+class DeepspeedDistributor(Distributor):
+    """The user must ensure that their cluster is ssh-keychained and that deepspeed is able to use ssh to coordinate among the nodes for the distributed training"""
+
+    HOME = os.path.expanduser("~")
+    HOSTFILE = f"/{HOME}/hostfile"
+
+    def __init__(
+        self,
+        num_processes: int = 1,
+        local_mode: bool = True,
+        use_gpu: bool = True,
+        deepspeed_config=None,
+    ):
+        super().__init__(num_processes, local_mode, use_gpu)
+        self.deepspeed_config = deepspeed_config
+        self.temp_deepspeed_fname = None
+        self.input_params = self._create_input_params()
+        self.worker_hosts = self._setup_hostfile_info()
+        self.setup_env()
+
+    def _get_gpus_on_node(self, executor_ip: str):
+        # TODO: ask Ricky, Lu, or Maddie if this is the best way to get the GPU information of a particular worker node
+        command = f"ssh {executor_ip} nvidia-smi -L | grep GPU | wc -l"  # pyspark doesn't support this out of the box for some reason, so sadge
+        proc_res = subprocess.run(command, capture_output=True, text=True, shell=True)
+        if proc_res.returncode:
+            raise RuntimeError(
+                f"something went wrong when running the command {command}. Is nvidia-smi installed?"
+            )
+        num_gpus_on_worker = proc_res.stdout
+        return int(num_gpus_on_worker)
+
+    def _assign_procs_to_worker(self, gpu_node_map: Dict[str, int]) -> Dict[str, int]:
+        procs_left = self.num_processes
+        workers_left_to_serve = len(gpu_node_map)
+        average_procs_per_node = procs_left // workers_left_to_serve
+        gpu_mapped_to_node = {}
+        # sorting allows us to just do a single pass, as filling the smallest capacity nodes first will allow for a single pass
+        sorted_buckets = sorted(gpu_node_map.items(), key=lambda x: x[1])
+        for worker, capacity in sorted_buckets:
+            average_procs_per_node = procs_left // workers_left_to_serve
+            gpu_mapped_to_node[worker] = min(average_procs_per_node, capacity)
+            procs_left -= gpu_mapped_to_node[worker]
+            workers_left_to_serve -= 1
+        if procs_left != 0:
+            self.logger.warning(
+                msg=f"There are not enough GPUS to fully assign processes to nodes; there are {procs_left} processes left over"
+            )
+        return gpu_mapped_to_node
+
+    def _setup_hostfile_info(self):
+        worker_hosts = [
+            executor.host()
+            for executor in self.spark.sparkContext._jsc.sc().statusTracker().getExecutorInfos()
+        ]  # we should check if this returns the driver or not
+        worker_count = len(worker_hosts)  # find out if this number includes the driver or not
+        rdd = spark.sparkContext.parallelize(range(worker_count), numSlices=worker_count)
+
+        # what do I do if the use_gpu flag is false?
+        slots_on_workers = {}
+        if self.use_gpu:
+            for worker_host in worker_hosts:
+                slots_on_workers[worker_host] = self._get_gpus_on_node(worker_host)
+        else:
+            raise RuntimeError("Deepspeed doesn't work with non-GPU clusters at this time")
+
+        assigned_slots = self._assign_procs_to_worker(slots_on_workers)
+        self.logger.info(f"Writing to {DeepspeedDistributor.HOSTFILE}")
+        for worker_host in worker_hosts:
+            line = f"{worker_host} slots={assigned_slots[worker_host]}\n"
+            write_to_location(DeepspeedDistributor.HOSTFILE, line)
+        return worker_hosts
+
+    def setup_env(self):
+        try:
+            subprocess.run("deepspeed --version".split())
+            subprocess.run("ninja --version".split())
+            with open(f"/{DeepspeedDistributor.HOME}/.deepspeed_env", "w") as f:
+                # if this is open; don't add that to path if they're not running on databricks
+                # TODO: figure out what paths to add to this, because if this is OSS we don't want to constantly add a databricks filepath
+                f.write(
+                    "PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
+                )
+        except:
+            raise ImportError("Install deepspeed and ninja on the cluster using PyPi")
+
+    def _create_deepspeed_command(
+        self, input_params: Dict[str, Any], path_to_train_file: str, *args: Any
+    ):
+        local_mode = input_params["local_mode"]
+        num_processes = input_params["num_processes"]
+        deepspeed_config = input_params["deepspeed_config"]
+        if isinstance(deepspeed_config, dict):
+            with tempfile.NamedTemporaryFile(mode="w+", delete=False, suffix=".json") as f:
+                json.dump(deepspeed_config, f)
+                deepspeed_config_path = f.name
+                self.temp_deepspeed_fname = f.name
+        else:
+            deepspeed_config_path = deepspeed_config
+        if local_mode:
+            deepspeed_args = [
+                "--num_gpus",
+                str(num_processes),
+            ]  # no need for num nodes, the host file, or any port stuff (no communiation)
+        else:
+            deepspeed_args = [
+                "--num_gpus",
+                str(input_params["num_processes"]),

Review Comment:
   Does the user pass `num_processes` as (a) the total number of GPUs across the worker nodes, or (b) the max number of GPUs to use on each node?
   
   `num_processes` for `TorchDistributor` is (a), while the DeepSpeed launcher takes `num_gpus` as (b): https://github.com/microsoft/DeepSpeed/blob/master/deepspeed/launcher/runner.py#L105
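   To make the distinction concrete, on a hypothetical cluster of 2 worker nodes with 4 GPUs each (the script name and numbers are illustrative):
   ```python
   # (a) TorchDistributor: num_processes is the total process count across the cluster,
   #     so 8 here means 4 processes on each of the 2 nodes.
   TorchDistributor(num_processes=8, local_mode=False, use_gpu=True)
   # (b) DeepSpeed launcher: --num_gpus is the number of GPUs to use on *each* node
   #     listed in the hostfile, e.g.
   #     deepspeed --hostfile=<hostfile> --num_gpus=4 train.py
   ```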



##########
python/pyspark/ml/torch/deepspeed/deepspeed_distributer.py:
##########
@@ -0,0 +1,167 @@
+import json
+import os
+import subprocess
+import tempfile
+from typing import (
+    Union,
+    Callable,
+    List,
+    Dict,
+    Optional,
+    Any,
+)
+from pyspark.ml.torch.distributor import Distributor, TorchDistributor
+
+
+def write_to_location(location: str, content: str) -> None:
+    os.makedirs(os.path.dirname(location), exist_ok=True)
+    with open(location, "a") as f:
+        f.write(content)
+
+
+class DeepspeedDistributor(Distributor):
+    """The user must ensure that their cluster is ssh-keychained and that deepspeed is able to use ssh to coordinate among the nodes for the distributed training"""
+
+    HOME = os.path.expanduser("~")
+    HOSTFILE = f"/{HOME}/hostfile"
+
+    def __init__(
+        self,
+        num_processes: int = 1,
+        local_mode: bool = True,
+        use_gpu: bool = True,
+        deepspeed_config=None,
+    ):
+        super().__init__(num_processes, local_mode, use_gpu)
+        self.deepspeed_config = deepspeed_config
+        self.temp_deepspeed_fname = None
+        self.input_params = self._create_input_params()
+        self.worker_hosts = self._setup_hostfile_info()
+        self.setup_env()
+
+    def _get_gpus_on_node(self, executor_ip: str):
+        # TODO: ask Ricky, Lu, or Maddie if this is the best way to get the GPU information of a particular worker node
+        command = f"ssh {executor_ip} nvidia-smi -L | grep GPU | wc -l"  # pyspark doesn't support this out of the box for some reason, so sadge

Review Comment:
   From the Databricks cluster creation UI, it's only possible to create a cluster with executors of the same instance type.
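   Given that executors are homogeneous, one option is to probe a single worker and reuse the count instead of SSH-ing into every node; a sketch, assuming the `worker_hosts` list and `_get_gpus_on_node` helper from this PR:
   ```python
   # Sketch: on a homogeneous cluster, query one worker and reuse its GPU count for all hosts.
   gpus_per_node = self._get_gpus_on_node(worker_hosts[0])
   slots_on_workers = {host: gpus_per_node for host in worker_hosts}
   ```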



##########
python/pyspark/ml/torch/deepspeed/deepspeed_distributer.py:
##########
@@ -0,0 +1,167 @@
+import json
+import os
+import subprocess
+import tempfile
+from typing import (
+    Union,
+    Callable,
+    List,
+    Dict,
+    Optional,
+    Any,
+)
+from pyspark.ml.torch.distributor import Distributor, TorchDistributor
+
+
+def write_to_location(location: str, content: str) -> None:
+    os.makedirs(os.path.dirname(location), exist_ok=True)
+    with open(location, "a") as f:
+        f.write(content)

Review Comment:
   +1
   It's clearer to put it directly in the implementation in this case.
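   For example, inlined in `_setup_hostfile_info` it could look roughly like this (a sketch; opening the file with `"w"` instead of `"a"` would also avoid appending duplicate lines if the distributor is constructed twice):
   ```python
   # Sketch: write the hostfile directly at the call site instead of via a module-level helper.
   os.makedirs(os.path.dirname(DeepspeedDistributor.HOSTFILE), exist_ok=True)
   with open(DeepspeedDistributor.HOSTFILE, "w") as f:
       for worker_host in worker_hosts:
           f.write(f"{worker_host} slots={assigned_slots[worker_host]}\n")
   ```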



##########
python/pyspark/ml/torch/deepspeed/deepspeed_distributer.py:
##########
@@ -0,0 +1,167 @@
+import json
+import os
+import subprocess
+import tempfile
+from typing import (
+    Union,
+    Callable,
+    List,
+    Dict,
+    Optional,
+    Any,
+)
+from pyspark.ml.torch.distributor import Distributor, TorchDistributor
+
+
+def write_to_location(location: str, content: str) -> None:
+    os.makedirs(os.path.dirname(location), exist_ok=True)
+    with open(location, "a") as f:
+        f.write(content)
+
+
+class DeepspeedDistributor(Distributor):
+    """The user must ensure that their cluster is ssh-keychained and that deepspeed is able to use ssh to coordinate among the nodes for the distributed training"""

Review Comment:
   +1
   Can you also provide explanations of the parameters, and an example snippet showing how to use `DeepspeedDistributor`, in the docstring like https://sourcegraph.com/github.com/apache/spark/-/blob/python/pyspark/ml/torch/distributor.py?L312?
   
   It is not clear to users what `deepspeed_config` is expected to be, or how to set `num_processes`.
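   Something along these lines would help (a rough sketch only; the `train` function, the config values, and the assumption that the public entry point mirrors `TorchDistributor.run` are all illustrative, not part of this PR):
   ```python
   from pyspark.ml.torch.deepspeed.deepspeed_distributer import DeepspeedDistributor

   def train(learning_rate):
       # user-defined training loop that builds the model and calls deepspeed.initialize(...)
       ...

   distributor = DeepspeedDistributor(
       num_processes=4,      # worth documenting: total across the cluster, or per node?
       local_mode=False,
       use_gpu=True,
       deepspeed_config={"train_micro_batch_size_per_gpu": 8},  # dict, or a path to a JSON file
   )
   distributor.run(train, 1e-4)  # assuming a run() entry point like TorchDistributor's
   ```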



##########
python/pyspark/ml/torch/deepspeed/deepspeed_distributer.py:
##########
@@ -0,0 +1,167 @@
+import json
+import os
+import subprocess
+import tempfile
+from typing import (
+    Union,
+    Callable,
+    List,
+    Dict,
+    Optional,
+    Any,
+)
+from pyspark.ml.torch.distributor import Distributor, TorchDistributor
+
+
+def write_to_location(location: str, content: str) -> None:
+    os.makedirs(os.path.dirname(location), exist_ok=True)
+    with open(location, "a") as f:
+        f.write(content)
+
+
+class DeepspeedDistributor(Distributor):
+    """The user must ensure that their cluster is ssh-keychained and that deepspeed is able to use ssh to coordinate among the nodes for the distributed training"""
+
+    HOME = os.path.expanduser("~")
+    HOSTFILE = f"/{HOME}/hostfile"
+
+    def __init__(
+        self,
+        num_processes: int = 1,
+        local_mode: bool = True,
+        use_gpu: bool = True,
+        deepspeed_config=None,
+    ):
+        super().__init__(num_processes, local_mode, use_gpu)
+        self.deepspeed_config = deepspeed_config
+        self.temp_deepspeed_fname = None
+        self.input_params = self._create_input_params()
+        self.worker_hosts = self._setup_hostfile_info()
+        self.setup_env()
+
+    def _get_gpus_on_node(self, executor_ip: str):
+        # TODO: ask Ricky, Lu, or Maddie if this is the best way to get the GPU information of a particular worker node
+        command = f"ssh {executor_ip} nvidia-smi -L | grep GPU | wc -l"  # pyspark doesn't support this out of the box for some reason, so sadge
+        proc_res = subprocess.run(command, capture_output=True, text=True, shell=True)
+        if proc_res.returncode:
+            raise RuntimeError(
+                f"something went wrong when running the command {command}. Is nvidia-smi installed?"
+            )
+        num_gpus_on_worker = proc_res.stdout
+        return int(num_gpus_on_worker)
+
+    def _assign_procs_to_worker(self, gpu_node_map: Dict[str, int]) -> Dict[str, int]:
+        procs_left = self.num_processes
+        workers_left_to_serve = len(gpu_node_map)
+        average_procs_per_node = procs_left // workers_left_to_serve
+        gpu_mapped_to_node = {}
+        # sorting allows us to just do a single pass, as filling the smallest capacity nodes first will allow for a single pass
+        sorted_buckets = sorted(gpu_node_map.items(), key=lambda x: x[1])
+        for worker, capacity in sorted_buckets:
+            average_procs_per_node = procs_left // workers_left_to_serve
+            gpu_mapped_to_node[worker] = min(average_procs_per_node, capacity)
+            procs_left -= gpu_mapped_to_node[worker]
+            workers_left_to_serve -= 1
+        if procs_left != 0:
+            self.logger.warning(
+                msg=f"There are not enough GPUS to fully assign processes to nodes; there are {procs_left} processes left over"
+            )
+        return gpu_mapped_to_node
+
+    def _setup_hostfile_info(self):
+        worker_hosts = [
+            executor.host()
+            for executor in self.spark.sparkContext._jsc.sc().statusTracker().getExecutorInfos()
+        ]  # we should check if this returns the driver or not
+        worker_count = len(worker_hosts)  # find out if this number includes the driver or not
+        rdd = spark.sparkContext.parallelize(range(worker_count), numSlices=worker_count)
+
+        # what do I do if the use_gpu flag is false?

Review Comment:
   It's reasonable not to support `use_gpu=False` at all for `DeepspeedDistributor`.
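   In that case it may be simplest to fail fast in `__init__`, e.g. (a sketch):
   ```python
   # Sketch: reject CPU-only mode up front rather than deep inside hostfile setup.
   if not use_gpu:
       raise ValueError("DeepspeedDistributor currently requires use_gpu=True")
   ```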



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

