Posted to reviews@spark.apache.org by "mathewjacob1002 (via GitHub)" <gi...@apache.org> on 2023/07/24 03:19:21 UTC

[GitHub] [spark] mathewjacob1002 opened a new pull request, #42118: E2E Testing for Deepspeed

mathewjacob1002 opened a new pull request, #42118:
URL: https://github.com/apache/spark/pull/42118

   There is an error at the moment; will edit this once resolved.




[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42118: [SPARK-44264][PYTHON]E2E Testing for Deepspeed

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42118:
URL: https://github.com/apache/spark/pull/42118#discussion_r1275790827


##########
python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py:
##########
@@ -164,6 +174,178 @@ def test_create_torchrun_command_distributed(self) -> None:
             self.assertEqual(distributed_cmd_args_expected, distributed_command_with_args)
 
 
+def _create_basic_function() -> Callable:
+    # TODO: swap out with better test function
+    # once Deepspeed better supports CPU
+    def pythagoras(leg1: float, leg2: float) -> float:
+        import deepspeed
+
+        print(deepspeed.__version__)
+        return (leg1 * leg1 + leg2 * leg2) ** 0.5
+
+    return pythagoras
+
+
+@contextmanager
+def _create_pytorch_training_test_file():
+    # TODO: swap with better training file like below:
+    # import deepspeed

Review Comment:
   It would be better to remove the commented-out code. We can add it back when we actually need it.





[GitHub] [spark] zhengruifeng commented on pull request #42118: [SPARK-44264][PYTHON]E2E Testing for Deepspeed

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on PR #42118:
URL: https://github.com/apache/spark/pull/42118#issuecomment-1664917362

   The following tests are actually skipped:
   ```
   Skipped tests in pyspark.ml.deepspeed.tests.test_deepspeed_distributor with python3.9:
         test_pytorch_file_e2e (pyspark.ml.deepspeed.tests.test_deepspeed_distributor.DeepspeedDistributorLocalEndToEndTests) ... skip (0.001s)
         test_simple_function_e2e (pyspark.ml.deepspeed.tests.test_deepspeed_distributor.DeepspeedDistributorLocalEndToEndTests) ... skip (0.001s)
         test_pytorch_file_e2e (pyspark.ml.deepspeed.tests.test_deepspeed_distributor.DeepspeedTorchDistributorDistributedEndToEnd) ... skip (0.000s)
         test_simple_function_e2e (pyspark.ml.deepspeed.tests.test_deepspeed_distributor.DeepspeedTorchDistributorDistributedEndToEnd) ... skip (0.000s)
   ```
   
   This is because `deepspeed` was not installed.
   
   The OSS CI doesn't use `dev/requirements.txt` to install dependencies; you may install it [here](https://github.com/apache/spark/blob/46491403bcc794a889103bc0a13329d06454227f/dev/infra/Dockerfile#L73-L75).
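   
   For context, skips like the ones above typically come from an import guard at the top of the test module, roughly like the sketch below. The exact helper and flag names in `test_deepspeed_distributor.py` may differ; this is only illustrative.
   
   ```python
   # Sketch of the kind of guard that turns these tests into "skip" results
   # when deepspeed is not importable; the flag name here is illustrative.
   import unittest

   try:
       import deepspeed  # noqa: F401

       have_deepspeed = True
   except ImportError:
       have_deepspeed = False


   @unittest.skipIf(not have_deepspeed, "deepspeed is not installed")
   class DeepspeedDistributorLocalEndToEndTests(unittest.TestCase):
       def test_simple_function_e2e(self) -> None:
           ...
   ```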




[GitHub] [spark] HyukjinKwon closed pull request #42118: [SPARK-44264][PYTHON]E2E Testing for Deepspeed

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon closed pull request #42118: [SPARK-44264][PYTHON]E2E Testing for Deepspeed
URL: https://github.com/apache/spark/pull/42118




[GitHub] [spark] mathewjacob1002 commented on a diff in pull request #42118: [SPARK-44264][WIP]E2E Testing for Deepspeed

Posted by "mathewjacob1002 (via GitHub)" <gi...@apache.org>.
mathewjacob1002 commented on code in PR #42118:
URL: https://github.com/apache/spark/pull/42118#discussion_r1274228543


##########
python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py:
##########
@@ -163,6 +168,93 @@ def test_create_torchrun_command_distributed(self) -> None:
             )
             self.assertEqual(distributed_cmd_args_expected, distributed_command_with_args)
 
+def _create_basic_function():
+   def pythagoras(leg1: float, leg2: float):
+       import deepspeed
+       return (leg1 * leg1 + leg2 * leg2)**0.5
+   return pythagoras 
+
+from pyspark.ml.torch.tests.test_distributor import get_distributed_mode_conf
+
+class DeepspeedTorchDistributorDistributedEndToEnd(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        (cls.gpu_discovery_script_file_name, cls.mnist_dir_path) = set_up_test_dirs()
+        print(f"Distributed e2e fname: {cls.gpu_discovery_script_file_name}")
+        conf = SparkConf(loadDefaults=False)
+        print("BEFORE setting anything: ", conf.getAll(), " |||||||||||\n")
+        for k, v in get_distributed_mode_conf().items():
+            if k is "spark.driver.resource.gpu.discoveryScript":
+                raise RuntimeError("Found trying to set the driver discovery script")
+            conf = conf.set(k, v)
+        conf = conf.set(
+            "spark.worker.resource.gpu.discoveryScript", cls.gpu_discovery_script_file_name
+        )
+        print("Distributed babby:", conf.getAll())
+        sc = SparkContext("local-cluster[2,2,512]",cls.__name__,conf=conf)

Review Comment:
   This is the same as the TorchDistributor tests (https://github.com/apache/spark/blob/master/python/pyspark/ml/torch/tests/test_distributor.py). It's just to set up the env so that Spark doesn't complain.





[GitHub] [spark] mathewjacob1002 commented on a diff in pull request #42118: [SPARK-44264][WIP]E2E Testing for Deepspeed

Posted by "mathewjacob1002 (via GitHub)" <gi...@apache.org>.
mathewjacob1002 commented on code in PR #42118:
URL: https://github.com/apache/spark/pull/42118#discussion_r1274228048


##########
python/pyspark/ml/deepspeed/deepspeed_distributor.py:
##########
@@ -127,6 +151,10 @@ def _run_training_on_pytorch_file(
         training_command = DeepspeedTorchDistributor._create_torchrun_command(
             input_params, train_path, *args
         )
+        # Spark CI doesn't have GPUs (primary use case for deepspeed); 
+        # Just make sure everything else would work
+        if TorchDistributor._E2E_MOCK:
+            return training_command

Review Comment:
   This is old code from before I sorted out the SparkConf setup - will clean it up now.





[GitHub] [spark] mathewjacob1002 commented on pull request #42118: E2E Testing for Deepspeed

Posted by "mathewjacob1002 (via GitHub)" <gi...@apache.org>.
mathewjacob1002 commented on PR #42118:
URL: https://github.com/apache/spark/pull/42118#issuecomment-1647154195

   Note to self: the distributed e2e test fails with the following error: Resource script <temp_file_name> to discover gpu doesn't exist!. After commenting out code, I think it involves both the Local and Distributed E2E environments. Possibly a problem with the setUpClass/tearDownClass of the test classes?
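   
   The error refers to the fake GPU discovery script that the shared test helpers write to a temp file; Spark executes that script and parses its JSON output to "discover" GPUs. A rough sketch of what such a setup looks like is below; the helper name and the addresses are illustrative, and the contents of the shared helper may differ.
   
   ```python
   # Hedged sketch: write a fake GPU discovery script of the kind Spark expects.
   # Spark runs the script and reads a JSON object naming the resource and its
   # addresses from stdout.
   import json
   import os
   import stat
   import tempfile


   def write_fake_gpu_discovery_script() -> str:
       fd, path = tempfile.mkstemp(suffix=".sh")
       gpu_json = json.dumps({"name": "gpu", "addresses": ["0", "1"]})
       with os.fdopen(fd, "w") as f:
           f.write("#!/bin/bash\n")
           f.write(f"echo '{gpu_json}'\n")
       os.chmod(path, os.stat(path).st_mode | stat.S_IEXEC)
       return path
   ```
   
   If one test class unlinks this file in its tearDownClass while another class's SparkConf still points at the same path, Spark would report exactly this kind of "doesn't exist" error, which would fit the setUp/tearDown suspicion above.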




[GitHub] [spark] HyukjinKwon commented on pull request #42118: [SPARK-44264][PYTHON]E2E Testing for Deepspeed

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #42118:
URL: https://github.com/apache/spark/pull/42118#issuecomment-1664932454

   @mathewjacob1002 and @maddiedawson can you follow up ^ please?




[GitHub] [spark] maddiedawson commented on a diff in pull request #42118: [SPARK-44264][WIP]E2E Testing for Deepspeed

Posted by "maddiedawson (via GitHub)" <gi...@apache.org>.
maddiedawson commented on code in PR #42118:
URL: https://github.com/apache/spark/pull/42118#discussion_r1275312626


##########
python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py:
##########
@@ -14,13 +14,21 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+from contextlib import contextmanager
 import os
 import sys
+import textwrap
 from typing import Any, Tuple, Dict
+
+import shutil
 import unittest
 
+from pyspark import SparkConf, SparkContext
 from pyspark.ml.deepspeed.deepspeed_distributor import DeepspeedTorchDistributor
-
+from pyspark.ml.torch.distributor import TorchDistributor

Review Comment:
   Remove this? The TorchDistributor itself doesn't seem to be used



##########
python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py:
##########
@@ -14,13 +14,21 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+from contextlib import contextmanager
 import os
 import sys
+import textwrap
 from typing import Any, Tuple, Dict
+
+import shutil
 import unittest
 
+from pyspark import SparkConf, SparkContext
 from pyspark.ml.deepspeed.deepspeed_distributor import DeepspeedTorchDistributor
-
+from pyspark.ml.torch.distributor import TorchDistributor
+from pyspark.sql import SparkSession
+from pyspark.testing.utils import SPARK_HOME

Review Comment:
   Remove, unused





[GitHub] [spark] maddiedawson commented on a diff in pull request #42118: [SPARK-44264][WIP]E2E Testing for Deepspeed

Posted by "maddiedawson (via GitHub)" <gi...@apache.org>.
maddiedawson commented on code in PR #42118:
URL: https://github.com/apache/spark/pull/42118#discussion_r1274222866


##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -609,7 +609,7 @@ def _run_local_training(
                     del os.environ[CUDA_VISIBLE_DEVICES]
 
         return output
-
+    

Review Comment:
   Undo this change



##########
python/pyspark/ml/deepspeed/deepspeed_distributor.py:
##########
@@ -61,6 +61,30 @@ def __init__(
             The configuration file to be used for launching the deepspeed application.
             If it's a dictionary containing the parameters, then we will create the file.
             If None, deepspeed will fall back to default parameters.
+
+        Examples
+        --------
+        Run Deepspeed training function on a single node
+
+        >>> def train(learning_rate):
+            import deepspeed
+            # rest of training function
+            return model
+        >>> distributor = DeepspeedTorchDistributor(num_gpus=4,
+                                                    nnodes=1,
+                                                    use_gpu=True,
+                                                    local_mode=True,
+                                                    deepspeed_config="path/to/config.json")
+        >>> output = distributor.run(train, 0.01)
+
+        Run Deepspeed training function on multiple nodes
+
+        >>> distributor = DeepspeedTorchDistributor(num_gpus=4,
+                                                    nnodes=3,
+                                                    use_gpu=True,
+                                                    local_mode=False,
+                                                    deepspeed_config="path/to/config.json")
+        >>> output = distributor.run(train, 0.01)

Review Comment:
   Remove this change since it's in the other PR



##########
python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py:
##########
@@ -163,6 +168,93 @@ def test_create_torchrun_command_distributed(self) -> None:
             )
             self.assertEqual(distributed_cmd_args_expected, distributed_command_with_args)
 
+def _create_basic_function():
+   def pythagoras(leg1: float, leg2: float):
+       import deepspeed
+       return (leg1 * leg1 + leg2 * leg2)**0.5
+   return pythagoras 
+
+from pyspark.ml.torch.tests.test_distributor import get_distributed_mode_conf

Review Comment:
   Put this with the other imports



##########
python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py:
##########
@@ -163,6 +168,93 @@ def test_create_torchrun_command_distributed(self) -> None:
             )
             self.assertEqual(distributed_cmd_args_expected, distributed_command_with_args)
 
+def _create_basic_function():
+   def pythagoras(leg1: float, leg2: float):
+       import deepspeed
+       return (leg1 * leg1 + leg2 * leg2)**0.5
+   return pythagoras 
+
+from pyspark.ml.torch.tests.test_distributor import get_distributed_mode_conf
+
+class DeepspeedTorchDistributorDistributedEndToEnd(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        (cls.gpu_discovery_script_file_name, cls.mnist_dir_path) = set_up_test_dirs()
+        print(f"Distributed e2e fname: {cls.gpu_discovery_script_file_name}")
+        conf = SparkConf(loadDefaults=False)
+        print("BEFORE setting anything: ", conf.getAll(), " |||||||||||\n")
+        for k, v in get_distributed_mode_conf().items():
+            if k is "spark.driver.resource.gpu.discoveryScript":
+                raise RuntimeError("Found trying to set the driver discovery script")

Review Comment:
   Do we need this?



##########
python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py:
##########
@@ -18,9 +18,14 @@
 import sys
 from typing import Any, Tuple, Dict
 import unittest
+import shutil

Review Comment:
   Alphabetize these



##########
python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py:
##########
@@ -163,6 +168,93 @@ def test_create_torchrun_command_distributed(self) -> None:
             )
             self.assertEqual(distributed_cmd_args_expected, distributed_command_with_args)
 
+def _create_basic_function():
+   def pythagoras(leg1: float, leg2: float):
+       import deepspeed
+       return (leg1 * leg1 + leg2 * leg2)**0.5
+   return pythagoras 
+
+from pyspark.ml.torch.tests.test_distributor import get_distributed_mode_conf
+
+class DeepspeedTorchDistributorDistributedEndToEnd(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        (cls.gpu_discovery_script_file_name, cls.mnist_dir_path) = set_up_test_dirs()
+        print(f"Distributed e2e fname: {cls.gpu_discovery_script_file_name}")

Review Comment:
   Let's remove the print statements



##########
python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py:
##########
@@ -163,6 +168,93 @@ def test_create_torchrun_command_distributed(self) -> None:
             )
             self.assertEqual(distributed_cmd_args_expected, distributed_command_with_args)
 
+def _create_basic_function():
+   def pythagoras(leg1: float, leg2: float):
+       import deepspeed
+       return (leg1 * leg1 + leg2 * leg2)**0.5
+   return pythagoras 
+
+from pyspark.ml.torch.tests.test_distributor import get_distributed_mode_conf
+
+class DeepspeedTorchDistributorDistributedEndToEnd(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        (cls.gpu_discovery_script_file_name, cls.mnist_dir_path) = set_up_test_dirs()
+        print(f"Distributed e2e fname: {cls.gpu_discovery_script_file_name}")
+        conf = SparkConf(loadDefaults=False)
+        print("BEFORE setting anything: ", conf.getAll(), " |||||||||||\n")
+        for k, v in get_distributed_mode_conf().items():
+            if k is "spark.driver.resource.gpu.discoveryScript":
+                raise RuntimeError("Found trying to set the driver discovery script")
+            conf = conf.set(k, v)
+        conf = conf.set(
+            "spark.worker.resource.gpu.discoveryScript", cls.gpu_discovery_script_file_name
+        )
+        print("Distributed babby:", conf.getAll())
+        sc = SparkContext("local-cluster[2,2,512]",cls.__name__,conf=conf)

Review Comment:
   How did you choose 2,2,512?



##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -918,6 +918,13 @@ def _create_save_dir(root_dir: Optional[str] = None) -> str:
     @staticmethod
     def _cleanup_files(save_dir: str) -> None:
         shutil.rmtree(save_dir, ignore_errors=True)
+    
+    @staticmethod
+    @contextmanager
+    def _setup_e2e_mocking_env():
+        TorchDistributor._E2E_MOCK = True
+        yield
+        TorchDistributor._E2E_MOCK = False

Review Comment:
   How is this used?



##########
python/pyspark/ml/deepspeed/deepspeed_distributor.py:
##########
@@ -127,6 +151,10 @@ def _run_training_on_pytorch_file(
         training_command = DeepspeedTorchDistributor._create_torchrun_command(
             input_params, train_path, *args
         )
+        # Spark CI doesn't have GPUs (primary use case for deepspeed); 
+        # Just make sure everything else would work
+        if TorchDistributor._E2E_MOCK:
+            return training_command

Review Comment:
   Why does this need to return anything?





[GitHub] [spark] mathewjacob1002 commented on pull request #42118: [SPARK-44264][WIP]E2E Testing for Deepspeed

Posted by "mathewjacob1002 (via GitHub)" <gi...@apache.org>.
mathewjacob1002 commented on PR #42118:
URL: https://github.com/apache/spark/pull/42118#issuecomment-1652310112

   cc @jeffra in case you wanted to take a look :)




[GitHub] [spark] mathewjacob1002 commented on a diff in pull request #42118: [SPARK-44264][WIP]E2E Testing for Deepspeed

Posted by "mathewjacob1002 (via GitHub)" <gi...@apache.org>.
mathewjacob1002 commented on code in PR #42118:
URL: https://github.com/apache/spark/pull/42118#discussion_r1274227715


##########
python/pyspark/ml/torch/distributor.py:
##########
@@ -918,6 +918,13 @@ def _create_save_dir(root_dir: Optional[str] = None) -> str:
     @staticmethod
     def _cleanup_files(save_dir: str) -> None:
         shutil.rmtree(save_dir, ignore_errors=True)
+    
+    @staticmethod
+    @contextmanager
+    def _setup_e2e_mocking_env():
+        TorchDistributor._E2E_MOCK = True
+        yield
+        TorchDistributor._E2E_MOCK = False

Review Comment:
   this is old code - will remove now





[GitHub] [spark] HyukjinKwon commented on pull request #42118: [SPARK-44264][PYTHON]E2E Testing for Deepspeed

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #42118:
URL: https://github.com/apache/spark/pull/42118#issuecomment-1654787693

   Merged to master and branch-3.5.




[GitHub] [spark] maddiedawson commented on a diff in pull request #42118: [SPARK-44264][WIP]E2E Testing for Deepspeed

Posted by "maddiedawson (via GitHub)" <gi...@apache.org>.
maddiedawson commented on code in PR #42118:
URL: https://github.com/apache/spark/pull/42118#discussion_r1274247324


##########
python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py:
##########
@@ -17,10 +17,16 @@
 import os
 import sys
 from typing import Any, Tuple, Dict
+

Review Comment:
   Remove extra newline and alphabetize this



##########
python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py:
##########
@@ -163,6 +169,84 @@ def test_create_torchrun_command_distributed(self) -> None:
             )
             self.assertEqual(distributed_cmd_args_expected, distributed_command_with_args)
 
+def _create_basic_function():
+   def pythagoras(leg1: float, leg2: float):
+       import deepspeed
+       return (leg1 * leg1 + leg2 * leg2)**0.5
+   return pythagoras 
+
+
+class DeepspeedTorchDistributorDistributedEndToEnd(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        (cls.gpu_discovery_script_file_name, cls.mnist_dir_path) = set_up_test_dirs()
+        conf = SparkConf(loadDefaults=False)

Review Comment:
   Add a comment explaining why we use loadDefaults=False



##########
python/test_support/test_deepspeed_training_file.py:
##########
@@ -0,0 +1,10 @@
+import sys
+
+def pythagorean_thm(x : int, y: int):
+    import deepspeed
+    return (x*x + y*y)**0.5
+
+def main():

Review Comment:
   You don't need the main() function; you can just call print(pythagorean_thm(...)).
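   
   For example, the whole support file could shrink to something like the sketch below, mirroring the snippet that later lands in the test itself; the deepspeed import is only there to check that it resolves where the script runs.
   
   ```python
   import sys


   def pythagorean_thm(x: int, y: int) -> float:
       # Import inside the function to verify deepspeed is importable on the worker.
       import deepspeed  # noqa: F401

       return (x * x + y * y) ** 0.5


   print(pythagorean_thm(int(sys.argv[1]), int(sys.argv[2])))
   ```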



##########
python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py:
##########
@@ -163,6 +169,84 @@ def test_create_torchrun_command_distributed(self) -> None:
             )
             self.assertEqual(distributed_cmd_args_expected, distributed_command_with_args)
 
+def _create_basic_function():
+   def pythagoras(leg1: float, leg2: float):
+       import deepspeed
+       return (leg1 * leg1 + leg2 * leg2)**0.5
+   return pythagoras 
+
+
+class DeepspeedTorchDistributorDistributedEndToEnd(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        (cls.gpu_discovery_script_file_name, cls.mnist_dir_path) = set_up_test_dirs()
+        conf = SparkConf(loadDefaults=False)
+        for k, v in get_distributed_mode_conf().items():
+            conf = conf.set(k, v)
+        conf = conf.set(
+            "spark.worker.resource.gpu.discoveryScript", cls.gpu_discovery_script_file_name
+        )
+        sc = SparkContext("local-cluster[2,2,512]",cls.__name__,conf=conf)
+        cls.spark = SparkSession(sc)
+
+    def test_simple_function_e2e(self):
+       train_fn = _create_basic_function()
+       # Arguments for the pythagoras function train_fn
+       x = 3
+       y = 4
+       dist = DeepspeedTorchDistributor(num_gpus=2, use_gpu=False, local_mode=False)
+       output = dist.run(train_fn, x, y)
+       self.assertEqual(output, 5)
+
+    def test_pytorch_file_e2e(self):
+        path_to_train_file = "python/test_support/test_deepspeed_training_file.py"

Review Comment:
   This will be an issue if someone runs the test from a different directory. Instead of committing python/test_support/test_deepspeed_training_file.py, let's just write out its contents to a temp file in the test setup or the test case itself.
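   
   A sketch of one way to do that with the standard tempfile module is below. This is only illustrative: the version that eventually lands in this PR writes a fixed /tmp path instead, and the distributor arguments are copied from the existing tests.
   
   ```python
   import os
   import tempfile
   import textwrap


   TRAIN_FILE_CONTENTS = textwrap.dedent(
       """
       import sys

       def pythagorean_thm(x: int, y: int) -> float:
           import deepspeed
           return (x * x + y * y) ** 0.5

       print(pythagorean_thm(int(sys.argv[1]), int(sys.argv[2])))
       """
   )


   def test_pytorch_file_e2e(self) -> None:
       # Write the training script to a temp file so the test does not depend
       # on the current working directory, then clean it up afterwards.
       with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False) as f:
           f.write(TRAIN_FILE_CONTENTS)
           train_path = f.name
       try:
           dist = DeepspeedTorchDistributor(num_gpus=2, use_gpu=False, local_mode=True)
           dist.run(train_path, 2, 5)
       finally:
           os.remove(train_path)
   ```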





[GitHub] [spark] lu-wang-dl commented on a diff in pull request #42118: [SPARK-44264][WIP]E2E Testing for Deepspeed

Posted by "lu-wang-dl (via GitHub)" <gi...@apache.org>.
lu-wang-dl commented on code in PR #42118:
URL: https://github.com/apache/spark/pull/42118#discussion_r1275358299


##########
python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py:
##########
@@ -163,6 +168,173 @@ def test_create_torchrun_command_distributed(self) -> None:
             )
             self.assertEqual(distributed_cmd_args_expected, distributed_command_with_args)
 
+def _create_basic_function():
+    # TODO: swap out with better test function
+    # once Deepspeed better supports CPU
+   def pythagoras(leg1: float, leg2: float):
+       import deepspeed
+       return (leg1 * leg1 + leg2 * leg2)**0.5
+   return pythagoras 
+
+
+@contextmanager
+def _create_pytorch_training_test_file():
+    # TODO: swap with better training file like below:
+    #import deepspeed

Review Comment:
   Could we clean up this code?





[GitHub] [spark] maddiedawson commented on a diff in pull request #42118: [SPARK-44264][WIP]E2E Testing for Deepspeed

Posted by "maddiedawson (via GitHub)" <gi...@apache.org>.
maddiedawson commented on code in PR #42118:
URL: https://github.com/apache/spark/pull/42118#discussion_r1274305354


##########
python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py:
##########
@@ -163,6 +169,105 @@ def test_create_torchrun_command_distributed(self) -> None:
             )
             self.assertEqual(distributed_cmd_args_expected, distributed_command_with_args)
 
+def _create_basic_function():
+    # TODO: swap out with better test function
+    # once Deepspeed better supports CPU
+   def pythagoras(leg1: float, leg2: float):
+       import deepspeed
+       return (leg1 * leg1 + leg2 * leg2)**0.5
+   return pythagoras 
+
+# The program and function that we use in the end-to-end tests
+# is very simple because in the Spark CI we only have access
+# to CPUs and at this point in time, CPU support is limited
+# in Deepspeed. Once Deepspeed better supports CPU training
+# and inference, the hope is to switch out the training 
+# and file for the tests with more realistic testing 
+# that use Deepspeed constructs. 
+class DeepspeedTorchDistributorDistributedEndToEnd(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        (cls.gpu_discovery_script_file_name, cls.mnist_dir_path) = set_up_test_dirs()
+        # This is set to False because if not, the SparkConf will 
+        # use contain configurations from the LocalEndToEnd test,
+        # which causes the test to break.
+        conf = SparkConf(loadDefaults=False)
+        for k, v in get_distributed_mode_conf().items():
+            conf = conf.set(k, v)
+        conf = conf.set(
+            "spark.worker.resource.gpu.discoveryScript", cls.gpu_discovery_script_file_name
+        )
+        sc = SparkContext("local-cluster[2,2,512]",cls.__name__,conf=conf)
+        cls.spark = SparkSession(sc)
+
+    def test_simple_function_e2e(self):
+       train_fn = _create_basic_function()
+       # Arguments for the pythagoras function train_fn
+       x = 3
+       y = 4
+       dist = DeepspeedTorchDistributor(num_gpus=2, use_gpu=False, local_mode=False)
+       output = dist.run(train_fn, x, y)
+       self.assertEqual(output, 5)
+
+    def test_pytorch_file_e2e(self):
+        import textwrap
+        # TODO: change to better test script
+        # once Deepspeed CPU support is better
+        str_to_write = textwrap.dedent(""" 
+import sys
+def pythagorean_thm(x : int, y: int):
+    import deepspeed
+    return (x*x + y*y)**0.5
+print(pythagorean_thm(int(sys.argv[1]), int(sys.argv[2])))
+""")
+        cp_path = f"/tmp/test_deepspeed_training_file.py"
+        with open(cp_path, "w") as f:
+            f.write(str_to_write)
+        dist = DeepspeedTorchDistributor(num_gpus=True, use_gpu=False, local_mode=False)
+        dist.run(cp_path, 2, 5)
+        os.remove(cp_path)
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.mnist_dir_path)
+        os.unlink(cls.gpu_discovery_script_file_name)
+        cls.spark.stop()
+
+class DeepspeedDistributorLocalEndToEndTests(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        cls.gpu_discovery_script_file_name, cls.mnist_dir_path = set_up_test_dirs()
+        conf = SparkConf()
+        for k, v in get_local_mode_conf().items():
+            conf = conf.set(k, v)
+        conf = conf.set(
+            "spark.driver.resource.gpu.discoveryScript", cls.gpu_discovery_script_file_name
+        )
+        sc = SparkContext("local-cluster[2,2,512]",cls.__name__,conf=conf)
+        cls.spark = SparkSession(sc)
+    
+    def test_simple_function_e2e(self):
+       train_fn = _create_basic_function()
+       # Arguments for the pythagoras function train_fn
+       x = 3
+       y = 4
+       dist = DeepspeedTorchDistributor(num_gpus=2, use_gpu=False, local_mode=True)
+       output = dist.run(train_fn, x, y)
+       self.assertEqual(output, 5)
+
+
+    def test_pytorch_file_e2e(self):
+        path_to_train_file = "python/test_support/test_deepspeed_training_file.py"
+        dist = DeepspeedTorchDistributor(num_gpus=2, use_gpu=False, local_mode=True)
+        dist.run(path_to_train_file, 2, 5)
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.mnist_dir_path)
+        os.unlink(cls.gpu_discovery_script_file_name)
+        cls.spark.stop()

Review Comment:
   Both here and above, tearDownClass should appear next to setUpClass



##########
python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py:
##########
@@ -163,6 +169,105 @@ def test_create_torchrun_command_distributed(self) -> None:
             )
             self.assertEqual(distributed_cmd_args_expected, distributed_command_with_args)
 
+def _create_basic_function():
+    # TODO: swap out with better test function
+    # once Deepspeed better supports CPU
+   def pythagoras(leg1: float, leg2: float):
+       import deepspeed
+       return (leg1 * leg1 + leg2 * leg2)**0.5
+   return pythagoras 
+
+# The program and function that we use in the end-to-end tests
+# is very simple because in the Spark CI we only have access
+# to CPUs and at this point in time, CPU support is limited
+# in Deepspeed. Once Deepspeed better supports CPU training
+# and inference, the hope is to switch out the training 
+# and file for the tests with more realistic testing 
+# that use Deepspeed constructs. 
+class DeepspeedTorchDistributorDistributedEndToEnd(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        (cls.gpu_discovery_script_file_name, cls.mnist_dir_path) = set_up_test_dirs()
+        # This is set to False because if not, the SparkConf will 
+        # use contain configurations from the LocalEndToEnd test,
+        # which causes the test to break.
+        conf = SparkConf(loadDefaults=False)
+        for k, v in get_distributed_mode_conf().items():
+            conf = conf.set(k, v)
+        conf = conf.set(
+            "spark.worker.resource.gpu.discoveryScript", cls.gpu_discovery_script_file_name
+        )
+        sc = SparkContext("local-cluster[2,2,512]",cls.__name__,conf=conf)
+        cls.spark = SparkSession(sc)
+
+    def test_simple_function_e2e(self):
+       train_fn = _create_basic_function()
+       # Arguments for the pythagoras function train_fn
+       x = 3
+       y = 4
+       dist = DeepspeedTorchDistributor(num_gpus=2, use_gpu=False, local_mode=False)
+       output = dist.run(train_fn, x, y)
+       self.assertEqual(output, 5)
+
+    def test_pytorch_file_e2e(self):
+        import textwrap
+        # TODO: change to better test script
+        # once Deepspeed CPU support is better
+        str_to_write = textwrap.dedent(""" 
+import sys
+def pythagorean_thm(x : int, y: int):
+    import deepspeed
+    return (x*x + y*y)**0.5
+print(pythagorean_thm(int(sys.argv[1]), int(sys.argv[2])))
+""")
+        cp_path = f"/tmp/test_deepspeed_training_file.py"
+        with open(cp_path, "w") as f:
+            f.write(str_to_write)
+        dist = DeepspeedTorchDistributor(num_gpus=True, use_gpu=False, local_mode=False)
+        dist.run(cp_path, 2, 5)
+        os.remove(cp_path)
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.mnist_dir_path)
+        os.unlink(cls.gpu_discovery_script_file_name)
+        cls.spark.stop()
+
+class DeepspeedDistributorLocalEndToEndTests(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        cls.gpu_discovery_script_file_name, cls.mnist_dir_path = set_up_test_dirs()
+        conf = SparkConf()
+        for k, v in get_local_mode_conf().items():
+            conf = conf.set(k, v)
+        conf = conf.set(
+            "spark.driver.resource.gpu.discoveryScript", cls.gpu_discovery_script_file_name
+        )
+        sc = SparkContext("local-cluster[2,2,512]",cls.__name__,conf=conf)
+        cls.spark = SparkSession(sc)
+    
+    def test_simple_function_e2e(self):
+       train_fn = _create_basic_function()
+       # Arguments for the pythagoras function train_fn
+       x = 3
+       y = 4
+       dist = DeepspeedTorchDistributor(num_gpus=2, use_gpu=False, local_mode=True)
+       output = dist.run(train_fn, x, y)
+       self.assertEqual(output, 5)
+
+
+    def test_pytorch_file_e2e(self):
+        path_to_train_file = "python/test_support/test_deepspeed_training_file.py"

Review Comment:
   This needs to be updated



##########
python/pyspark/ml/deepspeed/tests/test_deepspeed_distributor.py:
##########
@@ -163,6 +169,84 @@ def test_create_torchrun_command_distributed(self) -> None:
             )
             self.assertEqual(distributed_cmd_args_expected, distributed_command_with_args)
 
+def _create_basic_function():
+   def pythagoras(leg1: float, leg2: float):
+       import deepspeed
+       return (leg1 * leg1 + leg2 * leg2)**0.5
+   return pythagoras 
+
+
+class DeepspeedTorchDistributorDistributedEndToEnd(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        (cls.gpu_discovery_script_file_name, cls.mnist_dir_path) = set_up_test_dirs()
+        conf = SparkConf(loadDefaults=False)

Review Comment:
   "This is set to False" -> "loadDefaults is set to False"




