You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@libcloud.apache.org by to...@apache.org on 2011/10/30 00:52:07 UTC
svn commit: r1195065 - in /libcloud/trunk/libcloud/common: base.py
cloudstack.py
Author: tomaz
Date: Sat Oct 29 22:52:07 2011
New Revision: 1195065
URL: http://svn.apache.org/viewvc?rev=1195065&view=rev
Log:
Add a new "PollingConnection" class which can work with "async" APIs and use it
in the CloudStackConnection class.
Modified:
libcloud/trunk/libcloud/common/base.py
libcloud/trunk/libcloud/common/cloudstack.py
Modified: libcloud/trunk/libcloud/common/base.py
URL: http://svn.apache.org/viewvc/libcloud/trunk/libcloud/common/base.py?rev=1195065&r1=1195064&r2=1195065&view=diff
==============================================================================
--- libcloud/trunk/libcloud/common/base.py (original)
+++ libcloud/trunk/libcloud/common/base.py Sat Oct 29 22:52:07 2011
@@ -17,6 +17,7 @@ import httplib
import urllib
import StringIO
import ssl
+import time
from xml.etree import ElementTree as ET
from pipes import quote as pquote
@@ -274,6 +275,7 @@ class LoggingHTTPConnection(LoggingConne
return LibcloudHTTPConnection.request(self, method, url,
body, headers)
+
class Connection(object):
"""
A Base Connection class to derive from.
@@ -534,7 +536,116 @@ class Connection(object):
Override in a provider's subclass.
"""
return data
-
+
+class PollingConnection(Connection):
+    """
+    Connection class which can also work with the async APIs.
+
+    After the initial request, this class periodically polls for the job
+    status and waits until the job has finished.
+    If the job doesn't finish in C{timeout} seconds, a LibcloudError is
+    raised.
+    """
+    # Number of seconds to sleep between two consecutive status polls.
+    poll_interval = 0.5
+    # Maximum number of seconds to keep polling before giving up.
+    timeout = 10
+    # Name of the method on this instance which performs the requests.
+    request_method = 'request'
+
+    def async_request(self, action, params=None, data='', headers=None,
+                      method='GET', context=None):
+        """
+        Perform an 'async' request to the specified path. Keep in mind that
+        this function is *blocking* and 'async' in this case means that the
+        hit URL only returns a job ID which is then periodically polled until
+        the job has completed.
+
+        This function works like this:
+
+        - Perform a request to the specified path. Response should contain a
+          'job_id'.
+
+        - Returned 'job_id' is then used to construct a URL which is used for
+          retrieving job status. Constructed URL is then polled every
+          'self.poll_interval' seconds until the response indicates that the
+          job has completed or the timeout of 'self.timeout' seconds has been
+          reached.
+
+        @type action: C{str}
+        @param action: A path
+
+        @type params: C{dict}
+        @param params: Optional mapping of additional parameters to send. If
+            None, leave as an empty C{dict}.
+
+        @type data: C{unicode}
+        @param data: A body of data to send with the request.
+
+        @type headers: C{dict}
+        @param headers: Extra headers to add to the request. If
+            None, leave as an empty C{dict}.
+
+        @type method: C{str}
+        @param method: An HTTP method such as "GET" or "POST".
+
+        @type context: C{dict}
+        @param context: Context dictionary which is passed to the functions
+            which construct initial and poll URL.
+
+        @raise LibcloudError: If the job has not completed after
+            'self.timeout' seconds.
+
+        @return: The response of the last poll request, as returned by the
+            method named by 'self.request_method'.
+        """
+        request = getattr(self, self.request_method)
+        kwargs = self.get_request_kwargs(action=action, params=params,
+                                         data=data, headers=headers,
+                                         method=method,
+                                         context=context)
+        response = request(**kwargs)
+        kwargs = self.get_poll_request_kwargs(response=response,
+                                              context=context)
+
+        end = time.time() + self.timeout
+        completed = False
+        while time.time() < end and not completed:
+            response = request(**kwargs)
+            completed = self.has_completed(response=response)
+
+            if not completed:
+                # Wait before asking again; without this pause the loop
+                # would hit the status URL back-to-back until the timeout.
+                time.sleep(self.poll_interval)
+
+        if not completed:
+            raise LibcloudError('Job did not complete in %s seconds' %
+                                (self.timeout))
+
+        return response
+
+    def get_request_kwargs(self, action, params=None, data='', headers=None,
+                           method='GET', context=None):
+        """
+        Arguments which are passed to the initial request() call inside
+        async_request.
+
+        The default implementation forwards everything except C{context},
+        which is accepted so that subclasses can use it.
+
+        @return C{dict} Keyword arguments
+        """
+        kwargs = {'action': action, 'params': params, 'data': data,
+                  'headers': headers, 'method': method}
+        return kwargs
+
+    def get_poll_request_kwargs(self, response, context):
+        """
+        Return keyword arguments which are passed to the request() method when
+        polling for the job status.
+
+        @param response: Response object returned by the initial request.
+        @type response: C{HTTPResponse}
+
+        @param context: Context dictionary which was passed to async_request.
+        @type context: C{dict}
+
+        @return C{dict} Keyword arguments
+        """
+        raise NotImplementedError('get_poll_request_kwargs not implemented')
+
+    def has_completed(self, response):
+        """
+        Return job completion status.
+
+        @param response: Response object returned by poll request.
+        @type response: C{HTTPResponse}
+
+        @return C{bool} True if the job has completed, False otherwise.
+        """
+        raise NotImplementedError('has_completed not implemented')
+
+
class ConnectionKey(Connection):
"""
A Base Connection class to derive from, which includes a
Modified: libcloud/trunk/libcloud/common/cloudstack.py
URL: http://svn.apache.org/viewvc/libcloud/trunk/libcloud/common/cloudstack.py?rev=1195065&r1=1195064&r2=1195065&view=diff
==============================================================================
--- libcloud/trunk/libcloud/common/cloudstack.py (original)
+++ libcloud/trunk/libcloud/common/cloudstack.py Sat Oct 29 22:52:07 2011
@@ -19,13 +19,16 @@ import hmac
import time
import urllib
-from libcloud.common.base import ConnectionUserAndKey, JsonResponse
+from libcloud.common.base import ConnectionUserAndKey, PollingConnection
+from libcloud.common.base import JsonResponse
from libcloud.common.types import MalformedResponseError
class CloudStackResponse(JsonResponse):
    """JsonResponse subclass used as CloudStackConnection.responseCls."""
-class CloudStackConnection(ConnectionUserAndKey):
+class CloudStackConnection(ConnectionUserAndKey, PollingConnection):
responseCls = CloudStackResponse
+ poll_interval = 1
+ request_method = '_sync_request'
ASYNC_PENDING = 0
ASYNC_SUCCESS = 1
@@ -50,6 +53,33 @@ class CloudStackConnection(ConnectionUse
return params, headers
+    def _async_request(self, command, **kwargs):
+        """
+        Run C{command} through async_request() and return the job result.
+
+        The command name and its keyword arguments travel in the C{context}
+        dictionary; every other async_request() argument is passed as None.
+
+        @return: The 'jobresult' entry of the final poll response.
+        """
+        parent = super(CloudStackConnection, self)
+        response = parent.async_request(action=None, params=None, data=None,
+                                        headers=None, method=None,
+                                        context={'command': command,
+                                                 'kwargs': kwargs})
+        return response['jobresult']
+
+    def get_request_kwargs(self, action, params=None, data='', headers=None,
+                           method='GET', context=None):
+        """
+        Build the keyword arguments for the initial request from C{context}.
+
+        The request method used by this class (C{_sync_request}) takes
+        C{(command, **kwargs)}, so the API parameters stored under the
+        'kwargs' key of the context are flattened next to 'command'.
+        Returning the context untouched would deliver them as one nested
+        keyword argument literally named 'kwargs'.
+
+        All arguments other than C{context} are ignored.
+
+        @param context: Dictionary with a 'command' key and, optionally, a
+            'kwargs' dictionary holding the command parameters.
+        @type context: C{dict}
+
+        @return C{dict} Keyword arguments
+        """
+        # Work on a copy so the caller's context is left untouched; it is
+        # handed to get_poll_request_kwargs() afterwards.
+        request_kwargs = dict(context or {})
+        request_kwargs.update(request_kwargs.pop('kwargs', None) or {})
+        return request_kwargs
+
+    def get_poll_request_kwargs(self, response, context):
+        """
+        Return the arguments for a 'queryAsyncJobResult' request which asks
+        about the job named by the 'jobid' entry of C{response}.
+
+        C{context} is not used.
+        """
+        return {'command': 'queryAsyncJobResult', 'jobid': response['jobid']}
+
+    def has_completed(self, response):
+        """
+        Return job completion status.
+
+        A response without a 'jobstatus' entry is treated as still pending.
+
+        @param response: Result of a 'queryAsyncJobResult' poll request.
+
+        @raise Exception: If the job status is ASYNC_FAILURE. The exception
+            carries the whole poll response.
+
+        @return C{bool} True if the job status is ASYNC_SUCCESS, False
+            otherwise.
+        """
+        status = response.get('jobstatus', self.ASYNC_PENDING)
+
+        if status == self.ASYNC_FAILURE:
+            # Raise with the full response rather than the bare status code,
+            # so whatever failure details the API returned reach the caller.
+            raise Exception(response)
+
+        return status == self.ASYNC_SUCCESS
+
def _sync_request(self, command, **kwargs):
"""This method handles synchronous calls which are generally fast
information retrieval requests and thus return 'quickly'."""
@@ -65,34 +95,9 @@ class CloudStackConnection(ConnectionUse
result = result.object[command]
return result
- def _async_request(self, command, **kwargs):
- """This method handles asynchronous calls which are generally
- requests for the system to do something and can thus take time.
-
- In these cases the initial call will either fail fast and return
- an error, or it can return a job ID. We then poll for the status
- of the job ID which can either be pending, successful or failed."""
-
- result = self._sync_request(command, **kwargs)
- job_id = result['jobid']
- success = True
-
- while True:
- result = self._sync_request('queryAsyncJobResult', jobid=job_id)
- status = result.get('jobstatus', self.ASYNC_PENDING)
- if status != self.ASYNC_PENDING:
- break
- time.sleep(self.driver.async_poll_frequency)
-
- if result['jobstatus'] == self.ASYNC_FAILURE:
- raise Exception(result)
-
- return result['jobresult']
-
class CloudStackDriverMixIn(object):
host = None
path = None
- async_poll_frequency = 1
connectionCls = CloudStackConnection