You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ed...@apache.org on 2013/07/17 03:05:21 UTC
[3/4] be able to upload template into swift
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ba4c4400/scripts/storage/secondary/swift
----------------------------------------------------------------------
diff --git a/scripts/storage/secondary/swift b/scripts/storage/secondary/swift
index 8224b4d..4138db8 100755
--- a/scripts/storage/secondary/swift
+++ b/scripts/storage/secondary/swift
@@ -1,5 +1,5 @@
#!/usr/bin/python -u
-# Copyright (c) 2010-2012 OpenStack, LLC.
+# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,51 +13,896 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import signal
-import socket
-import logging
from errno import EEXIST, ENOENT
from hashlib import md5
-from optparse import OptionParser, SUPPRESS_HELP
-from os import environ, listdir, makedirs, utime, _exit as os_exit
+from optparse import OptionParser
+from os import environ, listdir, makedirs, utime
from os.path import basename, dirname, getmtime, getsize, isdir, join
-from Queue import Queue
-from random import shuffle
+from Queue import Empty, Queue
from sys import argv, exc_info, exit, stderr, stdout
-from threading import Thread
-from time import sleep, time, gmtime, strftime
+from threading import enumerate as threading_enumerate, Thread
+from time import sleep
from traceback import format_exception
-from urllib import quote, unquote
+
+
+# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
+# Inclusion of swift.common.client for convenience of single file distribution
+
+import socket
+from cStringIO import StringIO
+from re import compile, DOTALL
+from tokenize import generate_tokens, STRING, NAME, OP
+from urllib import quote as _quote, unquote
+from urlparse import urlparse, urlunparse
+
+try:
+ from eventlet.green.httplib import HTTPException, HTTPSConnection
+except ImportError:
+ from httplib import HTTPException, HTTPSConnection
try:
- import simplejson as json
+ from eventlet import sleep
except ImportError:
- import json
+ from time import sleep
-from swiftclient import Connection, ClientException, HTTPException, utils
-from swiftclient.version import version_info
+try:
+ from swift.common.bufferedhttp \
+ import BufferedHTTPConnection as HTTPConnection
+except ImportError:
+ try:
+ from eventlet.green.httplib import HTTPConnection
+ except ImportError:
+ from httplib import HTTPConnection
-def get_conn(options):
+def quote(value, safe='/'):
"""
- Return a connection building it from the options.
+ Patched version of urllib.quote that encodes utf8 strings before quoting
"""
- return Connection(options.auth,
- options.user,
- options.key,
- auth_version=options.auth_version,
- os_options=options.os_options,
- snet=options.snet,
- cacert=options.os_cacert,
- insecure=options.insecure,
- ssl_compression=options.ssl_compression)
+ if isinstance(value, unicode):
+ value = value.encode('utf8')
+ return _quote(value, safe)
+
+
+# look for a real json parser first
+try:
+ # simplejson is popular and pretty good
+ from simplejson import loads as json_loads
+except ImportError:
+ try:
+ # 2.6 will have a json module in the stdlib
+ from json import loads as json_loads
+ except ImportError:
+ # fall back on local parser otherwise
+ comments = compile(r'/\*.*\*/|//[^\r\n]*', DOTALL)
+
+ def json_loads(string):
+ '''
+ Fairly competent json parser exploiting the python tokenizer and
+ eval(). -- From python-cloudfiles
+
+ _loads(serialized_json) -> object
+ '''
+ try:
+ res = []
+ consts = {'true': True, 'false': False, 'null': None}
+ string = '(' + comments.sub('', string) + ')'
+ for type, val, _junk, _junk, _junk in \
+ generate_tokens(StringIO(string).readline):
+ if (type == OP and val not in '[]{}:,()-') or \
+ (type == NAME and val not in consts):
+ raise AttributeError()
+ elif type == STRING:
+ res.append('u')
+ res.append(val.replace('\\/', '/'))
+ else:
+ res.append(val)
+ return eval(''.join(res), {}, consts)
+ except Exception:
+ raise AttributeError()
+
+
+class ClientException(Exception):
+
+ def __init__(self, msg, http_scheme='', http_host='', http_port='',
+ http_path='', http_query='', http_status=0, http_reason='',
+ http_device=''):
+ Exception.__init__(self, msg)
+ self.msg = msg
+ self.http_scheme = http_scheme
+ self.http_host = http_host
+ self.http_port = http_port
+ self.http_path = http_path
+ self.http_query = http_query
+ self.http_status = http_status
+ self.http_reason = http_reason
+ self.http_device = http_device
+
+ def __str__(self):
+ a = self.msg
+ b = ''
+ if self.http_scheme:
+ b += '%s://' % self.http_scheme
+ if self.http_host:
+ b += self.http_host
+ if self.http_port:
+ b += ':%s' % self.http_port
+ if self.http_path:
+ b += self.http_path
+ if self.http_query:
+ b += '?%s' % self.http_query
+ if self.http_status:
+ if b:
+ b = '%s %s' % (b, self.http_status)
+ else:
+ b = str(self.http_status)
+ if self.http_reason:
+ if b:
+ b = '%s %s' % (b, self.http_reason)
+ else:
+ b = '- %s' % self.http_reason
+ if self.http_device:
+ if b:
+ b = '%s: device %s' % (b, self.http_device)
+ else:
+ b = 'device %s' % self.http_device
+ return b and '%s: %s' % (a, b) or a
+
+
+def http_connection(url, proxy=None):
+ """
+ Make an HTTPConnection or HTTPSConnection
+
+ :param url: url to connect to
+ :param proxy: proxy to connect through, if any; None by default; str of the
+ format 'http://127.0.0.1:8888' to set one
+ :returns: tuple of (parsed url, connection object)
+ :raises ClientException: Unable to handle protocol scheme
+ """
+ parsed = urlparse(url)
+ proxy_parsed = urlparse(proxy) if proxy else None
+ if parsed.scheme == 'http':
+ conn = HTTPConnection((proxy_parsed if proxy else parsed).netloc)
+ elif parsed.scheme == 'https':
+ conn = HTTPSConnection((proxy_parsed if proxy else parsed).netloc)
+ else:
+ raise ClientException('Cannot handle protocol scheme %s for url %s' %
+ (parsed.scheme, repr(url)))
+ if proxy:
+ conn._set_tunnel(parsed.hostname, parsed.port)
+ return parsed, conn
+
+
+def get_auth(url, user, key, snet=False):
+ """
+ Get authentication/authorization credentials.
+
+ The snet parameter is used for Rackspace's ServiceNet internal network
+ implementation. In this function, it simply adds *snet-* to the beginning
+ of the host name for the returned storage URL. With Rackspace Cloud Files,
+ use of this network path causes no bandwidth charges but requires the
+ client to be running on Rackspace's ServiceNet network.
+
+ :param url: authentication/authorization URL
+ :param user: user to authenticate as
+ :param key: key or password for authorization
+ :param snet: use SERVICENET internal network (see above), default is False
+ :returns: tuple of (storage URL, auth token)
+ :raises ClientException: HTTP GET request to auth URL failed
+ """
+ parsed, conn = http_connection(url)
+ conn.request('GET', parsed.path, '',
+ {'X-Auth-User': user, 'X-Auth-Key': key})
+ resp = conn.getresponse()
+ resp.read()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Auth GET failed', http_scheme=parsed.scheme,
+ http_host=conn.host, http_port=conn.port,
+ http_path=parsed.path, http_status=resp.status,
+ http_reason=resp.reason)
+ url = resp.getheader('x-storage-url')
+ if snet:
+ parsed = list(urlparse(url))
+ # Second item in the list is the netloc
+ parsed[1] = 'snet-' + parsed[1]
+ url = urlunparse(parsed)
+ return url, resp.getheader('x-storage-token',
+ resp.getheader('x-auth-token'))
+
+
+def get_account(url, token, marker=None, limit=None, prefix=None,
+ http_conn=None, full_listing=False):
+ """
+ Get a listing of containers for the account.
+
+ :param url: storage URL
+ :param token: auth token
+ :param marker: marker query
+ :param limit: limit query
+ :param prefix: prefix query
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :param full_listing: if True, return a full listing, else returns a max
+ of 10000 listings
+ :returns: a tuple of (response headers, a list of containers) The response
+ headers will be a dict and all header names will be lowercase.
+ :raises ClientException: HTTP GET request failed
+ """
+ if not http_conn:
+ http_conn = http_connection(url)
+ if full_listing:
+ rv = get_account(url, token, marker, limit, prefix, http_conn)
+ listing = rv[1]
+ while listing:
+ marker = listing[-1]['name']
+ listing = \
+ get_account(url, token, marker, limit, prefix, http_conn)[1]
+ if listing:
+ rv[1].extend(listing)
+ return rv
+ parsed, conn = http_conn
+ qs = 'format=json'
+ if marker:
+ qs += '&marker=%s' % quote(marker)
+ if limit:
+ qs += '&limit=%d' % limit
+ if prefix:
+ qs += '&prefix=%s' % quote(prefix)
+ conn.request('GET', '%s?%s' % (parsed.path, qs), '',
+ {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ resp_headers = {}
+ for header, value in resp.getheaders():
+ resp_headers[header.lower()] = value
+ if resp.status < 200 or resp.status >= 300:
+ resp.read()
+ raise ClientException('Account GET failed', http_scheme=parsed.scheme,
+ http_host=conn.host, http_port=conn.port,
+ http_path=parsed.path, http_query=qs, http_status=resp.status,
+ http_reason=resp.reason)
+ if resp.status == 204:
+ resp.read()
+ return resp_headers, []
+ return resp_headers, json_loads(resp.read())
+
+
+def head_account(url, token, http_conn=None):
+ """
+ Get account stats.
+
+ :param url: storage URL
+ :param token: auth token
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :returns: a dict containing the response's headers (all header names will
+ be lowercase)
+ :raises ClientException: HTTP HEAD request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ conn.request('HEAD', parsed.path, '', {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ resp.read()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Account HEAD failed', http_scheme=parsed.scheme,
+ http_host=conn.host, http_port=conn.port,
+ http_path=parsed.path, http_status=resp.status,
+ http_reason=resp.reason)
+ resp_headers = {}
+ for header, value in resp.getheaders():
+ resp_headers[header.lower()] = value
+ return resp_headers
+
+
+def post_account(url, token, headers, http_conn=None):
+ """
+ Update an account's metadata.
+
+ :param url: storage URL
+ :param token: auth token
+ :param headers: additional headers to include in the request
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :raises ClientException: HTTP POST request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ headers['X-Auth-Token'] = token
+ conn.request('POST', parsed.path, '', headers)
+ resp = conn.getresponse()
+ resp.read()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Account POST failed',
+ http_scheme=parsed.scheme, http_host=conn.host,
+ http_port=conn.port, http_path=path, http_status=resp.status,
+ http_reason=resp.reason)
+
+
+def get_container(url, token, container, marker=None, limit=None,
+ prefix=None, delimiter=None, http_conn=None,
+ full_listing=False):
+ """
+ Get a listing of objects for the container.
+
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name to get a listing for
+ :param marker: marker query
+ :param limit: limit query
+ :param prefix: prefix query
+ :param delimiter: string to delimit the queries on
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :param full_listing: if True, return a full listing, else returns a max
+ of 10000 listings
+ :returns: a tuple of (response headers, a list of objects) The response
+ headers will be a dict and all header names will be lowercase.
+ :raises ClientException: HTTP GET request failed
+ """
+ if not http_conn:
+ http_conn = http_connection(url)
+ if full_listing:
+ rv = get_container(url, token, container, marker, limit, prefix,
+ delimiter, http_conn)
+ listing = rv[1]
+ while listing:
+ if not delimiter:
+ marker = listing[-1]['name']
+ else:
+ marker = listing[-1].get('name', listing[-1].get('subdir'))
+ listing = get_container(url, token, container, marker, limit,
+ prefix, delimiter, http_conn)[1]
+ if listing:
+ rv[1].extend(listing)
+ return rv
+ parsed, conn = http_conn
+ path = '%s/%s' % (parsed.path, quote(container))
+ qs = 'format=json'
+ if marker:
+ qs += '&marker=%s' % quote(marker)
+ if limit:
+ qs += '&limit=%d' % limit
+ if prefix:
+ qs += '&prefix=%s' % quote(prefix)
+ if delimiter:
+ qs += '&delimiter=%s' % quote(delimiter)
+ conn.request('GET', '%s?%s' % (path, qs), '', {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ resp.read()
+ raise ClientException('Container GET failed',
+ http_scheme=parsed.scheme, http_host=conn.host,
+ http_port=conn.port, http_path=path, http_query=qs,
+ http_status=resp.status, http_reason=resp.reason)
+ resp_headers = {}
+ for header, value in resp.getheaders():
+ resp_headers[header.lower()] = value
+ if resp.status == 204:
+ resp.read()
+ return resp_headers, []
+ return resp_headers, json_loads(resp.read())
+
+
+def head_container(url, token, container, http_conn=None):
+ """
+ Get container stats.
+
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name to get stats for
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :returns: a dict containing the response's headers (all header names will
+ be lowercase)
+ :raises ClientException: HTTP HEAD request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ path = '%s/%s' % (parsed.path, quote(container))
+ conn.request('HEAD', path, '', {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ resp.read()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Container HEAD failed',
+ http_scheme=parsed.scheme, http_host=conn.host,
+ http_port=conn.port, http_path=path, http_status=resp.status,
+ http_reason=resp.reason)
+ resp_headers = {}
+ for header, value in resp.getheaders():
+ resp_headers[header.lower()] = value
+ return resp_headers
+
+
+def put_container(url, token, container, headers=None, http_conn=None):
+ """
+ Create a container
+
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name to create
+ :param headers: additional headers to include in the request
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :raises ClientException: HTTP PUT request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ path = '%s/%s' % (parsed.path, quote(container))
+ if not headers:
+ headers = {}
+ headers['X-Auth-Token'] = token
+ conn.request('PUT', path, '', headers)
+ resp = conn.getresponse()
+ resp.read()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Container PUT failed',
+ http_scheme=parsed.scheme, http_host=conn.host,
+ http_port=conn.port, http_path=path, http_status=resp.status,
+ http_reason=resp.reason)
+
+
+def post_container(url, token, container, headers, http_conn=None):
+ """
+ Update a container's metadata.
+
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name to update
+ :param headers: additional headers to include in the request
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :raises ClientException: HTTP POST request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ path = '%s/%s' % (parsed.path, quote(container))
+ headers['X-Auth-Token'] = token
+ conn.request('POST', path, '', headers)
+ resp = conn.getresponse()
+ resp.read()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Container POST failed',
+ http_scheme=parsed.scheme, http_host=conn.host,
+ http_port=conn.port, http_path=path, http_status=resp.status,
+ http_reason=resp.reason)
+
+
+def delete_container(url, token, container, http_conn=None):
+ """
+ Delete a container
+
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name to delete
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :raises ClientException: HTTP DELETE request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ path = '%s/%s' % (parsed.path, quote(container))
+ conn.request('DELETE', path, '', {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ resp.read()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Container DELETE failed',
+ http_scheme=parsed.scheme, http_host=conn.host,
+ http_port=conn.port, http_path=path, http_status=resp.status,
+ http_reason=resp.reason)
+
+
+def get_object(url, token, container, name, http_conn=None,
+ resp_chunk_size=None):
+ """
+ Get an object
+
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name that the object is in
+ :param name: object name to get
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :param resp_chunk_size: if defined, chunk size of data to read. NOTE: If
+ you specify a resp_chunk_size you must fully read
+ the object's contents before making another
+ request.
+ :returns: a tuple of (response headers, the object's contents) The response
+ headers will be a dict and all header names will be lowercase.
+ :raises ClientException: HTTP GET request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
+ conn.request('GET', path, '', {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ if resp.status < 200 or resp.status >= 300:
+ resp.read()
+ raise ClientException('Object GET failed', http_scheme=parsed.scheme,
+ http_host=conn.host, http_port=conn.port, http_path=path,
+ http_status=resp.status, http_reason=resp.reason)
+ if resp_chunk_size:
+
+ def _object_body():
+ buf = resp.read(resp_chunk_size)
+ while buf:
+ yield buf
+ buf = resp.read(resp_chunk_size)
+ object_body = _object_body()
+ else:
+ object_body = resp.read()
+ resp_headers = {}
+ for header, value in resp.getheaders():
+ resp_headers[header.lower()] = value
+ return resp_headers, object_body
+
+
+def head_object(url, token, container, name, http_conn=None):
+ """
+ Get object info
+
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name that the object is in
+ :param name: object name to get info for
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :returns: a dict containing the response's headers (all header names will
+ be lowercase)
+ :raises ClientException: HTTP HEAD request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
+ conn.request('HEAD', path, '', {'X-Auth-Token': token})
+ resp = conn.getresponse()
+ resp.read()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Object HEAD failed', http_scheme=parsed.scheme,
+ http_host=conn.host, http_port=conn.port, http_path=path,
+ http_status=resp.status, http_reason=resp.reason)
+ resp_headers = {}
+ for header, value in resp.getheaders():
+ resp_headers[header.lower()] = value
+ return resp_headers
+
+
+def put_object(url, token=None, container=None, name=None, contents=None,
+ content_length=None, etag=None, chunk_size=65536,
+ content_type=None, headers=None, http_conn=None, proxy=None):
+ """
+ Put an object
+
+ :param url: storage URL
+ :param token: auth token; if None, no token will be sent
+ :param container: container name that the object is in; if None, the
+ container name is expected to be part of the url
+ :param name: object name to put; if None, the object name is expected to be
+ part of the url
+ :param contents: a string or a file like object to read object data from;
+ if None, a zero-byte put will be done
+ :param content_length: value to send as content-length header; also limits
+ the amount read from contents; if None, it will be
+ computed via the contents or chunked transfer
+ encoding will be used
+ :param etag: etag of contents; if None, no etag will be sent
+ :param chunk_size: chunk size of data to write; default 65536
+ :param content_type: value to send as content-type header; if None, no
+ content-type will be set (remote end will likely try
+ to auto-detect it)
+ :param headers: additional headers to include in the request, if any
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :param proxy: proxy to connect through, if any; None by default; str of the
+ format 'http://127.0.0.1:8888' to set one
+ :returns: etag from server response
+ :raises ClientException: HTTP PUT request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url, proxy=proxy)
+ path = parsed.path
+ if container:
+ path = '%s/%s' % (path.rstrip('/'), quote(container))
+ if name:
+ path = '%s/%s' % (path.rstrip('/'), quote(name))
+ if headers:
+ headers = dict(headers)
+ else:
+ headers = {}
+ if token:
+ headers['X-Auth-Token'] = token
+ if etag:
+ headers['ETag'] = etag.strip('"')
+ if content_length is not None:
+ headers['Content-Length'] = str(content_length)
+ else:
+ for n, v in headers.iteritems():
+ if n.lower() == 'content-length':
+ content_length = int(v)
+ if content_type is not None:
+ headers['Content-Type'] = content_type
+ if not contents:
+ headers['Content-Length'] = '0'
+ if hasattr(contents, 'read'):
+ conn.putrequest('PUT', path)
+ for header, value in headers.iteritems():
+ conn.putheader(header, value)
+ if content_length is None:
+ conn.putheader('Transfer-Encoding', 'chunked')
+ conn.endheaders()
+ chunk = contents.read(chunk_size)
+ while chunk:
+ conn.send('%x\r\n%s\r\n' % (len(chunk), chunk))
+ chunk = contents.read(chunk_size)
+ conn.send('0\r\n\r\n')
+ else:
+ conn.endheaders()
+ left = content_length
+ while left > 0:
+ size = chunk_size
+ if size > left:
+ size = left
+ chunk = contents.read(size)
+ conn.send(chunk)
+ left -= len(chunk)
+ else:
+ conn.request('PUT', path, contents, headers)
+ resp = conn.getresponse()
+ resp.read()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Object PUT failed', http_scheme=parsed.scheme,
+ http_host=conn.host, http_port=conn.port, http_path=path,
+ http_status=resp.status, http_reason=resp.reason)
+ return resp.getheader('etag', '').strip('"')
+
+
+def post_object(url, token, container, name, headers, http_conn=None):
+ """
+ Update object metadata
+
+ :param url: storage URL
+ :param token: auth token
+ :param container: container name that the object is in
+ :param name: name of the object to update
+ :param headers: additional headers to include in the request
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :raises ClientException: HTTP POST request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url)
+ path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
+ headers['X-Auth-Token'] = token
+ conn.request('POST', path, '', headers)
+ resp = conn.getresponse()
+ resp.read()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Object POST failed', http_scheme=parsed.scheme,
+ http_host=conn.host, http_port=conn.port, http_path=path,
+ http_status=resp.status, http_reason=resp.reason)
+
+
+def delete_object(url, token=None, container=None, name=None, http_conn=None,
+ headers=None, proxy=None):
+ """
+ Delete object
+
+ :param url: storage URL
+ :param token: auth token; if None, no token will be sent
+ :param container: container name that the object is in; if None, the
+ container name is expected to be part of the url
+ :param name: object name to delete; if None, the object name is expected to
+ be part of the url
+ :param http_conn: HTTP connection object (If None, it will create the
+ conn object)
+ :param headers: additional headers to include in the request
+ :param proxy: proxy to connect through, if any; None by default; str of the
+ format 'http://127.0.0.1:8888' to set one
+ :raises ClientException: HTTP DELETE request failed
+ """
+ if http_conn:
+ parsed, conn = http_conn
+ else:
+ parsed, conn = http_connection(url, proxy=proxy)
+ path = parsed.path
+ if container:
+ path = '%s/%s' % (path.rstrip('/'), quote(container))
+ if name:
+ path = '%s/%s' % (path.rstrip('/'), quote(name))
+ if headers:
+ headers = dict(headers)
+ else:
+ headers = {}
+ if token:
+ headers['X-Auth-Token'] = token
+ conn.request('DELETE', path, '', headers)
+ resp = conn.getresponse()
+ resp.read()
+ if resp.status < 200 or resp.status >= 300:
+ raise ClientException('Object DELETE failed',
+ http_scheme=parsed.scheme, http_host=conn.host,
+ http_port=conn.port, http_path=path, http_status=resp.status,
+ http_reason=resp.reason)
+
+
+class Connection(object):
+ """Convenience class to make requests that will also retry the request"""
+
+ def __init__(self, authurl, user, key, retries=5, preauthurl=None,
+ preauthtoken=None, snet=False, starting_backoff=1):
+ """
+ :param authurl: authentication URL
+ :param user: user name to authenticate as
+ :param key: key/password to authenticate with
+ :param retries: Number of times to retry the request before failing
+ :param preauthurl: storage URL (if you have already authenticated)
+ :param preauthtoken: authentication token (if you have already
+ authenticated)
+ :param snet: use SERVICENET internal network default is False
+ """
+ self.authurl = authurl
+ self.user = user
+ self.key = key
+ self.retries = retries
+ self.http_conn = None
+ self.url = preauthurl
+ self.token = preauthtoken
+ self.attempts = 0
+ self.snet = snet
+ self.starting_backoff = starting_backoff
+
+ def get_auth(self):
+ return get_auth(self.authurl, self.user, self.key, snet=self.snet)
+
+ def http_connection(self):
+ return http_connection(self.url)
+
+ def _retry(self, reset_func, func, *args, **kwargs):
+ self.attempts = 0
+ backoff = self.starting_backoff
+ while self.attempts <= self.retries:
+ self.attempts += 1
+ try:
+ if not self.url or not self.token:
+ self.url, self.token = self.get_auth()
+ self.http_conn = None
+ if not self.http_conn:
+ self.http_conn = self.http_connection()
+ kwargs['http_conn'] = self.http_conn
+ rv = func(self.url, self.token, *args, **kwargs)
+ return rv
+ except (socket.error, HTTPException):
+ if self.attempts > self.retries:
+ raise
+ self.http_conn = None
+ except ClientException, err:
+ if self.attempts > self.retries:
+ raise
+ if err.http_status == 401:
+ self.url = self.token = None
+ if self.attempts > 1:
+ raise
+ elif err.http_status == 408:
+ self.http_conn = None
+ elif 500 <= err.http_status <= 599:
+ pass
+ else:
+ raise
+ sleep(backoff)
+ backoff *= 2
+ if reset_func:
+ reset_func(func, *args, **kwargs)
+
+ def head_account(self):
+ """Wrapper for :func:`head_account`"""
+ return self._retry(None, head_account)
+
+ def get_account(self, marker=None, limit=None, prefix=None,
+ full_listing=False):
+ """Wrapper for :func:`get_account`"""
+ # TODO(unknown): With full_listing=True this will restart the entire
+ # listing with each retry. Need to make a better version that just
+ # retries where it left off.
+ return self._retry(None, get_account, marker=marker, limit=limit,
+ prefix=prefix, full_listing=full_listing)
+
+ def post_account(self, headers):
+ """Wrapper for :func:`post_account`"""
+ return self._retry(None, post_account, headers)
+
+ def head_container(self, container):
+ """Wrapper for :func:`head_container`"""
+ return self._retry(None, head_container, container)
+
+ def get_container(self, container, marker=None, limit=None, prefix=None,
+ delimiter=None, full_listing=False):
+ """Wrapper for :func:`get_container`"""
+ # TODO(unknown): With full_listing=True this will restart the entire
+ # listing with each retry. Need to make a better version that just
+ # retries where it left off.
+ return self._retry(None, get_container, container, marker=marker,
+ limit=limit, prefix=prefix, delimiter=delimiter,
+ full_listing=full_listing)
+
+ def put_container(self, container, headers=None):
+ """Wrapper for :func:`put_container`"""
+ return self._retry(None, put_container, container, headers=headers)
+
+ def post_container(self, container, headers):
+ """Wrapper for :func:`post_container`"""
+ return self._retry(None, post_container, container, headers)
+
+ def delete_container(self, container):
+ """Wrapper for :func:`delete_container`"""
+ return self._retry(None, delete_container, container)
+
+ def head_object(self, container, obj):
+ """Wrapper for :func:`head_object`"""
+ return self._retry(None, head_object, container, obj)
+
+ def get_object(self, container, obj, resp_chunk_size=None):
+ """Wrapper for :func:`get_object`"""
+ return self._retry(None, get_object, container, obj,
+ resp_chunk_size=resp_chunk_size)
+
+ def put_object(self, container, obj, contents, content_length=None,
+ etag=None, chunk_size=65536, content_type=None,
+ headers=None):
+ """Wrapper for :func:`put_object`"""
+
+ def _default_reset(*args, **kwargs):
+ raise ClientException('put_object(%r, %r, ...) failure and no '
+ 'ability to reset contents for reupload.' % (container, obj))
+
+ reset_func = _default_reset
+ tell = getattr(contents, 'tell', None)
+ seek = getattr(contents, 'seek', None)
+ if tell and seek:
+ orig_pos = tell()
+ reset_func = lambda *a, **k: seek(orig_pos)
+ elif not contents:
+ reset_func = lambda *a, **k: None
+
+ return self._retry(reset_func, put_object, container, obj, contents,
+ content_length=content_length, etag=etag, chunk_size=chunk_size,
+ content_type=content_type, headers=headers)
+
+ def post_object(self, container, obj, headers):
+ """Wrapper for :func:`post_object`"""
+ return self._retry(None, post_object, container, obj, headers)
+
+ def delete_object(self, container, obj):
+ """Wrapper for :func:`delete_object`"""
+ return self._retry(None, delete_object, container, obj)
+
+# End inclusion of swift.common.client
+# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
def mkdirs(path):
try:
makedirs(path)
- except OSError as err:
+ except OSError, err:
if err.errno != EEXIST:
raise
@@ -80,64 +925,39 @@ def put_errors_from_threads(threads, error_queue):
return was_error
-class StopWorkerThreadSignal(object):
- pass
-
-
class QueueFunctionThread(Thread):
    """Worker thread that feeds items from a queue to a callable.

    Attributes:
        abort: set to True to make the thread stop calling ``func``; it then
            drains whatever is left in the queue without processing it and
            exits once the queue is empty.
        exc_infos: list of ``sys.exc_info()`` tuples, one per failure seen
            while the thread was running.
    """

    def __init__(self, queue, func, *args, **kwargs):
        """ Calls func for each item in queue; func is called with a queued
        item as the first arg followed by *args and **kwargs. Use the abort
        attribute to have the thread empty the queue (without processing)
        and exit. """
        Thread.__init__(self)
        self.abort = False
        self.queue = queue
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self.exc_infos = []

    def run(self):
        """Process queued items until ``abort`` is set and the queue is empty.

        A failure in ``func`` is recorded in ``exc_infos`` and the thread
        moves on to the next item; it does not terminate.  Callers poll
        ``queue.empty()`` before setting ``abort``, so a worker that died on
        its first error could leave the queue undrained and the caller
        waiting forever.
        """
        try:
            while True:
                try:
                    item = self.queue.get_nowait()
                except Empty:
                    if self.abort:
                        break
                    # Nothing queued yet: back off briefly instead of
                    # spinning on get_nowait().
                    sleep(0.01)
                    continue
                try:
                    if not self.abort:
                        self.func(item, *self.args, **self.kwargs)
                except Exception:
                    self.exc_infos.append(exc_info())
                finally:
                    # Always account for the item, even when func failed or
                    # the item was skipped because of abort.
                    self.queue.task_done()
        except Exception:
            # Unexpected failure from the queue itself; record it rather
            # than letting the thread die with an unreported traceback.
            self.exc_infos.append(exc_info())
st_delete_help = '''
-delete [options] --all OR delete container [options] [object] [object] ...
+delete --all OR delete container [--leave-segments] [object] [object] ...
Deletes everything in the account (with --all), or everything in a
container, or a list of objects depending on the args given. Segments of
manifest objects will be deleted as well, unless you specify the
@@ -145,21 +965,12 @@ delete [options] --all OR delete container [options] [object] [object] ...
def st_delete(parser, args, print_queue, error_queue):
- parser.add_option(
- '-a', '--all', action='store_true', dest='yes_all',
+ parser.add_option('-a', '--all', action='store_true', dest='yes_all',
default=False, help='Indicates that you really want to delete '
'everything in the account')
- parser.add_option(
- '', '--leave-segments', action='store_true',
- dest='leave_segments', default=False,
- help='Indicates that you want the segments of manifest'
- 'objects left alone')
- parser.add_option(
- '', '--object-threads', type=int,
- default=10, help='Number of threads to use for deleting objects')
- parser.add_option('', '--container-threads', type=int,
- default=10, help='Number of threads to use for '
- 'deleting containers')
+ parser.add_option('', '--leave-segments', action='store_true',
+ dest='leave_segments', default=False, help='Indicates that you want '
+ 'the segments of manifest objects left alone')
(options, args) = parse_args(parser, args)
args = args[1:]
if (not args and not options.yes_all) or (args and options.yes_all):
@@ -181,34 +992,32 @@ def st_delete(parser, args, print_queue, error_queue):
def _delete_object((container, obj), conn):
try:
old_manifest = None
- query_string = None
if not options.leave_segments:
try:
- headers = conn.head_object(container, obj)
- old_manifest = headers.get('x-object-manifest')
- if utils.config_true_value(
- headers.get('x-static-large-object')):
- query_string = 'multipart-manifest=delete'
- except ClientException as err:
+ old_manifest = conn.head_object(container, obj).get(
+ 'x-object-manifest')
+ except ClientException, err:
if err.http_status != 404:
raise
- conn.delete_object(container, obj, query_string=query_string)
+ conn.delete_object(container, obj)
if old_manifest:
segment_queue = Queue(10000)
scontainer, sprefix = old_manifest.split('/', 1)
- scontainer = unquote(scontainer)
- sprefix = unquote(sprefix).rstrip('/') + '/'
for delobj in conn.get_container(scontainer,
prefix=sprefix)[1]:
segment_queue.put((scontainer, delobj['name']))
if not segment_queue.empty():
- segment_threads = [QueueFunctionThread(
- segment_queue,
+ segment_threads = [QueueFunctionThread(segment_queue,
_delete_segment, create_connection()) for _junk in
- xrange(options.object_threads)]
+ xrange(10)]
for thread in segment_threads:
thread.start()
- shutdown_worker_threads(segment_queue, segment_threads)
+ while not segment_queue.empty():
+ sleep(0.01)
+ for thread in segment_threads:
+ thread.abort = True
+ while thread.isAlive():
+ thread.join(0.01)
put_errors_from_threads(segment_threads, error_queue)
if options.verbose:
path = options.yes_all and join(container, obj) or obj
@@ -219,7 +1028,7 @@ def st_delete(parser, args, print_queue, error_queue):
(path, conn.attempts))
else:
print_queue.put(path)
- except ClientException as err:
+ except ClientException, err:
if err.http_status != 404:
raise
error_queue.put('Object %s not found' %
@@ -239,109 +1048,97 @@ def st_delete(parser, args, print_queue, error_queue):
object_queue.put((container, obj))
marker = objects[-1]
while not object_queue.empty():
- sleep(0.05)
+ sleep(0.01)
attempts = 1
while True:
try:
conn.delete_container(container)
break
- except ClientException as err:
+ except ClientException, err:
if err.http_status != 409:
raise
if attempts > 10:
raise
attempts += 1
sleep(1)
- except ClientException as err:
+ except ClientException, err:
if err.http_status != 404:
raise
error_queue.put('Container %s not found' % repr(container))
- create_connection = lambda: get_conn(options)
- object_threads = \
- [QueueFunctionThread(object_queue, _delete_object, create_connection())
- for _junk in xrange(options.object_threads)]
+ url, token = get_auth(options.auth, options.user, options.key,
+ snet=options.snet)
+ create_connection = lambda: Connection(options.auth, options.user,
+ options.key, preauthurl=url, preauthtoken=token, snet=options.snet)
+ object_threads = [QueueFunctionThread(object_queue, _delete_object,
+ create_connection()) for _junk in xrange(10)]
for thread in object_threads:
thread.start()
- container_threads = \
- [QueueFunctionThread(container_queue, _delete_container,
- create_connection())
- for _junk in xrange(options.container_threads)]
+ container_threads = [QueueFunctionThread(container_queue,
+ _delete_container, create_connection()) for _junk in xrange(10)]
for thread in container_threads:
thread.start()
+ if not args:
+ conn = create_connection()
+ try:
+ marker = ''
+ while True:
+ containers = \
+ [c['name'] for c in conn.get_account(marker=marker)[1]]
+ if not containers:
+ break
+ for container in containers:
+ container_queue.put(container)
+ marker = containers[-1]
+ while not container_queue.empty():
+ sleep(0.01)
+ while not object_queue.empty():
+ sleep(0.01)
+ except ClientException, err:
+ if err.http_status != 404:
+ raise
+ error_queue.put('Account not found')
+ elif len(args) == 1:
+ if '/' in args[0]:
+ print >> stderr, 'WARNING: / in container name; you might have ' \
+ 'meant %r instead of %r.' % \
+ (args[0].replace('/', ' ', 1), args[0])
+ conn = create_connection()
+ _delete_container(args[0], conn)
+ else:
+ for obj in args[1:]:
+ object_queue.put((args[0], obj))
+ while not container_queue.empty():
+ sleep(0.01)
+ for thread in container_threads:
+ thread.abort = True
+ while thread.isAlive():
+ thread.join(0.01)
+ put_errors_from_threads(container_threads, error_queue)
+ while not object_queue.empty():
+ sleep(0.01)
+ for thread in object_threads:
+ thread.abort = True
+ while thread.isAlive():
+ thread.join(0.01)
+ put_errors_from_threads(object_threads, error_queue)
- try:
- if not args:
- conn = create_connection()
- try:
- marker = ''
- while True:
- containers = [
- c['name'] for c in conn.get_account(marker=marker)[1]]
- if not containers:
- break
- for container in containers:
- container_queue.put(container)
- marker = containers[-1]
- except ClientException as err:
- if err.http_status != 404:
- raise
- error_queue.put('Account not found')
- elif len(args) == 1:
- if '/' in args[0]:
- print >> stderr, 'WARNING: / in container name; you might ' \
- 'have meant %r instead of %r.' % (
- args[0].replace('/', ' ', 1), args[0])
- conn = create_connection()
- _delete_container(args[0], conn)
- else:
- for obj in args[1:]:
- object_queue.put((args[0], obj))
- finally:
- shutdown_worker_threads(container_queue, container_threads)
- put_errors_from_threads(container_threads, error_queue)
- shutdown_worker_threads(object_queue, object_threads)
- put_errors_from_threads(object_threads, error_queue)
+st_download_help = '''
+download --all OR download container [options] [object] [object] ...
+ Downloads everything in the account (with --all), or everything in a
+ container, or a list of objects depending on the args given. For a single
+ object download, you may use the -o [--output] <filename> option to
+ redirect the output to a specific file or if "-" then just redirect to
+ stdout.'''.strip('\n')
-st_download_help = '''
-download --all [options] OR download container [options] [object] [object] ...
- Downloads everything in the account (with --all), or everything in all
- containers in the account matching a prefix (with --all and -p [--prefix]),
- or everything in a container, or a subset of a container with -p
- [--prefix], or a list of objects depending on the args given. -p or
- --prefix is an option that will only download items beginning with that
- prefix. For a single object download, you may use the -o [--output]
- <filename> option to redirect the output to a specific file or if "-" then
- just redirect to stdout.'''.strip('\n')
-
-
-def st_download(parser, args, print_queue, error_queue):
- parser.add_option(
- '-a', '--all', action='store_true', dest='yes_all',
+def st_download(options, args, print_queue, error_queue):
+ parser.add_option('-a', '--all', action='store_true', dest='yes_all',
default=False, help='Indicates that you really want to download '
'everything in the account')
- parser.add_option(
- '-m', '--marker', dest='marker',
- default='', help='Marker to use when starting a container or '
- 'account download')
- parser.add_option(
- '-p', '--prefix', dest='prefix',
- help='Will only download items beginning with the prefix')
- parser.add_option(
- '-o', '--output', dest='out_file', help='For a single '
+ parser.add_option('-o', '--output', dest='out_file', help='For a single '
'file download, stream the output to an alternate location ')
- parser.add_option(
- '', '--object-threads', type=int,
- default=10, help='Number of threads to use for downloading objects')
- parser.add_option(
- '', '--container-threads', type=int, default=10,
- help='Number of threads to use for listing containers')
- parser.add_option(
- '', '--no-download', action='store_true',
- default=False,
- help="Perform download(s), but don't actually write anything to disk")
(options, args) = parse_args(parser, args)
args = args[1:]
if options.out_file == '-':
@@ -364,10 +1161,8 @@ def st_download(parser, args, print_queue, error_queue):
else:
raise Exception("Invalid queue_arg length of %s" % len(queue_arg))
try:
- start_time = time()
headers, body = \
conn.get_object(container, obj, resp_chunk_size=65536)
- header_receipt = time()
content_type = headers.get('content-type')
if 'content-length' in headers:
content_length = int(headers.get('content-length'))
@@ -378,13 +1173,12 @@ def st_download(parser, args, print_queue, error_queue):
if path[:1] in ('/', '\\'):
path = path[1:]
md5sum = None
- make_dir = not options.no_download and out_file != "-"
+ make_dir = out_file != "-"
if content_type.split(';', 1)[0] == 'text/directory':
if make_dir and not isdir(path):
mkdirs(path)
read_length = 0
- if 'x-object-manifest' not in headers and \
- 'x-static-large-object' not in headers:
+ if 'x-object-manifest' not in headers:
md5sum = md5()
for chunk in body:
read_length += len(chunk)
@@ -394,47 +1188,37 @@ def st_download(parser, args, print_queue, error_queue):
dirpath = dirname(path)
if make_dir and dirpath and not isdir(dirpath):
mkdirs(dirpath)
- if not options.no_download:
- if out_file == "-":
- fp = stdout
- elif out_file:
- fp = open(out_file, 'wb')
- else:
- fp = open(path, 'wb')
+ if out_file == "-":
+ fp = stdout
+ elif out_file:
+ fp = open(out_file, 'wb')
+ else:
+ fp = open(path, 'wb')
read_length = 0
- if 'x-object-manifest' not in headers and \
- 'x-static-large-object' not in headers:
+ if 'x-object-manifest' not in headers:
md5sum = md5()
for chunk in body:
- if not options.no_download:
- fp.write(chunk)
+ fp.write(chunk)
read_length += len(chunk)
if md5sum:
md5sum.update(chunk)
- if not options.no_download:
- fp.close()
+ fp.close()
if md5sum and md5sum.hexdigest() != etag:
error_queue.put('%s: md5sum != etag, %s != %s' %
(path, md5sum.hexdigest(), etag))
if content_length is not None and read_length != content_length:
error_queue.put('%s: read_length != content_length, %d != %d' %
(path, read_length, content_length))
- if 'x-object-meta-mtime' in headers and not options.out_file \
- and not options.no_download:
-
+ if 'x-object-meta-mtime' in headers and not options.out_file:
mtime = float(headers['x-object-meta-mtime'])
utime(path, (mtime, mtime))
if options.verbose:
- finish_time = time()
- time_str = 'headers %.3fs, total %.3fs, %.3fs MB/s' % (
- header_receipt - start_time, finish_time - start_time,
- float(read_length) / (finish_time - start_time) / 1000000)
if conn.attempts > 1:
- print_queue.put('%s [%s after %d attempts]' %
- (path, time_str, conn.attempts))
+ print_queue.put('%s [after %d attempts' %
+ (path, conn.attempts))
else:
- print_queue.put('%s [%s]' % (path, time_str))
- except ClientException as err:
+ print_queue.put(path)
+ except ClientException, err:
if err.http_status != 404:
raise
error_queue.put('Object %s not found' %
@@ -442,212 +1226,120 @@ def st_download(parser, args, print_queue, error_queue):
container_queue = Queue(10000)
- def _download_container(container, conn, prefix=None):
+ def _download_container(container, conn):
try:
- marker = options.marker
+ marker = ''
while True:
- objects = [
- o['name'] for o in
- conn.get_container(container, marker=marker,
- prefix=prefix)[1]]
+ objects = [o['name'] for o in
+ conn.get_container(container, marker=marker)[1]]
if not objects:
break
- marker = objects[-1]
- shuffle(objects)
for obj in objects:
object_queue.put((container, obj))
- except ClientException as err:
+ marker = objects[-1]
+ except ClientException, err:
if err.http_status != 404:
raise
error_queue.put('Container %s not found' % repr(container))
- create_connection = lambda: get_conn(options)
- object_threads = [QueueFunctionThread(
- object_queue, _download_object,
- create_connection()) for _junk in xrange(options.object_threads)]
+ url, token = get_auth(options.auth, options.user, options.key,
+ snet=options.snet)
+ create_connection = lambda: Connection(options.auth, options.user,
+ options.key, preauthurl=url, preauthtoken=token, snet=options.snet)
+ object_threads = [QueueFunctionThread(object_queue, _download_object,
+ create_connection()) for _junk in xrange(10)]
for thread in object_threads:
thread.start()
- container_threads = [QueueFunctionThread(
- container_queue,
- _download_container, create_connection())
- for _junk in xrange(options.container_threads)]
+ container_threads = [QueueFunctionThread(container_queue,
+ _download_container, create_connection()) for _junk in xrange(10)]
for thread in container_threads:
thread.start()
-
- # We musn't let the main thread die with an exception while non-daemonic
- # threads exist or the process with hang and ignore Ctrl-C. So we catch
- # anything and tidy up the threads in a finally block.
- try:
- if not args:
- # --all case
- conn = create_connection()
- try:
- marker = options.marker
- while True:
- containers = [
- c['name'] for c in conn.get_account(
- marker=marker, prefix=options.prefix)[1]]
- if not containers:
- break
- marker = containers[-1]
- shuffle(containers)
- for container in containers:
- container_queue.put(container)
- except ClientException as err:
- if err.http_status != 404:
- raise
- error_queue.put('Account not found')
- elif len(args) == 1:
- if '/' in args[0]:
- print >> stderr, ('WARNING: / in container name; you might '
- 'have meant %r instead of %r.' % (
- args[0].replace('/', ' ', 1), args[0]))
- _download_container(args[0], create_connection(),
- options.prefix)
- else:
- if len(args) == 2:
- obj = args[1]
- object_queue.put((args[0], obj, options.out_file))
- else:
- for obj in args[1:]:
- object_queue.put((args[0], obj))
- finally:
- shutdown_worker_threads(container_queue, container_threads)
- put_errors_from_threads(container_threads, error_queue)
-
- shutdown_worker_threads(object_queue, object_threads)
- put_errors_from_threads(object_threads, error_queue)
-
-
-def prt_bytes(bytes, human_flag):
- """
- convert a number > 1024 to printable format, either in 4 char -h format as
- with ls -lh or return as 12 char right justified string
- """
-
- if human_flag:
- suffix = ''
- mods = 'KMGTPEZY'
- temp = float(bytes)
- if temp > 0:
- while (temp > 1023):
- temp /= 1024.0
- suffix = mods[0]
- mods = mods[1:]
- if suffix != '':
- if temp >= 10:
- bytes = '%3d%s' % (temp, suffix)
- else:
- bytes = '%.1f%s' % (temp, suffix)
- if suffix == '': # must be < 1024
- bytes = '%4s' % bytes
+ if not args:
+ conn = create_connection()
+ try:
+ marker = ''
+ while True:
+ containers = [c['name']
+ for c in conn.get_account(marker=marker)[1]]
+ if not containers:
+ break
+ for container in containers:
+ container_queue.put(container)
+ marker = containers[-1]
+ except ClientException, err:
+ if err.http_status != 404:
+ raise
+ error_queue.put('Account not found')
+ elif len(args) == 1:
+ if '/' in args[0]:
+ print >> stderr, 'WARNING: / in container name; you might have ' \
+ 'meant %r instead of %r.' % \
+ (args[0].replace('/', ' ', 1), args[0])
+ _download_container(args[0], create_connection())
else:
- bytes = '%12s' % bytes
-
- return(bytes)
+ if len(args) == 2:
+ obj = args[1]
+ object_queue.put((args[0], obj, options.out_file))
+ else:
+ for obj in args[1:]:
+ object_queue.put((args[0], obj))
+ while not container_queue.empty():
+ sleep(0.01)
+ for thread in container_threads:
+ thread.abort = True
+ while thread.isAlive():
+ thread.join(0.01)
+ put_errors_from_threads(container_threads, error_queue)
+ while not object_queue.empty():
+ sleep(0.01)
+ for thread in object_threads:
+ thread.abort = True
+ while thread.isAlive():
+ thread.join(0.01)
+ put_errors_from_threads(object_threads, error_queue)
st_list_help = '''
list [options] [container]
Lists the containers for the account or the objects for a container. -p or
--prefix is an option that will only list items beginning with that prefix.
- -l produces output formatted like 'ls -l' and --lh like 'ls -lh'.
- -t used with -l or --lh, only report totals
-d or --delimiter is option (for container listings only) that will roll up
- items with the given delimiter (see http://docs.openstack.org/
- api/openstack-object-storage/1.0/content/list-objects.html)
+ items with the given delimiter (see Cloud Files general documentation for
+ what this means).
'''.strip('\n')
-def st_list(parser, args, print_queue, error_queue):
- parser.add_option(
- '-l', '--long', dest='long', help='Long listing '
- 'similar to ls -l command', action='store_true', default=False)
- parser.add_option(
- '--lh', dest='human', help='report sizes as human '
- "similar to ls -lh switch, but -h taken", action='store_true',
- default=False)
- parser.add_option(
- '-t', dest='totals', help='used with -l or --ls, only report totals',
- action='store_true', default=False)
- parser.add_option(
- '-p', '--prefix', dest='prefix',
- help='Will only list items beginning with the prefix')
- parser.add_option(
- '-d', '--delimiter', dest='delimiter',
- help='Will roll up items with the given delimiter'
- ' (see OpenStack Swift API documentation for what this means)')
+def st_list(options, args, print_queue, error_queue):
+ parser.add_option('-p', '--prefix', dest='prefix', help='Will only list '
+ 'items beginning with the prefix')
+ parser.add_option('-d', '--delimiter', dest='delimiter', help='Will roll '
+ 'up items with the given delimiter (see Cloud Files general '
+ 'documentation for what this means)')
(options, args) = parse_args(parser, args)
args = args[1:]
if options.delimiter and not args:
exit('-d option only allowed for container listings')
- if len(args) > 1 or len(args) == 1 and args[0].find('/') >= 0:
+ if len(args) > 1:
error_queue.put('Usage: %s [options] %s' %
(basename(argv[0]), st_list_help))
return
-
- conn = get_conn(options)
+ conn = Connection(options.auth, options.user, options.key,
+ snet=options.snet)
try:
marker = ''
- total_count = total_bytes = 0
while True:
if not args:
items = \
conn.get_account(marker=marker, prefix=options.prefix)[1]
else:
- items = conn.get_container(
- args[0], marker=marker,
+ items = conn.get_container(args[0], marker=marker,
prefix=options.prefix, delimiter=options.delimiter)[1]
if not items:
break
for item in items:
- item_name = item.get('name')
-
- if not options.long and not options.human:
- print_queue.put(item.get('name', item.get('subdir')))
- else:
- item_bytes = item.get('bytes')
- total_bytes += item_bytes
- if len(args) == 0: # listing containers
- bytes = prt_bytes(item_bytes, options.human)
- count = item.get('count')
- total_count += count
- try:
- meta = conn.head_container(item_name)
- utc = gmtime(float(meta.get('x-timestamp')))
- datestamp = strftime('%Y-%m-%d %H:%M:%S', utc)
- except ClientException:
- datestamp = '????-??-?? ??:??:??'
- if not options.totals:
- print_queue.put("%5s %s %s %s" %
- (count, bytes, datestamp,
- item_name))
- else: # list container contents
- subdir = item.get('subdir')
- if subdir is None:
- bytes = prt_bytes(item_bytes, options.human)
- date, xtime = item.get('last_modified').split('T')
- xtime = xtime.split('.')[0]
- else:
- bytes = prt_bytes(0, options.human)
- date = xtime = ''
- item_name = subdir
- if not options.totals:
- print_queue.put("%s %10s %8s %s" %
- (bytes, date, xtime, item_name))
-
- marker = items[-1].get('name', items[-1].get('subdir'))
-
- # report totals
- if options.long or options.human:
- if len(args) == 0:
- print_queue.put("%5s %s" % (prt_bytes(total_count, True),
- prt_bytes(total_bytes,
- options.human)))
- else:
- print_queue.put("%s" % (prt_bytes(total_bytes, options.human)))
-
- except ClientException as err:
+ print_queue.put(item.get('name', item.get('subdir')))
+ marker = items[-1].get('name', items[-1].get('subdir'))
+ except ClientException, err:
if err.http_status != 404:
raise
if not args:
@@ -655,20 +1347,17 @@ def st_list(parser, args, print_queue, error_queue):
else:
error_queue.put('Container %s not found' % repr(args[0]))
+
st_stat_help = '''
stat [container] [object]
Displays information for the account, container, or object depending on the
- args given (if any). --lh will print number of objects and total sizes
- like 'list --lh' noting number of objs a multiple of 1024'''.strip('\n')
+ args given (if any).'''.strip('\n')
-def st_stat(parser, args, print_queue, error_queue):
- parser.add_option(
- '--lh', dest='human', help="report totals like 'list --lh'",
- action='store_true', default=False)
+def st_stat(options, args, print_queue, error_queue):
(options, args) = parse_args(parser, args)
args = args[1:]
- conn = get_conn(options)
+ conn = Connection(options.auth, options.user, options.key)
if not args:
try:
headers = conn.head_account()
@@ -678,20 +1367,17 @@ StorageURL: %s
Auth Token: %s
'''.strip('\n') % (conn.url, conn.token))
container_count = int(headers.get('x-account-container-count', 0))
- object_count = prt_bytes(headers.get('x-account-object-count', 0),
- options.human).lstrip()
- bytes_used = prt_bytes(headers.get('x-account-bytes-used', 0),
- options.human).lstrip()
+ object_count = int(headers.get('x-account-object-count', 0))
+ bytes_used = int(headers.get('x-account-bytes-used', 0))
print_queue.put('''
Account: %s
Containers: %d
- Objects: %s
- Bytes: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], container_count,
+ Objects: %d
+ Bytes: %d'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], container_count,
object_count, bytes_used))
for key, value in headers.items():
if key.startswith('x-account-meta-'):
- print_queue.put(
- '%10s: %s' % ('Meta %s' %
+ print_queue.put('%10s: %s' % ('Meta %s' %
key[len('x-account-meta-'):].title(), value))
for key, value in headers.items():
if not key.startswith('x-account-meta-') and key not in (
@@ -699,7 +1385,7 @@ Containers: %d
'x-account-object-count', 'x-account-bytes-used'):
print_queue.put(
'%10s: %s' % (key.title(), value))
- except ClientException as err:
+ except ClientException, err:
if err.http_status != 404:
raise
error_queue.put('Account not found')
@@ -710,16 +1396,13 @@ Containers: %d
(args[0].replace('/', ' ', 1), args[0])
try:
headers = conn.head_container(args[0])
- object_count = prt_bytes(
- headers.get('x-container-object-count', 0),
- options.human).lstrip()
- bytes_used = prt_bytes(headers.get('x-container-bytes-used', 0),
- options.human).lstrip()
+ object_count = int(headers.get('x-container-object-count', 0))
+ bytes_used = int(headers.get('x-container-bytes-used', 0))
print_queue.put('''
Account: %s
Container: %s
- Objects: %s
- Bytes: %s
+ Objects: %d
+ Bytes: %d
Read ACL: %s
Write ACL: %s
Sync To: %s
@@ -731,8 +1414,7 @@ Write ACL: %s
headers.get('x-container-sync-key', '')))
for key, value in headers.items():
if key.startswith('x-container-meta-'):
- print_queue.put(
- '%9s: %s' % ('Meta %s' %
+ print_queue.put('%9s: %s' % ('Meta %s' %
key[len('x-container-meta-'):].title(), value))
for key, value in headers.items():
if not key.startswith('x-container-meta-') and key not in (
@@ -742,7 +1424,7 @@ Write ACL: %s
'x-container-sync-key'):
print_queue.put(
'%9s: %s' % (key.title(), value))
- except ClientException as err:
+ except ClientException, err:
if err.http_status != 404:
raise
error_queue.put('Container %s not found' % repr(args[0]))
@@ -757,8 +1439,7 @@ Write ACL: %s
args[1], headers.get('content-type')))
if 'content-length' in headers:
print_queue.put('Content Length: %s' %
- prt_bytes(headers['content-length'],
- options.human).lstrip())
+ headers['content-length'])
if 'last-modified' in headers:
print_queue.put(' Last Modified: %s' %
headers['last-modified'])
@@ -769,8 +1450,7 @@ Write ACL: %s
headers['x-object-manifest'])
for key, value in headers.items():
if key.startswith('x-object-meta-'):
- print_queue.put(
- '%14s: %s' % ('Meta %s' %
+ print_queue.put('%14s: %s' % ('Meta %s' %
key[len('x-object-meta-'):].title(), value))
for key, value in headers.items():
if not key.startswith('x-object-meta-') and key not in (
@@ -778,7 +1458,7 @@ Write ACL: %s
'etag', 'date', 'x-object-manifest'):
print_queue.put(
'%14s: %s' % (key.title(), value))
- except ClientException as err:
+ except ClientException, err:
if err.http_status != 404:
raise
error_queue.put('Object %s not found' %
@@ -799,41 +1479,35 @@ post [options] [container] [object]
post -m Color:Blue -m Size:Large'''.strip('\n')
-def st_post(parser, args, print_queue, error_queue):
- parser.add_option(
- '-r', '--read-acl', dest='read_acl', help='Sets the '
+def st_post(options, args, print_queue, error_queue):
+ parser.add_option('-r', '--read-acl', dest='read_acl', help='Sets the '
'Read ACL for containers. Quick summary of ACL syntax: .r:*, '
'.r:-.example.com, .r:www.example.com, account1, account2:user2')
- parser.add_option(
- '-w', '--write-acl', dest='write_acl', help='Sets the '
+ parser.add_option('-w', '--write-acl', dest='write_acl', help='Sets the '
'Write ACL for containers. Quick summary of ACL syntax: account1, '
'account2:user2')
- parser.add_option(
- '-t', '--sync-to', dest='sync_to', help='Sets the '
+ parser.add_option('-t', '--sync-to', dest='sync_to', help='Sets the '
'Sync To for containers, for multi-cluster replication.')
- parser.add_option(
- '-k', '--sync-key', dest='sync_key', help='Sets the '
+ parser.add_option('-k', '--sync-key', dest='sync_key', help='Sets the '
'Sync Key for containers, for multi-cluster replication.')
- parser.add_option(
- '-m', '--meta', action='append', dest='meta', default=[],
+ parser.add_option('-m', '--meta', action='append', dest='meta', default=[],
help='Sets a meta data item with the syntax name:value. This option '
'may be repeated. Example: -m Color:Blue -m Size:Large')
- parser.add_option(
- '-H', '--header', action='append', dest='header',
- default=[], help='Set request headers with the syntax header:value. '
- ' This option may be repeated. Example -H content-type:text/plain '
- '-H "Content-Length: 4000"')
(options, args) = parse_args(parser, args)
args = args[1:]
if (options.read_acl or options.write_acl or options.sync_to or
- options.sync_key) and not args:
+ options.sync_key) and not args:
exit('-r, -w, -t, and -k options only allowed for containers')
- conn = get_conn(options)
+ conn = Connection(options.auth, options.user, options.key)
if not args:
- headers = split_headers(options.meta, 'X-Account-Meta-', error_queue)
+ headers = {}
+ for item in options.meta:
+ split_item = item.split(':')
+ headers['X-Account-Meta-' + split_item[0]] = \
+ len(split_item) > 1 and split_item[1]
try:
conn.post_account(headers=headers)
- except ClientException as err:
+ except ClientException, err:
if err.http_status != 404:
raise
error_queue.put('Account not found')
@@ -842,7 +1516,11 @@ def st_post(parser, args, print_queue, error_queue):
print >> stderr, 'WARNING: / in container name; you might have ' \
'meant %r instead of %r.' % \
(args[0].replace('/', ' ', 1), args[0])
- headers = split_headers(options.meta, 'X-Container-Meta-', error_queue)
+ headers = {}
+ for item in options.meta:
+ split_item = item.split(':')
+ headers['X-Container-Meta-' + split_item[0]] = \
+ len(split_item) > 1 and split_item[1]
if options.read_acl is not None:
headers['X-Container-Read'] = options.read_acl
if options.write_acl is not None:
@@ -853,17 +1531,19 @@ def st_post(parser, args, print_queue, error_queue):
headers['X-Container-Sync-Key'] = options.sync_key
try:
conn.post_container(args[0], headers=headers)
- except ClientException as err:
+ except ClientException, err:
if err.http_status != 404:
raise
conn.put_container(args[0], headers=headers)
elif len(args) == 2:
- headers = split_headers(options.meta, 'X-Object-Meta-', error_queue)
- # add header options to the headers object for the request.
- headers.update(split_headers(options.header, '', error_queue))
+ headers = {}
+ for item in options.meta:
+ split_item = item.split(':')
+ headers['X-Object-Meta-' + split_item[0]] = \
+ len(split_item) > 1 and split_item[1]
try:
conn.post_object(args[0], args[1], headers=headers)
- except ClientException as err:
+ except ClientException, err:
if err.http_status != 404:
raise
error_queue.put('Object %s not found' %
@@ -878,48 +1558,24 @@ upload [options] container file_or_directory [file_or_directory] [...]
Uploads to the given container the files and directories specified by the
remaining args. -c or --changed is an option that will only upload files
that have changed since the last upload. -S <size> or --segment-size <size>
- will upload the files in segments no larger than size. -C <container> or
- --segment-container <container> will specify the location of the segments
- to <container>. --leave-segments are options as well (see --help for more).
+ and --leave-segments are options as well (see --help for more).
'''.strip('\n')
-def st_upload(parser, args, print_queue, error_queue):
- parser.add_option(
- '-c', '--changed', action='store_true', dest='changed',
+def st_upload(options, args, print_queue, error_queue):
+ parser.add_option('-c', '--changed', action='store_true', dest='changed',
default=False, help='Will only upload files that have changed since '
'the last upload')
- parser.add_option(
- '-S', '--segment-size', dest='segment_size', help='Will '
+ parser.add_option('-S', '--segment-size', dest='segment_size', help='Will '
'upload files in segments no larger than <size> and then create a '
'"manifest" file that will download all the segments as if it were '
- 'the original file.')
- parser.add_option(
- '-C', '--segment-container', dest='segment_container',
- help='Will upload the segments into the specified container.'
- 'If not specified, the segments will be uploaded to '
+ 'the original file. The segments will be uploaded to a '
'<container>_segments container so as to not pollute the main '
'<container> listings.')
- parser.add_option(
- '', '--leave-segments', action='store_true',
+ parser.add_option('', '--leave-segments', action='store_true',
dest='leave_segments', default=False, help='Indicates that you want '
'the older segments of manifest objects left alone (in the case of '
'overwrites)')
- parser.add_option(
- '', '--object-threads', type=int, default=10,
- help='Number of threads to use for uploading full objects')
- parser.add_option(
- '', '--segment-threads', type=int, default=10,
- help='Number of threads to use for uploading object segments')
- parser.add_option(
- '-H', '--header', action='append', dest='header',
- default=[], help='Set request headers with the syntax header:value. '
- ' This option may be repeated. Example -H content-type:text/plain '
- '-H "Content-Length: 4000"')
- parser.add_option('', '--use-slo', action='store_true', default=False,
- help='When used in conjuction with --segment-size will '
- 'create a Static Large Object instead of the default '
- 'Dynamic Large Object.')
(options, args) = parse_args(parser, args)
args = args[1:]
if len(args) < 2:
@@ -934,21 +1590,14 @@ def st_upload(parser, args, print_queue, error_queue):
else:
fp = open(job['path'], 'rb')
fp.seek(job['segment_start'])
- seg_container = args[0] + '_segments'
- if options.segment_container:
- seg_container = options.segment_container
- etag = conn.put_object(job.get('container', seg_container),
- job['obj'], fp,
- content_length=job['segment_size'])
- job['segment_location'] = '/%s/%s' % (seg_container, job['obj'])
- job['segment_etag'] = etag
+ conn.put_object(job.get('container', args[0] + '_segments'),
+ job['obj'], fp, content_length=job['segment_size'])
if options.verbose and 'log_line' in job:
if conn.attempts > 1:
print_queue.put('%s [after %d attempts]' %
(job['log_line'], conn.attempts))
else:
print_queue.put(job['log_line'])
- return job
def _object_job(job, conn):
path = job['path']
@@ -960,7 +1609,7 @@ def st_upload(parser, args, print_queue, error_queue):
obj = obj[2:]
if obj.startswith('/'):
obj = obj[1:]
- put_headers = {'x-object-meta-mtime': "%f" % getmtime(path)}
+ put_headers = {'x-object-meta-mtime': str(getmtime(path))}
if dir_marker:
if options.changed:
try:
@@ -974,7 +1623,7 @@ def st_upload(parser, args, print_queue, error_queue):
et == 'd41d8cd98f00b204e9800998ecf8427e' and \
mt == put_headers['x-object-meta-mtime']:
return
- except ClientException as err:
+ except ClientException, err:
if err.http_status != 404:
raise
conn.put_object(container, obj, '', content_length=0,
@@ -985,8 +1634,6 @@ def st_upload(parser, args, print_queue, error_queue):
# manifest object and need to delete the old segments
# ourselves.
old_manifest = None
- old_slo_manifest_paths = []
- new_slo_manifest_paths = set()
if options.changed or not options.leave_segments:
try:
headers = conn.head_object(container, obj)
@@ -997,134 +1644,73 @@ def st_upload(parser, args, print_queue, error_queue):
return
if not options.leave_segments:
old_manifest = headers.get('x-object-manifest')
- if utils.config_true_value(
- headers.get('x-static-large-object')):
- headers, manifest_data = conn.get_object(
- container, obj,
- query_string='multipart-manifest=get')
- for old_seg in json.loads(manifest_data):
- seg_path = old_seg['name'].lstrip('/')
- if isinstance(seg_path, unicode):
- seg_path = seg_path.encode('utf-8')
- old_slo_manifest_paths.append(seg_path)
- except ClientException as err:
+ except ClientException, err:
if err.http_status != 404:
raise
- # Merge the command line header options to the put_headers
- put_headers.update(split_headers(options.header, '',
- error_queue))
- # Don't do segment job if object is not big enough
if options.segment_size and \
- getsize(path) > int(options.segment_size):
- seg_container = container + '_segments'
- if options.segment_container:
- seg_container = options.segment_container
+ getsize(path) < options.segment_size:
full_size = getsize(path)
segment_queue = Queue(10000)
- segment_threads = [
- QueueFunctionThread(
- segment_queue, _segment_job,
- create_connection(), store_results=True)
- for _junk in xrange(options.segment_threads)]
+ segment_threads = [QueueFunctionThread(segment_queue,
+ _segment_job, create_connection()) for _junk in
+ xrange(10)]
for thread in segment_threads:
thread.start()
- try:
- segment = 0
- segment_start = 0
- while segment_start < full_size:
- segment_size = int(options.segment_size)
- if segment_start + segment_size > full_size:
- segment_size = full_size - segment_start
- if options.use_slo:
- segment_name = '%s/slo/%s/%s/%s/%08d' % (
- obj, put_headers['x-object-meta-mtime'],
- full_size, options.segment_size, segment)
- else:
- segment_name = '%s/%s/%s/%s/%08d' % (
- obj, put_headers['x-object-meta-mtime'],
- full_size, options.segment_size, segment)
- segment_queue.put(
- {'path': path, 'obj': segment_name,
- 'segment_start': segment_start,
- 'segment_size': segment_size,
- 'segment_index': segment,
- 'log_line': '%s segment %s' % (obj, segment)})
- segment += 1
- segment_start += segment_size
- finally:
- shutdown_worker_threads(segment_queue, segment_threads)
- if put_errors_from_threads(segment_threads,
- error_queue):
- raise ClientException(
- 'Aborting manifest creation '
- 'because not all segments could be uploaded. '
- '%s/%s' % (container, obj))
- if options.use_slo:
- slo_segments = []
- for thread in segment_threads:
- slo_segments += thread.results
- slo_segments.sort(key=lambda d: d['segment_index'])
- for seg in slo_segments:
- seg_loc = seg['segment_location'].lstrip('/')
- if isinstance(seg_loc, unicode):
- seg_loc = seg_loc.encode('utf-8')
- new_slo_manifest_paths.add(seg_loc)
-
- manifest_data = json.dumps([
- {'path': d['segment_location'],
- 'etag': d['segment_etag'],
- 'size_b
<TRUNCATED>