You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@libcloud.apache.org by to...@apache.org on 2012/12/25 23:46:50 UTC

svn commit: r1425779 - in /libcloud/trunk: ./ libcloud/common/ libcloud/storage/drivers/ libcloud/test/ libcloud/test/storage/ libcloud/test/storage/fixtures/s3/ libcloud/utils/

Author: tomaz
Date: Tue Dec 25 22:46:49 2012
New Revision: 1425779

URL: http://svn.apache.org/viewvc?rev=1425779&view=rev
Log:
Support for multipart uploads and other improvements in the S3 driver
so it can more easily be re-used with other implementations (e.g. Google
Storage, etc.).

Also default to a multipart upload when using upload_object_via_stream.
This method is more efficient than the old approach because it only
requires buffering a single multipart chunk (5 MB) in memory.

Contributed by Mahendra M, part of LIBCLOUD-269.

Added:
    libcloud/trunk/libcloud/test/storage/fixtures/s3/complete_multipart.xml
    libcloud/trunk/libcloud/test/storage/fixtures/s3/initiate_multipart.xml
    libcloud/trunk/libcloud/test/storage/fixtures/s3/list_multipart_1.xml
    libcloud/trunk/libcloud/test/storage/fixtures/s3/list_multipart_2.xml
Modified:
    libcloud/trunk/CHANGES
    libcloud/trunk/libcloud/common/base.py
    libcloud/trunk/libcloud/storage/drivers/google_storage.py
    libcloud/trunk/libcloud/storage/drivers/s3.py
    libcloud/trunk/libcloud/test/__init__.py
    libcloud/trunk/libcloud/test/storage/test_atmos.py
    libcloud/trunk/libcloud/test/storage/test_cloudfiles.py
    libcloud/trunk/libcloud/test/storage/test_s3.py
    libcloud/trunk/libcloud/utils/py3.py

Modified: libcloud/trunk/CHANGES
URL: http://svn.apache.org/viewvc/libcloud/trunk/CHANGES?rev=1425779&r1=1425778&r2=1425779&view=diff
==============================================================================
--- libcloud/trunk/CHANGES (original)
+++ libcloud/trunk/CHANGES Tue Dec 25 22:46:49 2012
@@ -95,6 +95,16 @@ Changes with Apache Libcloud in developm
       future.
       [Tomaz Muraus]
 
+    - Support for multipart uploads and other improvemetns in the S3 driver
+      so it can more easily be re-used with other implementations (e.g. Google
+      Storage, etc.).
+
+      Also default to a multipart upload when using upload_object_via_stream.
+      This methods is more efficient compared to old approach because it only
+      requires buffering a single multipart chunk (5 MB) in memory.
+      (LIBCLOUD-269)
+      [Mahendra M]
+
   *) DNS
 
     - Update 'if type' checks in the update_record methods to behave correctly

Modified: libcloud/trunk/libcloud/common/base.py
URL: http://svn.apache.org/viewvc/libcloud/trunk/libcloud/common/base.py?rev=1425779&r1=1425778&r2=1425779&view=diff
==============================================================================
--- libcloud/trunk/libcloud/common/base.py (original)
+++ libcloud/trunk/libcloud/common/base.py Tue Dec 25 22:46:49 2012
@@ -516,7 +516,8 @@ class Connection(object):
         object's `request` that does some helpful pre-processing.
 
         @type action: C{str}
-        @param action: A path
+        @param action: A path. This can include arguments. If included,
+            any extra parameters are appended to the existing ones.
 
         @type params: C{dict}
         @param params: Optional mapping of additional parameters to send. If
@@ -574,7 +575,10 @@ class Connection(object):
         params, headers = self.pre_connect_hook(params, headers)
 
         if params:
-            url = '?'.join((action, urlencode(params)))
+            if '?' in action:
+                url = '&'.join((action, urlencode(params)))
+            else:
+                url = '?'.join((action, urlencode(params)))
         else:
             url = action
 

Modified: libcloud/trunk/libcloud/storage/drivers/google_storage.py
URL: http://svn.apache.org/viewvc/libcloud/trunk/libcloud/storage/drivers/google_storage.py?rev=1425779&r1=1425778&r2=1425779&view=diff
==============================================================================
--- libcloud/trunk/libcloud/storage/drivers/google_storage.py (original)
+++ libcloud/trunk/libcloud/storage/drivers/google_storage.py Tue Dec 25 22:46:49 2012
@@ -133,3 +133,4 @@ class GoogleStorageDriver(S3StorageDrive
     hash_type = 'md5'
     namespace = NAMESPACE
     supports_chunked_encoding = False
+    supports_s3_multipart_upload = False

Modified: libcloud/trunk/libcloud/storage/drivers/s3.py
URL: http://svn.apache.org/viewvc/libcloud/trunk/libcloud/storage/drivers/s3.py?rev=1425779&r1=1425778&r2=1425779&view=diff
==============================================================================
--- libcloud/trunk/libcloud/storage/drivers/s3.py (original)
+++ libcloud/trunk/libcloud/storage/drivers/s3.py Tue Dec 25 22:46:49 2012
@@ -17,14 +17,17 @@ import time
 import copy
 import base64
 import hmac
+import sys
 
 from hashlib import sha1
-from xml.etree.ElementTree import Element, SubElement, tostring
+from xml.etree.ElementTree import Element, SubElement
 
 from libcloud.utils.py3 import PY3
 from libcloud.utils.py3 import httplib
 from libcloud.utils.py3 import urlquote
+from libcloud.utils.py3 import urlencode
 from libcloud.utils.py3 import b
+from libcloud.utils.py3 import tostring
 
 from libcloud.utils.xml import fixxpath, findtext
 from libcloud.utils.files import read_in_chunks
@@ -39,6 +42,7 @@ from libcloud.storage.types import Conta
 from libcloud.storage.types import ObjectDoesNotExistError
 from libcloud.storage.types import ObjectHashMismatchError
 
+
 # How long before the token expires
 EXPIRATION_SECONDS = 15 * 60
 
@@ -52,6 +56,12 @@ S3_AP_NORTHEAST_HOST = 's3-ap-northeast-
 API_VERSION = '2006-03-01'
 NAMESPACE = 'http://s3.amazonaws.com/doc/%s/' % (API_VERSION)
 
+# AWS multi-part chunks must be minimum 5MB
+CHUNK_SIZE = 5 * 1024 * 1024
+
+# Desired number of responses in each request
+RESPONSES_PER_REQUEST = 100
+
 
 class S3Response(AWSBaseResponse):
 
@@ -162,12 +172,47 @@ class S3Connection(ConnectionUserAndKey)
         return b64_hmac.decode('utf-8')
 
 
+class S3MultipartUpload(object):
+    """
+    Class representing an amazon s3 multipart upload
+    """
+
+    def __init__(self, key, id, created_at, initiator, owner):
+        """
+        Class representing an amazon s3 multipart upload
+
+        @param key: The object/key that was being uploaded
+        @type key: C{str}
+
+        @param id: The upload id assigned by amazon
+        @type id: C{str}
+
+        @param created_at: The date/time at which the upload was started
+        @type created_at: C{str}
+
+        @param initiator: The AWS owner/IAM user who initiated this
+        @type initiator: C{str}
+
+        @param owner: The AWS owner/IAM who will own this object
+        @type owner: C{str}
+        """
+        self.key = key
+        self.id = id
+        self.created_at = created_at
+        self.initiator = initiator
+        self.owner = owner
+
+    def __repr__(self):
+        return ('<S3MultipartUpload: key=%s>' % (self.key))
+
+
 class S3StorageDriver(StorageDriver):
     name = 'Amazon S3 (standard)'
     website = 'http://aws.amazon.com/s3/'
     connectionCls = S3Connection
     hash_type = 'md5'
     supports_chunked_encoding = False
+    supports_s3_multipart_upload = True
     ex_location_name = ''
     namespace = NAMESPACE
 
@@ -185,12 +230,13 @@ class S3StorageDriver(StorageDriver):
         params = {}
         last_key = None
         exhausted = False
+        container_path = self._get_container_path(container)
 
         while not exhausted:
             if last_key:
                 params['marker'] = last_key
 
-            response = self.connection.request('/%s' % (container.name),
+            response = self.connection.request(container_path,
                                                params=params)
 
             if response.status != httplib.OK:
@@ -222,9 +268,9 @@ class S3StorageDriver(StorageDriver):
 
     def get_object(self, container_name, object_name):
         container = self.get_container(container_name=container_name)
-        response = self.connection.request('/%s/%s' % (container_name,
-                                                       object_name),
-                                           method='HEAD')
+        object_path = self._get_object_path(container, object_name)
+        response = self.connection.request(object_path, method='HEAD')
+
         if response.status == httplib.OK:
             obj = self._headers_to_object(object_name=object_name,
                                           container=container,
@@ -234,17 +280,43 @@ class S3StorageDriver(StorageDriver):
         raise ObjectDoesNotExistError(value=None, driver=self,
                                       object_name=object_name)
 
+    def _get_container_path(self, container):
+        """
+        Return a container path
+
+        @param container: Container instance
+        @type  container: L{Container}
+
+        @return: A path for this container.
+        @rtype: C{str}
+        """
+        return '/%s' % (container.name)
+
+    def _get_object_path(self, container, object_name):
+        """
+        Return an object's CDN path.
+
+        @param container: Container instance
+        @type  container: L{Container}
+
+        @param object_name: Object name
+        @type  object_name: L{str}
+
+        @return: A  path for this object.
+        @rtype: C{str}
+        """
+        container_url = self._get_container_path(container)
+        object_name_cleaned = self._clean_object_name(object_name)
+        object_path = '%s/%s' % (container_url, object_name_cleaned)
+        return object_path
+
     def create_container(self, container_name):
         if self.ex_location_name:
             root = Element('CreateBucketConfiguration')
             child = SubElement(root, 'LocationConstraint')
             child.text = self.ex_location_name
 
-            if PY3:
-                encoding = 'unicode'
-            else:
-                encoding = None
-            data = tostring(root, encoding=encoding)
+            data = tostring(root)
         else:
             data = ''
 
@@ -288,13 +360,9 @@ class S3StorageDriver(StorageDriver):
 
     def download_object(self, obj, destination_path, overwrite_existing=False,
                         delete_on_failure=True):
-        container_name = self._clean_object_name(obj.container.name)
-        object_name = self._clean_object_name(obj.name)
+        obj_path = self._get_object_path(obj.container, obj.name)
 
-        response = self.connection.request('/%s/%s' % (container_name,
-                                                       object_name),
-                                           method='GET',
-                                           raw=True)
+        response = self.connection.request(obj_path, method='GET', raw=True)
 
         return self._get_object(obj=obj, callback=self._save_object,
                                 response=response,
@@ -307,11 +375,8 @@ class S3StorageDriver(StorageDriver):
                                 success_status_code=httplib.OK)
 
     def download_object_as_stream(self, obj, chunk_size=None):
-        container_name = self._clean_object_name(obj.container.name)
-        object_name = self._clean_object_name(obj.name)
-        response = self.connection.request('/%s/%s' % (container_name,
-                                                       object_name),
-                                           method='GET', raw=True)
+        obj_path = self._get_object_path(obj.container, obj.name)
+        response = self.connection.request(obj_path, method='GET', raw=True)
 
         return self._get_object(obj=obj, callback=read_in_chunks,
                                 response=response,
@@ -337,6 +402,187 @@ class S3StorageDriver(StorageDriver):
                                 verify_hash=verify_hash,
                                 storage_class=ex_storage_class)
 
+    def _upload_multipart(self, response, data, iterator, container,
+                          object_name, calculate_hash=True):
+        """
+        Callback invoked for uploading data to S3 using Amazon's
+        multipart upload mechanism
+
+        @param response: Response object from the initial POST request
+        @type response: L{S3RawResponse}
+
+        @param data: Any data from the initial POST request
+        @type data: C{str}
+
+        @param iterator: The generator for fetching the upload data
+        @type iterator: C{generator}
+
+        @param container: The container owning the object to which data is
+            being uploaded
+        @type container: L{Container}
+
+        @param object_name: The name of the object to which we are uploading
+        @type object_name: C{str}
+
+        @keyword calculate_hash: Indicates if we must calculate the data hash
+        @type calculate_hash: C{bool}
+
+        @return: A tuple of (status, checksum, bytes transferred)
+        @rtype: C{tuple}
+        """
+
+        object_path = self._get_object_path(container, object_name)
+
+        # Get the upload id from the response xml
+        response.body = response.response.read()
+        body = response.parse_body()
+        upload_id = body.find(fixxpath(xpath='UploadId',
+                                       namespace=self.namespace)).text
+
+        try:
+            # Upload the data through the iterator
+            result = self._upload_from_iterator(iterator, object_path,
+                                                upload_id, calculate_hash)
+            (chunks, data_hash, bytes_transferred) = result
+
+            # Commit the chunk info and complete the upload
+            etag = self._commit_multipart(object_path, upload_id, chunks)
+        except Exception:
+            exc = sys.exc_info()[1]
+            # Amazon provides a mechanism for aborting an upload.
+            self._abort_multipart(object_path, upload_id)
+            raise exc
+
+        # Modify the response header of the first request. This is used
+        # by other functions once the callback is done
+        response.headers['etag'] = etag
+
+        return (True, data_hash, bytes_transferred)
+
+    def _upload_from_iterator(self, iterator, object_path, upload_id,
+                              calculate_hash=True):
+        """
+        Uploads data from an interator in fixed sized chunks to S3
+
+        @param iterator: The generator for fetching the upload data
+        @type iterator: C{generator}
+
+        @param object_path: The path of the object to which we are uploading
+        @type object_name: C{str}
+
+        @param upload_id: The upload id allocated for this multipart upload
+        @type upload_id: C{str}
+
+        @keyword calculate_hash: Indicates if we must calculate the data hash
+        @type calculate_hash: C{bool}
+
+        @return: A tuple of (chunk info, checksum, bytes transferred)
+        @rtype: C{tuple}
+        """
+
+        data_hash = None
+        if calculate_hash:
+            data_hash = self._get_hash_function()
+
+        bytes_transferred = 0
+        count = 1
+        chunks = []
+        params = {'uploadId': upload_id}
+
+        # Read the input data in chunk sizes suitable for AWS
+        for data in read_in_chunks(iterator, chunk_size=CHUNK_SIZE):
+            bytes_transferred += len(data)
+
+            if calculate_hash:
+                data_hash.update(data)
+
+            chunk_hash = self._get_hash_function()
+            chunk_hash.update(data)
+            chunk_hash = base64.b64encode(chunk_hash.digest())
+
+            # This provides an extra level of data check and is recommended
+            # by amazon
+            headers = {'Content-MD5': chunk_hash}
+            params['partNumber'] = count
+
+            request_path = '?'.join((object_path, urlencode(params)))
+
+            resp = self.connection.request(request_path, method='PUT',
+                                           data=data, headers=headers)
+
+            if resp.status != httplib.OK:
+                raise LibcloudError('Error uploading chunk', driver=self)
+
+            server_hash = resp.headers['etag']
+
+            # Keep this data for a later commit
+            chunks.append((count, server_hash))
+            count += 1
+
+        if calculate_hash:
+            data_hash = data_hash.hexdigest()
+
+        return (chunks, data_hash, bytes_transferred)
+
+    def _commit_multipart(self, object_path, upload_id, chunks):
+        """
+        Makes a final commit of the data.
+
+        @param object_path: Server side object path.
+        @type object_path: C{str}
+
+        @param upload_id: ID of the multipart upload.
+        @type upload_id: C{str}
+
+        @param upload_id: A list of (chunk_number, chunk_hash) tuples.
+        @type upload_id: C{list}
+        """
+
+        root = Element('CompleteMultipartUpload')
+
+        for (count, etag) in chunks:
+            part = SubElement(root, 'Part')
+            part_no = SubElement(part, 'PartNumber')
+            part_no.text = str(count)
+
+            etag_id = SubElement(part, 'ETag')
+            etag_id.text = str(etag)
+
+        data = tostring(root)
+
+        params = {'uploadId': upload_id}
+        request_path = '?'.join((object_path, urlencode(params)))
+        response = self.connection.request(request_path, data=data,
+                                           method='POST')
+
+        if response.status != httplib.OK:
+            raise LibcloudError('Error in multipart commit', driver=self)
+
+        # Get the server's etag to be passed back to the caller
+        body = response.parse_body()
+        server_hash = body.find(fixxpath(xpath='ETag',
+                                         namespace=self.namespace)).text
+        return server_hash
+
+    def _abort_multipart(self, object_path, upload_id):
+        """
+        Aborts an already initiated multipart upload
+
+        @param object_path: Server side object path.
+        @type object_path: C{str}
+
+        @param upload_id: ID of the multipart upload.
+        @type upload_id: C{str}
+        """
+
+        params = {'uploadId': upload_id}
+        request_path = '?'.join((object_path, urlencode(params)))
+        resp = self.connection.request(request_path, method='DELETE')
+
+        if resp.status != httplib.NO_CONTENT:
+            raise LibcloudError('Error in multipart abort. status_code=%d' %
+                                (resp.status), driver=self)
+
     def upload_object_via_stream(self, iterator, container, object_name,
                                  extra=None, ex_storage_class=None):
         """
@@ -345,23 +591,42 @@ class S3StorageDriver(StorageDriver):
         @param ex_storage_class: Storage class
         @type ex_storage_class: C{str}
         """
-        #Amazon S3 does not support chunked transfer encoding so the whole data
-        #is read into memory before uploading the object.
-        upload_func = self._upload_data
-        upload_func_kwargs = {}
+
+        method = 'PUT'
+        params = None
+
+        # This driver is used by other S3 API compatible drivers also.
+        # Amazon provides a different (complex?) mechanism to do multipart
+        # uploads
+        if self.supports_s3_multipart_upload:
+            # Initiate the multipart request and get an upload id
+            upload_func = self._upload_multipart
+            upload_func_kwargs = {'iterator': iterator,
+                                  'container': container,
+                                  'object_name': object_name}
+            method = 'POST'
+            iterator = iter('')
+            params = 'uploads'
+
+        elif self.supports_chunked_encoding:
+            upload_func = self._stream_data
+            upload_func_kwargs = {'iterator': iterator}
+        else:
+            # In this case, we have to load the entire object to
+            # memory and send it as normal data
+            upload_func = self._upload_data
+            upload_func_kwargs = {}
 
         return self._put_object(container=container, object_name=object_name,
                                 upload_func=upload_func,
                                 upload_func_kwargs=upload_func_kwargs,
-                                extra=extra, iterator=iterator,
-                                verify_hash=False,
+                                extra=extra, method=method, query_args=params,
+                                iterator=iterator, verify_hash=False,
                                 storage_class=ex_storage_class)
 
     def delete_object(self, obj):
-        object_name = self._clean_object_name(name=obj.name)
-        response = self.connection.request('/%s/%s' % (obj.container.name,
-                                                       object_name),
-                                           method='DELETE')
+        object_path = self._get_object_path(obj.container, obj.name)
+        response = self.connection.request(object_path, method='DELETE')
         if response.status == httplib.NO_CONTENT:
             return True
         elif response.status == httplib.NOT_FOUND:
@@ -370,13 +635,108 @@ class S3StorageDriver(StorageDriver):
 
         return False
 
+    def ex_iterate_multipart_uploads(self, container, prefix=None,
+                                     delimiter=None):
+        """
+        Extension method for listing all S3 multipart uploads.
+
+        @param container: The container holding the uploads
+        @type container: L{Container}
+
+        @keyword prefix: Print only uploads of objects with this prefix
+        @type prefix: C{str}
+
+        @keyword delimiter: The object/key names are grouped based on
+            being split by this delimiter
+        @type delimiter: C{str}
+
+        @return: A generator of S3MultipartUpload instances.
+        @rtype: C{generator} of L{S3MultipartUpload}
+        """
+
+        if not self.supports_s3_multipart_upload:
+            raise LibcloudError('Feature not supported', driver=self)
+
+        # Get the data for a specific container
+        request_path = '%s/?uploads' % (self._get_container_path(container))
+        params = {'max-uploads': RESPONSES_PER_REQUEST}
+
+        if prefix:
+            params['prefix'] = prefix
+
+        if delimiter:
+            params['delimiter'] = delimiter
+
+        finder = lambda node, text: node.findtext(fixxpath(xpath=text,
+                                                  namespace=self.namespace))
+
+        while True:
+            response = self.connection.request(request_path, params=params)
+
+            if response.status != httplib.OK:
+                raise LibcloudError('Error fetching multipart uploads. '
+                                    'Got code: %s' % (response.status),
+                                    driver=self)
+
+            body = response.parse_body()
+            for node in body.findall(fixxpath(xpath='Upload',
+                                              namespace=self.namespace)):
+
+                initiator = node.find(fixxpath(xpath='Initiator',
+                                               namespace=self.namespace))
+                owner = node.find(fixxpath(xpath='Owner',
+                                           namespace=self.namespace))
+
+                key = finder(node, 'Key')
+                upload_id = finder(node, 'UploadId')
+                created_at = finder(node, 'Initiated')
+                initiator = finder(initiator, 'DisplayName')
+                owner = finder(owner, 'DisplayName')
+
+                yield S3MultipartUpload(key, upload_id, created_at,
+                                        initiator, owner)
+
+            # Check if this is the last entry in the listing
+            is_truncated = body.findtext(fixxpath(xpath='IsTruncated',
+                                                  namespace=self.namespace))
+
+            if is_truncated.lower() == 'false':
+                break
+
+            # Provide params for the next request
+            upload_marker = body.findtext(fixxpath(xpath='NextUploadIdMarker',
+                                                   namespace=self.namespace))
+            key_marker = body.findtext(fixxpath(xpath='NextKeyMarker',
+                                                namespace=self.namespace))
+
+            params['key-marker'] = key_marker
+            params['upload-id-marker'] = upload_marker
+
+    def ex_cleanup_all_multipart_uploads(self, container, prefix=None):
+        """
+        Extension method for removing S3 multipart uploads.
+
+        @param container: The container holding the uploads
+        @type container: L{Container}
+
+        @keyword prefix: Delete only uploads of objects with this prefix
+        @type prefix: C{str}
+        """
+
+        # Iterate through the container and delete the upload ids
+        for upload in self.ex_iterate_multipart_uploads(container, prefix,
+                                                        delimiter=None):
+            object_path = '/%s/%s' % (container.name, upload.key)
+            self._abort_multipart(object_path, upload.id)
+
     def _clean_object_name(self, name):
         name = urlquote(name)
         return name
 
     def _put_object(self, container, object_name, upload_func,
-                    upload_func_kwargs, extra=None, file_path=None,
-                    iterator=None, verify_hash=True, storage_class=None):
+                    upload_func_kwargs, method='PUT', query_args=None,
+                    extra=None, file_path=None, iterator=None,
+                    verify_hash=True, storage_class=None):
         headers = {}
         extra = extra or {}
         storage_class = storage_class or 'standard'
@@ -386,8 +746,6 @@ class S3StorageDriver(StorageDriver):
 
         headers['x-amz-storage-class'] = storage_class.upper()
 
-        container_name_cleaned = container.name
-        object_name_cleaned = self._clean_object_name(object_name)
         content_type = extra.get('content_type', None)
         meta_data = extra.get('meta_data', None)
 
@@ -396,7 +754,11 @@ class S3StorageDriver(StorageDriver):
                 key = 'x-amz-meta-%s' % (key)
                 headers[key] = value
 
-        request_path = '/%s/%s' % (container_name_cleaned, object_name_cleaned)
+        request_path = self._get_object_path(container, object_name)
+
+        if query_args:
+            request_path = '?'.join((request_path, query_args))
+
         # TODO: Let the underlying exceptions bubble up and capture the SIGPIPE
         # here.
         #SIGPIPE is thrown if the provided container does not exist or the user
@@ -404,7 +766,7 @@ class S3StorageDriver(StorageDriver):
         result_dict = self._upload_object(
             object_name=object_name, content_type=content_type,
             upload_func=upload_func, upload_func_kwargs=upload_func_kwargs,
-            request_path=request_path, request_method='PUT',
+            request_path=request_path, request_method=method,
             headers=headers, file_path=file_path, iterator=iterator)
 
         response = result_dict['response']
@@ -454,7 +816,8 @@ class S3StorageDriver(StorageDriver):
 
     def _headers_to_object(self, object_name, container, headers):
         hash = headers['etag'].replace('"', '')
-        extra = {'content_type': headers['content-type'], 'etag': headers['etag']}
+        extra = {'content_type': headers['content-type'],
+                 'etag': headers['etag']}
         meta_data = {}
 
         if 'last-modified' in headers:

Modified: libcloud/trunk/libcloud/test/__init__.py
URL: http://svn.apache.org/viewvc/libcloud/trunk/libcloud/test/__init__.py?rev=1425779&r1=1425778&r2=1425779&view=diff
==============================================================================
--- libcloud/trunk/libcloud/test/__init__.py (original)
+++ libcloud/trunk/libcloud/test/__init__.py Tue Dec 25 22:46:49 2012
@@ -85,6 +85,12 @@ class MockResponse(object):
     def read(self, *args, **kwargs):
         return self.body.read(*args, **kwargs)
 
+    def next(self):
+        return next(self.body)
+
+    def __next__(self):
+        return self.next()
+
     def getheader(self, name, *args, **kwargs):
         return self.headers.get(name, *args, **kwargs)
 
@@ -96,6 +102,7 @@ class MockResponse(object):
 
 class BaseMockHttpObject(object):
     def _get_method_name(self, type, use_param, qs, path):
+        path = path.split('?')[0]
         meth_name = path.replace('/', '_').replace('.', '_').replace('-', '_')
         if type:
             meth_name = '%s_%s' % (meth_name, self.type)
@@ -247,12 +254,12 @@ class MockRawResponse(BaseMockHttpObject
         return self.next()
 
     def _generate_random_data(self, size):
-        data = []
+        data = ''
         current_size = 0
         while current_size < size:
             value = str(random.randint(0, 9))
             value_size = len(value)
-            data.append(value)
+            data += value
             current_size += value_size
 
         return data
@@ -286,7 +293,6 @@ class MockRawResponse(BaseMockHttpObject
             self._status, self._body, self._headers, self._reason = result
             self._response = self.responseCls(self._status, self._body,
                                               self._headers, self._reason)
-            return self
         return self._response
 
 if __name__ == "__main__":

Added: libcloud/trunk/libcloud/test/storage/fixtures/s3/complete_multipart.xml
URL: http://svn.apache.org/viewvc/libcloud/trunk/libcloud/test/storage/fixtures/s3/complete_multipart.xml?rev=1425779&view=auto
==============================================================================
--- libcloud/trunk/libcloud/test/storage/fixtures/s3/complete_multipart.xml (added)
+++ libcloud/trunk/libcloud/test/storage/fixtures/s3/complete_multipart.xml Tue Dec 25 22:46:49 2012
@@ -0,0 +1,7 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<CompleteMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+  <Location>http://Example-Bucket.s3.amazonaws.com/Example-Object</Location>
+  <Bucket>Example-Bucket</Bucket>
+  <Key>Example-Object</Key>
+  <ETag>"3858f62230ac3c915f300c664312c11f-9"</ETag>
+</CompleteMultipartUploadResult>

Added: libcloud/trunk/libcloud/test/storage/fixtures/s3/initiate_multipart.xml
URL: http://svn.apache.org/viewvc/libcloud/trunk/libcloud/test/storage/fixtures/s3/initiate_multipart.xml?rev=1425779&view=auto
==============================================================================
--- libcloud/trunk/libcloud/test/storage/fixtures/s3/initiate_multipart.xml (added)
+++ libcloud/trunk/libcloud/test/storage/fixtures/s3/initiate_multipart.xml Tue Dec 25 22:46:49 2012
@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<InitiateMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+  <Bucket>example-bucket</Bucket>
+  <Key>example-object</Key>
+  <UploadId>VXBsb2FkIElEIGZvciA2aWWpbmcncyBteS1tb3ZpZS5tMnRzIHVwbG9hZA</UploadId>
+</InitiateMultipartUploadResult>

Added: libcloud/trunk/libcloud/test/storage/fixtures/s3/list_multipart_1.xml
URL: http://svn.apache.org/viewvc/libcloud/trunk/libcloud/test/storage/fixtures/s3/list_multipart_1.xml?rev=1425779&view=auto
==============================================================================
--- libcloud/trunk/libcloud/test/storage/fixtures/s3/list_multipart_1.xml (added)
+++ libcloud/trunk/libcloud/test/storage/fixtures/s3/list_multipart_1.xml Tue Dec 25 22:46:49 2012
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<ListMultipartUploadsResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+  <Bucket>foo_bar_container</Bucket>
+  <KeyMarker></KeyMarker>
+  <UploadIdMarker></UploadIdMarker>
+  <NextKeyMarker>my-movie.m2ts</NextKeyMarker>
+  <NextUploadIdMarker>YW55IGlkZWEgd2h5IGVsdmluZydzIHVwbG9hZCBmYWlsZWQ</NextUploadIdMarker>
+  <MaxUploads>3</MaxUploads>
+  <IsTruncated>true</IsTruncated>
+  <Upload>
+    <Key>my-divisor</Key>
+    <UploadId>XMgbGlrZSBlbHZpbmcncyBub3QgaGF2aW5nIG11Y2ggbHVjaw</UploadId>
+    <Initiator>
+      <ID>arn:aws:iam::111122223333:user/user1-11111a31-17b5-4fb7-9df5-b111111f13de</ID>
+      <DisplayName>user1-11111a31-17b5-4fb7-9df5-b111111f13de</DisplayName>
+    </Initiator>
+    <Owner>
+      <ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID>
+      <DisplayName>OwnerDisplayName</DisplayName>
+    </Owner>
+    <StorageClass>STANDARD</StorageClass>
+    <Initiated>2010-11-10T20:48:33.000Z</Initiated>  
+  </Upload>
+  <Upload>
+    <Key>my-movie.m2ts</Key>
+    <UploadId>VXBsb2FkIElEIGZvciBlbHZpbmcncyBteS1tb3ZpZS5tMnRzIHVwbG9hZA</UploadId>
+    <Initiator>
+      <ID>b1d16700c70b0b05597d7acd6a3f92be</ID>
+      <DisplayName>InitiatorDisplayName</DisplayName>
+    </Initiator>
+    <Owner>
+      <ID>b1d16700c70b0b05597d7acd6a3f92be</ID>
+      <DisplayName>OwnerDisplayName</DisplayName>
+    </Owner>
+    <StorageClass>STANDARD</StorageClass>
+    <Initiated>2010-11-10T20:48:33.000Z</Initiated>
+  </Upload>
+  <Upload>
+    <Key>my-movie.m2ts</Key>
+    <UploadId>YW55IGlkZWEgd2h5IGVsdmluZydzIHVwbG9hZCBmYWlsZWQ</UploadId>
+    <Initiator>
+      <ID>arn:aws:iam::444455556666:user/user1-22222a31-17b5-4fb7-9df5-b222222f13de</ID>
+      <DisplayName>user1-22222a31-17b5-4fb7-9df5-b222222f13de</DisplayName>
+    </Initiator>
+    <Owner>
+      <ID>b1d16700c70b0b05597d7acd6a3f92be</ID>
+      <DisplayName>OwnerDisplayName</DisplayName>
+    </Owner>
+    <StorageClass>STANDARD</StorageClass>
+    <Initiated>2010-11-10T20:49:33.000Z</Initiated>
+  </Upload>
+</ListMultipartUploadsResult>

Added: libcloud/trunk/libcloud/test/storage/fixtures/s3/list_multipart_2.xml
URL: http://svn.apache.org/viewvc/libcloud/trunk/libcloud/test/storage/fixtures/s3/list_multipart_2.xml?rev=1425779&view=auto
==============================================================================
--- libcloud/trunk/libcloud/test/storage/fixtures/s3/list_multipart_2.xml (added)
+++ libcloud/trunk/libcloud/test/storage/fixtures/s3/list_multipart_2.xml Tue Dec 25 22:46:49 2012
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<ListMultipartUploadsResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+  <Bucket>foo_bar_container</Bucket>
+  <KeyMarker></KeyMarker>
+  <UploadIdMarker></UploadIdMarker>
+  <NextKeyMarker></NextKeyMarker>
+  <NextUploadIdMarker></NextUploadIdMarker>
+  <MaxUploads>3</MaxUploads>
+  <IsTruncated>false</IsTruncated>
+  <Upload>
+    <Key>my-divisor</Key>
+    <UploadId>XMgbGlrZSBlbHZpbmcncyBub3QgaGF2aW5nIG11Y2ggbHVjaw</UploadId>
+    <Initiator>
+      <ID>arn:aws:iam::111122223333:user/user1-11111a31-17b5-4fb7-9df5-b111111f13de</ID>
+      <DisplayName>user1-11111a31-17b5-4fb7-9df5-b111111f13de</DisplayName>
+    </Initiator>
+    <Owner>
+      <ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID>
+      <DisplayName>OwnerDisplayName</DisplayName>
+    </Owner>
+    <StorageClass>STANDARD</StorageClass>
+    <Initiated>2010-11-10T20:48:33.000Z</Initiated>  
+  </Upload>
+  <Upload>
+    <Key>my-movie.m2ts</Key>
+    <UploadId>VXBsb2FkIElEIGZvciBlbHZpbmcncyBteS1tb3ZpZS5tMnRzIHVwbG9hZA</UploadId>
+    <Initiator>
+      <ID>b1d16700c70b0b05597d7acd6a3f92be</ID>
+      <DisplayName>InitiatorDisplayName</DisplayName>
+    </Initiator>
+    <Owner>
+      <ID>b1d16700c70b0b05597d7acd6a3f92be</ID>
+      <DisplayName>OwnerDisplayName</DisplayName>
+    </Owner>
+    <StorageClass>STANDARD</StorageClass>
+    <Initiated>2010-11-10T20:48:33.000Z</Initiated>
+  </Upload>
+  <Upload>
+    <Key>my-movie.m2ts</Key>
+    <UploadId>YW55IGlkZWEgd2h5IGVsdmluZydzIHVwbG9hZCBmYWlsZWQ</UploadId>
+    <Initiator>
+      <ID>arn:aws:iam::444455556666:user/user1-22222a31-17b5-4fb7-9df5-b222222f13de</ID>
+      <DisplayName>user1-22222a31-17b5-4fb7-9df5-b222222f13de</DisplayName>
+    </Initiator>
+    <Owner>
+      <ID>b1d16700c70b0b05597d7acd6a3f92be</ID>
+      <DisplayName>OwnerDisplayName</DisplayName>
+    </Owner>
+    <StorageClass>STANDARD</StorageClass>
+    <Initiated>2010-11-10T20:49:33.000Z</Initiated>
+  </Upload>
+</ListMultipartUploadsResult>

Modified: libcloud/trunk/libcloud/test/storage/test_atmos.py
URL: http://svn.apache.org/viewvc/libcloud/trunk/libcloud/test/storage/test_atmos.py?rev=1425779&r1=1425778&r2=1425779&view=diff
==============================================================================
--- libcloud/trunk/libcloud/test/storage/test_atmos.py (original)
+++ libcloud/trunk/libcloud/test/storage/test_atmos.py Tue Dec 25 22:46:49 2012
@@ -739,14 +739,12 @@ class AtmosMockRawResponse(MockRawRespon
 
     def _rest_namespace_foo_bar_container_foo_bar_object(self, method, url,
                                                          body, headers):
-        body = 'test'
-        self._data = self._generate_random_data(1000)
+        body = self._generate_random_data(1000)
         return (httplib.OK, body, {}, httplib.responses[httplib.OK])
 
     def _rest_namespace_foo_20_26_20bar_container_foo_20_26_20bar_object(self, method, url,
                                                                          body, headers):
-        body = 'test'
-        self._data = self._generate_random_data(1000)
+        body = self._generate_random_data(1000)
         return (httplib.OK, body, {}, httplib.responses[httplib.OK])
 
     def _rest_namespace_foo_bar_container_foo_bar_object_NOT_FOUND(self, method,

Modified: libcloud/trunk/libcloud/test/storage/test_cloudfiles.py
URL: http://svn.apache.org/viewvc/libcloud/trunk/libcloud/test/storage/test_cloudfiles.py?rev=1425779&r1=1425778&r2=1425779&view=diff
==============================================================================
--- libcloud/trunk/libcloud/test/storage/test_cloudfiles.py (original)
+++ libcloud/trunk/libcloud/test/storage/test_cloudfiles.py Tue Dec 25 22:46:49 2012
@@ -992,8 +992,7 @@ class CloudFilesMockRawResponse(MockRawR
         self, method, url, body, headers):
 
         # test_download_object_success
-        body = 'test'
-        self._data = self._generate_random_data(1000)
+        body = self._generate_random_data(1000)
         return (httplib.OK,
                 body,
                 self.base_headers,
@@ -1002,8 +1001,7 @@ class CloudFilesMockRawResponse(MockRawR
     def _v1_MossoCloudFS_foo_bar_container_foo_bar_object_INVALID_SIZE(
         self, method, url, body, headers):
         # test_download_object_invalid_file_size
-        body = 'test'
-        self._data = self._generate_random_data(100)
+        body = self._generate_random_data(100)
         return (httplib.OK, body,
                 self.base_headers,
                 httplib.responses[httplib.OK])

Modified: libcloud/trunk/libcloud/test/storage/test_s3.py
URL: http://svn.apache.org/viewvc/libcloud/trunk/libcloud/test/storage/test_s3.py?rev=1425779&r1=1425778&r2=1425779&view=diff
==============================================================================
--- libcloud/trunk/libcloud/test/storage/test_s3.py (original)
+++ libcloud/trunk/libcloud/test/storage/test_s3.py Tue Dec 25 22:46:49 2012
@@ -17,7 +17,9 @@ import os
 import sys
 import unittest
 
+from xml.etree import ElementTree as ET
 from libcloud.utils.py3 import httplib
+from libcloud.utils.py3 import urlparse
 
 from libcloud.common.types import InvalidCredsError
 from libcloud.common.types import LibcloudError
@@ -34,11 +36,12 @@ from libcloud.storage.drivers.s3 import 
 from libcloud.storage.drivers.dummy import DummyIterator
 
 from libcloud.test import StorageMockHttp, MockRawResponse # pylint: disable-msg=E0611
+from libcloud.test import MockHttpTestCase # pylint: disable-msg=E0611
 from libcloud.test.file_fixtures import StorageFileFixtures # pylint: disable-msg=E0611
 from libcloud.test.secrets import STORAGE_S3_PARAMS
 
 
-class S3MockHttp(StorageMockHttp):
+class S3MockHttp(StorageMockHttp, MockHttpTestCase):
 
     fixtures = StorageFileFixtures('s3')
     base_headers = {}
@@ -181,15 +184,122 @@ class S3MockHttp(StorageMockHttp):
                 headers,
                 httplib.responses[httplib.OK])
 
+    def _foo_bar_container_foo_test_stream_data(self, method, url, body,
+                                                headers):
+        # test_upload_object_via_stream
+        body = ''
+        headers = {'etag': '"0cc175b9c0f1b6a831c399e269772661"'}
+        return (httplib.OK,
+                body,
+                headers,
+                httplib.responses[httplib.OK])
+
+    def _foo_bar_container_foo_test_stream_data_MULTIPART(self, method, url,
+                                                          body, headers):
+        headers = {'etag': '"0cc175b9c0f1b6a831c399e269772661"'}
+        TEST_ID = 'VXBsb2FkIElEIGZvciA2aWWpbmcncyBteS1tb3ZpZS5tMnRzIHVwbG9hZA'
+
+        query_string = urlparse.urlsplit(url).query
+        query = urlparse.parse_qs(query_string)
+
+        if not query.get('uploadId', False):
+            self.fail('Request doesnt contain uploadId query parameter')
+
+        upload_id = query['uploadId'][0]
+        if upload_id != TEST_ID:
+            self.fail('first uploadId doesnt match TEST_ID')
+
+        if method == 'PUT':
+            # PUT is used for uploading a part; the part number is mandatory
+            if not query.get('partNumber', False):
+                self.fail('Request is missing partNumber query parameter')
+
+            body = ''
+            return (httplib.OK,
+                    body,
+                    headers,
+                    httplib.responses[httplib.OK])
+
+        elif method == 'DELETE':
+            # DELETE is done for aborting the upload
+            body = ''
+            return (httplib.NO_CONTENT,
+                    body,
+                    headers,
+                    httplib.responses[httplib.NO_CONTENT])
+
+        else:
+            # POST is done for committing the upload. Parse the XML and
+            # check if the commit is proper (TODO: XML Schema based check?)
+            commit = ET.fromstring(body)
+            count = 0
+
+            for part in commit.findall('Part'):
+                count += 1
+                part_no = part.find('PartNumber').text
+                etag = part.find('ETag').text
+
+                self.assertEqual(part_no, str(count))
+                self.assertEqual(etag, headers['etag'])
+
+            self.assertEqual(count, 3)
+
+            body = self.fixtures.load('complete_multipart.xml')
+            return (httplib.OK,
+                    body,
+                    headers,
+                    httplib.responses[httplib.OK])
+
+    def _foo_bar_container_LIST_MULTIPART(self, method, url, body, headers):
+        query_string = urlparse.urlsplit(url).query
+        query = urlparse.parse_qs(query_string)
+
+        if 'key-marker' not in query:
+            body = self.fixtures.load('list_multipart_1.xml')
+        else:
+            body = self.fixtures.load('list_multipart_2.xml')
+
+        return (httplib.OK,
+                body,
+                headers,
+                httplib.responses[httplib.OK])
+
+    def _foo_bar_container_my_divisor_LIST_MULTIPART(self, method, url,
+                                                     body, headers):
+        body = ''
+        return (httplib.NO_CONTENT,
+                body,
+                headers,
+                httplib.responses[httplib.NO_CONTENT])
+
+    def _foo_bar_container_my_movie_m2ts_LIST_MULTIPART(self, method, url,
+                                                        body, headers):
+        body = ''
+        return (httplib.NO_CONTENT,
+                body,
+                headers,
+                httplib.responses[httplib.NO_CONTENT])
+
 
 class S3MockRawResponse(MockRawResponse):
 
     fixtures = StorageFileFixtures('s3')
 
+    def parse_body(self):
+        if len(self.body) == 0 and not self.parse_zero_length_body:
+            return self.body
+
+        try:
+            body = ET.XML(self.body)
+        except:
+            raise MalformedResponseError("Failed to parse XML",
+                                         body=self.body,
+                                         driver=self.connection.driver)
+        return body
+
     def _foo_bar_container_foo_bar_object(self, method, url, body, headers):
         # test_download_object_success
-        body = ''
-        self._data = self._generate_random_data(1000)
+        body = self._generate_random_data(1000)
         return (httplib.OK,
                 body,
                 headers,
@@ -225,6 +335,14 @@ class S3MockRawResponse(MockRawResponse)
                 headers,
                 httplib.responses[httplib.OK])
 
+    def _foo_bar_container_foo_bar_object(self, method, url, body, headers):
+        # test_download_object_success
+        body = self._generate_random_data(1000)
+        return (httplib.OK,
+                body,
+                headers,
+                httplib.responses[httplib.OK])
+
     def _foo_bar_container_foo_bar_object_INVALID_SIZE(self, method, url,
                                                        body, headers):
         # test_upload_object_invalid_file_size
@@ -244,6 +362,23 @@ class S3MockRawResponse(MockRawResponse)
                 headers,
                 httplib.responses[httplib.OK])
 
+    def _foo_bar_container_foo_test_stream_data_MULTIPART(self, method, url,
+                                                          body, headers):
+        headers = {}
+        # POST is done for initiating multipart upload
+        if method == 'POST':
+            body = self.fixtures.load('initiate_multipart.xml')
+            return (httplib.OK,
+                    body,
+                    headers,
+                    httplib.responses[httplib.OK])
+        else:
+            body = ''
+            return (httplib.BAD_REQUEST,
+                    body,
+                    headers,
+                    httplib.responses[httplib.BAD_REQUEST])
+
 
 class S3Tests(unittest.TestCase):
     driver_type = S3StorageDriver
@@ -377,7 +512,8 @@ class S3Tests(unittest.TestCase):
         self.assertEqual(obj.container.name, 'test2')
         self.assertEqual(obj.size, 12345)
         self.assertEqual(obj.hash, 'e31208wqsdoj329jd')
-        self.assertEqual(obj.extra['last_modified'], 'Thu, 13 Sep 2012 07:13:22 GMT')
+        self.assertEqual(obj.extra['last_modified'],
+                         'Thu, 13 Sep 2012 07:13:22 GMT')
         self.assertEqual(obj.extra['content_type'], 'application/zip')
         self.assertEqual(obj.meta_data['rabbits'], 'monkeys')
 
@@ -457,7 +593,7 @@ class S3Tests(unittest.TestCase):
                               driver=self.driver)
         obj = Object(name='foo_bar_object', size=1000, hash=None, extra={},
                      container=container, meta_data=None,
-                     driver=S3StorageDriver)
+                     driver=self.driver_type)
         destination_path = os.path.abspath(__file__) + '.temp'
         result = self.driver.download_object(obj=obj,
                                              destination_path=destination_path,
@@ -471,7 +607,7 @@ class S3Tests(unittest.TestCase):
                               driver=self.driver)
         obj = Object(name='foo_bar_object', size=1000, hash=None, extra={},
                      container=container, meta_data=None,
-                     driver=S3StorageDriver)
+                     driver=self.driver_type)
         destination_path = os.path.abspath(__file__) + '.temp'
         result = self.driver.download_object(obj=obj,
                                              destination_path=destination_path,
@@ -485,7 +621,7 @@ class S3Tests(unittest.TestCase):
                               driver=self.driver)
         obj = Object(name='foo_bar_object', size=1000, hash=None, extra={},
                      container=container, meta_data=None,
-                     driver=S3StorageDriver)
+                     driver=self.driver_type)
         destination_path = os.path.abspath(__file__)
         try:
             self.driver.download_object(obj=obj,
@@ -503,7 +639,7 @@ class S3Tests(unittest.TestCase):
 
         obj = Object(name='foo_bar_object', size=1000, hash=None, extra={},
                      container=container, meta_data=None,
-                     driver=S3StorageDriver)
+                     driver=self.driver_type)
 
         stream = self.driver.download_object_as_stream(obj=obj,
                                                        chunk_size=None)
@@ -536,8 +672,8 @@ class S3Tests(unittest.TestCase):
 
         self.mock_raw_response_klass.type = 'INVALID_HASH1'
 
-        old_func = S3StorageDriver._upload_file
-        S3StorageDriver._upload_file = upload_file
+        old_func = self.driver_type._upload_file
+        self.driver_type._upload_file = upload_file
         file_path = os.path.abspath(__file__)
         container = Container(name='foo_bar_container', extra={},
                               driver=self.driver)
@@ -552,7 +688,7 @@ class S3Tests(unittest.TestCase):
             self.fail(
                 'Invalid hash was returned but an exception was not thrown')
         finally:
-            S3StorageDriver._upload_file = old_func
+            self.driver_type._upload_file = old_func
 
     def test_upload_object_invalid_hash2(self):
         # Invalid hash is detected when comparing hash provided in the response
@@ -563,8 +699,8 @@ class S3Tests(unittest.TestCase):
 
         self.mock_raw_response_klass.type = 'INVALID_HASH2'
 
-        old_func = S3StorageDriver._upload_file
-        S3StorageDriver._upload_file = upload_file
+        old_func = self.driver_type._upload_file
+        self.driver_type._upload_file = upload_file
         file_path = os.path.abspath(__file__)
         container = Container(name='foo_bar_container', extra={},
                               driver=self.driver)
@@ -579,15 +715,15 @@ class S3Tests(unittest.TestCase):
             self.fail(
                 'Invalid hash was returned but an exception was not thrown')
         finally:
-            S3StorageDriver._upload_file = old_func
+            self.driver_type._upload_file = old_func
 
     def test_upload_object_success(self):
         def upload_file(self, response, file_path, chunked=False,
                         calculate_hash=True):
             return True, '0cc175b9c0f1b6a831c399e269772661', 1000
 
-        old_func = S3StorageDriver._upload_file
-        S3StorageDriver._upload_file = upload_file
+        old_func = self.driver_type._upload_file
+        self.driver_type._upload_file = upload_file
         file_path = os.path.abspath(__file__)
         container = Container(name='foo_bar_container', extra={},
                               driver=self.driver)
@@ -601,9 +737,17 @@ class S3Tests(unittest.TestCase):
         self.assertEqual(obj.name, 'foo_test_upload')
         self.assertEqual(obj.size, 1000)
         self.assertTrue('some-value' in obj.meta_data)
-        S3StorageDriver._upload_file = old_func
+        self.driver_type._upload_file = old_func
 
     def test_upload_object_via_stream(self):
+
+        if self.driver.supports_s3_multipart_upload:
+            self.mock_raw_response_klass.type = 'MULTIPART'
+            self.mock_response_klass.type = 'MULTIPART'
+        else:
+            self.mock_raw_response_klass.type = None
+            self.mock_response_klass.type = None
+
         container = Container(name='foo_bar_container', extra={},
                               driver=self.driver)
         object_name = 'foo_test_stream_data'
@@ -617,6 +761,63 @@ class S3Tests(unittest.TestCase):
         self.assertEqual(obj.name, object_name)
         self.assertEqual(obj.size, 3)
 
+    def test_upload_object_via_stream_abort(self):
+        if not self.driver.supports_s3_multipart_upload:
+            return
+
+        self.mock_raw_response_klass.type = 'MULTIPART'
+        self.mock_response_klass.type = 'MULTIPART'
+
+        def _faulty_iterator():
+            for i in range(0, 5):
+                yield str(i)
+            raise RuntimeError('Error in fetching data')
+
+        container = Container(name='foo_bar_container', extra={},
+                              driver=self.driver)
+        object_name = 'foo_test_stream_data'
+        iterator = _faulty_iterator()
+        extra = {'content_type': 'text/plain'}
+
+        try:
+            obj = self.driver.upload_object_via_stream(container=container,
+                                                       object_name=object_name,
+                                                       iterator=iterator,
+                                                       extra=extra)
+        except Exception:
+            pass
+
+        return
+
+    def test_s3_list_multipart_uploads(self):
+        if not self.driver.supports_s3_multipart_upload:
+            return
+
+        self.mock_response_klass.type = 'LIST_MULTIPART'
+        S3StorageDriver.RESPONSES_PER_REQUEST = 2
+
+        container = Container(name='foo_bar_container', extra={},
+                              driver=self.driver)
+
+        for upload in self.driver.ex_iterate_multipart_uploads(container):
+            self.assertNotEqual(upload.key, None)
+            self.assertNotEqual(upload.id, None)
+            self.assertNotEqual(upload.created_at, None)
+            self.assertNotEqual(upload.owner, None)
+            self.assertNotEqual(upload.initiator, None)
+
+    def test_s3_abort_multipart_uploads(self):
+        if not self.driver.supports_s3_multipart_upload:
+            return
+
+        self.mock_response_klass.type = 'LIST_MULTIPART'
+        S3StorageDriver.RESPONSES_PER_REQUEST = 2
+
+        container = Container(name='foo_bar_container', extra={},
+                              driver=self.driver)
+
+        self.driver.ex_cleanup_all_multipart_uploads(container)
+
     def test_delete_object_not_found(self):
         self.mock_response_klass.type = 'NOT_FOUND'
         container = Container(name='foo_bar_container', extra={},

Modified: libcloud/trunk/libcloud/utils/py3.py
URL: http://svn.apache.org/viewvc/libcloud/trunk/libcloud/utils/py3.py?rev=1425779&r1=1425778&r2=1425779&view=diff
==============================================================================
--- libcloud/trunk/libcloud/utils/py3.py (original)
+++ libcloud/trunk/libcloud/utils/py3.py Tue Dec 25 22:46:49 2012
@@ -22,6 +22,7 @@ from __future__ import absolute_import
 
 import sys
 import types
+from xml.etree import ElementTree as ET
 
 PY3 = False
 PY2 = False
@@ -59,6 +60,9 @@ if sys.version_info >= (3, 0):
     next = __builtins__['next']
     def dictvalues(d):
         return list(d.values())
+
+    def tostring(node):
+        return ET.tostring(node, encoding='unicode')
 else:
     PY2 = True
     import httplib
@@ -84,5 +88,7 @@ else:
     def dictvalues(d):
         return d.values()
 
+    tostring = ET.tostring
+
 if sys.version_info >= (2, 5) and sys.version_info <= (2, 6):
     PY25 = True