You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by fo...@apache.org on 2018/11/07 21:50:15 UTC
[avro] branch master updated: AVRO-1695: Ruby support for logical types revisited (#116)
This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/master by this push:
new 33f7177 AVRO-1695: Ruby support for logical types revisited (#116)
33f7177 is described below
commit 33f7177e66e04c4843676e3d7a9cf739ba812304
Author: Tim Perkins <tj...@users.noreply.github.com>
AuthorDate: Wed Nov 7 16:50:10 2018 -0500
AVRO-1695: Ruby support for logical types revisited (#116)
* AVRO-1695: Ruby support for logical types
* Support values for logical types that were already encoded
---
lang/ruby/Manifest | 2 +
lang/ruby/lib/avro/io.rb | 10 ++-
lang/ruby/lib/avro/logical_types.rb | 90 +++++++++++++++++++++++++
lang/ruby/lib/avro/schema.rb | 38 +++++++----
lang/ruby/lib/avro/schema_validator.rb | 12 +++-
lang/ruby/test/random_data.rb | 23 ++++++-
lang/ruby/test/test_io.rb | 14 +++-
lang/ruby/test/test_logical_types.rb | 120 +++++++++++++++++++++++++++++++++
lang/ruby/test/test_schema.rb | 15 +++++
9 files changed, 302 insertions(+), 22 deletions(-)
diff --git a/lang/ruby/Manifest b/lang/ruby/Manifest
index 87bfd98..9fc48c2 100644
--- a/lang/ruby/Manifest
+++ b/lang/ruby/Manifest
@@ -9,6 +9,7 @@ lib/avro.rb
lib/avro/data_file.rb
lib/avro/io.rb
lib/avro/ipc.rb
+lib/avro/logical_types.rb
lib/avro/protocol.rb
lib/avro/schema.rb
lib/avro/schema_compatibility.rb
@@ -24,6 +25,7 @@ test/test_datafile.rb
test/test_fingerprints.rb
test/test_help.rb
test/test_io.rb
+test/test_logical_types.rb
test/test_protocol.rb
test/test_schema.rb
test/test_schema_compatibility.rb
diff --git a/lang/ruby/lib/avro/io.rb b/lang/ruby/lib/avro/io.rb
index b04a19a..26bda97 100644
--- a/lang/ruby/lib/avro/io.rb
+++ b/lang/ruby/lib/avro/io.rb
@@ -254,7 +254,7 @@ module Avro
# function dispatch for reading data based on type of writer's
# schema
- case writers_schema.type_sym
+ datum = case writers_schema.type_sym
when :null; decoder.read_null
when :boolean; decoder.read_boolean
when :string; decoder.read_string
@@ -272,6 +272,8 @@ module Avro
else
raise AvroError, "Cannot read unknown schema type: #{writers_schema.type}"
end
+
+ readers_schema.type_adapter.decode(datum)
end
def read_fixed(writers_schema, readers_schema, decoder)
@@ -499,8 +501,10 @@ module Avro
write_data(writers_schema, datum, encoder)
end
- def write_data(writers_schema, datum, encoder)
- unless Schema.validate(writers_schema, datum)
+ def write_data(writers_schema, logical_datum, encoder)
+ datum = writers_schema.type_adapter.encode(logical_datum)
+
+ unless Schema.validate(writers_schema, datum, encoded = true)
raise AvroTypeError.new(writers_schema, datum)
end
diff --git a/lang/ruby/lib/avro/logical_types.rb b/lang/ruby/lib/avro/logical_types.rb
new file mode 100644
index 0000000..e1b219d
--- /dev/null
+++ b/lang/ruby/lib/avro/logical_types.rb
@@ -0,0 +1,90 @@
+# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require 'date'
+
+module Avro
+ module LogicalTypes
+ module IntDate
+ EPOCH_START = Date.new(1970, 1, 1)
+
+ def self.encode(date)
+ return date.to_i if date.is_a?(Numeric)
+
+ (date - EPOCH_START).to_i
+ end
+
+ def self.decode(int)
+ EPOCH_START + int
+ end
+ end
+
+ module TimestampMillis
+ def self.encode(value)
+ return value.to_i if value.is_a?(Numeric)
+
+ time = value.to_time
+ time.to_i * 1000 + time.usec / 1000
+ end
+
+ def self.decode(int)
+ s, ms = int / 1000, int % 1000
+ Time.at(s, ms * 1000).utc
+ end
+ end
+
+ module TimestampMicros
+ def self.encode(value)
+ return value.to_i if value.is_a?(Numeric)
+
+ time = value.to_time
+ time.to_i * 1000_000 + time.usec
+ end
+
+ def self.decode(int)
+ s, us = int / 1000_000, int % 1000_000
+ Time.at(s, us).utc
+ end
+ end
+
+ module Identity
+ def self.encode(datum)
+ datum
+ end
+
+ def self.decode(datum)
+ datum
+ end
+ end
+
+ TYPES = {
+ "int" => {
+ "date" => IntDate
+ },
+ "long" => {
+ "timestamp-millis" => TimestampMillis,
+ "timestamp-micros" => TimestampMicros
+ },
+ }.freeze
+
+ def self.type_adapter(type, logical_type)
+ return unless logical_type
+
+ TYPES.fetch(type, {}.freeze).fetch(logical_type, Identity)
+ end
+ end
+end
diff --git a/lang/ruby/lib/avro/schema.rb b/lang/ruby/lib/avro/schema.rb
index 024d562..3acd07b 100644
--- a/lang/ruby/lib/avro/schema.rb
+++ b/lang/ruby/lib/avro/schema.rb
@@ -14,6 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+require 'avro/logical_types'
+
module Avro
class Schema
# Sets of strings, for backwards compatibility. See below for sets of symbols,
@@ -40,6 +42,7 @@ module Avro
def self.real_parse(json_obj, names=nil, default_namespace=nil)
if json_obj.is_a? Hash
type = json_obj['type']
+ logical_type = json_obj['logicalType']
raise SchemaParseError, %Q(No "type" property: #{json_obj}) if type.nil?
# Check that the type is valid before calling #to_sym, since symbols are never garbage
@@ -50,7 +53,7 @@ module Avro
type_sym = type.to_sym
if PRIMITIVE_TYPES_SYM.include?(type_sym)
- return PrimitiveSchema.new(type_sym)
+ return PrimitiveSchema.new(type_sym, logical_type)
elsif NAMED_TYPES_SYM.include? type_sym
name = json_obj['name']
@@ -58,7 +61,7 @@ module Avro
case type_sym
when :fixed
size = json_obj['size']
- return FixedSchema.new(name, namespace, size, names)
+ return FixedSchema.new(name, namespace, size, names, logical_type)
when :enum
symbols = json_obj['symbols']
doc = json_obj['doc']
@@ -93,23 +96,29 @@ module Avro
end
# Determine if a ruby datum is an instance of a schema
- def self.validate(expected_schema, datum)
- SchemaValidator.validate!(expected_schema, datum)
+ def self.validate(expected_schema, logical_datum, encoded = false)
+ SchemaValidator.validate!(expected_schema, logical_datum, encoded)
true
rescue SchemaValidator::ValidationError
false
end
- def initialize(type)
+ def initialize(type, logical_type=nil)
@type_sym = type.is_a?(Symbol) ? type : type.to_sym
+ @logical_type = logical_type
end
attr_reader :type_sym
+ attr_reader :logical_type
# Returns the type as a string (rather than a symbol), for backwards compatibility.
# Deprecated in favor of {#type_sym}.
def type; @type_sym.to_s; end
+ def type_adapter
+ @type_adapter ||= LogicalTypes.type_adapter(type, logical_type) || LogicalTypes::Identity
+ end
+
# Returns the MD5 fingerprint of the schema as an Integer.
def md5_fingerprint
parsing_form = SchemaNormalization.to_parsing_form(self)
@@ -157,7 +166,9 @@ module Avro
end
def to_avro(names=nil)
- {'type' => type}
+ props = {'type' => type}
+ props['logicalType'] = logical_type if logical_type
+ props
end
def to_s
@@ -166,8 +177,9 @@ module Avro
class NamedSchema < Schema
attr_reader :name, :namespace
- def initialize(type, name, namespace=nil, names=nil, doc=nil)
- super(type)
+
+ def initialize(type, name, namespace=nil, names=nil, doc=nil, logical_type=nil)
+ super(type, logical_type)
@name, @namespace = Name.extract_namespace(name, namespace)
@doc = doc
names = Name.add_name(names, self)
@@ -318,11 +330,11 @@ module Avro
# Valid primitive types are in PRIMITIVE_TYPES.
class PrimitiveSchema < Schema
- def initialize(type)
+ def initialize(type, logical_type=nil)
if PRIMITIVE_TYPES_SYM.include?(type)
- super(type)
+ super(type, logical_type)
elsif PRIMITIVE_TYPES.include?(type)
- super(type.to_sym)
+ super(type.to_sym, logical_type)
else
raise AvroError.new("#{type} is not a valid primitive type.")
end
@@ -336,12 +348,12 @@ module Avro
class FixedSchema < NamedSchema
attr_reader :size
- def initialize(name, space, size, names=nil)
+ def initialize(name, space, size, names=nil, logical_type=nil)
# Ensure valid cto args
unless size.is_a?(Integer)
raise AvroError, 'Fixed Schema requires a valid integer for size property.'
end
- super(:fixed, name, space, names)
+ super(:fixed, name, space, names, logical_type)
@size = size
end
diff --git a/lang/ruby/lib/avro/schema_validator.rb b/lang/ruby/lib/avro/schema_validator.rb
index 89b0a9c..67464fb 100644
--- a/lang/ruby/lib/avro/schema_validator.rb
+++ b/lang/ruby/lib/avro/schema_validator.rb
@@ -62,16 +62,22 @@ module Avro
TypeMismatchError = Class.new(ValidationError)
class << self
- def validate!(expected_schema, datum)
+ def validate!(expected_schema, logical_datum, encoded = false)
result = Result.new
- validate_recursive(expected_schema, datum, ROOT_IDENTIFIER, result)
+ validate_recursive(expected_schema, logical_datum, ROOT_IDENTIFIER, result, encoded)
fail ValidationError, result if result.failure?
result
end
private
- def validate_recursive(expected_schema, datum, path, result)
+ def validate_recursive(expected_schema, logical_datum, path, result, encoded = false)
+ datum = if encoded
+ logical_datum
+ else
+ expected_schema.type_adapter.encode(logical_datum) rescue nil
+ end
+
case expected_schema.type_sym
when :null
fail TypeMismatchError unless datum.nil?
diff --git a/lang/ruby/test/random_data.rb b/lang/ruby/test/random_data.rb
index 9d276f7..54fa878 100644
--- a/lang/ruby/test/random_data.rb
+++ b/lang/ruby/test/random_data.rb
@@ -27,15 +27,17 @@ class RandomData
end
def nextdata(schm, d=0)
+ return logical_nextdata(schm, d=0) unless schm.type_adapter.eql?(Avro::LogicalTypes::Identity)
+
case schm.type_sym
when :boolean
rand > 0.5
when :string
randstr()
when :int
- rand(Avro::Schema::INT_MAX_VALUE - Avro::Schema::INT_MIN_VALUE) + Avro::Schema::INT_MIN_VALUE
+ rand_int
when :long
- rand(Avro::Schema::LONG_MAX_VALUE - Avro::Schema::LONG_MIN_VALUE) + Avro::Schema::LONG_MIN_VALUE
+ rand_long
when :float
(-1024 + 2048 * rand).round.to_f
when :double
@@ -79,6 +81,15 @@ class RandomData
end
end
+ def logical_nextdata(schm, _d=0)
+ case schm.logical_type
+ when 'date'
+ Avro::LogicalTypes::IntDate.decode(rand_int)
+ when 'timestamp-millis', 'timestamp-micros'
+ Avro::LogicalTypes::TimestampMicros.decode(rand_long)
+ end
+ end
+
CHARPOOL = 'abcdefghjkmnpqrstuvwxyzABCDEFGHJKLMNPQRSTUVWXYZ23456789'
BYTEPOOL = '12345abcd'
@@ -87,4 +98,12 @@ class RandomData
rand(length+1).times { str << chars[rand(chars.size)] }
str
end
+
+ def rand_int
+ rand(Avro::Schema::INT_MAX_VALUE - Avro::Schema::INT_MIN_VALUE) + Avro::Schema::INT_MIN_VALUE
+ end
+
+ def rand_long
+ rand(Avro::Schema::LONG_MAX_VALUE - Avro::Schema::LONG_MIN_VALUE) + Avro::Schema::LONG_MIN_VALUE
+ end
end
diff --git a/lang/ruby/test/test_io.rb b/lang/ruby/test/test_io.rb
index fc0088b..70bb4d6 100644
--- a/lang/ruby/test/test_io.rb
+++ b/lang/ruby/test/test_io.rb
@@ -84,6 +84,17 @@ EOS
check_default(record_schema, '{"f": 11}', {"f" => 11})
end
+ def test_record_with_logical_type
+ record_schema = <<EOS
+ {"type": "record",
+ "name": "Test",
+ "fields": [{"name": "ts",
+ "type": {"type": "long",
+ "logicalType": "timestamp-micros"}}]}
+EOS
+ check(record_schema)
+ end
+
def test_error
error_schema = <<EOS
{"type": "error",
@@ -115,6 +126,7 @@ EOS
def test_union
union_schema = <<EOS
["string",
+ {"type": "int", "logicalType": "date"},
"null",
"long",
{"type": "record",
@@ -451,7 +463,7 @@ EOS
def checkser(schm, randomdata)
datum = randomdata.next
- assert validate(schm, datum)
+ assert validate(schm, datum), 'datum is not valid for schema'
w = Avro::IO::DatumWriter.new(schm)
writer = StringIO.new "", "w"
w.write(datum, Avro::IO::BinaryEncoder.new(writer))
diff --git a/lang/ruby/test/test_logical_types.rb b/lang/ruby/test/test_logical_types.rb
new file mode 100644
index 0000000..5416e11
--- /dev/null
+++ b/lang/ruby/test/test_logical_types.rb
@@ -0,0 +1,120 @@
+# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require 'test_help'
+
+class TestLogicalTypes < Test::Unit::TestCase
+ def test_int_date
+ schema = Avro::Schema.parse <<-SCHEMA
+ { "type": "int", "logicalType": "date" }
+ SCHEMA
+
+ assert_equal 'date', schema.logical_type
+ today = Date.today
+ assert_encode_and_decode today, schema
+ assert_preencoded Avro::LogicalTypes::IntDate.encode(today), schema, today
+ end
+
+ def test_int_date_conversion
+ type = Avro::LogicalTypes::IntDate
+
+ assert_equal 5, type.encode(Date.new(1970, 1, 6))
+ assert_equal 0, type.encode(Date.new(1970, 1, 1))
+ assert_equal -5, type.encode(Date.new(1969, 12, 27))
+
+ assert_equal Date.new(1970, 1, 6), type.decode(5)
+ assert_equal Date.new(1970, 1, 1), type.decode(0)
+ assert_equal Date.new(1969, 12, 27), type.decode(-5)
+ end
+
+ def test_timestamp_millis_long
+ schema = Avro::Schema.parse <<-SCHEMA
+ { "type": "long", "logicalType": "timestamp-millis" }
+ SCHEMA
+
+ # The Time.at format is (seconds, microseconds) since Epoch.
+ time = Time.at(628232400, 12000)
+
+ assert_equal 'timestamp-millis', schema.logical_type
+ assert_encode_and_decode time, schema
+ assert_preencoded Avro::LogicalTypes::TimestampMillis.encode(time), schema, time.utc
+ end
+
+ def test_timestamp_millis_long_conversion
+ type = Avro::LogicalTypes::TimestampMillis
+
+ now = Time.now.utc
+ now_millis = Time.utc(now.year, now.month, now.day, now.hour, now.min, now.sec, now.usec / 1000 * 1000)
+
+ assert_equal now_millis, type.decode(type.encode(now_millis))
+ assert_equal 1432849613221, type.encode(Time.utc(2015, 5, 28, 21, 46, 53, 221000))
+ assert_equal 1432849613221, type.encode(DateTime.new(2015, 5, 28, 21, 46, 53.221))
+ assert_equal Time.utc(2015, 5, 28, 21, 46, 53, 221000), type.decode(1432849613221)
+ end
+
+ def test_timestamp_micros_long
+ schema = Avro::Schema.parse <<-SCHEMA
+ { "type": "long", "logicalType": "timestamp-micros" }
+ SCHEMA
+
+ # The Time.at format is (seconds, microseconds) since Epoch.
+ time = Time.at(628232400, 12345)
+
+ assert_equal 'timestamp-micros', schema.logical_type
+ assert_encode_and_decode time, schema
+ assert_preencoded Avro::LogicalTypes::TimestampMicros.encode(time), schema, time.utc
+ end
+
+ def test_timestamp_micros_long_conversion
+ type = Avro::LogicalTypes::TimestampMicros
+
+ now = Time.now.utc
+
+ assert_equal Time.utc(now.year, now.month, now.day, now.hour, now.min, now.sec, now.usec), type.decode(type.encode(now))
+ assert_equal 1432849613221843, type.encode(Time.utc(2015, 5, 28, 21, 46, 53, 221843))
+ assert_equal 1432849613221843, type.encode(DateTime.new(2015, 5, 28, 21, 46, 53.221843))
+ assert_equal Time.utc(2015, 5, 28, 21, 46, 53, 221843), type.decode(1432849613221843)
+ end
+
+ def encode(datum, schema)
+ buffer = StringIO.new("")
+ encoder = Avro::IO::BinaryEncoder.new(buffer)
+
+ datum_writer = Avro::IO::DatumWriter.new(schema)
+ datum_writer.write(datum, encoder)
+
+ buffer.string
+ end
+
+ def decode(encoded, schema)
+ buffer = StringIO.new(encoded)
+ decoder = Avro::IO::BinaryDecoder.new(buffer)
+
+ datum_reader = Avro::IO::DatumReader.new(schema, schema)
+ datum_reader.read(decoder)
+ end
+
+ def assert_encode_and_decode(datum, schema)
+ encoded = encode(datum, schema)
+ assert_equal datum, decode(encoded, schema)
+ end
+
+ def assert_preencoded(datum, schema, decoded)
+ encoded = encode(datum, schema)
+ assert_equal decoded, decode(encoded, schema)
+ end
+end
diff --git a/lang/ruby/test/test_schema.rb b/lang/ruby/test/test_schema.rb
index 48fe0a5..66ea77b 100644
--- a/lang/ruby/test/test_schema.rb
+++ b/lang/ruby/test/test_schema.rb
@@ -132,6 +132,21 @@ class TestSchema < Test::Unit::TestCase
}, schema.to_avro)
end
+ def test_to_avro_includes_logical_type
+ schema = Avro::Schema.parse <<-SCHEMA
+ {"type": "record", "name": "has_logical", "fields": [
+ {"name": "dt", "type": {"type": "int", "logicalType": "date"}}]
+ }
+ SCHEMA
+
+ assert_equal schema.to_avro, {
+ 'type' => 'record', 'name' => 'has_logical',
+ 'fields' => [
+ {'name' => 'dt', 'type' => {'type' => 'int', 'logicalType' => 'date'}}
+ ]
+ }
+ end
+
def test_unknown_named_type
error = assert_raise Avro::UnknownSchemaError do
Avro::Schema.parse <<-SCHEMA