You are viewing a plain-text version of this content. The link to the canonical (archived) version was not preserved in this plain-text copy.
Posted to commits@avro.apache.org by cu...@apache.org on 2013/03/01 20:02:18 UTC
svn commit: r1451689 - in /avro/trunk: CHANGES.txt lang/ruby/lib/avro/io.rb lang/ruby/lib/avro/protocol.rb lang/ruby/lib/avro/schema.rb lang/ruby/test/random_data.rb
Author: cutting
Date: Fri Mar 1 19:02:18 2013
New Revision: 1451689
URL: http://svn.apache.org/r1451689
Log:
AVRO-1260. Ruby: Improve read performance. Contributed by Martin Kleppmann.
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/ruby/lib/avro/io.rb
avro/trunk/lang/ruby/lib/avro/protocol.rb
avro/trunk/lang/ruby/lib/avro/schema.rb
avro/trunk/lang/ruby/test/random_data.rb
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1451689&r1=1451688&r2=1451689&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Fri Mar 1 19:02:18 2013
@@ -6,6 +6,8 @@ Trunk (not yet released)
IMPROVEMENTS
+ AVRO-1260. Ruby: Improve read performance. (Martin Kleppmann via cutting)
+
BUG FIXES
Avro 1.7.4 (22 February 2012)
Modified: avro/trunk/lang/ruby/lib/avro/io.rb
URL: http://svn.apache.org/viewvc/avro/trunk/lang/ruby/lib/avro/io.rb?rev=1451689&r1=1451688&r2=1451689&view=diff
==============================================================================
--- avro/trunk/lang/ruby/lib/avro/io.rb (original)
+++ avro/trunk/lang/ruby/lib/avro/io.rb Fri Mar 1 19:02:18 2013
@@ -220,51 +220,43 @@ module Avro
end
class DatumReader
- def self.check_props(schema_one, schema_two, prop_list)
- prop_list.all? do |prop|
- schema_one.send(prop) == schema_two.send(prop)
- end
- end
-
def self.match_schemas(writers_schema, readers_schema)
- w_type = writers_schema.type
- r_type = readers_schema.type
+ w_type = writers_schema.type_sym
+ r_type = readers_schema.type_sym
# This conditional is begging for some OO love.
- if w_type == 'union' || r_type == 'union'
+ if w_type == :union || r_type == :union
return true
end
if w_type == r_type
- if Schema::PRIMITIVE_TYPES.include?(w_type) &&
- Schema::PRIMITIVE_TYPES.include?(r_type)
- return true
- end
+ return true if Schema::PRIMITIVE_TYPES_SYM.include?(r_type)
case r_type
- when 'record'
- return check_props(writers_schema, readers_schema, [:fullname])
- when 'error'
- return check_props(writers_schema, readers_schema, [:fullname])
- when 'request'
+ when :record
+ return writers_schema.fullname == readers_schema.fullname
+ when :error
+ return writers_schema.fullname == readers_schema.fullname
+ when :request
return true
- when 'fixed'
- return check_props(writers_schema, readers_schema, [:fullname, :size])
- when 'enum'
- return check_props(writers_schema, readers_schema, [:fullname])
- when 'map'
- return check_props(writers_schema.values, readers_schema.values, [:type])
- when 'array'
- return check_props(writers_schema.items, readers_schema.items, [:type])
+ when :fixed
+ return writers_schema.fullname == readers_schema.fullname &&
+ writers_schema.size == readers_schema.size
+ when :enum
+ return writers_schema.fullname == readers_schema.fullname
+ when :map
+ return writers_schema.values.type == readers_schema.values.type
+ when :array
+ return writers_schema.items.type == readers_schema.items.type
end
end
# Handle schema promotion
- if w_type == 'int' && ['long', 'float', 'double'].include?(r_type)
+ if w_type == :int && [:long, :float, :double].include?(r_type)
return true
- elsif w_type == 'long' && ['float', 'double'].include?(r_type)
+ elsif w_type == :long && [:float, :double].include?(r_type)
return true
- elsif w_type == 'float' && r_type == 'double'
+ elsif w_type == :float && r_type == :double
return true
end
@@ -291,7 +283,7 @@ module Avro
# schema resolution: reader's schema is a union, writer's
# schema is not
- if writers_schema.type != 'union' && readers_schema.type == 'union'
+ if writers_schema.type_sym != :union && readers_schema.type_sym == :union
rs = readers_schema.schemas.find{|s|
self.class.match_schemas(writers_schema, s)
}
@@ -301,21 +293,21 @@ module Avro
# function dispatch for reading data based on type of writer's
# schema
- case writers_schema.type
- when 'null'; decoder.read_null
- when 'boolean'; decoder.read_boolean
- when 'string'; decoder.read_string
- when 'int'; decoder.read_int
- when 'long'; decoder.read_long
- when 'float'; decoder.read_float
- when 'double'; decoder.read_double
- when 'bytes'; decoder.read_bytes
- when 'fixed'; read_fixed(writers_schema, readers_schema, decoder)
- when 'enum'; read_enum(writers_schema, readers_schema, decoder)
- when 'array'; read_array(writers_schema, readers_schema, decoder)
- when 'map'; read_map(writers_schema, readers_schema, decoder)
- when 'union'; read_union(writers_schema, readers_schema, decoder)
- when 'record', 'error', 'request'; read_record(writers_schema, readers_schema, decoder)
+ case writers_schema.type_sym
+ when :null; decoder.read_null
+ when :boolean; decoder.read_boolean
+ when :string; decoder.read_string
+ when :int; decoder.read_int
+ when :long; decoder.read_long
+ when :float; decoder.read_float
+ when :double; decoder.read_double
+ when :bytes; decoder.read_bytes
+ when :fixed; read_fixed(writers_schema, readers_schema, decoder)
+ when :enum; read_enum(writers_schema, readers_schema, decoder)
+ when :array; read_array(writers_schema, readers_schema, decoder)
+ when :map; read_map(writers_schema, readers_schema, decoder)
+ when :union; read_union(writers_schema, readers_schema, decoder)
+ when :record, :error, :request; read_record(writers_schema, readers_schema, decoder)
else
raise AvroError, "Cannot read unknown schema type: #{writers_schema.type}"
end
@@ -416,34 +408,34 @@ module Avro
def read_default_value(field_schema, default_value)
# Basically a JSON Decoder?
- case field_schema.type
- when 'null'
+ case field_schema.type_sym
+ when :null
return nil
- when 'boolean'
+ when :boolean
return default_value
- when 'int', 'long'
+ when :int, :long
return Integer(default_value)
- when 'float', 'double'
+ when :float, :double
return Float(default_value)
- when 'enum', 'fixed', 'string', 'bytes'
+ when :enum, :fixed, :string, :bytes
return default_value
- when 'array'
+ when :array
read_array = []
default_value.each do |json_val|
item_val = read_default_value(field_schema.items, json_val)
read_array << item_val
end
return read_array
- when 'map'
+ when :map
read_map = {}
default_value.each do |key, json_val|
map_val = read_default_value(field_schema.values, json_val)
read_map[key] = map_val
end
return read_map
- when 'union'
+ when :union
return read_default_value(field_schema.schemas[0], default_value)
- when 'record', 'error'
+ when :record, :error
read_record = {}
field_schema.fields.each do |field|
json_val = default_value[field.name]
@@ -459,37 +451,37 @@ module Avro
end
def skip_data(writers_schema, decoder)
- case writers_schema.type
- when 'null'
+ case writers_schema.type_sym
+ when :null
decoder.skip_null
- when 'boolean'
+ when :boolean
decoder.skip_boolean
- when 'string'
+ when :string
decoder.skip_string
- when 'int'
+ when :int
decoder.skip_int
- when 'long'
+ when :long
decoder.skip_long
- when 'float'
+ when :float
decoder.skip_float
- when 'double'
+ when :double
decoder.skip_double
- when 'bytes'
+ when :bytes
decoder.skip_bytes
- when 'fixed'
+ when :fixed
skip_fixed(writers_schema, decoder)
- when 'enum'
+ when :enum
skip_enum(writers_schema, decoder)
- when 'array'
+ when :array
skip_array(writers_schema, decoder)
- when 'map'
+ when :map
skip_map(writers_schema, decoder)
- when 'union'
+ when :union
skip_union(writers_schema, decoder)
- when 'record', 'error', 'request'
+ when :record, :error, :request
skip_record(writers_schema, decoder)
else
- raise AvroError, "Unknown schema type: #{schm.type}"
+ raise AvroError, "Unknown schema type: #{writers_schema.type}"
end
end
@@ -552,21 +544,21 @@ module Avro
end
# function dispatch to write datum
- case writers_schema.type
- when 'null'; encoder.write_null(datum)
- when 'boolean'; encoder.write_boolean(datum)
- when 'string'; encoder.write_string(datum)
- when 'int'; encoder.write_int(datum)
- when 'long'; encoder.write_long(datum)
- when 'float'; encoder.write_float(datum)
- when 'double'; encoder.write_double(datum)
- when 'bytes'; encoder.write_bytes(datum)
- when 'fixed'; write_fixed(writers_schema, datum, encoder)
- when 'enum'; write_enum(writers_schema, datum, encoder)
- when 'array'; write_array(writers_schema, datum, encoder)
- when 'map'; write_map(writers_schema, datum, encoder)
- when 'union'; write_union(writers_schema, datum, encoder)
- when 'record', 'error', 'request'; write_record(writers_schema, datum, encoder)
+ case writers_schema.type_sym
+ when :null; encoder.write_null(datum)
+ when :boolean; encoder.write_boolean(datum)
+ when :string; encoder.write_string(datum)
+ when :int; encoder.write_int(datum)
+ when :long; encoder.write_long(datum)
+ when :float; encoder.write_float(datum)
+ when :double; encoder.write_double(datum)
+ when :bytes; encoder.write_bytes(datum)
+ when :fixed; write_fixed(writers_schema, datum, encoder)
+ when :enum; write_enum(writers_schema, datum, encoder)
+ when :array; write_array(writers_schema, datum, encoder)
+ when :map; write_map(writers_schema, datum, encoder)
+ when :union; write_union(writers_schema, datum, encoder)
+ when :record, :error, :request; write_record(writers_schema, datum, encoder)
else
raise AvroError.new("Unknown type: #{writers_schema.type}")
end
Modified: avro/trunk/lang/ruby/lib/avro/protocol.rb
URL: http://svn.apache.org/viewvc/avro/trunk/lang/ruby/lib/avro/protocol.rb?rev=1451689&r1=1451688&r2=1451689&view=diff
==============================================================================
--- avro/trunk/lang/ruby/lib/avro/protocol.rb (original)
+++ avro/trunk/lang/ruby/lib/avro/protocol.rb Fri Mar 1 19:02:18 2013
@@ -17,6 +17,7 @@
module Avro
class Protocol
VALID_TYPE_SCHEMA_TYPES = Set.new(%w[enum record error fixed])
+ VALID_TYPE_SCHEMA_TYPES_SYM = Set.new(VALID_TYPE_SCHEMA_TYPES.map(&:to_sym))
class ProtocolParseError < Avro::AvroError; end
attr_reader :name, :namespace, :types, :messages, :md5
@@ -71,7 +72,7 @@ module Avro
# FIXME adding type.name to type_names is not defined in the
# spec. Possible bug in the python impl and the spec.
type_object = Schema.real_parse(type, type_names)
- unless VALID_TYPE_SCHEMA_TYPES.include?(type_object.type)
+ unless VALID_TYPE_SCHEMA_TYPES_SYM.include?(type_object.type_sym)
msg = "Type #{type} not an enum, record, fixed or error."
raise ProtocolParseError, msg
end
@@ -142,7 +143,7 @@ module Avro
unless request.is_a?(Array)
raise ProtocolParseError, "Request property not an Array: #{request.inspect}"
end
- Schema::RecordSchema.new(nil, nil, request, names, 'request')
+ Schema::RecordSchema.new(nil, nil, request, names, :request)
end
def parse_response(response, names)
Modified: avro/trunk/lang/ruby/lib/avro/schema.rb
URL: http://svn.apache.org/viewvc/avro/trunk/lang/ruby/lib/avro/schema.rb?rev=1451689&r1=1451688&r2=1451689&view=diff
==============================================================================
--- avro/trunk/lang/ruby/lib/avro/schema.rb (original)
+++ avro/trunk/lang/ruby/lib/avro/schema.rb Fri Mar 1 19:02:18 2013
@@ -16,12 +16,17 @@
module Avro
class Schema
- # FIXME turn these into symbols to prevent some gc pressure
+ # Sets of strings, for backwards compatibility. See below for sets of symbols,
+ # for better performance.
PRIMITIVE_TYPES = Set.new(%w[null boolean string bytes int long float double])
NAMED_TYPES = Set.new(%w[fixed enum record error])
VALID_TYPES = PRIMITIVE_TYPES + NAMED_TYPES + Set.new(%w[array map union request])
+ PRIMITIVE_TYPES_SYM = Set.new(PRIMITIVE_TYPES.map(&:to_sym))
+ NAMED_TYPES_SYM = Set.new(NAMED_TYPES.map(&:to_sym))
+ VALID_TYPES_SYM = Set.new(VALID_TYPES.map(&:to_sym))
+
INT_MIN_VALUE = -(1 << 31)
INT_MAX_VALUE = (1 << 31) - 1
LONG_MIN_VALUE = -(1 << 63)
@@ -35,38 +40,46 @@ module Avro
def self.real_parse(json_obj, names=nil)
if json_obj.is_a? Hash
type = json_obj['type']
- if PRIMITIVE_TYPES.include?(type)
- return PrimitiveSchema.new(type)
- elsif NAMED_TYPES.include? type
+ raise SchemaParseError, %Q(No "type" property: #{json_obj}) if type.nil?
+
+ # Check that the type is valid before calling #to_sym, since symbols are never garbage
+ # collected (important to avoid DoS if we're accepting schemas from untrusted clients)
+ unless VALID_TYPES.include?(type)
+ raise SchemaParseError, "Unknown type: #{type}"
+ end
+
+ type_sym = type.to_sym
+ if PRIMITIVE_TYPES_SYM.include?(type_sym)
+ return PrimitiveSchema.new(type_sym)
+
+ elsif NAMED_TYPES_SYM.include? type_sym
name = json_obj['name']
namespace = json_obj['namespace']
- case type
- when 'fixed'
+ case type_sym
+ when :fixed
size = json_obj['size']
return FixedSchema.new(name, namespace, size, names)
- when 'enum'
+ when :enum
symbols = json_obj['symbols']
return EnumSchema.new(name, namespace, symbols, names)
- when 'record', 'error'
+ when :record, :error
fields = json_obj['fields']
- return RecordSchema.new(name, namespace, fields, names, type)
+ return RecordSchema.new(name, namespace, fields, names, type_sym)
else
raise SchemaParseError.new("Unknown named type: #{type}")
end
- elsif VALID_TYPES.include?(type)
- case type
- when 'array'
+
+ else
+ case type_sym
+ when :array
return ArraySchema.new(json_obj['items'], names)
- when 'map'
+ when :map
return MapSchema.new(json_obj['values'], names)
else
raise SchemaParseError.new("Unknown Valid Type: #{type}")
end
- elsif type.nil?
- raise SchemaParseError.new("No \"type\" property: #{json_obj}")
- else
- raise SchemaParseError.new("Undefined type: #{type}")
end
+
elsif json_obj.is_a? Array
# JSON array (union)
return UnionSchema.new(json_obj, names)
@@ -80,34 +93,34 @@ module Avro
# Determine if a ruby datum is an instance of a schema
def self.validate(expected_schema, datum)
- case expected_schema.type
- when 'null'
+ case expected_schema.type_sym
+ when :null
datum.nil?
- when 'boolean'
+ when :boolean
datum == true || datum == false
- when 'string', 'bytes'
+ when :string, :bytes
datum.is_a? String
- when 'int'
+ when :int
(datum.is_a?(Fixnum) || datum.is_a?(Bignum)) &&
(INT_MIN_VALUE <= datum) && (datum <= INT_MAX_VALUE)
- when 'long'
+ when :long
(datum.is_a?(Fixnum) || datum.is_a?(Bignum)) &&
(LONG_MIN_VALUE <= datum) && (datum <= LONG_MAX_VALUE)
- when 'float', 'double'
+ when :float, :double
datum.is_a?(Float) || datum.is_a?(Fixnum) || datum.is_a?(Bignum)
- when 'fixed'
+ when :fixed
datum.is_a?(String) && datum.size == expected_schema.size
- when 'enum'
+ when :enum
expected_schema.symbols.include? datum
- when 'array'
+ when :array
datum.is_a?(Array) &&
datum.all?{|d| validate(expected_schema.items, d) }
- when 'map'
+ when :map
datum.keys.all?{|k| k.is_a? String } &&
datum.values.all?{|v| validate(expected_schema.values, v) }
- when 'union'
+ when :union
expected_schema.schemas.any?{|s| validate(s, datum) }
- when 'record', 'error', 'request'
+ when :record, :error, :request
datum.is_a?(Hash) &&
expected_schema.fields.all?{|f| validate(f.type, datum[f.name]) }
else
@@ -116,17 +129,21 @@ module Avro
end
def initialize(type)
- @type = type
+ @type_sym = type.is_a?(Symbol) ? type : type.to_sym
end
- def type; @type; end
+ attr_reader :type_sym
+
+ # Returns the type as a string (rather than a symbol), for backwards compatibility.
+ # Deprecated in favor of {#type_sym}.
+ def type; @type_sym.to_s; end
def ==(other, seen=nil)
- other.is_a?(Schema) && @type == other.type
+ other.is_a?(Schema) && type_sym == other.type_sym
end
def hash(seen=nil)
- @type.hash
+ type_sym.hash
end
def subparse(json_obj, names=nil)
@@ -139,7 +156,7 @@ module Avro
end
def to_avro
- {'type' => @type}
+ {'type' => type}
end
def to_s
@@ -161,7 +178,7 @@ module Avro
end
def fullname
- Name.make_fullname(@name, @namespace)
+ @fullname ||= Name.make_fullname(@name, @namespace)
end
end
@@ -190,9 +207,9 @@ module Avro
field_objects
end
- def initialize(name, namespace, fields, names=nil, schema_type='record')
- if schema_type == 'request'
- @type = schema_type
+ def initialize(name, namespace, fields, names=nil, schema_type=:record)
+ if schema_type == :request || schema_type == 'request'
+ @type_sym = schema_type.to_sym
else
super(schema_type, name, namespace, names)
end
@@ -200,12 +217,12 @@ module Avro
end
def fields_hash
- fields.inject({}){|hsh, field| hsh[field.name] = field; hsh }
+ @fields_hash ||= fields.inject({}){|hsh, field| hsh[field.name] = field; hsh }
end
def to_avro
hsh = super.merge('fields' => @fields.map {|f| f.to_avro } )
- if type == 'request'
+ if type_sym == :request
hsh['fields']
else
hsh
@@ -218,7 +235,7 @@ module Avro
def initialize(items, names=nil)
@items_schema_from_names = false
- super('array')
+ super(:array)
if items.is_a?(String) && names.has_key?(items)
@items = names[items]
@@ -243,7 +260,7 @@ module Avro
def initialize(values, names=nil)
@values_schema_from_names = false
- super('map')
+ super(:map)
if values.is_a?(String) && names.has_key?(values)
values_schema = names[values]
@values_schema_from_names = true
@@ -267,7 +284,7 @@ module Avro
class UnionSchema < Schema
attr_reader :schemas, :schema_from_names_indices
def initialize(schemas, names=nil)
- super('union')
+ super(:union)
schema_objects = []
@schema_from_names_indices = []
@@ -280,12 +297,12 @@ module Avro
new_schema = subparse(schema, names)
end
- ns_type = new_schema.type
- if VALID_TYPES.include?(ns_type) &&
- !NAMED_TYPES.include?(ns_type) &&
- schema_objects.map{|o| o.type }.include?(ns_type)
+ ns_type = new_schema.type_sym
+ if VALID_TYPES_SYM.include?(ns_type) &&
+ !NAMED_TYPES_SYM.include?(ns_type) &&
+ schema_objects.any?{|o| o.type_sym == ns_type }
raise SchemaParseError, "#{ns_type} is already in Union"
- elsif ns_type == 'union'
+ elsif ns_type == :union
raise SchemaParseError, "Unions cannot contain other unions"
else
schema_objects << new_schema
@@ -317,7 +334,7 @@ module Avro
fail_msg = 'Duplicate symbol: %s' % symbols
raise Avro::SchemaParseError, fail_msg
end
- super('enum', name, space, names)
+ super(:enum, name, space, names)
@symbols = symbols
end
@@ -329,11 +346,13 @@ module Avro
# Valid primitive types are in PRIMITIVE_TYPES.
class PrimitiveSchema < Schema
def initialize(type)
- unless PRIMITIVE_TYPES.include? type
+ if PRIMITIVE_TYPES_SYM.include?(type)
+ super(type)
+ elsif PRIMITIVE_TYPES.include?(type)
+ super(type.to_sym)
+ else
raise AvroError.new("#{type} is not a valid primitive type.")
end
-
- super(type)
end
def to_avro
@@ -349,7 +368,7 @@ module Avro
unless size.is_a?(Fixnum) || size.is_a?(Bignum)
raise AvroError, 'Fixed Schema requires a valid integer for size property.'
end
- super('fixed', name, space, names)
+ super(:fixed, name, space, names)
@size = size
end
Modified: avro/trunk/lang/ruby/test/random_data.rb
URL: http://svn.apache.org/viewvc/avro/trunk/lang/ruby/test/random_data.rb?rev=1451689&r1=1451688&r2=1451689&view=diff
==============================================================================
--- avro/trunk/lang/ruby/test/random_data.rb (original)
+++ avro/trunk/lang/ruby/test/random_data.rb Fri Mar 1 19:02:18 2013
@@ -27,52 +27,52 @@ class RandomData
end
def nextdata(schm, d=0)
- case schm.type
- when 'boolean'
+ case schm.type_sym
+ when :boolean
rand > 0.5
- when 'string'
+ when :string
randstr()
- when 'int'
+ when :int
rand(Avro::Schema::INT_MAX_VALUE - Avro::Schema::INT_MIN_VALUE) + Avro::Schema::INT_MIN_VALUE
- when 'long'
+ when :long
rand(Avro::Schema::LONG_MAX_VALUE - Avro::Schema::LONG_MIN_VALUE) + Avro::Schema::LONG_MIN_VALUE
- when 'float'
+ when :float
(-1024 + 2048 * rand).round.to_f
- when 'double'
+ when :double
Avro::Schema::LONG_MIN_VALUE + (Avro::Schema::LONG_MAX_VALUE - Avro::Schema::LONG_MIN_VALUE) * rand
- when 'bytes'
+ when :bytes
randstr(BYTEPOOL)
- when 'null'
+ when :null
nil
- when 'array'
+ when :array
arr = []
len = rand(5) + 2 - d
len = 0 if len < 0
len.times{ arr << nextdata(schm.items, d+1) }
arr
- when 'map'
+ when :map
map = {}
len = rand(5) + 2 - d
len = 0 if len < 0
len.times do
- map[nextdata(Avro::Schema::PrimitiveSchema.new('string'))] = nextdata(schm.values, d+1)
+ map[nextdata(Avro::Schema::PrimitiveSchema.new(:string))] = nextdata(schm.values, d+1)
end
map
- when 'record', 'error'
+ when :record, :error
m = {}
schm.fields.each do |field|
m[field.name] = nextdata(field.type, d+1)
end
m
- when 'union'
+ when :union
types = schm.schemas
nextdata(types[rand(types.size)], d)
- when 'enum'
+ when :enum
symbols = schm.symbols
len = symbols.size
return nil if len == 0
symbols[rand(len)]
- when 'fixed'
+ when :fixed
f = ""
schm.size.times { f << BYTEPOOL[rand(BYTEPOOL.size), 1] }
f