You are viewing a plain-text version of this content; the hyperlink to its canonical version ("here") was not preserved in this copy.
Posted to commits@avro.apache.org by fo...@apache.org on 2018/11/07 21:50:15 UTC

[avro] branch master updated: AVRO-1695: Ruby support for logical types revisited (#116)

This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/master by this push:
     new 33f7177  AVRO-1695: Ruby support for logical types revisited (#116)
33f7177 is described below

commit 33f7177e66e04c4843676e3d7a9cf739ba812304
Author: Tim Perkins <tj...@users.noreply.github.com>
AuthorDate: Wed Nov 7 16:50:10 2018 -0500

    AVRO-1695: Ruby support for logical types revisited (#116)
    
    * AVRO-1695: Ruby support for logical types
    * Support values for logical types that were already encoded
---
 lang/ruby/Manifest                     |   2 +
 lang/ruby/lib/avro/io.rb               |  10 ++-
 lang/ruby/lib/avro/logical_types.rb    |  90 +++++++++++++++++++++++++
 lang/ruby/lib/avro/schema.rb           |  38 +++++++----
 lang/ruby/lib/avro/schema_validator.rb |  12 +++-
 lang/ruby/test/random_data.rb          |  23 ++++++-
 lang/ruby/test/test_io.rb              |  14 +++-
 lang/ruby/test/test_logical_types.rb   | 120 +++++++++++++++++++++++++++++++++
 lang/ruby/test/test_schema.rb          |  15 +++++
 9 files changed, 302 insertions(+), 22 deletions(-)

diff --git a/lang/ruby/Manifest b/lang/ruby/Manifest
index 87bfd98..9fc48c2 100644
--- a/lang/ruby/Manifest
+++ b/lang/ruby/Manifest
@@ -9,6 +9,7 @@ lib/avro.rb
 lib/avro/data_file.rb
 lib/avro/io.rb
 lib/avro/ipc.rb
+lib/avro/logical_types.rb
 lib/avro/protocol.rb
 lib/avro/schema.rb
 lib/avro/schema_compatibility.rb
@@ -24,6 +25,7 @@ test/test_datafile.rb
 test/test_fingerprints.rb
 test/test_help.rb
 test/test_io.rb
+test/test_logical_types.rb
 test/test_protocol.rb
 test/test_schema.rb
 test/test_schema_compatibility.rb
diff --git a/lang/ruby/lib/avro/io.rb b/lang/ruby/lib/avro/io.rb
index b04a19a..26bda97 100644
--- a/lang/ruby/lib/avro/io.rb
+++ b/lang/ruby/lib/avro/io.rb
@@ -254,7 +254,7 @@ module Avro
 
         # function dispatch for reading data based on type of writer's
         # schema
-        case writers_schema.type_sym
+        datum = case writers_schema.type_sym # raw value as written; logical-type decoding is applied after the case
         when :null;    decoder.read_null
         when :boolean; decoder.read_boolean
         when :string;  decoder.read_string
@@ -272,6 +272,8 @@ module Avro
         else
           raise AvroError, "Cannot read unknown schema type: #{writers_schema.type}"
         end
+        # Convert the raw value into the reader schema's logical type (e.g. int -> Date); Identity otherwise.
+        readers_schema.type_adapter.decode(datum)
       end
 
       def read_fixed(writers_schema, readers_schema, decoder)
@@ -499,8 +501,10 @@ module Avro
         write_data(writers_schema, datum, encoder)
       end
 
-      def write_data(writers_schema, datum, encoder)
-        unless Schema.validate(writers_schema, datum)
+      def write_data(writers_schema, logical_datum, encoder)
+        datum = writers_schema.type_adapter.encode(logical_datum)
+
+        unless Schema.validate(writers_schema, datum, encoded = true)
           raise AvroTypeError.new(writers_schema, datum)
         end
 
diff --git a/lang/ruby/lib/avro/logical_types.rb b/lang/ruby/lib/avro/logical_types.rb
new file mode 100644
index 0000000..e1b219d
--- /dev/null
+++ b/lang/ruby/lib/avro/logical_types.rb
@@ -0,0 +1,90 @@
+# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require 'date'
+
+module Avro
+  module LogicalTypes
+    module IntDate
+      EPOCH_START = Date.new(1970, 1, 1)
+
+      def self.encode(date)
+        return date.to_i if date.is_a?(Numeric)
+
+        (date - EPOCH_START).to_i
+      end
+
+      def self.decode(int)
+        EPOCH_START + int
+      end
+    end
+
+    module TimestampMillis
+      def self.encode(value)
+        return value.to_i if value.is_a?(Numeric)
+
+        time = value.to_time
+        time.to_i * 1000 + time.usec / 1000
+      end
+
+      def self.decode(int)
+        s, ms = int / 1000, int % 1000
+        Time.at(s, ms * 1000).utc
+      end
+    end
+
+    module TimestampMicros
+      def self.encode(value)
+        return value.to_i if value.is_a?(Numeric)
+
+        time = value.to_time
+        time.to_i * 1000_000 + time.usec
+      end
+
+      def self.decode(int)
+        s, us = int / 1000_000, int % 1000_000
+        Time.at(s, us).utc
+      end
+    end
+
+    module Identity
+      def self.encode(datum)
+        datum
+      end
+
+      def self.decode(datum)
+        datum
+      end
+    end
+
+    TYPES = {
+      "int" => {
+        "date" => IntDate
+      },
+      "long" => {
+        "timestamp-millis" => TimestampMillis,
+        "timestamp-micros" => TimestampMicros
+      },
+    }.freeze
+
+    def self.type_adapter(type, logical_type)
+      return unless logical_type
+
+      TYPES.fetch(type, {}.freeze).fetch(logical_type, Identity)
+    end
+  end
+end
diff --git a/lang/ruby/lib/avro/schema.rb b/lang/ruby/lib/avro/schema.rb
index 024d562..3acd07b 100644
--- a/lang/ruby/lib/avro/schema.rb
+++ b/lang/ruby/lib/avro/schema.rb
@@ -14,6 +14,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+require 'avro/logical_types'
+
 module Avro
   class Schema
     # Sets of strings, for backwards compatibility. See below for sets of symbols,
@@ -40,6 +42,7 @@ module Avro
     def self.real_parse(json_obj, names=nil, default_namespace=nil)
       if json_obj.is_a? Hash
         type = json_obj['type']
+        logical_type = json_obj['logicalType']
         raise SchemaParseError, %Q(No "type" property: #{json_obj}) if type.nil?
 
         # Check that the type is valid before calling #to_sym, since symbols are never garbage
@@ -50,7 +53,7 @@ module Avro
 
         type_sym = type.to_sym
         if PRIMITIVE_TYPES_SYM.include?(type_sym)
-          return PrimitiveSchema.new(type_sym)
+          return PrimitiveSchema.new(type_sym, logical_type)
 
         elsif NAMED_TYPES_SYM.include? type_sym
           name = json_obj['name']
@@ -58,7 +61,7 @@ module Avro
           case type_sym
           when :fixed
             size = json_obj['size']
-            return FixedSchema.new(name, namespace, size, names)
+            return FixedSchema.new(name, namespace, size, names, logical_type)
           when :enum
             symbols = json_obj['symbols']
             doc     = json_obj['doc']
@@ -93,23 +96,29 @@ module Avro
     end
 
     # Determine if a ruby datum is an instance of a schema
-    def self.validate(expected_schema, datum)
-      SchemaValidator.validate!(expected_schema, datum)
+    def self.validate(expected_schema, logical_datum, encoded = false) # encoded=true: datum is already primitive, skip logical encoding
+      SchemaValidator.validate!(expected_schema, logical_datum, encoded)
       true
     rescue SchemaValidator::ValidationError
       false
     end
 
-    def initialize(type)
+    def initialize(type, logical_type=nil)
       @type_sym = type.is_a?(Symbol) ? type : type.to_sym
+      @logical_type = logical_type
     end
 
     attr_reader :type_sym
+    attr_reader :logical_type
 
     # Returns the type as a string (rather than a symbol), for backwards compatibility.
     # Deprecated in favor of {#type_sym}.
     def type; @type_sym.to_s; end
 
+    def type_adapter # memoized LogicalTypes adapter; LogicalTypes::Identity when no logical type applies
+      @type_adapter ||= LogicalTypes.type_adapter(type, logical_type) || LogicalTypes::Identity
+    end
+
     # Returns the MD5 fingerprint of the schema as an Integer.
     def md5_fingerprint
       parsing_form = SchemaNormalization.to_parsing_form(self)
@@ -157,7 +166,9 @@ module Avro
     end
 
     def to_avro(names=nil)
-      {'type' => type}
+      props = {'type' => type}
+      props['logicalType'] = logical_type if logical_type
+      props
     end
 
     def to_s
@@ -166,8 +177,9 @@ module Avro
 
     class NamedSchema < Schema
       attr_reader :name, :namespace
-      def initialize(type, name, namespace=nil, names=nil, doc=nil)
-        super(type)
+
+      def initialize(type, name, namespace=nil, names=nil, doc=nil, logical_type=nil)
+        super(type, logical_type)
         @name, @namespace = Name.extract_namespace(name, namespace)
         @doc  = doc
         names = Name.add_name(names, self)
@@ -318,11 +330,11 @@ module Avro
 
     # Valid primitive types are in PRIMITIVE_TYPES.
     class PrimitiveSchema < Schema
-      def initialize(type)
+      def initialize(type, logical_type=nil)
         if PRIMITIVE_TYPES_SYM.include?(type)
-          super(type)
+          super(type, logical_type)
         elsif PRIMITIVE_TYPES.include?(type)
-          super(type.to_sym)
+          super(type.to_sym, logical_type)
         else
           raise AvroError.new("#{type} is not a valid primitive type.")
         end
@@ -336,12 +348,12 @@ module Avro
 
     class FixedSchema < NamedSchema
       attr_reader :size
-      def initialize(name, space, size, names=nil)
+      def initialize(name, space, size, names=nil, logical_type=nil)
         # Ensure valid cto args
         unless size.is_a?(Integer)
           raise AvroError, 'Fixed Schema requires a valid integer for size property.'
         end
-        super(:fixed, name, space, names)
+        super(:fixed, name, space, names, logical_type)
         @size = size
       end
 
diff --git a/lang/ruby/lib/avro/schema_validator.rb b/lang/ruby/lib/avro/schema_validator.rb
index 89b0a9c..67464fb 100644
--- a/lang/ruby/lib/avro/schema_validator.rb
+++ b/lang/ruby/lib/avro/schema_validator.rb
@@ -62,16 +62,22 @@ module Avro
     TypeMismatchError = Class.new(ValidationError)
 
     class << self
-      def validate!(expected_schema, datum)
+      def validate!(expected_schema, logical_datum, encoded = false)
         result = Result.new
-        validate_recursive(expected_schema, datum, ROOT_IDENTIFIER, result)
+        validate_recursive(expected_schema, logical_datum, ROOT_IDENTIFIER, result, encoded)
         fail ValidationError, result if result.failure?
         result
       end
 
       private
 
-      def validate_recursive(expected_schema, datum, path, result)
+      def validate_recursive(expected_schema, logical_datum, path, result, encoded = false)
+        datum = if encoded
+                  logical_datum
+                else
+                  expected_schema.type_adapter.encode(logical_datum) rescue nil
+                end
+
         case expected_schema.type_sym
         when :null
           fail TypeMismatchError unless datum.nil?
diff --git a/lang/ruby/test/random_data.rb b/lang/ruby/test/random_data.rb
index 9d276f7..54fa878 100644
--- a/lang/ruby/test/random_data.rb
+++ b/lang/ruby/test/random_data.rb
@@ -27,15 +27,17 @@ class RandomData
   end
 
   def nextdata(schm, d=0)
+    return logical_nextdata(schm, d=0) unless schm.type_adapter.eql?(Avro::LogicalTypes::Identity)
+
     case schm.type_sym
     when :boolean
       rand > 0.5
     when :string
       randstr()
     when :int
-      rand(Avro::Schema::INT_MAX_VALUE - Avro::Schema::INT_MIN_VALUE) + Avro::Schema::INT_MIN_VALUE
+      rand_int
     when :long
-      rand(Avro::Schema::LONG_MAX_VALUE - Avro::Schema::LONG_MIN_VALUE) + Avro::Schema::LONG_MIN_VALUE
+      rand_long
     when :float
       (-1024 + 2048 * rand).round.to_f
     when :double
@@ -79,6 +81,15 @@ class RandomData
     end
   end
 
+  def logical_nextdata(schm, _d=0)
+    case schm.logical_type
+    when 'date'
+      Avro::LogicalTypes::IntDate.decode(rand_int)
+    when 'timestamp-millis', 'timestamp-micros'
+      Avro::LogicalTypes::TimestampMicros.decode(rand_long)
+    end
+  end
+
   CHARPOOL = 'abcdefghjkmnpqrstuvwxyzABCDEFGHJKLMNPQRSTUVWXYZ23456789'
   BYTEPOOL = '12345abcd'
 
@@ -87,4 +98,12 @@ class RandomData
     rand(length+1).times { str << chars[rand(chars.size)] }
     str
   end
+
+  def rand_int
+    rand(Avro::Schema::INT_MAX_VALUE - Avro::Schema::INT_MIN_VALUE) + Avro::Schema::INT_MIN_VALUE
+  end
+
+  def rand_long
+    rand(Avro::Schema::LONG_MAX_VALUE - Avro::Schema::LONG_MIN_VALUE) + Avro::Schema::LONG_MIN_VALUE
+  end
 end
diff --git a/lang/ruby/test/test_io.rb b/lang/ruby/test/test_io.rb
index fc0088b..70bb4d6 100644
--- a/lang/ruby/test/test_io.rb
+++ b/lang/ruby/test/test_io.rb
@@ -84,6 +84,17 @@ EOS
     check_default(record_schema, '{"f": 11}', {"f" => 11})
   end
 
+  def test_record_with_logical_type
+    record_schema = <<EOS
+      {"type": "record",
+       "name": "Test",
+       "fields": [{"name": "ts",
+                   "type": {"type": "long",
+                            "logicalType": "timestamp-micros"}}]}
+EOS
+    check(record_schema)
+  end
+
   def test_error
     error_schema = <<EOS
       {"type": "error",
@@ -115,6 +126,7 @@ EOS
   def test_union
     union_schema = <<EOS
       ["string",
+       {"type": "int", "logicalType": "date"},
        "null",
        "long",
        {"type": "record",
@@ -451,7 +463,7 @@ EOS
 
   def checkser(schm, randomdata)
     datum = randomdata.next
-    assert validate(schm, datum)
+    assert validate(schm, datum), 'datum is not valid for schema'
     w = Avro::IO::DatumWriter.new(schm)
     writer = StringIO.new "", "w"
     w.write(datum, Avro::IO::BinaryEncoder.new(writer))
diff --git a/lang/ruby/test/test_logical_types.rb b/lang/ruby/test/test_logical_types.rb
new file mode 100644
index 0000000..5416e11
--- /dev/null
+++ b/lang/ruby/test/test_logical_types.rb
@@ -0,0 +1,120 @@
+# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require 'test_help'
+
+class TestLogicalTypes < Test::Unit::TestCase
+  def test_int_date
+    schema = Avro::Schema.parse <<-SCHEMA
+      { "type": "int", "logicalType": "date" }
+    SCHEMA
+
+    assert_equal 'date', schema.logical_type
+    today = Date.today
+    assert_encode_and_decode today, schema
+    assert_preencoded Avro::LogicalTypes::IntDate.encode(today), schema, today
+  end
+
+  def test_int_date_conversion
+    type = Avro::LogicalTypes::IntDate
+
+    assert_equal 5, type.encode(Date.new(1970, 1, 6))
+    assert_equal 0, type.encode(Date.new(1970, 1, 1))
+    assert_equal -5, type.encode(Date.new(1969, 12, 27))
+
+    assert_equal Date.new(1970, 1, 6), type.decode(5)
+    assert_equal Date.new(1970, 1, 1), type.decode(0)
+    assert_equal Date.new(1969, 12, 27), type.decode(-5)
+  end
+
+  def test_timestamp_millis_long
+    schema = Avro::Schema.parse <<-SCHEMA
+      { "type": "long", "logicalType": "timestamp-millis" }
+    SCHEMA
+
+    # The Time.at format is (seconds, microseconds) since Epoch.
+    time = Time.at(628232400, 12000)
+
+    assert_equal 'timestamp-millis', schema.logical_type
+    assert_encode_and_decode time, schema
+    assert_preencoded Avro::LogicalTypes::TimestampMillis.encode(time), schema, time.utc
+  end
+
+  def test_timestamp_millis_long_conversion
+    type = Avro::LogicalTypes::TimestampMillis
+
+    now = Time.now.utc
+    now_millis = Time.utc(now.year, now.month, now.day, now.hour, now.min, now.sec, now.usec / 1000 * 1000)
+
+    assert_equal now_millis, type.decode(type.encode(now_millis))
+    assert_equal 1432849613221, type.encode(Time.utc(2015, 5, 28, 21, 46, 53, 221000))
+    assert_equal 1432849613221, type.encode(DateTime.new(2015, 5, 28, 21, 46, 53.221))
+    assert_equal Time.utc(2015, 5, 28, 21, 46, 53, 221000), type.decode(1432849613221)
+  end
+
+  def test_timestamp_micros_long
+    schema = Avro::Schema.parse <<-SCHEMA
+      { "type": "long", "logicalType": "timestamp-micros" }
+    SCHEMA
+
+    # The Time.at format is (seconds, microseconds) since Epoch.
+    time = Time.at(628232400, 12345)
+
+    assert_equal 'timestamp-micros', schema.logical_type
+    assert_encode_and_decode time, schema
+    assert_preencoded Avro::LogicalTypes::TimestampMicros.encode(time), schema, time.utc
+  end
+
+  def test_timestamp_micros_long_conversion
+    type = Avro::LogicalTypes::TimestampMicros
+
+    now = Time.now.utc
+
+    assert_equal Time.utc(now.year, now.month, now.day, now.hour, now.min, now.sec, now.usec), type.decode(type.encode(now))
+    assert_equal 1432849613221843, type.encode(Time.utc(2015, 5, 28, 21, 46, 53, 221843))
+    assert_equal 1432849613221843, type.encode(DateTime.new(2015, 5, 28, 21, 46, 53.221843))
+    assert_equal Time.utc(2015, 5, 28, 21, 46, 53, 221843), type.decode(1432849613221843)
+  end
+
+  def encode(datum, schema)
+    buffer = StringIO.new("")
+    encoder = Avro::IO::BinaryEncoder.new(buffer)
+
+    datum_writer = Avro::IO::DatumWriter.new(schema)
+    datum_writer.write(datum, encoder)
+
+    buffer.string
+  end
+
+  def decode(encoded, schema)
+    buffer = StringIO.new(encoded)
+    decoder = Avro::IO::BinaryDecoder.new(buffer)
+
+    datum_reader = Avro::IO::DatumReader.new(schema, schema)
+    datum_reader.read(decoder)
+  end
+
+  def assert_encode_and_decode(datum, schema)
+    encoded = encode(datum, schema)
+    assert_equal datum, decode(encoded, schema)
+  end
+
+  def assert_preencoded(datum, schema, decoded)
+    encoded = encode(datum, schema)
+    assert_equal decoded, decode(encoded, schema)
+  end
+end
diff --git a/lang/ruby/test/test_schema.rb b/lang/ruby/test/test_schema.rb
index 48fe0a5..66ea77b 100644
--- a/lang/ruby/test/test_schema.rb
+++ b/lang/ruby/test/test_schema.rb
@@ -132,6 +132,21 @@ class TestSchema < Test::Unit::TestCase
     }, schema.to_avro)
   end
 
+  def test_to_avro_includes_logical_type
+    schema = Avro::Schema.parse <<-SCHEMA
+      {"type": "record", "name": "has_logical", "fields": [
+        {"name": "dt", "type": {"type": "int", "logicalType": "date"}}]
+      }
+    SCHEMA
+
+    assert_equal schema.to_avro, {
+      'type' => 'record', 'name' => 'has_logical',
+      'fields' => [
+        {'name' => 'dt', 'type' => {'type' => 'int', 'logicalType' => 'date'}}
+      ]
+    }
+  end
+
   def test_unknown_named_type
     error = assert_raise Avro::UnknownSchemaError do
       Avro::Schema.parse <<-SCHEMA