You are viewing a plain-text version of this content; the canonical link to it was a hyperlink on the original archive page and is not preserved in this text.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 10:58:10 UTC

svn commit: r1132202 - /incubator/mesos/trunk/ec2/mesos_ec2.py

Author: benh
Date: Sun Jun  5 08:58:09 2011
New Revision: 1132202

URL: http://svn.apache.org/viewvc?rev=1132202&view=rev
Log:
Use nodes from all reservation IDs with a given cluster name instead
of always taking just one.

Modified:
    incubator/mesos/trunk/ec2/mesos_ec2.py

Modified: incubator/mesos/trunk/ec2/mesos_ec2.py
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/ec2/mesos_ec2.py?rev=1132202&r1=1132201&r2=1132202&view=diff
==============================================================================
--- incubator/mesos/trunk/ec2/mesos_ec2.py (original)
+++ incubator/mesos/trunk/ec2/mesos_ec2.py Sun Jun  5 08:58:09 2011
@@ -94,20 +94,13 @@ def get_or_make_group(conn, name):
 
 # Wait for a set of launched instances to exit the "pending" state
 # (i.e. either to start running or to fail and be terminated)
-def wait_for_instances(conn, reservation):
-  instance_ids = [i.id for i in reservation.instances]
+def wait_for_instances(conn, instances):
   while True:
-    reservations = conn.get_all_instances(instance_ids)
-    some_pending = False
-    for res in reservations:
-      if len([i for i in res.instances if i.state == 'pending']) > 0:
-        some_pending = True
-        break
-    if some_pending:
+    for i in instances:
+      i.update()
+    if len([i for i in instances if i.state == 'pending']) > 0:
       time.sleep(5)
     else:
-      for i in reservation.instances:
-        i.update()
       return
 
 
@@ -191,6 +184,7 @@ def launch_cluster(conn, opts, cluster_n
                         min_count = opts.slaves,
                         max_count = opts.slaves,
                         block_device_map = block_map)
+  slave_nodes = slave_res.instances
   print "Launched slaves, regid = " + slave_res.id
 
   # Launch masters
@@ -204,6 +198,7 @@ def launch_cluster(conn, opts, cluster_n
                          min_count = opts.ft,
                          max_count = opts.ft,
                          block_device_map = block_map)
+  master_nodes = master_res.instances
   print "Launched master, regid = " + master_res.id
 
   # Launch ZooKeeper nodes if required
@@ -215,12 +210,13 @@ def launch_cluster(conn, opts, cluster_n
                         min_count = 3,
                         max_count = 3,
                         block_device_map = block_map)
+    zoo_nodes = zoo_res.instances
     print "Launched zoo, regid = " + zoo_res.id
   else:
-    zoo_res = None
+    zoo_nodes = []
 
   # Return all the instances
-  return (master_res, slave_res, zoo_res)
+  return (master_nodes, slave_nodes, zoo_nodes)
 
 
 # Get the EC2 instances in an existing cluster if available.
@@ -229,29 +225,27 @@ def launch_cluster(conn, opts, cluster_n
 def get_existing_cluster(conn, opts, cluster_name):
   print "Searching for existing cluster " + cluster_name + "..."
   reservations = conn.get_all_instances()
-  master_res = None
-  slave_res = None
-  zoo_res = None
+  master_nodes = []
+  slave_nodes = []
+  zoo_nodes = []
   for res in reservations:
     active = [i for i in res.instances if is_active(i)]
     if len(active) > 0:
       group_names = [g.id for g in res.groups]
       if group_names == [cluster_name + "-master"]:
-        master_res = res
+        master_nodes += res.instances
       elif group_names == [cluster_name + "-slaves"]:
-        slave_res = res
+        slave_nodes += res.instances
       elif group_names == [cluster_name + "-zoo"]:
-        zoo_res = res
-  if master_res != None and slave_res != None:
-    print "Found master regid: " + master_res.id
-    print "Found slave regid: " + slave_res.id
-    if zoo_res != None:
-      print "Found zoo regid: " + zoo_res.id
-    return (master_res, slave_res, zoo_res)
+        zoo_nodes += res.instances
+  if master_nodes != [] and slave_nodes != []:
+    print ("Found %d master(s), %d slaves, %d ZooKeeper nodes" %
+           (len(master_nodes), len(slave_nodes), len(zoo_nodes)))
+    return (master_nodes, slave_nodes, zoo_nodes)
   else:
-    if master_res == None and slave_res != None:
+    if master_nodes == [] and slave_nodes != []:
       print "ERROR: Could not find master in group " + cluster_name + "-master"
-    elif master_res != None and slave_res == None:
+    elif master_nodes != [] and slave_nodes == []:
       print "ERROR: Could not find slaves in group " + cluster_name + "-slaves"
     else:
       print "ERROR: Could not find any existing cluster"
@@ -260,10 +254,10 @@ def get_existing_cluster(conn, opts, clu
 
 # Deploy configuration files and run setup scripts on a newly launched
 # or started EC2 cluster.
-def setup_cluster(conn, master_res, slave_res, zoo_res, opts, deploy_ssh_key):
+def setup_cluster(conn, master_nodes, slave_nodes, zoo_nodes, opts, deploy_ssh_key):
   print "Deploying files to master..."
-  deploy_files(conn, "deploy." + opts.os, opts, master_res, slave_res, zoo_res)
-  master = master_res.instances[0].public_dns_name
+  deploy_files(conn, "deploy." + opts.os, opts, master_nodes, slave_nodes, zoo_nodes)
+  master = master_nodes[0].public_dns_name
   if deploy_ssh_key:
     print "Copying SSH key %s to master..." % opts.identity_file
     ssh(master, opts, 'mkdir -p /root/.ssh')
@@ -275,13 +269,13 @@ def setup_cluster(conn, master_res, slav
 
 
 # Wait for a whole cluster (masters, slaves and ZooKeeper) to start up
-def wait_for_cluster(conn, wait_secs, master_res, slave_res, zoo_res):
+def wait_for_cluster(conn, wait_secs, master_nodes, slave_nodes, zoo_nodes):
   print "Waiting for instances to start up..."
   time.sleep(5)
-  wait_for_instances(conn, master_res)
-  wait_for_instances(conn, slave_res)
-  if zoo_res != None:
-    wait_for_instances(conn, zoo_res)
+  wait_for_instances(conn, master_nodes)
+  wait_for_instances(conn, slave_nodes)
+  if zoo_nodes != []:
+    wait_for_instances(conn, zoo_nodes)
   print "Waiting %d more seconds..." % wait_secs
   time.sleep(wait_secs)
 
@@ -301,8 +295,8 @@ def get_num_disks(instance_type):
 # cluster (e.g. lists of masters and slaves). Files are only deployed to
 # the first master instance in the cluster, and we expect the setup
 # script to be run on that instance to copy them to other nodes.
-def deploy_files(conn, root_dir, opts, master_res, slave_res, zoo_res):
-  active_master = master_res.instances[0].public_dns_name
+def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes):
+  active_master = master_nodes[0].public_dns_name
 
   num_disks = get_num_disks(opts.instance_type)
   hdfs_data_dirs = "/mnt/ephemeral-hdfs/data"
@@ -312,18 +306,18 @@ def deploy_files(conn, root_dir, opts, m
       hdfs_data_dirs += ",/mnt%d/ephemeral-hdfs/data" % i
       mapred_local_dirs += ",/mnt%d/hadoop/mrlocal" % i
 
-  if zoo_res != None:
-    zoo_list = '\n'.join([i.public_dns_name for i in zoo_res.instances])
+  if zoo_nodes != []:
+    zoo_list = '\n'.join([i.public_dns_name for i in zoo_nodes])
     cluster_url = "zoo://" + ",".join(
-        ["%s:2181/mesos" % i.public_dns_name for i in zoo_res.instances])
+        ["%s:2181/mesos" % i.public_dns_name for i in zoo_nodes])
   else:
     zoo_list = "NONE"
     cluster_url = "1@%s:5050" % active_master
 
   template_vars = {
-    "master_list": '\n'.join([i.public_dns_name for i in master_res.instances]),
+    "master_list": '\n'.join([i.public_dns_name for i in master_nodes]),
     "active_master": active_master,
-    "slave_list": '\n'.join([i.public_dns_name for i in slave_res.instances]),
+    "slave_list": '\n'.join([i.public_dns_name for i in slave_nodes]),
     "zoo_list": zoo_list,
     "cluster_url": cluster_url,
     "hdfs_data_dirs": hdfs_data_dirs,
@@ -377,36 +371,36 @@ def main():
 
   if action == "launch":
     if opts.resume:
-      (master_res, slave_res, zoo_res) = get_existing_cluster(
+      (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
           conn, opts, cluster_name)
     else:
-      (master_res, slave_res, zoo_res) = launch_cluster(
+      (master_nodes, slave_nodes, zoo_nodes) = launch_cluster(
           conn, opts, cluster_name)
-      wait_for_cluster(conn, opts.wait, master_res, slave_res, zoo_res)
-    setup_cluster(conn, master_res, slave_res, zoo_res, opts, True)
+      wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes, zoo_nodes)
+    setup_cluster(conn, master_nodes, slave_nodes, zoo_nodes, opts, True)
 
   elif action == "destroy":
     response = raw_input("Are you sure you want to destroy the cluster " +
         cluster_name + "?\nALL DATA ON ALL NODES WILL BE LOST!!\n" +
         "Destroy cluster " + cluster_name + " (y/N): ")
     if response == "y":
-      (master_res, slave_res, zoo_res) = get_existing_cluster(
+      (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
           conn, opts, cluster_name)
       print "Terminating master..."
-      for inst in master_res.instances:
+      for inst in master_nodes:
         inst.terminate()
       print "Terminating slaves..."
-      for inst in slave_res.instances:
+      for inst in slave_nodes:
         inst.terminate()
-      if zoo_res != None:
+      if zoo_nodes != []:
         print "Terminating zoo..."
-        for inst in zoo_res.instances:
+        for inst in zoo_nodes:
           inst.terminate()
 
   elif action == "login":
-    (master_res, slave_res, zoo_res) = get_existing_cluster(
+    (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
         conn, opts, cluster_name)
-    master = master_res.instances[0].public_dns_name
+    master = master_nodes[0].public_dns_name
     print "Logging into master " + master + "..."
     proxy_opt = ""
     if opts.proxy_port != None:
@@ -415,8 +409,8 @@ def main():
         (opts.identity_file, proxy_opt, master), shell=True)
 
   elif action == "get-master":
-    (master_res, slave_res, zoo_res) = get_existing_cluster(conn, opts, cluster_name)
-    print master_res.instances[0].public_dns_name
+    (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(conn, opts, cluster_name)
+    print master_nodes[0].public_dns_name
 
   elif action == "stop":
     response = raw_input("Are you sure you want to stop the cluster " +
@@ -425,40 +419,40 @@ def main():
         "AMAZON EBS IF IT IS EBS-BACKED!!\n" +
         "Stop cluster " + cluster_name + " (y/N): ")
     if response == "y":
-      (master_res, slave_res, zoo_res) = get_existing_cluster(
+      (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
           conn, opts, cluster_name)
       print "Stopping master..."
-      for inst in master_res.instances:
+      for inst in master_nodes:
         if inst.state not in ["shutting-down", "terminated"]:
           inst.stop()
       print "Stopping slaves..."
-      for inst in slave_res.instances:
+      for inst in slave_nodes:
         if inst.state not in ["shutting-down", "terminated"]:
           inst.stop()
-      if zoo_res != None:
+      if zoo_nodes != []:
         print "Stopping zoo..."
-        for inst in zoo_res.instances:
+        for inst in zoo_nodes:
           if inst.state not in ["shutting-down", "terminated"]:
             inst.stop()
 
   elif action == "start":
-    (master_res, slave_res, zoo_res) = get_existing_cluster(
+    (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
         conn, opts, cluster_name)
     print "Starting slaves..."
-    for inst in slave_res.instances:
+    for inst in slave_nodes:
       if inst.state not in ["shutting-down", "terminated"]:
         inst.start()
     print "Starting master..."
-    for inst in master_res.instances:
+    for inst in master_nodes:
       if inst.state not in ["shutting-down", "terminated"]:
         inst.start()
-    if zoo_res != None:
+    if zoo_nodes != []:
       print "Starting zoo..."
-      for inst in zoo_res.instances:
+      for inst in zoo_nodes:
         if inst.state not in ["shutting-down", "terminated"]:
           inst.start()
-    wait_for_cluster(conn, opts.wait, master_res, slave_res, zoo_res)
-    setup_cluster(conn, master_res, slave_res, zoo_res, opts, False)
+    wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes, zoo_nodes)
+    setup_cluster(conn, master_nodes, slave_nodes, zoo_nodes, opts, False)
 
   elif action == "shutdown":
     print >> stderr, ("The shutdown action is no longer available.\n" +