You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@deltacloud.apache.org by ma...@redhat.com on 2012/10/25 16:16:01 UTC

[PATCH 2/2] Adds EC2 and Openstack driver methods for segmented blob upload API

From: marios <ma...@redhat.com>

http://mariosandreou.com/deltacloud/cloud_API/2012/10/24/segmenting-huge-blobs-part-2.html

Signed-off-by: marios <ma...@redhat.com>
---
 server/lib/deltacloud/drivers/ec2/ec2_driver.rb    | 54 ++++++++++++++++------
 .../drivers/openstack/openstack_driver.rb          | 23 ++++++++-
 2 files changed, 61 insertions(+), 16 deletions(-)

diff --git a/server/lib/deltacloud/drivers/ec2/ec2_driver.rb b/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
index c7bed05..9c10ff8 100644
--- a/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
+++ b/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
@@ -507,33 +507,51 @@ module Deltacloud
         #--
         # Create Blob - NON Streaming way (i.e. was called with POST html multipart form data)
         #--
+        #also called for segmented blobs - as final call with blob manifest
         def create_blob(credentials, bucket_id, blob_id, data = nil, opts = {})
           s3_client = new_client(credentials, :s3)
           #data is a construct with the temporary file created by server @.tempfile
           #also file[:type] will give us the content-type
-          res = nil
-          # File stream needs to be reopened in binary mode for whatever reason
-          file = File::open(data[:tempfile].path, 'rb')
-          #insert ec2-specific header for user metadata ... x-amz-meta-KEY = VALUE
-          BlobHelper::rename_metadata_headers(opts, 'x-amz-meta-')
-          opts["Content-Type"] = data[:type]
-          safely do
-            res = s3_client.interface.put(bucket_id,
-                                        blob_id,
-                                        file,
-                                        opts)
+          if(opts[:segment_manifest])
+            safely do
+              s3_client.interface.complete_multipart(bucket_id, blob_id, opts[:segmented_blob_id], opts[:segment_manifest])
+            end
+          else
+            # File stream needs to be reopened in binary mode
+            file = File::open(data[:tempfile].path, 'rb')
+            #insert ec2-specific header for user metadata ... x-amz-meta-KEY = VALUE
+            BlobHelper::rename_metadata_headers(opts, 'x-amz-meta-')
+            opts["Content-Type"] = data[:type]
+            safely do
+              s3_client.interface.put(bucket_id,
+                                          blob_id,
+                                          file,
+                                          opts)
+            end
           end
           #create a new Blob object and return that
           Blob.new( { :id => blob_id,
                       :bucket => bucket_id,
-                      :content_length => data[:tempfile].length,
-                      :content_type => data[:type],
+                      :content_length => ((data && data[:tempfile]) ? data[:tempfile].length : nil),
+                      :content_type => ((data && data[:type]) ? data[:type] : nil),
                       :last_modified => '',
                       :user_metadata => opts.select{|k,v| k.match(/^x-amz-meta-/i)}
                     }
                   )
         end
 
+        def init_segmented_blob(credentials, opts={})
+          s3_client = new_client(credentials, :s3)
+          safely do
+            s3_client.interface.initiate_multipart(opts[:bucket],opts[:id])
+          end
+
+        end
+
+        def blob_segment_id(request, response)
+          response["etag"].gsub("\"", "")
+        end
+
         #--
         # Delete Blob
         #--
@@ -591,8 +609,16 @@ module Deltacloud
           timestamp = Time.now.httpdate
           string_to_sign =
             "PUT\n\n#{params[:content_type]}\n#{timestamp}\n#{signature_meta_string}/#{params[:bucket]}/#{params[:blob]}"
+          if BlobHelper.segmented_blob_op_type(params[:context]) == "segment"
+            partNumber = BlobHelper.segment_order(params[:context])
+            uploadId = BlobHelper.segmented_blob_id(params[:context])
+            segment_string = "?partNumber=#{partNumber}&uploadId=#{uploadId}"
+            string_to_sign << segment_string
+            request = Net::HTTP::Put.new("/#{params[:blob]}#{segment_string}")
+          else
+            request = Net::HTTP::Put.new("/#{params[:blob]}")
+          end
           auth_string = Aws::Utils::sign(params[:password], string_to_sign)
-          request = Net::HTTP::Put.new("/#{params[:blob]}")
           request['Host'] = "#{params[:bucket]}.#{uri.host}"
           request['Date'] = timestamp
           request['Content-Type'] = params[:content_type]
diff --git a/server/lib/deltacloud/drivers/openstack/openstack_driver.rb b/server/lib/deltacloud/drivers/openstack/openstack_driver.rb
index 04bd409..69d523b 100644
--- a/server/lib/deltacloud/drivers/openstack/openstack_driver.rb
+++ b/server/lib/deltacloud/drivers/openstack/openstack_driver.rb
@@ -263,8 +263,12 @@ module Deltacloud
         def create_blob(credentials, bucket, blob, data, opts={})
           os = new_client(credentials, :buckets)
           safely do
-            BlobHelper.rename_metadata_headers(opts, "X-Object-Meta-")
-            os_blob = os.container(bucket).create_object(blob, {:content_type=> data[:type], :metadata=>opts}, data[:tempfile])
+            if(opts[:segment_manifest]) # finalize a segmented blob upload
+              os_blob = os.container(bucket).create_object(blob, {:manifest=>"#{bucket}/#{opts[:segmented_blob_id]}"})
+            else
+              BlobHelper.rename_metadata_headers(opts, "X-Object-Meta-")
+              os_blob = os.container(bucket).create_object(blob, {:content_type=> data[:type], :metadata=>opts}, data[:tempfile])
+            end
             convert_blob(os_blob, bucket)
           end
         end
@@ -292,8 +296,23 @@ module Deltacloud
           end
         end
 
+        def init_segmented_blob(credentials, opts={})
+          opts[:id]
+        end
+
+        def blob_segment_id(request, response)
+          #could be in http header OR query string:
+          segment_order = BlobHelper.segment_order(request)
+          blob_name = request.env["PATH_INFO"].gsub(/(&\w*=\w*)*$/, "").split("/").pop
+          "#{blob_name}#{segment_order}"
+        end
+
         #params: {:user,:password,:bucket,:blob,:content_type,:content_length,:metadata}
+        #params[:context] holds the request object - for getting to blob segment params
         def blob_stream_connection(params)
+          if BlobHelper.segmented_blob_op_type(params[:context]) == "segment"
+            params[:blob] = "#{params[:blob]}#{BlobHelper.segment_order(params[:context])}"
+          end
           tokens = params[:user].split("+")
           user_name, tenant_name = tokens.first, tokens.last
           #need a client for the auth_token and endpoints
-- 
1.7.11.7