You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@avro.apache.org by cu...@apache.org on 2011/09/07 23:23:23 UTC
svn commit: r1166408 - in /avro/branches/branch-1.5: ./ CHANGES.txt
lang/py/setup.py lang/py/src/avro/datafile.py lang/py/src/avro/io.py
lang/py/test/test_datafile.py share/test/data/weather-snappy.avro
Author: cutting
Date: Wed Sep 7 21:23:22 2011
New Revision: 1166408
URL: http://svn.apache.org/viewvc?rev=1166408&view=rev
Log:
Merge -c 1166407 from trunk to 1.5 branch. Fixes: AVRO-866.
Added:
avro/branches/branch-1.5/share/test/data/weather-snappy.avro
- copied unchanged from r1166407, avro/trunk/share/test/data/weather-snappy.avro
Modified:
avro/branches/branch-1.5/ (props changed)
avro/branches/branch-1.5/CHANGES.txt
avro/branches/branch-1.5/lang/py/setup.py
avro/branches/branch-1.5/lang/py/src/avro/datafile.py
avro/branches/branch-1.5/lang/py/src/avro/io.py
avro/branches/branch-1.5/lang/py/test/test_datafile.py
Propchange: avro/branches/branch-1.5/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep 7 21:23:22 2011
@@ -1 +1 @@
-/avro/trunk:1075938,1075993,1078917,1079055,1079060,1079063,1079680,1083246,1085921,1086727,1086730,1086866,1087076,1087129,1087136,1087439-1087440,1087463,1087472,1087792,1089128,1089131,1089550,1094812,1095206-1095208,1095493,1095529,1095548,1095550,1096798,1097916,1097927,1097968,1097974,1099257,1102332,1102335,1124127,1124971,1129053,1129071,1129697-1129706,1129729,1129856,1130503,1136342,1141619,1141677,1141685,1141979-1141980,1142057,1142063,1151660,1151983,1157245,1161743,1161755,1161764,1161769-1161770,1161779,1166326
+/avro/trunk:1075938,1075993,1078917,1079055,1079060,1079063,1079680,1083246,1085921,1086727,1086730,1086866,1087076,1087129,1087136,1087439-1087440,1087463,1087472,1087792,1089128,1089131,1089550,1094812,1095206-1095208,1095493,1095529,1095548,1095550,1096798,1097916,1097927,1097968,1097974,1099257,1102332,1102335,1124127,1124971,1129053,1129071,1129697-1129706,1129729,1129856,1130503,1136342,1141619,1141677,1141685,1141979-1141980,1142057,1142063,1151660,1151983,1157245,1161743,1161755,1161764,1161769-1161770,1161779,1166326,1166407
Modified: avro/branches/branch-1.5/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/CHANGES.txt?rev=1166408&r1=1166407&r2=1166408&view=diff
==============================================================================
--- avro/branches/branch-1.5/CHANGES.txt (original)
+++ avro/branches/branch-1.5/CHANGES.txt Wed Sep 7 21:23:22 2011
@@ -2,6 +2,11 @@ Avro Change Log
Avro 1.5.4 (unreleased)
+ IMPROVEMENTS
+
+ AVRO-866. Python: Add support for snappy compression codec.
+ (Tom White via cutting)
+
BUG FIXES
AVRO-884. Java: Fix a regression in RPC so that one-way messages
Modified: avro/branches/branch-1.5/lang/py/setup.py
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/py/setup.py?rev=1166408&r1=1166407&r2=1166408&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/py/setup.py (original)
+++ avro/branches/branch-1.5/lang/py/setup.py Wed Sep 7 21:23:22 2011
@@ -22,9 +22,9 @@ except ImportError:
from sys import version_info
if version_info[:2] > (2, 5):
- install_requires = []
+ install_requires = ['python-snappy']
else:
- install_requires = ['simplejson >= 2.0.9']
+ install_requires = ['python-snappy', 'simplejson >= 2.0.9']
setup(
name = 'avro',
Modified: avro/branches/branch-1.5/lang/py/src/avro/datafile.py
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/py/src/avro/datafile.py?rev=1166408&r1=1166407&r2=1166408&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/py/src/avro/datafile.py (original)
+++ avro/branches/branch-1.5/lang/py/src/avro/datafile.py Wed Sep 7 21:23:22 2011
@@ -23,7 +23,10 @@ except ImportError:
from StringIO import StringIO
from avro import schema
from avro import io
-
+try:
+ import snappy
+except:
+ pass # fail later if snappy is used
#
# Constants
#
@@ -40,7 +43,7 @@ META_SCHEMA = schema.parse("""\
{"name": "meta", "type": {"type": "map", "values": "bytes"}},
{"name": "sync", "type": {"type": "fixed", "name": "sync", "size": %d}}]}
""" % (MAGIC_SIZE, SYNC_SIZE))
-VALID_CODECS = ['null', 'deflate']
+VALID_CODECS = ['null', 'deflate', 'snappy']
VALID_ENCODINGS = ['binary'] # not used yet
CODEC_KEY = "avro.codec"
@@ -142,19 +145,28 @@ class DataFileWriter(object):
uncompressed_data = self.buffer_writer.getvalue()
if self.get_meta(CODEC_KEY) == 'null':
compressed_data = uncompressed_data
+ compressed_data_length = len(compressed_data)
elif self.get_meta(CODEC_KEY) == 'deflate':
# The first two characters and last character are zlib
# wrappers around deflate data.
compressed_data = zlib.compress(uncompressed_data)[2:-1]
+ compressed_data_length = len(compressed_data)
+ elif self.get_meta(CODEC_KEY) == 'snappy':
+ compressed_data = snappy.compress(uncompressed_data)
+ compressed_data_length = len(compressed_data) + 4 # crc32
else:
fail_msg = '"%s" codec is not supported.' % self.get_meta(CODEC_KEY)
raise DataFileException(fail_msg)
# Write length of block
- self.encoder.write_long(len(compressed_data))
+ self.encoder.write_long(compressed_data_length)
# Write block
self.writer.write(compressed_data)
+
+ # Write CRC32 checksum for Snappy
+ if self.get_meta(CODEC_KEY) == 'snappy':
+ self.encoder.write_crc32(uncompressed_data)
# write sync marker
self.writer.write(self.sync_marker)
@@ -280,7 +292,7 @@ class DataFileReader(object):
# Skip a long; we don't need to use the length.
self.raw_decoder.skip_long()
self._datum_decoder = self._raw_decoder
- else:
+ elif self.codec == 'deflate':
# Compressed data is stored as (length, data), which
# corresponds to how the "bytes" type is encoded.
data = self.raw_decoder.read_bytes()
@@ -288,6 +300,15 @@ class DataFileReader(object):
# "raw" (no zlib headers) decompression. See zlib.h.
uncompressed = zlib.decompress(data, -15)
self._datum_decoder = io.BinaryDecoder(StringIO(uncompressed))
+ elif self.codec == 'snappy':
+ # Compressed data includes a 4-byte CRC32 checksum
+ length = self.raw_decoder.read_long()
+ data = self.raw_decoder.read(length - 4)
+ uncompressed = snappy.decompress(data)
+ self._datum_decoder = io.BinaryDecoder(StringIO(uncompressed))
+ self.raw_decoder.check_crc32(uncompressed);
+ else:
+ raise DataFileException("Unknown codec: %r" % self.codec)
def _skip_sync(self):
"""
Modified: avro/branches/branch-1.5/lang/py/src/avro/io.py
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/py/src/avro/io.py?rev=1166408&r1=1166407&r2=1166408&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/py/src/avro/io.py (original)
+++ avro/branches/branch-1.5/lang/py/src/avro/io.py Wed Sep 7 21:23:22 2011
@@ -39,6 +39,7 @@ uses the following mapping:
import struct
from avro import schema
import sys
+from binascii import crc32
try:
import json
@@ -71,6 +72,7 @@ STRUCT_INT = struct_class('!I') # bi
STRUCT_LONG = struct_class('!Q') # big-endian unsigned long long
STRUCT_FLOAT = struct_class('!f') # big-endian float
STRUCT_DOUBLE = struct_class('!d') # big-endian double
+STRUCT_CRC32 = struct_class('>I') # big-endian unsigned int
#
# Exceptions
@@ -230,6 +232,11 @@ class BinaryDecoder(object):
"""
return unicode(self.read_bytes(), "utf-8")
+ def check_crc32(self, bytes):
+ checksum = STRUCT_CRC32.unpack(self.read(4))[0];
+ if crc32(bytes) & 0xffffffff != checksum:
+ raise schema.AvroException("Checksum failure")
+
def skip_null(self):
pass
@@ -349,6 +356,12 @@ class BinaryEncoder(object):
datum = datum.encode("utf-8")
self.write_bytes(datum)
+ def write_crc32(self, bytes):
+ """
+ A 4-byte, big-endian CRC32 checksum
+ """
+ self.write(STRUCT_CRC32.pack(crc32(bytes)));
+
#
# DatumReader/Writer
#
Modified: avro/branches/branch-1.5/lang/py/test/test_datafile.py
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/py/test/test_datafile.py?rev=1166408&r1=1166407&r2=1166408&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/py/test/test_datafile.py (original)
+++ avro/branches/branch-1.5/lang/py/test/test_datafile.py Wed Sep 7 21:23:22 2011
@@ -51,7 +51,7 @@ SCHEMAS_TO_VALIDATE = (
)
FILENAME = 'test_datafile.out'
-CODECS_TO_VALIDATE = ('null', 'deflate')
+CODECS_TO_VALIDATE = ('null', 'deflate', 'snappy')
# TODO(hammer): clean up written files with ant, not os.remove
class TestDataFile(unittest.TestCase):
@@ -63,6 +63,13 @@ class TestDataFile(unittest.TestCase):
correct = 0
for i, (example_schema, datum) in enumerate(SCHEMAS_TO_VALIDATE):
for codec in CODECS_TO_VALIDATE:
+ if (codec == 'snappy'):
+ try:
+ import snappy
+ except:
+ print 'Snappy not present. Skipping.'
+ correct += 1
+ continue
print ''
print 'SCHEMA NUMBER %d' % (i + 1)
print '================'
@@ -105,6 +112,13 @@ class TestDataFile(unittest.TestCase):
correct = 0
for i, (example_schema, datum) in enumerate(SCHEMAS_TO_VALIDATE):
for codec in CODECS_TO_VALIDATE:
+ if (codec == 'snappy'):
+ try:
+ import snappy
+ except:
+ print 'Snappy not present. Skipping.'
+ correct += 1
+ continue
print ''
print 'SCHEMA NUMBER %d' % (i + 1)
print '================'