You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2014/10/08 01:54:40 UTC

git commit: [SPARK-3398] [EC2] Have spark-ec2 intelligently wait for specific cluster states

Repository: spark
Updated Branches:
  refs/heads/master b32bb72e8 -> 5912ca671


[SPARK-3398] [EC2] Have spark-ec2 intelligently wait for specific cluster states

Instead of waiting arbitrary amounts of time for the cluster to reach a specific state, this patch lets `spark-ec2` explicitly wait for a cluster to reach a desired state.

This is useful in a couple of situations:
* The cluster is launching and you want to wait until SSH is available before installing stuff.
* The cluster is being terminated and you want to wait until all the instances are terminated before trying to delete security groups.

This patch removes the need for the `--wait` option and removes some of the time-based retry logic that was being used.

Author: Nicholas Chammas <ni...@gmail.com>

Closes #2339 from nchammas/spark-ec2-wait-properly and squashes the following commits:

43a69f0 [Nicholas Chammas] short-circuit SSH check; linear backoff
9a9e035 [Nicholas Chammas] remove extraneous comment
26c5ed0 [Nicholas Chammas] replace print with write()
bb67c06 [Nicholas Chammas] deprecate wait option; remove dead code
7969265 [Nicholas Chammas] fix long line (PEP 8)
126e4cf [Nicholas Chammas] wait for specific cluster states


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5912ca67
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5912ca67
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5912ca67

Branch: refs/heads/master
Commit: 5912ca67140eed5dea66745aa3af4febdbd80781
Parents: b32bb72
Author: Nicholas Chammas <ni...@gmail.com>
Authored: Tue Oct 7 16:54:32 2014 -0700
Committer: Josh Rosen <jo...@apache.org>
Committed: Tue Oct 7 16:54:32 2014 -0700

----------------------------------------------------------------------
 ec2/spark_ec2.py | 111 ++++++++++++++++++++++++++++++++++++++------------
 1 file changed, 86 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5912ca67/ec2/spark_ec2.py
----------------------------------------------------------------------
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 941dfb9..27f468e 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -32,6 +32,7 @@ import sys
 import tempfile
 import time
 import urllib2
+import warnings
 from optparse import OptionParser
 from sys import stderr
 import boto
@@ -61,8 +62,8 @@ def parse_args():
         "-s", "--slaves", type="int", default=1,
         help="Number of slaves to launch (default: %default)")
     parser.add_option(
-        "-w", "--wait", type="int", default=120,
-        help="Seconds to wait for nodes to start (default: %default)")
+        "-w", "--wait", type="int",
+        help="DEPRECATED (no longer necessary) - Seconds to wait for nodes to start")
     parser.add_option(
         "-k", "--key-pair",
         help="Key pair to use on instances")
@@ -195,18 +196,6 @@ def get_or_make_group(conn, name):
         return conn.create_security_group(name, "Spark EC2 group")
 
 
-# Wait for a set of launched instances to exit the "pending" state
-# (i.e. either to start running or to fail and be terminated)
-def wait_for_instances(conn, instances):
-    while True:
-        for i in instances:
-            i.update()
-        if len([i for i in instances if i.state == 'pending']) > 0:
-            time.sleep(5)
-        else:
-            return
-
-
 # Check whether a given EC2 instance object is in a state we consider active,
 # i.e. not terminating or terminated. We count both stopping and stopped as
 # active since we can restart stopped clusters.
@@ -619,14 +608,64 @@ def setup_spark_cluster(master, opts):
         print "Ganglia started at http://%s:5080/ganglia" % master
 
 
-# Wait for a whole cluster (masters, slaves and ZooKeeper) to start up
-def wait_for_cluster(conn, wait_secs, master_nodes, slave_nodes):
-    print "Waiting for instances to start up..."
-    time.sleep(5)
-    wait_for_instances(conn, master_nodes)
-    wait_for_instances(conn, slave_nodes)
-    print "Waiting %d more seconds..." % wait_secs
-    time.sleep(wait_secs)
+def is_ssh_available(host, opts):
+    "Checks if SSH is available on the host."
+    try:
+        with open(os.devnull, 'w') as devnull:
+            ret = subprocess.check_call(
+                ssh_command(opts) + ['-t', '-t', '-o', 'ConnectTimeout=3',
+                                     '%s@%s' % (opts.user, host), stringify_command('true')],
+                stdout=devnull,
+                stderr=devnull
+            )
+        return ret == 0
+    except subprocess.CalledProcessError as e:
+        return False
+
+
+def is_cluster_ssh_available(cluster_instances, opts):
+    for i in cluster_instances:
+        if not is_ssh_available(host=i.ip_address, opts=opts):
+            return False
+    else:
+        return True
+
+
+def wait_for_cluster_state(cluster_instances, cluster_state, opts):
+    """
+    cluster_instances: a list of boto.ec2.instance.Instance
+    cluster_state: a string representing the desired state of all the instances in the cluster
+           value can be 'ssh-ready' or a valid value from boto.ec2.instance.InstanceState such as
+           'running', 'terminated', etc.
+           (would be nice to replace this with a proper enum: http://stackoverflow.com/a/1695250)
+    """
+    sys.stdout.write(
+        "Waiting for all instances in cluster to enter '{s}' state.".format(s=cluster_state)
+    )
+    sys.stdout.flush()
+
+    num_attempts = 0
+
+    while True:
+        time.sleep(3 * num_attempts)
+
+        for i in cluster_instances:
+            s = i.update()  # capture output to suppress print to screen in newer versions of boto
+
+        if cluster_state == 'ssh-ready':
+            if all(i.state == 'running' for i in cluster_instances) and \
+               is_cluster_ssh_available(cluster_instances, opts):
+                break
+        else:
+            if all(i.state == cluster_state for i in cluster_instances):
+                break
+
+        num_attempts += 1
+
+        sys.stdout.write(".")
+        sys.stdout.flush()
+
+    sys.stdout.write("\n")
 
 
 # Get number of local disks available for a given EC2 instance type.
@@ -868,6 +907,16 @@ def real_main():
     (opts, action, cluster_name) = parse_args()
 
     # Input parameter validation
+    if opts.wait is not None:
+        # NOTE: DeprecationWarnings are silent in 2.7+ by default.
+        #       To show them, run Python with the -Wdefault switch.
+        # See: https://docs.python.org/3.5/whatsnew/2.7.html
+        warnings.warn(
+            "This option is deprecated and has no effect. "
+            "spark-ec2 automatically waits as long as necessary for clusters to startup.",
+            DeprecationWarning
+        )
+
     if opts.ebs_vol_num > 8:
         print >> stderr, "ebs-vol-num cannot be greater than 8"
         sys.exit(1)
@@ -890,7 +939,11 @@ def real_main():
             (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
         else:
             (master_nodes, slave_nodes) = launch_cluster(conn, opts, cluster_name)
-            wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes)
+        wait_for_cluster_state(
+            cluster_instances=(master_nodes + slave_nodes),
+            cluster_state='ssh-ready',
+            opts=opts
+        )
         setup_cluster(conn, master_nodes, slave_nodes, opts, True)
 
     elif action == "destroy":
@@ -919,7 +972,11 @@ def real_main():
                 else:
                     group_names = [opts.security_group_prefix + "-master",
                                    opts.security_group_prefix + "-slaves"]
-
+                wait_for_cluster_state(
+                    cluster_instances=(master_nodes + slave_nodes),
+                    cluster_state='terminated',
+                    opts=opts
+                )
                 attempt = 1
                 while attempt <= 3:
                     print "Attempt %d" % attempt
@@ -1019,7 +1076,11 @@ def real_main():
         for inst in master_nodes:
             if inst.state not in ["shutting-down", "terminated"]:
                 inst.start()
-        wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes)
+        wait_for_cluster_state(
+            cluster_instances=(master_nodes + slave_nodes),
+            cluster_state='ssh-ready',
+            opts=opts
+        )
         setup_cluster(conn, master_nodes, slave_nodes, opts, False)
 
     else:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org