You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2010/01/20 02:05:07 UTC
svn commit: r901024 [1/2] - in /hadoop/avro/trunk: ./ lang/ruby/
lang/ruby/lib/ lang/ruby/lib/avro/ lang/ruby/test/ share/
Author: cutting
Date: Wed Jan 20 01:05:06 2010
New Revision: 901024
URL: http://svn.apache.org/viewvc?rev=901024&view=rev
Log:
AVRO-306. Add ruby implementation. Contributed by Jeff Hodges.
Added:
hadoop/avro/trunk/lang/ruby/ (with props)
hadoop/avro/trunk/lang/ruby/.gitignore
hadoop/avro/trunk/lang/ruby/CHANGELOG
hadoop/avro/trunk/lang/ruby/Manifest
hadoop/avro/trunk/lang/ruby/Rakefile
hadoop/avro/trunk/lang/ruby/lib/
hadoop/avro/trunk/lang/ruby/lib/avro/
hadoop/avro/trunk/lang/ruby/lib/avro.rb
hadoop/avro/trunk/lang/ruby/lib/avro/collect_hash.rb
hadoop/avro/trunk/lang/ruby/lib/avro/data_file.rb
hadoop/avro/trunk/lang/ruby/lib/avro/io.rb
hadoop/avro/trunk/lang/ruby/lib/avro/ipc.rb
hadoop/avro/trunk/lang/ruby/lib/avro/protocol.rb
hadoop/avro/trunk/lang/ruby/lib/avro/schema.rb
hadoop/avro/trunk/lang/ruby/test/
hadoop/avro/trunk/lang/ruby/test/sample_ipc_client.rb
hadoop/avro/trunk/lang/ruby/test/sample_ipc_server.rb
hadoop/avro/trunk/lang/ruby/test/test_help.rb
hadoop/avro/trunk/lang/ruby/test/test_io.rb
hadoop/avro/trunk/lang/ruby/test/test_protocol.rb
Modified:
hadoop/avro/trunk/CHANGES.txt
hadoop/avro/trunk/README.txt
hadoop/avro/trunk/build.sh
hadoop/avro/trunk/share/rat-excludes.txt
Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=901024&r1=901023&r2=901024&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Wed Jan 20 01:05:06 2010
@@ -68,6 +68,8 @@
AVRO-346. Add function to validate a datum against a schema. (massie)
+ AVRO-306. Add Ruby implementation. (Jeff Hodges via cutting)
+
IMPROVEMENTS
AVRO-157. Changes from code review comments for C++. (sbanacho)
Modified: hadoop/avro/trunk/README.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/README.txt?rev=901024&r1=901023&r2=901024&view=diff
==============================================================================
--- hadoop/avro/trunk/README.txt (original)
+++ hadoop/avro/trunk/README.txt Wed Jan 20 01:05:06 2010
@@ -12,6 +12,7 @@
- Python: 2.5 or greater
- C: gcc, autoconf, automake, libtool, asciidoc
- C++: g++, flex, bison, libboost-dev
+ - Ruby: ruby, gem, rake, echoe, yajl-ruby
- Apache Ant 1.7
- Apache Forrest 0.8 (for documentation, requires Java 1.5)
Modified: hadoop/avro/trunk/build.sh
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/build.sh?rev=901024&r1=901023&r2=901024&view=diff
==============================================================================
--- hadoop/avro/trunk/build.sh (original)
+++ hadoop/avro/trunk/build.sh Wed Jan 20 01:05:06 2010
@@ -44,6 +44,7 @@
(cd lang/py; ant test)
(cd lang/c; ./build.sh test)
# (cd lang/c++; make test)
+ # (cd lang/ruby; rake test)
# create interop test data
(cd lang/java; ant interop-data-generate)
Propchange: hadoop/avro/trunk/lang/ruby/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Jan 20 01:05:06 2010
@@ -0,0 +1 @@
+tmp
Added: hadoop/avro/trunk/lang/ruby/.gitignore
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/ruby/.gitignore?rev=901024&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/ruby/.gitignore (added)
+++ hadoop/avro/trunk/lang/ruby/.gitignore Wed Jan 20 01:05:06 2010
@@ -0,0 +1 @@
+tmp
\ No newline at end of file
Added: hadoop/avro/trunk/lang/ruby/CHANGELOG
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/ruby/CHANGELOG?rev=901024&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/ruby/CHANGELOG (added)
+++ hadoop/avro/trunk/lang/ruby/CHANGELOG Wed Jan 20 01:05:06 2010
@@ -0,0 +1 @@
+v0.0.1 Initial Ruby implementation.
\ No newline at end of file
Added: hadoop/avro/trunk/lang/ruby/Manifest
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/ruby/Manifest?rev=901024&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/ruby/Manifest (added)
+++ hadoop/avro/trunk/lang/ruby/Manifest Wed Jan 20 01:05:06 2010
@@ -0,0 +1,15 @@
+CHANGELOG
+Rakefile
+lib/avro.rb
+lib/avro/collect_hash.rb
+lib/avro/data_file.rb
+lib/avro/io.rb
+lib/avro/ipc.rb
+lib/avro/protocol.rb
+lib/avro/schema.rb
+test/sample_ipc_client.rb
+test/sample_ipc_server.rb
+test/test_help.rb
+test/test_io.rb
+test/test_protocol.rb
+Manifest
Added: hadoop/avro/trunk/lang/ruby/Rakefile
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/ruby/Rakefile?rev=901024&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/ruby/Rakefile (added)
+++ hadoop/avro/trunk/lang/ruby/Rakefile Wed Jan 20 01:05:06 2010
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require 'echoe'
+Echoe.new('avro') do |p|
+ p.author = "Jeff Hodges"
+ p.author = "Ryan King"
+ p.summary = "Apache Avro for Ruby"
+ p.url = "http://hadoop.apache.org/avro/"
+ p.runtime_dependencies = %w[rubygems yajl]
+end
Added: hadoop/avro/trunk/lang/ruby/lib/avro.rb
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/ruby/lib/avro.rb?rev=901024&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/ruby/lib/avro.rb (added)
+++ hadoop/avro/trunk/lang/ruby/lib/avro.rb Wed Jan 20 01:05:06 2010
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require 'yajl'
+require 'set'
+require 'md5'
+
+module Avro
+ VERSION = "FIXME"
+
+ class AvroError < StandardError; end
+
+ class AvroTypeError < Avro::AvroError
+ def initialize(schm=nil, datum=nil, msg=nil)
+ msg ||= "Not a #{schm.to_s}: #{datum}"
+ super(msg)
+ end
+ end
+end
+
+require 'avro/collect_hash'
+require 'avro/schema'
+require 'avro/io'
+require 'avro/data_file'
+require 'avro/protocol'
+require 'avro/ipc'
Added: hadoop/avro/trunk/lang/ruby/lib/avro/collect_hash.rb
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/ruby/lib/avro/collect_hash.rb?rev=901024&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/ruby/lib/avro/collect_hash.rb (added)
+++ hadoop/avro/trunk/lang/ruby/lib/avro/collect_hash.rb Wed Jan 20 01:05:06 2010
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+module Enumerable
+ def collect_hash
+ inject(Hash.new) do |memo, i|
+ k, v = yield(i)
+ memo[k] = v if k
+ memo
+ end
+ end
+end
Added: hadoop/avro/trunk/lang/ruby/lib/avro/data_file.rb
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/ruby/lib/avro/data_file.rb?rev=901024&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/ruby/lib/avro/data_file.rb (added)
+++ hadoop/avro/trunk/lang/ruby/lib/avro/data_file.rb Wed Jan 20 01:05:06 2010
@@ -0,0 +1,243 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require 'openssl'
+
+module Avro
+ module DataFile
+    VERSION = 0
+    # File header: "Obj" followed by the format version as a single byte.
+    MAGIC = "Obj#{[VERSION].pack('c')}"
+    MAGIC_SIZE = MAGIC.length
+    # Length in bytes of the random marker written after the header and
+    # after every block.
+    SYNC_SIZE = 16
+    # The writer emits a block once its buffer holds at least this many bytes.
+    SYNC_INTERVAL = SYNC_SIZE * 1000
+    # File metadata is stored as an Avro map whose values are bytes.
+    META_SCHEMA = Schema.parse('{"type": "map", "values": "bytes"}')
+    VALID_CODECS = %w[null]
+    VALID_ENCODINGS = %w[binary] # not used yet
+
+    # Raised for malformed or unsupported data files.
+    class DataFileError < AvroError; end
+
+ class Writer
+ def self.generate_sync_marker
+ OpenSSL::Random.random_bytes(16)
+ end
+
+ attr_reader :writer, :encoder, :datum_writer, :buffer_writer, :buffer_encoder, :sync_marker, :meta
+ attr_accessor :block_count
+
+ def initialize(writer, datum_writer, writers_schema=nil)
+ # If writers_schema is not present, presume we're appending
+ @writer = writer
+ @encoder = IO::BinaryEncoder.new(@writer)
+ @datum_writer = datum_writer
+ @buffer_writer = StringIO.new('', 'w')
+ @buffer_encoder = IO::BinaryEncoder.new(@buffer_writer)
+ @block_count = 0
+
+ @meta = {}
+
+ if writers_schema
+ @sync_marker = Writer.generate_sync_marker
+ meta['codec'] = 'null'
+ meta['schema'] = writers_schema.to_s
+ datum_writer.writers_schema = writers_schema
+ write_header
+ else
+ # open writer for reading to collect metadata
+ dfr = DataFileReader.new(writer, Avro::IO::DatumReader.new)
+
+ # FIXME(jmhodges): collect arbitrary metadata
+ # collect metadata
+ @sync_marker = dfr.sync_marker
+ meta['codec'] = dfr.meta['codec']
+
+ # get schema used to write existing file
+ schema_from_file = dfr.meta['schema']
+ meta['schema'] = schema_from_file
+ datum_writer.writers_schema = Schema.parse(schema_from_file)
+
+ # seek to the end of the file and prepare for writing
+ writer.seek(0,2)
+ end
+ end
+
+ # Append a datum to the file
+ def <<(datum)
+ datum_writer.write(datum, buffer_encoder)
+ self.block_count += 1
+
+ # if the data to write is larger than the sync interval, write
+ # the block
+ if buffer_writer.tell >= SYNC_INTERVAL
+ write_block
+ end
+ end
+
+ # Return the current position as a value that may be passed to
+ # DataFileReader.seek(long). Forces the end of the current block,
+ # emitting a synchronization marker.
+ def sync
+ write_block
+ writer.tell
+ end
+
+ # Flush the current state of the file, including metadata
+ def flush
+ write_block
+ writer.flush
+ end
+
+ def close
+ flush
+ writer.close
+ end
+
+ private
+
+ def write_header
+ # write magic
+ writer.write(MAGIC)
+
+ # write metadata
+ datum_writer.write_data(META_SCHEMA, meta, encoder)
+
+ # write sync marker
+ writer.write(sync_marker)
+ end
+
+ # TODO(jmhodges): make a schema for blocks and use datum_writer
+ # TODO(jmhodges): do we really need the number of items in the block?
+ # TODO(jmhodges): use codec when writing the block contents
+ def write_block
+ if block_count > 0
+ # write number of items in block and block size in bytes
+ encoder.write_long(block_count)
+ to_write = buffer_writer.string
+ encoder.write_long(to_write.size)
+
+ # write block contents
+ if meta['codec'] == 'null'
+ writer.write(to_write)
+ else
+ msg = "#{meta['codec'].inspect} coded is not supported"
+ raise DataFileError, msg
+ end
+
+ # write sync marker
+ writer.write(sync_marker)
+
+ # reset buffer
+ buffer_writer.truncate(0)
+ self.block_count = 0
+ end
+ end
+ end
+
+ # Read files written by DataFileWriter
+ class Reader
+ include ::Enumerable
+
+ attr_reader :reader, :decoder, :datum_reader, :sync_marker, :meta, :file_length
+ attr_accessor :block_count
+
+ def initialize(reader, datum_reader)
+ @reader = reader
+ @decoder = IO::BinaryDecoder.new(reader)
+ @datum_reader = datum_reader
+
+ # read the header: magic, meta, sync
+ read_header
+
+ # ensure the codec is valid
+ codec_from_file = meta['codec']
+ if codec_from_file && ! VALID_CODECS.include?(codec_from_file)
+ raise DataFileError, "Unknown codec: #{codec_from_file}"
+ end
+
+ # get ready to read
+ @block_count = 0
+ datum_reader.writers_schema = Schema.parse meta['schema']
+ end
+
+ # Iterates through each datum in this file
+ # TODO(jmhodges): handle block of length zero
+ def each
+ loop do
+ if block_count == 0
+ case
+ when eof?; break
+ when skip_sync
+ break if eof?
+ read_block_header
+ else
+ read_block_header
+ end
+ end
+
+ datum = datum_reader.read(decoder)
+ self.block_count -= 1
+ yield(datum)
+ end
+ end
+
+ def eof?; reader.eof?; end
+
+ def close
+ reader.close
+ end
+
+ private
+ def read_header
+ # seek to the beginning of the file to get magic block
+ reader.seek(0, 0)
+
+ # check magic number
+ magic_in_file = reader.read(MAGIC_SIZE)
+ if magic_in_file.size < MAGIC_SIZE
+ msg = 'Not an Avro data file: shorter than the Avro magic block'
+ raise DataFileError, msg
+ elsif magic_in_file != MAGIC
+ msg = "Not an Avro data file: #{magic_in_file} doesn't match #{MAGIC}"
+ raise DataFileError, msg
+ end
+
+ # read metadata
+ @meta = datum_reader.read_data(META_SCHEMA,
+ META_SCHEMA,
+ decoder)
+ # read sync marker
+ @sync_marker = reader.read(SYNC_SIZE)
+ end
+
+ def read_block_header
+ self.block_count = decoder.read_long
+ decoder.read_long # not doing anything with length in bytes
+ end
+
+ # read the length of the sync marker; if it matches the sync
+ # marker, return true. Otherwise, seek back to where we started
+ # and return false
+ def skip_sync
+ proposed_sync_marker = reader.read(SYNC_SIZE)
+ if proposed_sync_marker != sync_marker
+ reader.seek(-SYNC_SIZE, 1)
+ false
+ else
+ true
+ end
+ end
+ end
+ end
+end
Added: hadoop/avro/trunk/lang/ruby/lib/avro/io.rb
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/ruby/lib/avro/io.rb?rev=901024&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/ruby/lib/avro/io.rb (added)
+++ hadoop/avro/trunk/lang/ruby/lib/avro/io.rb Wed Jan 20 01:05:06 2010
@@ -0,0 +1,571 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+module Avro
+ module IO
+ # Raised when datum is not an example of schema
+ class AvroTypeError < AvroError
+ def initialize(expected_schema, datum)
+ super("The datum #{datum.inspect} is not an example of schema #{expected_schema}")
+ end
+ end
+
+ # Raised when writer's and reader's schema do not match
+ class SchemaMatchException < AvroError
+ def initialize(writers_schema, readers_schema)
+ super("Writer's schema #{writers_schema} and Reader's schema " +
+ "#{readers_schema} do not match.")
+ end
+ end
+
+ # FIXME(jmhodges) move validate to this module?
+
+ class BinaryDecoder
+ # Read leaf values
+
+ # reader is an object on which we can call read, seek and tell.
+ attr_reader :reader
+ def initialize(reader)
+ @reader = reader
+ end
+
+ def byte!
+ @reader.read(1)[0]
+ end
+
+ def read_null
+ # null is written as zero byte's
+ nil
+ end
+
+ def read_boolean
+ byte! == 1
+ end
+
+ def read_int; read_long; end
+
+ def read_long
+ # int and long values are written using variable-length,
+ # zig-zag coding.
+ b = byte!
+ n = b & 0x7F
+ shift = 7
+ while (b & 0x80) != 0
+ b = byte!
+ n |= (b & 0x7F) << shift
+ shift += 7
+ end
+ (n >> 1) ^ -(n & 1)
+ end
+
+ def read_float
+ # A float is written as 4 bytes.
+ # The float is converted into a 32-bit integer using a method
+ # equivalent to Java's floatToIntBits and then encoded in
+ # little-endian format.
+
+ bits = (byte! & 0xFF) |
+ ((byte! & 0xff) << 8) |
+ ((byte! & 0xff) << 16) |
+ ((byte! & 0xff) << 24)
+ [bits].pack('i').unpack('e')[0]
+ end
+
+ def read_double
+ # A double is written as 8 bytes.
+ # The double is converted into a 64-bit integer using a method
+ # equivalent to Java's doubleToLongBits and then encoded in
+ # little-endian format.
+
+ bits = (byte! & 0xFF) |
+ ((byte! & 0xff) << 8) |
+ ((byte! & 0xff) << 16) |
+ ((byte! & 0xff) << 24) |
+ ((byte! & 0xff) << 32) |
+ ((byte! & 0xff) << 40) |
+ ((byte! & 0xff) << 48) |
+ ((byte! & 0xff) << 56)
+ [bits].pack('Q').unpack('d')[0]
+ end
+
+ def read_bytes
+ # Bytes are encoded as a long followed by that many bytes of
+ # data.
+ read(read_long)
+ end
+
+ def read_string
+ # A string is encoded as a long followed by that many bytes of
+ # UTF-8 encoded character data.
+ # FIXME utf-8 encode this in 1.9
+ read_bytes
+ end
+
+ def read(len)
+ # Read n bytes
+ @reader.read(len)
+ end
+
+ def skip_null
+ nil
+ end
+
+ def skip_boolean
+ skip(1)
+ end
+
+ def skip_int
+ skip_long
+ end
+
+ def skip_long
+ b = byte!
+ while (b & 0x80) != 0
+ b = byte!
+ end
+ end
+
+ def skip_float
+ skip(4)
+ end
+
+ def skip_double
+ skip(8)
+ end
+
+ def skip_bytes
+ skip(read_long)
+ end
+
+ def skip_string
+ skip_bytes
+ end
+
+ def skip(n)
+ reader.seek(reader.tell() + n)
+ end
+ end
+
+ # Write leaf values
+ class BinaryEncoder
+ attr_reader :writer
+
+ def initialize(writer)
+ @writer = writer
+ end
+
+ # null is written as zero bytes
+ def write_null(datum)
+ nil
+ end
+
+ # a boolean is written as a single byte
+ # whose value is either 0 (false) or 1 (true).
+ def write_boolean(datum)
+ on_disk = datum ? 1.chr : 0.chr
+ writer.write(on_disk)
+ end
+
+ # int and long values are written using variable-length,
+ # zig-zag coding.
+ def write_int(n)
+ write_long(n)
+ end
+
+ # int and long values are written using variable-length,
+ # zig-zag coding.
+ def write_long(n)
+ foo = n
+ n = (n << 1) ^ (n >> 63)
+ while (n & ~0x7F) != 0
+ @writer.write(((n & 0x7f) | 0x80).chr)
+ n >>= 7
+ end
+ @writer.write(n.chr)
+ end
+
+ # A float is written as 4 bytes.
+ # The float is converted into a 32-bit integer using a method
+ # equivalent to Java's floatToIntBits and then encoded in
+ # little-endian format.
+ def write_float(datum)
+ bits = [datum].pack('e').unpack('i')[0]
+ @writer.write(((bits ) & 0xFF).chr)
+ @writer.write(((bits >> 8 ) & 0xFF).chr)
+ @writer.write(((bits >> 16) & 0xFF).chr)
+ @writer.write(((bits >> 24) & 0xFF).chr)
+ end
+
+ # A double is written as 8 bytes.
+ # The double is converted into a 64-bit integer using a method
+ # equivalent to Java's doubleToLongBits and then encoded in
+ # little-endian format.
+ def write_double(datum)
+ bits = [datum].pack('d').unpack('Q')[0]
+ @writer.write(((bits ) & 0xFF).chr)
+ @writer.write(((bits >> 8 ) & 0xFF).chr)
+ @writer.write(((bits >> 16) & 0xFF).chr)
+ @writer.write(((bits >> 24) & 0xFF).chr)
+ @writer.write(((bits >> 32) & 0xFF).chr)
+ @writer.write(((bits >> 40) & 0xFF).chr)
+ @writer.write(((bits >> 48) & 0xFF).chr)
+ @writer.write(((bits >> 56) & 0xFF).chr)
+ end
+
+ # Bytes are encoded as a long followed by that many bytes of data.
+ def write_bytes(datum)
+ write_long(datum.size)
+ @writer.write(datum)
+ end
+
+ # A string is encoded as a long followed by that many bytes of
+ # UTF-8 encoded character data
+ def write_string(datum)
+ # FIXME utf-8 encode this in 1.9
+ write_bytes(datum)
+ end
+
+ # Write an arbritary datum.
+ def write(datum)
+ writer.write(datum)
+ end
+ end
+
+ class DatumReader
+ def self.check_props(schema_one, schema_two, prop_list)
+ prop_list.all? do |prop|
+ schema_one.to_hash[prop] == schema_two.to_hash[prop]
+ end
+ end
+
+ def self.match_schemas(writers_schema, readers_schema)
+ w_type = writers_schema.type
+ r_type = readers_schema.type
+
+ # This conditional is begging for some OO love.
+ if [w_type, r_type].include? 'union'
+ return true
+ elsif Schema::PRIMITIVE_TYPES.include?(w_type) &&
+ Schema::PRIMITIVE_TYPES.include?(r_type) &&
+ w_type == r_type
+ return true
+ elsif (w_type == r_type) && (r_type == 'record') &&
+ check_props(writers_schema, readers_schema, ['fullname'])
+ return true
+ elsif w_type == r_type && r_type == 'error' && check_props(writers_scheam, readers_schema, ['fullname'])
+ return true
+ elsif w_type == r_type && r_type == 'request'
+ return true
+ elsif (w_type == r_type) && (r_type == 'fixed') &&
+ check_props(writers_schema, readers_schema, ['fullname', 'size'])
+ return true
+ elsif (w_type == r_type) && (r_type == 'enum') &&
+ check_props(writers_schema, readers_schema, ['fullname'])
+ return true
+ elsif (w_type == r_type) && (r_type == 'map') &&
+ check_props(writers_schema.values, readers_schema.values, ['type'])
+ return true
+ elsif (w_type == r_type) && (r_type == 'array') &&
+ check_props(writers_schema.items, readers_schema.items, ['type'])
+ return true
+ end
+
+ # Handle schema promotion
+ if w_type == 'int' && ['long', 'float', 'double'].include?(r_type)
+ return true
+ elsif w_type == 'long' && ['float', 'double'].include?(r_type)
+ return true
+ elsif w_type == 'float' && r_type == 'double'
+ return true
+ end
+
+ return false
+ end
+
+ attr_accessor :writers_schema, :readers_schema
+
+ def initialize(writers_schema=nil, readers_schema=nil)
+ @writers_schema = writers_schema
+ @readers_schema = readers_schema
+ end
+
+ def read(decoder)
+ self.readers_schema = writers_schema unless readers_schema
+ read_data(writers_schema, readers_schema, decoder)
+ end
+
+ def read_data(writers_schema, readers_schema, decoder)
+ # schema matching
+ unless self.class.match_schemas(writers_schema, readers_schema)
+ raise SchemaMatchException.new(writers_schema, readers_schema)
+ end
+
+ # schema resolution: reader's schema is a union, writer's
+ # schema is not
+ if writers_schema.type != 'union' && readers_schema.type == 'union'
+ rs = readers_schema.schemas.find{|s|
+ self.class.match_schemas(writers_schema, s)
+ }
+ return read_data(writers_schema, rs, decoder) if rs
+ raise SchemaMatchException.new(writers_schema, readers_schema)
+ end
+
+ # function dispatch for reading data based on type of writer's
+ # schema
+ case writers_schema.type
+ when 'null'; decoder.read_null
+ when 'boolean'; decoder.read_boolean
+ when 'string'; decoder.read_string
+ when 'int'; decoder.read_int
+ when 'long'; decoder.read_long
+ when 'float'; decoder.read_float
+ when 'double'; decoder.read_double
+ when 'bytes'; decoder.read_bytes
+ when 'fixed'; read_fixed(writers_schema, readers_schema, decoder)
+ when 'enum'; read_enum(writers_schema, readers_schema, decoder)
+ when 'array'; read_array(writers_schema, readers_schema, decoder)
+ when 'map'; read_map(writers_schema, readers_schema, decoder)
+ when 'union'; read_union(writers_schema, readers_schema, decoder)
+ when 'record', 'errors', 'request'; read_record(writers_schema, readers_schema, decoder)
+ else
+ raise AvroError, "Cannot read unknown schema type: #{writers_schema.type}"
+ end
+ end
+
+ def read_fixed(writers_schema, readers_schema, decoder)
+ decoder.read(writers_schema.size)
+ end
+
+ def read_enum(writers_schema, readers_schema, decoder)
+ index_of_symbol = decoder.read_int
+ read_symbol = writers_schema.symbols[index_of_symbol]
+
+ # TODO(jmhodges): figure out what unset means for resolution
+ # schema resolution
+ unless readers_schema.symbols.include?(read_symbol)
+ # 'unset' here
+ end
+
+ read_symbol
+ end
+
+ def read_array(writers_schema, readers_schema, decoder)
+ read_items = []
+ block_count = decoder.read_long
+ while block_count != 0
+ if block_count < 0
+ block_count = -block_count
+ block_size = decoder.read_long
+ end
+ block_count.times do
+ read_items << read_data(writers_schema.items,
+ readers_schema.items,
+ decoder)
+ end
+ block_count = decoder.read_long
+ end
+
+ read_items
+ end
+
+ def read_map(writers_schema, readers_schema, decoder)
+ read_items = {}
+ block_count = decoder.read_long
+ while block_count != 0
+ if block_count < 0
+ block_count = -block_count
+ block_size = decoder.read_long
+ end
+ block_count.times do
+ key = decoder.read_string
+ read_items[key] = read_data(writers_schema.values,
+ readers_schema.values,
+ decoder)
+ end
+ block_count = decoder.read_long
+ end
+
+ read_items
+ end
+
+ def read_union(writers_schema, readers_schema, decoder)
+ index_of_schema = decoder.read_long
+ selected_writers_schema = writers_schema.schemas[index_of_schema]
+
+ read_data(selected_writers_schema, readers_schema, decoder)
+ end
+
+ def read_record(writers_schema, readers_schema, decoder)
+ readers_fields_hash = readers_schema.fields_hash
+ read_record = {}
+ writers_schema.fields.each do |field|
+ if readers_field = readers_fields_hash[field.name]
+ field_val = read_data(field.type, readers_field.type, decoder)
+ read_record[field.name] = field_val
+ else
+ skip_data(field.type, decoder)
+ end
+ end
+
+ # fill in the default values
+ if readers_fields_hash.size > read_record.size
+ writers_fields_hash = writers_schema.fields_hash
+ readers_fields_hash.each do |field_name, field|
+
+ unless writers_fields_hash.has_key? field_name
+ if !field.default.nil?
+ field_val = read_default_value(field.type, field.default)
+ read_record[field.name] = field_val
+ else
+ # FIXME(jmhodges) another 'unset' here
+ end
+ end
+ end
+ end
+
+ read_record
+ end
+
+ def read_default_value(field_schema, default_value)
+ # Basically a JSON Decoder?
+ case field_schema.type
+ when 'null'
+ return nil
+ when 'boolean'
+ return default_value
+ when 'int', 'long'
+ return Integer(default_value)
+ when 'float', 'double'
+ return Float(default_value)
+ when 'enum', 'fixed', 'string', 'bytes'
+ return default_value
+ when 'array'
+ read_array = []
+ default_value.each do |json_val|
+ item_val = read_default_value(field_schema.items, json_val)
+ read_array << item_val
+ end
+ return read_array
+ when 'map'
+ read_map = {}
+ default_value.each do |key, json_val|
+ map_val = read_default_value(field_schema.values, json_val)
+ read_map[key] = map_val
+ end
+ return read_map
+ when 'union'
+ return read_default_value(field_schema.schemas[0], default_value)
+ when 'record'
+ read_record = {}
+ field_schema.fields.each do |field|
+ json_val = default_value[field.name]
+ json_val = field.default unless json_val
+ field_val = read_default_value(field.type, json_val)
+ read_record[field.name] = field_val
+ end
+ return read_record
+ else
+ fail_msg = "Unknown type: #{field_schema.type}"
+ raise AvroError(fail_msg)
+ end
+ end
+ end # DatumReader
+
+ # DatumWriter for generic ruby objects
+ class DatumWriter
+ attr_accessor :writers_schema
+ def initialize(writers_schema=nil)
+ @writers_schema = writers_schema
+ end
+
+ def write(datum, encoder)
+ write_data(writers_schema, datum, encoder)
+ end
+
+ def write_data(writers_schema, datum, encoder)
+ unless Schema.validate(writers_schema, datum)
+ raise AvroTypeError.new(writers_schema, datum)
+ end
+
+ # function dispatch to write datum
+ case writers_schema.type
+ when 'null'; encoder.write_null(datum)
+ when 'boolean'; encoder.write_boolean(datum)
+ when 'string'; encoder.write_string(datum)
+ when 'int'; encoder.write_int(datum)
+ when 'long'; encoder.write_long(datum)
+ when 'float'; encoder.write_float(datum)
+ when 'double'; encoder.write_double(datum)
+ when 'bytes'; encoder.write_bytes(datum)
+ when 'fixed'; write_fixed(writers_schema, datum, encoder)
+ when 'enum'; write_enum(writers_schema, datum, encoder)
+ when 'array'; write_array(writers_schema, datum, encoder)
+ when 'map'; write_map(writers_schema, datum, encoder)
+ when 'union'; write_union(writers_schema, datum, encoder)
+ when 'record', 'errors', 'request'; write_record(writers_schema, datum, encoder)
+ else
+ raise AvroError.new("Unknown type: #{writers_schema.type}")
+ end
+ end
+
+ def write_fixed(writers_schema, datum, encoder)
+ encoder.write(datum)
+ end
+
+ def write_enum(writers_schema, datum, encoder)
+ index_of_datum = writers_schema.symbols.index(datum)
+ encoder.write_int(index_of_datum)
+ end
+
+ def write_array(writers_schema, datum, encoder)
+ if datum.size > 0
+ encoder.write_long(datum.size)
+ datum.each do |item|
+ write_data(writers_schema.items, item, encoder)
+ end
+ end
+ encoder.write_long(0)
+ end
+
+ def write_map(writers_schema, datum, encoder)
+ if datum.size > 0
+ encoder.write_long(datum.size)
+ datum.each do |k,v|
+ encoder.write_string(k)
+ write_data(writers_schema.values, v, encoder)
+ end
+ end
+ encoder.write_long(0)
+ end
+
+ def write_union(writers_schema, datum, encoder)
+ index_of_schema = writers_schema.schemas.
+ find_index{|e| Schema.validate(e, datum) }
+ unless index_of_schema
+ raise AvroTypeError.new(writers_schema, datum)
+ end
+ encoder.write_long(index_of_schema)
+ write_data(writers_schema.schemas[index_of_schema], datum, encoder)
+ end
+
+ def write_record(writers_schema, datum, encoder)
+ writers_schema.fields.each do |field|
+ write_data(field.type, datum[field.name], encoder)
+ end
+ end
+ end # DatumWriter
+ end
+end
Added: hadoop/avro/trunk/lang/ruby/lib/avro/ipc.rb
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/ruby/lib/avro/ipc.rb?rev=901024&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/ruby/lib/avro/ipc.rb (added)
+++ hadoop/avro/trunk/lang/ruby/lib/avro/ipc.rb Wed Jan 20 01:05:06 2010
@@ -0,0 +1,443 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+require 'stringio'
+
+module Avro::IPC
+
  # Wraps an error coming from the remote side of a call: Requestor#read_error
  # wraps the decoded error datum in it, and Responder#respond wraps
  # exceptions raised by #call in it before serializing them.
  class AvroRemoteError < Avro::AvroError; end

  # Schema of the handshake record a Requestor writes ahead of every call.
  # "MD5" is a 16-byte fixed carrying a protocol's MD5 digest
  # (Protocol#md5); clientProtocol is only sent when the server asked for it.
  HANDSHAKE_REQUEST_SCHEMA = Avro::Schema.parse <<-JSON
  {
    "type": "record",
    "name": "HandshakeRequest", "namespace":"org.apache.avro.ipc",
    "fields": [
      {"name": "clientHash",
       "type": {"type": "fixed", "name": "MD5", "size": 16}},
      {"name": "clientProtocol", "type": ["null", "string"]},
      {"name": "serverHash", "type": "MD5"},
      {"name": "meta", "type": ["null", {"type": "map", "values": "bytes"}]}
    ]
  }
  JSON

  # Schema of the handshake record a Responder writes back. "match" is one
  # of BOTH, CLIENT or NONE as computed by Responder#process_handshake;
  # serverProtocol/serverHash are filled in when match is not BOTH.
  HANDSHAKE_RESPONSE_SCHEMA = Avro::Schema.parse <<-JSON
  {
    "type": "record",
    "name": "HandshakeResponse", "namespace": "org.apache.avro.ipc",
    "fields": [
      {"name": "match",
       "type": {"type": "enum", "name": "HandshakeMatch",
                "symbols": ["BOTH", "CLIENT", "NONE"]}},
      {"name": "serverProtocol", "type": ["null", "string"]},
      {"name": "serverHash",
       "type": ["null", {"type": "fixed", "name": "MD5", "size": 16}]},
      {"name": "meta",
       "type": ["null", {"type": "map", "values": "bytes"}]}
    ]
  }
  JSON

  # Pre-built handshake readers/writers: the requestor writes requests and
  # reads responses; the responder does the opposite.
  HANDSHAKE_REQUESTOR_WRITER = Avro::IO::DatumWriter.new(HANDSHAKE_REQUEST_SCHEMA)
  HANDSHAKE_REQUESTOR_READER = Avro::IO::DatumReader.new(HANDSHAKE_RESPONSE_SCHEMA)
  HANDSHAKE_RESPONDER_WRITER = Avro::IO::DatumWriter.new(HANDSHAKE_RESPONSE_SCHEMA)
  HANDSHAKE_RESPONDER_READER = Avro::IO::DatumReader.new(HANDSHAKE_REQUEST_SCHEMA)

  # Per-call request/response metadata: a map of bytes values. Both sides
  # currently always write an empty map.
  META_SCHEMA = Avro::Schema.parse('{"type": "map", "values": "bytes"}')
  META_WRITER = Avro::IO::DatumWriter.new(META_SCHEMA)
  META_READER = Avro::IO::DatumReader.new(META_SCHEMA)

  # Error union (string only) used when a message declares no errors, and
  # for system errors raised while processing a request.
  SYSTEM_ERROR_SCHEMA = Avro::Schema.parse('["string"]')

  # protocol cache
  # Process-wide caches keyed by transport.remote_name, filled by the
  # Requestor#remote_hash= / #remote_protocol= setters.
  # NOTE(review): plain Hashes with no locking visible here; confirm they
  # are not mutated from several threads at once.
  REMOTE_HASHES = {}
  REMOTE_PROTOCOLS = {}

  # Size in bytes of each framed buffer's length prefix, and the largest
  # payload SocketTransport#write_framed_message puts in a single buffer.
  BUFFER_HEADER_LENGTH = 4
  BUFFER_SIZE = 8192

  # Raised when an error message is sent by an Avro requestor or responder.
  # Responder#respond uses it to report Avro::AvroError failures as system
  # errors. NOTE(review): distinct from AvroRemoteError despite the name.
  class AvroRemoteException < Avro::AvroError; end

  # Raised by SocketTransport when the socket reads or writes zero bytes.
  class ConnectionClosedException < Avro::AvroError; end
+
+ class Requestor
+ """Base class for the client side of a protocol interaction."""
+ attr_reader :local_protocol, :transport
+ attr_accessor :remote_protocol, :remote_hash, :send_protocol
+
+ def initialize(local_protocol, transport)
+ @local_protocol = local_protocol
+ @transport = transport
+ @remote_protocol = nil
+ @remote_hash = nil
+ @send_protocol = nil
+ end
+
+ def remote_protocol=(new_remote_protocol)
+ @remote_protocol = new_remote_protocol
+ REMOTE_PROTOCOLS[transport.remote_name] = remote_protocol
+ end
+
+ def remote_hash=(new_remote_hash)
+ @remote_hash = new_remote_hash
+ REMOTE_HASHES[transport.remote_name] = remote_hash
+ end
+
+ def request(message_name, request_datum)
+ # Writes a request message and reads a response or error message.
+ # build handshake and call request
+ buffer_writer = StringIO.new('', 'w+')
+ buffer_encoder = Avro::IO::BinaryEncoder.new(buffer_writer)
+ write_handshake_request(buffer_encoder)
+ write_call_request(message_name, request_datum, buffer_encoder)
+
+ # send the handshake and call request; block until call response
+ call_request = buffer_writer.string
+ call_response = transport.transceive(call_request)
+
+ # process the handshake and call response
+ buffer_decoder = Avro::IO::BinaryDecoder.new(StringIO.new(call_response))
+ if read_handshake_response(buffer_decoder)
+ read_call_response(message_name, buffer_decoder)
+ else
+ request(message_name, request_datum)
+ end
+ end
+
+ def write_handshake_request(encoder)
+ local_hash = local_protocol.md5
+ remote_name = transport.remote_name
+ remote_hash = REMOTE_HASHES[remote_name]
+ unless remote_hash
+ remote_hash = local_hash
+ self.remote_protocol = local_protocol
+ end
+ request_datum = {
+ 'clientHash' => local_hash,
+ 'serverHash' => remote_hash
+ }
+ if send_protocol
+ request_datum['clientProtocol'] = local_protocol.to_s
+ end
+ HANDSHAKE_REQUESTOR_WRITER.write(request_datum, encoder)
+ end
+
+ def write_call_request(message_name, request_datum, encoder)
+ # The format of a call request is:
+ # * request metadata, a map with values of type bytes
+ # * the message name, an Avro string, followed by
+ # * the message parameters. Parameters are serialized according to
+ # the message's request declaration.
+
+ # TODO request metadata (not yet implemented)
+ request_metadata = {}
+ META_WRITER.write(request_metadata, encoder)
+
+ message = local_protocol.messages[message_name]
+ unless message
+ raise AvroError, "Unknown message: #{message_name}"
+ end
+ encoder.write_string(message.name)
+
+ write_request(message.request, request_datum, encoder)
+ end
+
+ def write_request(request_schema, request_datum, encoder)
+ datum_writer = Avro::IO::DatumWriter.new(request_schema)
+ datum_writer.write(request_datum, encoder)
+ end
+
+ def read_handshake_response(decoder)
+ handshake_response = HANDSHAKE_REQUESTOR_READER.read(decoder)
+ case match = handshake_response['match']
+ when 'BOTH'
+ self.send_protocol = false
+ true
+ when 'CLIENT'
+ raise AvroError.new('Handshake failure. match == CLIENT') if send_protocol
+ self.remote_protocol = handshake_response['serverProtocol']
+ self.remote_hash = handshake_response['serverHash']
+ self.send_protocol = false
+ false
+ when 'NONE'
+ raise AvroError.new('Handshake failure. match == NONE') if send_protocol
+ self.remote_protocol = handshake_response['serverProtocol']
+ self.remote_hash = handshake_response['serverHash']
+ self.send_protocol = true
+ false
+ else
+ raise AvroError.new("Unexpected match: #{match}")
+ end
+ end
+
+ def read_call_response(message_name, decoder)
+ # The format of a call response is:
+ # * response metadata, a map with values of type bytes
+ # * a one-byte error flag boolean, followed by either:
+ # * if the error flag is false,
+ # the message response, serialized per the message's response schema.
+ # * if the error flag is true,
+ # the error, serialized per the message's error union schema.
+ response_metadata = META_READER.read(decoder)
+
+ # remote response schema
+ remote_message_schema = remote_protocol.messages[message_name]
+ raise AvroError.new("Unknown remote message: #{message_name}") unless remote_message_schema
+
+ # local response schema
+ local_message_schema = local_protocol.messages[message_name]
+ unless local_message_schema
+ raise AvroError.new("Unknown local message: #{message_name}")
+ end
+
+ # error flag
+ if !decoder.read_boolean
+ writers_schema = remote_message_schema.response
+ readers_schema = local_message_schema.response
+ read_response(writers_schema, readers_schema, decoder)
+ else
+ writers_schema = remote_message_schema.errors || SYSTEM_ERROR_SCHEMA
+ readers_schema = local_message_schema.errors || SYSTEM_ERROR_SCHEMA
+ raise read_error(writers_schema, readers_schema, decoder)
+ end
+ end
+
+ def read_response(writers_schema, readers_schema, decoder)
+ datum_reader = Avro::IO::DatumReader.new(writers_schema, readers_schema)
+ datum_reader.read(decoder)
+ end
+
+ def read_error(writers_schema, readers_schema, decoder)
+ datum_reader = Avro::IO::DatumReader.new(writers_schema, readers_schema)
+ AvroRemoteError.new(datum_reader.read(decoder))
+ end
+ end
+
+ # Base class for the server side of a protocol interaction.
+ class Responder
+ attr_reader :local_protocol, :local_hash, :protocol_cache
+ def initialize(local_protocol)
+ @local_protocol = local_protocol
+ @local_hash = self.local_protocol.md5
+ @protocol_cache = {}
+ protocol_cache[local_hash] = local_protocol
+ end
+
+ def respond(transport)
+ # Called by a server to deserialize a request, compute and serialize
+ # a response or error. Compare to 'handle()' in Thrift.
+
+ call_request = transport.read_framed_message
+ buffer_decoder = Avro::IO::BinaryDecoder.new(StringIO.new(call_request))
+ buffer_writer = StringIO.new('', 'w+')
+ buffer_encoder = Avro::IO::BinaryEncoder.new(buffer_writer)
+ error = nil
+ response_metadata = {}
+
+ begin
+ remote_protocol = process_handshake(transport, buffer_decoder, buffer_encoder)
+ # handshake failure
+ unless remote_protocol
+ return buffer_writer.string
+ end
+
+ # read request using remote protocol
+ request_metadata = META_READER.read(buffer_decoder)
+ remote_message_name = buffer_decoder.read_string
+
+ # get remote and local request schemas so we can do
+ # schema resolution (one fine day)
+ remote_message = remote_protocol.messages[remote_message_name]
+ unless remote_message
+ raise AvroError.new("Unknown remote message: #{remote_message_name}")
+ end
+ local_message = local_protocol.messages[remote_message_name]
+ unless local_message
+ raise AvroError.new("Unknown local message: #{remote_message_name}")
+ end
+ writers_schema = remote_message.request
+ readers_schema = local_message.request
+ request = read_request(writers_schema, readers_schema, buffer_decoder)
+ # perform server logic
+ begin
+ response = call(local_message, request)
+ rescue AvroRemoteError => e
+ error = e
+ rescue Exception => e
+ error = AvroRemoteError.new(e.to_s)
+ end
+
+ # write response using local protocol
+ META_WRITER.write(response_metadata, buffer_encoder)
+ buffer_encoder.write_boolean(!!error)
+ if error.nil?
+ writers_schema = local_message.response
+ write_response(writers_schema, response, buffer_encoder)
+ else
+ writers_schema = local_message.errors || SYSTEM_ERROR_SCHEMA
+ write_error(writers_schema, error, buffer_encoder)
+ end
+ rescue Avro::AvroError => e
+ error = AvroRemoteException.new(e.to_s)
+ buffer_encoder = Avro::IO::BinaryEncoder.new(StringIO.new)
+ META_WRITER.write(response_metadata, buffer_encoder)
+ buffer_encoder.write_boolean(true)
+ self.write_error(SYSTEM_ERROR_SCHEMA, error, buffer_encoder)
+ end
+ buffer_writer.string
+ end
+
+ def process_handshake(transport, decoder, encoder)
+ handshake_request = HANDSHAKE_RESPONDER_READER.read(decoder)
+ handshake_response = {}
+
+ # determine the remote protocol
+ client_hash = handshake_request['clientHash']
+ client_protocol = handshake_request['clientProtocol']
+ remote_protocol = protocol_cache[client_hash]
+ if !remote_protocol && client_protocol
+ remote_protocol = protocol.parse(client_protocol)
+ protocol_cache[client_hash] = remote_protocol
+ end
+
+ # evaluate remote's guess of the local protocol
+ server_hash = handshake_request['serverHash']
+ if local_hash == server_hash
+ if !remote_protocol
+ handshake_response['match'] = 'NONE'
+ else
+ handshake_response['match'] = 'BOTH'
+ end
+ else
+ if !remote_protocol
+ handshake_response['match'] = 'NONE'
+ else
+ handshake_response['match'] = 'CLIENT'
+ end
+ end
+
+ if handshake_response['match'] != 'BOTH'
+ handshake_response['serverProtocol'] = local_protocol.to_s
+ handshake_response['serverHash'] = local_hash
+ end
+
+ HANDSHAKE_RESPONDER_WRITER.write(handshake_response, encoder)
+ remote_protocol
+ end
+
+ def call(local_message, request)
+ # Actual work done by server: cf. handler in thrift.
+ raise NotImplementedError
+ end
+
+ def read_request(writers_schema, readers_schema, decoder)
+ datum_reader = Avro::IO::DatumReader.new(writers_schema, readers_schema)
+ datum_reader.read(decoder)
+ end
+
+ def write_response(writers_schema, response_datum, encoder)
+ datum_writer = Avro::IO::DatumWriter.new(writers_schema)
+ datum_writer.write(response_datum, encoder)
+ end
+
+ def write_error(writers_schema, error_exception, encoder)
+ datum_writer = Avro::IO::DatumWriter.new(writers_schema)
+ datum_writer.write(error_exception.to_s, encoder)
+ end
+ end
+
+ class SocketTransport
+ # A simple socket-based Transport implementation.
+
+ attr_reader :sock, :remote_name
+
+ def initialize(sock)
+ @sock = sock
+ end
+
+ def transceive(request)
+ write_framed_message(request)
+ read_framed_message
+ end
+
+ def read_framed_message
+ message = []
+ loop do
+ buffer = StringIO.new
+ buffer_length = read_buffer_length
+ if buffer_length == 0
+ return message.join
+ end
+ while buffer.tell < buffer_length
+ chunk = sock.read(buffer_length - buffer.tell)
+ if chunk == ''
+ raise ConnectionClosedException.new("Socket read 0 bytes.")
+ end
+ buffer.write(chunk)
+ end
+ message << buffer.string
+ end
+ end
+
+ def write_framed_message(message)
+ message_length = message.size
+ total_bytes_sent = 0
+ while message_length - total_bytes_sent > 0
+ if message_length - total_bytes_sent > BUFFER_SIZE:
+ buffer_length = BUFFER_SIZE
+ else
+ buffer_length = message_length - total_bytes_sent
+ end
+ write_buffer(message[total_bytes_sent,buffer_length])
+ total_bytes_sent += buffer_length
+ end
+ # A message is always terminated by a zero-length buffer.
+ write_buffer_length(0)
+ end
+
+ def write_buffer(chunk)
+ buffer_length = chunk.size
+ write_buffer_length(buffer_length)
+ total_bytes_sent = 0
+ while total_bytes_sent < buffer_length
+ bytes_sent = self.sock.write(chunk[total_bytes_sent..-1])
+ if bytes_sent == 0
+ raise ConnectionClosedException.new("Socket sent 0 bytes.")
+ end
+ total_bytes_sent += bytes_sent
+ end
+ end
+
+ def write_buffer_length(n)
+ bytes_sent = sock.write([n].pack('I'))
+ if bytes_sent == 0
+ raise ConnectionClosedException.new("socket sent 0 bytes")
+ end
+ end
+
+ def read_buffer_length
+ read = sock.read(BUFFER_HEADER_LENGTH)
+ if read == '' || read == nil
+ raise ConnectionClosedException.new("Socket read 0 bytes.")
+ end
+ read.unpack('I')[0]
+ end
+
+ def close
+ sock.close
+ end
+ end
+end
Added: hadoop/avro/trunk/lang/ruby/lib/avro/protocol.rb
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/ruby/lib/avro/protocol.rb?rev=901024&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/ruby/lib/avro/protocol.rb (added)
+++ hadoop/avro/trunk/lang/ruby/lib/avro/protocol.rb Wed Jan 20 01:05:06 2010
@@ -0,0 +1,160 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+module Avro
+ class Protocol
+ VALID_TYPE_SCHEMA_TYPES = Set.new(%w[enum record error fixed])
+ class ProtocolParseError < Avro::AvroError; end
+
+ attr_reader :name, :namespace, :types, :messages, :md5
+ def self.parse(protocol_string)
+ json_data = Yajl.load(protocol_string)
+
+ if json_data.is_a? Hash
+ name = json_data['protocol']
+ namespace = json_data['namespace']
+ types = json_data['types']
+ messages = json_data['messages']
+ Protocol.new(name, namespace, types, messages)
+ else
+ raise ProtocolParseError, "Not a JSON object: #{json_data}"
+ end
+ end
+
+ def initialize(name, namespace=nil, types=nil, messages=nil)
+ # Ensure valid ctor args
+ if !name
+ raise ProtocolParseError, 'Protocols must have a non-empty name.'
+ elsif !name.is_a?(String)
+ raise ProtocolParseError, 'The name property must be a string.'
+ elsif !namespace.is_a?(String)
+ raise ProtocolParseError, 'The namespace property must be a string.'
+ elsif !types.is_a?(Array)
+ raise ProtocolParseError, 'The types property must be a list.'
+ elsif !messages.is_a?(Hash)
+ raise ProtocolParseError, 'The messages property must be a JSON object.'
+ end
+
+ @name = name
+ @namespace = namespace
+ type_names = {}
+ @types = parse_types(types, type_names)
+ @messages = parse_messages(messages, type_names)
+ @md5 = Digest::MD5.digest(to_s)
+ end
+
+ def to_s
+ Yajl.dump to_hash
+ end
+
+ def ==(other)
+ to_hash == Yajl.load(other.to_s)
+ end
+
+ private
+ def parse_types(types, type_names)
+ type_objects = []
+ types.collect do |type|
+ # FIXME adding type.name to type_names is not defined in the
+ # spec. Possible bug in the python impl and the spec.
+ type_object = Schema.real_parse(type, type_names)
+ unless VALID_TYPE_SCHEMA_TYPES.include?(type_object.type)
+ msg = "Type #{type} not an enum, record, fixed or error."
+ raise ProtocolParseError, msg
+ end
+ type_object
+ end
+ end
+
+ def parse_messages(messages, names)
+ message_objects = {}
+ messages.each do |name, body|
+ if message_objects.has_key?(name)
+ raise ProtocolParseError, "Message name \"#{name}\" repeated."
+ elsif !body.is_a?(Hash)
+ raise ProtocolParseError, "Message name \"#{name}\" has non-object body #{body.inspect}"
+ end
+
+ request = body['request']
+ response = body['response']
+ errors = body['errors']
+ message_objects[name] = Message.new(name, request, response, errors, names)
+ end
+ message_objects
+ end
+
+ def to_hash
+ hsh = {'protocol' => name}
+ hsh['namespace'] = namespace if namespace
+ hsh['types'] = types.map{|t| Yajl.load(t.to_s) } if types
+
+ if messages
+ hsh['messages'] = messages.collect_hash{|k,t| [k, Yajl.load(t.to_s)] }
+ end
+
+ hsh
+ end
+
+ class Message
+ attr_reader :name, :response_from_names, :request, :response, :errors
+ def initialize(name, request, response, errors=nil, names=nil)
+ @name = name
+ @response_from_names = false
+
+ @request = parse_request(request, names)
+ @response = parse_response(response, names)
+ @errors = parse_errors(errors, names) if errors
+ end
+
+ def to_s
+ hsh = {'request' => Yajl.load(request.to_s)}
+ if response_from_names
+ hsh['response'] = response.fullname
+ else
+ hsh['response'] = Yajl.load(response.to_s)
+ end
+
+ if errors
+ hsh['errors'] = Yajl.load(errors.to_s)
+ end
+ Yajl.dump hsh
+ end
+
+ def parse_request(request, names)
+ unless request.is_a?(Array)
+ raise ProtocolParseError, "Request property not an Array: #{request.inspect}"
+ end
+ Schema::RecordSchema.new(nil, nil, request, names, 'request')
+ end
+
+ def parse_response(response, names)
+ if response.is_a?(String) && names[response]
+ @response_from_names = true
+ names[response]
+ else
+ Schema.real_parse(response, names)
+ end
+ end
+
+ def parse_errors(errors, names)
+ unless errors.is_a?(Array)
+ raise ProtocolParseError, "Errors property not an Array: #{errors}"
+ end
+ Schema.real_parse(errors, names)
+ end
+ end
+ end
+end
Added: hadoop/avro/trunk/lang/ruby/lib/avro/schema.rb
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/ruby/lib/avro/schema.rb?rev=901024&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/ruby/lib/avro/schema.rb (added)
+++ hadoop/avro/trunk/lang/ruby/lib/avro/schema.rb Wed Jan 20 01:05:06 2010
@@ -0,0 +1,431 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+module Avro
+ class Schema
+ # FIXME turn these into symbols to prevent some gc pressure
+ PRIMITIVE_TYPES = Set.new(%w[null boolean string bytes int long float double])
+ NAMED_TYPES = Set.new(%w[fixed enum record error])
+
+ VALID_TYPES = PRIMITIVE_TYPES + NAMED_TYPES + Set.new(%w[array map union request])
+
+ INT_MIN_VALUE = -(1 << 31)
+ INT_MAX_VALUE = (1 << 31) - 1
+ LONG_MIN_VALUE = -(1 << 63)
+ LONG_MAX_VALUE = (1 << 63) - 1
+
+ def self.parse(json_string)
+ real_parse(Yajl.load(json_string), {})
+ end
+
+ # Build Avro Schema from data parsed out of JSON string.
+ def self.real_parse(json_obj, names=nil)
+ if json_obj.is_a? Hash
+ type = json_obj['type']
+ if PRIMITIVE_TYPES.include?(type)
+ return PrimitiveSchema.new(type)
+ elsif NAMED_TYPES.include? type
+ name = json_obj['name']
+ namespace = json_obj['namespace']
+ case type
+ when 'fixed'
+ size = json_obj['size']
+ return FixedSchema.new(name, namespace, size, names)
+ when 'enum'
+ symbols = json_obj['symbols']
+ return EnumSchema.new(name, namespace, symbols, names)
+ when 'record', 'error'
+ fields = json_obj['fields']
+ return RecordSchema.new(name, namespace, fields, names, type)
+ else
+ raise SchemaParseError.new("Unknown Named Type: #{type}")
+ end
+ elsif VALID_TYPES.include?(type)
+ case type
+ when 'array'
+ return ArraySchema.new(json_obj['items'], names)
+ when 'map'
+ return MapSchema.new(json_obj['values'], names)
+ else
+ raise SchemaParseError.new("Unknown Valid Type: #{type}")
+ end
+ elsif type.nil?
+ raise SchemaParseError.new("No \"type\" property: #{json_obj}")
+ else
+ raise SchemaParseError.new("Undefined type: #{type}")
+ end
+ elsif json_obj.is_a? Array
+ # JSON array (union)
+ return UnionSchema.new(json_obj, names)
+ elsif PRIMITIVE_TYPES.include? json_obj
+ return PrimitiveSchema.new(json_obj)
+ else
+ msg = "Could not make an Avro Schema object from #{json_obj}"
+ raise SchemaParseError.new(msg)
+ end
+ end
+
+ # Determine if a ruby datum is an instance of a schema
+ def self.validate(expected_schema, datum)
+ case expected_schema.type
+ when 'null'
+ datum.nil?
+ when 'boolean'
+ datum == true || datum == false
+ when 'string', 'bytes'
+ datum.is_a? String
+ when 'int'
+ (datum.is_a?(Fixnum) || datum.is_a?(Bignum)) &&
+ (INT_MIN_VALUE <= datum) && (datum <= INT_MAX_VALUE)
+ when 'long'
+ (datum.is_a?(Fixnum) || datum.is_a?(Bignum)) &&
+ (LONG_MIN_VALUE <= datum) && (datum <= LONG_MAX_VALUE)
+ when 'float', 'double'
+ datum.is_a?(Float) || datum.is_a?(Fixnum) || datum.is_a?(Bignum)
+ when 'fixed'
+ datum.is_a?(String) && datum.size == expected_schema.size
+ when 'enum'
+ expected_schema.symbols.include? datum
+ when 'array'
+ datum.is_a?(Array) &&
+ datum.all?{|d| validate(expected_schema.items, d) }
+ when 'map':
+ datum.keys.all?{|k| k.is_a? String } &&
+ datum.values.all?{|v| validate(expected_schema.values, v) }
+ when 'union'
+ expected_schema.schemas.any?{|s| validate(s, datum) }
+ when 'record', 'error', 'request'
+ datum.is_a?(Hash) &&
+ expected_schema.fields.all?{|f| validate(f.type, datum[f.name]) }
+ else
+ raise "you suck #{expected_schema.inspect} is not allowed."
+ end
+ end
+
+ def initialize(type)
+ @type = type
+ end
+
+ def type; @type; end
+
+ def ==(other, seen=nil)
+ other.is_a?(Schema) && @type == other.type
+ end
+
+ def hash(seen=nil)
+ @type.hash
+ end
+
+ def to_hash
+ {'type' => @type}
+ end
+
+ def to_s
+ Yajl.dump to_hash
+ end
+
+ class NamedSchema < Schema
+ attr_reader :name, :namespace
+ def initialize(type, name, namespace=nil, names=nil)
+ super(type)
+ @name, @namespace = Name.extract_namespace(name, namespace)
+ names = Name.add_name(names, self)
+ end
+
+ def to_hash
+ props = {'name' => @name}
+ props.merge!('namespace' => @namespace) if @namespace
+ super.merge props
+ end
+
+ def fullname
+ Name.make_fullname(@name, @namespace)
+ end
+ end
+
+ class RecordSchema < NamedSchema
+ attr_reader :fields
+
+ def self.make_field_objects(field_data, names)
+ field_objects, field_names = [], Set.new
+ field_data.each_with_index do |field, i|
+ if field.respond_to?(:[]) # TODO(jmhodges) wtffffff
+ type = field['type']
+ name = field['name']
+ default = field['default']
+ order = field['order']
+ new_field = Field.new(type, name, default, order, names)
+ # make sure field name has not been used yet
+ if field_names.include?(new_field.name)
+ raise SchemaParseError, "Field name #{new_field.name.inspect} is already in use"
+ end
+ field_names << new_field.name
+ else
+ raise SchemaParseError, "Not a valid field: #{field}"
+ end
+ field_objects << new_field
+ end
+ field_objects
+ end
+
+ def initialize(name, namespace, fields, names=nil, schema_type='record')
+ if schema_type == 'request'
+ @type = schema_type
+ else
+ super(schema_type, name, namespace, names)
+ end
+ @fields = RecordSchema.make_field_objects(fields, names)
+ end
+
+ def fields_hash
+ fields.inject({}){|hsh, field| hsh[field.name] = field; hsh }
+ end
+
+ def to_hash
+ hsh = super.merge('fields' => @fields.map {|f|Yajl.load(f.to_s)} )
+ if type == 'request'
+ hsh['fields']
+ else
+ hsh
+ end
+ end
+ end
+
+ class ArraySchema < Schema
+ attr_reader :items, :items_schema_from_names
+ def initialize(items, names=nil)
+ @items_schema_from_names = false
+
+ super('array')
+
+ if items.is_a?(String) && names.has_key?(items)
+ @items = names[items]
+ @items_schema_from_names = true
+ else
+ begin
+ @items = Schema.real_parse(items, names)
+ rescue => e
+ msg = "Items schema not a valid Avro schema" + e.to_s
+ raise SchemaParseError, msg
+ end
+ end
+ end
+
+ def to_hash
+ name_or_json = if items_schema_from_names
+ items.fullname
+ else
+ Yajl.load(items.to_s)
+ end
+ super.merge('items' => name_or_json)
+ end
+ end
+
+ class MapSchema < Schema
+ attr_reader :values, :values_schema_from_names
+
+ def initialize(values, names=nil)
+ @values_schema_from_names = false
+ super('map')
+ if values.is_a?(String) && names.has_key?(values)
+ values_schema = names[values]
+ @values_schema_from_names = true
+ else
+ begin
+ values_schema = Schema.real_parse(values, names)
+ rescue => e
+ raise SchemaParseError.new('Values schema not a valid Avro schema.' + e.to_s)
+ end
+ end
+ @values = values_schema
+ end
+
+ def to_hash
+ to_dump = super
+ if values_schema_from_names
+ to_dump['values'] = values
+ else
+ to_dump['values'] = Yajl.load(values.to_s)
+ end
+ to_dump
+ end
+ end
+
+ class UnionSchema < Schema
+ attr_reader :schemas, :schema_from_names_indices
+ def initialize(schemas, names=nil)
+ super('union')
+
+ schema_objects = []
+ @schema_from_names_indices = []
+ schemas.each_with_index do |schema, i|
+ from_names = false
+ if schema.is_a?(String) && names.has_key?(schema)
+ new_schema = names[schema]
+ from_names = true
+ else
+ begin
+ new_schema = Schema.real_parse(schema, names)
+ rescue
+ raise SchemaParseError, 'Union item must be a valid Avro schema'
+ end
+ end
+
+ ns_type = new_schema.type
+ if VALID_TYPES.include?(ns_type) &&
+ !NAMED_TYPES.include?(ns_type) &&
+ schema_objects.map(&:type).include?(ns_type)
+ raise SchemaParseError, "#{ns_type} is already in Union"
+ elsif ns_type == 'union'
+ raise SchemaParseError, "Unions cannot contain other unions"
+ else
+ schema_objects << new_schema
+ @schema_from_names_indices << i if from_names
+ end
+ @schemas = schema_objects
+ end
+ end
+
+ def to_s
+ # FIXME(jmhodges) this from_name pattern is really weird and
+ # seems code-smelly.
+ to_dump = []
+ schemas.each_with_index do |schema, i|
+ if schema_from_names_indices.include?(i)
+ to_dump << schema.fullname
+ else
+ to_dump << Yajl.load(schema.to_s)
+ end
+ end
+ Yajl.dump(to_dump)
+ end
+ end
+
+ class EnumSchema < NamedSchema
+ attr_reader :symbols
+ def initialize(name, space, symbols, names=nil)
+ if symbols.uniq.length < symbols.length
+ fail_msg = 'Duplicate symbol: %s' % symbols
+ raise Avro::SchemaParseError, fail_msg
+ end
+ super('enum', name, space, names)
+ @symbols = symbols
+ end
+
+ def to_hash
+ super.merge('symbols' => symbols)
+ end
+ end
+
+ # Valid primitive types are in PRIMITIVE_TYPES.
+ class PrimitiveSchema < Schema
+ def initialize(type)
+ unless PRIMITIVE_TYPES.include? type
+ raise AvroError.new("#{type} is not a valid primitive type.")
+ end
+
+ super(type)
+ end
+
+ def to_s
+ to_hash.size == 1 ? type.inspect : Yajl.dump(to_hash)
+ end
+ end
+
+ class FixedSchema < NamedSchema
+ attr_reader :size
+ def initialize(name, space, size, names=nil)
+ # Ensure valid cto args
+ unless size.is_a?(Fixnum) || size.is_a?(Bignum)
+ raise AvroError, 'Fixed Schema requires a valid integer for size property.'
+ end
+ super('fixed', name, space, names)
+ @size = size
+ end
+
+ def to_hash
+ super.merge('size' => @size)
+ end
+ end
+
+ class Field
+ attr_reader :type, :name, :default, :order, :type_from_names
+ def initialize(type, name, default=nil, order=nil, names=nil)
+ @type_from_names = false
+ if type.is_a?(String) && names && names.has_key?(type)
+ type_schema = names[type]
+ @type_from_names = true
+ else
+ type_schema = Schema.real_parse(type, names)
+ end
+ @type = type_schema
+ @name = name
+ @default = default
+ @order = order
+ end
+
+ def to_hash
+ sigh_type = type_from_names ? type.fullname : Yajl.load(type.to_s)
+ hsh = {
+ 'name' => name,
+ 'type' => sigh_type
+ }
+ hsh['default'] = default if default
+ hsh['order'] = order if order
+ hsh
+ end
+
+ def to_s
+ Yajl.dump(to_hash)
+ end
+ end
+ end
+
+ class SchemaParseError < AvroError; end
+
+ module Name
+ def self.extract_namespace(name, namespace)
+ parts = name.split('.')
+ if parts.size > 1
+ namespace, name = parts[0..-2].join('.'), parts.last
+ end
+ return name, namespace
+ end
+
+ # Add a new schema object to the names dictionary (in place).
+ def self.add_name(names, new_schema)
+ new_fullname = new_schema.fullname
+ if Avro::Schema::VALID_TYPES.include?(new_fullname)
+ raise SchemaParseError, "#{new_fullname} is a reserved type name."
+ elsif names.nil?
+ names = {}
+ elsif names.has_key?(new_fullname)
+ raise SchemaParseError, "The name \"#{new_fullname}\" is already in use."
+ end
+
+ names[new_fullname] = new_schema
+ names
+ end
+
+ def self.make_fullname(name, namespace)
+ if !name.include?('.') && !namespace.nil?
+ namespace + '.' + name
+ else
+ name
+ end
+ end
+ end
+end
Added: hadoop/avro/trunk/lang/ruby/test/sample_ipc_client.rb
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/ruby/test/sample_ipc_client.rb?rev=901024&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/ruby/test/sample_ipc_client.rb (added)
+++ hadoop/avro/trunk/lang/ruby/test/sample_ipc_client.rb Wed Jan 20 01:05:06 2010
@@ -0,0 +1,86 @@
+#!/usr/bin/env ruby
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require 'socket'
+require 'avro'
+
# JSON definition of the example "Mail" protocol (namespace example.proto):
# one record type, Message, with string fields to/from/body, and two
# messages -- "send", which takes a Message and returns a string, and
# "replay", which takes no arguments and returns a string.
MAIL_PROTOCOL_JSON = <<-JSON
{"namespace": "example.proto",
 "protocol": "Mail",

 "types": [
 {"name": "Message", "type": "record",
 "fields": [
 {"name": "to", "type": "string"},
 {"name": "from", "type": "string"},
 {"name": "body", "type": "string"}
 ]
 }
 ],

 "messages": {
 "send": {
 "request": [{"name": "message", "type": "Message"}],
 "response": "string"
 },
 "replay": {
 "request": [],
 "response": "string"
 }
 }
}
JSON

# Parsed form of MAIL_PROTOCOL_JSON; passed to make_requestor for every
# request this script sends.
MAIL_PROTOCOL = Avro::Protocol.parse(MAIL_PROTOCOL_JSON)
+
# Opens a TCP connection to server_address:port and returns an
# Avro::IPC::Requestor for +protocol+ that communicates over it.
def make_requestor(server_address, port, protocol)
  transport = Avro::IPC::SocketTransport.new(TCPSocket.new(server_address, port))
  Avro::IPC::Requestor.new(protocol, transport)
end
+
if $0 == __FILE__
  unless [3, 4].include?(ARGV.length)
    raise "Usage: <to> <from> <body> [<count>]"
  end

  # client code - attach to the server and send a message
  # fill in the Message record
  message = {
    'to' => ARGV[0],
    'from' => ARGV[1],
    'body' => ARGV[2]
  }

  # Number of 'send' requests. ARGV[3] is optional (nil.to_i == 0); a missing
  # or zero count means a single message.
  num_messages = ARGV[3].to_i
  num_messages = 1 if num_messages == 0

  # build the parameters for the request
  params = {'message' => message}

  # send the requests and print the result
  # NOTE(review): every make_requestor call opens a new TCPSocket that this
  # script never closes explicitly.
  num_messages.times do
    requestor = make_requestor('localhost', 9090, MAIL_PROTOCOL)
    result = requestor.request('send', params)
    puts("Result: " + result)
  end

  # try out a replay message
  requestor = make_requestor('localhost', 9090, MAIL_PROTOCOL)
  result = requestor.request('replay', {})
  puts("Replay Result: " + result)
end
\ No newline at end of file
Added: hadoop/avro/trunk/lang/ruby/test/sample_ipc_server.rb
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/ruby/test/sample_ipc_server.rb?rev=901024&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/ruby/test/sample_ipc_server.rb (added)
+++ hadoop/avro/trunk/lang/ruby/test/sample_ipc_server.rb Wed Jan 20 01:05:06 2010
@@ -0,0 +1,91 @@
+#!/usr/bin/env ruby
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require 'socket'
+require 'avro'
+
# JSON definition of the example "Mail" protocol served by this script; the
# same definition appears in sample_ipc_client.rb. It declares one record
# type, Message (to/from/body strings), and two messages: "send" (takes a
# Message, returns a string) and "replay" (no arguments, returns a string).
MAIL_PROTOCOL_JSON = <<-EOS
{"namespace": "example.proto",
 "protocol": "Mail",

 "types": [
 {"name": "Message", "type": "record",
 "fields": [
 {"name": "to", "type": "string"},
 {"name": "from", "type": "string"},
 {"name": "body", "type": "string"}
 ]
 }
 ],

 "messages": {
 "send": {
 "request": [{"name": "message", "type": "Message"}],
 "response": "string"
 },
 "replay": {
 "request": [],
 "response": "string"
 }
 }
}
EOS

# Parsed form of MAIL_PROTOCOL_JSON, handed to the responder's superclass.
MAIL_PROTOCOL = Avro::Protocol.parse(MAIL_PROTOCOL_JSON)
+
# Responder implementing the two messages of MAIL_PROTOCOL.
class MailResponder < Avro::IPC::Responder
  def initialize
    super(MAIL_PROTOCOL)
  end

  # Produces the response for one request. 'send' echoes a summary of the
  # Message record found under request['message']; 'replay' returns the
  # string 'replay'. Any other message name yields nil.
  def call(message, request)
    case message.name
    when 'send'
      mail = request['message']
      "Sent message to #{mail['to']} from #{mail['from']} with body #{mail['body']}"
    when 'replay'
      'replay'
    end
  end
end
+
# Minimal blocking TCP server. Connections are accepted one at a time and
# passed to #handle, which subclasses must define.
class RequestHandler
  def initialize(address, port)
    @ip_address = address
    @port = port
  end

  # Listens on the configured address and port and serves connections
  # sequentially for as long as accept returns a session. Each session is
  # closed after #handle returns, and also when it raises; the exception
  # still propagates, and the listening socket is closed on the way out.
  def run
    server = TCPServer.new(@ip_address, @port)
    begin
      while (session = server.accept)
        begin
          handle(session)
        ensure
          session.close
        end
      end
    ensure
      server.close
    end
  end
end
+
# RequestHandler that answers each connection with MailResponder.
class MailHandler < RequestHandler
  # Serves one framed Mail-protocol exchange over the accepted +request+
  # socket: reads the call through a SocketTransport and writes the
  # responder's reply back as a framed message.
  def handle(request)
    mail_responder = MailResponder.new
    sock_transport = Avro::IPC::SocketTransport.new(request)
    reply = mail_responder.respond(sock_transport)
    sock_transport.write_framed_message(reply)
  end
end
+
# When executed directly, serve the Mail protocol on localhost:9090.
MailHandler.new('localhost', 9090).run if $0 == __FILE__
\ No newline at end of file
Added: hadoop/avro/trunk/lang/ruby/test/test_help.rb
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/ruby/test/test_help.rb?rev=901024&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/ruby/test/test_help.rb (added)
+++ hadoop/avro/trunk/lang/ruby/test/test_help.rb Wed Jan 20 01:05:06 2010
@@ -0,0 +1,95 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require 'rubygems'
+require 'test/unit'
+require 'avro'
+require 'stringio'
+
+require 'fileutils'
# Create ./tmp (relative to the current working directory) when this file is
# loaded; mkdir_p leaves an existing directory untouched.
FileUtils.mkdir_p 'tmp'
+
# Generates random data conforming to an Avro schema, for use in tests.
class RandomData
  # Alphabet for random 'string' values (contains no i, l, o, I, O, 0 or 1).
  CHARPOOL = 'abcdefghjkmnpqrstuvwxyzABCDEFGHJKLMNPQRSTUVWXYZ23456789'.freeze
  # Alphabet for random 'bytes' and 'fixed' values.
  BYTEPOOL = '12345abcd'.freeze

  # schm - the schema to generate data for; nested schemas are reached via its
  #        items/values/fields/schemas readers.
  # seed - when given, passed to Kernel#srand so the sequence is reproducible.
  #        This reseeds Ruby's global random generator.
  def initialize(schm, seed=nil)
    srand(seed) if seed
    @seed = seed
    @schm = schm
  end

  # Returns one random datum for the schema given to the constructor.
  def next
    nextdata(@schm)
  end

  # Returns a random datum for +schm+, dispatching on schm.type.
  #
  # +d+ is the nesting depth: arrays and maps get shorter as it grows and are
  # always empty from depth 6 on. Unrecognised types yield nil.
  def nextdata(schm, d=0)
    case schm.type
    when 'boolean'
      rand > 0.5
    when 'string'
      randstr
    when 'int'
      # Inclusive of both bounds (rand(n) alone never returns n).
      rand_between(Avro::Schema::INT_MIN_VALUE, Avro::Schema::INT_MAX_VALUE)
    when 'long'
      rand_between(Avro::Schema::LONG_MIN_VALUE, Avro::Schema::LONG_MAX_VALUE)
    when 'float'
      # Whole numbers in [-1024, 1024]; these are exact in single precision.
      (-1024 + 2048 * rand).round.to_f
    when 'double'
      Avro::Schema::LONG_MIN_VALUE + (Avro::Schema::LONG_MAX_VALUE - Avro::Schema::LONG_MIN_VALUE) * rand
    when 'bytes'
      randstr(BYTEPOOL)
    when 'null'
      nil
    when 'array'
      Array.new(collection_length(d)) { nextdata(schm.items, d + 1) }
    when 'map'
      map = {}
      collection_length(d).times do
        # Map keys are always strings.
        map[randstr] = nextdata(schm.values, d + 1)
      end
      map
    when 'record'
      record = {}
      schm.fields.each { |field| record[field.name] = nextdata(field.type, d + 1) }
      record
    when 'union'
      branches = schm.schemas
      nextdata(branches[rand(branches.size)], d)
    when 'enum'
      symbols = schm.symbols
      symbols.empty? ? nil : symbols[rand(symbols.size)]
    when 'fixed'
      # A fixed value must be exactly schm.size bytes long.
      Array.new(schm.size) { BYTEPOOL[rand(BYTEPOOL.size), 1] }.join
    end
  end

  # Returns a random string of 0..+length+ characters drawn from +chars+.
  def randstr(chars=CHARPOOL, length=20)
    str = ''
    rand(length + 1).times { str << chars[rand(chars.size)] }
    str
  end

  private

  # Random integer in the inclusive range min..max.
  def rand_between(min, max)
    min + rand(max - min + 1)
  end

  # Random length for an array or map at nesting depth +d+ (never negative).
  def collection_length(d)
    len = rand(5) + 2 - d
    len < 0 ? 0 : len
  end
end