You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@deltacloud.apache.org by ma...@redhat.com on 2011/05/20 18:09:31 UTC

[PATCH] Adds blob streaming uploads using PUT (client ---STREAM---> deltacloud ---STREAM---> provider)

From: marios <ma...@redhat.com>


Signed-off-by: marios <ma...@redhat.com>
---
 server/lib/deltacloud/drivers/ec2/ec2_driver.rb |    4 +-
 server/lib/deltacloud/helpers/blob_stream.rb    |  168 +++++++++++++++++++++++
 server/server.rb                                |   40 +++++-
 server/views/blobs/new.html.haml                |   14 +-
 4 files changed, 210 insertions(+), 16 deletions(-)

diff --git a/server/lib/deltacloud/drivers/ec2/ec2_driver.rb b/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
index 14c5829..a31d358 100644
--- a/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
+++ b/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
@@ -1,3 +1,5 @@
+# Copyright (C) 2009, 2010  Red Hat, Inc.
+#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.  The
@@ -396,7 +398,7 @@ module Deltacloud
         end
 
         #--
-        # Create Blob
+        # Create Blob - NON Streaming way (i.e. was called with POST html multipart form data)
         #--
         def create_blob(credentials, bucket_id, blob_id, data = nil, opts = {})
           s3_client = new_client(credentials, :s3)
diff --git a/server/lib/deltacloud/helpers/blob_stream.rb b/server/lib/deltacloud/helpers/blob_stream.rb
index fce14d0..a96b9ca 100644
--- a/server/lib/deltacloud/helpers/blob_stream.rb
+++ b/server/lib/deltacloud/helpers/blob_stream.rb
@@ -79,3 +79,171 @@ class Hash
   end #def
 
 end #class
+
+#Monkey patch for streaming blobs:
+# Normally a client will upload a blob to deltacloud and thin will put this into a tempfile.
+# Then deltacloud would stream up to the provider: client =-->>TEMP_FILE-->> deltacloud =-->>STREAM-->> provider
+# Instead we want to recognise that this is a 'Post blob' operation and start streaming to the provider as
+# the request is received: client =-->>STREAM-->> deltacloud =-->>STREAM-->> provider
+module Thin
+  class Request
+
+    alias_method :move_body_to_tempfile_orig, :move_body_to_tempfile
+    private
+      def move_body_to_tempfile
+        if BlobStreamIO::is_put_blob(self)
+          @body = BlobStreamIO.new(self)
+        else
+          move_body_to_tempfile_orig
+        end
+      end
+
+  end
+end
+
+require 'net/http'
+#monkey patch for Net:HTTP
+module Net
+	class HTTP
+
+		alias :request_orig :request
+
+		@blob_req = nil # needs global scope for close op later
+
+		def request(req, body = nil, blob_stream = nil, &block)
+			unless blob_stream
+				return request_orig(req, body, &block)
+			end
+			@blob_req = req
+			do_start #start the connection
+
+      req.set_body_internal body
+      begin_transport req
+      req.write_header_m @socket,@curr_http_version, edit_path(req.path)
+			@socket
+		end
+
+		class Put < HTTPRequest
+			def write_header_m(sock, ver, path)
+				write_header(sock, ver, path)
+			end
+		end
+
+		def end_request
+	    begin
+	    res = HTTPResponse.read_new(@socket)
+      end while res.kind_of?(HTTPContinue)
+      res.reading_body(@socket, @blob_req.response_body_permitted?) {
+	      yield res if block_given?
+       }
+	end_transport @blob_req, res
+			do_finish
+			res
+		end
+	end
+
+end
+
+
+
+
+require 'aws'
+require 'uri'
+class BlobStreamIO
+
+  attr_accessor :size, :provider, :sock
+
+  def initialize(request)
+		@request = request
+    @size = 0
+    provider = new_provider(request.driver.to_s)
+    bucket, blob = parse_args(request.env["PATH_INFO"], :bucket)
+		user, password = parse_args(request.env['HTTP_AUTHORIZATION'], :credentials)
+		content_type = request.env['CONTENT_TYPE']
+		content_type ||= ""
+    @content_length = request.env['CONTENT_LENGTH']
+		#user_meta = {}
+		#meta_array = request.env.select{|k,v| k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
+		#meta_array.inject({}){ |result, array| user_meta[array.first.upcase] = array.last}
+		uri = URI.parse(provider)
+		timestamp = Time.now.httpdate
+		string_to_sign = "PUT\n\n#{content_type}\n#{timestamp}\n/#{bucket}/#{blob}"
+		auth_string = Aws::Utils::sign(password, string_to_sign)
+		@http = Net::HTTP.new("#{bucket}.#{uri.host}", uri.port )
+		@http.use_ssl = true
+		@http.verify_mode = OpenSSL::SSL::VERIFY_NONE
+		@provider_request = Net::HTTP::Put.new("/#{blob}")
+		@provider_request['Host'] = "#{bucket}.#{uri.host}"
+		@provider_request['Date'] = timestamp
+		@provider_request['Content-Type'] = content_type
+		@provider_request['Content-Length'] = @content_length
+		@provider_request['Authorization'] = "AWS #{user}:#{auth_string}"
+		@provider_request['Expect'] = "100-continue"
+#		@provider_request.body_stream = @buf
+#		true is the flag for blob_stream
+		@content_length = @content_length.to_i
+		@sock = @http.request(@provider_request, nil, true)
+  end
+
+  def << (data)
+		@sock.write(data)
+    @size += data.length
+		if (@size >= @content_length)
+			result = @http.end_request
+			if result.inspect =~ (/Net::HTTPOK 200 OK/)
+				@request.env["BLOB_SUCCESS"] = "true"
+			else
+				@request.env["BLOB_FAIL"] = result.body
+			end
+		end
+  end
+
+  def rewind
+    puts "total size counted was #{size}"
+  end
+
+  #use the Request.env hash (populated by the ThinParser) to
+  #determine whether this is a post blob operation
+  #by definition, only get here with a body of > 112kbytes - thin/lib/thin/request.rb:12 MAX_BODY = 1024 * (80 + 32)
+  #request.env['PATH_INFO']  === "/api/buckets/mynewcoolbucketwoo"
+  def self.is_put_blob(request = nil)
+    path = request.env['PATH_INFO']
+    method = request.env['REQUEST_METHOD']
+    if ( ((path =~ /^\/api\/buckets/i) == 0) &&
+         ( method == 'PUT') )
+      return true
+    else
+      return false
+    end
+  end
+
+  private
+
+  def parse_args(request_string, element)
+    case element
+      when :bucket
+				array = request_string.split("/")
+				blob = array.pop
+				bucket = array.pop
+        return bucket, blob
+      when :credentials
+        decoded = Base64.decode64(request_string.split('Basic ').last)
+        key = decoded.split(':').first
+        pass = decoded.split(':').last
+        return key, pass
+      else
+        nil
+    end
+  end
+
+  def new_provider(driver_name)
+    provider = case driver_name
+      when (/EC2/) then "https://s3.amazonaws.com"
+      when (/Rackspace/) then "Rax"
+      when (/Azure/) then "Azure"
+      else
+        "UNKNOWN" # should blow something up here - noisily
+    end
+    provider
+  end
+end
diff --git a/server/server.rb b/server/server.rb
index 86dd524..eef4218 100644
--- a/server/server.rb
+++ b/server/server.rb
@@ -696,10 +696,32 @@ get '/api/buckets/:bucket/new_blob' do
   end
 end
 
-#create a new blob
+#create a new blob using PUT - streams through deltacloud
+put '/api/buckets/:bucket/:blob' do
+	if(env["BLOB_SUCCESS"]) #ie got a 200ok after putting blob
+		content_type = env["CONTENT_TYPE"]
+		content_type ||=  ""
+		@blob = Blob.new({:id => params[:blob],
+											:bucket => params[:bucket],
+											:content_length => env["CONTENT_LENGTH"],
+											:content_type => content_type,
+											:last_modified => '',
+											:user_metadata => []}) #add metadata
+#debugger
+#	  respond_to do |format|
+#    	format.xml { haml :"blobs/show" }
+#  	end
+		@_format = :xml
+		haml :"blobs/show"
+	else
+		report_error(500) #OK?
+	end
+end
+
+#create a new blob using html interface - NON STREAMING (i.e. browser POST http form data)
 post '/api/buckets/:bucket' do
   bucket_id = params[:bucket]
-  blob_id = params['blob_id']
+  blob_id = params['blob']
   blob_data = params['blob_data']
   user_meta = {}
 #first try get blob_metadata from params (i.e., passed by http form post, e.g. browser)
@@ -710,11 +732,13 @@ post '/api/buckets/:bucket' do
       key = "HTTP_X_Deltacloud_Blobmeta_#{key}"
       value = params[:"meta_value#{i}"]
       user_meta[key] = value
-    end #max.each do
-  else #can try to get blob_metadata from http headers
-    meta_array = request.env.select{|k,v| k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
-    meta_array.inject({}){ |result, array| user_meta[array.first.upcase] = array.last}
-  end #end if
+		end
+  end #max.each do
+#  else #can try to get blob_metadata from http headers - NO LONGER VALID (Have seperate PUT for this type of metadata headers)
+#    meta_array = request.env.select{|k,v| k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
+#    meta_array.inject({}){ |result, array| user_meta[array.first.upcase] = array.last}
+#  end #end if
+#  BlobPostStream.call(credentials, bucket_id, blob_id, env, user_meta)
   @blob = driver.create_blob(credentials, bucket_id, blob_id, blob_data, user_meta)
   respond_to do |format|
     format.html { haml :"blobs/show"}
@@ -730,7 +754,7 @@ delete '/api/buckets/:bucket/:blob' do
   respond_to do |format|
     format.xml {  204 }
     format.json {  204 }
-    format.html { bucket_url(bucket_id) }
+    format.html { redirect(bucket_url(bucket_id)) }
   end
 end
 
diff --git a/server/views/blobs/new.html.haml b/server/views/blobs/new.html.haml
index a075f0a..bf5c6f5 100644
--- a/server/views/blobs/new.html.haml
+++ b/server/views/blobs/new.html.haml
@@ -3,13 +3,7 @@
 %form{ :action => bucket_url(@bucket_id), :method => :post, :enctype => 'multipart/form-data'}
   %label
     Blob Name:
-    %input{ :name => 'blob_id', :size => 512}/
-  %label
-    Blob Data:
-    %br
-    %input{ :type => "file", :name => 'blob_data', :size => 50}/
-    %br
-    %br
+    %input{ :name => 'blob', :size => 512}/
   %input{ :type => "hidden", :name => "meta_params", :value => "0"}
   %a{ :href => "javascript:;", :onclick => "more_fields();"} Add Metadata
   %div{ :id => "metadata_holder", :style => "display: none;"}
@@ -23,4 +17,10 @@
   %a{ :href => "javascript:;", :onclick => "less_fields();"} Less Metadata
   %br
   %br
+  %label
+    Blob Data:
+    %br
+    %input{ :type => "file", :name => 'blob_data', :size => 50}/
+    %br
+    %br
   %input{ :type => :submit, :name => "commit", :value => "create"}/
-- 
1.7.3.4


Re: [PATCH] Adds blob streaming uploads using PUT (client ---STREAM---> deltacloud ---STREAM---> provider)

Posted by David Lutterkort <lu...@redhat.com>.
Hi Marios,

Good stuff, though I have comments ;)

First off, some stylistic remarks:

      * make sure the first line of the commit message stays below 80
        characters
      * the same for some of the code; try to keep lines to 80 chars or
        less.
      * please don't use tabs for indentation, since they'll look
        different for everybody. (I also cut whitespace at the end of
        lines; I am more than happy to share my Emacs setup that does
        all that automatically)
      * I generally dislike comments like 'end # each_key do' - if they
        are really necessary, it's a sign that the code probably needs
        to be split up

On Fri, 2011-05-20 at 19:09 +0300, marios@redhat.com wrote:
> From: marios <ma...@redhat.com>
> 
> 
> Signed-off-by: marios <ma...@redhat.com>
> ---
>  server/lib/deltacloud/drivers/ec2/ec2_driver.rb |    4 +-
>  server/lib/deltacloud/helpers/blob_stream.rb    |  168 +++++++++++++++++++++++
>  server/server.rb                                |   40 +++++-
>  server/views/blobs/new.html.haml                |   14 +-
>  4 files changed, 210 insertions(+), 16 deletions(-)
> 
> diff --git a/server/lib/deltacloud/drivers/ec2/ec2_driver.rb b/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
> index 14c5829..a31d358 100644
> --- a/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
> +++ b/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
> @@ -1,3 +1,5 @@
> +# Copyright (C) 2009, 2010  Red Hat, Inc.
> +#

This needs to be removed - it'll just lead to tears for the next
release.

>  # Licensed to the Apache Software Foundation (ASF) under one or more
>  # contributor license agreements.  See the NOTICE file distributed with
>  # this work for additional information regarding copyright ownership.  The
> @@ -396,7 +398,7 @@ module Deltacloud
>          end
>  
>          #--
> -        # Create Blob
> +        # Create Blob - NON Streaming way (i.e. was called with POST html multipart form data)
>          #--
>          def create_blob(credentials, bucket_id, blob_id, data = nil, opts = {})
>            s3_client = new_client(credentials, :s3)
> diff --git a/server/lib/deltacloud/helpers/blob_stream.rb b/server/lib/deltacloud/helpers/blob_stream.rb
> index fce14d0..a96b9ca 100644
> --- a/server/lib/deltacloud/helpers/blob_stream.rb
> +++ b/server/lib/deltacloud/helpers/blob_stream.rb
> @@ -79,3 +79,171 @@ class Hash
>    end #def
>  
>  end #class
> +
> +#Monkey patch for streaming blobs:
> +# Normally a client will upload a blob to deltacloud and thin will put this into a tempfile.
> +# Then deltacloud would stream up to the provider: client =-->>TEMP_FILE-->> deltacloud =-->>STREAM-->> provider
> +# Instead we want to recognise that this is a 'Post blob' operation and start streaming to the provider as
> +# the request is received: client =-->>STREAM-->> deltacloud =-->>STREAM-->> provider

Good that you added the comment - it should be formatted a little better
though to be more readable.

> +module Thin
> +  class Request
> +
> +    alias_method :move_body_to_tempfile_orig, :move_body_to_tempfile
> +    private
> +      def move_body_to_tempfile
> +        if BlobStreamIO::is_put_blob(self)
> +          @body = BlobStreamIO.new(self)
> +        else
> +          move_body_to_tempfile_orig
> +        end
> +      end
> +
> +  end
> +end
> +
> +require 'net/http'
> +#monkey patch for Net:HTTP
> +module Net
> +	class HTTP
> +
> +		alias :request_orig :request
> +
> +		@blob_req = nil # needs global scope for close op later

I don't think that does what you need it to do: this line is evaluated
once, when the class body is loaded, so it sets an instance variable on
the Net::HTTP class object itself rather than on each instance. It is
also unnecessary, since an unassigned instance variable reads as nil, so
@blob_req is already nil on every newly created Net::HTTP instance.

Strictly speaking, there's no need to clear out @blob_req; but you could
do that in end_request, after calling end_transport. That is still
vulnerable to requests being interrupted before they reach that point.

> +		def request(req, body = nil, blob_stream = nil, &block)
> +			unless blob_stream
> +				return request_orig(req, body, &block)
> +			end
> +			@blob_req = req
> +			do_start #start the connection
> +
> +      req.set_body_internal body
> +      begin_transport req
> +      req.write_header_m @socket,@curr_http_version, edit_path(req.path)
> +			@socket
> +		end
> +
> +		class Put < HTTPRequest
> +			def write_header_m(sock, ver, path)
> +				write_header(sock, ver, path)
> +			end
> +		end
> +
> +		def end_request
> +	    begin
> +	    res = HTTPResponse.read_new(@socket)
> +      end while res.kind_of?(HTTPContinue)
> +      res.reading_body(@socket, @blob_req.response_body_permitted?) {
> +	      yield res if block_given?
> +       }
> +	end_transport @blob_req, res
> +			do_finish

Set @blob_req = nil here

> +			res
> +		end
> +	end
> +
> +end
> +
> +
> +
> +
> +require 'aws'
> +require 'uri'
> +class BlobStreamIO
> +
> +  attr_accessor :size, :provider, :sock
> +
> +  def initialize(request)
> +		@request = request
> +    @size = 0
> +    provider = new_provider(request.driver.to_s)
> +    bucket, blob = parse_args(request.env["PATH_INFO"], :bucket)
> +		user, password = parse_args(request.env['HTTP_AUTHORIZATION'], :credentials)
> +		content_type = request.env['CONTENT_TYPE']
> +		content_type ||= ""

Simpler: content_type = request.env['CONTENT_TYPE'] || ""

> +    @content_length = request.env['CONTENT_LENGTH']
> +		#user_meta = {}
> +		#meta_array = request.env.select{|k,v| k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
> +		#meta_array.inject({}){ |result, array| user_meta[array.first.upcase] = array.last}

Remove this if it isn't needed anymore

> +		uri = URI.parse(provider)
> +		timestamp = Time.now.httpdate
> +		string_to_sign = "PUT\n\n#{content_type}\n#{timestamp}\n/#{bucket}/#{blob}"
> +		auth_string = Aws::Utils::sign(password, string_to_sign)
> +		@http = Net::HTTP.new("#{bucket}.#{uri.host}", uri.port )
> +		@http.use_ssl = true
> +		@http.verify_mode = OpenSSL::SSL::VERIFY_NONE
> +		@provider_request = Net::HTTP::Put.new("/#{blob}")
> +		@provider_request['Host'] = "#{bucket}.#{uri.host}"
> +		@provider_request['Date'] = timestamp
> +		@provider_request['Content-Type'] = content_type
> +		@provider_request['Content-Length'] = @content_length
> +		@provider_request['Authorization'] = "AWS #{user}:#{auth_string}"
> +		@provider_request['Expect'] = "100-continue"

Can we move some (or all) of this stuff into the drivers ?

> +#		@provider_request.body_stream = @buf
> +#		true is the flag for blob_stream
> +		@content_length = @content_length.to_i
> +		@sock = @http.request(@provider_request, nil, true)
> +  end
> +
> +  def << (data)
> +		@sock.write(data)
> +    @size += data.length
> +		if (@size >= @content_length)
> +			result = @http.end_request
> +			if result.inspect =~ (/Net::HTTPOK 200 OK/)

Is there really no better way to check that the request succeeded than
to depend on its string representation ? For example, checking the
response class (result.kind_of?(Net::HTTPSuccess)) or result.code.

> +				@request.env["BLOB_SUCCESS"] = "true"
> +			else
> +				@request.env["BLOB_FAIL"] = result.body
> +			end
> +		end
> +  end
> +
> +  def rewind
> +    puts "total size counted was #{size}"
> +  end

Is this left over from debugging ?

> +  #use the Request.env hash (populated by the ThinParser) to
> +  #determine whether this is a post blob operation
> +  #by definition, only get here with a body of > 112kbytes - thin/lib/thin/request.rb:12 MAX_BODY = 1024 * (80 + 32)
> +  #request.env['PATH_INFO']  === "/api/buckets/mynewcoolbucketwoo"
> +  def self.is_put_blob(request = nil)
> +    path = request.env['PATH_INFO']
> +    method = request.env['REQUEST_METHOD']
> +    if ( ((path =~ /^\/api\/buckets/i) == 0) &&
> +         ( method == 'PUT') )

The =~ operator returns nil when there is no match, and nil is falsy in
Ruby; you can write the above condition with fewer parens as

        if path =~ /^\/api\/buckets/ && method == 'PUT'

Also, the URLs in the rest of the API are case-sensitive, so no need
for the /.../i

> +      return true
> +    else
> +      return false
> +    end
> +  end
> +
> +  private
> +
> +  def parse_args(request_string, element)
> +    case element
> +      when :bucket
> +				array = request_string.split("/")
> +				blob = array.pop
> +				bucket = array.pop
> +        return bucket, blob
> +      when :credentials
> +        decoded = Base64.decode64(request_string.split('Basic ').last)
> +        key = decoded.split(':').first
> +        pass = decoded.split(':').last
> +        return key, pass
> +      else
> +        nil
> +    end
> +  end

Why not split this into two methods, parse_bucket and parse_credentials?
The two branches have nothing in common.

> +  def new_provider(driver_name)
> +    provider = case driver_name
> +      when (/EC2/) then "https://s3.amazonaws.com"
> +      when (/Rackspace/) then "Rax"
> +      when (/Azure/) then "Azure"
> +      else
> +        "UNKNOWN" # should blow something up here - noisily
> +    end
> +    provider
> +  end

Can't we just grab the driver/instantiate a new one and see whether it
supports streaming puts of a blob ? Going by the driver name seems
pretty fragile.

> diff --git a/server/server.rb b/server/server.rb
> index 86dd524..eef4218 100644
> --- a/server/server.rb
> +++ b/server/server.rb
> @@ -696,10 +696,32 @@ get '/api/buckets/:bucket/new_blob' do
>    end
>  end
>  
> -#create a new blob
> +#create a new blob using PUT - streams through deltacloud
> +put '/api/buckets/:bucket/:blob' do
> +	if(env["BLOB_SUCCESS"]) #ie got a 200ok after putting blob
> +		content_type = env["CONTENT_TYPE"]
> +		content_type ||=  ""
> +		@blob = Blob.new({:id => params[:blob],
> +											:bucket => params[:bucket],
> +											:content_length => env["CONTENT_LENGTH"],
> +											:content_type => content_type,
> +											:last_modified => '',
> +											:user_metadata => []}) #add metadata
> +#debugger
> +#	  respond_to do |format|
> +#    	format.xml { haml :"blobs/show" }
> +#  	end

Debugging leftovers ? ;)

> +		@_format = :xml
> +		haml :"blobs/show"
> +	else
> +		report_error(500) #OK?
> +	end
> +end

Why isn't there a respond_to block here ?

> +#create a new blob using html interface - NON STREAMING (i.e. browser POST http form data)
>  post '/api/buckets/:bucket' do
>    bucket_id = params[:bucket]
> -  blob_id = params['blob_id']
> +  blob_id = params['blob']
>    blob_data = params['blob_data']
>    user_meta = {}
>  #first try get blob_metadata from params (i.e., passed by http form post, e.g. browser)
> @@ -710,11 +732,13 @@ post '/api/buckets/:bucket' do
>        key = "HTTP_X_Deltacloud_Blobmeta_#{key}"
>        value = params[:"meta_value#{i}"]
>        user_meta[key] = value
> -    end #max.each do
> -  else #can try to get blob_metadata from http headers
> -    meta_array = request.env.select{|k,v| k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
> -    meta_array.inject({}){ |result, array| user_meta[array.first.upcase] = array.last}
> -  end #end if
> +		end
> +  end #max.each do
> +#  else #can try to get blob_metadata from http headers - NO LONGER VALID (Have seperate PUT for this type of metadata headers)
> +#    meta_array = request.env.select{|k,v| k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
> +#    meta_array.inject({}){ |result, array| user_meta[array.first.upcase] = array.last}
> +#  end #end if
> +#  BlobPostStream.call(credentials, bucket_id, blob_id, env, user_meta)
>    @blob = driver.create_blob(credentials, bucket_id, blob_id, blob_data, user_meta)
>    respond_to do |format|
>      format.html { haml :"blobs/show"}

Why all these changes to the HTML UI ? It doesn't seem that streaming
blobs has any connection with this.

David