You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@deltacloud.apache.org by ma...@redhat.com on 2011/05/20 18:09:31 UTC
[PATCH] Adds blob streaming uploads using PUT (client ---STREAM---> deltacloud ---STREAM---> provider)
From: marios <ma...@redhat.com>
Signed-off-by: marios <ma...@redhat.com>
---
server/lib/deltacloud/drivers/ec2/ec2_driver.rb | 4 +-
server/lib/deltacloud/helpers/blob_stream.rb | 168 +++++++++++++++++++++++
server/server.rb | 40 +++++-
server/views/blobs/new.html.haml | 14 +-
4 files changed, 210 insertions(+), 16 deletions(-)
diff --git a/server/lib/deltacloud/drivers/ec2/ec2_driver.rb b/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
index 14c5829..a31d358 100644
--- a/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
+++ b/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
@@ -1,3 +1,5 @@
+# Copyright (C) 2009, 2010 Red Hat, Inc.
+#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership. The
@@ -396,7 +398,7 @@ module Deltacloud
end
#--
- # Create Blob
+ # Create Blob - NON Streaming way (i.e. was called with POST html multipart form data)
#--
def create_blob(credentials, bucket_id, blob_id, data = nil, opts = {})
s3_client = new_client(credentials, :s3)
diff --git a/server/lib/deltacloud/helpers/blob_stream.rb b/server/lib/deltacloud/helpers/blob_stream.rb
index fce14d0..a96b9ca 100644
--- a/server/lib/deltacloud/helpers/blob_stream.rb
+++ b/server/lib/deltacloud/helpers/blob_stream.rb
@@ -79,3 +79,171 @@ class Hash
end #def
end #class
+
+#Monkey patch for streaming blobs:
+# Normally a client will upload a blob to deltacloud and thin will put this into a tempfile.
+# Then deltacloud would stream up to the provider: client =-->>TEMP_FILE-->> deltacloud =-->>STREAM-->> provider
+# Instead we want to recognise that this is a 'Post blob' operation and start streaming to the provider as
+# the request is received: client =-->>STREAM-->> deltacloud =-->>STREAM-->> provider
+module Thin
+ class Request
+
+ alias_method :move_body_to_tempfile_orig, :move_body_to_tempfile
+ private
+ def move_body_to_tempfile
+ if BlobStreamIO::is_put_blob(self)
+ @body = BlobStreamIO.new(self)
+ else
+ move_body_to_tempfile_orig
+ end
+ end
+
+ end
+end
+
+require 'net/http'
+#monkey patch for Net:HTTP
+module Net
+ class HTTP
+
+ alias :request_orig :request
+
+ @blob_req = nil # needs global scope for close op later
+
+ def request(req, body = nil, blob_stream = nil, &block)
+ unless blob_stream
+ return request_orig(req, body, &block)
+ end
+ @blob_req = req
+ do_start #start the connection
+
+ req.set_body_internal body
+ begin_transport req
+ req.write_header_m @socket,@curr_http_version, edit_path(req.path)
+ @socket
+ end
+
+ class Put < HTTPRequest
+ def write_header_m(sock, ver, path)
+ write_header(sock, ver, path)
+ end
+ end
+
+ def end_request
+ begin
+ res = HTTPResponse.read_new(@socket)
+ end while res.kind_of?(HTTPContinue)
+ res.reading_body(@socket, @blob_req.response_body_permitted?) {
+ yield res if block_given?
+ }
+ end_transport @blob_req, res
+ do_finish
+ res
+ end
+ end
+
+end
+
+
+
+
+require 'aws'
+require 'uri'
+class BlobStreamIO
+
+ attr_accessor :size, :provider, :sock
+
+ def initialize(request)
+ @request = request
+ @size = 0
+ provider = new_provider(request.driver.to_s)
+ bucket, blob = parse_args(request.env["PATH_INFO"], :bucket)
+ user, password = parse_args(request.env['HTTP_AUTHORIZATION'], :credentials)
+ content_type = request.env['CONTENT_TYPE']
+ content_type ||= ""
+ @content_length = request.env['CONTENT_LENGTH']
+ #user_meta = {}
+ #meta_array = request.env.select{|k,v| k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
+ #meta_array.inject({}){ |result, array| user_meta[array.first.upcase] = array.last}
+ uri = URI.parse(provider)
+ timestamp = Time.now.httpdate
+ string_to_sign = "PUT\n\n#{content_type}\n#{timestamp}\n/#{bucket}/#{blob}"
+ auth_string = Aws::Utils::sign(password, string_to_sign)
+ @http = Net::HTTP.new("#{bucket}.#{uri.host}", uri.port )
+ @http.use_ssl = true
+ @http.verify_mode = OpenSSL::SSL::VERIFY_NONE
+ @provider_request = Net::HTTP::Put.new("/#{blob}")
+ @provider_request['Host'] = "#{bucket}.#{uri.host}"
+ @provider_request['Date'] = timestamp
+ @provider_request['Content-Type'] = content_type
+ @provider_request['Content-Length'] = @content_length
+ @provider_request['Authorization'] = "AWS #{user}:#{auth_string}"
+ @provider_request['Expect'] = "100-continue"
+# @provider_request.body_stream = @buf
+# true is the flag for blob_stream
+ @content_length = @content_length.to_i
+ @sock = @http.request(@provider_request, nil, true)
+ end
+
+ def << (data)
+ @sock.write(data)
+ @size += data.length
+ if (@size >= @content_length)
+ result = @http.end_request
+ if result.inspect =~ (/Net::HTTPOK 200 OK/)
+ @request.env["BLOB_SUCCESS"] = "true"
+ else
+ @request.env["BLOB_FAIL"] = result.body
+ end
+ end
+ end
+
+ def rewind
+ puts "total size counted was #{size}"
+ end
+
+ #use the Request.env hash (populated by the ThinParser) to
+ #determine whether this is a post blob operation
+ #by definition, only get here with a body of > 112kbytes - thin/lib/thin/request.rb:12 MAX_BODY = 1024 * (80 + 32)
+ #request.env['PATH_INFO'] === "/api/buckets/mynewcoolbucketwoo"
+ def self.is_put_blob(request = nil)
+ path = request.env['PATH_INFO']
+ method = request.env['REQUEST_METHOD']
+ if ( ((path =~ /^\/api\/buckets/i) == 0) &&
+ ( method == 'PUT') )
+ return true
+ else
+ return false
+ end
+ end
+
+ private
+
+ def parse_args(request_string, element)
+ case element
+ when :bucket
+ array = request_string.split("/")
+ blob = array.pop
+ bucket = array.pop
+ return bucket, blob
+ when :credentials
+ decoded = Base64.decode64(request_string.split('Basic ').last)
+ key = decoded.split(':').first
+ pass = decoded.split(':').last
+ return key, pass
+ else
+ nil
+ end
+ end
+
+ def new_provider(driver_name)
+ provider = case driver_name
+ when (/EC2/) then "https://s3.amazonaws.com"
+ when (/Rackspace/) then "Rax"
+ when (/Azure/) then "Azure"
+ else
+ "UNKNOWN" # should blow something up here - noisily
+ end
+ provider
+ end
+end
diff --git a/server/server.rb b/server/server.rb
index 86dd524..eef4218 100644
--- a/server/server.rb
+++ b/server/server.rb
@@ -696,10 +696,32 @@ get '/api/buckets/:bucket/new_blob' do
end
end
-#create a new blob
+#create a new blob using PUT - streams through deltacloud
+put '/api/buckets/:bucket/:blob' do
+ if(env["BLOB_SUCCESS"]) #ie got a 200ok after putting blob
+ content_type = env["CONTENT_TYPE"]
+ content_type ||= ""
+ @blob = Blob.new({:id => params[:blob],
+ :bucket => params[:bucket],
+ :content_length => env["CONTENT_LENGTH"],
+ :content_type => content_type,
+ :last_modified => '',
+ :user_metadata => []}) #add metadata
+#debugger
+# respond_to do |format|
+# format.xml { haml :"blobs/show" }
+# end
+ @_format = :xml
+ haml :"blobs/show"
+ else
+ report_error(500) #OK?
+ end
+end
+
+#create a new blob using html interface - NON STREAMING (i.e. browser POST http form data)
post '/api/buckets/:bucket' do
bucket_id = params[:bucket]
- blob_id = params['blob_id']
+ blob_id = params['blob']
blob_data = params['blob_data']
user_meta = {}
#first try get blob_metadata from params (i.e., passed by http form post, e.g. browser)
@@ -710,11 +732,13 @@ post '/api/buckets/:bucket' do
key = "HTTP_X_Deltacloud_Blobmeta_#{key}"
value = params[:"meta_value#{i}"]
user_meta[key] = value
- end #max.each do
- else #can try to get blob_metadata from http headers
- meta_array = request.env.select{|k,v| k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
- meta_array.inject({}){ |result, array| user_meta[array.first.upcase] = array.last}
- end #end if
+ end
+ end #max.each do
+# else #can try to get blob_metadata from http headers - NO LONGER VALID (Have seperate PUT for this type of metadata headers)
+# meta_array = request.env.select{|k,v| k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
+# meta_array.inject({}){ |result, array| user_meta[array.first.upcase] = array.last}
+# end #end if
+# BlobPostStream.call(credentials, bucket_id, blob_id, env, user_meta)
@blob = driver.create_blob(credentials, bucket_id, blob_id, blob_data, user_meta)
respond_to do |format|
format.html { haml :"blobs/show"}
@@ -730,7 +754,7 @@ delete '/api/buckets/:bucket/:blob' do
respond_to do |format|
format.xml { 204 }
format.json { 204 }
- format.html { bucket_url(bucket_id) }
+ format.html { redirect(bucket_url(bucket_id)) }
end
end
diff --git a/server/views/blobs/new.html.haml b/server/views/blobs/new.html.haml
index a075f0a..bf5c6f5 100644
--- a/server/views/blobs/new.html.haml
+++ b/server/views/blobs/new.html.haml
@@ -3,13 +3,7 @@
%form{ :action => bucket_url(@bucket_id), :method => :post, :enctype => 'multipart/form-data'}
%label
Blob Name:
- %input{ :name => 'blob_id', :size => 512}/
- %label
- Blob Data:
- %br
- %input{ :type => "file", :name => 'blob_data', :size => 50}/
- %br
- %br
+ %input{ :name => 'blob', :size => 512}/
%input{ :type => "hidden", :name => "meta_params", :value => "0"}
%a{ :href => "javascript:;", :onclick => "more_fields();"} Add Metadata
%div{ :id => "metadata_holder", :style => "display: none;"}
@@ -23,4 +17,10 @@
%a{ :href => "javascript:;", :onclick => "less_fields();"} Less Metadata
%br
%br
+ %label
+ Blob Data:
+ %br
+ %input{ :type => "file", :name => 'blob_data', :size => 50}/
+ %br
+ %br
%input{ :type => :submit, :name => "commit", :value => "create"}/
--
1.7.3.4
Re: [PATCH] Adds blob streaming uploads using PUT (client
---STREAM---> deltacloud ---STREAM---> provider)
Posted by David Lutterkort <lu...@redhat.com>.
Hi Marios,
Good stuff, though I have comments ;)
First off, some stylistic remarks:
* make sure the first line of the commit message stays below 80
characters
* the same for some of the code; try to keep lines to 80 chars or
less.
* please don't use tabs for indentation, since they'll look
different for everybody. (I also cut whitespace at the end of
lines; I am more than happy to share my Emacs setup that does
all that automatically)
* I generally dislike comments like 'end # each_key do' - if they
are really necessary, it's a sign that the code probably needs
to be split up
On Fri, 2011-05-20 at 19:09 +0300, marios@redhat.com wrote:
> From: marios <ma...@redhat.com>
>
>
> Signed-off-by: marios <ma...@redhat.com>
> ---
> server/lib/deltacloud/drivers/ec2/ec2_driver.rb | 4 +-
> server/lib/deltacloud/helpers/blob_stream.rb | 168 +++++++++++++++++++++++
> server/server.rb | 40 +++++-
> server/views/blobs/new.html.haml | 14 +-
> 4 files changed, 210 insertions(+), 16 deletions(-)
>
> diff --git a/server/lib/deltacloud/drivers/ec2/ec2_driver.rb b/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
> index 14c5829..a31d358 100644
> --- a/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
> +++ b/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
> @@ -1,3 +1,5 @@
> +# Copyright (C) 2009, 2010 Red Hat, Inc.
> +#
This needs to be removed - it'll just lead to tears for the next
release.
> # Licensed to the Apache Software Foundation (ASF) under one or more
> # contributor license agreements. See the NOTICE file distributed with
> # this work for additional information regarding copyright ownership. The
> @@ -396,7 +398,7 @@ module Deltacloud
> end
>
> #--
> - # Create Blob
> + # Create Blob - NON Streaming way (i.e. was called with POST html multipart form data)
> #--
> def create_blob(credentials, bucket_id, blob_id, data = nil, opts = {})
> s3_client = new_client(credentials, :s3)
> diff --git a/server/lib/deltacloud/helpers/blob_stream.rb b/server/lib/deltacloud/helpers/blob_stream.rb
> index fce14d0..a96b9ca 100644
> --- a/server/lib/deltacloud/helpers/blob_stream.rb
> +++ b/server/lib/deltacloud/helpers/blob_stream.rb
> @@ -79,3 +79,171 @@ class Hash
> end #def
>
> end #class
> +
> +#Monkey patch for streaming blobs:
> +# Normally a client will upload a blob to deltacloud and thin will put this into a tempfile.
> +# Then deltacloud would stream up to the provider: client =-->>TEMP_FILE-->> deltacloud =-->>STREAM-->> provider
> +# Instead we want to recognise that this is a 'Post blob' operation and start streaming to the provider as
> +# the request is received: client =-->>STREAM-->> deltacloud =-->>STREAM-->> provider
Good that you added the comment - it should be formatted a little better
though to be more readable.
> +module Thin
> + class Request
> +
> + alias_method :move_body_to_tempfile_orig, :move_body_to_tempfile
> + private
> + def move_body_to_tempfile
> + if BlobStreamIO::is_put_blob(self)
> + @body = BlobStreamIO.new(self)
> + else
> + move_body_to_tempfile_orig
> + end
> + end
> +
> + end
> +end
> +
> +require 'net/http'
> +#monkey patch for Net:HTTP
> +module Net
> + class HTTP
> +
> + alias :request_orig :request
> +
> + @blob_req = nil # needs global scope for close op later
I don't think that does what you need it to do, since this code is
evaluated once, when the class is loaded, and so it sets @blob_req on the
Net::HTTP class itself rather than on its instances. In any case, since
@blob_req is an instance variable that is used nowhere else, it will
always be nil on newly created Net::HTTP instances.
Strictly speaking, there's no need to clear out @blob_req; but you could
do that in end_request, after calling end_transport. That is still
vulnerable to requests being interrupted before they reach that point.
> + def request(req, body = nil, blob_stream = nil, &block)
> + unless blob_stream
> + return request_orig(req, body, &block)
> + end
> + @blob_req = req
> + do_start #start the connection
> +
> + req.set_body_internal body
> + begin_transport req
> + req.write_header_m @socket,@curr_http_version, edit_path(req.path)
> + @socket
> + end
> +
> + class Put < HTTPRequest
> + def write_header_m(sock, ver, path)
> + write_header(sock, ver, path)
> + end
> + end
> +
> + def end_request
> + begin
> + res = HTTPResponse.read_new(@socket)
> + end while res.kind_of?(HTTPContinue)
> + res.reading_body(@socket, @blob_req.response_body_permitted?) {
> + yield res if block_given?
> + }
> + end_transport @blob_req, res
> + do_finish
Set @blob_req = nil here
> + res
> + end
> + end
> +
> +end
> +
> +
> +
> +
> +require 'aws'
> +require 'uri'
> +class BlobStreamIO
> +
> + attr_accessor :size, :provider, :sock
> +
> + def initialize(request)
> + @request = request
> + @size = 0
> + provider = new_provider(request.driver.to_s)
> + bucket, blob = parse_args(request.env["PATH_INFO"], :bucket)
> + user, password = parse_args(request.env['HTTP_AUTHORIZATION'], :credentials)
> + content_type = request.env['CONTENT_TYPE']
> + content_type ||= ""
Simpler: content_type = request.env['CONTENT_TYPE'] || ""
> + @content_length = request.env['CONTENT_LENGTH']
> + #user_meta = {}
> + #meta_array = request.env.select{|k,v| k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
> + #meta_array.inject({}){ |result, array| user_meta[array.first.upcase] = array.last}
Remove this if it isn't needed anymore
> + uri = URI.parse(provider)
> + timestamp = Time.now.httpdate
> + string_to_sign = "PUT\n\n#{content_type}\n#{timestamp}\n/#{bucket}/#{blob}"
> + auth_string = Aws::Utils::sign(password, string_to_sign)
> + @http = Net::HTTP.new("#{bucket}.#{uri.host}", uri.port )
> + @http.use_ssl = true
> + @http.verify_mode = OpenSSL::SSL::VERIFY_NONE
> + @provider_request = Net::HTTP::Put.new("/#{blob}")
> + @provider_request['Host'] = "#{bucket}.#{uri.host}"
> + @provider_request['Date'] = timestamp
> + @provider_request['Content-Type'] = content_type
> + @provider_request['Content-Length'] = @content_length
> + @provider_request['Authorization'] = "AWS #{user}:#{auth_string}"
> + @provider_request['Expect'] = "100-continue"
Can we move some (or all) of this stuff into the drivers ?
> +# @provider_request.body_stream = @buf
> +# true is the flag for blob_stream
> + @content_length = @content_length.to_i
> + @sock = @http.request(@provider_request, nil, true)
> + end
> +
> + def << (data)
> + @sock.write(data)
> + @size += data.length
> + if (@size >= @content_length)
> + result = @http.end_request
> + if result.inspect =~ (/Net::HTTPOK 200 OK/)
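Is there really no better way to check that the request succeeded than
to depend on its string representation? For example, you could test the
response class or status code directly: result.is_a?(Net::HTTPOK) or
result.code == "200".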
> + @request.env["BLOB_SUCCESS"] = "true"
> + else
> + @request.env["BLOB_FAIL"] = result.body
> + end
> + end
> + end
> +
> + def rewind
> + puts "total size counted was #{size}"
> + end
Is this left over from debugging ?
> + #use the Request.env hash (populated by the ThinParser) to
> + #determine whether this is a post blob operation
> + #by definition, only get here with a body of > 112kbytes - thin/lib/thin/request.rb:12 MAX_BODY = 1024 * (80 + 32)
> + #request.env['PATH_INFO'] === "/api/buckets/mynewcoolbucketwoo"
> + def self.is_put_blob(request = nil)
> + path = request.env['PATH_INFO']
> + method = request.env['REQUEST_METHOD']
> + if ( ((path =~ /^\/api\/buckets/i) == 0) &&
> + ( method == 'PUT') )
The =~ returns nil when there is no match, which is falsy in Ruby; you
can write the above condition with fewer parens as
if path =~ /^\/api\/buckets/ && method == 'PUT'
Also, the URLs in the rest of the API are case-sensitive, so no need
for the /.../i
> + return true
> + else
> + return false
> + end
> + end
> +
> + private
> +
> + def parse_args(request_string, element)
> + case element
> + when :bucket
> + array = request_string.split("/")
> + blob = array.pop
> + bucket = array.pop
> + return bucket, blob
> + when :credentials
> + decoded = Base64.decode64(request_string.split('Basic ').last)
> + key = decoded.split(':').first
> + pass = decoded.split(':').last
> + return key, pass
> + else
> + nil
> + end
> + end
Why not make this two methods, parse_bucket and parse_credentials, since
the two branches have nothing in common?
> + def new_provider(driver_name)
> + provider = case driver_name
> + when (/EC2/) then "https://s3.amazonaws.com"
> + when (/Rackspace/) then "Rax"
> + when (/Azure/) then "Azure"
> + else
> + "UNKNOWN" # should blow something up here - noisily
> + end
> + provider
> + end
Can't we just grab the driver/instantiate a new one and see whether it
supports streaming puts of a blob ? Going by the driver name seems
pretty fragile.
> diff --git a/server/server.rb b/server/server.rb
> index 86dd524..eef4218 100644
> --- a/server/server.rb
> +++ b/server/server.rb
> @@ -696,10 +696,32 @@ get '/api/buckets/:bucket/new_blob' do
> end
> end
>
> -#create a new blob
> +#create a new blob using PUT - streams through deltacloud
> +put '/api/buckets/:bucket/:blob' do
> + if(env["BLOB_SUCCESS"]) #ie got a 200ok after putting blob
> + content_type = env["CONTENT_TYPE"]
> + content_type ||= ""
> + @blob = Blob.new({:id => params[:blob],
> + :bucket => params[:bucket],
> + :content_length => env["CONTENT_LENGTH"],
> + :content_type => content_type,
> + :last_modified => '',
> + :user_metadata => []}) #add metadata
> +#debugger
> +# respond_to do |format|
> +# format.xml { haml :"blobs/show" }
> +# end
Debugging leftovers ? ;)
> + @_format = :xml
> + haml :"blobs/show"
> + else
> + report_error(500) #OK?
> + end
> +end
Why isn't there a respond_to block here ?
> +#create a new blob using html interface - NON STREAMING (i.e. browser POST http form data)
> post '/api/buckets/:bucket' do
> bucket_id = params[:bucket]
> - blob_id = params['blob_id']
> + blob_id = params['blob']
> blob_data = params['blob_data']
> user_meta = {}
> #first try get blob_metadata from params (i.e., passed by http form post, e.g. browser)
> @@ -710,11 +732,13 @@ post '/api/buckets/:bucket' do
> key = "HTTP_X_Deltacloud_Blobmeta_#{key}"
> value = params[:"meta_value#{i}"]
> user_meta[key] = value
> - end #max.each do
> - else #can try to get blob_metadata from http headers
> - meta_array = request.env.select{|k,v| k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
> - meta_array.inject({}){ |result, array| user_meta[array.first.upcase] = array.last}
> - end #end if
> + end
> + end #max.each do
> +# else #can try to get blob_metadata from http headers - NO LONGER VALID (Have seperate PUT for this type of metadata headers)
> +# meta_array = request.env.select{|k,v| k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
> +# meta_array.inject({}){ |result, array| user_meta[array.first.upcase] = array.last}
> +# end #end if
> +# BlobPostStream.call(credentials, bucket_id, blob_id, env, user_meta)
> @blob = driver.create_blob(credentials, bucket_id, blob_id, blob_data, user_meta)
> respond_to do |format|
> format.html { haml :"blobs/show"}
Why all these changes to the HTML UI ? It doesn't seem that streaming
blobs has any connection with this.
David