Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2007/08/10 20:19:43 UTC

svn commit: r564704 - in /lucene/hadoop/branches/branch-0.14: CHANGES.txt src/java/org/apache/hadoop/dfs/DFSClient.java src/java/org/apache/hadoop/dfs/DataNode.java src/java/org/apache/hadoop/fs/FileUtil.java

Author: dhruba
Date: Fri Aug 10 11:19:40 2007
New Revision: 564704

URL: http://svn.apache.org/viewvc?view=rev&rev=564704
Log:
Remove performance regression introduced by Block CRC.
(Raghu Angadi via dhruba)
merge -c 564687 from trunk to 0.14 release.


Modified:
    lucene/hadoop/branches/branch-0.14/CHANGES.txt
    lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/DataNode.java
    lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/fs/FileUtil.java

Modified: lucene/hadoop/branches/branch-0.14/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/CHANGES.txt?view=diff&rev=564704&r1=564703&r2=564704
==============================================================================
--- lucene/hadoop/branches/branch-0.14/CHANGES.txt (original)
+++ lucene/hadoop/branches/branch-0.14/CHANGES.txt Fri Aug 10 11:19:40 2007
@@ -457,6 +457,9 @@
 146. HADOOP-1666.  FsShell object can be used for multiple fs commands.
      Contributed by Dhruba Borthakur.
 
+147. HADOOP-1654.  Remove performance regression introduced by Block CRC.
+     (Raghu Angadi via dhruba)
+
 Release 0.13.0 - 2007-06-08
 
  1. HADOOP-1047.  Fix TestReplication to succeed more reliably.

Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/DFSClient.java?view=diff&rev=564704&r1=564703&r2=564704
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/DFSClient.java Fri Aug 10 11:19:40 2007
@@ -1638,8 +1638,12 @@
         int checksumSize = checksum.getChecksumSize(); 
         byte buf[] = new byte[ bytesPerChecksum + checksumSize ];
 
-        InputStream in = (bytesLeft > 0) ? 
-                         new FileInputStream(backupFile) : null;    
+        InputStream in = null;
+        if ( bytesLeft > 0 ) { 
+          in = new BufferedInputStream(new FileInputStream(backupFile),
+                                       buffersize);
+        }
+        
         try {
 
           while ( bytesLeft >= 0 ) {
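
For context: before this change, the client re-read the local backup file
through a bare FileInputStream, paying one system call per small read.
Wrapping the stream in a BufferedInputStream batches those reads. A minimal
sketch of the pattern, with an illustrative file name and buffer size
(neither is taken from the patch):

    import java.io.*;

    class BufferedReadSketch {
      public static void main(String[] args) throws IOException {
        File backupFile = new File("/tmp/backup.dat"); // hypothetical path
        int buffersize = 4096;                         // e.g. io.file.buffer.size
        InputStream in = null;
        if (backupFile.length() > 0) {
          // Small reads are now served from the in-memory buffer instead
          // of each going to the OS individually.
          in = new BufferedInputStream(new FileInputStream(backupFile),
                                       buffersize);
        }
        try {
          byte[] buf = new byte[512];
          int n;
          while (in != null && (n = in.read(buf)) > 0) {
            // process n bytes per iteration
          }
        } finally {
          if (in != null) in.close();
        }
      }
    }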

Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/DataNode.java?view=diff&rev=564704&r1=564703&r2=564704
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/dfs/DataNode.java Fri Aug 10 11:19:40 2007
@@ -77,13 +77,6 @@
   public static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.DataNode");
 
   /**
-   * A buffer size small enough that read/writes while reading headers 
-   * don't result in multiple io calls but reading larger amount of data 
-   * like one checksum size does not result in extra copy. 
-   */
-  public static final int SMALL_HDR_BUFFER_SIZE = 64;
-
-  /**
    * Util method to build socket addr from either:
   *   <host>:<port>
    *   <fs>://<host>:<port>/<path>
@@ -718,7 +711,7 @@
     public void run() {
       try {
         DataInputStream in = new DataInputStream(
-           new BufferedInputStream(s.getInputStream(), SMALL_HDR_BUFFER_SIZE));
+           new BufferedInputStream(s.getInputStream(), BUFFER_SIZE));
         short version = in.readShort();
         if ( version != DATA_TRANFER_VERSION ) {
           throw new IOException( "Version Mismatch" );
@@ -827,8 +820,10 @@
         // Open local disk out
         //
         FSDataset.BlockWriteStreams streams = data.writeToBlock( block );
-        out = new DataOutputStream(streams.dataOut);
-        checksumOut = new DataOutputStream(streams.checksumOut);
+        out = new DataOutputStream(
+                  new BufferedOutputStream(streams.dataOut, BUFFER_SIZE));        
+        checksumOut = new DataOutputStream(
+                  new BufferedOutputStream(streams.checksumOut, BUFFER_SIZE));
         
         InetSocketAddress mirrorTarget = null;
         String mirrorNode = null;
@@ -846,7 +841,7 @@
             mirrorSock.setSoTimeout(READ_TIMEOUT);
             mirrorOut = new DataOutputStream( 
                         new BufferedOutputStream(mirrorSock.getOutputStream(),
-                                                 SMALL_HDR_BUFFER_SIZE));
+                                                 BUFFER_SIZE));
             mirrorIn = new DataInputStream( mirrorSock.getInputStream() );
             //Copied from DFSClient.java!
             mirrorOut.writeShort( DATA_TRANFER_VERSION );
@@ -918,6 +913,9 @@
             try {
               mirrorOut.writeInt( len );
               mirrorOut.write( buf, 0, len + checksumSize );
+              if (len == 0) {
+                mirrorOut.flush();
+              }
             } catch (IOException ioe) {
               LOG.info( "Exception writing to mirror " + mirrorNode + 
                         "\n" + StringUtils.stringifyException(ioe) );
@@ -1092,15 +1090,14 @@
   long sendBlock(Socket sock, Block block,
                  long startOffset, long length, DatanodeInfo targets[] )
                  throws IOException {
-    // May be we should just use io.file.buffer.size.
     DataOutputStream out = new DataOutputStream(
                            new BufferedOutputStream(sock.getOutputStream(), 
-                                                    SMALL_HDR_BUFFER_SIZE));
-    DataInputStream in = null;
+                                                    BUFFER_SIZE));
+    RandomAccessFile blockInFile = null;
+    DataInputStream blockIn = null;
     DataInputStream checksumIn = null;
     long totalRead = 0;    
 
-
     /* XXX This will affect inter datanode transfers during 
      * a CRC upgrade. There should not be any replication
      * during crc upgrade since we are in safe mode, right?
@@ -1109,13 +1106,15 @@
 
     try {
       File blockFile = data.getBlockFile( block );
-      in = new DataInputStream( new FileInputStream( blockFile ) );
+      blockInFile = new RandomAccessFile(blockFile, "r");
 
       File checksumFile = FSDataset.getMetaFile( blockFile );
       DataChecksum checksum = null;
 
       if ( !corruptChecksumOk || checksumFile.exists() ) {
-        checksumIn = new DataInputStream( new FileInputStream(checksumFile) );
+        checksumIn = new DataInputStream(
+                     new BufferedInputStream(new FileInputStream(checksumFile),
+                                             BUFFER_SIZE));
           
         //read and handle the common header here. For now just a version
         short version = checksumIn.readShort();
@@ -1169,17 +1168,17 @@
       // seek to the right offsets
       if ( offset > 0 ) {
         long checksumSkip = ( offset / bytesPerChecksum ) * checksumSize ;
-        /* XXX skip() could be very inefficent. Should be seek(). 
-         * at least skipFully
-         */
-        if ( in.skip( offset ) != offset || 
-            ( checksumSkip > 0 && 
-                checksumIn.skip( checksumSkip ) != checksumSkip ) ) {
-          throw new IOException( "Could not seek to right position while " +
-                                 "reading for " + block );
+        blockInFile.seek(offset);
+        if (checksumSkip > 0) {
+          //Should we use seek() for checksum file as well?
+          FileUtil.skipFully(checksumIn, checksumSkip);
         }
       }
       
+      blockIn = new DataInputStream(new BufferedInputStream(
+                                      new FileInputStream(blockInFile.getFD()), 
+                                      BUFFER_SIZE));
+      
       if ( targets != null ) {
         //
         // Header info
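
The replaced skip() loop had to read and discard every byte up to the
requested offset; RandomAccessFile.seek() repositions the descriptor
directly, and a FileInputStream opened on that same descriptor then reads
from the new position. A small sketch of the technique (file name and
sizes are illustrative):

    import java.io.*;

    class SeekThenStreamSketch {
      public static void main(String[] args) throws IOException {
        // Hypothetical block file; in the patch it comes from data.getBlockFile().
        RandomAccessFile blockInFile = new RandomAccessFile("/tmp/blk_1", "r");
        blockInFile.seek(1L << 20); // O(1), versus skip() reading ~1 MB of data
        // A stream over the same descriptor continues from the seeked offset.
        DataInputStream blockIn = new DataInputStream(new BufferedInputStream(
            new FileInputStream(blockInFile.getFD()), 4096));
        byte[] buf = new byte[512];
        blockIn.readFully(buf);     // reads bytes starting at the 1 MB mark
        blockIn.close();
        blockInFile.close();
      }
    }
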
@@ -1205,7 +1204,7 @@
         // Write one data chunk per loop.
         int len = (int) Math.min( endOffset - offset, bytesPerChecksum );
         if ( len > 0 ) {
-          in.readFully( buf, 0, len );
+          blockIn.readFully( buf, 0, len );
           totalRead += len;
           
           if ( checksumSize > 0 && checksumIn != null ) {
@@ -1239,8 +1238,9 @@
         offset += len;
       }
     } finally {
+      FileUtil.closeStream( blockInFile );
       FileUtil.closeStream( checksumIn );
-      FileUtil.closeStream( in );
+      FileUtil.closeStream( blockIn );
       FileUtil.closeStream( out );
     }
     

Modified: lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/fs/FileUtil.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/fs/FileUtil.java?view=diff&rev=564704&r1=564703&r2=564704
==============================================================================
--- lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/fs/FileUtil.java (original)
+++ lucene/hadoop/branches/branch-0.14/src/java/org/apache/hadoop/fs/FileUtil.java Fri Aug 10 11:19:40 2007
@@ -279,21 +279,6 @@
     return dst;
   }
 
-  private static File checkDest(String srcName, File dst)
-    throws IOException {
-    if (dst.exists()) {
-      if (!dst.isDirectory()) {
-        throw new IOException("Target " + dst + " already exists");
-      } else {
-        dst = new File(dst, srcName);
-        if (dst.exists()) {
-          throw new IOException("Target " + dst + " already exists");
-        }
-      }
-    }
-    return dst;
-  }
-  
   /**
    * This class is only used on windows to invoke the cygpath command.
    */
@@ -532,29 +517,32 @@
     }
   }
   
-  public static void closeSocket( Socket sock ) {
-    // avoids try { close() } dance
-    if ( sock != null ) {
-      try {
-       sock.close();
-      } catch ( IOException ignored ) {
+  public static void skipFully( InputStream in, long len ) throws IOException {
+    long toSkip = len;
+    while ( toSkip > 0 ) {
+      long ret = in.skip( toSkip );
+      if ( ret < 0 ) {
+        throw new IOException( "Premature EOF from inputStream");
       }
+      toSkip -= ret;
     }
   }
-  public static void closeStream( InputStream in ) {
+  
+  public static void closeSocket( Socket sock ) {
     // avoids try { close() } dance
-    if ( in != null ) {
+    if ( sock != null ) {
       try {
-        in.close();
+       sock.close();
       } catch ( IOException ignored ) {
       }
     }
   }
-  public static void closeStream( OutputStream out ) {
+
+  public static void closeStream(Closeable closeable ) {
     // avoids try { close() } dance
-    if ( out != null ) {
+    if ( closeable != null ) {
       try {
-        out.close();
+        closeable.close();
       } catch ( IOException ignored ) {
       }
     }
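
For reference, the two new helpers compose like this (a hedged usage
sketch; the file name is made up). skipFully() loops because a single
InputStream.skip() call may legally skip fewer bytes than requested, and
the Closeable overload of closeStream() now covers input streams, output
streams, and RandomAccessFile alike:

    import java.io.*;
    import org.apache.hadoop.fs.FileUtil;

    class SkipFullySketch {
      public static void main(String[] args) throws IOException {
        InputStream in = new FileInputStream("/tmp/blk_1.meta"); // hypothetical
        try {
          // Keeps calling skip() until all 128 bytes are consumed,
          // rather than trusting a single skip() call.
          FileUtil.skipFully(in, 128);
        } finally {
          FileUtil.closeStream(in); // null-safe, exception-swallowing close
        }
      }
    }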