You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by jm...@apache.org on 2010/04/06 07:52:09 UTC

svn commit: r931026 - in /hadoop/avro/trunk: CHANGES.txt lang/ruby/lib/avro/ipc.rb lang/ruby/test/sample_ipc_http_client.rb lang/ruby/test/sample_ipc_http_server.rb

Author: jmhodges
Date: Tue Apr  6 05:52:09 2010
New Revision: 931026

URL: http://svn.apache.org/viewvc?rev=931026&view=rev
Log:
AVRO-450. HTTP IPC for ruby.

Added:
    hadoop/avro/trunk/lang/ruby/test/sample_ipc_http_client.rb
    hadoop/avro/trunk/lang/ruby/test/sample_ipc_http_server.rb
Modified:
    hadoop/avro/trunk/CHANGES.txt
    hadoop/avro/trunk/lang/ruby/lib/avro/ipc.rb

Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=931026&r1=931025&r2=931026&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Tue Apr  6 05:52:09 2010
@@ -18,6 +18,8 @@ Avro 1.4.0 (unreleased)
 
     AVRO-497. Minor changes to C++ autotools, makefiles, and code generator. (sbanacho)
 
+    AVRO-450. HTTP IPC for ruby. (jmhodges)
+
   BUG FIXES
     AVRO-461. Skipping primitives in the ruby side (jmhodges)
 

Modified: hadoop/avro/trunk/lang/ruby/lib/avro/ipc.rb
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/ruby/lib/avro/ipc.rb?rev=931026&r1=931025&r2=931026&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/ruby/lib/avro/ipc.rb (original)
+++ hadoop/avro/trunk/lang/ruby/lib/avro/ipc.rb Tue Apr  6 05:52:09 2010
@@ -168,13 +168,13 @@ module Avro::IPC
         true
       when 'CLIENT'
         raise AvroError.new('Handshake failure. match == CLIENT') if send_protocol
-        self.remote_protocol = handshake_response['serverProtocol']
+        self.remote_protocol = Avro::Protocol.parse(handshake_response['serverProtocol'])
         self.remote_hash = handshake_response['serverHash']
         self.send_protocol = false
         false
       when 'NONE'
         raise AvroError.new('Handshake failure. match == NONE') if send_protocol
-        self.remote_protocol = handshake_response['serverProtocol']
+        self.remote_protocol = Avro::Protocol.parse(handshake_response['serverProtocol'])
         self.remote_hash = handshake_response['serverHash']
         self.send_protocol = true
         false
@@ -236,11 +236,10 @@ module Avro::IPC
       protocol_cache[local_hash] = local_protocol
     end
 
-    def respond(transport)
-      # Called by a server to deserialize a request, compute and serialize
-      # a response or error. Compare to 'handle()' in Thrift.
-
-      call_request = transport.read_framed_message
+    # Called by a server to deserialize a request, compute and serialize
+    # a response or error. Compare to 'handle()' in Thrift.
+    def respond(call_request)
+      buffer_reader = StringIO.new(call_request)
       buffer_decoder = Avro::IO::BinaryDecoder.new(StringIO.new(call_request))
       buffer_writer = StringIO.new('', 'w+')
       buffer_encoder = Avro::IO::BinaryEncoder.new(buffer_writer)
@@ -248,7 +247,7 @@ module Avro::IPC
       response_metadata = {}
 
       begin
-        remote_protocol = process_handshake(transport, buffer_decoder, buffer_encoder)
+        remote_protocol = process_handshake(buffer_decoder, buffer_encoder)
         # handshake failure
         unless remote_protocol
           return buffer_writer.string
@@ -300,7 +299,7 @@ module Avro::IPC
       buffer_writer.string
     end
 
-    def process_handshake(transport, decoder, encoder)
+    def process_handshake(decoder, encoder)
       handshake_request = HANDSHAKE_RESPONDER_READER.read(decoder)
       handshake_response = {}
 
@@ -308,6 +307,7 @@ module Avro::IPC
       client_hash = handshake_request['clientHash']
       client_protocol = handshake_request['clientProtocol']
       remote_protocol = protocol_cache[client_hash]
+
       if !remote_protocol && client_protocol
         remote_protocol = protocol.parse(client_protocol)
         protocol_cache[client_hash] = remote_protocol
@@ -440,4 +440,95 @@ module Avro::IPC
       sock.close
     end
   end
+
+  class ConnectionClosedError < StandardError; end
+
+  class FramedWriter
+    attr_reader :writer
+    def initialize(writer)
+      @writer = writer
+    end
+
+    def write_framed_message(message)
+      message_size = message.size
+      total_bytes_sent = 0
+      while message_size - total_bytes_sent > 0
+        if message_size - total_bytes_sent > BUFFER_SIZE
+          buffer_size = BUFFER_SIZE
+        else
+          buffer_size = message_size - total_bytes_sent
+        end
+        write_buffer(message[total_bytes_sent, buffer_size])
+        total_bytes_sent += buffer_size
+      end
+      write_buffer_size(0)
+    end
+
+    def to_s; writer.string; end
+
+    private
+    def write_buffer(chunk)
+      buffer_size = chunk.size
+      write_buffer_size(buffer_size)
+      writer << chunk
+    end
+
+    def write_buffer_size(n)
+      writer.write([n].pack('N'))
+    end
+  end
+
+  class FramedReader
+    attr_reader :reader
+
+    def initialize(reader)
+      @reader = reader
+    end
+
+    def read_framed_message
+      message = []
+      loop do
+        buffer = ""
+        buffer_size = read_buffer_size
+
+        return message.join if buffer_size == 0
+
+        while buffer.size < buffer_size
+          chunk = reader.read(buffer_size - buffer.size)
+          chunk_error?(chunk)
+          buffer << chunk
+        end
+        message << buffer
+      end
+    end
+
+    private
+    def read_buffer_size
+      header = reader.read(BUFFER_HEADER_LENGTH)
+      chunk_error?(header)
+      header.unpack('N')[0]
+    end
+
+    def chunk_error?(chunk)
+      raise ConnectionClosedError.new("Reader read 0 bytes") if chunk == ''
+    end
+  end
+
+  # Only works for clients. Sigh.
+  class HTTPTransceiver
+    attr_reader :remote_name, :host, :port
+    def initialize(host, port)
+      @host, @port = host, port
+      @remote_name = "#{host}:#{port}"
+    end
+
+    def transceive(message)
+      writer = FramedWriter.new(StringIO.new)
+      writer.write_framed_message(message)
+      resp = Net::HTTP.start(host, port) do |http|
+        http.post('/', writer.to_s, {'Content-Type' => 'avro/binary'})
+      end
+      FramedReader.new(StringIO.new(resp.body)).read_framed_message
+    end
+  end
 end

Added: hadoop/avro/trunk/lang/ruby/test/sample_ipc_http_client.rb
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/ruby/test/sample_ipc_http_client.rb?rev=931026&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/ruby/test/sample_ipc_http_client.rb (added)
+++ hadoop/avro/trunk/lang/ruby/test/sample_ipc_http_client.rb Tue Apr  6 05:52:09 2010
@@ -0,0 +1,84 @@
+#!/usr/bin/env ruby
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+# http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require 'socket'
+require 'avro'
+
+MAIL_PROTOCOL_JSON = <<-JSON
+{"namespace": "example.proto",
+ "protocol": "Mail",
+
+ "types": [
+     {"name": "Message", "type": "record",
+      "fields": [
+          {"name": "to",   "type": "string"},
+          {"name": "from", "type": "string"},
+          {"name": "body", "type": "string"}
+      ]
+     }
+ ],
+
+ "messages": {
+     "send": {
+         "request": [{"name": "message", "type": "Message"}],
+         "response": "string"
+     },
+     "replay": {
+         "request": [],
+         "response": "string"
+     }
+ }
+}
+JSON
+
+MAIL_PROTOCOL = Avro::Protocol.parse(MAIL_PROTOCOL_JSON)
+
+def make_requestor(server_address, port, protocol)
+  transport = Avro::IPC::HTTPTransceiver.new(server_address, port)
+  Avro::IPC::Requestor.new(protocol, transport)
+end
+
+if $0 == __FILE__
+  if ![3, 4].include?(ARGV.length)
+    raise "Usage: <to> <from> <body> [<count>]"
+  end
+
+  # client code - attach to the server and send a message
+  # fill in the Message record
+  message = {
+    'to'   => ARGV[0],
+    'from' => ARGV[1],
+    'body' => ARGV[2]
+  }
+
+  num_messages = (ARGV[3] || 1).to_i
+
+  # build the parameters for the request
+  params = {'message' => message}
+  # send the requests and print the result
+
+  num_messages.times do
+    requestor = make_requestor('localhost', 9090, MAIL_PROTOCOL)
+    result = requestor.request('send', params)
+    puts("Result: " + result)
+  end
+
+  # try out a replay message
+  requestor = make_requestor('localhost', 9090, MAIL_PROTOCOL)
+  result = requestor.request('replay', {})
+  puts("Replay Result: " + result)
+end

Added: hadoop/avro/trunk/lang/ruby/test/sample_ipc_http_server.rb
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/ruby/test/sample_ipc_http_server.rb?rev=931026&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/ruby/test/sample_ipc_http_server.rb (added)
+++ hadoop/avro/trunk/lang/ruby/test/sample_ipc_http_server.rb Tue Apr  6 05:52:09 2010
@@ -0,0 +1,80 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+# http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+require 'avro'
+require 'webrick'
+
+MAIL_PROTOCOL_JSON = <<-JSON
+{"namespace": "example.proto",
+ "protocol": "Mail",
+
+ "types": [
+     {"name": "Message", "type": "record",
+      "fields": [
+          {"name": "to",   "type": "string"},
+          {"name": "from", "type": "string"},
+          {"name": "body", "type": "string"}
+      ]
+     }
+ ],
+
+ "messages": {
+     "send": {
+         "request": [{"name": "message", "type": "Message"}],
+         "response": "string"
+     },
+     "replay": {
+         "request": [],
+         "response": "string"
+     }
+ }
+}
+JSON
+
+MAIL_PROTOCOL = Avro::Protocol.parse(MAIL_PROTOCOL_JSON)
+
+class MailResponder < Avro::IPC::Responder
+  def initialize
+    super(MAIL_PROTOCOL)
+  end
+
+  def call(message, request)
+    if message.name == 'send'
+      request_content = request['message']
+      "Sent message to #{request_content['to']} from #{request_content['from']} with body #{request_content['body']}"
+    elsif message.name == 'replay'
+      'replay'
+    end
+  end
+end
+
+class MailHandler < WEBrick::HTTPServlet::AbstractServlet
+  def do_POST(req, resp)
+    responder = MailResponder.new
+    call_request = Avro::IPC::FramedReader.new(StringIO.new(req.body)).read_framed_message
+    unframed_resp = responder.respond(call_request)
+    writer = Avro::IPC::FramedWriter.new(StringIO.new)
+    writer.write_framed_message(unframed_resp)
+    resp.body = writer.to_s
+    raise WEBrick::HTTPStatus::OK
+  end
+end
+
+if $0 == __FILE__
+  server = WEBrick::HTTPServer.new(:Host => 'localhost', :Port => 9090)
+  server.mount '/', MailHandler
+  trap("INT") { server.shutdown }
+  server.start
+end