You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by ma...@apache.org on 2013/04/13 06:50:28 UTC
svn commit: r1467549 - in /avro/trunk: CHANGES.txt lang/ruby/Rakefile
lang/ruby/interop/test_interop.rb lang/ruby/lib/avro.rb
lang/ruby/lib/avro/data_file.rb lang/ruby/test/test_datafile.rb
Author: martinkl
Date: Sat Apr 13 04:50:28 2013
New Revision: 1467549
URL: http://svn.apache.org/r1467549
Log:
AVRO-1288. Ruby: Add support for deflate codec in data files.
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/ruby/Rakefile
avro/trunk/lang/ruby/interop/test_interop.rb
avro/trunk/lang/ruby/lib/avro.rb
avro/trunk/lang/ruby/lib/avro/data_file.rb
avro/trunk/lang/ruby/test/test_datafile.rb
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1467549&r1=1467548&r2=1467549&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Sat Apr 13 04:50:28 2013
@@ -25,6 +25,9 @@ Trunk (not yet released)
AVRO-1287. Add data file with deflate codec to the interoperability
test suite. (martinkl)
+ AVRO-1288. Ruby: Add support for deflate codec in data files.
+ (martinkl)
+
BUG FIXES
AVRO-1266. Java: Fix mapred.AvroMultipleOutputs to support multiple
Modified: avro/trunk/lang/ruby/Rakefile
URL: http://svn.apache.org/viewvc/avro/trunk/lang/ruby/Rakefile?rev=1467549&r1=1467548&r2=1467549&view=diff
==============================================================================
--- avro/trunk/lang/ruby/Rakefile (original)
+++ avro/trunk/lang/ruby/Rakefile Sat Apr 13 04:50:28 2013
@@ -45,6 +45,10 @@ task :generate_interop do
ensure
writer.close
end
+
+ Avro::DataFile.open(BUILD + '/interop/data/ruby_deflate.avro', 'w', schema.to_s, :deflate) do |writer|
+ 20.times { writer << r.next }
+ end
end
Modified: avro/trunk/lang/ruby/interop/test_interop.rb
URL: http://svn.apache.org/viewvc/avro/trunk/lang/ruby/interop/test_interop.rb?rev=1467549&r1=1467548&r2=1467549&view=diff
==============================================================================
--- avro/trunk/lang/ruby/interop/test_interop.rb (original)
+++ avro/trunk/lang/ruby/interop/test_interop.rb Sat Apr 13 04:50:28 2013
@@ -23,10 +23,7 @@ class TestInterop < Test::Unit::TestCase
HERE = File.expand_path(File.dirname(__FILE__))
SHARE = HERE + '/../../../share'
SCHEMAS = SHARE + '/test/schemas'
-
- # TODO this currently ignores data files with deflate codec.
- # Remove this restriction when deflate support is added.
- Dir[HERE + '/../../../build/interop/data/*'].reject{|fn| fn =~ /_deflate/ }.each do |fn|
+ Dir[HERE + '/../../../build/interop/data/*'].each do |fn|
define_method("test_read_#{File.basename(fn, 'avro')}") do
projection = Avro::Schema.parse(File.read(SCHEMAS+'/interop.avsc'))
Modified: avro/trunk/lang/ruby/lib/avro.rb
URL: http://svn.apache.org/viewvc/avro/trunk/lang/ruby/lib/avro.rb?rev=1467549&r1=1467548&r2=1467549&view=diff
==============================================================================
--- avro/trunk/lang/ruby/lib/avro.rb (original)
+++ avro/trunk/lang/ruby/lib/avro.rb Sat Apr 13 04:50:28 2013
@@ -19,6 +19,7 @@ require 'set'
require 'digest/md5'
require 'net/http'
require 'stringio'
+require 'zlib'
module Avro
VERSION = "FIXME"
Modified: avro/trunk/lang/ruby/lib/avro/data_file.rb
URL: http://svn.apache.org/viewvc/avro/trunk/lang/ruby/lib/avro/data_file.rb?rev=1467549&r1=1467548&r2=1467549&view=diff
==============================================================================
--- avro/trunk/lang/ruby/lib/avro/data_file.rb (original)
+++ avro/trunk/lang/ruby/lib/avro/data_file.rb Sat Apr 13 04:50:28 2013
@@ -24,19 +24,18 @@ module Avro
SYNC_SIZE = 16
SYNC_INTERVAL = 1000 * SYNC_SIZE
META_SCHEMA = Schema.parse('{"type": "map", "values": "bytes"}')
- VALID_CODECS = ['null']
VALID_ENCODINGS = ['binary'] # not used yet
class DataFileError < AvroError; end
- def self.open(file_path, mode='r', schema=nil)
+ def self.open(file_path, mode='r', schema=nil, codec=nil)
schema = Avro::Schema.parse(schema) if schema
case mode
when 'w'
unless schema
raise DataFileError, "Writing an Avro file requires a schema."
end
- io = open_writer(File.open(file_path, 'wb'), schema)
+ io = open_writer(File.open(file_path, 'wb'), schema, codec)
when 'r'
io = open_reader(File.open(file_path, 'rb'), schema)
else
@@ -49,11 +48,34 @@ module Avro
io.close if block_given? && io
end
+ # Returns the codec registry: a Hash from codec name (String) to codec
+ # object, as filled in by register_codec. Returns nil if nothing has been
+ # registered yet.
+ # NOTE(review): this hands out the internal Hash itself, so callers can
+ # mutate the registry through it.
+ def self.codecs
+ @codecs
+ end
+
+ # Registers a codec under its codec_name (converted with to_s).
+ # `codec` may be an instance or a Class. A Class whose class object does
+ # not itself respond to codec_name is instantiated with no arguments and
+ # the instance is stored. Registering the same name again replaces the
+ # previous entry.
+ def self.register_codec(codec)
+ @codecs ||= {}
+ codec = codec.new if !codec.respond_to?(:codec_name) && codec.is_a?(Class)
+ @codecs[codec.codec_name.to_s] = codec
+ end
+
+ # Resolves `codec` to an object responding to compress/decompress.
+ # Accepts:
+ # - nil, treated as 'null'. The Reader passes meta['avro.codec'] here, so
+ #   a header without that key resolves to the null codec.
+ # - a codec instance (duck-typed: responds to compress and decompress).
+ # - a codec Class (a new instance is created on every call).
+ # - a registered codec name as a String or Symbol.
+ # Raises DataFileError for anything else.
+ # NOTE(review): if no codec has been registered, @codecs is nil and the
+ # name lookup raises NoMethodError instead of DataFileError.
+ def self.get_codec(codec)
+ codec ||= 'null'
+ if codec.respond_to?(:compress) && codec.respond_to?(:decompress)
+ codec # it's a codec instance
+ elsif codec.is_a?(Class)
+ codec.new # it's a codec class
+ elsif @codecs.include?(codec.to_s)
+ @codecs[codec.to_s] # it's a string or symbol (codec name)
+ else
+ raise DataFileError, "Unknown codec: #{codec.inspect}"
+ end
+ end
+
class << self
private
- def open_writer(file, schema)
+ # Builds a DataFile::Writer over the already-opened `file`, using a
+ # DatumWriter for `schema`. `codec` is passed straight to Writer.new,
+ # where DataFile.get_codec resolves it (nil meaning the 'null' codec).
+ def open_writer(file, schema, codec=nil)
writer = Avro::IO::DatumWriter.new(schema)
- Avro::DataFile::Writer.new(file, writer, schema)
+ Avro::DataFile::Writer.new(file, writer, schema, codec)
end
def open_reader(file, schema)
@@ -67,10 +89,10 @@ module Avro
OpenSSL::Random.random_bytes(16)
end
- attr_reader :writer, :encoder, :datum_writer, :buffer_writer, :buffer_encoder, :sync_marker, :meta
+ attr_reader :writer, :encoder, :datum_writer, :buffer_writer, :buffer_encoder, :sync_marker, :meta, :codec
attr_accessor :block_count
- def initialize(writer, datum_writer, writers_schema=nil)
+ def initialize(writer, datum_writer, writers_schema=nil, codec=nil)
# If writers_schema is not present, presume we're appending
@writer = writer
@encoder = IO::BinaryEncoder.new(@writer)
@@ -83,7 +105,8 @@ module Avro
if writers_schema
@sync_marker = Writer.generate_sync_marker
- meta['avro.codec'] = 'null'
+ @codec = DataFile.get_codec(codec)
+ meta['avro.codec'] = @codec.codec_name.to_s
meta['avro.schema'] = writers_schema.to_s
datum_writer.writers_schema = writers_schema
write_header
@@ -95,6 +118,7 @@ module Avro
# collect metadata
@sync_marker = dfr.sync_marker
meta['avro.codec'] = dfr.meta['avro.codec']
+ @codec = DataFile.get_codec(meta['avro.codec'])
# get schema used to write existing file
schema_from_file = dfr.meta['avro.schema']
@@ -152,21 +176,15 @@ module Avro
# TODO(jmhodges): make a schema for blocks and use datum_writer
# TODO(jmhodges): do we really need the number of items in the block?
- # TODO(jmhodges): use codec when writing the block contents
def write_block
if block_count > 0
# write number of items in block and block size in bytes
encoder.write_long(block_count)
- to_write = buffer_writer.string
+ to_write = codec.compress(buffer_writer.string)
encoder.write_long(to_write.size)
# write block contents
- if meta['avro.codec'] == 'null'
- writer.write(to_write)
- else
- msg = "#{meta['avro.codec'].inspect} coded is not supported"
- raise DataFileError, msg
- end
+ writer.write(to_write)
# write sync marker
writer.write(sync_marker)
@@ -183,8 +201,14 @@ module Avro
class Reader
include ::Enumerable
- attr_reader :reader, :decoder, :datum_reader, :sync_marker, :meta, :file_length
- attr_accessor :block_count
+ # The reader and binary decoder for the raw file stream
+ attr_reader :reader, :decoder
+
+ # The binary decoder for the contents of a block (after codec decompression)
+ attr_reader :block_decoder
+
+ attr_reader :datum_reader, :sync_marker, :meta, :file_length, :codec
+ attr_accessor :block_count # records remaining in current block
def initialize(reader, datum_reader)
@reader = reader
@@ -194,11 +218,7 @@ module Avro
# read the header: magic, meta, sync
read_header
- # ensure the codec is valid
- codec_from_file = meta['avro.codec']
- if codec_from_file && ! VALID_CODECS.include?(codec_from_file)
- raise DataFileError, "Unknown codec: #{codec_from_file}"
- end
+ @codec = DataFile.get_codec(meta['avro.codec'])
# get ready to read
@block_count = 0
@@ -220,7 +240,7 @@ module Avro
end
end
- datum = datum_reader.read(decoder)
+ datum = datum_reader.read(block_decoder)
self.block_count -= 1
yield(datum)
end
@@ -257,7 +277,9 @@ module Avro
def read_block_header
self.block_count = decoder.read_long
- decoder.read_long # not doing anything with length in bytes
+ block_bytes = decoder.read_long
+ data = codec.decompress(reader.read(block_bytes))
+ @block_decoder = IO::BinaryDecoder.new(StringIO.new(data))
end
# read the length of the sync marker; if it matches the sync
@@ -273,5 +295,48 @@ module Avro
end
end
end
+
+
+ # Identity codec named 'null': block payloads are written and read
+ # unchanged (no compression).
+ class NullCodec
+ def codec_name; 'null'; end
+ def decompress(data); data; end
+ def compress(data); data; end
+ end
+
+ # Codec named 'deflate'. Block payloads are stored as raw DEFLATE streams
+ # (RFC 1951, no zlib header or checksum) using Ruby's Zlib.
+ class DeflateCodec
+ # Compression level given to Zlib::Deflate in #compress. #decompress
+ # does not use it.
+ attr_reader :level
+
+ # level - a Zlib compression level; defaults to Zlib::DEFAULT_COMPRESSION.
+ def initialize(level=Zlib::DEFAULT_COMPRESSION)
+ @level = level
+ end
+
+ def codec_name; 'deflate'; end
+
+ # Inflates one raw-deflate payload and returns the decompressed String.
+ def decompress(compressed)
+ # Passing a negative number to Inflate puts it into "raw" RFC1951 mode
+ # (without the RFC1950 header & checksum). See the docs for
+ # inflateInit2 in http://www.zlib.net/manual.html
+ zstream = Zlib::Inflate.new(-Zlib::MAX_WBITS)
+ data = zstream.inflate(compressed)
+ data << zstream.finish
+ ensure
+ # NOTE(review): if Zlib::Inflate.new raises, zstream is still nil here,
+ # so this close raises NoMethodError and replaces the original
+ # exception. `zstream.close if zstream` would avoid that.
+ zstream.close
+ end
+
+ # Deflates `data` at #level as a raw RFC 1951 stream (the same negative
+ # window-bits convention as #decompress) and returns the compressed
+ # String.
+ def compress(data)
+ zstream = Zlib::Deflate.new(level, -Zlib::MAX_WBITS)
+ compressed = zstream.deflate(data)
+ compressed << zstream.finish
+ ensure
+ # NOTE(review): same nil-zstream caveat as in #decompress.
+ zstream.close
+ end
+ end
+
+ # Register the built-in codecs so DataFile.get_codec can find them by name
+ # ('null', 'deflate'). Both are passed as classes, so register_codec stores
+ # a fresh instance of each. DeflateCodec therefore uses its default level.
+ DataFile.register_codec NullCodec
+ DataFile.register_codec DeflateCodec
+
+ # TODO this constant won't be updated if you register another codec.
+ # Deprecated in favor of Avro::DataFile::codecs
+ # NOTE(review): this is a snapshot of the registry's keys taken when this
+ # file is loaded, and the Array is not frozen.
+ VALID_CODECS = DataFile.codecs.keys
end
end
Modified: avro/trunk/lang/ruby/test/test_datafile.rb
URL: http://svn.apache.org/viewvc/avro/trunk/lang/ruby/test/test_datafile.rb?rev=1467549&r1=1467548&r2=1467549&view=diff
==============================================================================
--- avro/trunk/lang/ruby/test/test_datafile.rb (original)
+++ avro/trunk/lang/ruby/test/test_datafile.rb Sat Apr 13 04:50:28 2013
@@ -154,4 +154,35 @@ JSON
datafile.close
end
+ # Round-trips one 10,000-character string through a deflate-coded data
+ # file. It checks that the file is under 500 bytes (so the payload really
+ # was compressed) and that the record reads back unchanged.
+ # NOTE(review): writes 'data.avr' in the current directory; cleanup
+ # presumably happens in a teardown outside this view.
+ def test_deflate
+ Avro::DataFile.open('data.avr', 'w', '"string"', :deflate) do |writer|
+ writer << 'a' * 10_000
+ end
+ assert(File.size('data.avr') < 500)
+
+ records = []
+ Avro::DataFile.open('data.avr') do |reader|
+ reader.each {|record| records << record }
+ end
+ # NOTE(review): Test::Unit's assert_equal takes (expected, actual); the
+ # arguments here are swapped, which only affects the failure message.
+ assert_equal records, ['a' * 10_000]
+ end
+
+ # Steps:
+ # 1. Writes one record to a new deflate-coded file.
+ # 2. Reopens the file for appending with no schema and no codec, so the
+ #    Writer takes both from the existing header.
+ # 3. Appends a second record.
+ # 4. Checks that the file is still small (i.e. the appended block was also
+ #    compressed) and that both records read back in order.
+ def test_append_to_deflated_file
+ schema = Avro::Schema.parse('"string"')
+ writer = Avro::IO::DatumWriter.new(schema)
+ file = Avro::DataFile::Writer.new(File.open('data.avr', 'wb'), writer, schema, :deflate)
+ file << 'a' * 10_000
+ file.close
+
+ # No writers_schema argument: the Writer takes the append path.
+ file = Avro::DataFile::Writer.new(File.open('data.avr', 'a+b'), writer)
+ file << 'b' * 10_000
+ file.close
+ assert(File.size('data.avr') < 1_000)
+
+ records = []
+ Avro::DataFile.open('data.avr') do |reader|
+ reader.each {|record| records << record }
+ end
+ # NOTE(review): assert_equal arguments are (actual, expected) here; the
+ # Test::Unit convention is (expected, actual).
+ assert_equal records, ['a' * 10_000, 'b' * 10_000]
+ end
end