You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2013/09/26 23:04:37 UTC
[1/4] git commit: Construct shell commands as sequences for safety and composability
Updated Branches:
refs/heads/master c514cd158 -> 76677b8fa
Construct shell commands as sequences for safety and composability
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/6919a28d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/6919a28d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/6919a28d
Branch: refs/heads/master
Commit: 6919a28d51c416ff4bb647b03eae2070cf87f039
Parents: 1e15feb
Author: Jey Kottalam <je...@cs.berkeley.edu>
Authored: Fri May 17 17:10:47 2013 -0700
Committer: Jey Kottalam <je...@cs.berkeley.edu>
Committed: Fri Sep 6 14:28:26 2013 -0700
----------------------------------------------------------------------
ec2/spark_ec2.py | 45 ++++++++++++++++++++++++++++++++++-----------
1 file changed, 34 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6919a28d/ec2/spark_ec2.py
----------------------------------------------------------------------
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 932e70d..75dd0ff 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -23,6 +23,7 @@ from __future__ import with_statement
import logging
import os
+import pipes
import random
import shutil
import subprocess
@@ -536,18 +537,41 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
dest.write(text)
dest.close()
# rsync the whole directory over to the master machine
- command = (("rsync -rv -e 'ssh -o StrictHostKeyChecking=no -i %s' " +
- "'%s/' '%s@%s:/'") % (opts.identity_file, tmp_dir, opts.user, active_master))
- subprocess.check_call(command, shell=True)
+ command = [
+ 'rsync', '-rv',
+ '-e', stringify_command(ssh_command(opts)),
+ "%s/" % tmp_dir,
+ "%s@%s:/" % (opts.user, active_master)
+ ]
+ subprocess.check_call(command)
# Remove the temp directory we created above
shutil.rmtree(tmp_dir)
+def stringify_command(parts):
+ if isinstance(parts, str):
+ return parts
+ else:
+ return ' '.join(map(pipes.quote, parts))
+
+
+def ssh_args(opts):
+ parts = ['-o', 'StrictHostKeyChecking=no', '-i', opts.identity_file]
+ return parts
+
+
+def ssh_command(opts):
+ return ['ssh'] + ssh_args(opts)
+
+
+def scp_command(opts):
+ return ['scp', '-q'] + ssh_args(opts)
+
+
# Copy a file to a given host through scp, throwing an exception if scp fails
def scp(host, opts, local_file, dest_file):
subprocess.check_call(
- "scp -q -o StrictHostKeyChecking=no -i %s '%s' '%s@%s:%s'" %
- (opts.identity_file, local_file, opts.user, host, dest_file), shell=True)
+ scp_command(opts) + [local_file, "%s@%s:%s" % (opts.user, host, dest_file)])
# Run a command on a host through ssh, retrying up to two times
@@ -557,8 +581,7 @@ def ssh(host, opts, command):
while True:
try:
return subprocess.check_call(
- "ssh -t -o StrictHostKeyChecking=no -i %s %s@%s '%s'" %
- (opts.identity_file, opts.user, host, command), shell=True)
+ ssh_command(opts) + ['-t', '%s@%s' % (opts.user, host), stringify_command(command)])
except subprocess.CalledProcessError as e:
if (tries > 2):
raise e
@@ -670,11 +693,11 @@ def main():
conn, opts, cluster_name)
master = master_nodes[0].public_dns_name
print "Logging into master " + master + "..."
- proxy_opt = ""
+ proxy_opt = []
if opts.proxy_port != None:
- proxy_opt = "-D " + opts.proxy_port
- subprocess.check_call("ssh -o StrictHostKeyChecking=no -i %s %s %s@%s" %
- (opts.identity_file, proxy_opt, opts.user, master), shell=True)
+ proxy_opt = ['-D', opts.proxy_port]
+ subprocess.check_call(
+ ssh_command(opts) + proxy_opt + ['-t', "%s@%s" % (opts.user, master)])
elif action == "get-master":
(master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
[3/4] git commit: Clarify error messages on SSH failure
Posted by rx...@apache.org.
Clarify error messages on SSH failure
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/e86d1d4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/e86d1d4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/e86d1d4a
Branch: refs/heads/master
Commit: e86d1d4a52147fe52feeda74ca3558f6bc109285
Parents: b98572c
Author: Jey Kottalam <je...@cs.berkeley.edu>
Authored: Wed Sep 11 14:59:42 2013 -0700
Committer: Jey Kottalam <je...@cs.berkeley.edu>
Committed: Wed Sep 11 14:59:42 2013 -0700
----------------------------------------------------------------------
ec2/spark_ec2.py | 27 +++++++++++++++++++++------
1 file changed, 21 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e86d1d4a/ec2/spark_ec2.py
----------------------------------------------------------------------
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 0858b12..f4babba 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -37,6 +37,9 @@ import boto
from boto.ec2.blockdevicemapping import BlockDeviceMapping, EBSBlockDeviceType
from boto import ec2
+class UsageError(Exception):
+ pass
+
# A URL prefix from which to fetch AMI information
AMI_PREFIX = "https://raw.github.com/mesos/spark-ec2/v2/ami-list"
@@ -580,8 +583,12 @@ def ssh(host, opts, command):
ssh_command(opts) + ['-t', '%s@%s' % (opts.user, host), stringify_command(command)])
except subprocess.CalledProcessError as e:
if (tries > 2):
- raise e
- print "Error connecting to host, sleeping 30: {0}".format(e)
+ # If this was an ssh failure, provide the user with hints.
+ if e.returncode == 255:
+ raise UsageError("Failed to SSH to remote host {0}.\nPlease check that you have provided the correct --identity-file and --key-pair parameters and try again.".format(host))
+ else:
+ raise e
+ print >> stderr, "Error executing remote command, retrying after 30 seconds: {0}".format(e)
time.sleep(30)
tries = tries + 1
@@ -599,12 +606,13 @@ def ssh_write(host, opts, command, input):
stdin=subprocess.PIPE)
proc.stdin.write(input)
proc.stdin.close()
- if proc.wait() == 0:
+ status = proc.wait()
+ if status == 0:
break
elif (tries > 2):
- raise RuntimeError("ssh_write error %s" % proc.returncode)
+ raise RuntimeError("ssh_write failed with error %s" % proc.returncode)
else:
- print "Error connecting to host, sleeping 30"
+ print >> stderr, "Error {0} while executing remote command, retrying after 30 seconds".format(status)
time.sleep(30)
tries = tries + 1
@@ -626,7 +634,7 @@ def get_partition(total, num_partitions, current_partitions):
return num_slaves_this_zone
-def main():
+def real_main():
(opts, action, cluster_name) = parse_args()
try:
conn = ec2.connect_to_region(opts.region)
@@ -755,6 +763,13 @@ def main():
sys.exit(1)
+def main():
+ try:
+ real_main()
+ except UsageError, e:
+ print >> stderr, "\nError:\n", e
+
+
if __name__ == "__main__":
logging.basicConfig()
main()
[4/4] git commit: Merge pull request #670 from jey/ec2-ssh-improvements
Posted by rx...@apache.org.
Merge pull request #670 from jey/ec2-ssh-improvements
EC2 SSH improvements
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/76677b8f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/76677b8f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/76677b8f
Branch: refs/heads/master
Commit: 76677b8fa173f39a1c4eee41d1aaacd37b3da7a3
Parents: c514cd1 e86d1d4
Author: Reynold Xin <rx...@apache.org>
Authored: Thu Sep 26 14:03:46 2013 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Thu Sep 26 14:03:46 2013 -0700
----------------------------------------------------------------------
ec2/spark_ec2.py | 106 +++++++++++++++++++++++++++++++++++++-------------
1 file changed, 80 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/76677b8f/ec2/spark_ec2.py
----------------------------------------------------------------------
[2/4] git commit: Generate new SSH key for the cluster, make "--identity-file" optional
Posted by rx...@apache.org.
Generate new SSH key for the cluster, make "--identity-file" optional
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/b98572c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/b98572c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/b98572c7
Branch: refs/heads/master
Commit: b98572c70ad3932381a55f23f82600d7e435d2eb
Parents: 6919a28
Author: Jey Kottalam <je...@cs.berkeley.edu>
Authored: Wed Jul 3 16:57:22 2013 -0700
Committer: Jey Kottalam <je...@cs.berkeley.edu>
Committed: Fri Sep 6 14:51:47 2013 -0700
----------------------------------------------------------------------
ec2/spark_ec2.py | 58 ++++++++++++++++++++++++++++++++-------------------
1 file changed, 37 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b98572c7/ec2/spark_ec2.py
----------------------------------------------------------------------
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 75dd0ff..0858b12 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -104,11 +104,7 @@ def parse_args():
parser.print_help()
sys.exit(1)
(action, cluster_name) = args
- if opts.identity_file == None and action in ['launch', 'login', 'start']:
- print >> stderr, ("ERROR: The -i or --identity-file argument is " +
- "required for " + action)
- sys.exit(1)
-
+
# Boto config check
# http://boto.cloudhackers.com/en/latest/boto_config_tut.html
home_dir = os.getenv('HOME')
@@ -392,10 +388,18 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
master = master_nodes[0].public_dns_name
if deploy_ssh_key:
- print "Copying SSH key %s to master..." % opts.identity_file
- ssh(master, opts, 'mkdir -p ~/.ssh')
- scp(master, opts, opts.identity_file, '~/.ssh/id_rsa')
- ssh(master, opts, 'chmod 600 ~/.ssh/id_rsa')
+ print "Generating cluster's SSH key on master..."
+ key_setup = """
+ [ -f ~/.ssh/id_rsa ] ||
+ (ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa &&
+ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys)
+ """
+ ssh(master, opts, key_setup)
+ dot_ssh_tar = ssh_read(master, opts, ['tar', 'c', '.ssh'])
+ print "Transferring cluster's SSH key to slaves..."
+ for slave in slave_nodes:
+ print slave.public_dns_name
+ ssh_write(slave.public_dns_name, opts, ['tar', 'x'], dot_ssh_tar)
modules = ['spark', 'shark', 'ephemeral-hdfs', 'persistent-hdfs',
'mapreduce', 'spark-standalone']
@@ -556,7 +560,9 @@ def stringify_command(parts):
def ssh_args(opts):
- parts = ['-o', 'StrictHostKeyChecking=no', '-i', opts.identity_file]
+ parts = ['-o', 'StrictHostKeyChecking=no']
+ if opts.identity_file is not None:
+ parts += ['-i', opts.identity_file]
return parts
@@ -564,16 +570,6 @@ def ssh_command(opts):
return ['ssh'] + ssh_args(opts)
-def scp_command(opts):
- return ['scp', '-q'] + ssh_args(opts)
-
-
-# Copy a file to a given host through scp, throwing an exception if scp fails
-def scp(host, opts, local_file, dest_file):
- subprocess.check_call(
- scp_command(opts) + [local_file, "%s@%s:%s" % (opts.user, host, dest_file)])
-
-
# Run a command on a host through ssh, retrying up to two times
# and then throwing an exception if ssh continues to fail.
def ssh(host, opts, command):
@@ -585,13 +581,33 @@ def ssh(host, opts, command):
except subprocess.CalledProcessError as e:
if (tries > 2):
raise e
- print "Couldn't connect to host {0}, waiting 30 seconds".format(e)
+ print "Error connecting to host, sleeping 30: {0}".format(e)
time.sleep(30)
tries = tries + 1
+def ssh_read(host, opts, command):
+ return subprocess.check_output(
+ ssh_command(opts) + ['%s@%s' % (opts.user, host), stringify_command(command)])
+def ssh_write(host, opts, command, input):
+ tries = 0
+ while True:
+ proc = subprocess.Popen(
+ ssh_command(opts) + ['%s@%s' % (opts.user, host), stringify_command(command)],
+ stdin=subprocess.PIPE)
+ proc.stdin.write(input)
+ proc.stdin.close()
+ if proc.wait() == 0:
+ break
+ elif (tries > 2):
+ raise RuntimeError("ssh_write error %s" % proc.returncode)
+ else:
+ print "Error connecting to host, sleeping 30"
+ time.sleep(30)
+ tries = tries + 1
+
# Gets a list of zones to launch instances in
def get_zones(conn, opts):