You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@libcloud.apache.org by rb...@apache.org on 2012/04/12 09:27:27 UTC
svn commit: r1325146 - in /libcloud/trunk: CHANGES
libcloud/storage/drivers/cloudfiles.py libcloud/storage/providers.py
libcloud/storage/types.py test/storage/test_cloudfiles.py
Author: rbogorodskiy
Date: Thu Apr 12 07:27:27 2012
New Revision: 1325146
URL: http://svn.apache.org/viewvc?rev=1325146&view=rev
Log:
- Large object upload support for CloudFiles driver
- Add CLOUDFILES_SWIFT driver to connect to OpenStack Swift
The initial implementation was provided by Dmitry Russkikh; I have
refactored it and created unit tests.
Modified:
libcloud/trunk/CHANGES
libcloud/trunk/libcloud/storage/drivers/cloudfiles.py
libcloud/trunk/libcloud/storage/providers.py
libcloud/trunk/libcloud/storage/types.py
libcloud/trunk/test/storage/test_cloudfiles.py
Modified: libcloud/trunk/CHANGES
URL: http://svn.apache.org/viewvc/libcloud/trunk/CHANGES?rev=1325146&r1=1325145&r2=1325146&view=diff
==============================================================================
--- libcloud/trunk/CHANGES (original)
+++ libcloud/trunk/CHANGES Thu Apr 12 07:27:27 2012
@@ -24,6 +24,12 @@ Changes with Apache Libcloud in developm
- Add Joyent driver.
[Tomaz Muraus]
+ *) Storage
+
+ - Large object upload support for CloudFiles driver
+ - Add CLOUDFILES_SWIFT driver to connect to OpenStack Swift
+ [Dmitry Russkikh, Roman Bogorodskiy]
+
Changes with Apache Libcloud 0.9.1:
*) General:
Modified: libcloud/trunk/libcloud/storage/drivers/cloudfiles.py
URL: http://svn.apache.org/viewvc/libcloud/trunk/libcloud/storage/drivers/cloudfiles.py?rev=1325146&r1=1325145&r2=1325146&view=diff
==============================================================================
--- libcloud/trunk/libcloud/storage/drivers/cloudfiles.py (original)
+++ libcloud/trunk/libcloud/storage/drivers/cloudfiles.py Thu Apr 12 07:27:27 2012
@@ -13,6 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import os
+import math
+
from libcloud.utils.py3 import httplib
try:
@@ -26,7 +29,6 @@ from libcloud.utils.py3 import urlquote
if PY3:
from io import FileIO as file
-
from libcloud.utils.files import read_in_chunks
from libcloud.common.types import MalformedResponseError, LibcloudError
from libcloud.common.base import Response, RawResponse
@@ -40,7 +42,8 @@ from libcloud.storage.types import Objec
from libcloud.storage.types import ObjectHashMismatchError
from libcloud.storage.types import InvalidContainerNameError
from libcloud.common.types import LazyList
-from libcloud.common.openstack import OpenStackBaseConnection, OpenStackDriverMixin
+from libcloud.common.openstack import OpenStackBaseConnection
+from libcloud.common.openstack import OpenStackDriverMixin
from libcloud.common.rackspace import (
AUTH_URL_US, AUTH_URL_UK)
@@ -107,15 +110,16 @@ class CloudFilesConnection(OpenStackBase
self.accept_format = 'application/json'
def get_endpoint(self, cdn_request=False):
-
- # First, we parse out both files and cdn endpoints for each auth version
+ # First, we parse out both files and cdn endpoints
+ # for each auth version
if '2.0' in self._auth_version:
ep = self.service_catalog.get_endpoint(service_type='object-store',
name='cloudFiles',
region='ORD')
- cdn_ep = self.service_catalog.get_endpoint(service_type='object-store',
- name='cloudFilesCDN',
- region='ORD')
+ cdn_ep = self.service_catalog.get_endpoint(
+ service_type='object-store',
+ name='cloudFilesCDN',
+ region='ORD')
elif ('1.1' in self._auth_version) or ('1.0' in self._auth_version):
ep = self.service_catalog.get_endpoint(name='cloudFiles',
region='ORD')
@@ -139,9 +143,9 @@ class CloudFilesConnection(OpenStackBase
params = {}
# FIXME: Massive hack.
- # This driver dynamically changes the url in its connection, based on arguments
- # passed to request(). As such, we have to manually check and reset connection
- # params each request
+ # This driver dynamically changes the url in its connection,
+ # based on arguments passed to request(). As such, we have to
+ # manually check and reset connection params each request
self._populate_hosts_and_request_paths()
if not self._ex_force_base_url:
self._reset_connection_params(self.get_endpoint(cdn_request))
@@ -150,7 +154,7 @@ class CloudFilesConnection(OpenStackBase
params['format'] = 'json'
- if method in ['POST', 'PUT']:
+ if method in ['POST', 'PUT'] and 'Content-Type' not in headers:
headers.update({'Content-Type': 'application/json; charset=UTF-8'})
return super(CloudFilesConnection, self).request(
@@ -160,7 +164,8 @@ class CloudFilesConnection(OpenStackBase
raw=raw)
def _reset_connection_params(self, endpoint_url):
- (self.host, self.port, self.secure, self.request_path) = self._tuple_from_url(endpoint_url)
+ (self.host, self.port, self.secure, self.request_path) = \
+ self._tuple_from_url(endpoint_url)
class CloudFilesUSConnection(CloudFilesConnection):
@@ -179,6 +184,30 @@ class CloudFilesUKConnection(CloudFilesC
auth_url = AUTH_URL_UK
class CloudFilesSwiftConnection(CloudFilesConnection):
    """
    Connection class for the CloudFiles Swift endpoint.

    Looks up the 'swift' entry in the service catalog instead of the
    Rackspace-specific 'cloudFiles' one, and honours an explicit region
    name supplied by the driver.
    """
    def __init__(self, *args, **kwargs):
        # Region is injected by the Swift storage driver through the
        # connection kwargs; remove it before delegating upstream.
        self.region_name = kwargs.pop('ex_region_name', None)
        super(CloudFilesSwiftConnection, self).__init__(*args, **kwargs)

    def get_endpoint(self, *args, **kwargs):
        """
        Return the public URL of the Swift object-store endpoint.

        Raises LibcloudError when the auth version is unsupported or no
        matching endpoint is present in the service catalog.
        """
        endpoint = None

        if '2.0' in self._auth_version:
            endpoint = self.service_catalog.get_endpoint(
                service_type='object-store',
                name='swift',
                region=self.region_name)
        elif ('1.1' in self._auth_version) or ('1.0' in self._auth_version):
            endpoint = self.service_catalog.get_endpoint(
                name='swift', region=self.region_name)

        # The original left ``endpoint`` unbound for any other auth
        # version, so the membership test below raised NameError instead
        # of the intended LibcloudError.
        if endpoint is not None and 'publicURL' in endpoint:
            return endpoint['publicURL']

        raise LibcloudError('Could not find specified endpoint')
+
+
class CloudFilesStorageDriver(StorageDriver, OpenStackDriverMixin):
"""
Base CloudFiles driver.
@@ -277,7 +306,8 @@ class CloudFilesStorageDriver(StorageDri
# Accepted mean that container is not yet created but it will be
# eventually
extra = { 'object_count': 0 }
- container = Container(name=container_name, extra=extra, driver=self)
+ container = Container(name=container_name,
+ extra=extra, driver=self)
return container
elif response.status == httplib.ACCEPTED:
@@ -328,7 +358,7 @@ class CloudFilesStorageDriver(StorageDri
return self._get_object(obj=obj, callback=read_in_chunks,
response=response,
- callback_kwargs={ 'iterator': response.response,
+ callback_kwargs={'iterator': response.response,
'chunk_size': chunk_size},
success_status_code=httplib.OK)
@@ -393,6 +423,81 @@ class CloudFilesStorageDriver(StorageDri
raise LibcloudError('Unexpected status code: %s' % (response.status))
+ def ex_multipart_upload_object(self, file_path, container, object_name,
+ chunk_size=33554432, extra=None,
+ verify_hash=True):
+ object_size = os.path.getsize(file_path)
+ if object_size < chunk_size:
+ return self.upload_object(file_path, container, object_name,
+ extra=extra, verify_hash=verify_hash)
+
+ iter_chunk_reader = FileChunkReader(file_path, chunk_size)
+
+ for index, iterator in enumerate(iter_chunk_reader):
+ self._upload_object_part(container=container,
+ object_name=object_name,
+ part_number=index,
+ iterator=iterator,
+ verify_hash=verify_hash)
+
+ return self._upload_object_manifest(container=container,
+ object_name=object_name,
+ extra=extra,
+ verify_hash=verify_hash)
+
+ def _upload_object_part(self, container, object_name, part_number,
+ iterator, verify_hash=True):
+
+ upload_func = self._stream_data
+ upload_func_kwargs = {'iterator': iterator}
+ part_name = object_name + '/%08d' % part_number
+ extra = {'content_type': 'application/octet-stream'}
+
+ self._put_object(container=container,
+ object_name=part_name,
+ upload_func=upload_func,
+ upload_func_kwargs=upload_func_kwargs,
+ extra=extra, iterator=iterator,
+ verify_hash=verify_hash)
+
+ def _upload_object_manifest(self, container, object_name, extra=None,
+ verify_hash=True):
+ extra = extra or {}
+ meta_data = extra.get('meta_data')
+
+ container_name_cleaned = self._clean_container_name(container.name)
+ object_name_cleaned = self._clean_object_name(object_name)
+ request_path = '/%s/%s' % (container_name_cleaned, object_name_cleaned)
+
+ headers = {'X-Auth-Token': self.connection.auth_token,
+ 'X-Object-Manifest': '%s/%s/' % \
+ (container_name_cleaned, object_name_cleaned)}
+
+ data = ''
+ response = self.connection.request(request_path,
+ method='PUT', data=data,
+ headers=headers, raw=True)
+
+ object_hash = None
+
+ if verify_hash:
+ hash_function = self._get_hash_function()
+ hash_function.update(data)
+ data_hash = hash_function.hexdigest()
+ object_hash = response.headers.get('etag')
+
+ if object_hash != data_hash:
+ raise ObjectHashMismatchError(
+ value=('MD5 hash checksum does not match (expected=%s, ' +
+ 'actual=%s)') % \
+ (data_hash, object_hash),
+ object_name=object_name, driver=self)
+
+ obj = Object(name=object_name, size=0, hash=object_hash, extra=None,
+ meta_data=meta_data, container=container, driver=self)
+
+ return obj
+
def _get_more(self, last_key, value_dict):
container = value_dict['container']
params = {}
@@ -407,7 +512,8 @@ class CloudFilesStorageDriver(StorageDri
# Empty or inexistent container
return [], None, True
elif response.status == httplib.OK:
- objects = self._to_object_list(json.loads(response.body), container)
+ objects = self._to_object_list(json.loads(response.body),
+ container)
# TODO: Is this really needed?
if len(objects) == 0:
@@ -434,13 +540,13 @@ class CloudFilesStorageDriver(StorageDri
request_path = '/%s/%s' % (container_name_cleaned, object_name_cleaned)
result_dict = self._upload_object(object_name=object_name,
- content_type=content_type,
- upload_func=upload_func,
- upload_func_kwargs=upload_func_kwargs,
- request_path=request_path,
- request_method='PUT',
- headers=headers, file_path=file_path,
- iterator=iterator)
+ content_type=content_type,
+ upload_func=upload_func,
+ upload_func_kwargs=upload_func_kwargs,
+ request_path=request_path,
+ request_method='PUT',
+ headers=headers, file_path=file_path,
+ iterator=iterator)
response = result_dict['response'].response
bytes_transferred = result_dict['bytes_transferred']
@@ -487,7 +593,6 @@ class CloudFilesStorageDriver(StorageDri
' longer than 256 bytes',
container_name=name, driver=self)
-
return name
def _clean_object_name(self, name):
@@ -542,7 +647,7 @@ class CloudFilesStorageDriver(StorageDri
key = key.replace('x-object-meta-', '')
meta_data[key] = value
- extra = { 'content_type': content_type, 'last_modified': last_modified }
+ extra = {'content_type': content_type, 'last_modified': last_modified}
obj = Object(name=name, size=size, hash=etag, extra=extra,
meta_data=meta_data, container=container, driver=self)
@@ -551,6 +656,7 @@ class CloudFilesStorageDriver(StorageDri
def _ex_connection_class_kwargs(self):
return self.openstack_connection_kwargs()
+
class CloudFilesUSStorageDriver(CloudFilesStorageDriver):
"""
Cloudfiles storage driver for the US endpoint.
@@ -560,6 +666,26 @@ class CloudFilesUSStorageDriver(CloudFil
name = 'CloudFiles (US)'
connectionCls = CloudFilesUSConnection
+
class CloudFilesSwiftStorageDriver(CloudFilesStorageDriver):
    """
    CloudFiles storage driver for OpenStack Swift deployments.
    """
    type = Provider.CLOUDFILES_SWIFT
    name = 'CloudFiles (SWIFT)'
    connectionCls = CloudFilesSwiftConnection

    def __init__(self, *args, **kwargs):
        # pop(), not get(): the Swift-only keyword must not be forwarded
        # to the base driver constructor. CloudFilesSwiftConnection pops
        # the same keyword for the same reason.
        # NOTE(review): the original used kwargs.get(), leaking
        # ex_region_name upstream — confirm the base __init__ rejects it.
        self._ex_region_name = kwargs.pop('ex_region_name', 'RegionOne')
        super(CloudFilesSwiftStorageDriver, self).__init__(*args, **kwargs)

    def openstack_connection_kwargs(self):
        """Forward the configured region to CloudFilesSwiftConnection."""
        conn_kwargs = super(CloudFilesSwiftStorageDriver,
                            self).openstack_connection_kwargs()
        conn_kwargs['ex_region_name'] = self._ex_region_name
        return conn_kwargs
+
+
class CloudFilesUKStorageDriver(CloudFilesStorageDriver):
"""
Cloudfiles storage driver for the UK endpoint.
@@ -568,3 +694,62 @@ class CloudFilesUKStorageDriver(CloudFil
type = Provider.CLOUDFILES_UK
name = 'CloudFiles (UK)'
connectionCls = CloudFilesUKConnection
+
+
class FileChunkReader(object):
    """
    Iterator that splits a file into ``chunk_size``-byte ranges.

    Each iteration yields a ChunkStreamReader covering the next byte
    range of the file.
    """
    def __init__(self, file_path, chunk_size):
        self.file_path = file_path
        self.total = os.path.getsize(file_path)
        self.chunk_size = chunk_size
        self.bytes_read = 0
        self.stop_iteration = False

    def __iter__(self):
        return self

    def next(self):
        if self.stop_iteration:
            raise StopIteration

        start_block = self.bytes_read
        end_block = start_block + self.chunk_size

        # ">=" (not ">") so that a file whose size is an exact multiple
        # of chunk_size does not produce a spurious trailing empty chunk.
        if end_block >= self.total:
            end_block = self.total
            self.stop_iteration = True

        self.bytes_read += end_block - start_block
        return ChunkStreamReader(file_path=self.file_path,
                                 start_block=start_block,
                                 end_block=end_block,
                                 chunk_size=8192)

    # Python 3 iterator protocol; the original only defined the
    # Python 2 next() and was not iterable under PY3.
    __next__ = next
+
+
class ChunkStreamReader(object):
    """
    Iterator over the byte range [start_block, end_block) of a file,
    yielding blocks of at most ``chunk_size`` bytes.

    The underlying file descriptor is closed when iteration is
    exhausted.
    """
    def __init__(self, file_path, start_block, end_block, chunk_size):
        self.fd = open(file_path, 'rb')
        self.fd.seek(start_block)
        self.start_block = start_block
        self.end_block = end_block
        self.chunk_size = chunk_size
        self.bytes_read = 0
        self.stop_iteration = False

    def __iter__(self):
        return self

    def next(self):
        if self.stop_iteration:
            self.fd.close()
            raise StopIteration

        block_size = self.chunk_size
        remaining = self.end_block - self.start_block - self.bytes_read

        # ">=" (not ">") so a range that is an exact multiple of
        # chunk_size does not end with a spurious empty read.
        if block_size >= remaining:
            block_size = remaining
            self.stop_iteration = True

        block = self.fd.read(block_size)
        self.bytes_read += block_size
        return block

    # Python 3 iterator protocol; the original only defined the
    # Python 2 next() and was not iterable under PY3.
    __next__ = next
Modified: libcloud/trunk/libcloud/storage/providers.py
URL: http://svn.apache.org/viewvc/libcloud/trunk/libcloud/storage/providers.py?rev=1325146&r1=1325145&r2=1325146&view=diff
==============================================================================
--- libcloud/trunk/libcloud/storage/providers.py (original)
+++ libcloud/trunk/libcloud/storage/providers.py Thu Apr 12 07:27:27 2012
@@ -38,7 +38,10 @@ DRIVERS = {
Provider.NINEFOLD:
('libcloud.storage.drivers.ninefold', 'NinefoldStorageDriver'),
Provider.GOOGLE_STORAGE:
- ('libcloud.storage.drivers.google_storage', 'GoogleStorageDriver')
+ ('libcloud.storage.drivers.google_storage', 'GoogleStorageDriver'),
+ Provider.CLOUDFILES_SWIFT:
+ ('libcloud.storage.drivers.cloudfiles',
+ 'CloudFilesSwiftStorageDriver')
}
def get_driver(provider):
Modified: libcloud/trunk/libcloud/storage/types.py
URL: http://svn.apache.org/viewvc/libcloud/trunk/libcloud/storage/types.py?rev=1325146&r1=1325145&r2=1325146&view=diff
==============================================================================
--- libcloud/trunk/libcloud/storage/types.py (original)
+++ libcloud/trunk/libcloud/storage/types.py Thu Apr 12 07:27:27 2012
@@ -52,6 +52,7 @@ class Provider(object):
NINEFOLD = 8
GOOGLE_STORAGE = 9
S3_US_WEST_OREGON = 10
+ CLOUDFILES_SWIFT = 11
class ContainerError(LibcloudError):
error_type = 'ContainerError'
Modified: libcloud/trunk/test/storage/test_cloudfiles.py
URL: http://svn.apache.org/viewvc/libcloud/trunk/test/storage/test_cloudfiles.py?rev=1325146&r1=1325145&r2=1325146&view=diff
==============================================================================
--- libcloud/trunk/test/storage/test_cloudfiles.py (original)
+++ libcloud/trunk/test/storage/test_cloudfiles.py Thu Apr 12 07:27:27 2012
@@ -14,10 +14,13 @@
# limitations under the License.
import os
import os.path # pylint: disable-msg=W0404
+import math
import sys
import copy
import unittest
+import mock
+
import libcloud.utils.files
from libcloud.utils.py3 import PY3
@@ -451,6 +454,115 @@ class CloudFilesTests(unittest.TestCase)
self.assertTrue('container_count' in meta_data)
self.assertTrue('bytes_used' in meta_data)
+ @mock.patch('os.path.getsize')
+ def test_ex_multipart_upload_object_for_small_files(self, getsize_mock):
+ getsize_mock.return_value = 0
+
+ old_func = CloudFilesStorageDriver.upload_object
+ mocked_upload_object = mock.Mock(return_value="test")
+ CloudFilesStorageDriver.upload_object = mocked_upload_object
+
+ file_path = os.path.abspath(__file__)
+ container = Container(name='foo_bar_container', extra={}, driver=self)
+ object_name = 'foo_test_upload'
+ obj = self.driver.ex_multipart_upload_object(file_path=file_path, container=container,
+ object_name=object_name)
+ CloudFilesStorageDriver.upload_object = old_func
+
+ self.assertTrue(mocked_upload_object.called)
+ self.assertEqual(obj, "test")
+
+ def test_ex_multipart_upload_object_success(self):
+ _upload_object_part = CloudFilesStorageDriver._upload_object_part
+ _upload_object_manifest = CloudFilesStorageDriver._upload_object_manifest
+
+ mocked__upload_object_part = mock.Mock(return_value="test_part")
+ mocked__upload_object_manifest = mock.Mock(return_value="test_manifest")
+
+ CloudFilesStorageDriver._upload_object_part = mocked__upload_object_part
+ CloudFilesStorageDriver._upload_object_manifest = mocked__upload_object_manifest
+
+ parts = 5
+ file_path = os.path.abspath(__file__)
+ chunk_size = int(math.ceil(float(os.path.getsize(file_path)) / parts))
+ container = Container(name='foo_bar_container', extra={}, driver=self)
+ object_name = 'foo_test_upload'
+ self.driver.ex_multipart_upload_object(file_path=file_path, container=container,
+ object_name=object_name, chunk_size=chunk_size)
+
+ CloudFilesStorageDriver._upload_object_part = _upload_object_part
+ CloudFilesStorageDriver._upload_object_manifest = _upload_object_manifest
+
+ self.assertEqual(mocked__upload_object_part.call_count, parts)
+ self.assertTrue(mocked__upload_object_manifest.call_count, 1)
+
+ def test__upload_object_part(self):
+ _put_object = CloudFilesStorageDriver._put_object
+ mocked__put_object = mock.Mock(return_value="test")
+ CloudFilesStorageDriver._put_object = mocked__put_object
+
+ part_number = 7
+ object_name = "test_object"
+ expected_name = object_name + '/%08d' % part_number
+ container = Container(name='foo_bar_container', extra={}, driver=self)
+
+ self.driver._upload_object_part(container, object_name,
+ part_number, None)
+
+ CloudFilesStorageDriver._put_object = _put_object
+
+ func_kwargs = tuple(mocked__put_object.call_args)[1]
+ self.assertEquals(func_kwargs['object_name'], expected_name)
+ self.assertEquals(func_kwargs['container'], container)
+
+ def test__upload_object_manifest(self):
+ hash_function = self.driver._get_hash_function()
+ hash_function.update('')
+ data_hash = hash_function.hexdigest()
+
+ fake_response = type('CloudFilesResponse', (), {'headers':
+ {'etag': data_hash}
+ })
+
+ _request = self.driver.connection.request
+ mocked_request = mock.Mock(return_value=fake_response)
+ self.driver.connection.request = mocked_request
+
+ container = Container(name='foo_bar_container', extra={}, driver=self)
+ object_name = "test_object"
+
+ self.driver._upload_object_manifest(container, object_name)
+
+ func_args, func_kwargs = tuple(mocked_request.call_args)
+
+ self.driver.connection.request = _request
+
+ self.assertEquals(func_args[0], "/" + container.name + "/" + object_name)
+ self.assertEquals(func_kwargs["headers"]["X-Object-Manifest"],
+ container.name + "/" + object_name + "/")
+ self.assertEquals(func_kwargs["method"], "PUT")
+
+ def test__upload_object_manifest_wrong_hash(self):
+ fake_response = type('CloudFilesResponse', (), {'headers':
+ {'etag': '0000000'}})
+
+ _request = self.driver.connection.request
+ mocked_request = mock.Mock(return_value=fake_response)
+ self.driver.connection.request = mocked_request
+
+ container = Container(name='foo_bar_container', extra={}, driver=self)
+ object_name = "test_object"
+
+
+ try:
+ self.driver._upload_object_manifest(container, object_name)
+ except ObjectHashMismatchError:
+ pass
+ else:
+ self.fail('Exception was not thrown')
+ finally:
+ self.driver.connection.request = _request
+
def _remove_test_file(self):
file_path = os.path.abspath(__file__) + '.temp'
@@ -459,6 +571,7 @@ class CloudFilesTests(unittest.TestCase)
except OSError:
pass
+
class CloudFilesMockHttp(StorageMockHttp):
fixtures = StorageFileFixtures('cloudfiles')