You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@avro.apache.org by cu...@apache.org on 2011/09/07 23:23:23 UTC

svn commit: r1166408 - in /avro/branches/branch-1.5: ./ CHANGES.txt lang/py/setup.py lang/py/src/avro/datafile.py lang/py/src/avro/io.py lang/py/test/test_datafile.py share/test/data/weather-snappy.avro

Author: cutting
Date: Wed Sep  7 21:23:22 2011
New Revision: 1166408

URL: http://svn.apache.org/viewvc?rev=1166408&view=rev
Log:
Merge -c 1166407 from trunk to 1.5 branch.  Fixes: AVRO-866.

Added:
    avro/branches/branch-1.5/share/test/data/weather-snappy.avro
      - copied unchanged from r1166407, avro/trunk/share/test/data/weather-snappy.avro
Modified:
    avro/branches/branch-1.5/   (props changed)
    avro/branches/branch-1.5/CHANGES.txt
    avro/branches/branch-1.5/lang/py/setup.py
    avro/branches/branch-1.5/lang/py/src/avro/datafile.py
    avro/branches/branch-1.5/lang/py/src/avro/io.py
    avro/branches/branch-1.5/lang/py/test/test_datafile.py

Propchange: avro/branches/branch-1.5/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep  7 21:23:22 2011
@@ -1 +1 @@
-/avro/trunk:1075938,1075993,1078917,1079055,1079060,1079063,1079680,1083246,1085921,1086727,1086730,1086866,1087076,1087129,1087136,1087439-1087440,1087463,1087472,1087792,1089128,1089131,1089550,1094812,1095206-1095208,1095493,1095529,1095548,1095550,1096798,1097916,1097927,1097968,1097974,1099257,1102332,1102335,1124127,1124971,1129053,1129071,1129697-1129706,1129729,1129856,1130503,1136342,1141619,1141677,1141685,1141979-1141980,1142057,1142063,1151660,1151983,1157245,1161743,1161755,1161764,1161769-1161770,1161779,1166326
+/avro/trunk:1075938,1075993,1078917,1079055,1079060,1079063,1079680,1083246,1085921,1086727,1086730,1086866,1087076,1087129,1087136,1087439-1087440,1087463,1087472,1087792,1089128,1089131,1089550,1094812,1095206-1095208,1095493,1095529,1095548,1095550,1096798,1097916,1097927,1097968,1097974,1099257,1102332,1102335,1124127,1124971,1129053,1129071,1129697-1129706,1129729,1129856,1130503,1136342,1141619,1141677,1141685,1141979-1141980,1142057,1142063,1151660,1151983,1157245,1161743,1161755,1161764,1161769-1161770,1161779,1166326,1166407

Modified: avro/branches/branch-1.5/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/CHANGES.txt?rev=1166408&r1=1166407&r2=1166408&view=diff
==============================================================================
--- avro/branches/branch-1.5/CHANGES.txt (original)
+++ avro/branches/branch-1.5/CHANGES.txt Wed Sep  7 21:23:22 2011
@@ -2,6 +2,11 @@ Avro Change Log
 
 Avro 1.5.4 (unreleased)
 
+  IMPROVEMENTS
+
+    AVRO-866. Python: Add support for snappy compression codec.
+    (Tom White via cutting)
+
   BUG FIXES
 
     AVRO-884.  Java: Fix a regression in RPC so that one-way messages

Modified: avro/branches/branch-1.5/lang/py/setup.py
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/py/setup.py?rev=1166408&r1=1166407&r2=1166408&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/py/setup.py (original)
+++ avro/branches/branch-1.5/lang/py/setup.py Wed Sep  7 21:23:22 2011
@@ -22,9 +22,9 @@ except ImportError:
 
 from sys import version_info
 if version_info[:2] > (2, 5):
-    install_requires = []
+    install_requires = ['python-snappy']
 else:
-    install_requires = ['simplejson >= 2.0.9']
+    install_requires = ['python-snappy', 'simplejson >= 2.0.9']
 
 setup(
   name = 'avro',

Modified: avro/branches/branch-1.5/lang/py/src/avro/datafile.py
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/py/src/avro/datafile.py?rev=1166408&r1=1166407&r2=1166408&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/py/src/avro/datafile.py (original)
+++ avro/branches/branch-1.5/lang/py/src/avro/datafile.py Wed Sep  7 21:23:22 2011
@@ -23,7 +23,10 @@ except ImportError:
   from StringIO import StringIO
 from avro import schema
 from avro import io
-
+try:
+  import snappy
+except:
+  pass # fail later if snappy is used
 #
 # Constants
 #
@@ -40,7 +43,7 @@ META_SCHEMA = schema.parse("""\
    {"name": "meta", "type": {"type": "map", "values": "bytes"}},
    {"name": "sync", "type": {"type": "fixed", "name": "sync", "size": %d}}]}
 """ % (MAGIC_SIZE, SYNC_SIZE))
-VALID_CODECS = ['null', 'deflate']
+VALID_CODECS = ['null', 'deflate', 'snappy']
 VALID_ENCODINGS = ['binary'] # not used yet
 
 CODEC_KEY = "avro.codec"
@@ -142,19 +145,28 @@ class DataFileWriter(object):
       uncompressed_data = self.buffer_writer.getvalue()
       if self.get_meta(CODEC_KEY) == 'null':
         compressed_data = uncompressed_data
+        compressed_data_length = len(compressed_data)
       elif self.get_meta(CODEC_KEY) == 'deflate':
         # The first two characters and last character are zlib
         # wrappers around deflate data.
         compressed_data = zlib.compress(uncompressed_data)[2:-1]
+        compressed_data_length = len(compressed_data)
+      elif self.get_meta(CODEC_KEY) == 'snappy':
+        compressed_data = snappy.compress(uncompressed_data)
+        compressed_data_length = len(compressed_data) + 4 # crc32
       else:
         fail_msg = '"%s" codec is not supported.' % self.get_meta(CODEC_KEY)
         raise DataFileException(fail_msg)
 
       # Write length of block
-      self.encoder.write_long(len(compressed_data))
+      self.encoder.write_long(compressed_data_length)
 
       # Write block
       self.writer.write(compressed_data)
+      
+      # Write CRC32 checksum for Snappy
+      if self.get_meta(CODEC_KEY) == 'snappy':
+        self.encoder.write_crc32(uncompressed_data)
 
       # write sync marker
       self.writer.write(self.sync_marker)
@@ -280,7 +292,7 @@ class DataFileReader(object):
       # Skip a long; we don't need to use the length.
       self.raw_decoder.skip_long()
       self._datum_decoder = self._raw_decoder
-    else:
+    elif self.codec == 'deflate':
       # Compressed data is stored as (length, data), which
       # corresponds to how the "bytes" type is encoded.
       data = self.raw_decoder.read_bytes()
@@ -288,6 +300,15 @@ class DataFileReader(object):
       # "raw" (no zlib headers) decompression.  See zlib.h.
       uncompressed = zlib.decompress(data, -15)
       self._datum_decoder = io.BinaryDecoder(StringIO(uncompressed))
+    elif self.codec == 'snappy':
+      # Compressed data includes a 4-byte CRC32 checksum
+      length = self.raw_decoder.read_long()
+      data = self.raw_decoder.read(length - 4)
+      uncompressed = snappy.decompress(data)
+      self._datum_decoder = io.BinaryDecoder(StringIO(uncompressed))
+      self.raw_decoder.check_crc32(uncompressed);
+    else:
+      raise DataFileException("Unknown codec: %r" % self.codec)
 
   def _skip_sync(self):
     """

Modified: avro/branches/branch-1.5/lang/py/src/avro/io.py
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/py/src/avro/io.py?rev=1166408&r1=1166407&r2=1166408&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/py/src/avro/io.py (original)
+++ avro/branches/branch-1.5/lang/py/src/avro/io.py Wed Sep  7 21:23:22 2011
@@ -39,6 +39,7 @@ uses the following mapping:
 import struct
 from avro import schema
 import sys
+from binascii import crc32
 
 try:
 	import json
@@ -71,6 +72,7 @@ STRUCT_INT = struct_class('!I')     # bi
 STRUCT_LONG = struct_class('!Q')    # big-endian unsigned long long
 STRUCT_FLOAT = struct_class('!f')   # big-endian float
 STRUCT_DOUBLE = struct_class('!d')  # big-endian double
+STRUCT_CRC32 = struct_class('>I')   # big-endian unsigned int
 
 #
 # Exceptions
@@ -230,6 +232,11 @@ class BinaryDecoder(object):
     """
     return unicode(self.read_bytes(), "utf-8")
 
+  def check_crc32(self, bytes):
+    checksum = STRUCT_CRC32.unpack(self.read(4))[0];
+    if crc32(bytes) & 0xffffffff != checksum:
+      raise schema.AvroException("Checksum failure")
+
   def skip_null(self):
     pass
 
@@ -349,6 +356,12 @@ class BinaryEncoder(object):
     datum = datum.encode("utf-8")
     self.write_bytes(datum)
 
+  def write_crc32(self, bytes):
+    """
+    A 4-byte, big-endian CRC32 checksum
+    """
+    self.write(STRUCT_CRC32.pack(crc32(bytes)));
+
 #
 # DatumReader/Writer
 #

Modified: avro/branches/branch-1.5/lang/py/test/test_datafile.py
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/py/test/test_datafile.py?rev=1166408&r1=1166407&r2=1166408&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/py/test/test_datafile.py (original)
+++ avro/branches/branch-1.5/lang/py/test/test_datafile.py Wed Sep  7 21:23:22 2011
@@ -51,7 +51,7 @@ SCHEMAS_TO_VALIDATE = (
 )
 
 FILENAME = 'test_datafile.out'
-CODECS_TO_VALIDATE = ('null', 'deflate')
+CODECS_TO_VALIDATE = ('null', 'deflate', 'snappy')
 
 # TODO(hammer): clean up written files with ant, not os.remove
 class TestDataFile(unittest.TestCase):
@@ -63,6 +63,13 @@ class TestDataFile(unittest.TestCase):
     correct = 0
     for i, (example_schema, datum) in enumerate(SCHEMAS_TO_VALIDATE):
       for codec in CODECS_TO_VALIDATE:
+        if (codec == 'snappy'):
+          try:
+            import snappy
+          except:
+            print 'Snappy not present. Skipping.'
+            correct += 1
+            continue
         print ''
         print 'SCHEMA NUMBER %d' % (i + 1)
         print '================'
@@ -105,6 +112,13 @@ class TestDataFile(unittest.TestCase):
     correct = 0
     for i, (example_schema, datum) in enumerate(SCHEMAS_TO_VALIDATE):
       for codec in CODECS_TO_VALIDATE:
+        if (codec == 'snappy'):
+          try:
+            import snappy
+          except:
+            print 'Snappy not present. Skipping.'
+            correct += 1
+            continue
         print ''
         print 'SCHEMA NUMBER %d' % (i + 1)
         print '================'