Posted to reviews@spark.apache.org by "panchalhp-db (via GitHub)" <gi...@apache.org> on 2023/07/03 20:49:43 UTC

[GitHub] [spark] panchalhp-db commented on a diff in pull request #41801: [Spark Ticket Here]SSH Environment Manager

panchalhp-db commented on code in PR #41801:
URL: https://github.com/apache/spark/pull/41801#discussion_r1251269706


##########
python/pyspark/ml/torch/deepspeed/utils.py:
##########
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import os
+import subprocess
+import shutil
+from typing import (List)
+def write_to_location(location: str, content: str) -> None:
+    os.makedirs(os.path.dirname(location), exist_ok=True)
+    with open(location, "a") as f:
+        f.write(content)
+
+class SSHEnvManager:
+    """Is responsible for writing to the known_hosts file, setting up authorized users"""
+    KNOWN_HOSTS = "/root/.ssh/known_hosts"
+    KNOWN_HOSTS_TEMP = "/root/.ssh/known_hosts_temp"
+
+    def __init__(self):
+        self.known_hosts_exists = os.path.exists(SSHEnvManager.KNOWN_HOSTS)
+        if self.known_hosts_exists:
+            shutil.copyfile(SSHEnvManager.KNOWN_HOSTS, SSHEnvManager.KNOWN_HOSTS_TEMP)
+
+    def create_ssh_key(self, ssh_key_path: str):
+        if os.path.exists(ssh_key_path):
+            print(f"{ssh_key_path} already exists")
+        else:
+            print(f"Creating the ssh key to {ssh_key_path}")
+            # the empty string at the end of this command is used to provide an empty passphrase
+            cmd_status = subprocess.run(["ssh-keygen", "-t", "rsa", "-f", ssh_key_path, "-q", "-N", ""], capture_output=True)
+            if cmd_status.returncode != 0:
+                raise RuntimeError(f"Was unabled to create ssh-key to {ssh_key_path}\n. Output: {cmd_status.stdout.decode('utf-8')}")
+
+    def get_ssh_key(self, ssh_pub_key: str):
+        with open(ssh_pub_key) as f:
+            ssh_key = f.read()
+        return ssh_key
+
+    def ssh_keyscan(self, ip_list: List[str]):
+        """ Runs the ssh-keyscan on each IP in the ip_list and then writes the public key of that IP to the known_hosts file in ssh"""
+        # if there is a known_hosts file, we need to preserve old one as we modify it
+        # otherwise, just write to it
+        print("Trying to add the worker node public ssh keys to the ssh known_hosts file")

Review Comment:
   Should this use a logging library rather than print, e.g. logger.info(...)?
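Here is a minimal sketch of the logging approach. It assumes a module-level logger from Python's standard `logging` module. `describe_ssh_key` is a hypothetical helper that mirrors the two print calls in `create_ssh_key`; it is not part of the PR.

```python
import logging
import os

# Module-level logger named after the module, so applications (or PySpark)
# can configure its level and handlers independently of stdout.
logger = logging.getLogger(__name__)


def describe_ssh_key(ssh_key_path: str) -> bool:
    """Hypothetical helper: log whether the key exists instead of printing."""
    if os.path.exists(ssh_key_path):
        # %-style arguments are only formatted if the record is actually emitted.
        logger.info("%s already exists", ssh_key_path)
        return True
    logger.info("Creating the ssh key at %s", ssh_key_path)
    return False
```

Passing arguments to `logger.info` instead of pre-formatting with an f-string defers the string formatting until a handler actually emits the record.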



##########
python/pyspark/ml/torch/deepspeed/utils.py:
##########
@@ -0,0 +1,74 @@
+import os
+import subprocess
+import shutil
+from typing import (List)
+def write_to_location(location: str, content: str) -> None:
+    os.makedirs(os.path.dirname(location), exist_ok=True)
+    with open(location, "a") as f:
+        f.write(content)
+
+class SSHEnvManager:
+    """Is responsible for writing to the known_hosts file, setting up authorized users"""
+    KNOWN_HOSTS = "/root/.ssh/known_hosts"
+    KNOWN_HOSTS_TEMP = "/root/.ssh/known_hosts_temp"
+
+    def __init__(self):
+        self.known_hosts_exists = os.path.exists(SSHEnvManager.KNOWN_HOSTS)
+        if self.known_hosts_exists:
+            shutil.copyfile(SSHEnvManager.KNOWN_HOSTS, SSHEnvManager.KNOWN_HOSTS_TEMP)
+
+    def create_ssh_key(self, ssh_key_path: str):
+        if os.path.exists(ssh_key_path):
+            print(f"{ssh_key_path} already exists")
+        else:
+            print(f"Creating the ssh key to {ssh_key_path}")
+            # the empty string at the end of this command is used to provide an empty passphrase
+            cmd_status = subprocess.run(["ssh-keygen", "-t", "rsa", "-f", ssh_key_path, "-q", "-N", ""], capture_output=True)
+            if cmd_status.returncode != 0:
+                raise RuntimeError(f"Was unabled to create ssh-key to {ssh_key_path}\n. Output: {cmd_status.stdout.decode('utf-8')}")

Review Comment:
   nit:
   ```suggestion
                   raise RuntimeError(f"Unable to create ssh-key to {ssh_key_path}\n. Output: {cmd_status.stdout.decode('utf-8')}")
   ```
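Beyond the wording, the message puts the period after the newline. It also reports only stdout, while command-line tools such as ssh-keygen typically write their error messages to stderr. The following is a hedged sketch of a generic helper that includes both streams. The name `run_or_raise` is hypothetical.

```python
import subprocess
from typing import List


def run_or_raise(cmd: List[str], description: str) -> str:
    """Hypothetical helper: run cmd, return its stdout, raise with details on failure."""
    # text=True decodes stdout/stderr to str, so no manual .decode('utf-8') is needed.
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        # Include both streams: error details from CLI tools usually land on stderr.
        raise RuntimeError(
            f"Unable to {description}.\n"
            f"Command: {' '.join(cmd)}\n"
            f"stdout: {result.stdout}\n"
            f"stderr: {result.stderr}"
        )
    return result.stdout
```

With this helper, the call in `create_ssh_key` could look like `run_or_raise(["ssh-keygen", "-t", "rsa", "-f", ssh_key_path, "-q", "-N", ""], f"create ssh key at {ssh_key_path}")`.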



##########
python/pyspark/ml/torch/deepspeed/utils.py:
##########
@@ -0,0 +1,74 @@
+import os
+import subprocess
+import shutil
+from typing import (List)
+def write_to_location(location: str, content: str) -> None:

Review Comment:
   Why is this a separate function? Can we just make it a staticmethod of the class below?
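Here is a sketch of the staticmethod version. It is trimmed to just this helper, and the rest of `SSHEnvManager` is omitted. The guard for an empty dirname is an addition. Without it, `os.makedirs("")` raises `FileNotFoundError` when `location` has no directory part.

```python
import os


class SSHEnvManager:
    """Trimmed sketch: only the file-writing helper discussed in the review."""

    @staticmethod
    def write_to_location(location: str, content: str) -> None:
        # Create the parent directory if there is one, then append content.
        parent = os.path.dirname(location)
        if parent:
            os.makedirs(parent, exist_ok=True)
        with open(location, "a") as f:
            f.write(content)
```

Call sites inside the class would then read `SSHEnvManager.write_to_location(SSHEnvManager.KNOWN_HOSTS, cmd_output)` or `self.write_to_location(...)`.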



##########
python/pyspark/ml/torch/deepspeed/utils.py:
##########
@@ -0,0 +1,74 @@
+import os
+import subprocess
+import shutil
+from typing import (List)
+def write_to_location(location: str, content: str) -> None:
+    os.makedirs(os.path.dirname(location), exist_ok=True)
+    with open(location, "a") as f:
+        f.write(content)
+
+class SSHEnvManager:

Review Comment:
   Is there a chance that two instances of this class run at the same time? What happens if a temp file already exists?
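One way to avoid clashing with a leftover or concurrently created `known_hosts_temp` is to let `tempfile.mkstemp` pick a unique name in the same directory. This sketch assumes that a per-instance backup is acceptable. `backup_known_hosts` is a hypothetical helper, not part of the PR.

```python
import os
import shutil
import tempfile
from typing import Optional


def backup_known_hosts(known_hosts: str) -> Optional[str]:
    """Hypothetical helper: copy known_hosts to a uniquely named backup.

    Returns the backup path, or None if known_hosts does not exist.
    """
    if not os.path.exists(known_hosts):
        return None
    directory = os.path.dirname(known_hosts) or "."
    # mkstemp atomically creates a new file with a unique name, so concurrent
    # instances (or a stale backup from an earlier crash) cannot collide on a
    # fixed name such as known_hosts_temp.
    fd, backup_path = tempfile.mkstemp(prefix="known_hosts.", suffix=".bak", dir=directory)
    os.close(fd)
    shutil.copyfile(known_hosts, backup_path)
    return backup_path
```

This only removes the collision on the backup file. If two instances append to and then restore the same `known_hosts`, they would still race. Preventing that would need a lock (for example `fcntl.flock` on a lock file) or a single owner of the file.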



##########
python/pyspark/ml/torch/deepspeed/utils.py:
##########
@@ -0,0 +1,74 @@
+import os
+import subprocess
+import shutil
+from typing import (List)
+def write_to_location(location: str, content: str) -> None:
+    os.makedirs(os.path.dirname(location), exist_ok=True)
+    with open(location, "a") as f:
+        f.write(content)
+
+class SSHEnvManager:
+    """Is responsible for writing to the known_hosts file, setting up authorized users"""
+    KNOWN_HOSTS = "/root/.ssh/known_hosts"
+    KNOWN_HOSTS_TEMP = "/root/.ssh/known_hosts_temp"
+
+    def __init__(self):
+        self.known_hosts_exists = os.path.exists(SSHEnvManager.KNOWN_HOSTS)
+        if self.known_hosts_exists:
+            shutil.copyfile(SSHEnvManager.KNOWN_HOSTS, SSHEnvManager.KNOWN_HOSTS_TEMP)
+
+    def create_ssh_key(self, ssh_key_path: str):
+        if os.path.exists(ssh_key_path):
+            print(f"{ssh_key_path} already exists")
+        else:
+            print(f"Creating the ssh key to {ssh_key_path}")
+            # the empty string at the end of this command is used to provide an empty passphrase
+            cmd_status = subprocess.run(["ssh-keygen", "-t", "rsa", "-f", ssh_key_path, "-q", "-N", ""], capture_output=True)
+            if cmd_status.returncode != 0:
+                raise RuntimeError(f"Was unabled to create ssh-key to {ssh_key_path}\n. Output: {cmd_status.stdout.decode('utf-8')}")
+
+    def get_ssh_key(self, ssh_pub_key: str):
+        with open(ssh_pub_key) as f:
+            ssh_key = f.read()
+        return ssh_key
+
+    def ssh_keyscan(self, ip_list: List[str]):
+        """ Runs the ssh-keyscan on each IP in the ip_list and then writes the public key of that IP to the known_hosts file in ssh"""
+        # if there is a known_hosts file, we need to preserve old one as we modify it
+        # otherwise, just write to it
+        print("Trying to add the worker node public ssh keys to the ssh known_hosts file")
+        for ip in ip_list:
+            cmd_args = ["ssh-keyscan", ip]
+            error_code = subprocess.run(cmd_args, capture_output=True)

Review Comment:
   nit: should we call this `output` rather than `error_code`?
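Here is a sketch of the loop with the rename applied, assuming the same append-to-known_hosts behavior. `keyscan_to_known_hosts` and its `scan_cmd` parameter are hypothetical; the parameter lets the command be swapped out in tests. The sketch also passes a single message string to `RuntimeError`. The PR's version passes `cmd_args` as a second constructor argument, which makes the exception's string form a tuple rather than a formatted message.

```python
import os
import subprocess
from typing import List, Sequence


def keyscan_to_known_hosts(
    ip_list: List[str],
    known_hosts_path: str,
    scan_cmd: Sequence[str] = ("ssh-keyscan",),
) -> None:
    """Hypothetical variant of ssh_keyscan: append each host's keys to known_hosts_path."""
    parent = os.path.dirname(known_hosts_path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    for ip in ip_list:
        cmd_args = [*scan_cmd, ip]
        # subprocess.run returns a CompletedProcess (return code plus captured
        # output), not an error code, hence the name `result`.
        result = subprocess.run(cmd_args, capture_output=True, text=True)
        if result.returncode != 0:
            # One formatted message, so str(exc) reads cleanly.
            raise RuntimeError(
                f"ssh-keyscan failed for {ip} (command: {cmd_args}): {result.stderr}"
            )
        with open(known_hosts_path, "a") as f:
            f.write(result.stdout)
```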



##########
python/pyspark/ml/torch/deepspeed/utils.py:
##########
@@ -0,0 +1,74 @@
+import os
+import subprocess
+import shutil
+from typing import (List)
+def write_to_location(location: str, content: str) -> None:
+    os.makedirs(os.path.dirname(location), exist_ok=True)
+    with open(location, "a") as f:
+        f.write(content)
+
+class SSHEnvManager:
+    """Is responsible for writing to the known_hosts file, setting up authorized users"""
+    KNOWN_HOSTS = "/root/.ssh/known_hosts"
+    KNOWN_HOSTS_TEMP = "/root/.ssh/known_hosts_temp"
+
+    def __init__(self):
+        self.known_hosts_exists = os.path.exists(SSHEnvManager.KNOWN_HOSTS)
+        if self.known_hosts_exists:
+            shutil.copyfile(SSHEnvManager.KNOWN_HOSTS, SSHEnvManager.KNOWN_HOSTS_TEMP)
+
+    def create_ssh_key(self, ssh_key_path: str):
+        if os.path.exists(ssh_key_path):
+            print(f"{ssh_key_path} already exists")
+        else:
+            print(f"Creating the ssh key to {ssh_key_path}")
+            # the empty string at the end of this command is used to provide an empty passphrase
+            cmd_status = subprocess.run(["ssh-keygen", "-t", "rsa", "-f", ssh_key_path, "-q", "-N", ""], capture_output=True)
+            if cmd_status.returncode != 0:
+                raise RuntimeError(f"Was unabled to create ssh-key to {ssh_key_path}\n. Output: {cmd_status.stdout.decode('utf-8')}")
+
+    def get_ssh_key(self, ssh_pub_key: str):
+        with open(ssh_pub_key) as f:
+            ssh_key = f.read()
+        return ssh_key
+
+    def ssh_keyscan(self, ip_list: List[str]):
+        """ Runs the ssh-keyscan on each IP in the ip_list and then writes the public key of that IP to the known_hosts file in ssh"""
+        # if there is a known_hosts file, we need to preserve old one as we modify it
+        # otherwise, just write to it
+        print("Trying to add the worker node public ssh keys to the ssh known_hosts file")
+        for ip in ip_list:
+            cmd_args = ["ssh-keyscan", ip]
+            error_code = subprocess.run(cmd_args, capture_output=True)
+            if error_code.returncode != 0:
+                raise RuntimeError(f"Something went wrong when running ssh_keyscan {ip}. Command tried to run: ", cmd_args)
+            cmd_output = error_code.stdout.decode('utf-8') # get the output from the command so we can write to right location
+            write_to_location(SSHEnvManager.KNOWN_HOSTS, cmd_output)
+        print("Successfully finished writing worker ssh public keys to known_hosts on driver")
+
+    def cleanup_ssh_env(self):

Review Comment:
   What happens if the cleanup fails? Would failing to restore the original known_hosts file cause an issue?
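One possible answer is to wrap the period during which the worker entries are needed in a context manager. The context manager restores the previous `known_hosts` in a `finally` block, so an exception partway through does not leave extra entries behind. This is a hedged sketch. `preserved_file` is a hypothetical helper, not the PR's `cleanup_ssh_env`.

```python
import contextlib
import os
import shutil
import tempfile
from typing import Iterator, Optional


@contextlib.contextmanager
def preserved_file(path: str) -> Iterator[None]:
    """Hypothetical helper: restore `path` to its prior state on exit, even on error."""
    backup: Optional[str] = None
    if os.path.exists(path):
        # Unique backup next to the original; copy2 also keeps mode and mtime.
        fd, backup = tempfile.mkstemp(dir=os.path.dirname(path) or ".")
        os.close(fd)
        shutil.copy2(path, backup)
    try:
        yield
    finally:
        if backup is not None:
            # os.replace is atomic on POSIX when both paths are on the same
            # filesystem, so readers never see a half-restored file.
            os.replace(backup, path)
        elif os.path.exists(path):
            # The file did not exist before, so remove what was created.
            os.remove(path)
```

The `with` block would need to cover the whole time the worker entries are used (for example, the full distributed run), not just the keyscan. `finally` also does not run if the process is killed outright (for example with SIGKILL), so a stale backup can still remain in that case.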



##########
python/pyspark/ml/torch/deepspeed/utils.py:
##########
@@ -0,0 +1,74 @@
+import os
+import subprocess
+import shutil
+from typing import (List)
+def write_to_location(location: str, content: str) -> None:
+    os.makedirs(os.path.dirname(location), exist_ok=True)
+    with open(location, "a") as f:
+        f.write(content)
+
+class SSHEnvManager:
+    """Is responsible for writing to the known_hosts file, setting up authorized users"""

Review Comment:
   Does this write to the known_hosts file on all machines (driver and workers) or only on the driver? Can we add that detail to the docstring here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: reviews-help@spark.apache.org