You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by mc...@apache.org on 2013/10/29 04:53:38 UTC

[48/50] [abbrv] git commit: updated refs/heads/object_store_migration to 5ec2a44

CLOUDSTACK-4817: fix s3 multipart upload

Conflicts:

	plugins/hypervisors/xen/src/com/cloud/hypervisor/xen/resource/XenServerStorageProcessor.java


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/89d6e7ed
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/89d6e7ed
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/89d6e7ed

Branch: refs/heads/object_store_migration
Commit: 89d6e7ed66e92ded84e85d57a7b2681fd088c20b
Parents: 2cac1aa
Author: Edison Su <su...@gmail.com>
Authored: Mon Oct 28 17:31:49 2013 -0700
Committer: Edison Su <su...@gmail.com>
Committed: Mon Oct 28 17:31:49 2013 -0700

----------------------------------------------------------------------
 .../xen/resource/XenServerStorageProcessor.java | 11 ++--
 scripts/vm/hypervisor/xenserver/s3xen           | 68 ++++++++++++++++++--
 2 files changed, 71 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/89d6e7ed/plugins/hypervisors/xen/src/com/cloud/hypervisor/xen/resource/XenServerStorageProcessor.java
----------------------------------------------------------------------
diff --git a/plugins/hypervisors/xen/src/com/cloud/hypervisor/xen/resource/XenServerStorageProcessor.java b/plugins/hypervisors/xen/src/com/cloud/hypervisor/xen/resource/XenServerStorageProcessor.java
index 1496108..5a19aee 100644
--- a/plugins/hypervisors/xen/src/com/cloud/hypervisor/xen/resource/XenServerStorageProcessor.java
+++ b/plugins/hypervisors/xen/src/com/cloud/hypervisor/xen/resource/XenServerStorageProcessor.java
@@ -18,6 +18,11 @@
  */
 package com.cloud.hypervisor.xen.resource;
 
+
+import static com.cloud.utils.ReflectUtil.flattenProperties;
+import static com.google.common.collect.Lists.newArrayList;
+
+
 import java.io.File;
 import java.net.URI;
 import java.util.Arrays;
@@ -82,9 +87,6 @@ import com.xensource.xenapi.VDI;
 import com.xensource.xenapi.VM;
 import com.xensource.xenapi.VMGuestMetrics;
 
-import static com.cloud.utils.ReflectUtil.flattenProperties;
-import static com.google.common.collect.Lists.newArrayList;
-
 public class XenServerStorageProcessor implements StorageProcessor {
     private static final Logger s_logger = Logger.getLogger(XenServerStorageProcessor.class);
     protected CitrixResourceBase hypervisorResource;
@@ -1091,11 +1093,12 @@ public class XenServerStorageProcessor implements StorageProcessor {
                     S3Utils.ClientOptions.class));
             // https workaround for Introspector bug that does not
             // recognize Boolean accessor methods ...
+
             parameters.addAll(Arrays.asList("operation", "put", "filename",
                     dir + "/" + filename, "iSCSIFlag",
                     iSCSIFlag.toString(), "bucket", s3.getBucketName(),
                     "key", key, "https", s3.isHttps() != null ? s3.isHttps().toString()
-                            : "null"));
+                            : "null", "maxSingleUploadSizeInBytes", String.valueOf(s3.getMaxSingleUploadSizeInBytes())));
             final String result = hypervisorResource.callHostPluginAsync(connection, "s3xen",
                     "s3", wait,
                     parameters.toArray(new String[parameters.size()]));

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/89d6e7ed/scripts/vm/hypervisor/xenserver/s3xen
----------------------------------------------------------------------
diff --git a/scripts/vm/hypervisor/xenserver/s3xen b/scripts/vm/hypervisor/xenserver/s3xen
index 372a6da..bf81bbd 100644
--- a/scripts/vm/hypervisor/xenserver/s3xen
+++ b/scripts/vm/hypervisor/xenserver/s3xen
@@ -34,6 +34,7 @@ import base64
 import hmac
 import traceback
 import urllib2
+from xml.dom.minidom import parseString
 
 import XenAPIPlugin
 sys.path.extend(["/opt/xensource/sm/"])
@@ -260,15 +261,73 @@ class S3Client(object):
                      sha).digest())[:-1]
 
         return signature, request_date
+        
+    def getText(self, nodelist):
+        rc = []
+        for node in nodelist:
+            if node.nodeType == node.TEXT_NODE:
+                rc.append(node.data)
+        return ''.join(rc)
+
+    def multiUpload(self, bucket, key, src_fileName, chunkSize=5 * 1024 * 1024):
+        uploadId={}
+        def readInitalMultipart(response):
+           data = response.read()
+           xmlResult = parseString(data) 
+           result = xmlResult.getElementsByTagName("InitiateMultipartUploadResult")[0]
+           upload = result.getElementsByTagName("UploadId")[0]
+           uploadId["0"] = upload.childNodes[0].data
+       
+        self.do_operation('POST', bucket, key + "?uploads", fn_read=readInitalMultipart) 
+
+        fileSize = os.path.getsize(src_fileName) 
+        parts = fileSize / chunkSize + ((fileSize % chunkSize) and 1)
+        part = 1
+        srcFile = open(src_fileName, 'rb')
+        etags = []
+        while part <= parts:
+            offset = part - 1
+            size = min(fileSize - offset * chunkSize, chunkSize)
+            headers = {
+                self.HEADER_CONTENT_LENGTH: size
+            }
+            def send_body(connection): 
+               srcFile.seek(offset * chunkSize)
+               block = srcFile.read(size)
+               connection.send(block)
+            def read_multiPart(response):
+               etag = response.getheader('ETag') 
+               etags.append((part, etag))
+            self.do_operation("PUT", bucket, "%s?partNumber=%s&uploadId=%s"%(key, part, uploadId["0"]), headers, send_body, read_multiPart)
+            part = part + 1
+        srcFile.close()
+
+        data = [] 
+        partXml = "<Part><PartNumber>%i</PartNumber><ETag>%s</ETag></Part>"
+        for etag in etags:
+            data.append(partXml%etag)
+        msg = "<CompleteMultipartUpload>%s</CompleteMultipartUpload>"%("".join(data))
+        size = len(msg)
+        headers = {
+            self.HEADER_CONTENT_LENGTH: size
+        }
+        def send_complete_multipart(connection):
+            connection.send(msg) 
+        self.do_operation("POST", bucket, "%s?uploadId=%s"%(key, uploadId["0"]), headers, send_complete_multipart)
 
-    def put(self, bucket, key, src_filename):
+    def put(self, bucket, key, src_filename, maxSingleUpload):
 
         if not os.path.isfile(src_filename):
             raise Exception(
                 "Attempt to put " + src_filename + " that does not exist.")
 
+        size = os.path.getsize(src_filename)
+        if size > maxSingleUpload or maxSingleUpload == 0:
+            return self.multiUpload(bucket, key, src_filename)
+           
         headers = {
             self.HEADER_CONTENT_MD5: compute_md5(src_filename),
+        
             self.HEADER_CONTENT_TYPE: 'application/octet-stream',
             self.HEADER_CONTENT_LENGTH: str(os.stat(src_filename).st_size),
         }
@@ -323,6 +382,7 @@ def parseArguments(args):
     bucket = args['bucket']
     key = args['key']
     filename = args['filename']
+    maxSingleUploadBytes = int(args["maxSingleUploadSizeInBytes"])
 
     if is_blank(operation):
         raise ValueError('An operation must be specified.')
@@ -336,18 +396,18 @@ def parseArguments(args):
     if is_blank(filename):
         raise ValueError('A filename must be specified.')
 
-    return client, operation, bucket, key, filename
+    return client, operation, bucket, key, filename, maxSingleUploadBytes
 
 
 @echo
 def s3(session, args):
 
-    client, operation, bucket, key, filename = parseArguments(args)
+    client, operation, bucket, key, filename, maxSingleUploadBytes = parseArguments(args)
 
     try:
 
         if operation == 'put':
-            client.put(bucket, key, filename)
+            client.put(bucket, key, filename, maxSingleUploadBytes)
         elif operation == 'get':
             client.get(bucket, key, filename)
         elif operation == 'delete':