You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this plain-text copy.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 10:58:10 UTC
svn commit: r1132202 - /incubator/mesos/trunk/ec2/mesos_ec2.py
Author: benh
Date: Sun Jun 5 08:58:09 2011
New Revision: 1132202
URL: http://svn.apache.org/viewvc?rev=1132202&view=rev
Log:
Launch nodes from all reservation IDs with a given cluster name instead
of always taking the first one.
Modified:
incubator/mesos/trunk/ec2/mesos_ec2.py
Modified: incubator/mesos/trunk/ec2/mesos_ec2.py
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/ec2/mesos_ec2.py?rev=1132202&r1=1132201&r2=1132202&view=diff
==============================================================================
--- incubator/mesos/trunk/ec2/mesos_ec2.py (original)
+++ incubator/mesos/trunk/ec2/mesos_ec2.py Sun Jun 5 08:58:09 2011
@@ -94,20 +94,13 @@ def get_or_make_group(conn, name):
# Wait for a set of launched instances to exit the "pending" state
# (i.e. either to start running or to fail and be terminated)
-def wait_for_instances(conn, reservation):
- instance_ids = [i.id for i in reservation.instances]
+def wait_for_instances(conn, instances):
while True:
- reservations = conn.get_all_instances(instance_ids)
- some_pending = False
- for res in reservations:
- if len([i for i in res.instances if i.state == 'pending']) > 0:
- some_pending = True
- break
- if some_pending:
+ for i in instances:
+ i.update()
+ if len([i for i in instances if i.state == 'pending']) > 0:
time.sleep(5)
else:
- for i in reservation.instances:
- i.update()
return
@@ -191,6 +184,7 @@ def launch_cluster(conn, opts, cluster_n
min_count = opts.slaves,
max_count = opts.slaves,
block_device_map = block_map)
+ slave_nodes = slave_res.instances
print "Launched slaves, regid = " + slave_res.id
# Launch masters
@@ -204,6 +198,7 @@ def launch_cluster(conn, opts, cluster_n
min_count = opts.ft,
max_count = opts.ft,
block_device_map = block_map)
+ master_nodes = master_res.instances
print "Launched master, regid = " + master_res.id
# Launch ZooKeeper nodes if required
@@ -215,12 +210,13 @@ def launch_cluster(conn, opts, cluster_n
min_count = 3,
max_count = 3,
block_device_map = block_map)
+ zoo_nodes = zoo_res.instances
print "Launched zoo, regid = " + zoo_res.id
else:
- zoo_res = None
+ zoo_nodes = []
# Return all the instances
- return (master_res, slave_res, zoo_res)
+ return (master_nodes, slave_nodes, zoo_nodes)
# Get the EC2 instances in an existing cluster if available.
@@ -229,29 +225,27 @@ def launch_cluster(conn, opts, cluster_n
def get_existing_cluster(conn, opts, cluster_name):
print "Searching for existing cluster " + cluster_name + "..."
reservations = conn.get_all_instances()
- master_res = None
- slave_res = None
- zoo_res = None
+ master_nodes = []
+ slave_nodes = []
+ zoo_nodes = []
for res in reservations:
active = [i for i in res.instances if is_active(i)]
if len(active) > 0:
group_names = [g.id for g in res.groups]
if group_names == [cluster_name + "-master"]:
- master_res = res
+ master_nodes += res.instances
elif group_names == [cluster_name + "-slaves"]:
- slave_res = res
+ slave_nodes += res.instances
elif group_names == [cluster_name + "-zoo"]:
- zoo_res = res
- if master_res != None and slave_res != None:
- print "Found master regid: " + master_res.id
- print "Found slave regid: " + slave_res.id
- if zoo_res != None:
- print "Found zoo regid: " + zoo_res.id
- return (master_res, slave_res, zoo_res)
+ zoo_nodes += res.instances
+ if master_nodes != [] and slave_nodes != []:
+ print ("Found %d master(s), %d slaves, %d ZooKeeper nodes" %
+ (len(master_nodes), len(slave_nodes), len(zoo_nodes)))
+ return (master_nodes, slave_nodes, zoo_nodes)
else:
- if master_res == None and slave_res != None:
+ if master_nodes == [] and slave_nodes != []:
print "ERROR: Could not find master in group " + cluster_name + "-master"
- elif master_res != None and slave_res == None:
+ elif master_nodes != [] and slave_nodes == []:
print "ERROR: Could not find slaves in group " + cluster_name + "-slaves"
else:
print "ERROR: Could not find any existing cluster"
@@ -260,10 +254,10 @@ def get_existing_cluster(conn, opts, clu
# Deploy configuration files and run setup scripts on a newly launched
# or started EC2 cluster.
-def setup_cluster(conn, master_res, slave_res, zoo_res, opts, deploy_ssh_key):
+def setup_cluster(conn, master_nodes, slave_nodes, zoo_nodes, opts, deploy_ssh_key):
print "Deploying files to master..."
- deploy_files(conn, "deploy." + opts.os, opts, master_res, slave_res, zoo_res)
- master = master_res.instances[0].public_dns_name
+ deploy_files(conn, "deploy." + opts.os, opts, master_nodes, slave_nodes, zoo_nodes)
+ master = master_nodes[0].public_dns_name
if deploy_ssh_key:
print "Copying SSH key %s to master..." % opts.identity_file
ssh(master, opts, 'mkdir -p /root/.ssh')
@@ -275,13 +269,13 @@ def setup_cluster(conn, master_res, slav
# Wait for a whole cluster (masters, slaves and ZooKeeper) to start up
-def wait_for_cluster(conn, wait_secs, master_res, slave_res, zoo_res):
+def wait_for_cluster(conn, wait_secs, master_nodes, slave_nodes, zoo_nodes):
print "Waiting for instances to start up..."
time.sleep(5)
- wait_for_instances(conn, master_res)
- wait_for_instances(conn, slave_res)
- if zoo_res != None:
- wait_for_instances(conn, zoo_res)
+ wait_for_instances(conn, master_nodes)
+ wait_for_instances(conn, slave_nodes)
+ if zoo_nodes != []:
+ wait_for_instances(conn, zoo_nodes)
print "Waiting %d more seconds..." % wait_secs
time.sleep(wait_secs)
@@ -301,8 +295,8 @@ def get_num_disks(instance_type):
# cluster (e.g. lists of masters and slaves). Files are only deployed to
# the first master instance in the cluster, and we expect the setup
# script to be run on that instance to copy them to other nodes.
-def deploy_files(conn, root_dir, opts, master_res, slave_res, zoo_res):
- active_master = master_res.instances[0].public_dns_name
+def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes):
+ active_master = master_nodes[0].public_dns_name
num_disks = get_num_disks(opts.instance_type)
hdfs_data_dirs = "/mnt/ephemeral-hdfs/data"
@@ -312,18 +306,18 @@ def deploy_files(conn, root_dir, opts, m
hdfs_data_dirs += ",/mnt%d/ephemeral-hdfs/data" % i
mapred_local_dirs += ",/mnt%d/hadoop/mrlocal" % i
- if zoo_res != None:
- zoo_list = '\n'.join([i.public_dns_name for i in zoo_res.instances])
+ if zoo_nodes != []:
+ zoo_list = '\n'.join([i.public_dns_name for i in zoo_nodes])
cluster_url = "zoo://" + ",".join(
- ["%s:2181/mesos" % i.public_dns_name for i in zoo_res.instances])
+ ["%s:2181/mesos" % i.public_dns_name for i in zoo_nodes])
else:
zoo_list = "NONE"
cluster_url = "1@%s:5050" % active_master
template_vars = {
- "master_list": '\n'.join([i.public_dns_name for i in master_res.instances]),
+ "master_list": '\n'.join([i.public_dns_name for i in master_nodes]),
"active_master": active_master,
- "slave_list": '\n'.join([i.public_dns_name for i in slave_res.instances]),
+ "slave_list": '\n'.join([i.public_dns_name for i in slave_nodes]),
"zoo_list": zoo_list,
"cluster_url": cluster_url,
"hdfs_data_dirs": hdfs_data_dirs,
@@ -377,36 +371,36 @@ def main():
if action == "launch":
if opts.resume:
- (master_res, slave_res, zoo_res) = get_existing_cluster(
+ (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
conn, opts, cluster_name)
else:
- (master_res, slave_res, zoo_res) = launch_cluster(
+ (master_nodes, slave_nodes, zoo_nodes) = launch_cluster(
conn, opts, cluster_name)
- wait_for_cluster(conn, opts.wait, master_res, slave_res, zoo_res)
- setup_cluster(conn, master_res, slave_res, zoo_res, opts, True)
+ wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes, zoo_nodes)
+ setup_cluster(conn, master_nodes, slave_nodes, zoo_nodes, opts, True)
elif action == "destroy":
response = raw_input("Are you sure you want to destroy the cluster " +
cluster_name + "?\nALL DATA ON ALL NODES WILL BE LOST!!\n" +
"Destroy cluster " + cluster_name + " (y/N): ")
if response == "y":
- (master_res, slave_res, zoo_res) = get_existing_cluster(
+ (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
conn, opts, cluster_name)
print "Terminating master..."
- for inst in master_res.instances:
+ for inst in master_nodes:
inst.terminate()
print "Terminating slaves..."
- for inst in slave_res.instances:
+ for inst in slave_nodes:
inst.terminate()
- if zoo_res != None:
+ if zoo_nodes != []:
print "Terminating zoo..."
- for inst in zoo_res.instances:
+ for inst in zoo_nodes:
inst.terminate()
elif action == "login":
- (master_res, slave_res, zoo_res) = get_existing_cluster(
+ (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
conn, opts, cluster_name)
- master = master_res.instances[0].public_dns_name
+ master = master_nodes[0].public_dns_name
print "Logging into master " + master + "..."
proxy_opt = ""
if opts.proxy_port != None:
@@ -415,8 +409,8 @@ def main():
(opts.identity_file, proxy_opt, master), shell=True)
elif action == "get-master":
- (master_res, slave_res, zoo_res) = get_existing_cluster(conn, opts, cluster_name)
- print master_res.instances[0].public_dns_name
+ (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(conn, opts, cluster_name)
+ print master_nodes[0].public_dns_name
elif action == "stop":
response = raw_input("Are you sure you want to stop the cluster " +
@@ -425,40 +419,40 @@ def main():
"AMAZON EBS IF IT IS EBS-BACKED!!\n" +
"Stop cluster " + cluster_name + " (y/N): ")
if response == "y":
- (master_res, slave_res, zoo_res) = get_existing_cluster(
+ (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
conn, opts, cluster_name)
print "Stopping master..."
- for inst in master_res.instances:
+ for inst in master_nodes:
if inst.state not in ["shutting-down", "terminated"]:
inst.stop()
print "Stopping slaves..."
- for inst in slave_res.instances:
+ for inst in slave_nodes:
if inst.state not in ["shutting-down", "terminated"]:
inst.stop()
- if zoo_res != None:
+ if zoo_nodes != []:
print "Stopping zoo..."
- for inst in zoo_res.instances:
+ for inst in zoo_nodes:
if inst.state not in ["shutting-down", "terminated"]:
inst.stop()
elif action == "start":
- (master_res, slave_res, zoo_res) = get_existing_cluster(
+ (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
conn, opts, cluster_name)
print "Starting slaves..."
- for inst in slave_res.instances:
+ for inst in slave_nodes:
if inst.state not in ["shutting-down", "terminated"]:
inst.start()
print "Starting master..."
- for inst in master_res.instances:
+ for inst in master_nodes:
if inst.state not in ["shutting-down", "terminated"]:
inst.start()
- if zoo_res != None:
+ if zoo_nodes != []:
print "Starting zoo..."
- for inst in zoo_res.instances:
+ for inst in zoo_nodes:
if inst.state not in ["shutting-down", "terminated"]:
inst.start()
- wait_for_cluster(conn, opts.wait, master_res, slave_res, zoo_res)
- setup_cluster(conn, master_res, slave_res, zoo_res, opts, False)
+ wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes, zoo_nodes)
+ setup_cluster(conn, master_nodes, slave_nodes, zoo_nodes, opts, False)
elif action == "shutdown":
print >> stderr, ("The shutdown action is no longer available.\n" +