You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@whirr.apache.org by as...@apache.org on 2011/07/14 02:24:28 UTC

svn commit: r1146524 - in /incubator/whirr/trunk: CHANGES.txt contrib/python/src/py/hadoop-ec2 contrib/python/src/py/hadoop/cloud/cli.py contrib/python/src/py/hadoop/cloud/providers/ec2.py contrib/python/src/py/hadoop/cloud/service.py

Author: asavu
Date: Thu Jul 14 00:24:28 2011
New Revision: 1146524

URL: http://svn.apache.org/viewvc?rev=1146524&view=rev
Log:
WHIRR-76. Support spot instances in python scripts (Soren Macbeth via asavu)

Modified:
    incubator/whirr/trunk/CHANGES.txt
    incubator/whirr/trunk/contrib/python/src/py/hadoop-ec2
    incubator/whirr/trunk/contrib/python/src/py/hadoop/cloud/cli.py
    incubator/whirr/trunk/contrib/python/src/py/hadoop/cloud/providers/ec2.py
    incubator/whirr/trunk/contrib/python/src/py/hadoop/cloud/service.py

Modified: incubator/whirr/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/whirr/trunk/CHANGES.txt?rev=1146524&r1=1146523&r2=1146524&view=diff
==============================================================================
--- incubator/whirr/trunk/CHANGES.txt (original)
+++ incubator/whirr/trunk/CHANGES.txt Thu Jul 14 00:24:28 2011
@@ -9,6 +9,8 @@ Trunk (unreleased changes)
     WHIRR-326. Use jclouds provider metadata to help with cloud 
     provider configuration (asavu)
 
+    WHIRR-76. Support spot instances in python scripts (Soren Macbeth via  asavu)
+
   IMPROVEMENTS
 
     WHIRR-28. Add examples module (asavu)

Modified: incubator/whirr/trunk/contrib/python/src/py/hadoop-ec2
URL: http://svn.apache.org/viewvc/incubator/whirr/trunk/contrib/python/src/py/hadoop-ec2?rev=1146524&r1=1146523&r2=1146524&view=diff
==============================================================================
--- incubator/whirr/trunk/contrib/python/src/py/hadoop-ec2 (original)
+++ incubator/whirr/trunk/contrib/python/src/py/hadoop-ec2 Thu Jul 14 00:24:28 2011
@@ -1,4 +1,4 @@
-#!/usr/bin/env python2.5
+#!/usr/bin/env python
 
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with

Modified: incubator/whirr/trunk/contrib/python/src/py/hadoop/cloud/cli.py
URL: http://svn.apache.org/viewvc/incubator/whirr/trunk/contrib/python/src/py/hadoop/cloud/cli.py?rev=1146524&r1=1146523&r2=1146524&view=diff
==============================================================================
--- incubator/whirr/trunk/contrib/python/src/py/hadoop/cloud/cli.py (original)
+++ incubator/whirr/trunk/contrib/python/src/py/hadoop/cloud/cli.py Thu Jul 14 00:24:28 2011
@@ -296,7 +296,7 @@ def main():
                          opt.get('user_data_file'),
                          opt.get('availability_zone'), opt.get('user_packages'),
                          opt.get('auto_shutdown'), opt.get('env'),
-                         opt.get('security_group'))
+                         opt.get('security_group'), opt.get('spot_price'))
     service.launch_master(template, config_dir, opt.get('client_cidr'))
 
   elif command == 'launch-slaves':
@@ -311,7 +311,7 @@ def main():
                          opt.get('user_data_file'),
                          opt.get('availability_zone'), opt.get('user_packages'),
                          opt.get('auto_shutdown'), opt.get('env'),
-                         opt.get('security_group'))
+                         opt.get('security_group'), opt.get('spot_price'))
     service.launch_slaves(template)
 
   elif command == 'launch-cluster':
@@ -333,7 +333,7 @@ def main():
                          opt.get('user_data_file'),
                          opt.get('availability_zone'), opt.get('user_packages'),
                          opt.get('auto_shutdown'), opt.get('env'),
-                         opt.get('security_group')),
+                         opt.get('security_group'), opt.get('spot_price')),
         InstanceTemplate((DATANODE, TASKTRACKER), number_of_slaves,
                          get_image_id(service.cluster, opt),
                          opt.get('instance_type'), opt.get('key_name'),
@@ -341,7 +341,7 @@ def main():
                          opt.get('user_data_file'),
                          opt.get('availability_zone'), opt.get('user_packages'),
                          opt.get('auto_shutdown'), opt.get('env'),
-                         opt.get('security_group')),
+                         opt.get('security_group'), opt.get('spot_price')),
                          ]
     elif len(args) > 2 and len(args) % 2 == 0:
       print_usage(sys.argv[0])
@@ -358,7 +358,7 @@ def main():
                            opt.get('availability_zone'),
                            opt.get('user_packages'),
                            opt.get('auto_shutdown'), opt.get('env'),
-                           opt.get('security_group')))
+                           opt.get('security_group'), opt.get('spot_price')))
 
     service.launch_cluster(instance_templates, config_dir,
                            opt.get('client_cidr'))

Modified: incubator/whirr/trunk/contrib/python/src/py/hadoop/cloud/providers/ec2.py
URL: http://svn.apache.org/viewvc/incubator/whirr/trunk/contrib/python/src/py/hadoop/cloud/providers/ec2.py?rev=1146524&r1=1146523&r2=1146524&view=diff
==============================================================================
--- incubator/whirr/trunk/contrib/python/src/py/hadoop/cloud/providers/ec2.py (original)
+++ incubator/whirr/trunk/contrib/python/src/py/hadoop/cloud/providers/ec2.py Thu Jul 14 00:24:28 2011
@@ -243,6 +243,46 @@ class Ec2Cluster(Cluster):
       sys.stdout.write(".")
       sys.stdout.flush()
       time.sleep(1)
+    
+  def launch_spot_instances(self, roles, price, number, image_id, size_id,
+                       instance_user_data, **kwargs):
+    
+    for role in roles:
+      self._check_role_name(role)  
+      self._create_groups(role)
+      
+    user_data = instance_user_data.read_as_gzip_stream()
+    security_groups = self._get_group_names(roles) + kwargs.get('security_groups', [])
+    
+    spot_request = self.ec2Connection.request_spot_instances(price=price, image_id=image_id,
+      count=number, type='one-time', valid_from=None, valid_until=None,
+      launch_group=kwargs.get('launch_group', None),
+      availability_zone_group=kwargs.get('availability_zone_group', None),
+      key_name=kwargs.get('key_name', None),
+      security_groups=security_groups, user_data=user_data,
+      instance_type=size_id, placement=kwargs.get('placement', None))
+    spot_instance_request_ids = [request.id for request in spot_request]
+    instance_ids = self.wait_for_spot_instances(spot_instance_request_ids)
+    return instance_ids
+
+  def wait_for_spot_instances(self, request_ids, timeout=1200):
+    start_time = time.time()
+    while True:
+      if (time.time() - start_time >= timeout):
+        raise TimeoutException()
+      if self._all_spot_requests_started(self.ec2Connection.get_all_spot_instance_requests(request_ids)):
+        instance_ids = [request.instance_id for request in self.ec2Connection.get_all_spot_instance_requests(request_ids)]
+        if self._all_started(self.ec2Connection.get_all_instances(instance_ids)):
+          return instance_ids
+      sys.stdout.write(".")
+      sys.stdout.flush()
+      time.sleep(1)
+
+  def _all_spot_requests_started(self, requests):
+    for request in requests:
+      if request.state != "active":
+        return False
+    return True
 
   def _all_started(self, reservations):
     for res in reservations:
@@ -255,6 +295,9 @@ class Ec2Cluster(Cluster):
     instances = self._get_instances(self._get_cluster_group_name(), "running")
     if instances:
       self.ec2Connection.terminate_instances([i.id for i in instances])
+      spot_instance_request_ids = map(lambda x: x.spot_instance_request_id, filter(lambda x: x.spot_instance_request_id is not None, instances))
+      if spot_instance_request_ids:
+          self.ec2Connection.cancel_spot_instance_requests(spot_instance_request_ids)
 
   def delete(self):
     """

Modified: incubator/whirr/trunk/contrib/python/src/py/hadoop/cloud/service.py
URL: http://svn.apache.org/viewvc/incubator/whirr/trunk/contrib/python/src/py/hadoop/cloud/service.py?rev=1146524&r1=1146523&r2=1146524&view=diff
==============================================================================
--- incubator/whirr/trunk/contrib/python/src/py/hadoop/cloud/service.py (original)
+++ incubator/whirr/trunk/contrib/python/src/py/hadoop/cloud/service.py Thu Jul 14 00:24:28 2011
@@ -52,7 +52,7 @@ class InstanceTemplate(object):
                      key_name, public_key, private_key,
                      user_data_file_template=None, placement=None,
                      user_packages=None, auto_shutdown=None, env_strings=[],
-                     security_groups=[]):
+                     security_groups=[], spot_price=None):
     self.roles = roles
     self.number = number
     self.image_id = image_id
@@ -66,6 +66,7 @@ class InstanceTemplate(object):
     self.auto_shutdown = auto_shutdown
     self.env_strings = env_strings
     self.security_groups = security_groups
+    self.spot_price = spot_price
 
   def add_env_strings(self, env_strings):
     new_env_strings = list(self.env_strings or [])
@@ -210,12 +211,21 @@ class Service(object):
       "EBS_MAPPINGS": ebs_mappings,
     }) }
     instance_user_data = InstanceUserData(user_data_file_template, replacements)
-    instance_ids = self.cluster.launch_instances(it.roles, it.number, it.image_id,
-                                            it.size_id,
-                                            instance_user_data,
-                                            key_name=it.key_name,
-                                            public_key=it.public_key,
-                                            placement=it.placement)
+
+    if it.spot_price is None:
+        instance_ids = self.cluster.launch_instances(it.roles, it.number, it.image_id,
+                                                it.size_id,
+                                                instance_user_data,
+                                                key_name=it.key_name,
+                                                public_key=it.public_key,
+                                                placement=it.placement)
+    else:
+        instance_ids = self.cluster.launch_spot_instances(it.roles, it.spot_price, it.number, it.image_id,
+                                                it.size_id,
+                                                instance_user_data,
+                                                key_name=it.key_name,
+                                                public_key=it.public_key,
+                                                placement=it.placement)
     print "Waiting for %s instances in role %s to start" % \
       (it.number, ",".join(it.roles))
     try: