You are viewing a plain text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@avro.apache.org by ma...@apache.org on 2013/04/13 06:50:28 UTC

svn commit: r1467549 - in /avro/trunk: CHANGES.txt lang/ruby/Rakefile lang/ruby/interop/test_interop.rb lang/ruby/lib/avro.rb lang/ruby/lib/avro/data_file.rb lang/ruby/test/test_datafile.rb

Author: martinkl
Date: Sat Apr 13 04:50:28 2013
New Revision: 1467549

URL: http://svn.apache.org/r1467549
Log:
AVRO-1288. Ruby: Add support for deflate codec in data files.

Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/ruby/Rakefile
    avro/trunk/lang/ruby/interop/test_interop.rb
    avro/trunk/lang/ruby/lib/avro.rb
    avro/trunk/lang/ruby/lib/avro/data_file.rb
    avro/trunk/lang/ruby/test/test_datafile.rb

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1467549&r1=1467548&r2=1467549&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Sat Apr 13 04:50:28 2013
@@ -25,6 +25,9 @@ Trunk (not yet released)
     AVRO-1287. Add data file with deflate codec to the interoperability
     test suite. (martinkl)
 
+    AVRO-1288. Ruby: Add support for deflate codec in data files.
+    (martinkl)
+
   BUG FIXES
 
     AVRO-1266. Java: Fix mapred.AvroMultipleOutputs to support multiple

Modified: avro/trunk/lang/ruby/Rakefile
URL: http://svn.apache.org/viewvc/avro/trunk/lang/ruby/Rakefile?rev=1467549&r1=1467548&r2=1467549&view=diff
==============================================================================
--- avro/trunk/lang/ruby/Rakefile (original)
+++ avro/trunk/lang/ruby/Rakefile Sat Apr 13 04:50:28 2013
@@ -45,6 +45,10 @@ task :generate_interop do
   ensure
     writer.close
   end
+
+  Avro::DataFile.open(BUILD + '/interop/data/ruby_deflate.avro', 'w', schema.to_s, :deflate) do |writer|
+    20.times { writer << r.next }
+  end
 end
 
 

Modified: avro/trunk/lang/ruby/interop/test_interop.rb
URL: http://svn.apache.org/viewvc/avro/trunk/lang/ruby/interop/test_interop.rb?rev=1467549&r1=1467548&r2=1467549&view=diff
==============================================================================
--- avro/trunk/lang/ruby/interop/test_interop.rb (original)
+++ avro/trunk/lang/ruby/interop/test_interop.rb Sat Apr 13 04:50:28 2013
@@ -23,10 +23,7 @@ class TestInterop < Test::Unit::TestCase
   HERE = File.expand_path(File.dirname(__FILE__))
   SHARE = HERE + '/../../../share'
   SCHEMAS = SHARE + '/test/schemas'
-
-  # TODO this currently ignores data files with deflate codec.
-  # Remove this restriction when deflate support is added.
-  Dir[HERE + '/../../../build/interop/data/*'].reject{|fn| fn =~ /_deflate/ }.each do |fn|  
+  Dir[HERE + '/../../../build/interop/data/*'].each do |fn|
     define_method("test_read_#{File.basename(fn, 'avro')}") do
       projection = Avro::Schema.parse(File.read(SCHEMAS+'/interop.avsc'))
 

Modified: avro/trunk/lang/ruby/lib/avro.rb
URL: http://svn.apache.org/viewvc/avro/trunk/lang/ruby/lib/avro.rb?rev=1467549&r1=1467548&r2=1467549&view=diff
==============================================================================
--- avro/trunk/lang/ruby/lib/avro.rb (original)
+++ avro/trunk/lang/ruby/lib/avro.rb Sat Apr 13 04:50:28 2013
@@ -19,6 +19,7 @@ require 'set'
 require 'digest/md5'
 require 'net/http'
 require 'stringio'
+require 'zlib'
 
 module Avro
   VERSION = "FIXME"

Modified: avro/trunk/lang/ruby/lib/avro/data_file.rb
URL: http://svn.apache.org/viewvc/avro/trunk/lang/ruby/lib/avro/data_file.rb?rev=1467549&r1=1467548&r2=1467549&view=diff
==============================================================================
--- avro/trunk/lang/ruby/lib/avro/data_file.rb (original)
+++ avro/trunk/lang/ruby/lib/avro/data_file.rb Sat Apr 13 04:50:28 2013
@@ -24,19 +24,18 @@ module Avro
     SYNC_SIZE = 16
     SYNC_INTERVAL = 1000 * SYNC_SIZE
     META_SCHEMA = Schema.parse('{"type": "map", "values": "bytes"}')
-    VALID_CODECS = ['null']
     VALID_ENCODINGS = ['binary'] # not used yet
 
     class DataFileError < AvroError; end
 
-    def self.open(file_path, mode='r', schema=nil)
+    def self.open(file_path, mode='r', schema=nil, codec=nil)
       schema = Avro::Schema.parse(schema) if schema
       case mode
       when 'w'
         unless schema
           raise DataFileError, "Writing an Avro file requires a schema."
         end
-        io = open_writer(File.open(file_path, 'wb'), schema)
+        io = open_writer(File.open(file_path, 'wb'), schema, codec)
       when 'r'
         io = open_reader(File.open(file_path, 'rb'), schema)
       else
@@ -49,11 +48,34 @@ module Avro
       io.close if block_given? && io
     end
 
+    def self.codecs
+      @codecs
+    end
+
+    def self.register_codec(codec)
+      @codecs ||= {}
+      codec = codec.new if !codec.respond_to?(:codec_name) && codec.is_a?(Class)
+      @codecs[codec.codec_name.to_s] = codec
+    end
+
+    def self.get_codec(codec)
+      codec ||= 'null'
+      if codec.respond_to?(:compress) && codec.respond_to?(:decompress)
+        codec # it's a codec instance
+      elsif codec.is_a?(Class)
+        codec.new # it's a codec class
+      elsif @codecs.include?(codec.to_s)
+        @codecs[codec.to_s] # it's a string or symbol (codec name)
+      else
+        raise DataFileError, "Unknown codec: #{codec.inspect}"
+      end
+    end
+
     class << self
       private
-      def open_writer(file, schema)
+      def open_writer(file, schema, codec=nil)
         writer = Avro::IO::DatumWriter.new(schema)
-        Avro::DataFile::Writer.new(file, writer, schema)
+        Avro::DataFile::Writer.new(file, writer, schema, codec)
       end
 
       def open_reader(file, schema)
@@ -67,10 +89,10 @@ module Avro
         OpenSSL::Random.random_bytes(16)
       end
 
-      attr_reader :writer, :encoder, :datum_writer, :buffer_writer, :buffer_encoder, :sync_marker, :meta
+      attr_reader :writer, :encoder, :datum_writer, :buffer_writer, :buffer_encoder, :sync_marker, :meta, :codec
       attr_accessor :block_count
 
-      def initialize(writer, datum_writer, writers_schema=nil)
+      def initialize(writer, datum_writer, writers_schema=nil, codec=nil)
         # If writers_schema is not present, presume we're appending
         @writer = writer
         @encoder = IO::BinaryEncoder.new(@writer)
@@ -83,7 +105,8 @@ module Avro
 
         if writers_schema
           @sync_marker = Writer.generate_sync_marker
-          meta['avro.codec'] = 'null'
+          @codec = DataFile.get_codec(codec)
+          meta['avro.codec'] = @codec.codec_name.to_s
           meta['avro.schema'] = writers_schema.to_s
           datum_writer.writers_schema = writers_schema
           write_header
@@ -95,6 +118,7 @@ module Avro
           # collect metadata
           @sync_marker = dfr.sync_marker
           meta['avro.codec'] = dfr.meta['avro.codec']
+          @codec = DataFile.get_codec(meta['avro.codec'])
 
           # get schema used to write existing file
           schema_from_file = dfr.meta['avro.schema']
@@ -152,21 +176,15 @@ module Avro
 
       # TODO(jmhodges): make a schema for blocks and use datum_writer
       # TODO(jmhodges): do we really need the number of items in the block?
-      # TODO(jmhodges): use codec when writing the block contents
       def write_block
         if block_count > 0
           # write number of items in block and block size in bytes
           encoder.write_long(block_count)
-          to_write = buffer_writer.string
+          to_write = codec.compress(buffer_writer.string)
           encoder.write_long(to_write.size)
 
           # write block contents
-          if meta['avro.codec'] == 'null'
-            writer.write(to_write)
-          else
-            msg = "#{meta['avro.codec'].inspect} coded is not supported"
-            raise DataFileError, msg
-          end
+          writer.write(to_write)
 
           # write sync marker
           writer.write(sync_marker)
@@ -183,8 +201,14 @@ module Avro
     class Reader
       include ::Enumerable
 
-      attr_reader :reader, :decoder, :datum_reader, :sync_marker, :meta, :file_length
-      attr_accessor :block_count
+      # The reader and binary decoder for the raw file stream
+      attr_reader :reader, :decoder
+
+      # The binary decoder for the contents of a block (after codec decompression)
+      attr_reader :block_decoder
+
+      attr_reader :datum_reader, :sync_marker, :meta, :file_length, :codec
+      attr_accessor :block_count # records remaining in current block
 
       def initialize(reader, datum_reader)
         @reader = reader
@@ -194,11 +218,7 @@ module Avro
         # read the header: magic, meta, sync
         read_header
 
-        # ensure the codec is valid
-        codec_from_file = meta['avro.codec']
-        if codec_from_file && ! VALID_CODECS.include?(codec_from_file)
-          raise DataFileError, "Unknown codec: #{codec_from_file}"
-        end
+        @codec = DataFile.get_codec(meta['avro.codec'])
 
         # get ready to read
         @block_count = 0
@@ -220,7 +240,7 @@ module Avro
             end
           end
 
-          datum = datum_reader.read(decoder)
+          datum = datum_reader.read(block_decoder)
           self.block_count -= 1
           yield(datum)
         end
@@ -257,7 +277,9 @@ module Avro
 
       def read_block_header
         self.block_count = decoder.read_long
-        decoder.read_long # not doing anything with length in bytes
+        block_bytes = decoder.read_long
+        data = codec.decompress(reader.read(block_bytes))
+        @block_decoder = IO::BinaryDecoder.new(StringIO.new(data))
       end
 
       # read the length of the sync marker; if it matches the sync
@@ -273,5 +295,48 @@ module Avro
         end
       end
     end
+
+
+    class NullCodec
+      def codec_name; 'null'; end
+      def decompress(data); data; end
+      def compress(data); data; end
+    end
+
+    class DeflateCodec
+      attr_reader :level
+
+      def initialize(level=Zlib::DEFAULT_COMPRESSION)
+        @level = level
+      end
+
+      def codec_name; 'deflate'; end
+
+      def decompress(compressed)
+        # Passing a negative number to Inflate puts it into "raw" RFC1951 mode
+        # (without the RFC1950 header & checksum). See the docs for
+        # inflateInit2 in http://www.zlib.net/manual.html
+        zstream = Zlib::Inflate.new(-Zlib::MAX_WBITS)
+        data = zstream.inflate(compressed)
+        data << zstream.finish
+      ensure
+        zstream.close
+      end
+
+      def compress(data)
+        zstream = Zlib::Deflate.new(level, -Zlib::MAX_WBITS)
+        compressed = zstream.deflate(data)
+        compressed << zstream.finish
+      ensure
+        zstream.close
+      end
+    end
+
+    DataFile.register_codec NullCodec
+    DataFile.register_codec DeflateCodec
+
+    # TODO this constant won't be updated if you register another codec.
+    # Deprecated in favor of Avro::DataFile::codecs
+    VALID_CODECS = DataFile.codecs.keys
   end
 end

Modified: avro/trunk/lang/ruby/test/test_datafile.rb
URL: http://svn.apache.org/viewvc/avro/trunk/lang/ruby/test/test_datafile.rb?rev=1467549&r1=1467548&r2=1467549&view=diff
==============================================================================
--- avro/trunk/lang/ruby/test/test_datafile.rb (original)
+++ avro/trunk/lang/ruby/test/test_datafile.rb Sat Apr 13 04:50:28 2013
@@ -154,4 +154,35 @@ JSON
     datafile.close
   end
 
+  def test_deflate
+    Avro::DataFile.open('data.avr', 'w', '"string"', :deflate) do |writer|
+      writer << 'a' * 10_000
+    end
+    assert(File.size('data.avr') < 500)
+
+    records = []
+    Avro::DataFile.open('data.avr') do |reader|
+      reader.each {|record| records << record }
+    end
+    assert_equal records, ['a' * 10_000]
+  end
+
+  def test_append_to_deflated_file
+    schema = Avro::Schema.parse('"string"')
+    writer = Avro::IO::DatumWriter.new(schema)
+    file = Avro::DataFile::Writer.new(File.open('data.avr', 'wb'), writer, schema, :deflate)
+    file << 'a' * 10_000
+    file.close
+
+    file = Avro::DataFile::Writer.new(File.open('data.avr', 'a+b'), writer)
+    file << 'b' * 10_000
+    file.close
+    assert(File.size('data.avr') < 1_000)
+
+    records = []
+    Avro::DataFile.open('data.avr') do |reader|
+      reader.each {|record| records << record }
+    end
+    assert_equal records, ['a' * 10_000, 'b' * 10_000]
+  end
 end