You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@libcloud.apache.org by to...@apache.org on 2011/10/30 00:52:07 UTC

svn commit: r1195065 - in /libcloud/trunk/libcloud/common: base.py cloudstack.py

Author: tomaz
Date: Sat Oct 29 22:52:07 2011
New Revision: 1195065

URL: http://svn.apache.org/viewvc?rev=1195065&view=rev
Log:
Add a new "PollingConnection" class which can work with "async" APIs and use it
in the CloudStackConnection class.

Modified:
    libcloud/trunk/libcloud/common/base.py
    libcloud/trunk/libcloud/common/cloudstack.py

Modified: libcloud/trunk/libcloud/common/base.py
URL: http://svn.apache.org/viewvc/libcloud/trunk/libcloud/common/base.py?rev=1195065&r1=1195064&r2=1195065&view=diff
==============================================================================
--- libcloud/trunk/libcloud/common/base.py (original)
+++ libcloud/trunk/libcloud/common/base.py Sat Oct 29 22:52:07 2011
@@ -17,6 +17,7 @@ import httplib
 import urllib
 import StringIO
 import ssl
+import time
 
 from xml.etree import ElementTree as ET
 from pipes import quote as pquote
@@ -274,6 +275,7 @@ class LoggingHTTPConnection(LoggingConne
         return LibcloudHTTPConnection.request(self, method, url,
                                                body, headers)
 
+
 class Connection(object):
     """
     A Base Connection class to derive from.
@@ -534,7 +536,116 @@ class Connection(object):
         Override in a provider's subclass.
         """
         return data
-    
+
+class PollingConnection(Connection):
+    """
+    Connection class which can also work with the async APIs.
+
+    After initial requests, this class periodically polls for jobs status and
+    waits until the job has finished.
+    If the job doesn't finish in timeout seconds, an Exception is thrown.
+    """
+    poll_interval = 0.5
+    timeout = 10
+    request_method = 'request'
+
+    def async_request(self, action, params=None, data='', headers=None,
+                      method='GET', context=None):
+        """
+        Perform an 'async' request to the specified path. Keep in mind that
+        this function is *blocking* and 'async' in this case means that the
+        hit URL only returns a job ID which is then periodically polled until
+        the job has completed.
+
+        This function works like this:
+
+        - Perform a request to the specified path. Response should contain a
+          'job_id'.
+
+        - Returned 'job_id' is then used to construct a URL which is used for
+          retrieving job status. Constructed URL is then periodically polled
+          until the response indicates that the job has completed or the timeout
+          of 'self.timeout' seconds has been reached.
+
+        @type action: C{str}
+        @param action: A path
+
+        @type params: C{dict}
+        @param params: Optional mapping of additional parameters to send. If
+            None, leave as an empty C{dict}.
+
+        @type data: C{unicode}
+        @param data: A body of data to send with the request.
+
+        @type headers: C{dict}
+        @param headers: Extra headers to add to the request. If
+            None, leave as an empty C{dict}.
+
+        @type method: C{str}
+        @param method: An HTTP method such as "GET" or "POST".
+
+        @type context: C{dict}
+        @param context: Context dictionary which is passed to the functions
+        which construct initial and poll URL.
+
+        @return: An instance of type I{responseCls}
+        """
+
+        request = getattr(self, self.request_method)
+        kwargs = self.get_request_kwargs(action=action, params=params,
+                                         data=data, headers=headers,
+                                         method=method,
+                                         context=context)
+        response = request(**kwargs)
+        kwargs = self.get_poll_request_kwargs(response=response,
+                                              context=context)
+
+        end = time.time() + self.timeout
+        completed = False
+        while time.time() < end and not completed:
+            response = request(**kwargs)
+            completed = self.has_completed(response=response)
+
+        if not completed:
+            raise LibcloudError('Job did not complete in %s seconds' %
+                                (self.timeout))
+
+        return response
+
+    def get_request_kwargs(self, action, params=None, data='', headers=None,
+                           method='GET', context=None):
+        """
+        Arguments which are passed to the initial request() call inside
+        async_request.
+        """
+        kwargs = {'action': action, 'params': params, 'data': data,
+                  'headers': headers, 'method': method}
+        return kwargs
+
+    def get_poll_request_kwargs(self, response, context):
+        """
+        Return keyword arguments which are passed to the request() method when
+        polling for the job status.
+
+        @param response: Response object returned by the initial request.
+        @type response: C{HTTPResponse}
+
+        @return: C{dict} Keyword arguments
+        """
+        raise NotImplementedError('get_poll_request_kwargs not implemented')
+
+    def has_completed(self, response):
+        """
+        Return job completion status.
+
+        @param response: Response object returned by poll request.
+        @type response: C{HTTPResponse}
+
+        @return: C{bool} True if the job has completed, False otherwise.
+        """
+        raise NotImplementedError('has_completed not implemented')
+
+
 class ConnectionKey(Connection):
     """
     A Base Connection class to derive from, which includes a

Modified: libcloud/trunk/libcloud/common/cloudstack.py
URL: http://svn.apache.org/viewvc/libcloud/trunk/libcloud/common/cloudstack.py?rev=1195065&r1=1195064&r2=1195065&view=diff
==============================================================================
--- libcloud/trunk/libcloud/common/cloudstack.py (original)
+++ libcloud/trunk/libcloud/common/cloudstack.py Sat Oct 29 22:52:07 2011
@@ -19,13 +19,16 @@ import hmac
 import time
 import urllib
 
-from libcloud.common.base import ConnectionUserAndKey, JsonResponse
+from libcloud.common.base import ConnectionUserAndKey, PollingConnection
+from libcloud.common.base import JsonResponse
 from libcloud.common.types import MalformedResponseError
 
 class CloudStackResponse(JsonResponse): pass
 
-class CloudStackConnection(ConnectionUserAndKey):
+class CloudStackConnection(ConnectionUserAndKey, PollingConnection):
     responseCls = CloudStackResponse
+    poll_interval = 1
+    request_method = '_sync_request'
 
     ASYNC_PENDING = 0
     ASYNC_SUCCESS = 1
@@ -50,6 +53,33 @@ class CloudStackConnection(ConnectionUse
 
         return params, headers
 
+    def _async_request(self, command, **kwargs):
+        context = {'command': command, 'kwargs': kwargs}
+        result = super(CloudStackConnection, self).async_request(action=None,
+                                                               params=None,
+                                                               data=None,
+                                                               headers=None,
+                                                               method=None,
+                                                               context=context)
+        return result['jobresult']
+
+    def get_request_kwargs(self, action, params=None, data='', headers=None,
+                           method='GET', context=None):
+        return context
+
+    def get_poll_request_kwargs(self, response, context):
+        job_id = response['jobid']
+        kwargs = {'command': 'queryAsyncJobResult', 'jobid': job_id}
+        return kwargs
+
+    def has_completed(self, response):
+        status = response.get('jobstatus', self.ASYNC_PENDING)
+
+        if status == self.ASYNC_FAILURE:
+            raise Exception(status)
+
+        return status == self.ASYNC_SUCCESS
+
     def _sync_request(self, command, **kwargs):
         """This method handles synchronous calls which are generally fast
            information retrieval requests and thus return 'quickly'."""
@@ -65,34 +95,9 @@ class CloudStackConnection(ConnectionUse
         result = result.object[command]
         return result
 
-    def _async_request(self, command, **kwargs):
-        """This method handles asynchronous calls which are generally
-           requests for the system to do something and can thus take time.
-
-           In these cases the initial call will either fail fast and return
-           an error, or it can return a job ID.  We then poll for the status
-           of the job ID which can either be pending, successful or failed."""
-
-        result = self._sync_request(command, **kwargs)
-        job_id = result['jobid']
-        success = True
-
-        while True:
-            result = self._sync_request('queryAsyncJobResult', jobid=job_id)
-            status = result.get('jobstatus', self.ASYNC_PENDING)
-            if status != self.ASYNC_PENDING:
-                break
-            time.sleep(self.driver.async_poll_frequency)
-
-        if result['jobstatus'] == self.ASYNC_FAILURE:
-            raise Exception(result)
-
-        return result['jobresult']
-
 class CloudStackDriverMixIn(object):
     host = None
     path = None
-    async_poll_frequency = 1
 
     connectionCls = CloudStackConnection