Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/09/15 02:13:33 UTC

svn commit: r443532 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/io/SequenceFile.java src/test/org/apache/hadoop/io/RandomDatum.java src/test/org/apache/hadoop/io/TestSequenceFile.java

Author: cutting
Date: Thu Sep 14 17:13:32 2006
New Revision: 443532

URL: http://svn.apache.org/viewvc?view=rev&rev=443532
Log:
HADOOP-532.  Fix a bug reading value-compressed sequence files, where an exception was thrown reporting that the full value had not been read.  Contributed by Owen.
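
For context, here is a minimal sketch (not part of this commit) of the read
loop that could hit the spurious exception before this fix. It assumes a
hypothetical local file "test.rc.seq" written with CompressionType.RECORD,
i.e. with per-record value compression:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.LocalFileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.RandomDatum;
    import org.apache.hadoop.io.SequenceFile;

    public class ReadValueCompressed {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        LocalFileSystem fs = new LocalFileSystem(conf);
        Path file = new Path("test.rc.seq");   // hypothetical input file
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, conf);
        RandomDatum key = new RandomDatum();
        RandomDatum value = new RandomDatum();
        // Before this fix, a value-compressed record could raise
        // "<val> read N bytes, should read M" here even though the
        // value had in fact been read in full.
        while (reader.next(key, value)) {
          // consume key/value
        }
        reader.close();
      }
    }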

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/io/RandomDatum.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSequenceFile.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=443532&r1=443531&r2=443532
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Sep 14 17:13:32 2006
@@ -17,6 +17,10 @@
 4. HADOOP-288.  Add a file caching system and use it in MapReduce to
    cache job jar files on slave nodes.  (Mahadev Konar via cutting)
 
+5. HADOOP-532.  Fix a bug reading value-compressed sequence files,
+   where an exception was thrown reporting that the full value had not
+   been read.  (omalley via cutting)
+
 
 Release 0.6.1 - 2006-08-13
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?view=diff&rev=443532&r1=443531&r2=443532
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Thu Sep 14 17:13:32 2006
@@ -976,10 +976,14 @@
 
       if (version > 2) {                          // if version > 2
         this.decompress = in.readBoolean();       // is compressed?
+      } else {
+        decompress = false;
       }
 
       if (version >= BLOCK_COMPRESS_VERSION) {    // if version >= 4
         this.blockCompressed = in.readBoolean();  // is block-compressed?
+      } else {
+        blockCompressed = false;
       }
       
       // if version >= 5
@@ -1008,9 +1012,9 @@
       valBuffer = new DataInputBuffer();
       if (decompress) {
         valInFilter = this.codec.createInputStream(valBuffer);
-        valIn = new DataInputStream(new BufferedInputStream(valInFilter));
+        valIn = new DataInputStream(valInFilter);
       } else {
-        valIn = new DataInputStream(new BufferedInputStream(valBuffer));
+        valIn = valBuffer;
       }
       
       if (blockCompressed) {
@@ -1113,10 +1117,11 @@
      * corresponding to the 'current' key 
      */
     private synchronized void seekToCurrentValue() throws IOException {
-      if (version < BLOCK_COMPRESS_VERSION || blockCompressed == false) {
+      if (!blockCompressed) {
         if (decompress) {
           valInFilter.resetState();
         }
+        valBuffer.reset();
       } else {
         // Check if this is the first value in the 'block' to be read
         if (lazyDecompress && !valuesDecompressed) {
@@ -1160,13 +1165,15 @@
       // Position stream to 'current' value
       seekToCurrentValue();
 
-      if (version < BLOCK_COMPRESS_VERSION || blockCompressed == false) {
+      if (!blockCompressed) {
         val.readFields(valIn);
         
-        if (valBuffer.getPosition() != valBuffer.getLength())
+        if (valIn.read() > 0) {
+          LOG.info("available bytes: " + valIn.available());
           throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
               + " bytes, should read " +
               (valBuffer.getLength()-keyLength));
+        }
       } else {
         // Get the value
         int valLength = WritableUtils.readVInt(valLenIn);
@@ -1190,7 +1197,7 @@
         throw new IOException("wrong key class: "+key.getClass().getName()
             +" is not "+keyClass);
 
-      if (version < BLOCK_COMPRESS_VERSION || blockCompressed == false) {
+      if (!blockCompressed) {
         outBuf.reset();
         
         keyLength = next(outBuf);
@@ -1200,6 +1207,7 @@
         valBuffer.reset(outBuf.getData(), outBuf.getLength());
         
         key.readFields(valBuffer);
+        valBuffer.mark(0);
         if (valBuffer.getPosition() != keyLength)
           throw new IOException(key + " read " + valBuffer.getPosition()
               + " bytes, should read " + keyLength);
@@ -1271,7 +1279,7 @@
     /** @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */
     public synchronized int next(DataOutputBuffer buffer) throws IOException {
       // Unsupported for block-compressed sequence files
-      if (version >= BLOCK_COMPRESS_VERSION && blockCompressed) {
+      if (blockCompressed) {
         throw new IOException("Unsupported call for block-compressed" +
             " SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)");
       }
@@ -1308,7 +1316,7 @@
      */
     public int nextRaw(DataOutputBuffer key, ValueBytes val) 
     throws IOException {
-      if (version < BLOCK_COMPRESS_VERSION || blockCompressed == false) {
+      if (!blockCompressed) {
         if (in.getPos() >= end) 
           return -1;
 

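A note on the guard simplification in the several reader methods above: the
new else-branches in the header-reading code initialize decompress and
blockCompressed to false for file versions that predate those fields, so the
longer version check becomes redundant. A sketch of the resulting invariant
(an illustration, not code from the commit):

    // After the header has been read, a pre-4 file always has
    // blockCompressed == false, i.e.:
    //   version >= BLOCK_COMPRESS_VERSION || !blockCompressed
    // which makes these two guards equivalent:
    boolean oldGuard = version < BLOCK_COMPRESS_VERSION || blockCompressed == false;
    boolean newGuard = !blockCompressed;
    assert oldGuard == newGuard;
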
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/io/RandomDatum.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/io/RandomDatum.java?view=diff&rev=443532&r1=443531&r2=443532
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/io/RandomDatum.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/io/RandomDatum.java Thu Sep 14 17:13:32 2006
@@ -26,11 +26,15 @@
   public RandomDatum() {}
 
   public RandomDatum(Random random) {
-    length = 10 + random.nextInt(100);
+    length = 10 + (int) Math.pow(10.0, random.nextFloat() * 3.0);
     data = new byte[length];
     random.nextBytes(data);
   }
 
+  public int getLength() {
+    return length;
+  }
+  
   public void write(DataOutput out) throws IOException {
     out.writeInt(length);
     out.write(data);

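A note on the new RandomDatum length: random.nextFloat() returns a value in
[0, 1), so the exponent ranges over [0, 3) and lengths fall roughly
log-uniformly between 11 and about 1009 bytes, instead of uniformly in the
old 10..109 range. A standalone sketch (with a hypothetical seed):

    import java.util.Random;

    public class LengthSketch {
      public static void main(String[] args) {
        Random random = new Random(42);  // hypothetical seed
        for (int i = 0; i < 5; i++) {
          // exponent in [0, 3)  =>  length in [11, ~1009], log-uniform
          int length = 10 + (int) Math.pow(10.0, random.nextFloat() * 3.0);
          System.out.println(length);
        }
      }
    }
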
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSequenceFile.java?view=diff&rev=443532&r1=443531&r2=443532
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSequenceFile.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSequenceFile.java Thu Sep 14 17:13:32 2006
@@ -47,6 +47,7 @@
       new Path(System.getProperty("test.build.data",".")+"/test.bc.seq");
  
     int seed = new Random().nextInt();
+    LOG.info("Seed = " + seed);
 
     FileSystem fs = new LocalFileSystem(conf);
     try {
@@ -115,7 +116,8 @@
       CompressionType compressionType)
     throws IOException {
     fs.delete(file);
-    LOG.debug("creating with " + count + " records");
+    LOG.info("creating " + count + " records with " + compressionType +
+              " compression");
     SequenceFile.Writer writer = 
       SequenceFile.createWriter(fs, conf, file, 
           RandomDatum.class, RandomDatum.class, compressionType);
@@ -146,25 +148,36 @@
       RandomDatum key = generator.getKey();
       RandomDatum value = generator.getValue();
 
-      if ((i%5) == 10) {
-        // Testing 'raw' apis
-        rawKey.reset();
-        reader.nextRaw(rawKey, rawValue);
-      } else {
-        // Testing 'non-raw' apis 
-        if ((i%2) == 0) {
-          reader.next(k);
-          reader.getCurrentValue(v);
+      try {
+        if ((i%5) == 10) {
+          // Testing 'raw' apis
+          rawKey.reset();
+          reader.nextRaw(rawKey, rawValue);
         } else {
-          reader.next(k, v);
+          // Testing 'non-raw' apis 
+          if ((i%2) == 0) {
+            reader.next(k);
+            reader.getCurrentValue(v);
+          } else {
+            reader.next(k, v);
+          }
+          // Sanity check
+          if (!k.equals(key))
+            throw new RuntimeException("wrong key at " + i);
+          if (!v.equals(value))
+            throw new RuntimeException("wrong value at " + i);
         }
-
-        // Sanity check
-        if (!k.equals(key))
-          throw new RuntimeException("wrong key at " + i);
-        if (!v.equals(value))
-          throw new RuntimeException("wrong value at " + i);
+      } catch (IOException ioe) {
+        LOG.info("Problem on row " + i);
+        LOG.info("Expected value = " + value);
+        LOG.info("Expected len = " + value.getLength());
+        LOG.info("Actual value = " + v);
+        LOG.info("Actual len = " + v.getLength());
+        LOG.info("Key equals: " + k.equals(key));
+        LOG.info("value equals: " + v.equals(value));
+        throw ioe;
       }
+
     }
     reader.close();
   }
@@ -284,9 +297,11 @@
     boolean merge = false;
     String compressType = "NONE";
     Path file = null;
+    int seed = new Random().nextInt();
 
     String usage = "Usage: SequenceFile (-local | -dfs <namenode:port>) " +
-        "[-count N] " + "[-check] [-compressType <NONE|RECORD|BLOCK>] " +
+        "[-count N] " + 
+        "[-seed #] [-check] [-compressType <NONE|RECORD|BLOCK>] " +
         "[[-rwonly] | {[-megabytes M] [-factor F] [-nocreate] [-fast] [-merge]}] " +
         " file";
     if (args.length == 0) {
@@ -304,7 +319,9 @@
           } else if (args[i].equals("-megabytes")) {
               megabytes = Integer.parseInt(args[++i]);
           } else if (args[i].equals("-factor")) {
-              factor = Integer.parseInt(args[++i]);
+            factor = Integer.parseInt(args[++i]);
+          } else if (args[i].equals("-seed")) {
+            seed = Integer.parseInt(args[++i]);
           } else if (args[i].equals("-rwonly")) {
               rwonly = true;
           } else if (args[i].equals("-nocreate")) {
@@ -326,6 +343,7 @@
         LOG.info("megabytes = " + megabytes);
         LOG.info("factor = " + factor);
         LOG.info("create = " + create);
+        LOG.info("seed = " + seed);
         LOG.info("rwonly = " + rwonly);
         LOG.info("check = " + check);
         LOG.info("fast = " + fast);
@@ -338,7 +356,6 @@
           System.exit(-1);
         }
 
-        int seed = 0;
         CompressionType compressionType = 
           CompressionType.valueOf(compressType);
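
For reference, a hypothetical invocation of the test driver that exercises the
new -seed option (matching the usage string above; classpath setup omitted),
so that a failing run can be replayed deterministically:

    java org.apache.hadoop.io.TestSequenceFile -local -count 1000 -seed 42 \
        -check -compressType RECORD test.rc.seq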