You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@deltacloud.apache.org by ma...@redhat.com on 2012/10/25 16:15:59 UTC
Segmented Blob uploads API - RFC
This is revision 1 of the 'segmented blob uploads' API.
A comparison of how the various cloud providers handle this, and of the 'common' API I used, is described at http://mariosandreou.com/deltacloud/cloud_API/2012/10/08/segmenting-huge-blobs.html
cURL examples are at http://mariosandreou.com/deltacloud/cloud_API/2012/10/24/segmenting-huge-blobs-part-2.html
The patches are also available at http://tracker.deltacloud.org/set/105
marios
[PATCH 1/2] Adds routes and helpers for segmented blob upload API
Posted by ma...@redhat.com.
From: marios <ma...@redhat.com>
http://mariosandreou.com/deltacloud/cloud_API/2012/10/08/segmenting-huge-blobs.html
Signed-off-by: marios <ma...@redhat.com>
---
server/lib/deltacloud/collections/buckets.rb | 35 ++++++++++++++++-
.../lib/deltacloud/helpers/blob_stream_helper.rb | 45 ++++++++++++++++++++--
2 files changed, 76 insertions(+), 4 deletions(-)
diff --git a/server/lib/deltacloud/collections/buckets.rb b/server/lib/deltacloud/collections/buckets.rb
index c503926..90f2306 100644
--- a/server/lib/deltacloud/collections/buckets.rb
+++ b/server/lib/deltacloud/collections/buckets.rb
@@ -49,8 +49,41 @@ module Deltacloud::Collections
end
end
+ put "/segmented_blob_operation/:bucket/:blob" do
+ case BlobHelper.segmented_blob_op_type(request)
+ when "init" then
+ segmented_blob_id = driver.init_segmented_blob(credentials, {:id => params[:blob],
+ :bucket => params[:bucket]})
+ headers["X-Deltacloud-SegmentedBlob"] = segmented_blob_id
+ status 204
+ when "segment" then
+ if env["BLOB_SUCCESS"] # set by blob_stream_helper after succesful stream PUT
+ #segment already uploaded by blob_streaming_patch - setting blob_segment-id from the response
+ headers["X-Deltacloud-BlobSegmentId"] = request.env["BLOB_SEGMENT_ID"] # set in blob_stream_helper: 203
+ status 204
+ else
+ report_error(400) # likely blob size < 112 KB (didn't hit streaming patch)
+ end
+ when "finalize" then
+ bucket_id = params[:bucket]
+ blob_id = params[:blob]
+ segmented_blob_manifest = BlobHelper.extract_segmented_blob_manifest(request)
+ segmented_blob_id = BlobHelper.segmented_blob_id(request)
+ @blob = driver.create_blob(credentials, params[:bucket], params[:blob], nil, {:segment_manifest=>segmented_blob_manifest, :segmented_blob_id=>segmented_blob_id})
+ respond_to do |format|
+ format.xml { haml :"blobs/show" }
+ format.html { haml :"blobs/show" }
+ format.json { xml_to_json 'blobs/show' }
+ end
+ else
+ report_error(500)
+ end
+ end
+
put "/buckets/:bucket/:blob" do
- if(env["BLOB_SUCCESS"]) #ie got a 200ok after putting blob
+ if BlobHelper.segmented_blob(request)
+ status, headers, body = call!(env.merge("PATH_INFO" => "/segmented_blob_operation/#{params[:bucket]}/#{params[:blob]}"))
+ elsif(env["BLOB_SUCCESS"]) #ie got a 200ok after putting blob
content_type = env["CONTENT_TYPE"]
content_type ||= ""
@blob = driver.blob(credentials, {:id => params[:blob],
diff --git a/server/lib/deltacloud/helpers/blob_stream_helper.rb b/server/lib/deltacloud/helpers/blob_stream_helper.rb
index dc1d230..88c6c4d 100644
--- a/server/lib/deltacloud/helpers/blob_stream_helper.rb
+++ b/server/lib/deltacloud/helpers/blob_stream_helper.rb
@@ -77,6 +77,43 @@ DELTACLOUD_BLOBMETA_HEADER = /HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i
metadata.gsub_keys(DELTACLOUD_BLOBMETA_HEADER, rename_to)
end
+ #in the following segment* methods, using context.env["QUERY_STRING"] rather than context.params so it works for both Thin and Sinatra request objects (streaming)
+ def self.segmented_blob(request_context)
+ return true if (request_context.env["HTTP_X_DELTACLOUD_BLOBTYPE"] == 'segmented' || request_context.env["QUERY_STRING"].match(/blob_type=segmented/))
+ false
+ end
+
+ def self.segment_order(request_context)
+ (request_context.env["HTTP_X_DELTACLOUD_SEGMENTORDER"] || request_context.env["QUERY_STRING"].match(/segment_order=(\w)*/){|m| m[0].split("=").pop})
+ end
+
+ def self.segmented_blob_id(request_context)
+ (request_context.env["HTTP_X_DELTACLOUD_SEGMENTEDBLOB"] || request_context.env["QUERY_STRING"].match(/segmented_blob=(\w)*/){|m| m[0].split("=").pop})
+ end
+
+ def self.segmented_blob_op_type(request_context)
+ is_segmented = segmented_blob(request_context)
+ blob_id = segmented_blob_id(request_context)
+ segment_order = segment_order(request_context)
+ #if blob_type=segmented AND segmented_blob_id AND segment_order then it is a "SEGMENT"
+ #if blob_type=segmented AND segmented_blob_id then it is a "FINALIZE"
+ #if blob_type=segmented then it is "INIT"
+ return "segment" if (is_segmented && blob_id && segment_order)
+ return "finalize" if (is_segmented && blob_id)
+ return "init" if is_segmented
+ nil # should explode something instead
+ end
+
+ #in "1=abc , 2=def , 3=ghi"
+ #out {"1"=>"abc", "2"=>"def", "3"=>"ghi"}
+ def self.extract_segmented_blob_manifest(request)
+ manifest_hash = request.body.read.split(",").inject({}) do |res,current|
+ k,v=current.strip.split("=")
+ res[k]=v
+ res
+ end
+ end
+
end
#Monkey patch for streaming blobs:
@@ -158,11 +195,10 @@ class BlobStreamIO
@content_length = request.env['CONTENT_LENGTH']
@http, provider_request = Deltacloud::API.driver.blob_stream_connection({:user=>user,
:password=>password, :bucket=>bucket, :blob=>blob, :metadata=> user_meta,
- :content_type=>content_type, :content_length=>@content_length })
+ :content_type=>content_type, :content_length=>@content_length, :context=>request })
@content_length = @content_length.to_i #for comparison of size in '<< (data)'
@sock = @http.request(provider_request, nil, true)
end
-
def << (data)
@sock.write(data)
@size += data.length
@@ -170,6 +206,9 @@ class BlobStreamIO
result = @http.end_request
if result.is_a?(Net::HTTPSuccess)
@client_request.env["BLOB_SUCCESS"] = "true"
+ if BlobHelper.segmented_blob_op_type(@client_request) == "segment"
+ @client_request.env["BLOB_SEGMENT_ID"] = Deltacloud::API.driver.blob_segment_id(@client_request, result)
+ end
else
@client_request.env["BLOB_FAIL"] = result.body
end
@@ -195,7 +234,7 @@ class BlobStreamIO
private
def parse_bucket_blob(request_string)
- array = request_string.split("/")
+ array = request_string.gsub(/(&\w*=\w*)*$/, "").split("/")
blob = array.pop
bucket = array.pop
return bucket, blob
--
1.7.11.7
Re: Segmented Blob uploads API - RFC
Posted by David Lutterkort <lu...@redhat.com>.
On Thu, 2012-10-25 at 17:15 +0300, marios@redhat.com wrote:
> This is rev 1 for the 'segmented blob uploads' API.
>
> Comparison of how the various cloud providers do this and the 'common' API I used is described @ http://mariosandreou.com/deltacloud/cloud_API/2012/10/08/segmenting-huge-blobs.html
>
> cURL examples are @ http://mariosandreou.com/deltacloud/cloud_API/2012/10/24/segmenting-huge-blobs-part-2.html
>
> Patches also available @ http://tracker.deltacloud.org/set/105
ACK. Testing the whole segmented-blob business in a unit test is
probably going to be a huge headache, but can we add some unit tests
that exercise some of the machinery used here, e.g. some of the BlobHelper
methods?
David
[PATCH 2/2] Adds EC2 and Openstack driver methods for segmented blob upload API
Posted by ma...@redhat.com.
From: marios <ma...@redhat.com>
http://mariosandreou.com/deltacloud/cloud_API/2012/10/24/segmenting-huge-blobs-part-2.html
Signed-off-by: marios <ma...@redhat.com>
---
server/lib/deltacloud/drivers/ec2/ec2_driver.rb | 54 ++++++++++++++++------
.../drivers/openstack/openstack_driver.rb | 23 ++++++++-
2 files changed, 61 insertions(+), 16 deletions(-)
diff --git a/server/lib/deltacloud/drivers/ec2/ec2_driver.rb b/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
index c7bed05..9c10ff8 100644
--- a/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
+++ b/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
@@ -507,33 +507,51 @@ module Deltacloud
#--
# Create Blob - NON Streaming way (i.e. was called with POST html multipart form data)
#--
+ #also called for segmented blobs - as final call with blob manifest
def create_blob(credentials, bucket_id, blob_id, data = nil, opts = {})
s3_client = new_client(credentials, :s3)
#data is a construct with the temporary file created by server @.tempfile
#also file[:type] will give us the content-type
- res = nil
- # File stream needs to be reopened in binary mode for whatever reason
- file = File::open(data[:tempfile].path, 'rb')
- #insert ec2-specific header for user metadata ... x-amz-meta-KEY = VALUE
- BlobHelper::rename_metadata_headers(opts, 'x-amz-meta-')
- opts["Content-Type"] = data[:type]
- safely do
- res = s3_client.interface.put(bucket_id,
- blob_id,
- file,
- opts)
+ if(opts[:segment_manifest])
+ safely do
+ s3_client.interface.complete_multipart(bucket_id, blob_id, opts[:segmented_blob_id], opts[:segment_manifest])
+ end
+ else
+ # File stream needs to be reopened in binary mode
+ file = File::open(data[:tempfile].path, 'rb')
+ #insert ec2-specific header for user metadata ... x-amz-meta-KEY = VALUE
+ BlobHelper::rename_metadata_headers(opts, 'x-amz-meta-')
+ opts["Content-Type"] = data[:type]
+ safely do
+ s3_client.interface.put(bucket_id,
+ blob_id,
+ file,
+ opts)
+ end
end
#create a new Blob object and return that
Blob.new( { :id => blob_id,
:bucket => bucket_id,
- :content_length => data[:tempfile].length,
- :content_type => data[:type],
+ :content_length => ((data && data[:tempfile]) ? data[:tempfile].length : nil),
+ :content_type => ((data && data[:type]) ? data[:type] : nil),
:last_modified => '',
:user_metadata => opts.select{|k,v| k.match(/^x-amz-meta-/i)}
}
)
end
+ def init_segmented_blob(credentials, opts={})
+ s3_client = new_client(credentials, :s3)
+ safely do
+ s3_client.interface.initiate_multipart(opts[:bucket],opts[:id])
+ end
+
+ end
+
+ def blob_segment_id(request, response)
+ response["etag"].gsub("\"", "")
+ end
+
#--
# Delete Blob
#--
@@ -591,8 +609,16 @@ module Deltacloud
timestamp = Time.now.httpdate
string_to_sign =
"PUT\n\n#{params[:content_type]}\n#{timestamp}\n#{signature_meta_string}/#{params[:bucket]}/#{params[:blob]}"
+ if BlobHelper.segmented_blob_op_type(params[:context]) == "segment"
+ partNumber = BlobHelper.segment_order(params[:context])
+ uploadId = BlobHelper.segmented_blob_id(params[:context])
+ segment_string = "?partNumber=#{partNumber}&uploadId=#{uploadId}"
+ string_to_sign << segment_string
+ request = Net::HTTP::Put.new("/#{params[:blob]}#{segment_string}")
+ else
+ request = Net::HTTP::Put.new("/#{params[:blob]}")
+ end
auth_string = Aws::Utils::sign(params[:password], string_to_sign)
- request = Net::HTTP::Put.new("/#{params[:blob]}")
request['Host'] = "#{params[:bucket]}.#{uri.host}"
request['Date'] = timestamp
request['Content-Type'] = params[:content_type]
diff --git a/server/lib/deltacloud/drivers/openstack/openstack_driver.rb b/server/lib/deltacloud/drivers/openstack/openstack_driver.rb
index 04bd409..69d523b 100644
--- a/server/lib/deltacloud/drivers/openstack/openstack_driver.rb
+++ b/server/lib/deltacloud/drivers/openstack/openstack_driver.rb
@@ -263,8 +263,12 @@ module Deltacloud
def create_blob(credentials, bucket, blob, data, opts={})
os = new_client(credentials, :buckets)
safely do
- BlobHelper.rename_metadata_headers(opts, "X-Object-Meta-")
- os_blob = os.container(bucket).create_object(blob, {:content_type=> data[:type], :metadata=>opts}, data[:tempfile])
+ if(opts[:segment_manifest]) # finalize a segmented blob upload
+ os_blob = os.container(bucket).create_object(blob, {:manifest=>"#{bucket}/#{opts[:segmented_blob_id]}"})
+ else
+ BlobHelper.rename_metadata_headers(opts, "X-Object-Meta-")
+ os_blob = os.container(bucket).create_object(blob, {:content_type=> data[:type], :metadata=>opts}, data[:tempfile])
+ end
convert_blob(os_blob, bucket)
end
end
@@ -292,8 +296,23 @@ module Deltacloud
end
end
+ def init_segmented_blob(credentials, opts={})
+ opts[:id]
+ end
+
+ def blob_segment_id(request, response)
+ #could be in http header OR query string:
+ segment_order = BlobHelper.segment_order(request)
+ blob_name = request.env["PATH_INFO"].gsub(/(&\w*=\w*)*$/, "").split("/").pop
+ "#{blob_name}#{segment_order}"
+ end
+
#params: {:user,:password,:bucket,:blob,:content_type,:content_length,:metadata}
+ #params[:context] holds the request object - for getting to blob segment params
def blob_stream_connection(params)
+ if BlobHelper.segmented_blob_op_type(params[:context]) == "segment"
+ params[:blob] = "#{params[:blob]}#{BlobHelper.segment_order(params[:context])}"
+ end
tokens = params[:user].split("+")
user_name, tenant_name = tokens.first, tokens.last
#need a client for the auth_token and endpoints
--
1.7.11.7