Posted to reviews@spark.apache.org by "maddiedawson (via GitHub)" <gi...@apache.org> on 2023/07/05 22:11:39 UTC

[GitHub] [spark] maddiedawson commented on a diff in pull request #41778: [WIP] DeepspeedDistributor Class That Will Utilize the Deepspeed Launcher Boilerplate

maddiedawson commented on code in PR #41778:
URL: https://github.com/apache/spark/pull/41778#discussion_r1253704912


##########
python/pyspark/ml/torch/deepspeed/deepspeed_distributer.py:
##########
@@ -0,0 +1,167 @@
+import json
+import os
+import subprocess
+import tempfile
+from typing import (
+    Union,
+    Callable,
+    List,
+    Dict,
+    Optional,
+    Any,
+)
+from pyspark.ml.torch.distributor import Distributor, TorchDistributor
+
+
+def write_to_location(location: str, content: str) -> None:
+    os.makedirs(os.path.dirname(location), exist_ok=True)
+    with open(location, "a") as f:
+        f.write(content)
+
+
+class DeepspeedDistributor(Distributor):
+    """The user must ensure that their cluster is ssh-keychained and that deepspeed is able to use ssh to coordinate among the nodes for the distributed training"""
+
+    HOME = os.path.expanduser("~")
+    HOSTFILE = f"/{HOME}/hostfile"
+
+    def __init__(
+        self,
+        num_processes: int = 1,
+        local_mode: bool = True,
+        use_gpu: bool = True,
+        deepspeed_config=None,
+    ):
+        super().__init__(num_processes, local_mode, use_gpu)
+        self.deepspeed_config = deepspeed_config
+        self.temp_deepspeed_fname = None
+        self.input_params = self._create_input_params()
+        self.worker_hosts = self._setup_hostfile_info()
+        self.setup_env()
+
+    def _get_gpus_on_node(self, executor_ip: str):
+        # TODO: ask Ricky, Lu, or Maddie if this is the best way to get the GPU information of a particular worker node
+        command = f"ssh {executor_ip} nvidia-smi -L | grep GPU | wc -l"  # pyspark doesn't support this out of the box for some reason, so sadge

Review Comment:
   Let's use pynvml.nvmlDeviceGetCount() instead of shelling out to nvidia-smi over ssh
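   
   For reference, a minimal sketch of what that could look like (assumes the pynvml
   package is installed on every worker; the helper name below is illustrative and
   not part of this PR):
   
       import pynvml
   
       def _get_gpu_count_on_this_node() -> int:
           # Query NVML directly instead of shelling out to `ssh ... nvidia-smi`.
           pynvml.nvmlInit()
           try:
               return pynvml.nvmlDeviceGetCount()
           finally:
               pynvml.nvmlShutdown()
   
   The call would then run inside a Spark task on the target worker rather than over
   ssh from the driver.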



##########
python/pyspark/ml/torch/deepspeed/deepspeed_distributer.py:
##########
@@ -0,0 +1,167 @@
+import json
+import os
+import subprocess
+import tempfile
+from typing import (
+    Union,
+    Callable,
+    List,
+    Dict,
+    Optional,
+    Any,
+)
+from pyspark.ml.torch.distributor import Distributor, TorchDistributor
+
+
+def write_to_location(location: str, content: str) -> None:
+    os.makedirs(os.path.dirname(location), exist_ok=True)
+    with open(location, "a") as f:
+        f.write(content)
+
+
+class DeepspeedDistributor(Distributor):
+    """The user must ensure that their cluster is ssh-keychained and that deepspeed is able to use ssh to coordinate among the nodes for the distributed training"""

Review Comment:
   Let's provide guidance here on how to do this
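   
   For example, the docstring could spell out the usual passwordless-SSH setup
   (exact commands vary by environment; the hostname below is a placeholder):
   
       """
       Before using this class, set up passwordless SSH from the driver to every
       worker node, e.g.:
   
           ssh-keygen -t rsa -N "" -f ~/.ssh/id_rsa          # generate a key pair once
           ssh-copy-id -i ~/.ssh/id_rsa.pub <worker-host>    # repeat for each worker
   
       and confirm that `ssh <worker-host>` succeeds without a password prompt.
       """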



##########
python/pyspark/ml/torch/deepspeed/deepspeed_distributer.py:
##########
@@ -0,0 +1,167 @@
+import json
+import os
+import subprocess
+import tempfile
+from typing import (
+    Union,
+    Callable,
+    List,
+    Dict,
+    Optional,
+    Any,
+)
+from pyspark.ml.torch.distributor import Distributor, TorchDistributor
+
+
+def write_to_location(location: str, content: str) -> None:
+    os.makedirs(os.path.dirname(location), exist_ok=True)
+    with open(location, "a") as f:
+        f.write(content)

Review Comment:
   This does not need to be a util function. Just inline the implementation below
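   
   Inlined into `_setup_hostfile_info`, the hostfile write could look roughly like
   this (a sketch; it also opens the file once instead of once per host):
   
       os.makedirs(os.path.dirname(DeepspeedDistributor.HOSTFILE), exist_ok=True)
       with open(DeepspeedDistributor.HOSTFILE, "a") as f:
           for worker_host in worker_hosts:
               f.write(f"{worker_host} slots={assigned_slots[worker_host]}\n")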



##########
python/pyspark/ml/torch/deepspeed/deepspeed_distributer.py:
##########
@@ -0,0 +1,167 @@
+import json
+import os
+import subprocess
+import tempfile
+from typing import (
+    Union,
+    Callable,
+    List,
+    Dict,
+    Optional,
+    Any,
+)
+from pyspark.ml.torch.distributor import Distributor, TorchDistributor
+
+
+def write_to_location(location: str, content: str) -> None:
+    os.makedirs(os.path.dirname(location), exist_ok=True)
+    with open(location, "a") as f:
+        f.write(content)
+
+
+class DeepspeedDistributor(Distributor):
+    """The user must ensure that their cluster is ssh-keychained and that deepspeed is able to use ssh to coordinate among the nodes for the distributed training"""
+
+    HOME = os.path.expanduser("~")
+    HOSTFILE = f"/{HOME}/hostfile"
+
+    def __init__(
+        self,
+        num_processes: int = 1,
+        local_mode: bool = True,
+        use_gpu: bool = True,
+        deepspeed_config=None,
+    ):
+        super().__init__(num_processes, local_mode, use_gpu)
+        self.deepspeed_config = deepspeed_config
+        self.temp_deepspeed_fname = None
+        self.input_params = self._create_input_params()
+        self.worker_hosts = self._setup_hostfile_info()
+        self.setup_env()
+
+    def _get_gpus_on_node(self, executor_ip: str):
+        # TODO: ask Ricky, Lu, or Maddie if this is the best way to get the GPU information of a particular worker node
+        command = f"ssh {executor_ip} nvidia-smi -L | grep GPU | wc -l"  # pyspark doesn't support this out of the box for some reason, so sadge
+        proc_res = subprocess.run(command, capture_output=True, text=True, shell=True)
+        if proc_res.returncode:
+            raise RuntimeError(
+                f"something went wrong when running the command {command}. Is nvidia-smi installed?"
+            )
+        num_gpus_on_worker = proc_res.stdout
+        return int(num_gpus_on_worker)
+
+    def _assign_procs_to_worker(self, gpu_node_map: Dict[str, int]) -> Dict[str, int]:
+        procs_left = self.num_processes
+        workers_left_to_serve = len(gpu_node_map)
+        average_procs_per_node = procs_left // workers_left_to_serve
+        gpu_mapped_to_node = {}
+        # sorting allows us to just do a single pass, as filling the smallest capacity nodes first will allow for a single pass
+        sorted_buckets = sorted(gpu_node_map.items(), key=lambda x: x[1])
+        for worker, capacity in sorted_buckets:
+            average_procs_per_node = procs_left // workers_left_to_serve
+            gpu_mapped_to_node[worker] = min(average_procs_per_node, capacity)
+            procs_left -= gpu_mapped_to_node[worker]
+            workers_left_to_serve -= 1
+        if procs_left != 0:
+            self.logger.warning(
+                msg=f"There are not enough GPUS to fully assign processes to nodes; there are {procs_left} processes left over"
+            )
+        return gpu_mapped_to_node
+
+    def _setup_hostfile_info(self):
+        worker_hosts = [
+            executor.host()
+            for executor in self.spark.sparkContext._jsc.sc().statusTracker().getExecutorInfos()
+        ]  # we should check if this returns the driver or not
+        worker_count = len(worker_hosts)  # find out if this number includes the driver or not
+        rdd = spark.sparkContext.parallelize(range(worker_count), numSlices=worker_count)
+
+        # what do I do if the use_gpu flag is false?
+        slots_on_workers = {}
+        if self.use_gpu:
+            for worker_host in worker_hosts:
+                slots_on_workers[worker_host] = self._get_gpus_on_node(worker_host)
+        else:
+            raise RuntimeError("Deepspeed doesn't work with non-GPU clusters at this time")
+
+        assigned_slots = self._assign_procs_to_worker(slots_on_workers)
+        self.logger.info(f"Writing to {DeepspeedDistributor.HOSTFILE}")
+        for worker_host in worker_hosts:
+            line = f"{worker_host} slots={assigned_slots[worker_host]}\n"
+            write_to_location(DeepspeedDistributor.HOSTFILE, line)
+        return worker_hosts
+
+    def setup_env(self):
+        try:
+            subprocess.run("deepspeed --version".split())
+            subprocess.run("ninja --version".split())
+            with open(f"/{DeepspeedDistributor.HOME}/.deepspeed_env", "w") as f:
+                # if this is open; don't add that to path if they're not running on databricks
+                # TODO: figure out what paths to add to this, because if this is OSS we don't want to constantly add a databricks filepath
+                f.write(
+                    "PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
+                )
+        except:
+            raise ImportError("Install deepspeed and ninja on the cluster using PyPi")
+
+    def _create_deepspeed_command(
+        self, input_params: Dict[str, Any], path_to_train_file: str, *args: Any
+    ):
+        local_mode = input_params["local_mode"]
+        num_processes = input_params["num_processes"]
+        deepspeed_config = input_params["deepspeed_config"]
+        if isinstance(deepspeed_config, dict):
+            with tempfile.NamedTemporaryFile(mode="w+", delete=False, suffix=".json") as f:
+                json.dump(deepspeed_config, f)
+                deepspeed_config_path = f.name
+                self.temp_deepspeed_fname = f.name
+        else:
+            deepspeed_config_path = deepspeed_config
+        if local_mode:
+            deepspeed_args = [
+                "--num_gpus",
+                str(num_processes),
+            ]  # no need for num nodes, the host file, or any port stuff (no communiation)
+        else:
+            deepspeed_args = [
+                "--num_gpus",
+                str(input_params["num_processes"]),
+                "--num_nodes",
+                str(len(self.worker_hosts)),
+                "--hostfile",
+                str(DeepspeedDistributor.HOSTFILE),
+                "--master_addr",
+                str(self.worker_hosts[0]),
+                "--master_port=9902",
+            ]
+        return [
+            "deepspeed",
+            *deepspeed_args,
+            path_to_train_file,
+            *args,
+            "--deepspeed",
+            "--deepspeed_config",
+            deepspeed_config_path,
+        ]
+
+    def _run_training_on_pytorch_file(
+        self, input_params: Dict[str, Any], train_path: str, *args: Any, **kwargs: Any
+    ) -> None:
+        if kwargs:
+            raise ValueError("Running pytorch file does not support key-word type arguments.")
+        training_command = self._create_deepspeed_command(input_params, train_path, *args)
+        TorchDistributor._execute_command(
+            training_command
+        )  # should we include some form of logging here
+
+    def run(self, train_object: Union[Callable, str], *args: Any, **kwargs: Any) -> Optional[Any]:
+        if isinstance(train_object, str):
+            self._run_training_on_pytorch_file(self.input_params, train_object, *args, **kwargs)  # type: ignore
+        else:
+            raise RuntimeError("Using functions isn't implemented yet. Next iteration.")

Review Comment:
   What does "next iteration" mean here?



##########
python/pyspark/ml/torch/deepspeed/deepspeed_distributer.py:
##########
@@ -0,0 +1,167 @@
+import json
+import os
+import subprocess
+import tempfile
+from typing import (
+    Union,
+    Callable,
+    List,
+    Dict,
+    Optional,
+    Any,
+)
+from pyspark.ml.torch.distributor import Distributor, TorchDistributor
+
+
+def write_to_location(location: str, content: str) -> None:
+    os.makedirs(os.path.dirname(location), exist_ok=True)
+    with open(location, "a") as f:
+        f.write(content)
+
+
+class DeepspeedDistributor(Distributor):
+    """The user must ensure that their cluster is ssh-keychained and that deepspeed is able to use ssh to coordinate among the nodes for the distributed training"""
+
+    HOME = os.path.expanduser("~")
+    HOSTFILE = f"/{HOME}/hostfile"
+
+    def __init__(
+        self,
+        num_processes: int = 1,
+        local_mode: bool = True,
+        use_gpu: bool = True,
+        deepspeed_config=None,
+    ):
+        super().__init__(num_processes, local_mode, use_gpu)
+        self.deepspeed_config = deepspeed_config
+        self.temp_deepspeed_fname = None
+        self.input_params = self._create_input_params()
+        self.worker_hosts = self._setup_hostfile_info()
+        self.setup_env()
+
+    def _get_gpus_on_node(self, executor_ip: str):
+        # TODO: ask Ricky, Lu, or Maddie if this is the best way to get the GPU information of a particular worker node
+        command = f"ssh {executor_ip} nvidia-smi -L | grep GPU | wc -l"  # pyspark doesn't support this out of the box for some reason, so sadge
+        proc_res = subprocess.run(command, capture_output=True, text=True, shell=True)
+        if proc_res.returncode:
+            raise RuntimeError(
+                f"something went wrong when running the command {command}. Is nvidia-smi installed?"
+            )
+        num_gpus_on_worker = proc_res.stdout
+        return int(num_gpus_on_worker)
+
+    def _assign_procs_to_worker(self, gpu_node_map: Dict[str, int]) -> Dict[str, int]:
+        procs_left = self.num_processes
+        workers_left_to_serve = len(gpu_node_map)
+        average_procs_per_node = procs_left // workers_left_to_serve
+        gpu_mapped_to_node = {}
+        # sorting allows us to just do a single pass, as filling the smallest capacity nodes first will allow for a single pass
+        sorted_buckets = sorted(gpu_node_map.items(), key=lambda x: x[1])
+        for worker, capacity in sorted_buckets:
+            average_procs_per_node = procs_left // workers_left_to_serve
+            gpu_mapped_to_node[worker] = min(average_procs_per_node, capacity)
+            procs_left -= gpu_mapped_to_node[worker]
+            workers_left_to_serve -= 1
+        if procs_left != 0:
+            self.logger.warning(
+                msg=f"There are not enough GPUS to fully assign processes to nodes; there are {procs_left} processes left over"
+            )
+        return gpu_mapped_to_node
+
+    def _setup_hostfile_info(self):
+        worker_hosts = [
+            executor.host()
+            for executor in self.spark.sparkContext._jsc.sc().statusTracker().getExecutorInfos()
+        ]  # we should check if this returns the driver or not
+        worker_count = len(worker_hosts)  # find out if this number includes the driver or not
+        rdd = spark.sparkContext.parallelize(range(worker_count), numSlices=worker_count)
+
+        # what do I do if the use_gpu flag is false?
+        slots_on_workers = {}
+        if self.use_gpu:
+            for worker_host in worker_hosts:
+                slots_on_workers[worker_host] = self._get_gpus_on_node(worker_host)
+        else:
+            raise RuntimeError("Deepspeed doesn't work with non-GPU clusters at this time")
+
+        assigned_slots = self._assign_procs_to_worker(slots_on_workers)
+        self.logger.info(f"Writing to {DeepspeedDistributor.HOSTFILE}")
+        for worker_host in worker_hosts:
+            line = f"{worker_host} slots={assigned_slots[worker_host]}\n"
+            write_to_location(DeepspeedDistributor.HOSTFILE, line)
+        return worker_hosts
+
+    def setup_env(self):
+        try:
+            subprocess.run("deepspeed --version".split())
+            subprocess.run("ninja --version".split())
+            with open(f"/{DeepspeedDistributor.HOME}/.deepspeed_env", "w") as f:
+                # if this is open; don't add that to path if they're not running on databricks
+                # TODO: figure out what paths to add to this, because if this is OSS we don't want to constantly add a databricks filepath
+                f.write(
+                    "PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
+                )
+        except:
+            raise ImportError("Install deepspeed and ninja on the cluster using PyPi")
+
+    def _create_deepspeed_command(
+        self, input_params: Dict[str, Any], path_to_train_file: str, *args: Any
+    ):
+        local_mode = input_params["local_mode"]
+        num_processes = input_params["num_processes"]
+        deepspeed_config = input_params["deepspeed_config"]
+        if isinstance(deepspeed_config, dict):
+            with tempfile.NamedTemporaryFile(mode="w+", delete=False, suffix=".json") as f:
+                json.dump(deepspeed_config, f)
+                deepspeed_config_path = f.name
+                self.temp_deepspeed_fname = f.name
+        else:
+            deepspeed_config_path = deepspeed_config
+        if local_mode:
+            deepspeed_args = [
+                "--num_gpus",
+                str(num_processes),
+            ]  # no need for num nodes, the host file, or any port stuff (no communiation)
+        else:
+            deepspeed_args = [
+                "--num_gpus",
+                str(input_params["num_processes"]),
+                "--num_nodes",
+                str(len(self.worker_hosts)),
+                "--hostfile",
+                str(DeepspeedDistributor.HOSTFILE),
+                "--master_addr",
+                str(self.worker_hosts[0]),
+                "--master_port=9902",
+            ]
+        return [
+            "deepspeed",
+            *deepspeed_args,
+            path_to_train_file,
+            *args,
+            "--deepspeed",
+            "--deepspeed_config",
+            deepspeed_config_path,
+        ]
+
+    def _run_training_on_pytorch_file(
+        self, input_params: Dict[str, Any], train_path: str, *args: Any, **kwargs: Any
+    ) -> None:
+        if kwargs:
+            raise ValueError("Running pytorch file does not support key-word type arguments.")
+        training_command = self._create_deepspeed_command(input_params, train_path, *args)
+        TorchDistributor._execute_command(
+            training_command
+        )  # should we include some form of logging here

Review Comment:
   Let's not reference the TorchDistributor in this file. Could we move _execute_command to the Distributor?
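   
   One possible shape for that (a sketch; the signature shown for _execute_command
   is illustrative, not necessarily the one in distributor.py):
   
       # pyspark/ml/torch/distributor.py
       class Distributor:
           ...
           @staticmethod
           def _execute_command(cmd: List[str]) -> None:
               ...
   
       # deepspeed_distributer.py -- no TorchDistributor import needed
       training_command = self._create_deepspeed_command(input_params, train_path, *args)
       self._execute_command(training_command)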



##########
python/pyspark/ml/torch/deepspeed/deepspeed_distributer.py:
##########
@@ -0,0 +1,167 @@
+import json
+import os
+import subprocess
+import tempfile
+from typing import (
+    Union,
+    Callable,
+    List,
+    Dict,
+    Optional,
+    Any,
+)
+from pyspark.ml.torch.distributor import Distributor, TorchDistributor
+
+
+def write_to_location(location: str, content: str) -> None:
+    os.makedirs(os.path.dirname(location), exist_ok=True)
+    with open(location, "a") as f:
+        f.write(content)
+
+
+class DeepspeedDistributor(Distributor):
+    """The user must ensure that their cluster is ssh-keychained and that deepspeed is able to use ssh to coordinate among the nodes for the distributed training"""
+
+    HOME = os.path.expanduser("~")
+    HOSTFILE = f"/{HOME}/hostfile"
+
+    def __init__(
+        self,
+        num_processes: int = 1,
+        local_mode: bool = True,
+        use_gpu: bool = True,
+        deepspeed_config=None,
+    ):
+        super().__init__(num_processes, local_mode, use_gpu)
+        self.deepspeed_config = deepspeed_config
+        self.temp_deepspeed_fname = None

Review Comment:
   Let's make this name more useful, e.g. deepspeed_config_filepath
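   
   e.g. (sketch):
   
       self.deepspeed_config_filepath: Optional[str] = None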



##########
python/pyspark/ml/torch/deepspeed/deepspeed_distributer.py:
##########
@@ -0,0 +1,167 @@
+import json
+import os
+import subprocess
+import tempfile
+from typing import (
+    Union,
+    Callable,
+    List,
+    Dict,
+    Optional,
+    Any,
+)
+from pyspark.ml.torch.distributor import Distributor, TorchDistributor
+
+
+def write_to_location(location: str, content: str) -> None:
+    os.makedirs(os.path.dirname(location), exist_ok=True)
+    with open(location, "a") as f:
+        f.write(content)
+
+
+class DeepspeedDistributor(Distributor):
+    """The user must ensure that their cluster is ssh-keychained and that deepspeed is able to use ssh to coordinate among the nodes for the distributed training"""
+
+    HOME = os.path.expanduser("~")
+    HOSTFILE = f"/{HOME}/hostfile"
+
+    def __init__(
+        self,
+        num_processes: int = 1,
+        local_mode: bool = True,
+        use_gpu: bool = True,
+        deepspeed_config=None,
+    ):
+        super().__init__(num_processes, local_mode, use_gpu)
+        self.deepspeed_config = deepspeed_config
+        self.temp_deepspeed_fname = None
+        self.input_params = self._create_input_params()
+        self.worker_hosts = self._setup_hostfile_info()
+        self.setup_env()
+
+    def _get_gpus_on_node(self, executor_ip: str):
+        # TODO: ask Ricky, Lu, or Maddie if this is the best way to get the GPU information of a particular worker node
+        command = f"ssh {executor_ip} nvidia-smi -L | grep GPU | wc -l"  # pyspark doesn't support this out of the box for some reason, so sadge
+        proc_res = subprocess.run(command, capture_output=True, text=True, shell=True)
+        if proc_res.returncode:
+            raise RuntimeError(
+                f"something went wrong when running the command {command}. Is nvidia-smi installed?"
+            )
+        num_gpus_on_worker = proc_res.stdout
+        return int(num_gpus_on_worker)
+
+    def _assign_procs_to_worker(self, gpu_node_map: Dict[str, int]) -> Dict[str, int]:
+        procs_left = self.num_processes
+        workers_left_to_serve = len(gpu_node_map)
+        average_procs_per_node = procs_left // workers_left_to_serve
+        gpu_mapped_to_node = {}
+        # sorting allows us to just do a single pass, as filling the smallest capacity nodes first will allow for a single pass
+        sorted_buckets = sorted(gpu_node_map.items(), key=lambda x: x[1])
+        for worker, capacity in sorted_buckets:
+            average_procs_per_node = procs_left // workers_left_to_serve
+            gpu_mapped_to_node[worker] = min(average_procs_per_node, capacity)
+            procs_left -= gpu_mapped_to_node[worker]
+            workers_left_to_serve -= 1
+        if procs_left != 0:
+            self.logger.warning(
+                msg=f"There are not enough GPUS to fully assign processes to nodes; there are {procs_left} processes left over"
+            )
+        return gpu_mapped_to_node
+
+    def _setup_hostfile_info(self):
+        worker_hosts = [
+            executor.host()
+            for executor in self.spark.sparkContext._jsc.sc().statusTracker().getExecutorInfos()
+        ]  # we should check if this returns the driver or not
+        worker_count = len(worker_hosts)  # find out if this number includes the driver or not
+        rdd = spark.sparkContext.parallelize(range(worker_count), numSlices=worker_count)
+
+        # what do I do if the use_gpu flag is false?
+        slots_on_workers = {}
+        if self.use_gpu:
+            for worker_host in worker_hosts:
+                slots_on_workers[worker_host] = self._get_gpus_on_node(worker_host)
+        else:
+            raise RuntimeError("Deepspeed doesn't work with non-GPU clusters at this time")
+
+        assigned_slots = self._assign_procs_to_worker(slots_on_workers)
+        self.logger.info(f"Writing to {DeepspeedDistributor.HOSTFILE}")
+        for worker_host in worker_hosts:
+            line = f"{worker_host} slots={assigned_slots[worker_host]}\n"
+            write_to_location(DeepspeedDistributor.HOSTFILE, line)
+        return worker_hosts
+
+    def setup_env(self):
+        try:
+            subprocess.run("deepspeed --version".split())
+            subprocess.run("ninja --version".split())
+            with open(f"/{DeepspeedDistributor.HOME}/.deepspeed_env", "w") as f:
+                # if this is open; don't add that to path if they're not running on databricks
+                # TODO: figure out what paths to add to this, because if this is OSS we don't want to constantly add a databricks filepath
+                f.write(
+                    "PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
+                )
+        except:
+            raise ImportError("Install deepspeed and ninja on the cluster using PyPi")

Review Comment:
   Can this be a private function?
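   
   i.e. something like the following, with the call in __init__ updated to match
   (sketch):
   
       def _setup_env(self) -> None:
           ...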



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

