You are viewing a plain-text version of this content. The canonical link to it (originally a hyperlink labelled "here") is not preserved in this copy.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2007/10/17 21:54:55 UTC

svn commit: r585651 - in /lucene/hadoop/branches/branch-0.15: ./ src/java/org/apache/hadoop/io/ src/test/org/apache/hadoop/io/

Author: omalley
Date: Wed Oct 17 12:54:55 2007
New Revision: 585651

URL: http://svn.apache.org/viewvc?rev=585651&view=rev
Log:
Merge -r 585647:585648 from trunk to branch-0.15 to fix HADOOP-2033.

Modified:
    lucene/hadoop/branches/branch-0.15/CHANGES.txt
    lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/io/MapFile.java
    lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/io/SequenceFile.java
    lucene/hadoop/branches/branch-0.15/src/test/org/apache/hadoop/io/TestArrayFile.java
    lucene/hadoop/branches/branch-0.15/src/test/org/apache/hadoop/io/TestSequenceFile.java
    lucene/hadoop/branches/branch-0.15/src/test/org/apache/hadoop/io/TestSetFile.java

Modified: lucene/hadoop/branches/branch-0.15/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/CHANGES.txt?rev=585651&r1=585650&r2=585651&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/CHANGES.txt (original)
+++ lucene/hadoop/branches/branch-0.15/CHANGES.txt Wed Oct 17 12:54:55 2007
@@ -311,6 +311,10 @@
     null was missing and hence NPE would be thrown sometimes. This issue fixes
     that problem.  (Amareshwari Sri Ramadasu via ddas) 
 
+    HADOOP-2033.  The SequenceFile.Writer.sync method was a no-op, which caused
+    very uneven splits for applications like distcp that count on them.
+    (omalley)
+
   IMPROVEMENTS
 
     HADOOP-1908. Restructure data node code so that block sending and 

Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/io/MapFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/io/MapFile.java?rev=585651&r1=585650&r2=585651&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/io/MapFile.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/io/MapFile.java Wed Oct 17 12:54:55 2007
@@ -19,6 +19,9 @@
 package org.apache.hadoop.io;
 
 import java.io.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.Progressable;
@@ -44,6 +47,8 @@
  * SequenceFile.Sorter}.
  */
 public class MapFile {
+  private static final Log LOG = LogFactory.getLog(MapFile.class);
+
   /** The name of the index file. */
   public static final String INDEX_FILE_NAME = "index";
 
@@ -302,7 +307,7 @@
           count++;
         }
       } catch (EOFException e) {
-        SequenceFile.LOG.warn("Unexpected EOF reading " + index +
+        LOG.warn("Unexpected EOF reading " + index +
                               " at entry #" + count + ".  Ignoring.");
       } finally {
 	indexClosed = true;

Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/io/SequenceFile.java?rev=585651&r1=585650&r2=585651&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/io/SequenceFile.java Wed Oct 17 12:54:55 2007
@@ -20,7 +20,6 @@
 
 import java.io.*;
 import java.util.*;
-import java.net.InetAddress;
 import java.rmi.server.UID;
 import java.security.MessageDigest;
 import org.apache.commons.logging.*;
@@ -43,8 +42,7 @@
 
 /** Support for flat files of binary key/value pairs. */
 public class SequenceFile {
-  public static final Log LOG =
-    LogFactory.getLog("org.apache.hadoop.io.SequenceFile");
+  private static final Log LOG = LogFactory.getLog(SequenceFile.class);
 
   private SequenceFile() {}                         // no public ctor
 
@@ -757,6 +755,11 @@
     
     /** create a sync point */
     public void sync() throws IOException {
+      if (sync != null && lastSyncPos != out.getPos()) {
+        out.writeInt(SYNC_ESCAPE);                // mark the start of the sync
+        out.write(sync);                          // write sync
+        lastSyncPos = out.getPos();               // update lastSyncPos
+      }
     }
 
     /** Returns the configuration of this file. */
@@ -780,10 +783,7 @@
     synchronized void checkAndWriteSync() throws IOException {
       if (sync != null &&
           out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
-        lastSyncPos = out.getPos();               // update lastSyncPos
-        //LOG.info("sync@"+lastSyncPos);
-        out.writeInt(SYNC_ESCAPE);                // escape it
-        out.write(sync);                          // write sync
+        sync();
       }
     }
 
@@ -955,10 +955,6 @@
       val.writeCompressedBytes(out);              // 'value' data
     }
     
-
-    public void sync() throws IOException {
-    }
-   
   } // RecordCompressionWriter
 
   /** Write compressed key/value blocks to a sequence-format file. */
@@ -1045,13 +1041,9 @@
     }
     
     /** Compress and flush contents to dfs */
-    private synchronized void writeBlock() throws IOException {
+    public synchronized void sync() throws IOException {
       if (noBufferedRecords > 0) {
-        // Write 'sync' marker
-        if (sync != null) {
-          out.writeInt(SYNC_ESCAPE);
-          out.write(sync);
-        }
+        super.sync();
         
         // No. of records
         WritableUtils.writeVInt(out, noBufferedRecords);
@@ -1080,15 +1072,11 @@
     /** Close the file. */
     public synchronized void close() throws IOException {
       if (out != null) {
-        writeBlock();
+        sync();
       }
       super.close();
     }
 
-    public void sync() throws IOException {
-      writeBlock();
-    }
-
     /** Append a key/value pair. */
     public synchronized void append(Writable key, Writable val)
       throws IOException {
@@ -1116,7 +1104,7 @@
       // Compress and flush?
       int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
       if (currentBlockSize >= compressionBlockSize) {
-        writeBlock();
+        sync();
       }
     }
     
@@ -1144,7 +1132,7 @@
       // Compress and flush?
       int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength(); 
       if (currentBlockSize >= compressionBlockSize) {
-        writeBlock();
+        sync();
       }
     }
   
@@ -1586,15 +1574,26 @@
       return more;
     }
     
-    private synchronized int checkAndReadSync(int length) 
-      throws IOException {
+    /**
+     * Read and return the next record length, potentially skipping over 
+     * a sync block.
+     * @return the length of the next record or -1 if there is no next record
+     * @throws IOException
+     */
+    private synchronized int readRecordLength() throws IOException {
+      if (in.getPos() >= end) {
+        return -1;
+      }      
+      int length = in.readInt();
       if (version > 1 && sync != null &&
           length == SYNC_ESCAPE) {              // process a sync entry
-        //LOG.info("sync@"+in.getPos());
         in.readFully(syncCheck);                // read syncCheck
         if (!Arrays.equals(sync, syncCheck))    // check it
           throw new IOException("File is corrupt!");
         syncSeen = true;
+        if (in.getPos() >= end) {
+          return -1;
+        }
         length = in.readInt();                  // re-read length
       } else {
         syncSeen = false;
@@ -1614,11 +1613,11 @@
         throw new IOException("Unsupported call for block-compressed" +
                               " SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)");
       }
-      if (in.getPos() >= end)
-        return -1;
-
       try {
-        int length = checkAndReadSync(in.readInt());
+        int length = readRecordLength();
+        if (length == -1) {
+          return -1;
+        }
         int keyLength = in.readInt();
         buffer.write(in, length);
         return keyLength;
@@ -1642,16 +1641,16 @@
      * Read 'raw' records.
      * @param key - The buffer into which the key is read
      * @param val - The 'raw' value
-     * @return Returns the total record length
+     * @return Returns the total record length or -1 for end of file
      * @throws IOException
      */
     public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val) 
       throws IOException {
       if (!blockCompressed) {
-        if (in.getPos() >= end) 
+        int length = readRecordLength();
+        if (length == -1) {
           return -1;
-
-        int length = checkAndReadSync(in.readInt());
+        }
         int keyLength = in.readInt();
         int valLength = length - keyLength;
         key.write(in, keyLength);
@@ -1701,16 +1700,16 @@
     /**
      * Read 'raw' keys.
      * @param key - The buffer into which the key is read
-     * @return Returns the key length
+     * @return Returns the key length or -1 for end of file
      * @throws IOException
      */
     public int nextRawKey(DataOutputBuffer key) 
       throws IOException {
       if (!blockCompressed) {
-        if (in.getPos() >= end) 
+        recordLength = readRecordLength();
+        if (recordLength == -1) {
           return -1;
-
-        recordLength = checkAndReadSync(in.readInt());
+        }
         keyLength = in.readInt();
         key.write(in, keyLength);
         return keyLength;

Modified: lucene/hadoop/branches/branch-0.15/src/test/org/apache/hadoop/io/TestArrayFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/test/org/apache/hadoop/io/TestArrayFile.java?rev=585651&r1=585650&r2=585651&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/test/org/apache/hadoop/io/TestArrayFile.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/test/org/apache/hadoop/io/TestArrayFile.java Wed Oct 17 12:54:55 2007
@@ -28,7 +28,7 @@
 
 /** Support for flat files of binary key/value pairs. */
 public class TestArrayFile extends TestCase {
-  private static Log LOG = SequenceFile.LOG;
+  private static final Log LOG = LogFactory.getLog(TestArrayFile.class);
   private static String FILE =
     System.getProperty("test.build.data",".") + "/test.array";
 

Modified: lucene/hadoop/branches/branch-0.15/src/test/org/apache/hadoop/io/TestSequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/test/org/apache/hadoop/io/TestSequenceFile.java?rev=585651&r1=585650&r2=585651&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/test/org/apache/hadoop/io/TestSequenceFile.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/test/org/apache/hadoop/io/TestSequenceFile.java Wed Oct 17 12:54:55 2007
@@ -35,7 +35,7 @@
 
 /** Support for flat files of binary key/value pairs. */
 public class TestSequenceFile extends TestCase {
-  private static Log LOG = SequenceFile.LOG;
+  private static final Log LOG = LogFactory.getLog(TestSequenceFile.class);
 
   private static Configuration conf = new Configuration();
   

Modified: lucene/hadoop/branches/branch-0.15/src/test/org/apache/hadoop/io/TestSetFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/test/org/apache/hadoop/io/TestSetFile.java?rev=585651&r1=585650&r2=585651&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/test/org/apache/hadoop/io/TestSetFile.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/test/org/apache/hadoop/io/TestSetFile.java Wed Oct 17 12:54:55 2007
@@ -30,7 +30,7 @@
 
 /** Support for flat files of binary key/value pairs. */
 public class TestSetFile extends TestCase {
-  private static Log LOG = SequenceFile.LOG;
+  private static final Log LOG = LogFactory.getLog(TestSetFile.class);
   private static String FILE =
     System.getProperty("test.build.data",".") + "/test.set";