You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@deltacloud.apache.org by ma...@apache.org on 2011/06/10 18:13:32 UTC
svn commit: r1134365 - in /incubator/deltacloud/trunk/server:
lib/deltacloud/drivers/ec2/ec2_driver.rb
lib/deltacloud/helpers/blob_stream.rb server.rb views/blobs/new.html.haml
Author: marios
Date: Fri Jun 10 16:13:32 2011
New Revision: 1134365
URL: http://svn.apache.org/viewvc?rev=1134365&view=rev
Log:
Adds blob streaming uploads using PUT (stream through thin+deltacloud)
Signed-off-by: marios <ma...@redhat.com>
Modified:
incubator/deltacloud/trunk/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
incubator/deltacloud/trunk/server/lib/deltacloud/helpers/blob_stream.rb
incubator/deltacloud/trunk/server/server.rb
incubator/deltacloud/trunk/server/views/blobs/new.html.haml
Modified: incubator/deltacloud/trunk/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
URL: http://svn.apache.org/viewvc/incubator/deltacloud/trunk/server/lib/deltacloud/drivers/ec2/ec2_driver.rb?rev=1134365&r1=1134364&r2=1134365&view=diff
==============================================================================
--- incubator/deltacloud/trunk/server/lib/deltacloud/drivers/ec2/ec2_driver.rb (original)
+++ incubator/deltacloud/trunk/server/lib/deltacloud/drivers/ec2/ec2_driver.rb Fri Jun 10 16:13:32 2011
@@ -396,7 +396,7 @@ module Deltacloud
end
#--
- # Create Blob
+ # Create Blob - NON Streaming way (i.e. was called with POST html multipart form data)
#--
def create_blob(credentials, bucket_id, blob_id, data = nil, opts = {})
s3_client = new_client(credentials, :s3)
@@ -445,6 +445,39 @@ module Deltacloud
end
end
+ #params: {:user,:password,:bucket,:blob,:content_type,:content_length,:metadata}
+ def blob_stream_connection(params)
+ #canonicalise metadata:
+ #http://docs.amazonwebservices.com/AmazonS3/latest/dev/index.html?RESTAuthentication.html
+ metadata = params[:metadata]
+ signature_meta_string = ""
+ unless metadata.nil?
+ metadata.gsub_keys('HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]', 'x-amz-meta-')
+ keys_array = metadata.keys.sort!
+ keys_array.each {|k| signature_meta_string << "#{k}:#{metadata[k]}\n"}
+ end
+ provider = "https://#{endpoint_for_service(:s3)}"
+ uri = URI.parse(provider)
+ http = Net::HTTP.new("#{params[:bucket]}.#{uri.host}", uri.port )
+ http.use_ssl = true
+ http.verify_mode = OpenSSL::SSL::VERIFY_NONE
+ timestamp = Time.now.httpdate
+ string_to_sign =
+ "PUT\n\n#{params[:content_type]}\n#{timestamp}\n#{signature_meta_string}/#{params[:bucket]}/#{params[:blob]}"
+ auth_string = Aws::Utils::sign(params[:password], string_to_sign)
+ request = Net::HTTP::Put.new("/#{params[:blob]}")
+ request['Host'] = "#{params[:bucket]}.#{uri.host}"
+ request['Date'] = timestamp
+ request['Content-Type'] = params[:content_type]
+ request['Content-Length'] = params[:content_length]
+ request['Authorization'] = "AWS #{params[:user]}:#{auth_string}"
+ request['Expect'] = "100-continue"
+ unless metadata.nil?
+ metadata.each{|k,v| request[k] = v}
+ end
+ return http, request
+ end
+
def storage_volumes(credentials, opts={})
ec2 = new_client( credentials )
volume_list = (opts and opts[:id]) ? opts[:id] : nil
@@ -582,7 +615,6 @@ module Deltacloud
end
private
-
def new_client(credentials, type = :ec2)
klass = case type
when :elb then Aws::Elb
Modified: incubator/deltacloud/trunk/server/lib/deltacloud/helpers/blob_stream.rb
URL: http://svn.apache.org/viewvc/incubator/deltacloud/trunk/server/lib/deltacloud/helpers/blob_stream.rb?rev=1134365&r1=1134364&r2=1134365&view=diff
==============================================================================
--- incubator/deltacloud/trunk/server/lib/deltacloud/helpers/blob_stream.rb (original)
+++ incubator/deltacloud/trunk/server/lib/deltacloud/helpers/blob_stream.rb Fri Jun 10 16:13:32 2011
@@ -69,13 +69,145 @@ class Hash
remove = []
self.each_key do |key|
if key.to_s.match(rgx_pattern)
- new_key = key.to_s.gsub(rgx_pattern, replacement)
+ new_key = key.to_s.gsub(rgx_pattern, replacement).downcase
self[new_key] = self[key]
remove << key
- end #key.match
- end # each_key do
+ end
+ end
#remove the original keys
self.delete_if{|k,v| remove.include?(k)}
- end #def
+ end
-end #class
+end
+
+#Monkey patch for streaming blobs:
+# Normally a client will upload a blob to deltacloud and thin will put
+# this into a tempfile. Then deltacloud would stream up to the provider:
+# i.e. client =-->>TEMP_FILE-->> deltacloud =-->>STREAM-->> provider
+# Instead we want to recognise that this is a 'PUT blob' operation and
+# start streaming to the provider as the request is received:
+# i.e. client =-->>STREAM-->> deltacloud =-->>STREAM-->> provider
+module Thin
+ class Request
+
+ alias_method :move_body_to_tempfile_orig, :move_body_to_tempfile if defined?(Thin::Response)
+ private
+ def move_body_to_tempfile
+ if BlobStreamIO::is_put_blob(self)
+ @body = BlobStreamIO.new(self)
+ else
+ move_body_to_tempfile_orig
+ end
+ end
+
+ end
+end
+
+require 'net/http'
+#monkey patch for Net:HTTP
+module Net
+ class HTTP
+
+ alias :request_orig :request
+
+ def request(req, body = nil, blob_stream = nil, &block)
+ unless blob_stream
+ return request_orig(req, body, &block)
+ end
+ @blob_req = req
+ do_start #start the connection
+
+ req.set_body_internal body
+ begin_transport req
+ req.write_header_m @socket,@curr_http_version, edit_path(req.path)
+ @socket
+ end
+
+ class Put < HTTPRequest
+ def write_header_m(sock, ver, path)
+ write_header(sock, ver, path)
+ end
+ end
+
+ def end_request
+ begin
+ res = HTTPResponse.read_new(@socket)
+ end while res.kind_of?(HTTPContinue)
+ res.reading_body(@socket, @blob_req.response_body_permitted?) {
+ yield res if block_given? }
+ end_transport @blob_req, res
+ do_finish
+ res
+ end
+ end
+
+end
+
+require 'base64'
+class BlobStreamIO
+
+ attr_accessor :size, :provider, :sock
+
+ def initialize(request)
+ @client_request = request
+ @size = 0
+ bucket, blob = parse_bucket_blob(request.env["PATH_INFO"])
+ user, password = parse_credentials(request.env['HTTP_AUTHORIZATION'])
+ content_type = request.env['CONTENT_TYPE'] || ""
+ #deal with blob_metadata: (X-Deltacloud-Blobmeta-name: value)
+ meta_array = request.env.select{|k,v| k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
+ user_meta = meta_array.inject({}){ |result, array| result[array.first.upcase] = array.last; result}
+ @content_length = request.env['CONTENT_LENGTH']
+ @http, provider_request = driver.blob_stream_connection({:user=>user,
+ :password=>password, :bucket=>bucket, :blob=>blob, :metadata=> user_meta,
+ :content_type=>content_type, :content_length=>@content_length })
+ @content_length = @content_length.to_i #for comparison of size in '<< (data)'
+ @sock = @http.request(provider_request, nil, true)
+ end
+
+ def << (data)
+ @sock.write(data)
+ @size += data.length
+ if (@size >= @content_length)
+ result = @http.end_request
+ if result.is_a?(Net::HTTPSuccess)
+ @client_request.env["BLOB_SUCCESS"] = "true"
+ else
+ @client_request.env["BLOB_FAIL"] = result.body
+ end
+ end
+ end
+
+ def rewind
+ end
+
+ #use the Request.env hash (populated by the ThinParser) to determine whether
+ #this is a post blob operation. By definition, only get here with a body of
+ # > 112kbytes - thin/lib/thin/request.rb:12 MAX_BODY = 1024 * (80 + 32)
+ def self.is_put_blob(request = nil)
+ path = request.env['PATH_INFO']
+ method = request.env['REQUEST_METHOD']
+ if ( path =~ /^#{Regexp.escape(Sinatra::UrlForHelper::DEFAULT_URI_PREFIX)}\/buckets/ && method == 'PUT' )
+ return true
+ else
+ return false
+ end
+ end
+
+ private
+
+ def parse_bucket_blob(request_string)
+ array = request_string.split("/")
+ blob = array.pop
+ bucket = array.pop
+ return bucket, blob
+ end
+
+ def parse_credentials(request_string)
+ decoded = Base64.decode64(request_string.split('Basic ').last)
+ key = decoded.split(':').first
+ pass = decoded.split(':').last
+ return key, pass
+ end
+
+end
Modified: incubator/deltacloud/trunk/server/server.rb
URL: http://svn.apache.org/viewvc/incubator/deltacloud/trunk/server/server.rb?rev=1134365&r1=1134364&r2=1134365&view=diff
==============================================================================
--- incubator/deltacloud/trunk/server/server.rb (original)
+++ incubator/deltacloud/trunk/server/server.rb Fri Jun 10 16:13:32 2011
@@ -704,13 +704,47 @@ get "#{Sinatra::UrlForHelper::DEFAULT_UR
end
end
-#create a new blob
+#create a new blob using PUT - streams through deltacloud
+put "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket/:blob" do
+ if(env["BLOB_SUCCESS"]) #ie got a 200ok after putting blob
+ content_type = env["CONTENT_TYPE"]
+ content_type ||= ""
+ @blob = driver.blob(credentials, {:id => params[:blob],
+ 'bucket' => params[:bucket]})
+ respond_to do |format|
+ format.html { haml :"blobs/show" }
+ format.xml { haml :"blobs/show" }
+ format.json { convert_to_json(:blobs, @blob) }
+ end
+ elsif(env["BLOB_FAIL"])
+ report_error(500) #OK?
+ else # small blobs - < 112kb dont hit the streaming monkey patch - use 'normal' create_blob
+ # also, if running under webrick don't hit the streaming patch (Thin specific)
+ bucket_id = params[:bucket]
+ blob_id = params[:blob]
+ temp_file = Tempfile.new("temp_blob_file")
+ temp_file.write(env['rack.input'].read)
+ temp_file.flush
+ content_type = env['CONTENT_TYPE'] || ""
+ blob_data = {:tempfile => temp_file, :type => content_type}
+ meta_array = request.env.select{|k,v| k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
+ user_meta = meta_array.inject({}){ |result, array| result[array.first.upcase] = array.last; result}
+ @blob = driver.create_blob(credentials, bucket_id, blob_id, blob_data, user_meta)
+ temp_file.delete
+ respond_to do |format|
+ format.html { haml :"blobs/show"}
+ format.xml { haml :"blobs/show" }
+ end
+ end
+end
+
+#create a new blob using html interface - NON STREAMING (i.e. browser POST http form data)
post "#{Sinatra::UrlForHelper::DEFAULT_URI_PREFIX}/buckets/:bucket" do
bucket_id = params[:bucket]
- blob_id = params['blob_id']
+ blob_id = params['blob']
blob_data = params['blob_data']
user_meta = {}
-#first try get blob_metadata from params (i.e., passed by http form post, e.g. browser)
+ #metadata from params (i.e., passed by http form post, e.g. browser)
max = params[:meta_params]
if(max)
(1..max.to_i).each do |i|
@@ -718,11 +752,8 @@ post "#{Sinatra::UrlForHelper::DEFAULT_U
key = "HTTP_X_Deltacloud_Blobmeta_#{key}"
value = params[:"meta_value#{i}"]
user_meta[key] = value
- end #max.each do
- else #can try to get blob_metadata from http headers
- meta_array = request.env.select{|k,v| k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
- meta_array.inject({}){ |result, array| user_meta[array.first.upcase] = array.last}
- end #end if
+ end
+ end
@blob = driver.create_blob(credentials, bucket_id, blob_id, blob_data, user_meta)
respond_to do |format|
format.html { haml :"blobs/show"}
@@ -738,7 +769,7 @@ delete "#{Sinatra::UrlForHelper::DEFAULT
respond_to do |format|
format.xml { 204 }
format.json { 204 }
- format.html { bucket_url(bucket_id) }
+ format.html { redirect(bucket_url(bucket_id)) }
end
end
@@ -776,7 +807,7 @@ get "#{Sinatra::UrlForHelper::DEFAULT_UR
respond_to do |format|
format.html { haml :"blobs/show" }
format.xml { haml :"blobs/show" }
- format.json { convert_to_json(blobs, @blob) }
+ format.json { convert_to_json(:blobs, @blob) }
end
else
report_error(404)
Modified: incubator/deltacloud/trunk/server/views/blobs/new.html.haml
URL: http://svn.apache.org/viewvc/incubator/deltacloud/trunk/server/views/blobs/new.html.haml?rev=1134365&r1=1134364&r2=1134365&view=diff
==============================================================================
--- incubator/deltacloud/trunk/server/views/blobs/new.html.haml (original)
+++ incubator/deltacloud/trunk/server/views/blobs/new.html.haml Fri Jun 10 16:13:32 2011
@@ -3,13 +3,7 @@
%form{ :action => bucket_url(@bucket_id), :method => :post, :enctype => 'multipart/form-data'}
%label
Blob Name:
- %input{ :name => 'blob_id', :size => 512}/
- %label
- Blob Data:
- %br
- %input{ :type => "file", :name => 'blob_data', :size => 50}/
- %br
- %br
+ %input{ :name => 'blob', :size => 512}/
%input{ :type => "hidden", :name => "meta_params", :value => "0"}
%a{ :href => "javascript:;", :onclick => "more_fields();"} Add Metadata
%div{ :id => "metadata_holder", :style => "display: none;"}
@@ -23,4 +17,10 @@
%a{ :href => "javascript:;", :onclick => "less_fields();"} Less Metadata
%br
%br
+ %label
+ Blob Data:
+ %br
+ %input{ :type => "file", :name => 'blob_data', :size => 50}/
+ %br
+ %br
%input{ :type => :submit, :name => "commit", :value => "create"}/