Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 10:37:28 UTC

svn commit: r1132070 - in /incubator/mesos/trunk/ec2: ./ deploy.lucid64/root/hadoop-0.20.2/conf/ deploy.lucid64/root/mesos-ec2/ deploy.lucid64/root/mesos-ec2/hadoop-framework-conf/

Author: benh
Date: Sun Jun  5 08:37:27 2011
New Revision: 1132070

URL: http://svn.apache.org/viewvc?rev=1132070&view=rev
Log:
Various work on EC2 scripts:
- Added start and stop commands for use with EBS-backed instances
- Added option for creating an EBS volume of a given size to attach to each node as /vol
- Fixed HDFS to use only devices that exist on the host
- Made MapReduce in default framework conf use multiple disks for map outputs
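
For reference, the new functionality is driven from the mesos-ec2 front end whose usage string appears in the mesos_ec2.py diff below ("mesos-ec2 [options] <action> <cluster_name>"). A rough usage sketch (the cluster name is a placeholder; required options such as the key pair and identity file are elided as [options]):

    # Attach a 100 GB EBS volume to each node as /vol at launch time
    mesos-ec2 [options] --ebs-vol-size 100 launch my-cluster

    # Stop an EBS-backed cluster (ephemeral-disk data is lost, EBS data is kept)
    # and start it back up later
    mesos-ec2 [options] stop my-cluster
    mesos-ec2 [options] start my-cluster

    # Terminate the cluster and delete all of its data, including the /vol
    # volumes, which are created with delete_on_termination set
    mesos-ec2 [options] destroy my-cluster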

Modified:
    incubator/mesos/trunk/ec2/deploy.lucid64/root/hadoop-0.20.2/conf/hdfs-site.xml
    incubator/mesos/trunk/ec2/deploy.lucid64/root/mesos-ec2/copy-dir
    incubator/mesos/trunk/ec2/deploy.lucid64/root/mesos-ec2/hadoop-framework-conf/mapred-site.xml
    incubator/mesos/trunk/ec2/deploy.lucid64/root/mesos-ec2/redeploy-mesos
    incubator/mesos/trunk/ec2/deploy.lucid64/root/mesos-ec2/setup
    incubator/mesos/trunk/ec2/deploy.lucid64/root/mesos-ec2/setup-slave
    incubator/mesos/trunk/ec2/deploy.lucid64/root/mesos-ec2/start-mesos
    incubator/mesos/trunk/ec2/deploy.lucid64/root/mesos-ec2/stop-mesos
    incubator/mesos/trunk/ec2/mesos_ec2.py

Modified: incubator/mesos/trunk/ec2/deploy.lucid64/root/hadoop-0.20.2/conf/hdfs-site.xml
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/ec2/deploy.lucid64/root/hadoop-0.20.2/conf/hdfs-site.xml?rev=1132070&r1=1132069&r2=1132070&view=diff
==============================================================================
--- incubator/mesos/trunk/ec2/deploy.lucid64/root/hadoop-0.20.2/conf/hdfs-site.xml (original)
+++ incubator/mesos/trunk/ec2/deploy.lucid64/root/hadoop-0.20.2/conf/hdfs-site.xml Sun Jun  5 08:37:27 2011
@@ -15,7 +15,7 @@
 
   <property>
     <name>dfs.data.dir</name>
-    <value>/mnt/hdfs/dfs/data,/mnt2/hdfs/dfs/data,/mnt3/hdfs/dfs/data,/mnt4/hdfs/dfs/data</value>
+    <value>{{hdfs_data_dirs}}</value>
   </property>
 
   <property>
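
The {{hdfs_data_dirs}} placeholder above is filled in by deploy_files() in mesos_ec2.py (further down in this commit) based on how many ephemeral disks the instance type provides, per get_num_disks(). For example, on an m1.xlarge (4 disks) it expands to the same list that used to be hard-coded here:

    /mnt/hdfs/dfs/data,/mnt2/hdfs/dfs/data,/mnt3/hdfs/dfs/data,/mnt4/hdfs/dfs/data

Smaller instance types now get only the /mnt directories that actually exist on the host.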

Modified: incubator/mesos/trunk/ec2/deploy.lucid64/root/mesos-ec2/copy-dir
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/ec2/deploy.lucid64/root/mesos-ec2/copy-dir?rev=1132070&r1=1132069&r2=1132070&view=diff
==============================================================================
--- incubator/mesos/trunk/ec2/deploy.lucid64/root/mesos-ec2/copy-dir (original)
+++ incubator/mesos/trunk/ec2/deploy.lucid64/root/mesos-ec2/copy-dir Sun Jun  5 08:37:27 2011
@@ -16,5 +16,6 @@ SSH_OPTS="-o StrictHostKeyChecking=no -o
 echo "RSYNC'ing $DIR to slaves..."
 for slave in $SLAVES; do
     echo $slave
-    rsync -e "ssh $SSH_OPTS" -az "$DIR" "$slave:$DEST"
+    rsync -e "ssh $SSH_OPTS" -az "$DIR" "$slave:$DEST" & sleep 0.3
 done
+wait
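
This is the pattern used throughout these scripts: each rsync (or ssh) is pushed into the background, a short sleep staggers connection setup, and a trailing wait blocks until every copy has finished. A minimal standalone sketch of the same idea (the HOSTS list and paths are illustrative):

    SSH_OPTS="-o StrictHostKeyChecking=no"
    for host in $HOSTS; do
      # Run each copy in the background so all hosts are served in parallel
      rsync -e "ssh $SSH_OPTS" -az /root/mesos-ec2 "$host:/root" &
      sleep 0.3   # stagger startup so we do not open every connection at once
    done
    wait          # do not continue until every background rsync has exited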

Modified: incubator/mesos/trunk/ec2/deploy.lucid64/root/mesos-ec2/hadoop-framework-conf/mapred-site.xml
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/ec2/deploy.lucid64/root/mesos-ec2/hadoop-framework-conf/mapred-site.xml?rev=1132070&r1=1132069&r2=1132070&view=diff
==============================================================================
--- incubator/mesos/trunk/ec2/deploy.lucid64/root/mesos-ec2/hadoop-framework-conf/mapred-site.xml (original)
+++ incubator/mesos/trunk/ec2/deploy.lucid64/root/mesos-ec2/hadoop-framework-conf/mapred-site.xml Sun Jun  5 08:37:27 2011
@@ -11,6 +11,11 @@
   </property>
 
   <property>
+    <name>mapred.local.dir</name>
+    <value>{{mapred_local_dirs}}</value>
+  </property>
+
+  <property>
     <name>mapred.jobtracker.taskScheduler</name>
     <value>org.apache.hadoop.mapred.MesosScheduler</value>
   </property>

Modified: incubator/mesos/trunk/ec2/deploy.lucid64/root/mesos-ec2/redeploy-mesos
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/ec2/deploy.lucid64/root/mesos-ec2/redeploy-mesos?rev=1132070&r1=1132069&r2=1132070&view=diff
==============================================================================
--- incubator/mesos/trunk/ec2/deploy.lucid64/root/mesos-ec2/redeploy-mesos (original)
+++ incubator/mesos/trunk/ec2/deploy.lucid64/root/mesos-ec2/redeploy-mesos Sun Jun  5 08:37:27 2011
@@ -11,12 +11,14 @@ if [[ $NUM_MASTERS -gt 1 ]]; then
   echo "RSYNC'ing /root/mesos to masters..."
   for master in $MASTERS; do
     echo $master
-    rsync -e "ssh $SSH_OPTS" -az --exclude '*.d' --exclude '*.o' --exclude '*.cpp' --exclude '*.hpp' --exclude '*.pyc' --exclude 'mesos/frameworks/hadoop-0.20.0/logs/*' --exclude 'mesos/work' --exclude 'mesos/logs' --exclude 'mesos/test_output' /root/mesos $master:/root
+    rsync -e "ssh $SSH_OPTS" -az --exclude '*.d' --exclude '*.o' --exclude '*.cpp' --exclude '*.hpp' --exclude '*.pyc' --exclude 'mesos/frameworks/hadoop-0.20.0/logs/*' --exclude 'mesos/work' --exclude 'mesos/logs' --exclude 'mesos/test_output' /root/mesos $master:/root & sleep 0.3
   done
+  wait
 fi
 
 echo "RSYNC'ing /root/mesos to slaves..."
 for slave in $SLAVES; do
   echo $slave
-  rsync -e "ssh $SSH_OPTS" -az --exclude '*.d' --exclude '*.o' --exclude '*.cpp' --exclude '*.hpp' --exclude '*.pyc' --exclude 'mesos/frameworks/hadoop-0.20.0/logs/*' --exclude 'mesos/work' --exclude 'mesos/logs' --exclude 'mesos/test_output' /root/mesos $slave:/root
+  rsync -e "ssh $SSH_OPTS" -az --exclude '*.d' --exclude '*.o' --exclude '*.cpp' --exclude '*.hpp' --exclude '*.pyc' --exclude 'mesos/frameworks/hadoop-0.20.0/logs/*' --exclude 'mesos/work' --exclude 'mesos/logs' --exclude 'mesos/test_output' /root/mesos $slave:/root & sleep 0.3
 done
+wait

Modified: incubator/mesos/trunk/ec2/deploy.lucid64/root/mesos-ec2/setup
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/ec2/deploy.lucid64/root/mesos-ec2/setup?rev=1132070&r1=1132069&r2=1132070&view=diff
==============================================================================
--- incubator/mesos/trunk/ec2/deploy.lucid64/root/mesos-ec2/setup (original)
+++ incubator/mesos/trunk/ec2/deploy.lucid64/root/mesos-ec2/setup Sun Jun  5 08:37:27 2011
@@ -59,7 +59,7 @@ for master in $MASTERS; do
   sleep 0.3
 done
 
-ssh -q $SSH_OPTS localhost echo -n
+ssh -q $SSH_OPTS localhost echo -n &
 
 if [[ $NUM_ZOOS != 0 ]] ; then
   echo "SSH'ing to Zoo server(s) to approve keys..."
@@ -78,54 +78,61 @@ for slave in $SLAVES; do
   ssh $SSH_OPTS $slave echo -n &
   sleep 0.3
 done
-echo "Waiting for commands to finish..."
+
+echo "Waiting for ssh commands to finish..."
 wait
 
 if [[ $NUM_MASTERS -gt 1 ]] ; then
   echo "RSYNC'ing /root/mesos-ec2 to other master servers..."
   for master in `cat $MASTERS_FILE | sed '1d'`; do
       echo $master
-      rsync -e "ssh $SSH_OPTS" -az /root/mesos-ec2 $master:/root
+      rsync -e "ssh $SSH_OPTS" -az /root/mesos-ec2 $master:/root & sleep 0.3
   done
+  wait
 fi
 
 if [[ $NUM_ZOOS != 0 ]] ; then
   echo "RSYNC'ing /root/mesos-ec2 to other Zoo servers..."
   for zoo in $ZOOS; do
       echo $zoo
-      rsync -e "ssh $SSH_OPTS" -az /root/mesos-ec2 $zoo:/root
+      rsync -e "ssh $SSH_OPTS" -az /root/mesos-ec2 $zoo:/root & sleep 0.3
   done
+  wait
 fi
 
 echo "RSYNC'ing /root/mesos-ec2 to slaves..."
 for slave in $SLAVES; do
   echo $slave
-  rsync -e "ssh $SSH_OPTS" -az /root/mesos-ec2 $slave:/root
-  scp ~/.ssh/id_rsa $slave:.ssh
+  rsync -e "ssh $SSH_OPTS" -az /root/mesos-ec2 $slave:/root &
+  scp ~/.ssh/id_rsa $slave:.ssh &
 done
+wait
 
 echo "Setting up slaves..."
 for slave in $SLAVES; do
   echo $slave
-  ssh -t $SSH_OPTS root@$slave "mesos-ec2/setup-slave"
+  ssh -t $SSH_OPTS root@$slave "mesos-ec2/setup-slave" &
 done
+wait
 
-echo "Setting up master as slave (i.e. for local)..."
+echo "Running slave setup on master (i.e. for local)..."
 ./setup-slave
 
 if [[ $NUM_MASTERS -gt 1 ]] ; then
   echo "RSYNC'ing Hadoop config files for HDFS to other masters..."
   for master in `cat $MASTERS_FILE | sed '1d'`; do
     echo $master
-    rsync -e "ssh $SSH_OPTS" -az $HADOOP_HOME/conf $master:$HADOOP_HOME
+    rsync -e "ssh $SSH_OPTS" -az $HADOOP_HOME/conf $master:$HADOOP_HOME & sleep 0.3
   done
+  wait
 fi
 
 echo "RSYNC'ing Hadoop config files for HDFS to slaves..."
 for slave in $SLAVES; do
   echo $slave
-  rsync -e "ssh $SSH_OPTS" -az $HADOOP_HOME/conf $slave:$HADOOP_HOME
+  rsync -e "ssh $SSH_OPTS" -az $HADOOP_HOME/conf $slave:$HADOOP_HOME & sleep 0.3
 done
+wait
 
 DOWNLOADED=0
 
@@ -143,7 +150,6 @@ if [[ "$DOWNLOAD_METHOD" == "git" ]] ; t
   DOWNLOADED=1
 fi
 
-
 # Build Mesos if we downloaded it
 if [[ "$DOWNLOADED" == "1" ]] ; then
   echo "Building Mesos..."
@@ -184,14 +190,17 @@ if [ ! -e /mnt/nfs ] ; then
   mkdir -p /mnt/nfs
   rm -fr /nfs
   ln -s /mnt/nfs /nfs
-  echo "/nfs    10.0.0.0/8(ro,async,no_subtree_check)" >> /etc/exports
+  if ! grep -e '/nfs' /etc/exports; then
+    echo "/nfs    10.0.0.0/8(ro,async,no_subtree_check)" >> /etc/exports
+  fi
   exportfs -a
 fi
 echo "Mounting NFS on slaves..."
 for slave in $SLAVES; do
   echo $slave
-  ssh -t $SSH_OPTS root@$slave "mkdir -p /nfs; mount $HOSTNAME:/nfs /nfs"
+  ssh -t $SSH_OPTS root@$slave "mkdir -p /nfs; mount $HOSTNAME:/nfs /nfs" & sleep 0.3
 done
+wait
 
 echo "Formatting HDFS namenode..."
 $HADOOP_HOME/bin/hadoop namenode -format
@@ -199,8 +208,16 @@ $HADOOP_HOME/bin/hadoop namenode -format
 echo "Starting HDFS..."
 $HADOOP_HOME/bin/start-dfs.sh
 
-#echo "Setting up torque"
-#./setup-torque
+sleep 1
+
+if [[ $NUM_ZOOS != 0 ]]; then
+  echo "Starting ZooKeeper quorum..."
+  for zoo in $ZOOS; do
+    ssh $SSH_OPTS $zoo "/root/mesos/third_party/zookeeper-*/bin/zkServer.sh start </dev/null >/dev/null" & sleep 0.1
+  done
+  wait
+  sleep 2
+fi
 
 echo "Starting Mesos cluster..."
 ./start-mesos
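
One note on the /etc/exports change above: the grep guard makes the export line idempotent, so re-running setup does not append duplicate entries. The same pattern with -q added, so the matched line is not echoed into the setup output (a sketch only, not part of this commit):

    # Append the NFS export only if an /nfs line is not already present
    if ! grep -q '/nfs' /etc/exports; then
      echo "/nfs    10.0.0.0/8(ro,async,no_subtree_check)" >> /etc/exports
    fi
    exportfs -a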

Modified: incubator/mesos/trunk/ec2/deploy.lucid64/root/mesos-ec2/setup-slave
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/ec2/deploy.lucid64/root/mesos-ec2/setup-slave?rev=1132070&r1=1132069&r2=1132070&view=diff
==============================================================================
--- incubator/mesos/trunk/ec2/deploy.lucid64/root/mesos-ec2/setup-slave (original)
+++ incubator/mesos/trunk/ec2/deploy.lucid64/root/mesos-ec2/setup-slave Sun Jun  5 08:37:27 2011
@@ -2,6 +2,11 @@
 
 echo "Setting up Mesos slave on `hostname`..."
 
+# Mount options to use for ext3 and xfs disks (the ephemeral disks
+# are ext3, but we use xfs for EBS volumes to format them faster)
+EXT3_MOUNT_OPTS="defaults,noatime,nodiratime"
+XFS_MOUNT_OPTS="defaults,noatime,nodiratime,allocsize=8m"
+
 # Create Hadoop and HDFS directories in a given parent directory
 # (for example /mnt, /mnt2, and so on)
 function create_hadoop_dirs {
@@ -19,11 +24,12 @@ mkdir -p /mnt/mesos-work
 # Mount any ephemeral volumes we might have beyond /mnt
 function setup_extra_volume {
   device=$1
-  mountpoint=$2
-  if [ -e $device ]; then
-    mkdir -p $mountpoint
-    mount $device $mountpoint
-    create_hadoop_dirs $mountpoint
+  mount_point=$2
+  if [[ -e $device && ! -e $mount_point ]]; then
+    mkdir -p $mount_point
+    mount -o $EXT3_MOUNT_OPTS $device $mount_point
+    echo "$device $mount_point auto $EXT3_MOUNT_OPTS 0 0" >> /etc/fstab
+    create_hadoop_dirs $mount_point
   fi
 }
 setup_extra_volume /dev/sdc /mnt2
@@ -31,5 +37,23 @@ setup_extra_volume /dev/sdd /mnt3
 setup_extra_volume /dev/sde /mnt4
 
 # Mount cgroup file system
-mkdir -p /cgroup
-mount -t cgroup cgroup /cgroup
+if [[ ! -e /cgroup ]]; then
+  mkdir -p /cgroup
+  mount -t cgroup none /cgroup
+  echo "none /cgroup cgroup defaults 0 0" >> /etc/fstab
+fi
+
+# Format and mount EBS volume (/dev/sdv) as /vol if the device exists
+# and we have not already created /vol
+if [[ -e /dev/sdv && ! -e /vol ]]; then
+  mkdir /vol
+  if mkfs.xfs -q /dev/sdv; then
+    mount -o $XFS_MOUNT_OPTS /dev/sdv /vol
+    echo "/dev/sdv /vol xfs $XFS_MOUNT_OPTS 0 0" >> /etc/fstab
+  else
+    # mkfs.xfs is not installed on this machine or has failed;
+    # delete /vol so that the user doesn't think we successfully
+    # mounted the EBS volume
+    rmdir /vol
+  fi
+fi
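
The /etc/fstab entries written above mean the extra volumes, /vol, and the cgroup filesystem are remounted automatically after a reboot, which matters once an EBS-backed cluster is stopped and started again. A quick way to sanity-check a slave after a restart (illustrative only):

    # The mounts recorded in fstab should all be back with the intended options
    mount | grep -E '/mnt2|/mnt3|/mnt4|/vol|/cgroup'
    cat /etc/fstab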

Modified: incubator/mesos/trunk/ec2/deploy.lucid64/root/mesos-ec2/start-mesos
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/ec2/deploy.lucid64/root/mesos-ec2/start-mesos?rev=1132070&r1=1132069&r2=1132070&view=diff
==============================================================================
--- incubator/mesos/trunk/ec2/deploy.lucid64/root/mesos-ec2/start-mesos (original)
+++ incubator/mesos/trunk/ec2/deploy.lucid64/root/mesos-ec2/start-mesos Sun Jun  5 08:37:27 2011
@@ -30,19 +30,13 @@ fi
 echo "Running with master parameter: "$master_arg
 
 if [[ $NUM_ZOOS != 0 ]]; then
-  echo "Starting ZooKeeper on $ZOOS"
-  for zoo in $ZOOS; do
-    ssh $SSH_OPTS $zoo "/root/mesos/third_party/zookeeper-*/bin/zkServer.sh start </dev/null >/dev/null"
-  done
-
-  sleep 2
-
   masterid=1
   for master in $MASTERS; do
     echo "Starting master $masterid on $master"
-    ssh $SSH_OPTS $master "/root/mesos-ec2/mesos-daemon mesos-master -p 5050 -u $master_arg $@ </dev/null >/dev/null"
+    ssh $SSH_OPTS $master "/root/mesos-ec2/mesos-daemon mesos-master -p 5050 -u $master_arg $@ </dev/null >/dev/null" & sleep 0.1
     masterid=$(($masterid+1))
   done
+  wait
 else
   echo "Starting master on $ACTIVE_MASTER"
   ssh $SSH_OPTS $ACTIVE_MASTER "/root/mesos-ec2/mesos-daemon mesos-master -p 5050 $@ </dev/null >/dev/null"

Modified: incubator/mesos/trunk/ec2/deploy.lucid64/root/mesos-ec2/stop-mesos
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/ec2/deploy.lucid64/root/mesos-ec2/stop-mesos?rev=1132070&r1=1132069&r2=1132070&view=diff
==============================================================================
--- incubator/mesos/trunk/ec2/deploy.lucid64/root/mesos-ec2/stop-mesos (original)
+++ incubator/mesos/trunk/ec2/deploy.lucid64/root/mesos-ec2/stop-mesos Sun Jun  5 08:37:27 2011
@@ -15,6 +15,7 @@ wait
 
 for master in $MASTERS; do
   echo "Stopping master on $master"
-  ssh $SSH_OPTS $master pkill mesos-master
+  ssh $SSH_OPTS $master pkill mesos-master &
   sleep 0.1
 done
+wait

Modified: incubator/mesos/trunk/ec2/mesos_ec2.py
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/ec2/mesos_ec2.py?rev=1132070&r1=1132069&r2=1132070&view=diff
==============================================================================
--- incubator/mesos/trunk/ec2/mesos_ec2.py (original)
+++ incubator/mesos/trunk/ec2/mesos_ec2.py Sun Jun  5 08:37:27 2011
@@ -8,15 +8,16 @@ import os
 import subprocess
 import sys
 import time
-from boto import ec2
+import traceback
 from optparse import OptionParser
 from sys import stderr
 from tempfile import NamedTemporaryFile
+from boto.ec2.blockdevicemapping import BlockDeviceMapping, EBSBlockDeviceType
 
 
 def parse_args():
   parser = OptionParser(usage="mesos-ec2 [options] <action> <cluster_name>"
-      + "\n\n<action> can be: launch, shutdown, login, get-master",
+      + "\n\n<action> can be: launch, destroy, login, stop, start, get-master",
       add_help_option=False)
   parser.add_option("-h", "--help", action="help",
                     help="Show this help message and exit")
@@ -27,7 +28,8 @@ def parse_args():
   parser.add_option("-i", "--identity-file", 
       help="SSH private key file to use for logging into instances")
   parser.add_option("-t", "--instance-type", default="m1.large",
-      help="Type of instance to launch (default: m1.large). WARNING: must be 64 bit, thus small instances won't work")
+      help="Type of instance to launch (default: m1.large). " +
+           "WARNING: must be 64 bit, thus small instances won't work")
   parser.add_option("-m", "--master-instance-type", default="",
       help="Master instance type (leave empty for same as instance-type)")
   parser.add_option("-z", "--zone", default="us-east-1b",
@@ -50,6 +52,9 @@ def parse_args():
   parser.add_option("-f", "--ft", metavar="NUM_MASTERS", default="1", 
       help="Number of masters to run. Default is 1. " + 
            "Greater values cause Mesos to run in FT mode with ZooKeeper.")
+  parser.add_option("--ebs-vol-size", metavar="SIZE", type="int", default=0,
+      help="Attach a new EBS volume of size SIZE (in GB) to each node as " +
+           "/vol. The volumes will be deleted when the instances terminate.")
   (opts, args) = parser.parse_args()
   opts.ft = int(opts.ft)
   if len(args) != 2:
@@ -98,8 +103,14 @@ def wait_for_instances(conn, reservation
       return
 
 
+# Check whether a given EC2 instance object is in a state we consider active,
+# i.e. not terminating or terminated. We count both stopping and stopped as
+# active since we can restart stopped clusters.
+def is_active(instance):
+  return (instance.state in ['pending', 'running', 'stopping', 'stopped'])
+
+
 def launch_cluster(conn, opts, cluster_name):
-  zoo_res = None
   print "Setting up security groups..."
   master_group = get_or_make_group(conn, cluster_name + "-master")
   slave_group = get_or_make_group(conn, cluster_name + "-slaves")
@@ -128,15 +139,16 @@ def launch_cluster(conn, opts, cluster_n
     zoo_group.authorize('tcp', 2181, 2181, '0.0.0.0/0')
     zoo_group.authorize('tcp', 2888, 2888, '0.0.0.0/0')
     zoo_group.authorize('tcp', 3888, 3888, '0.0.0.0/0')
+  # Check if instances are already running in our groups
   print "Checking for running cluster..."
   reservations = conn.get_all_instances()
   for res in reservations:
     group_names = [g.id for g in res.groups]
     if master_group.name in group_names or slave_group.name in group_names or zoo_group.name in group_names:
-      active = [i for i in res.instances if i.state in ['pending', 'running']]
+      active = [i for i in res.instances if is_active(i)]
       if len(active) > 0:
         print >> stderr, ("ERROR: There are already instances running in " +
-            "group %s or %s" % (master_group.name, slave_group.name))
+            "group %s, %s or %s" % (master_group.name, slave_group.name, zoo_group.name))
         sys.exit(1)
   print "Launching instances..."
   try:
@@ -144,13 +156,23 @@ def launch_cluster(conn, opts, cluster_n
   except:
     print >> stderr, "Could not find AMI " + opts.ami
     sys.exit(1)
+  # Create block device mapping so that we can add an EBS volume if asked to
+  block_map = BlockDeviceMapping()
+  if opts.ebs_vol_size > 0:
+    device = EBSBlockDeviceType()
+    device.size = opts.ebs_vol_size
+    device.delete_on_termination = True
+    block_map["/dev/sdv"] = device
+  # Launch slaves
   slave_res = image.run(key_name = opts.key_pair,
                         security_groups = [slave_group],
                         instance_type = opts.instance_type,
                         placement = opts.zone,
                         min_count = opts.slaves,
-                        max_count = opts.slaves)
+                        max_count = opts.slaves,
+                        block_device_map = block_map)
   print "Launched slaves, regid = " + slave_res.id
+  # Launch masters
   master_type = opts.master_instance_type
   if master_type == "":
     master_type = opts.instance_type
@@ -159,16 +181,22 @@ def launch_cluster(conn, opts, cluster_n
                          instance_type = master_type,
                          placement = opts.zone,
                          min_count = opts.ft,
-                         max_count = opts.ft)
+                         max_count = opts.ft,
+                         block_device_map = block_map)
   print "Launched master, regid = " + master_res.id
+  # Launch ZooKeeper if required
   if opts.ft > 1:
     zoo_res = image.run(key_name = opts.key_pair,
                         security_groups = [zoo_group],
                         instance_type = opts.instance_type,
                         placement = opts.zone,
                         min_count = 3,
-                        max_count = 3)
+                        max_count = 3,
+                        block_device_map = block_map)
     print "Launched zoo, regid = " + zoo_res.id
+  else:
+    zoo_res = None
+  # Return all the instances
   return (master_res, slave_res, zoo_res)
 
 
@@ -179,7 +207,7 @@ def get_existing_cluster(conn, opts, clu
   slave_res = None
   zoo_res = None
   for res in reservations:
-    active = [i for i in res.instances if i.state in ['pending', 'running']]
+    active = [i for i in res.instances if is_active(i)]
     if len(active) > 0:
       group_names = [g.id for g in res.groups]
       if group_names == [cluster_name + "-master"]:
@@ -204,17 +232,62 @@ def get_existing_cluster(conn, opts, clu
     sys.exit(1)
 
 
+def setup_cluster(conn, master_res, slave_res, zoo_res, opts, deploy_ssh_key):
+  print "Deploying files to master..."
+  deploy_files(conn, "deploy." + opts.os, master_res.instances[0],
+      opts, master_res, slave_res, zoo_res)
+  if deploy_ssh_key:
+    print "Copying SSH key %s to master..." % opts.identity_file
+    master = master_res.instances[0].public_dns_name
+    ssh(master, opts, 'mkdir -p /root/.ssh')
+    scp(master, opts, opts.identity_file, '/root/.ssh/id_rsa')
+  print "Running setup on master..."
+  ssh(master, opts, "chmod u+x mesos-ec2/setup")
+  ssh(master, opts, "mesos-ec2/setup %s %s %s" % (opts.os, opts.download, opts.branch))
+  print "Done!"
+
+
+def wait_for_cluster(conn, master_res, slave_res, zoo_res):
+  print "Waiting for instances to start up..."
+  time.sleep(5)
+  wait_for_instances(conn, master_res)
+  wait_for_instances(conn, slave_res)
+  if zoo_res != None:
+    wait_for_instances(conn, zoo_res)
+  print "Waiting 30 more seconds..."
+  time.sleep(30)
+
+
+def get_num_disks(instance_type):
+  if instance_type in ["m1.xlarge", "c1.xlarge", "m2.xlarge", "cc1.4xlarge"]:
+    return 4
+  elif instance_type in ["m1.small", "c1.medium"]:
+    return 1
+  else:
+    return 2
+
+
 def deploy_files(conn, root_dir, instance, opts, master_res, slave_res, zoo_res):
   # TODO: Speed up deployment by creating a temp directory with the
   # template-transformed files and then rsyncing it
 
   active_master = master_res.instances[0].public_dns_name
 
+  num_disks = get_num_disks(opts.instance_type)
+  hdfs_data_dirs = "/mnt/hdfs/dfs/data"
+  mapred_local_dirs = "/mnt/hadoop/mrlocal"
+  if num_disks > 1:
+    for i in range(2, num_disks + 1):
+      hdfs_data_dirs += ",/mnt%d/hdfs/dfs/data" % i
+      mapred_local_dirs += ",/mnt%d/hadoop/mrlocal" % i
+
   template_vars = {
     "master_list" : '\n'.join([i.public_dns_name for i in master_res.instances]),
     "active_master" : active_master,
     "master_url" : active_master + ":5050",
-    "slave_list" : '\n'.join([i.public_dns_name for i in slave_res.instances])
+    "slave_list" : '\n'.join([i.public_dns_name for i in slave_res.instances]),
+    "hdfs_data_dirs" : hdfs_data_dirs,
+    "mapred_local_dirs" : mapred_local_dirs
   }
 
   if opts.ft > 1:
@@ -229,7 +302,7 @@ def deploy_files(conn, root_dir, instanc
     for filename in files:
       if filename[0] not in '#.~' and filename[-1] != '~':
         dest_file = os.path.join(dest_dir, filename)
-        print "Setting up '%s' file." % dest_file
+        print "Setting up %s" % dest_file
         with open(os.path.join(path, filename)) as file:
           text = file.read()
           for key in template_vars:
@@ -255,44 +328,30 @@ def ssh(host, opts, command):
 
 def main():
   (opts, action, cluster_name) = parse_args()
-  print "Connecting to EC2..."
   conn = boto.connect_ec2()
   if action == "launch":
     if opts.resume:
       (master_res, slave_res, zoo_res) = get_existing_cluster(conn, opts, cluster_name)
     else:
       (master_res, slave_res, zoo_res) = launch_cluster(conn, opts, cluster_name)
-      print "Waiting for instances to start up..."
-      time.sleep(5)
-      wait_for_instances(conn, master_res)
-      wait_for_instances(conn, slave_res)
-      if opts.ft > 1:
-        wait_for_instances(conn, zoo_res)
-      print "Waiting 20 more seconds..."
-      time.sleep(20)
-    print "Deploying files to master..."
-    deploy_files(conn, "deploy." + opts.os, master_res.instances[0],
-        opts, master_res, slave_res, zoo_res)
-    print "Copying SSH key %s to master..." % opts.identity_file
-    master = master_res.instances[0].public_dns_name
-    ssh(master, opts, 'mkdir -p /root/.ssh')
-    scp(master, opts, opts.identity_file, '/root/.ssh/id_rsa')
-    print "Running setup on master..."
-    ssh(master, opts, "chmod u+x mesos-ec2/setup")
-    ssh(master, opts, "mesos-ec2/setup %s %s %s" % (opts.os, opts.download, opts.branch))
-    print "Done!"
-  elif action == "shutdown":
-    response = raw_input("Are you sure you want to shut down the cluster " +
-        cluster_name + "? (y/N) ")
+    wait_for_cluster(conn, master_res, slave_res, zoo_res)
+    setup_cluster(conn, master_res, slave_res, zoo_res, opts, True)
+  elif action == "destroy":
+    response = raw_input("Are you sure you want to destroy the cluster " +
+        cluster_name + "?\nALL DATA ON ALL NODES WILL BE LOST!!\n" +
+        "Destroy cluster " + cluster_name + " (y/N): ")
     if response == "y":
       (master_res, slave_res, zoo_res) = get_existing_cluster(conn, opts, cluster_name)
-      print "Shutting down master..."
-      master_res.stop_all()
-      print "Shutting down slaves..."
-      slave_res.stop_all()
+      print "Terminating master..."
+      for inst in master_res.instances:
+        inst.terminate()
+      print "Terminating slaves..."
+      for inst in slave_res.instances:
+        inst.terminate()
       if opts.ft > 1:
-        print "Shutting down zoo..."
-        zoo_res.stop_all()
+        print "Terminating zoo..."
+        for inst in zoo_res.instances:
+          inst.terminate()
   elif action == "login":
     (master_res, slave_res, zoo_res) = get_existing_cluster(conn, opts, cluster_name)
     master = master_res.instances[0].public_dns_name
@@ -305,8 +364,46 @@ def main():
   elif action == "get-master":
     (master_res, slave_res) = get_existing_cluster(conn, opts, cluster_name)
     print master_res.instances[0].public_dns_name
+  elif action == "stop":
+    response = raw_input("Are you sure you want to stop the cluster " +
+        cluster_name + "?\nDATA ON EPHEMERAL DISKS WILL BE LOST, " +
+        "BUT THE CLUSTER WILL KEEP USING SPACE ON EBS IF IT IS " +
+        "EBS-BACKED!\n" +
+        "Stop cluster " + cluster_name + " (y/N): ")
+    if response == "y":
+      (master_res, slave_res, zoo_res) = get_existing_cluster(conn, opts, cluster_name)
+      print "Stopping master..."
+      for inst in master_res.instances:
+        inst.stop()
+      print "Stopping slaves..."
+      for inst in slave_res.instances:
+        inst.stop()
+      if opts.ft > 1:
+        print "Stopping zoo..."
+        for inst in zoo_res.instances:
+          inst.stop()
+  elif action == "start":
+    (master_res, slave_res, zoo_res) = get_existing_cluster(conn, opts, cluster_name)
+    print "Starting master..."
+    for inst in master_res.instances:
+      inst.start()
+    print "Starting slaves..."
+    for inst in slave_res.instances:
+      inst.start()
+    if opts.ft > 1:
+      print "Starting zoo..."
+      for inst in zoo_res.instances:
+        inst.start()
+    wait_for_cluster(conn, master_res, slave_res, zoo_res)
+    setup_cluster(conn, master_res, slave_res, zoo_res, opts, False)
+  elif action == "shutdown":
+    print >> stderr, ("The shutdown action is no longer available.\n" +
+        "Use either 'destroy' to delete a cluster and all data on it,\n" +
+        "or 'stop' to shut down the machines but have them persist if\n" +
+        "you launched an EBS-backed cluster.")
+    sys.exit(1)
   else:
-    print >> STDERR, "Invalid action: %s" % action
+    print >> stderr, "Invalid action: %s" % action
     sys.exit(1)