You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 20:53:40 UTC

svn commit: r885142 [6/6] - in /hadoop/common/branches/HADOOP-6194: ./ .eclipse.templates/ bin/ ivy/ lib/jdiff/ src/ src/contrib/ src/contrib/ec2/ src/docs/ src/docs/src/documentation/ src/docs/src/documentation/content/xdocs/ src/docs/src/documentatio...

Modified: hadoop/common/branches/HADOOP-6194/src/test/core/org/apache/hadoop/io/compress/TestCodec.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6194/src/test/core/org/apache/hadoop/io/compress/TestCodec.java?rev=885142&r1=885141&r2=885142&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6194/src/test/core/org/apache/hadoop/io/compress/TestCodec.java (original)
+++ hadoop/common/branches/HADOOP-6194/src/test/core/org/apache/hadoop/io/compress/TestCodec.java Sat Nov 28 19:53:33 2009
@@ -19,31 +19,42 @@
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Random;
 
-import junit.framework.TestCase;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.RandomDatum;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
+import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
 import org.apache.hadoop.io.compress.zlib.ZlibFactory;
+import org.apache.hadoop.util.LineReader;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
-public class TestCodec extends TestCase {
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestCodec {
 
   private static final Log LOG= 
     LogFactory.getLog(TestCodec.class);
@@ -51,22 +62,34 @@
   private Configuration conf = new Configuration();
   private int count = 10000;
   private int seed = new Random().nextInt();
-  
+
+  @Test
   public void testDefaultCodec() throws IOException {
     codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.DefaultCodec");
     codecTest(conf, seed, count, "org.apache.hadoop.io.compress.DefaultCodec");
   }
-  
+
+  @Test
   public void testGzipCodec() throws IOException {
     codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.GzipCodec");
     codecTest(conf, seed, count, "org.apache.hadoop.io.compress.GzipCodec");
   }
-  
+
+  @Test
   public void testBZip2Codec() throws IOException {
     codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.BZip2Codec");
     codecTest(conf, seed, count, "org.apache.hadoop.io.compress.BZip2Codec");
   }
 
+  /**
+   * Runs codecTest against GzipCodec with the zlib compression level and
+   * strategy explicitly set (BEST_COMPRESSION / HUFFMAN_ONLY), once with zero
+   * records and once with {@code count} records.
+   */
+  @Test
+  public void testGzipCodecWithParam() throws IOException {
+    // local Configuration built from this.conf; the settings below are applied
+    // to this local object, which shadows the field of the same name
+    Configuration conf = new Configuration(this.conf);
+    ZlibFactory.setCompressionLevel(conf, CompressionLevel.BEST_COMPRESSION);
+    ZlibFactory.setCompressionStrategy(conf, CompressionStrategy.HUFFMAN_ONLY);
+    codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.GzipCodec");
+    codecTest(conf, seed, count, "org.apache.hadoop.io.compress.GzipCodec");
+  }
+
   private static void codecTest(Configuration conf, int seed, int count, 
                                 String codecClass) 
     throws IOException {
@@ -133,6 +156,109 @@
     LOG.info("SUCCESS! Completed checking " + count + " records");
   }
 
+  /** Runs the random-offset split read check against BZip2Codec. */
+  @Test
+  public void testSplitableCodecs() throws Exception {
+    testSplitableCodec(BZip2Codec.class);
+  }
+
+  /**
+   * Writes a compressed file of sequence-numbered lines with the given codec,
+   * then opens it at random offsets in BYBLOCK mode and verifies that two
+   * consecutive complete lines carry consecutive sequence numbers.
+   * The test directory is removed only when every sample passes.
+   *
+   * @param codecClass splittable codec implementation to exercise
+   * @throws IOException if the test file cannot be written or read
+   */
+  private void testSplitableCodec(
+      Class<? extends SplittableCompressionCodec> codecClass)
+      throws IOException {
+    final long DEFLBYTES = 2 * 1024 * 1024;
+    final Configuration conf = new Configuration();
+    final Random rand = new Random();
+    final long seed = rand.nextLong();
+    // log the seed so a failing run can be reproduced
+    LOG.info("seed: " + seed);
+    rand.setSeed(seed);
+    SplittableCompressionCodec codec =
+      ReflectionUtils.newInstance(codecClass, conf);
+    final FileSystem fs = FileSystem.getLocal(conf);
+    final FileStatus infile =
+      fs.getFileStatus(writeSplitTestFile(fs, rand, codec, DEFLBYTES));
+    if (infile.getLen() > Integer.MAX_VALUE) {
+      fail("Unexpected compression: " + DEFLBYTES + " -> " + infile.getLen());
+    }
+    final int flen = (int) infile.getLen();
+    // Random.nextInt(bound) rejects bound <= 0; guard against a tiny file
+    final int maxStep = Math.max(1, flen / 8);
+    final Text line = new Text();
+    final Decompressor dcmp = CodecPool.getDecompressor(codec);
+    try {
+      // advance by at least one byte so the same offset is never re-sampled
+      for (int pos = 0; pos < infile.getLen();
+           pos += 1 + rand.nextInt(maxStep)) {
+        // read from random positions, verifying that there exist two sequential
+        // lines as written in writeSplitTestFile
+        final SplitCompressionInputStream in =
+          codec.createInputStream(fs.open(infile.getPath()), dcmp,
+              pos, flen, SplittableCompressionCodec.READ_MODE.BYBLOCK);
+        try {
+          if (in.getAdjustedStart() >= flen) {
+            break;
+          }
+          LOG.info("SAMPLE " + in.getAdjustedStart() + ","
+              + in.getAdjustedEnd());
+          final LineReader lreader = new LineReader(in);
+          lreader.readLine(line); // ignore; likely partial
+          if (in.getPos() >= flen) {
+            break;
+          }
+          lreader.readLine(line);
+          final int seq1 = readLeadingInt(line);
+          lreader.readLine(line);
+          if (in.getPos() >= flen) {
+            break;
+          }
+          final int seq2 = readLeadingInt(line);
+          assertEquals("Mismatched lines", seq1 + 1, seq2);
+        } finally {
+          // a new stream is opened for every sample; release it (also on
+          // break or assertion failure) instead of leaking the file handle
+          IOUtils.cleanup(LOG, in);
+        }
+      }
+    } finally {
+      CodecPool.returnDecompressor(dcmp);
+    }
+    // remove on success
+    fs.delete(infile.getPath().getParent(), true);
+  }
+
+  /**
+   * Decodes the 4-byte big-endian int at the start of a line.
+   *
+   * @param txt line whose first four bytes hold the sequence number
+   * @return the decoded sequence number
+   * @throws IOException (EOFException) if the line holds fewer than 4 bytes
+   */
+  private static int readLeadingInt(Text txt) throws IOException {
+    // Text.getBytes() exposes the backing array, which is only valid up to
+    // getLength(); bound the stream so a short line fails loudly instead of
+    // decoding bytes left over from a previously read, longer line.
+    DataInputStream in = new DataInputStream(
+        new ByteArrayInputStream(txt.getBytes(), 0, txt.getLength()));
+    return in.readInt();
+  }
+
+  /** Write at least infLen bytes of records through codec to a file in the
+   * test dir (test.build.data, default /tmp, under a directory named after
+   * the codec class).
+   * Each record is the base64 encoding of REC_SIZE random bytes whose first
+   * four bytes are overwritten with the record's sequence number, followed by
+   * a newline, i.e. the stream has the form
+   * &lt;i&gt;&lt;b64 rand&gt;&lt;i+1&gt;&lt;b64 rand&gt;
+   * @param fs filesystem to create the file on
+   * @param rand source of the random record payload
+   * @param codec codec used to compress the file; also names the directory
+   *              and supplies the file extension
+   * @param infLen number of uncompressed record bytes to write (newlines are
+   *               not counted)
+   * @return path of the file written
+   */
+  private static Path writeSplitTestFile(FileSystem fs, Random rand,
+      CompressionCodec codec, long infLen) throws IOException {
+    final int REC_SIZE = 1024;
+    final Path wd = new Path(new Path(
+          System.getProperty("test.build.data", "/tmp")).makeQualified(fs),
+        codec.getClass().getSimpleName());
+    final Path file = new Path(wd, "test" + codec.getDefaultExtension());
+    final byte[] b = new byte[REC_SIZE];
+    final Base64 b64 = new Base64();
+    DataOutputStream fout = null;
+    Compressor cmp = CodecPool.getCompressor(codec);
+    try {
+      fout = new DataOutputStream(codec.createOutputStream(
+            fs.create(file, true), cmp));
+      // scratch buffer used only to serialize the 4-byte sequence number
+      final DataOutputBuffer dob = new DataOutputBuffer(REC_SIZE * 4 / 3 + 4);
+      int seq = 0;
+      while (infLen > 0) {
+        rand.nextBytes(b);
+        final byte[] b64enc = b64.encode(b); // ensures rand printable, no LF
+        dob.reset();
+        dob.writeInt(seq);
+        // overwrite the head of the encoded payload with the raw int bytes
+        // NOTE(review): the int is written as raw bytes, so a sequence number
+        // containing the byte 0x0A (e.g. seq 10) puts an LF inside the record
+        // header -- confirm the line-based reader never samples such records
+        System.arraycopy(dob.getData(), 0, b64enc, 0, dob.getLength());
+        fout.write(b64enc);
+        fout.write('\n');
+        ++seq;
+        infLen -= b64enc.length;
+      }
+      LOG.info("Wrote " + seq + " records to " + file);
+    } finally {
+      IOUtils.cleanup(LOG, fout);
+      CodecPool.returnCompressor(cmp);
+    }
+    return file;
+  }
+
+  @Test
   public void testCodecPoolGzipReuse() throws Exception {
     Configuration conf = new Configuration();
     conf.setBoolean("hadoop.native.lib", true);
@@ -149,19 +275,69 @@
     assertTrue("Got mismatched ZlibCompressor", c2 != CodecPool.getCompressor(gzc));
   }
 
-  public void testSequenceFileDefaultCodec() throws IOException, ClassNotFoundException, 
+  /**
+   * Checks that a compressor handed back by CodecPool picks up the
+   * compression level currently set in conf: a compressor is pooled while
+   * conf asks for BEST_COMPRESSION, conf is switched to NO_COMPRESSION, the
+   * same instance is fetched again with conf, and 32 KiB of identical bytes
+   * written through it must not come out shorter than the input.
+   *
+   * @param conf configuration whose zlib level/strategy are modified
+   * @param codec codec whose pooled compressor is exercised
+   * @throws IOException if writing through the codec fails
+   */
+  private static void gzipReinitTest(Configuration conf, CompressionCodec codec)
+      throws IOException {
+    // Add codec to cache
+    ZlibFactory.setCompressionLevel(conf, CompressionLevel.BEST_COMPRESSION);
+    ZlibFactory.setCompressionStrategy(conf,
+        CompressionStrategy.DEFAULT_STRATEGY);
+    Compressor c1 = CodecPool.getCompressor(codec);
+    CodecPool.returnCompressor(c1);
+    // reset compressor's compression level to perform no compression
+    ZlibFactory.setCompressionLevel(conf, CompressionLevel.NO_COMPRESSION);
+    Compressor c2 = CodecPool.getCompressor(codec, conf);
+    // ensure same compressor placed earlier
+    assertTrue("Got mismatched ZlibCompressor", c1 == c2);
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    CompressionOutputStream cos = null;
+    // write trivially compressible data
+    byte[] b = new byte[1 << 15];
+    Arrays.fill(b, (byte) 43);
+    try {
+      cos = codec.createOutputStream(bos, c2);
+      cos.write(b);
+    } finally {
+      if (cos != null) {
+        cos.close();
+      }
+      CodecPool.returnCompressor(c2);
+    }
+    byte[] outbytes = bos.toByteArray();
+    // verify data were not compressed
+    assertTrue("Compressed bytes contrary to configuration",
+               outbytes.length >= b.length);
+  }
+
+  /**
+   * Runs gzipReinitTest with GzipCodec when native zlib is loaded (logging a
+   * warning and skipping that part otherwise), then with DefaultCodec after
+   * setting "hadoop.native.lib" to false.
+   */
+  @Test
+  public void testCodecPoolCompressorReinit() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean("hadoop.native.lib", true);
+    if (ZlibFactory.isNativeZlibLoaded(conf)) {
+      GzipCodec gzc = ReflectionUtils.newInstance(GzipCodec.class, conf);
+      gzipReinitTest(conf, gzc);
+    } else {
+      LOG.warn("testCodecPoolCompressorReinit skipped: native libs not loaded");
+    }
+    // second pass with native libraries switched off in conf
+    conf.setBoolean("hadoop.native.lib", false);
+    DefaultCodec dfc = ReflectionUtils.newInstance(DefaultCodec.class, conf);
+    gzipReinitTest(conf, dfc);
+  }
+
+  @Test
+  public void testSequenceFileDefaultCodec() throws IOException, ClassNotFoundException,
       InstantiationException, IllegalAccessException {
     sequenceFileCodecTest(conf, 100, "org.apache.hadoop.io.compress.DefaultCodec", 100);
     sequenceFileCodecTest(conf, 200000, "org.apache.hadoop.io.compress.DefaultCodec", 1000000);
   }
-  
-  public void testSequenceFileBZip2Codec() throws IOException, ClassNotFoundException, 
+
+  @Test
+  public void testSequenceFileBZip2Codec() throws IOException, ClassNotFoundException,
       InstantiationException, IllegalAccessException {
     sequenceFileCodecTest(conf, 0, "org.apache.hadoop.io.compress.BZip2Codec", 100);
     sequenceFileCodecTest(conf, 100, "org.apache.hadoop.io.compress.BZip2Codec", 100);
     sequenceFileCodecTest(conf, 200000, "org.apache.hadoop.io.compress.BZip2Codec", 1000000);
   }
-  
+
   private static void sequenceFileCodecTest(Configuration conf, int lines, 
                                 String codecClass, int blockSize) 
     throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException {
@@ -242,8 +418,4 @@
     
   }
 
-  public TestCodec(String name) {
-    super(name);
-  }
-
 }

Modified: hadoop/common/branches/HADOOP-6194/src/test/core/org/apache/hadoop/io/file/tfile/TestTFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6194/src/test/core/org/apache/hadoop/io/file/tfile/TestTFile.java?rev=885142&r1=885141&r2=885142&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6194/src/test/core/org/apache/hadoop/io/file/tfile/TestTFile.java (original)
+++ hadoop/common/branches/HADOOP-6194/src/test/core/org/apache/hadoop/io/file/tfile/TestTFile.java Sat Nov 28 19:53:33 2009
@@ -319,7 +319,7 @@
 
     scanner.close();
     // test for a range of scanner
-    scanner = reader.createScanner(getSomeKey(10), getSomeKey(60));
+    scanner = reader.createScannerByKey(getSomeKey(10), getSomeKey(60));
     readAndCheckbytes(scanner, 10, 50);
     assertFalse(scanner.advance());
     scanner.close();

Modified: hadoop/common/branches/HADOOP-6194/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileByteArrays.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6194/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileByteArrays.java?rev=885142&r1=885141&r2=885142&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6194/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileByteArrays.java (original)
+++ hadoop/common/branches/HADOOP-6194/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileByteArrays.java Sat Nov 28 19:53:33 2009
@@ -673,7 +673,7 @@
     Reader reader =
         new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
     Scanner scanner =
-        reader.createScanner(composeSortedKey(KEY, count, recordIndex)
+        reader.createScannerByKey(composeSortedKey(KEY, count, recordIndex)
             .getBytes(), null);
 
     try {
@@ -698,7 +698,7 @@
       throws IOException {
     Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
     Scanner scanner =
-        reader.createScanner(composeSortedKey(KEY, count, recordIndex)
+        reader.createScannerByKey(composeSortedKey(KEY, count, recordIndex)
             .getBytes(), null);
 
     try {
@@ -729,7 +729,7 @@
     Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
 
     Scanner scanner =
-        reader.createScanner(composeSortedKey(KEY, count, recordIndex)
+        reader.createScannerByKey(composeSortedKey(KEY, count, recordIndex)
             .getBytes(), null);
 
     byte[] vbuf1 = new byte[BUF_SIZE];
@@ -753,7 +753,7 @@
     Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
 
     Scanner scanner =
-        reader.createScanner(composeSortedKey(KEY, count, recordIndex)
+        reader.createScannerByKey(composeSortedKey(KEY, count, recordIndex)
             .getBytes(), null);
 
     // read the indexed key

Modified: hadoop/common/branches/HADOOP-6194/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileSplit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6194/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileSplit.java?rev=885142&r1=885141&r2=885142&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6194/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileSplit.java (original)
+++ hadoop/common/branches/HADOOP-6194/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileSplit.java Sat Nov 28 19:53:33 2009
@@ -17,6 +17,7 @@
 package org.apache.hadoop.io.file.tfile;
 
 import java.io.IOException;
+import java.util.Random;
 
 import junit.framework.Assert;
 import junit.framework.TestCase;
@@ -42,6 +43,7 @@
   private FileSystem fs;
   private Configuration conf;
   private Path path;
+  private Random random = new Random();
 
   private String comparator = "memcmp";
   private String outputFile = "TestTFileSplit";
@@ -74,7 +76,7 @@
     long rowCount = 0;
     BytesWritable key, value;
     for (int i = 0; i < numSplit; ++i, offset += splitSize) {
-      Scanner scanner = reader.createScanner(offset, splitSize);
+      Scanner scanner = reader.createScannerByByteRange(offset, splitSize);
       int count = 0;
       key = new BytesWritable();
       value = new BytesWritable();
@@ -90,18 +92,101 @@
     Assert.assertEquals(rowCount, reader.getEntryCount());
     reader.close();
   }
+
+  /**
+   * Similar to readFile(), but tests scanners created by record-number
+   * ranges rather than by byte offsets. The records are divided into
+   * numSplits contiguous ranges; each range is scanned and must yield
+   * exactly its own records, with the scanner reporting the expected record
+   * number before and after each entry is read.
+   *
+   * @param numSplits number of record ranges to divide the file into
+   * @throws IOException if the file cannot be opened or scanned
+   */
+  void readRowSplits(int numSplits) throws IOException {
+
+    Reader reader =
+      new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+    try {
+      long totalRecords = reader.getEntryCount();
+      for (int i=0; i<numSplits; i++) {
+        long startRec = i*totalRecords/numSplits;
+        long endRec = (i+1)*totalRecords/numSplits;
+        if (i == numSplits-1) {
+          // the last split absorbs any rounding remainder
+          endRec = totalRecords;
+        }
+        Scanner scanner = reader.createScannerByRecordNum(startRec, endRec);
+        int count = 0;
+        try {
+          BytesWritable key = new BytesWritable();
+          BytesWritable value = new BytesWritable();
+          long x=startRec;
+          while (!scanner.atEnd()) {
+            // expected value first, so a failure message reads correctly
+            assertEquals("Incorrect RecNum returned by scanner", x,
+                scanner.getRecordNum());
+            scanner.entry().get(key, value);
+            ++count;
+            // reading the entry must not move the scanner
+            assertEquals("Incorrect RecNum returned by scanner", x,
+                scanner.getRecordNum());
+            scanner.advance();
+            ++x;
+          }
+        } finally {
+          // close even when an assertion above fails
+          scanner.close();
+        }
+        Assert.assertTrue(count == (endRec - startRec));
+      }
+      // make sure specifying range at the end gives zero records.
+      Scanner scanner = reader.createScannerByRecordNum(totalRecords, -1);
+      try {
+        Assert.assertTrue(scanner.atEnd());
+      } finally {
+        scanner.close();
+      }
+    } finally {
+      // the reader was previously left open
+      reader.close();
+    }
+  }
   
   static String composeSortedKey(String prefix, int total, int value) {
     return String.format("%s%010d", prefix, value);
   }
   
+  /**
+   * Checks the record-number lookups of the Reader on the file at path:
+   * getRecordNumNear must map offset 0 to record 0 and any offset at or past
+   * the file length to the total entry count, and
+   * getRecordNumByLocation(getLocationByRecordNum(n)) must return n for the
+   * first and last 100 records, a random window straddling the middle of the
+   * file, and 1000 random records.
+   *
+   * @throws IOException if the file cannot be opened or read
+   */
+  void checkRecNums() throws IOException {
+    long fileLen = fs.getFileStatus(path).getLen();
+    Reader reader = new Reader(fs.open(path), fileLen, conf);
+    try {
+      long totalRecs = reader.getEntryCount();
+      // begin falls in [0, totalRecs/2); nextLong() % n may be negative
+      long begin = random.nextLong() % (totalRecs / 2);
+      if (begin < 0)
+        begin += (totalRecs / 2);
+      // end falls in (totalRecs/2, totalRecs]
+      long end = random.nextLong() % (totalRecs / 2);
+      if (end < 0)
+        end += (totalRecs / 2);
+      end += (totalRecs / 2) + 1;
+
+      assertEquals("RecNum for offset=0 should be 0", 0, reader
+          .getRecordNumNear(0));
+      for (long x : new long[] { fileLen, fileLen + 1, 2 * fileLen }) {
+        assertEquals("RecNum for offset>=fileLen should be total entries",
+            totalRecs, reader.getRecordNumNear(x));
+      }
+
+      for (long i = 0; i < 100; ++i) {
+        assertEquals("Locaton to RecNum conversion not symmetric", i, reader
+            .getRecordNumByLocation(reader.getLocationByRecordNum(i)));
+      }
+
+      for (long i = 1; i < 100; ++i) {
+        long x = totalRecs - i;
+        assertEquals("Locaton to RecNum conversion not symmetric", x, reader
+            .getRecordNumByLocation(reader.getLocationByRecordNum(x)));
+      }
+
+      for (long i = begin; i < end; ++i) {
+        assertEquals("Locaton to RecNum conversion not symmetric", i, reader
+            .getRecordNumByLocation(reader.getLocationByRecordNum(i)));
+      }
+
+      for (int i = 0; i < 1000; ++i) {
+        long x = random.nextLong() % totalRecs;
+        if (x < 0) x += totalRecs;
+        assertEquals("Locaton to RecNum conversion not symmetric", x, reader
+            .getRecordNumByLocation(reader.getLocationByRecordNum(x)));
+      }
+    } finally {
+      // the reader was previously left open, also when an assertion failed
+      reader.close();
+    }
+  }
+  
   public void testSplit() throws IOException {
     System.out.println("testSplit");
     createFile(100000, Compression.Algorithm.NONE.getName());
+    checkRecNums();   
     readFile();
+    readRowSplits(10);
     fs.delete(path, true);
     createFile(500000, Compression.Algorithm.GZ.getName());
+    checkRecNums();
     readFile();
+    readRowSplits(83);
     fs.delete(path, true);
   }
 }

Modified: hadoop/common/branches/HADOOP-6194/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileUnsortedByteArrays.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6194/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileUnsortedByteArrays.java?rev=885142&r1=885141&r2=885142&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6194/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileUnsortedByteArrays.java (original)
+++ hadoop/common/branches/HADOOP-6194/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileUnsortedByteArrays.java Sat Nov 28 19:53:33 2009
@@ -89,7 +89,7 @@
 
     try {
       Scanner scanner =
-          reader.createScanner("aaa".getBytes(), "zzz".getBytes());
+          reader.createScannerByKey("aaa".getBytes(), "zzz".getBytes());
       Assert
           .fail("Failed to catch creating scanner with keys on unsorted file.");
     }

Modified: hadoop/common/branches/HADOOP-6194/src/test/core/org/apache/hadoop/io/serializer/SerializationTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6194/src/test/core/org/apache/hadoop/io/serializer/SerializationTestUtil.java?rev=885142&r1=885141&r2=885142&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6194/src/test/core/org/apache/hadoop/io/serializer/SerializationTestUtil.java (original)
+++ hadoop/common/branches/HADOOP-6194/src/test/core/org/apache/hadoop/io/serializer/SerializationTestUtil.java Sat Nov 28 19:53:33 2009
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.io.serializer;
 
+import java.util.Map;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -33,19 +35,33 @@
    * @return deserialized item
    */
   public static<K> K testSerialization(Configuration conf, K before) 
-    throws Exception {
-    
+      throws Exception {
+    Map<String, String> metadata =
+      SerializationBase.getMetadataFromClass(GenericsUtil.getClass(before));
+    return testSerialization(conf, metadata, before);
+  }
+  
+  /**
+   * A utility that tests serialization/deserialization. 
+   * @param conf configuration to use, "io.serializations" is read to 
+   * determine the serialization
+   * @param metadata the metadata to pass to the serializer/deserializer
+   * @param <K> the class of the item
+   * @param before item to (de)serialize
+   * @return deserialized item
+   */
+  public static <K> K testSerialization(Configuration conf, 
+      Map<String, String> metadata, K before) throws Exception {
+
     SerializationFactory factory = new SerializationFactory(conf);
-    Serializer<K> serializer 
-      = factory.getSerializer(GenericsUtil.getClass(before));
-    Deserializer<K> deserializer 
-      = factory.getDeserializer(GenericsUtil.getClass(before));
-   
+    SerializerBase<K> serializer = factory.getSerializer(metadata);
+    DeserializerBase<K> deserializer = factory.getDeserializer(metadata);
+
     DataOutputBuffer out = new DataOutputBuffer();
     serializer.open(out);
     serializer.serialize(before);
     serializer.close();
-    
+
     DataInputBuffer in = new DataInputBuffer();
     in.reset(out.getData(), out.getLength());
     deserializer.open(in);

Modified: hadoop/common/branches/HADOOP-6194/src/test/core/org/apache/hadoop/io/serializer/avro/TestAvroSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6194/src/test/core/org/apache/hadoop/io/serializer/avro/TestAvroSerialization.java?rev=885142&r1=885141&r2=885142&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6194/src/test/core/org/apache/hadoop/io/serializer/avro/TestAvroSerialization.java (original)
+++ hadoop/common/branches/HADOOP-6194/src/test/core/org/apache/hadoop/io/serializer/avro/TestAvroSerialization.java Sat Nov 28 19:53:33 2009
@@ -18,9 +18,14 @@
 
 package org.apache.hadoop.io.serializer.avro;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import junit.framework.TestCase;
 
+import org.apache.avro.util.Utf8;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.serializer.SerializationBase;
 import org.apache.hadoop.io.serializer.SerializationTestUtil;
 
 public class TestAvroSerialization extends TestCase {
@@ -59,6 +64,16 @@
       SerializationTestUtil.testSerialization(conf, before);
     assertEquals(before, after);
   }
+  
+  /**
+   * Round-trips a Utf8 value through SerializationTestUtil using explicit
+   * metadata that names AvroGenericSerialization and the Avro schema
+   * "string", and expects the result to equal the input.
+   */
+  public void testGeneric() throws Exception {
+    Utf8 before = new Utf8("hadoop");
+    Map<String, String> metadata = new HashMap<String, String>();
+    metadata.put(SerializationBase.SERIALIZATION_KEY,
+      AvroGenericSerialization.class.getName());
+    // the schema is passed as JSON text, hence the embedded quotes
+    metadata.put(AvroSerialization.AVRO_SCHEMA_KEY, "\"string\"");
+    Utf8 after = SerializationTestUtil.testSerialization(conf, metadata, before);
+    assertEquals(before, after);
+  }
 
   public static class InnerRecord {
     public int x = 7;

Modified: hadoop/common/branches/HADOOP-6194/src/test/core/org/apache/hadoop/util/TestGenericsUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6194/src/test/core/org/apache/hadoop/util/TestGenericsUtil.java?rev=885142&r1=885141&r2=885142&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6194/src/test/core/org/apache/hadoop/util/TestGenericsUtil.java (original)
+++ hadoop/common/branches/HADOOP-6194/src/test/core/org/apache/hadoop/util/TestGenericsUtil.java Sat Nov 28 19:53:33 2009
@@ -103,6 +103,12 @@
      GenericOptionsParser parser = new GenericOptionsParser(
         new Configuration(), new String[] {"-jt"});
     assertEquals(parser.getRemainingArgs().length, 0);
+    
+    //  test if -D accepts -Dx=y=z
+    parser = 
+      new GenericOptionsParser(new Configuration(), 
+                               new String[] {"-Dx=y=z"});
+    assertEquals(parser.getConfiguration().get("x"), "y=z");
   }
   
   public void testGetClass() {

Modified: hadoop/common/branches/HADOOP-6194/src/test/core/org/apache/hadoop/util/TestPureJavaCrc32.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-6194/src/test/core/org/apache/hadoop/util/TestPureJavaCrc32.java?rev=885142&r1=885141&r2=885142&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-6194/src/test/core/org/apache/hadoop/util/TestPureJavaCrc32.java (original)
+++ hadoop/common/branches/HADOOP-6194/src/test/core/org/apache/hadoop/util/TestPureJavaCrc32.java Sat Nov 28 19:53:33 2009
@@ -17,26 +17,27 @@
  */
 package org.apache.hadoop.util;
 
-import junit.framework.TestCase;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Random;
+import org.junit.Assert;
+import org.junit.Test;
 
 /**
  * Unit test to verify that the pure-Java CRC32 algorithm gives
  * the same results as the built-in implementation.
  */
-public class TestPureJavaCrc32 extends TestCase {
-  private CRC32 theirs;
-  private PureJavaCrc32 ours;
-
-  public void setUp() {
-    theirs = new CRC32();
-    ours = new PureJavaCrc32();
-  }
+public class TestPureJavaCrc32 {
+  private final CRC32 theirs = new CRC32();
+  private final PureJavaCrc32 ours = new PureJavaCrc32();
 
+  @Test
   public void testCorrectness() throws Exception {
     checkSame();
 
@@ -92,10 +93,93 @@
   }
 
   private void checkSame() {
-    assertEquals(theirs.getValue(), ours.getValue());
+    Assert.assertEquals(theirs.getValue(), ours.getValue());
   }
 
   /**
+   * Generate a table to perform checksums based on the same CRC-32 polynomial
+   * that java.util.zip.CRC32 uses.
+   */
+  public static class Table {
+    /** Bit-reversed form of the CRC-32 polynomial. */
+    private static final int polynomial = 0xEDB88320;
+
+    /** tables[j][i] is entry i of the j-th lookup table. */
+    private final int[][] tables;
+
+    /**
+     * Builds nTables lookup tables of 2^nBits entries each.
+     *
+     * @param nBits number of input bits consumed per table lookup
+     * @param nTables number of tables to generate
+     */
+    private Table(final int nBits, final int nTables) {
+      tables = new int[nTables][];
+      final int size = 1 << nBits;
+      for(int i = 0; i < tables.length; i++) {
+        tables[i] = new int[size];
+      }
+
+      //compute the first table
+      final int[] first = tables[0];
+      for (int i = 0; i < first.length; i++) {
+        int crc = i;
+        // bitwise division, one input bit per iteration
+        for (int j = 0; j < nBits; j++) {
+          if ((crc & 1) == 1) {
+            crc >>>= 1;
+            crc ^= polynomial;
+          } else {
+            crc >>>= 1;
+          }
+        }
+        first[i] = crc;
+      }
+
+      //compute the remaining tables
+      // each entry is the previous table's entry advanced by nBits more bits
+      final int mask = first.length - 1;
+      for(int j = 1; j < tables.length; j++) {
+        final int[] previous = tables[j-1];
+        final int[] current = tables[j];
+        for (int i = 0; i < current.length; i++) {
+          current[i] = (previous[i] >>> nBits) ^ first[previous[i] & mask];
+        }
+      }
+    }
+
+    /**
+     * Renders every table as a Java array declaration, four hex values per
+     * row. The table length must be a multiple of four.
+     *
+     * @param nameformat format string producing the array name from the
+     *                   table index
+     * @return one declaration string per table
+     */
+    String[] toStrings(String nameformat) {
+      final String[] s = new String[tables.length];
+      for (int j = 0; j < tables.length; j++) {
+        final int[] t = tables[j];
+        final StringBuilder b = new StringBuilder();
+        b.append(String.format("  static final int[] " + nameformat
+            + " = new int[] {", j));
+        for (int i = 0; i < t.length;) {
+          b.append("\n    ");
+          for(int k = 0; k < 4; k++) {
+            b.append(String.format("0x%08X, ", t[i++]));
+          }
+        }
+        // turn the comma after the last value into a line break
+        b.setCharAt(b.length() - 2, '\n');
+        s[j] = b.toString() + " };\n";
+      }
+      return s;
+    }
+
+    /**
+     * Returns all table declarations concatenated; arrays are named
+     * T&lt;nBits&gt;_&lt;index&gt;.
+     */
+    @Override
+    public String toString() {
+      final StringBuilder b = new StringBuilder();
+      for(String s : toStrings(String.format("T%d_",
+          Integer.numberOfTrailingZeros(tables[0].length)) + "%d")) {
+        b.append(s);
+      }
+      return b.toString();
+    }
+
+    /**
+     * Generate CRC-32 lookup tables: sixteen 8-bit tables, printed to stdout
+     * and written to table8.txt in the working directory.
+     */
+    public static void main(String[] args) throws FileNotFoundException {
+      int i = 8;
+      final PrintStream out = new PrintStream(
+          new FileOutputStream("table" + i + ".txt"), true);
+      try {
+        final Table t = new Table(i, 16);
+        final String s = t.toString();
+        System.out.println(s);
+        out.println(s);
+      } finally {
+        // the file stream was previously never closed
+        out.close();
+      }
+    }
+  }
+  
+  /**
    * Performance tests to compare performance of the Pure Java implementation
    * to the built-in java.util.zip implementation. This can be run from the
    * command line with:
@@ -109,62 +193,108 @@
     public static final int MAX_LEN = 32*1024*1024; // up to 32MB chunks
     public static final int BYTES_PER_SIZE = MAX_LEN * 4;
 
-    public static LinkedHashMap<String, Checksum> getImplsToTest() {
-      LinkedHashMap<String, Checksum> impls =
-        new LinkedHashMap<String, Checksum>();
-      impls.put("BuiltIn", new CRC32());
-      impls.put("PureJava", new PureJavaCrc32());
-      return impls;
-    }
+    static final Checksum zip = new CRC32(); 
+    static final Checksum[] CRCS = {new PureJavaCrc32()};
 
     public static void main(String args[]) {
-      LinkedHashMap<String, Checksum> impls = getImplsToTest();
+      printSystemProperties(System.out);
+      doBench(CRCS, System.out);
+    }
+
+    /**
+     * Prints s right-aligned in a column followed by " |". The column is
+     * width characters wide, or as wide as s when s is longer.
+     */
+    private static void printCell(String s, int width, PrintStream out) {
+      final int columns = Math.max(s.length(), width);
+      final String cellFormat = " %" + columns + "s |";
+      out.printf(cellFormat, s);
+    }
 
-      Random rand = new Random();
-      byte[] bytes = new byte[MAX_LEN];
-      rand.nextBytes(bytes);
+    /**
+     * Benchmarks the built-in CRC32 followed by every given implementation
+     * whose class differs from the built-in one.
+     */
+    private static void doBench(final Checksum[] crcs, final PrintStream out) {
+      final List<Checksum> ordered = new ArrayList<Checksum>();
+      // built-in implementation always goes first
+      ordered.add(zip);
+      final Class<?> builtIn = zip.getClass();
+      for (Checksum candidate : crcs) {
+        if (candidate.getClass() == builtIn) {
+          continue;
+        }
+        ordered.add(candidate);
+      }
+      doBench(ordered, out);
+    }
 
+    private static void doBench(final List<Checksum> crcs, final PrintStream out
+        ) {
+      final byte[] bytes = new byte[MAX_LEN];
+      new Random().nextBytes(bytes);
 
       // Print header
-      System.out.printf("||num bytes||");
-      for (String entry : impls.keySet()) {
-        System.out.printf(entry + " MB/sec||");
+      out.printf("\nPerformance Table (The unit is MB/sec)\n||");
+      final String title = "Num Bytes";
+      printCell("Num Bytes", 0, out);
+      for (Checksum c : crcs) {
+        out.printf("|");
+        printCell(c.getClass().getSimpleName(), 8, out);
       }
-      System.out.printf("\n");
+      out.printf("|\n");
 
       // Warm up implementations to get jit going.
-      for (Map.Entry<String, Checksum> entry : impls.entrySet()) {
-        doBench("warmUp" + entry.getKey(),
-                entry.getValue(), bytes, 2, false);
-        doBench("warmUp" + entry.getKey(),
-                entry.getValue(), bytes, 2101, false);
+      for (Checksum c : crcs) {
+        doBench(c, bytes, 2, null);
+        doBench(c, bytes, 2101, null);
       }
 
       // Test on a variety of sizes
       for (int size = 1; size < MAX_LEN; size *= 2) {
-        System.out.printf("| %d\t|", size);
+        out.printf("|");
+        printCell(String.valueOf(size), title.length()+1, out);
 
-        for (Map.Entry<String, Checksum> entry : impls.entrySet()) {
+        Long expected = null;
+        for(Checksum c : crcs) {
           System.gc();
-          doBench(entry.getKey(), entry.getValue(), bytes, size, true);
+          final long result = doBench(c, bytes, size, out);
+          if(c.getClass() == zip.getClass()) {
+            expected = result;
+          } else if (result != expected) {
+            throw new RuntimeException(c.getClass() + " has bugs!");
+          }
+            
         }
-        System.out.printf("\n");
+        out.printf("\n");
       }
     }
 
-    private static void doBench(String id, Checksum crc,
-                                byte[] bytes, int size, boolean printout) {
-      long st = System.nanoTime();
-      int trials = BYTES_PER_SIZE / size;
+    private static long doBench(Checksum crc, byte[] bytes, int size,
+        PrintStream out) {
+      final String name = crc.getClass().getSimpleName();
+      final int trials = BYTES_PER_SIZE / size;
+
+      final long st = System.nanoTime();
+      crc.reset();
       for (int i = 0; i < trials; i++) {
         crc.update(bytes, 0, size);
       }
-      long et = System.nanoTime();
+      final long result = crc.getValue();
+      final long et = System.nanoTime();
 
       double mbProcessed = trials * size / 1024.0 / 1024.0;
       double secsElapsed = (et - st) / 1000000000.0d;
-      if (printout) {
-        System.out.printf("%.3f \t|",  mbProcessed / secsElapsed);
+      if (out != null) {
+        final String s = String.format("%9.3f",  mbProcessed/secsElapsed);
+        printCell(s, name.length()+1, out);
+      }
+      return result;
+    }
+    
+    /** Prints a fixed list of JVM and OS system properties, one per line. */
+    private static void printSystemProperties(PrintStream out) {
+      final String[] keys = {
+          "java.version",
+          "java.runtime.name",
+          "java.runtime.version",
+          "java.vm.version",
+          "java.vm.vendor",
+          "java.vm.name",
+          "java.vm.specification.version",
+          "java.specification.version",
+          "os.arch",
+          "os.name",
+          "os.version"
+      };
+      for (String key : keys) {
+        final String value = System.getProperty(key);
+        out.println(key + " = " + value);
       }
     }
   }