You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by tj...@apache.org on 2020/05/30 13:04:08 UTC
[avro] branch master updated: AVRO-2545: Add Ruby support for aliases (#636)
This is an automated email from the ASF dual-hosted git repository.
tjwp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/master by this push:
new 5287c79 AVRO-2545: Add Ruby support for aliases (#636)
5287c79 is described below
commit 5287c794d17ec0aefbb4b964a0c01166333c8649
Author: Tim Perkins <tj...@users.noreply.github.com>
AuthorDate: Sat May 30 09:03:59 2020 -0400
AVRO-2545: Add Ruby support for aliases (#636)
---
lang/ruby/lib/avro/io.rb | 26 ++++----
lang/ruby/lib/avro/schema.rb | 83 +++++++++++++++++++------
lang/ruby/lib/avro/schema_compatibility.rb | 23 ++++---
lang/ruby/test/test_io.rb | 16 +++++
lang/ruby/test/test_schema.rb | 94 +++++++++++++++++++++++++++++
lang/ruby/test/test_schema_compatibility.rb | 63 +++++++++++++++++++
6 files changed, 267 insertions(+), 38 deletions(-)
diff --git a/lang/ruby/lib/avro/io.rb b/lang/ruby/lib/avro/io.rb
index 48118f6..fbf7939 100644
--- a/lang/ruby/lib/avro/io.rb
+++ b/lang/ruby/lib/avro/io.rb
@@ -359,26 +359,28 @@ module Avro
readers_fields_hash = readers_schema.fields_hash
read_record = {}
writers_schema.fields.each do |field|
- if (readers_field = readers_fields_hash[field.name])
+ readers_field = readers_fields_hash[field.name]
+ if readers_field
field_val = read_data(field.type, readers_field.type, decoder)
read_record[field.name] = field_val
+ elsif readers_schema.fields_by_alias.key?(field.name)
+ readers_field = readers_schema.fields_by_alias[field.name]
+ field_val = read_data(field.type, readers_field.type, decoder)
+ read_record[readers_field.name] = field_val
else
skip_data(field.type, decoder)
end
end
# fill in the default values
- if readers_fields_hash.size > read_record.size
- writers_fields_hash = writers_schema.fields_hash
- readers_fields_hash.each do |field_name, field|
- unless writers_fields_hash.has_key? field_name
- if field.default?
- field_val = read_default_value(field.type, field.default)
- read_record[field.name] = field_val
- else
- raise AvroError, "Missing data for #{field.type} with no default"
- end
- end
+ readers_fields_hash.each do |field_name, field|
+ next if read_record.key?(field_name)
+
+ if field.default?
+ field_val = read_default_value(field.type, field.default)
+ read_record[field.name] = field_val
+ else
+ raise AvroError, "Missing data for #{field.type} with no default"
end
end
diff --git a/lang/ruby/lib/avro/schema.rb b/lang/ruby/lib/avro/schema.rb
index 454c723..ed90323 100644
--- a/lang/ruby/lib/avro/schema.rb
+++ b/lang/ruby/lib/avro/schema.rb
@@ -69,19 +69,20 @@ module Avro
raise SchemaParseError, "Name #{name} is invalid for type #{type}!"
end
namespace = json_obj.include?('namespace') ? json_obj['namespace'] : default_namespace
+ aliases = json_obj['aliases']
case type_sym
when :fixed
size = json_obj['size']
- return FixedSchema.new(name, namespace, size, names, logical_type)
+ return FixedSchema.new(name, namespace, size, names, logical_type, aliases)
when :enum
symbols = json_obj['symbols']
doc = json_obj['doc']
default = json_obj['default']
- return EnumSchema.new(name, namespace, symbols, names, doc, default)
+ return EnumSchema.new(name, namespace, symbols, names, doc, default, aliases)
when :record, :error
fields = json_obj['fields']
doc = json_obj['doc']
- return RecordSchema.new(name, namespace, fields, names, type_sym, doc)
+ return RecordSchema.new(name, namespace, fields, names, type_sym, doc, aliases)
else
raise SchemaParseError.new("Unknown named type: #{type}")
end
@@ -230,13 +231,25 @@ module Avro
MultiJson.dump to_avro
end
+ def validate_aliases!
+ unless aliases.nil? ||
+ (aliases.is_a?(Array) && aliases.all? { |a| a.is_a?(String) })
+
+ raise Avro::SchemaParseError,
+ "Invalid aliases value #{aliases.inspect} for #{type} #{name}. Must be an array of strings."
+ end
+ end
+ private :validate_aliases!
+
class NamedSchema < Schema
- attr_reader :name, :namespace
+ attr_reader :name, :namespace, :aliases
- def initialize(type, name, namespace=nil, names=nil, doc=nil, logical_type=nil)
+ def initialize(type, name, namespace=nil, names=nil, doc=nil, logical_type=nil, aliases=nil)
super(type, logical_type)
@name, @namespace = Name.extract_namespace(name, namespace)
- @doc = doc
+ @doc = doc
+ @aliases = aliases
+ validate_aliases! if aliases
Name.add_name(names, self)
end
@@ -247,20 +260,34 @@ module Avro
end
props = {'name' => @name}
props.merge!('namespace' => @namespace) if @namespace
- props.merge!('doc' => @doc) if @doc
+ props['namespace'] = @namespace if @namespace
+ props['doc'] = @doc if @doc
+ props['aliases'] = aliases if aliases && aliases.any?
super.merge props
end
def fullname
@fullname ||= Name.make_fullname(@name, @namespace)
end
+
+ def fullname_aliases
+ @fullname_aliases ||= if aliases
+ aliases.map { |a| Name.make_fullname(a, namespace) }
+ else
+ []
+ end
+ end
+
+ def match_fullname?(name)
+ name == fullname || fullname_aliases.include?(name)
+ end
end
class RecordSchema < NamedSchema
attr_reader :fields, :doc
def self.make_field_objects(field_data, names, namespace=nil)
- field_objects, field_names = [], Set.new
+ field_objects, field_names, alias_names = [], Set.new, Set.new
field_data.each do |field|
if field.respond_to?(:[]) # TODO(jmhodges) wtffffff
type = field['type']
@@ -268,12 +295,18 @@ module Avro
default = field.key?('default') ? field['default'] : :no_default
order = field['order']
doc = field['doc']
- new_field = Field.new(type, name, default, order, names, namespace, doc)
+ aliases = field['aliases']
+ new_field = Field.new(type, name, default, order, names, namespace, doc, aliases)
# make sure field name has not been used yet
if field_names.include?(new_field.name)
raise SchemaParseError, "Field name #{new_field.name.inspect} is already in use"
end
field_names << new_field.name
+ # make sure alias has not been used yet
+ if new_field.aliases && alias_names.intersect?(new_field.aliases.to_set)
+ raise SchemaParseError, "Alias #{(alias_names & new_field.aliases).to_a} already in use"
+ end
+ alias_names.merge(new_field.aliases) if new_field.aliases
else
raise SchemaParseError, "Not a valid field: #{field}"
end
@@ -282,14 +315,14 @@ module Avro
field_objects
end
- def initialize(name, namespace, fields, names=nil, schema_type=:record, doc=nil)
+ def initialize(name, namespace, fields, names=nil, schema_type=:record, doc=nil, aliases=nil)
if schema_type == :request || schema_type == 'request'
@type_sym = schema_type.to_sym
@namespace = namespace
@name = nil
@doc = nil
else
- super(schema_type, name, namespace, names, doc)
+ super(schema_type, name, namespace, names, doc, nil, aliases)
end
@fields = if fields
RecordSchema.make_field_objects(fields, names, self.namespace)
@@ -302,6 +335,16 @@ module Avro
@fields_hash ||= fields.inject({}){|hsh, field| hsh[field.name] = field; hsh }
end
+ def fields_by_alias
+ @fields_by_alias ||= fields.each_with_object({}) do |field, hash|
+ if field.aliases
+ field.aliases.each do |a|
+ hash[a] = field
+ end
+ end
+ end
+ end
+
def to_avro(names=Set.new)
hsh = super
return hsh unless hsh.is_a?(Hash)
@@ -372,7 +415,7 @@ module Avro
attr_reader :symbols, :doc, :default
- def initialize(name, space, symbols, names=nil, doc=nil, default=nil)
+ def initialize(name, space, symbols, names=nil, doc=nil, default=nil, aliases=nil)
if symbols.uniq.length < symbols.length
fail_msg = "Duplicate symbol: #{symbols}"
raise Avro::SchemaParseError, fail_msg
@@ -391,7 +434,7 @@ module Avro
raise Avro::SchemaParseError, "Default '#{default}' is not a valid symbol for enum #{name}"
end
- super(:enum, name, space, names, doc)
+ super(:enum, name, space, names, doc, nil, aliases)
@default = default
@symbols = symbols
end
@@ -444,12 +487,12 @@ module Avro
class FixedSchema < NamedSchema
attr_reader :size
- def initialize(name, space, size, names=nil, logical_type=nil)
+ def initialize(name, space, size, names=nil, logical_type=nil, aliases=nil)
# Ensure valid cto args
unless size.is_a?(Integer)
raise AvroError, 'Fixed Schema requires a valid integer for size property.'
end
- super(:fixed, name, space, names, nil, logical_type)
+ super(:fixed, name, space, names, nil, logical_type, aliases)
@size = size
end
@@ -460,14 +503,16 @@ module Avro
end
class Field < Schema
- attr_reader :type, :name, :default, :order, :doc
+ attr_reader :type, :name, :default, :order, :doc, :aliases
- def initialize(type, name, default=:no_default, order=nil, names=nil, namespace=nil, doc=nil)
+ def initialize(type, name, default=:no_default, order=nil, names=nil, namespace=nil, doc=nil, aliases=nil)
@type = subparse(type, names, namespace)
@name = name
@default = default
@order = order
@doc = doc
+ @aliases = aliases
+ validate_aliases! if aliases
validate_default! if default? && !Avro.disable_field_default_validation
end
@@ -483,6 +528,10 @@ module Avro
end
end
+ def alias_names
+ @alias_names ||= Array(aliases)
+ end
+
private
def validate_default!
diff --git a/lang/ruby/lib/avro/schema_compatibility.rb b/lang/ruby/lib/avro/schema_compatibility.rb
index a4b763d..1d5b24e 100644
--- a/lang/ruby/lib/avro/schema_compatibility.rb
+++ b/lang/ruby/lib/avro/schema_compatibility.rb
@@ -28,10 +28,8 @@ module Avro
end
# Perform a basic check that a datum written with the writers_schema could
- # be read using the readers_schema. This check only includes matching the types,
- # including schema promotion, and matching the full name for named types.
- # Aliases for named types are not supported here, and the ruby implementation
- # of Avro in general does not include support for aliases.
+ # be read using the readers_schema. This check includes matching the types,
+ # including schema promotion, and matching the full name (including aliases) for named types.
def self.match_schemas(writers_schema, readers_schema)
w_type = writers_schema.type_sym
r_type = readers_schema.type_sym
@@ -46,16 +44,16 @@ module Avro
case r_type
when :record
- return writers_schema.fullname == readers_schema.fullname
+ return readers_schema.match_fullname?(writers_schema.fullname)
when :error
- return writers_schema.fullname == readers_schema.fullname
+ return readers_schema.match_fullname?(writers_schema.fullname)
when :request
return true
when :fixed
- return writers_schema.fullname == readers_schema.fullname &&
+ return readers_schema.match_fullname?(writers_schema.fullname) &&
writers_schema.size == readers_schema.size
when :enum
- return writers_schema.fullname == readers_schema.fullname
+ return readers_schema.match_fullname?(writers_schema.fullname)
when :map
return match_schemas(writers_schema.values, readers_schema.values)
when :array
@@ -148,7 +146,14 @@ module Avro
if writer_fields_hash.key?(field.name)
return false unless full_match_schemas(writer_fields_hash[field.name].type, field.type)
else
- return false unless field.default?
+ names = writer_fields_hash.keys & field.alias_names
+ if names.size > 1
+ return false
+ elsif names.size == 1
+ return false unless full_match_schemas(writer_fields_hash[names.first].type, field.type)
+ else
+ return false unless field.default?
+ end
end
end
diff --git a/lang/ruby/test/test_io.rb b/lang/ruby/test/test_io.rb
index e364421..fcc3f97 100644
--- a/lang/ruby/test/test_io.rb
+++ b/lang/ruby/test/test_io.rb
@@ -473,6 +473,22 @@ EOS
assert_equal(datum_read, datum_to_write)
end
+ def test_aliased
+ writers_schema = Avro::Schema.parse(<<-SCHEMA)
+ {"type":"record", "name":"Rec1", "fields":[
+ {"name":"field1", "type":"int"}
+ ]}
+ SCHEMA
+ readers_schema = Avro::Schema.parse(<<-SCHEMA)
+ {"type":"record", "name":"Rec2", "aliases":["Rec1"], "fields":[
+ {"name":"field2", "aliases":["field1"], "type":"int"}
+ ]}
+ SCHEMA
+ writer, * = write_datum({ 'field1' => 1 }, writers_schema)
+ datum_read = read_datum(writer, writers_schema, readers_schema)
+ assert_equal(datum_read, { 'field2' => 1 })
+ end
+
def test_snappy_backward_compat
# a snappy-compressed block payload without the checksum
# this has no back-references, just one literal so the last 9
diff --git a/lang/ruby/test/test_schema.rb b/lang/ruby/test/test_schema.rb
index b52ed5c..b54bced 100644
--- a/lang/ruby/test/test_schema.rb
+++ b/lang/ruby/test/test_schema.rb
@@ -151,6 +151,20 @@ class TestSchema < Test::Unit::TestCase
}
end
+ def test_to_avro_includes_aliases
+ hash = {
+ 'type' => 'record',
+ 'name' => 'test_record',
+ 'aliases' => %w(alt_record),
+ 'fields' => [
+ { 'name' => 'f', 'type' => { 'type' => 'fixed', 'size' => 2, 'name' => 'test_fixed', 'aliases' => %w(alt_fixed) } },
+ { 'name' => 'e', 'type' => { 'type' => 'enum', 'symbols' => %w(A B), 'name' => 'test_enum', 'aliases' => %w(alt_enum) } }
+ ]
+ }
+ schema = hash_to_schema(hash)
+ assert_equal(schema.to_avro, hash)
+ end
+
def test_unknown_named_type
error = assert_raise Avro::UnknownSchemaError do
Avro::Schema.parse <<-SCHEMA
@@ -630,4 +644,84 @@ class TestSchema < Test::Unit::TestCase
ensure
Avro.disable_enum_symbol_validation = nil
end
+
+ def test_validate_field_aliases
+ exception = assert_raise(Avro::SchemaParseError) do
+ hash_to_schema(
+ type: 'record',
+ name: 'fruits',
+ fields: [
+ { name: 'banana', type: 'string', aliases: 'banane' }
+ ]
+ )
+ end
+
+ assert_match(/Invalid aliases value "banane" for "string" banana/, exception.to_s)
+ end
+
+ def test_validate_same_alias_multiple_fields
+ exception = assert_raise(Avro::SchemaParseError) do
+ hash_to_schema(
+ type: 'record',
+ name: 'fruits',
+ fields: [
+ { name: 'banana', type: 'string', aliases: %w(yellow) },
+ { name: 'lemo', type: 'string', aliases: %w(yellow) }
+ ]
+ )
+ end
+
+ assert_match('Alias ["yellow"] already in use', exception.to_s)
+ end
+
+ def test_validate_repeated_aliases
+ assert_nothing_raised do
+ hash_to_schema(
+ type: 'record',
+ name: 'fruits',
+ fields: [
+ { name: 'banana', type: 'string', aliases: %w(yellow yellow) },
+ ]
+ )
+ end
+ end
+
+ def test_validate_record_aliases
+ exception = assert_raise(Avro::SchemaParseError) do
+ hash_to_schema(
+ type: 'record',
+ name: 'fruits',
+ aliases: ["foods", 2],
+ fields: []
+ )
+ end
+
+ assert_match(/Invalid aliases value \["foods", 2\] for record fruits/, exception.to_s)
+ end
+
+ def test_validate_enum_aliases
+ exception = assert_raise(Avro::SchemaParseError) do
+ hash_to_schema(
+ type: 'enum',
+ name: 'vowels',
+ aliases: [1, 2],
+ symbols: %w(A E I O U)
+ )
+ end
+
+ assert_match(/Invalid aliases value \[1, 2\] for enum vowels/, exception.to_s)
+ end
+
+ def test_validate_fixed_aliases
+ exception = assert_raise(Avro::SchemaParseError) do
+ hash_to_schema(
+ type: 'fixed',
+ name: 'uuid',
+ size: 36,
+ aliases: "unique_id"
+ )
+ end
+
+ assert_match(/Invalid aliases value "unique_id" for fixed uuid/, exception.to_s)
+ end
end
diff --git a/lang/ruby/test/test_schema_compatibility.rb b/lang/ruby/test/test_schema_compatibility.rb
index e5134e5..926c18d 100644
--- a/lang/ruby/test/test_schema_compatibility.rb
+++ b/lang/ruby/test/test_schema_compatibility.rb
@@ -39,9 +39,13 @@ class TestSchemaCompatibility < Test::Unit::TestCase
long_map_schema, int_map_schema,
enum1_ab_schema, enum1_ab_schema,
+ enum1_ab_aliased_schema, enum1_ab_schema,
enum1_abc_schema, enum1_ab_schema,
enum1_ab_default_schema, enum1_abc_schema,
+ fixed1_schema, fixed1_schema,
+ fixed1_aliased_schema, fixed1_schema,
+
string_schema, bytes_schema,
bytes_schema, string_schema,
@@ -56,6 +60,7 @@ class TestSchemaCompatibility < Test::Unit::TestCase
empty_record1_schema, empty_record1_schema,
empty_record1_schema, a_int_record1_schema,
+ empty_record1_aliased_schema, empty_record1_schema,
a_int_record1_schema, a_int_record1_schema,
a_dint_record1_schema, a_int_record1_schema,
@@ -118,16 +123,22 @@ class TestSchemaCompatibility < Test::Unit::TestCase
int_map_schema, long_map_schema,
enum1_ab_schema, enum1_abc_schema,
+ enum1_ab_schema, enum1_ab_aliased_schema,
enum1_bc_schema, enum1_abc_schema,
enum1_ab_schema, enum2_ab_schema,
int_schema, enum2_ab_schema,
enum2_ab_schema, int_schema,
+ fixed1_schema, fixed2_schema,
+ fixed1_schema, fixed1_size3_schema,
+ fixed1_schema, fixed1_aliased_schema,
+
int_union_schema, int_string_union_schema,
string_union_schema, int_string_union_schema,
empty_record2_schema, empty_record1_schema,
+ empty_record1_schema, empty_record1_aliased_schema,
a_int_record1_schema, empty_record1_schema,
a_int_b_dint_record1_schema, empty_record1_schema,
@@ -170,6 +181,17 @@ class TestSchemaCompatibility < Test::Unit::TestCase
assert_false(can_read?(reader_schema, writer_schema))
end
+ def test_aliased_field
+ reader_schema = Avro::Schema.parse(<<-SCHEMA)
+ {"type":"record", "name":"Record", "fields":[
+ {"name":"newname1", "aliases":["oldfield1"], "type":"int"},
+ {"name":"oldfield2", "type":"string"}
+ ]}
+ SCHEMA
+ assert_true(can_read?(writer_schema, reader_schema))
+ assert_false(can_read?(reader_schema, writer_schema))
+ end
+
def test_all_fields
reader_schema = Avro::Schema.parse <<-SCHEMA
{"type":"record", "name":"Record", "fields":[
@@ -251,6 +273,23 @@ class TestSchemaCompatibility < Test::Unit::TestCase
assert_true(can_read?(enum_schema1, enum_schema2))
end
+ def test_crossed_aliases
+ writer_schema = Avro::Schema.parse(<<-SCHEMA)
+ {"type":"record", "name":"Record", "fields":[
+ {"name":"field1", "type": "int"},
+ {"name":"field2", "type": "string"}
+ ]}
+ SCHEMA
+ reader_schema = Avro::Schema.parse(<<-SCHEMA)
+ {"type":"record", "name":"Record", "fields":[
+ {"name":"field1", "aliases":["field2"], "type":"string"},
+ {"name":"field2", "aliases":["field1"], "type":"int"}
+ ]}
+ SCHEMA
# Not supported; alias is not used if there is a direct match
+ assert_false(can_read?(writer_schema, reader_schema))
+ end
+
# Tests from lang/java/avro/src/test/java/org/apache/avro/io/parsing/TestResolvingGrammarGenerator2.java
def point_2d_schema
@@ -378,6 +417,10 @@ class TestSchemaCompatibility < Test::Unit::TestCase
Avro::Schema.parse('{"type":"enum", "name":"Enum1", "symbols":["A","B"], "default":"A"}')
end
+ def enum1_ab_aliased_schema
+ Avro::Schema.parse('{"type":"enum", "name":"Enum2", "aliases":["Enum1"], "symbols":["A","B"]}')
+ end
+
def enum1_abc_schema
Avro::Schema.parse('{"type":"enum", "name":"Enum1", "symbols":["A","B","C"]}')
end
@@ -390,10 +433,30 @@ class TestSchemaCompatibility < Test::Unit::TestCase
Avro::Schema.parse('{"type":"enum", "name":"Enum2", "symbols":["A","B"]}')
end
+ def fixed1_schema
+ Avro::Schema.parse('{"type":"fixed", "name":"Fixed1", "size": 2}')
+ end
+
+ def fixed1_aliased_schema
+ Avro::Schema.parse('{"type":"fixed", "name":"Fixed2", "aliases":["Fixed1"], "size": 2}')
+ end
+
+ def fixed2_schema
+ Avro::Schema.parse('{"type":"fixed", "name":"Fixed2", "size": 2}')
+ end
+
+ def fixed1_size3_schema
+ Avro::Schema.parse('{"type":"fixed", "name":"Fixed1", "size": 3}')
+ end
+
def empty_record1_schema
Avro::Schema.parse('{"type":"record", "name":"Record1"}')
end
+ def empty_record1_aliased_schema
+ Avro::Schema.parse('{"type":"record", "name":"Record2", "aliases":["Record1"]}')
+ end
+
def empty_record2_schema
Avro::Schema.parse('{"type":"record", "name":"Record2"}')
end