You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@libcloud.apache.org by rb...@apache.org on 2012/04/12 09:27:27 UTC

svn commit: r1325146 - in /libcloud/trunk: CHANGES libcloud/storage/drivers/cloudfiles.py libcloud/storage/providers.py libcloud/storage/types.py test/storage/test_cloudfiles.py

Author: rbogorodskiy
Date: Thu Apr 12 07:27:27 2012
New Revision: 1325146

URL: http://svn.apache.org/viewvc?rev=1325146&view=rev
Log:
- Large object upload support for CloudFiles driver
- Add CLOUDFILES_SWIFT driver to connect to OpenStack Swift

Initial implementation was provided by Dmitry Russkikh; I have
refactored it and created unit tests.

Modified:
    libcloud/trunk/CHANGES
    libcloud/trunk/libcloud/storage/drivers/cloudfiles.py
    libcloud/trunk/libcloud/storage/providers.py
    libcloud/trunk/libcloud/storage/types.py
    libcloud/trunk/test/storage/test_cloudfiles.py

Modified: libcloud/trunk/CHANGES
URL: http://svn.apache.org/viewvc/libcloud/trunk/CHANGES?rev=1325146&r1=1325145&r2=1325146&view=diff
==============================================================================
--- libcloud/trunk/CHANGES (original)
+++ libcloud/trunk/CHANGES Thu Apr 12 07:27:27 2012
@@ -24,6 +24,12 @@ Changes with Apache Libcloud in developm
     - Add Joyent driver.
       [Tomaz Muraus]
 
+  *) Storage
+
+    - Large object upload support for CloudFiles driver
+    - Add CLOUDFILES_SWIFT driver to connect to OpenStack Swift
+      [Dmitry Russkikh, Roman Bogorodskiy]
+
 Changes with Apache Libcloud 0.9.1:
 
   *) General:

Modified: libcloud/trunk/libcloud/storage/drivers/cloudfiles.py
URL: http://svn.apache.org/viewvc/libcloud/trunk/libcloud/storage/drivers/cloudfiles.py?rev=1325146&r1=1325145&r2=1325146&view=diff
==============================================================================
--- libcloud/trunk/libcloud/storage/drivers/cloudfiles.py (original)
+++ libcloud/trunk/libcloud/storage/drivers/cloudfiles.py Thu Apr 12 07:27:27 2012
@@ -13,6 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import os
+import math
+
 from libcloud.utils.py3 import httplib
 
 try:
@@ -26,7 +29,6 @@ from libcloud.utils.py3 import urlquote
 if PY3:
     from io import FileIO as file
 
-
 from libcloud.utils.files import read_in_chunks
 from libcloud.common.types import MalformedResponseError, LibcloudError
 from libcloud.common.base import Response, RawResponse
@@ -40,7 +42,8 @@ from libcloud.storage.types import Objec
 from libcloud.storage.types import ObjectHashMismatchError
 from libcloud.storage.types import InvalidContainerNameError
 from libcloud.common.types import LazyList
-from libcloud.common.openstack import OpenStackBaseConnection, OpenStackDriverMixin
+from libcloud.common.openstack import OpenStackBaseConnection
+from libcloud.common.openstack import OpenStackDriverMixin
 
 from libcloud.common.rackspace import (
     AUTH_URL_US, AUTH_URL_UK)
@@ -107,15 +110,16 @@ class CloudFilesConnection(OpenStackBase
         self.accept_format = 'application/json'
 
     def get_endpoint(self, cdn_request=False):
-
-        # First, we parse out both files and cdn endpoints for each auth version
+        # First, we parse out both files and cdn endpoints
+        # for each auth version
         if '2.0' in self._auth_version:
             ep = self.service_catalog.get_endpoint(service_type='object-store',
                                                    name='cloudFiles',
                                                    region='ORD')
-            cdn_ep = self.service_catalog.get_endpoint(service_type='object-store',
-                                                       name='cloudFilesCDN',
-                                                       region='ORD')
+            cdn_ep = self.service_catalog.get_endpoint(
+                    service_type='object-store',
+                    name='cloudFilesCDN',
+                    region='ORD')
         elif ('1.1' in self._auth_version) or ('1.0' in self._auth_version):
             ep = self.service_catalog.get_endpoint(name='cloudFiles',
                                                    region='ORD')
@@ -139,9 +143,9 @@ class CloudFilesConnection(OpenStackBase
             params = {}
 
         # FIXME: Massive hack.
-        # This driver dynamically changes the url in its connection, based on arguments
-        # passed to request(). As such, we have to manually check and reset connection
-        # params each request
+        # This driver dynamically changes the url in its connection,
+        # based on arguments passed to request(). As such, we have to
+        # manually check and reset connection params each request
         self._populate_hosts_and_request_paths()
         if not self._ex_force_base_url:
             self._reset_connection_params(self.get_endpoint(cdn_request))
@@ -150,7 +154,7 @@ class CloudFilesConnection(OpenStackBase
 
         params['format'] = 'json'
 
-        if method in ['POST', 'PUT']:
+        if method in ['POST', 'PUT'] and 'Content-Type' not in headers:
             headers.update({'Content-Type': 'application/json; charset=UTF-8'})
 
         return super(CloudFilesConnection, self).request(
@@ -160,7 +164,8 @@ class CloudFilesConnection(OpenStackBase
             raw=raw)
 
     def _reset_connection_params(self, endpoint_url):
-        (self.host, self.port, self.secure, self.request_path) = self._tuple_from_url(endpoint_url)
+        (self.host, self.port, self.secure, self.request_path) = \
+                self._tuple_from_url(endpoint_url)
 
 
 class CloudFilesUSConnection(CloudFilesConnection):
@@ -179,6 +184,30 @@ class CloudFilesUKConnection(CloudFilesC
     auth_url = AUTH_URL_UK
 
 
+class CloudFilesSwiftConnection(CloudFilesConnection):
+    """
+    Connection class for the Cloudfiles Swift endpoint.
+    """
+    def __init__(self, *args, **kwargs):
+        self.region_name = kwargs.pop('ex_region_name', None)
+        super(CloudFilesSwiftConnection, self).__init__(*args, **kwargs)
+
+    def get_endpoint(self, *args, **kwargs):
+        if '2.0' in self._auth_version:
+            endpoint = self.service_catalog.get_endpoint(
+                    service_type='object-store',
+                    name='swift',
+                    region=self.region_name)
+        elif ('1.1' in self._auth_version) or ('1.0' in self._auth_version):
+            endpoint = self.service_catalog.get_endpoint(name='swift',
+                                                   region=self.region_name)
+
+        if 'publicURL' in endpoint:
+            return endpoint['publicURL']
+        else:
+            raise LibcloudError('Could not find specified endpoint')
+
+
 class CloudFilesStorageDriver(StorageDriver, OpenStackDriverMixin):
     """
     Base CloudFiles driver.
@@ -277,7 +306,8 @@ class CloudFilesStorageDriver(StorageDri
             # Accepted mean that container is not yet created but it will be
             # eventually
             extra = { 'object_count': 0 }
-            container = Container(name=container_name, extra=extra, driver=self)
+            container = Container(name=container_name,
+                    extra=extra, driver=self)
 
             return container
         elif response.status == httplib.ACCEPTED:
@@ -328,7 +358,7 @@ class CloudFilesStorageDriver(StorageDri
 
         return self._get_object(obj=obj, callback=read_in_chunks,
                                 response=response,
-                                callback_kwargs={ 'iterator': response.response,
+                                callback_kwargs={'iterator': response.response,
                                                  'chunk_size': chunk_size},
                                 success_status_code=httplib.OK)
 
@@ -393,6 +423,81 @@ class CloudFilesStorageDriver(StorageDri
 
         raise LibcloudError('Unexpected status code: %s' % (response.status))
 
+    def ex_multipart_upload_object(self, file_path, container, object_name,
+                                   chunk_size=33554432, extra=None,
+                                   verify_hash=True):
+        object_size = os.path.getsize(file_path)
+        if object_size < chunk_size:
+            return self.upload_object(file_path, container, object_name,
+                    extra=extra, verify_hash=verify_hash)
+
+        iter_chunk_reader = FileChunkReader(file_path, chunk_size)
+
+        for index, iterator in enumerate(iter_chunk_reader):
+            self._upload_object_part(container=container,
+                                     object_name=object_name,
+                                     part_number=index,
+                                     iterator=iterator,
+                                     verify_hash=verify_hash)
+
+        return self._upload_object_manifest(container=container,
+                                            object_name=object_name,
+                                            extra=extra,
+                                            verify_hash=verify_hash)
+
+    def _upload_object_part(self, container, object_name, part_number,
+                            iterator, verify_hash=True):
+
+        upload_func = self._stream_data
+        upload_func_kwargs = {'iterator': iterator}
+        part_name = object_name + '/%08d' % part_number
+        extra = {'content_type': 'application/octet-stream'}
+
+        self._put_object(container=container,
+                         object_name=part_name,
+                         upload_func=upload_func,
+                         upload_func_kwargs=upload_func_kwargs,
+                         extra=extra, iterator=iterator,
+                         verify_hash=verify_hash)
+
+    def _upload_object_manifest(self, container, object_name, extra=None,
+                                verify_hash=True):
+        extra = extra or {}
+        meta_data = extra.get('meta_data')
+
+        container_name_cleaned = self._clean_container_name(container.name)
+        object_name_cleaned = self._clean_object_name(object_name)
+        request_path = '/%s/%s' % (container_name_cleaned, object_name_cleaned)
+
+        headers = {'X-Auth-Token': self.connection.auth_token,
+                   'X-Object-Manifest': '%s/%s/' % \
+                       (container_name_cleaned, object_name_cleaned)}
+
+        data = ''
+        response = self.connection.request(request_path,
+                                           method='PUT', data=data,
+                                           headers=headers, raw=True)
+
+        object_hash = None
+
+        if verify_hash:
+            hash_function = self._get_hash_function()
+            hash_function.update(data)
+            data_hash = hash_function.hexdigest()
+            object_hash = response.headers.get('etag')
+
+            if object_hash != data_hash:
+                raise ObjectHashMismatchError(
+                    value=('MD5 hash checksum does not match (expected=%s, ' +
+                           'actual=%s)') % \
+                           (data_hash, object_hash),
+                    object_name=object_name, driver=self)
+
+        obj = Object(name=object_name, size=0, hash=object_hash, extra=None,
+                     meta_data=meta_data, container=container, driver=self)
+
+        return obj
+
     def _get_more(self, last_key, value_dict):
         container = value_dict['container']
         params = {}
@@ -407,7 +512,8 @@ class CloudFilesStorageDriver(StorageDri
             # Empty or inexistent container
             return [], None, True
         elif response.status == httplib.OK:
-            objects = self._to_object_list(json.loads(response.body), container)
+            objects = self._to_object_list(json.loads(response.body),
+                    container)
 
             # TODO: Is this really needed?
             if len(objects) == 0:
@@ -434,13 +540,13 @@ class CloudFilesStorageDriver(StorageDri
 
         request_path = '/%s/%s' % (container_name_cleaned, object_name_cleaned)
         result_dict = self._upload_object(object_name=object_name,
-                                          content_type=content_type,
-                                          upload_func=upload_func,
-                                          upload_func_kwargs=upload_func_kwargs,
-                                          request_path=request_path,
-                                          request_method='PUT',
-                                          headers=headers, file_path=file_path,
-                                          iterator=iterator)
+                                         content_type=content_type,
+                                         upload_func=upload_func,
+                                         upload_func_kwargs=upload_func_kwargs,
+                                         request_path=request_path,
+                                         request_method='PUT',
+                                         headers=headers, file_path=file_path,
+                                         iterator=iterator)
 
         response = result_dict['response'].response
         bytes_transferred = result_dict['bytes_transferred']
@@ -487,7 +593,6 @@ class CloudFilesStorageDriver(StorageDri
                                                    ' longer than 256 bytes',
                                             container_name=name, driver=self)
 
-
         return name
 
     def _clean_object_name(self, name):
@@ -542,7 +647,7 @@ class CloudFilesStorageDriver(StorageDri
                 key = key.replace('x-object-meta-', '')
                 meta_data[key] = value
 
-        extra = { 'content_type': content_type, 'last_modified': last_modified }
+        extra = {'content_type': content_type, 'last_modified': last_modified}
 
         obj = Object(name=name, size=size, hash=etag, extra=extra,
                      meta_data=meta_data, container=container, driver=self)
@@ -551,6 +656,7 @@ class CloudFilesStorageDriver(StorageDri
     def _ex_connection_class_kwargs(self):
         return self.openstack_connection_kwargs()
 
+
 class CloudFilesUSStorageDriver(CloudFilesStorageDriver):
     """
     Cloudfiles storage driver for the US endpoint.
@@ -560,6 +666,26 @@ class CloudFilesUSStorageDriver(CloudFil
     name = 'CloudFiles (US)'
     connectionCls = CloudFilesUSConnection
 
+
+class CloudFilesSwiftStorageDriver(CloudFilesStorageDriver):
+    """
+    Cloudfiles storage driver for the OpenStack Swift.
+    """
+    type = Provider.CLOUDFILES_SWIFT
+    name = 'CloudFiles (SWIFT)'
+    connectionCls = CloudFilesSwiftConnection
+
+    def __init__(self, *args, **kwargs):
+        self._ex_region_name = kwargs.get('ex_region_name', 'RegionOne')
+        super(CloudFilesSwiftStorageDriver, self).__init__(*args, **kwargs)
+
+    def openstack_connection_kwargs(self):
+        rv = super(CloudFilesSwiftStorageDriver, self). \
+                                                  openstack_connection_kwargs()
+        rv['ex_region_name'] = self._ex_region_name
+        return rv
+
+
 class CloudFilesUKStorageDriver(CloudFilesStorageDriver):
     """
     Cloudfiles storage driver for the UK endpoint.
@@ -568,3 +694,62 @@ class CloudFilesUKStorageDriver(CloudFil
     type = Provider.CLOUDFILES_UK
     name = 'CloudFiles (UK)'
     connectionCls = CloudFilesUKConnection
+
+
+class FileChunkReader(object):
+    def __init__(self, file_path, chunk_size):
+        self.file_path = file_path
+        self.total = os.path.getsize(file_path)
+        self.chunk_size = chunk_size
+        self.bytes_read = 0
+        self.stop_iteration = False
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        if not self.stop_iteration:
+            start_block = self.bytes_read
+            end_block = start_block + self.chunk_size
+            if end_block > self.total:
+                end_block = self.total
+                self.stop_iteration = True
+            self.bytes_read += end_block - start_block
+            return ChunkStreamReader(file_path=self.file_path,
+                                     start_block=start_block,
+                                     end_block=end_block,
+                                     chunk_size=8192)
+
+        else:
+            raise StopIteration
+
+
+class ChunkStreamReader(object):
+    def __init__(self, file_path, start_block, end_block, chunk_size):
+        self.fd = open(file_path, 'rb')
+        self.fd.seek(start_block)
+        self.start_block = start_block
+        self.end_block = end_block
+        self.chunk_size = chunk_size
+        self.bytes_read = 0
+        self.stop_iteration = False
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        if not self.stop_iteration:
+            block_size = self.chunk_size
+            if self.bytes_read + block_size > \
+                self.end_block - self.start_block:
+                    block_size = self.end_block - self.start_block - \
+                                 self.bytes_read
+                    self.stop_iteration = True
+
+            block = self.fd.read(block_size)
+            self.bytes_read += block_size
+            return block
+
+        else:
+            self.fd.close()
+            raise StopIteration

Modified: libcloud/trunk/libcloud/storage/providers.py
URL: http://svn.apache.org/viewvc/libcloud/trunk/libcloud/storage/providers.py?rev=1325146&r1=1325145&r2=1325146&view=diff
==============================================================================
--- libcloud/trunk/libcloud/storage/providers.py (original)
+++ libcloud/trunk/libcloud/storage/providers.py Thu Apr 12 07:27:27 2012
@@ -38,7 +38,10 @@ DRIVERS = {
     Provider.NINEFOLD:
         ('libcloud.storage.drivers.ninefold', 'NinefoldStorageDriver'),
     Provider.GOOGLE_STORAGE:
-        ('libcloud.storage.drivers.google_storage', 'GoogleStorageDriver')
+        ('libcloud.storage.drivers.google_storage', 'GoogleStorageDriver'),
+    Provider.CLOUDFILES_SWIFT:
+        ('libcloud.storage.drivers.cloudfiles',
+            'CloudFilesSwiftStorageDriver')
 }
 
 def get_driver(provider):

Modified: libcloud/trunk/libcloud/storage/types.py
URL: http://svn.apache.org/viewvc/libcloud/trunk/libcloud/storage/types.py?rev=1325146&r1=1325145&r2=1325146&view=diff
==============================================================================
--- libcloud/trunk/libcloud/storage/types.py (original)
+++ libcloud/trunk/libcloud/storage/types.py Thu Apr 12 07:27:27 2012
@@ -52,6 +52,7 @@ class Provider(object):
     NINEFOLD = 8
     GOOGLE_STORAGE = 9
     S3_US_WEST_OREGON = 10
+    CLOUDFILES_SWIFT = 11
 
 class ContainerError(LibcloudError):
     error_type = 'ContainerError'

Modified: libcloud/trunk/test/storage/test_cloudfiles.py
URL: http://svn.apache.org/viewvc/libcloud/trunk/test/storage/test_cloudfiles.py?rev=1325146&r1=1325145&r2=1325146&view=diff
==============================================================================
--- libcloud/trunk/test/storage/test_cloudfiles.py (original)
+++ libcloud/trunk/test/storage/test_cloudfiles.py Thu Apr 12 07:27:27 2012
@@ -14,10 +14,13 @@
 # limitations under the License.
 import os
 import os.path                          # pylint: disable-msg=W0404
+import math
 import sys
 import copy
 import unittest
 
+import mock
+
 import libcloud.utils.files
 
 from libcloud.utils.py3 import PY3
@@ -451,6 +454,115 @@ class CloudFilesTests(unittest.TestCase)
         self.assertTrue('container_count' in meta_data)
         self.assertTrue('bytes_used' in meta_data)
 
+    @mock.patch('os.path.getsize')
+    def test_ex_multipart_upload_object_for_small_files(self, getsize_mock):
+        getsize_mock.return_value = 0
+
+        old_func = CloudFilesStorageDriver.upload_object
+        mocked_upload_object = mock.Mock(return_value="test")
+        CloudFilesStorageDriver.upload_object = mocked_upload_object
+
+        file_path = os.path.abspath(__file__)
+        container = Container(name='foo_bar_container', extra={}, driver=self)
+        object_name = 'foo_test_upload'
+        obj = self.driver.ex_multipart_upload_object(file_path=file_path, container=container,
+                                       object_name=object_name)
+        CloudFilesStorageDriver.upload_object = old_func
+
+        self.assertTrue(mocked_upload_object.called)
+        self.assertEqual(obj, "test")
+
+    def test_ex_multipart_upload_object_success(self):
+        _upload_object_part = CloudFilesStorageDriver._upload_object_part
+        _upload_object_manifest = CloudFilesStorageDriver._upload_object_manifest
+
+        mocked__upload_object_part = mock.Mock(return_value="test_part")
+        mocked__upload_object_manifest = mock.Mock(return_value="test_manifest")
+
+        CloudFilesStorageDriver._upload_object_part = mocked__upload_object_part
+        CloudFilesStorageDriver._upload_object_manifest = mocked__upload_object_manifest
+
+        parts = 5
+        file_path = os.path.abspath(__file__)
+        chunk_size = int(math.ceil(float(os.path.getsize(file_path)) / parts))
+        container = Container(name='foo_bar_container', extra={}, driver=self)
+        object_name = 'foo_test_upload'
+        self.driver.ex_multipart_upload_object(file_path=file_path, container=container,
+                                       object_name=object_name, chunk_size=chunk_size)
+
+        CloudFilesStorageDriver._upload_object_part = _upload_object_part
+        CloudFilesStorageDriver._upload_object_manifest = _upload_object_manifest
+
+        self.assertEqual(mocked__upload_object_part.call_count, parts)
+        self.assertTrue(mocked__upload_object_manifest.call_count, 1)
+
+    def test__upload_object_part(self):
+        _put_object = CloudFilesStorageDriver._put_object
+        mocked__put_object = mock.Mock(return_value="test")
+        CloudFilesStorageDriver._put_object = mocked__put_object
+
+        part_number = 7
+        object_name = "test_object"
+        expected_name = object_name + '/%08d' % part_number
+        container = Container(name='foo_bar_container', extra={}, driver=self)
+
+        self.driver._upload_object_part(container, object_name,
+                part_number, None)
+
+        CloudFilesStorageDriver._put_object = _put_object
+
+        func_kwargs = tuple(mocked__put_object.call_args)[1]
+        self.assertEquals(func_kwargs['object_name'], expected_name)
+        self.assertEquals(func_kwargs['container'], container)
+
+    def test__upload_object_manifest(self):
+        hash_function = self.driver._get_hash_function()
+        hash_function.update('')
+        data_hash = hash_function.hexdigest()
+
+        fake_response = type('CloudFilesResponse', (), {'headers': 
+                {'etag': data_hash}
+            })
+
+        _request = self.driver.connection.request
+        mocked_request = mock.Mock(return_value=fake_response)
+        self.driver.connection.request = mocked_request
+
+        container = Container(name='foo_bar_container', extra={}, driver=self)
+        object_name = "test_object"
+
+        self.driver._upload_object_manifest(container, object_name)
+
+        func_args, func_kwargs = tuple(mocked_request.call_args)
+
+        self.driver.connection.request = _request
+
+        self.assertEquals(func_args[0], "/" + container.name + "/" + object_name)
+        self.assertEquals(func_kwargs["headers"]["X-Object-Manifest"],
+                container.name + "/" + object_name + "/")
+        self.assertEquals(func_kwargs["method"], "PUT")
+
+    def test__upload_object_manifest_wrong_hash(self):
+        fake_response = type('CloudFilesResponse', (), {'headers':
+            {'etag': '0000000'}})
+
+        _request = self.driver.connection.request
+        mocked_request = mock.Mock(return_value=fake_response)
+        self.driver.connection.request = mocked_request
+
+        container = Container(name='foo_bar_container', extra={}, driver=self)
+        object_name = "test_object"
+
+
+        try:
+            self.driver._upload_object_manifest(container, object_name)
+        except ObjectHashMismatchError:
+            pass
+        else:
+            self.fail('Exception was not thrown')
+        finally:
+            self.driver.connection.request = _request
+
     def _remove_test_file(self):
         file_path = os.path.abspath(__file__) + '.temp'
 
@@ -459,6 +571,7 @@ class CloudFilesTests(unittest.TestCase)
         except OSError:
             pass
 
+
 class CloudFilesMockHttp(StorageMockHttp):
 
     fixtures = StorageFileFixtures('cloudfiles')