Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2007/09/26 18:58:52 UTC

svn commit: r579716 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/dfs/

Author: dhruba
Date: Wed Sep 26 09:58:48 2007
New Revision: 579716

URL: http://svn.apache.org/viewvc?rev=579716&view=rev
Log:
HADOOP-1908. Restructure data node code so that block sending and
receiving are separated from data transfer header handling.
(Hairong Kuang via dhruba)


Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=579716&r1=579715&r2=579716&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Sep 26 09:58:48 2007
@@ -193,6 +193,10 @@
 
   IMPROVEMENTS
 
+    HADOOP-1908. Restructure data node code so that block sending and 
+    receiving are separated from data transfer header handling.
+    (Hairong Kuang via dhruba)
+
     HADOOP-1921. Save the configuration of completed/failed jobs and make them
     available via the web-ui. (Amar Kamat via devaraj)
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=579716&r1=579715&r2=579716&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Wed Sep 26 09:58:48 2007
@@ -1619,24 +1619,20 @@
         
         try {
 
-          while ( bytesLeft >= 0 ) {
+          while ( bytesLeft > 0 ) {
             int len = (int) Math.min( bytesLeft, bytesPerChecksum );
-            if ( len > 0 ) {
-              IOUtils.readFully( in, buf, 0, len + checksumSize);
-            }
+            IOUtils.readFully( in, buf, 0, len + checksumSize);
 
             blockStream.writeInt( len );
             blockStream.write( buf, 0, len + checksumSize );
 
-            if ( bytesLeft == 0 ) {
-              break;
-            }
-              
             bytesLeft -= len;
 
             if (progress != null) { progress.progress(); }
           }
-          
+
+          // write 0 to mark the end of a block
+          blockStream.writeInt(0);
           blockStream.flush();
           
           numSuccessfulWrites++;

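For reference, the new wire format for a written block is a sequence of length-prefixed chunks, each carrying its data followed by its checksum bytes, terminated by a zero length rather than an empty chunk. Below is a minimal standalone sketch of that framing, assuming CRC32 checksums (4 bytes) and using illustrative names that are not part of this commit:

    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.zip.CRC32;

    class ChunkFramingSketch {
      // Writes data as [len][len data bytes][4-byte CRC32] chunks and then
      // a single 0 int to mark the end of the block (version-6 convention).
      static void writeChunked(DataOutputStream out, byte[] data,
                               int bytesPerChecksum) throws IOException {
        CRC32 crc = new CRC32();
        int off = 0;
        while (off < data.length) {
          int len = Math.min(bytesPerChecksum, data.length - off);
          crc.reset();
          crc.update(data, off, len);
          out.writeInt(len);                  // chunk length
          out.write(data, off, len);          // chunk data
          out.writeInt((int) crc.getValue()); // chunk checksum
          off += len;
        }
        out.writeInt(0); // end of block; the old code sent a len==0 chunk here
        out.flush();
      }
    }
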
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=579716&r1=579715&r2=579716&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Wed Sep 26 09:58:48 2007
@@ -744,9 +744,10 @@
      * Read/write data from/to the DataXceiveServer.
      */
     public void run() {
+      DataInputStream in = null;
       try {
-        DataInputStream in = new DataInputStream(
-           new BufferedInputStream(s.getInputStream(), BUFFER_SIZE));
+        in = new DataInputStream(
+            new BufferedInputStream(s.getInputStream(), BUFFER_SIZE));
         short version = in.readShort();
         if ( version != DATA_TRANFER_VERSION ) {
           throw new IOException( "Version Mismatch" );
@@ -770,12 +771,10 @@
        } catch (Throwable t) {
         LOG.error("DataXceiver: " + StringUtils.stringifyException(t));
       } finally {
-        try {
-          xceiverCount.decr();
-          LOG.debug("Number of active connections is: "+xceiverCount);
-          s.close();
-        } catch (IOException ie2) {
-        }
+        xceiverCount.decr();
+        LOG.debug("Number of active connections is: "+xceiverCount);
+        IOUtils.closeStream(in);
+        IOUtils.closeSocket(s);
       }
     }
 
@@ -793,11 +792,23 @@
 
       long startOffset = in.readLong();
       long length = in.readLong();
-      
+
+      // send the block
+      DataOutputStream out = new DataOutputStream(
+          new BufferedOutputStream(s.getOutputStream(), BUFFER_SIZE));
+      BlockSender blockSender = null;
       try {
-        //XXX Buffered output stream?
-        long read = sendBlock(s, block, startOffset, length, null );
-        myMetrics.readBytes((int)read);
+        try {
+          blockSender = new BlockSender(block, startOffset, length, true, true);
+        } catch(IOException e) {
+          out.writeShort(OP_STATUS_ERROR);
+          throw e;
+        }
+
+        out.writeShort(DataNode.OP_STATUS_SUCCESS); // send op status
+        long read = blockSender.sendBlock(out, null); // send data
+
+        myMetrics.readBytes((int) read);
         myMetrics.readBlocks(1);
         LOG.info("Served block " + block + " to " + s.getInetAddress());
       } catch ( SocketException ignored ) {
@@ -808,14 +819,18 @@
          * Earlier version shutdown() datanode if there is disk error.
          */
         LOG.warn( "Got exception while serving " + block + " to " +
-                  s.getInetAddress() + ": " + 
+                  s.getInetAddress() + ":\n" + 
                   StringUtils.stringifyException(ioe) );
         throw ioe;
+      } finally {
+        IOUtils.closeStream(out);
+        IOUtils.closeStream(blockSender);
       }
     }
 
     /**
      * Write a block to disk.
+     * 
      * @param in The stream to read from
      * @throws IOException
      */
@@ -823,62 +838,45 @@
       //
       // Read in the header
       //
-      DataOutputStream reply = new DataOutputStream(s.getOutputStream());
-      DataOutputStream out = null;
-      DataOutputStream checksumOut = null;
-      Socket mirrorSock = null;
-      DataOutputStream mirrorOut = null;
-      DataInputStream mirrorIn = null;
-      
+      Block block = new Block(in.readLong(), 0);
+      int numTargets = in.readInt();
+      if (numTargets < 0) {
+        throw new IOException("Mislabelled incoming datastream.");
+      }
+      DatanodeInfo targets[] = new DatanodeInfo[numTargets];
+      for (int i = 0; i < targets.length; i++) {
+        DatanodeInfo tmp = new DatanodeInfo();
+        tmp.readFields(in);
+        targets[i] = tmp;
+      }
+
+      short opStatus = OP_STATUS_SUCCESS; // write operation status
+      DataOutputStream mirrorOut = null;  // stream to next target
+      Socket mirrorSock = null;           // socket to next target
+      BlockReceiver blockReceiver = null; // responsible for data handling
       try {
-        /* We need an estimate for block size to check if the 
-         * disk partition has enough space. For now we just increment
-         * FSDataset.reserved by configured dfs.block.size
-         * Other alternative is to include the block size in the header
-         * sent by DFSClient.
-         */
-        Block block = new Block( in.readLong(), 0 );
-        int numTargets = in.readInt();
-        if ( numTargets < 0 ) {
-          throw new IOException("Mislabelled incoming datastream.");
-        }
-        DatanodeInfo targets[] = new DatanodeInfo[numTargets];
-        for (int i = 0; i < targets.length; i++) {
-          DatanodeInfo tmp = new DatanodeInfo();
-          tmp.readFields(in);
-          targets[i] = tmp;
-        }
-            
-        DataChecksum checksum = DataChecksum.newDataChecksum( in );
+        // open a block receiver and check if the block does not exist
+        blockReceiver = new BlockReceiver(block, in, 
+            s.getRemoteSocketAddress().toString());
 
         //
-        // Open local disk out
-        //
-        FSDataset.BlockWriteStreams streams = data.writeToBlock( block );
-        out = new DataOutputStream(
-                  new BufferedOutputStream(streams.dataOut, BUFFER_SIZE));        
-        checksumOut = new DataOutputStream(
-                  new BufferedOutputStream(streams.checksumOut, BUFFER_SIZE));
-        
-        InetSocketAddress mirrorTarget = null;
-        String mirrorNode = null;
-        //
         // Open network conn to backup machine, if 
         // appropriate
         //
         if (targets.length > 0) {
+          InetSocketAddress mirrorTarget = null;
+          String mirrorNode = null;
           // Connect to backup machine
           mirrorNode = targets[0].getName();
           mirrorTarget = createSocketAddr(mirrorNode);
+          mirrorSock = new Socket();
           try {
-            mirrorSock = new Socket();
             mirrorSock.connect(mirrorTarget, READ_TIMEOUT);
-            mirrorSock.setSoTimeout(READ_TIMEOUT);
+            mirrorSock.setSoTimeout(numTargets*READ_TIMEOUT);
             mirrorOut = new DataOutputStream( 
                         new BufferedOutputStream(mirrorSock.getOutputStream(),
                                                  BUFFER_SIZE));
-            mirrorIn = new DataInputStream( mirrorSock.getInputStream() );
-            //Copied from DFSClient.java!
+            // Write header: Copied from DFSClient.java!
             mirrorOut.writeShort( DATA_TRANFER_VERSION );
             mirrorOut.write( OP_WRITE_BLOCK );
             mirrorOut.writeLong( block.getBlockId() );
@@ -886,196 +884,73 @@
             for ( int i = 1; i < targets.length; i++ ) {
               targets[i].write( mirrorOut );
             }
-            checksum.writeHeader( mirrorOut );
-            myMetrics.replicatedBlocks(1);
-          } catch (IOException ie) {
-            if (mirrorOut != null) {
-              LOG.info("Exception connecting to mirror " + mirrorNode 
-                       + "\n" + StringUtils.stringifyException(ie));
-              mirrorOut = null;
-            }
+          } catch (IOException e) {
+            IOUtils.closeStream(mirrorOut);
+            mirrorOut = null;
+            IOUtils.closeSocket(mirrorSock);
+            mirrorSock = null;
           }
         }
-        
-        // XXX The following code is similar on both sides...
-        
-        int bytesPerChecksum = checksum.getBytesPerChecksum();
-        int checksumSize = checksum.getChecksumSize();
-        byte buf[] = new byte[ bytesPerChecksum + checksumSize ];
-        long blockLen = 0;
-        long lastOffset = 0;
-        long lastLen = 0;
-        short status = -1;
-        boolean headerWritten = false;
-        
-        while ( true ) {
-          // Read one data chunk in each loop.
-          
-          long offset = lastOffset + lastLen;
-          int len = in.readInt();
-          if ( len < 0 || len > bytesPerChecksum ) {
-            LOG.warn( "Got wrong length during writeBlock(" +
-                      block + ") from " + s.getRemoteSocketAddress() +
-                      " at offset " + offset + ": " + len + 
-                      " expected <= " + bytesPerChecksum );
-            status = OP_STATUS_ERROR;
-            break;
-          }
 
-          in.readFully( buf, 0, len + checksumSize );
-          
-          if ( len > 0 && checksumSize > 0 ) {
-            /*
-             * Verification is not included in the initial design.
-             * For now, it at least catches some bugs. Later, we can 
-             * include this after showing that it does not affect 
-             * performance much.
-             */
-            checksum.update( buf, 0, len  );
-            
-            if ( ! checksum.compare( buf, len ) ) {
-              throw new IOException( "Unexpected checksum mismatch " +
-                                     "while writing " + block + 
-                                     " from " +
-                                     s.getRemoteSocketAddress() );
-            }
-            
-            checksum.reset();
-          }
+        String mirrorAddr = (mirrorSock == null) ? null : 
+            mirrorSock.getRemoteSocketAddress().toString();
+        blockReceiver.receiveBlock(mirrorOut, mirrorAddr, null);
+
+        /*
+         * Informing the name node could take a long long time! Should we wait
+         * till namenode is informed before responding with success to the
+         * client? For now we don't.
+         */
+        synchronized (receivedBlockList) {
+          receivedBlockList.add(block);
+          receivedBlockList.notifyAll();
+        }
 
-          // First write to remote node before writing locally.
-          if (mirrorOut != null) {
-            try {
-              mirrorOut.writeInt( len );
-              mirrorOut.write( buf, 0, len + checksumSize );
-              if (len == 0) {
-                mirrorOut.flush();
-              }
-            } catch (IOException ioe) {
-              LOG.info( "Exception writing to mirror " + mirrorNode + 
-                        "\n" + StringUtils.stringifyException(ioe) );
-              //
-              // If stream-copy fails, continue 
-              // writing to disk.  We shouldn't 
-              // interrupt client write.
-              //
-              mirrorOut = null;
-            }
-          }
+        String msg = "Received block " + block + " from " +
+                     s.getRemoteSocketAddress();
 
+        /* read response from next target in the pipeline. 
+         * ignore the response for now. Will fix it in HADOOP-1927
+         */
+        if( mirrorSock != null ) {
+          short result = OP_STATUS_ERROR;
+          DataInputStream mirrorIn = null;
           try {
-            if ( !headerWritten ) { 
-              // First DATA_CHUNK. 
-              // Write the header even if checksumSize is 0.
-              checksumOut.writeShort( FSDataset.METADATA_VERSION );
-              checksum.writeHeader( checksumOut );
-              headerWritten = true;
-            }
-            
-            if ( len > 0 ) {
-              out.write( buf, 0, len );
-              // Write checksum
-              checksumOut.write( buf, len, checksumSize );
-              myMetrics.wroteBytes( len );
-            } else {
-              /* Should we sync() files here? It can add many millisecs of
-               * latency. We did not sync before HADOOP-1134 either.
-               */ 
-              out.close();
-              out = null;
-              checksumOut.close();
-              checksumOut = null;
-            }
-            
-          } catch (IOException iex) {
-            checkDiskError(iex);
-            throw iex;
+            mirrorIn = new DataInputStream( mirrorSock.getInputStream() );
+            result = mirrorIn.readShort();
+          } catch (IOException ignored) {
+          } finally {
+            IOUtils.closeStream(mirrorIn);
           }
-          
-          if ( len == 0 ) {
-
-            // We already have one successful write here. Should we
-            // wait for response from next target? We will skip for now.
 
-            block.setNumBytes( blockLen );
-            
-            //Does this fsync()?
-            data.finalizeBlock( block );
-            myMetrics.wroteBlocks(1);
-            
-            status = OP_STATUS_SUCCESS;
-            
-            break;
-          }
-          
-          if ( lastLen > 0 && lastLen != bytesPerChecksum ) {
-            LOG.warn( "Got wrong length during writeBlock(" +
-                      block + ") from " + s.getRemoteSocketAddress() +
-                      " : " + " got " + lastLen + " instead of " +
-                      bytesPerChecksum );
-            status = OP_STATUS_ERROR;
-            break;
-          }
-          
-          lastOffset = offset;
-          lastLen = len;
-          blockLen += len;
-        }
-        // done with reading the data.
-        
-        if ( status == OP_STATUS_SUCCESS ) {
-          /* Informing the name node could take a long long time!
-             Should we wait till namenode is informed before responding
-             with success to the client? For now we don't.
-          */
-          synchronized ( receivedBlockList ) {
-            receivedBlockList.add( block );
-            receivedBlockList.notifyAll();
-          }
-          
-          String msg = "Received block " + block + " from " + 
-                       s.getInetAddress();
-          
-          if ( mirrorOut != null ) {
-            //Wait for the remote reply
-            mirrorOut.flush();
-            short result = OP_STATUS_ERROR; 
-            try {
-              result = mirrorIn.readShort();
-            } catch ( IOException ignored ) {}
-
-            msg += " and " +  (( result != OP_STATUS_SUCCESS ) ? 
-                                "failed to mirror to " : " mirrored to ") +
-                   mirrorTarget;
-            
-            mirrorOut = null;
-          }
-          
-          LOG.info(msg);
-        }
-            
-        if ( status >= 0 ) {
-          try {
-            reply.writeShort( status );
-            reply.flush();
-          } catch ( IOException ignored ) {}
+          msg += " and " +  (( result != OP_STATUS_SUCCESS ) ?
+                               "failed to mirror to " : " mirrored to ") +
+                 mirrorAddr;
         }
-        
+
+        LOG.info(msg);
+      } catch (IOException ioe) {
+        opStatus = OP_STATUS_ERROR;
+        throw ioe;
       } finally {
+        // send back reply
+        DataOutputStream reply = new DataOutputStream(s.getOutputStream());
         try {
-          if ( out != null )
-            out.close();
-          if ( checksumOut != null )
-            checksumOut.close();
-          if ( mirrorSock != null )
-            mirrorSock.close();
-        } catch (IOException iex) {
-          shutdown();
-          throw iex;
-        }
+          reply.writeShort(opStatus);
+          reply.flush();
+        } catch (IOException ioe) {
+          LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress()
+              + "for writing block " + block );
+          LOG.warn(StringUtils.stringifyException(ioe));
+        }
+        // close all opened streams
+        IOUtils.closeStream(reply);
+        IOUtils.closeStream(mirrorOut);
+        IOUtils.closeSocket(mirrorSock);
+        IOUtils.closeStream(blockReceiver);
       }
     }
-    
+
     /**
      * Reads the metadata and sends the data in one 'DATA_CHUNK'
      * @param in
@@ -1113,173 +988,401 @@
       }
     }
   }
+    
+  /* An interface to throttle the block transfers */
+  private interface Throttler {
+    /** Given the numOfBytes sent/received since the last call to throttle(),
+     * make the current thread sleep if the I/O rate is too fast
+     * compared to the given bandwidth.
+     * 
+     * @param numOfBytes 
+     *     number of bytes sent/received since last time throttle was called
+     */ 
+    void throttle(int numOfBytes);
+  }
 
-  /** sendBlock() is used to read block and its metadata and stream
-   * the data to either a client or to another datanode.
-   * If argument targets is null, then it is assumed to be replying
-   * to a client request (OP_BLOCK_READ). Otherwise, we are replicating
-   * to another datanode.
-   * 
-   * returns total bytes reads, including crc.
-   */
-  long sendBlock(Socket sock, Block block,
-                 long startOffset, long length, DatanodeInfo targets[] )
-                 throws IOException {
-    DataOutputStream out = new DataOutputStream(
-                           new BufferedOutputStream(sock.getOutputStream(), 
-                                                    BUFFER_SIZE));
-    RandomAccessFile blockInFile = null;
-    DataInputStream blockIn = null;
-    DataInputStream checksumIn = null;
-    long totalRead = 0;    
-
-    /* XXX This will affect inter datanode transfers during 
-     * a CRC upgrade. There should not be any replication
-     * during crc upgrade since we are in safe mode, right?
-     */    
-    boolean corruptChecksumOk = targets == null; 
-
-    try {
-      File blockFile = data.getBlockFile( block );
-      blockInFile = new RandomAccessFile(blockFile, "r");
-
-      File checksumFile = FSDataset.getMetaFile( blockFile );
-      DataChecksum checksum = null;
-
-      if ( !corruptChecksumOk || checksumFile.exists() ) {
-        checksumIn = new DataInputStream(
-                     new BufferedInputStream(new FileInputStream(checksumFile),
-                                             BUFFER_SIZE));
-          
-        //read and handle the common header here. For now just a version
-        short version = checksumIn.readShort();
-        if ( version != FSDataset.METADATA_VERSION ) {
-          LOG.warn( "Wrong version (" + version + 
-                    ") for metadata file for " + block + " ignoring ..." );
-        }
-        checksum = DataChecksum.newDataChecksum( checksumIn ) ;
-      } else {
-        LOG.warn( "Could not find metadata file for " + block );
-        // This only decides the buffer size. Use BUFFER_SIZE?
-        checksum = DataChecksum.newDataChecksum( DataChecksum.CHECKSUM_NULL,
-                                                 16*1024 );
-      }
+  private class BlockSender implements java.io.Closeable {
+    private Block block; // the block to read from
+    private DataInputStream blockIn; // data stream
+    private DataInputStream checksumIn; // checksum datastream
+    private DataChecksum checksum; // checksum algorithm and parameters
+    private long offset; // starting position to read
+    private long endOffset; // ending position
+    private byte buf[]; // buffer to store data read from the block file & crc
+    private int bytesPerChecksum; // chunk size
+    private int checksumSize; // checksum size
+    private boolean corruptChecksumOk; // ok for the checksum file to be corrupt or missing
+    private boolean chunkOffsetOK; // whether to send the starting chunk offset
+
+    private Throttler throttler;
+    private DataOutputStream out;
+
+    BlockSender(Block block, long startOffset, long length,
+        boolean corruptChecksumOk, boolean chunkOffsetOK) throws IOException {
+      RandomAccessFile blockInFile = null;
 
-      int bytesPerChecksum = checksum.getBytesPerChecksum();
-      int checksumSize = checksum.getChecksumSize();
-      
-      if (length < 0) {
-        length = data.getLength(block);
-      }
+      try {
+        this.block = block;
+        this.chunkOffsetOK = chunkOffsetOK;
+        this.corruptChecksumOk = corruptChecksumOk;
+        File blockFile = data.getBlockFile(block);
+        blockInFile = new RandomAccessFile(blockFile, "r");
+
+        File checksumFile = FSDataset.getMetaFile(blockFile);
+
+        if (!corruptChecksumOk || checksumFile.exists()) {
+          checksumIn = new DataInputStream(new BufferedInputStream(
+              new FileInputStream(checksumFile), BUFFER_SIZE));
+
+          // read and handle the common header here. For now just a version
+          short version = checksumIn.readShort();
+          if (version != FSDataset.METADATA_VERSION) {
+            LOG.warn("Wrong version (" + version + ") for metadata file for "
+                + block + " ignoring ...");
+          }
+          checksum = DataChecksum.newDataChecksum(checksumIn);
+        } else {
+          LOG.warn("Could not find metadata file for " + block);
+          // This only decides the buffer size. Use BUFFER_SIZE?
+          checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_NULL,
+              16 * 1024);
+        }
+
+        bytesPerChecksum = checksum.getBytesPerChecksum();
+        checksumSize = checksum.getChecksumSize();
 
-      long endOffset = data.getLength( block );
-      if ( startOffset < 0 || startOffset > endOffset ||
-          (length + startOffset) > endOffset ) {
-        String msg = " Offset " + startOffset + " and length " + length + 
-                     " don't match block " + block +  " ( blockLen " + 
-                     endOffset + " )"; 
-        LOG.warn( "sendBlock() : " + msg );
-        if ( targets != null ) {
+        if (length < 0) {
+          length = data.getLength(block);
+        }
+
+        endOffset = data.getLength(block);
+        if (startOffset < 0 || startOffset > endOffset
+            || (length + startOffset) > endOffset) {
+          String msg = " Offset " + startOffset + " and length " + length
+          + " don't match block " + block + " ( blockLen " + endOffset + " )";
+          LOG.warn("sendBlock() : " + msg);
           throw new IOException(msg);
-        } else {
-          out.writeShort( OP_STATUS_ERROR_INVALID );
-          return totalRead;
         }
+
+        buf = new byte[bytesPerChecksum + checksumSize];
+        offset = (startOffset - (startOffset % bytesPerChecksum));
+        if (length >= 0) {
+          // Make sure endOffset points to end of a checksumed chunk.
+          long tmpLen = startOffset + length + (startOffset - offset);
+          if (tmpLen % bytesPerChecksum != 0) {
+            tmpLen += (bytesPerChecksum - tmpLen % bytesPerChecksum);
+          }
+          if (tmpLen < endOffset) {
+            endOffset = tmpLen;
+          }
+        }
+
+        // seek to the right offsets
+        if (offset > 0) {
+          long checksumSkip = (offset / bytesPerChecksum) * checksumSize;
+          blockInFile.seek(offset);
+          if (checksumSkip > 0) {
+            // Should we use seek() for checksum file as well?
+            IOUtils.skipFully(checksumIn, checksumSkip);
+          }
+        }
+
+        blockIn = new DataInputStream(new BufferedInputStream(
+            new FileInputStream(blockInFile.getFD()), BUFFER_SIZE));
+      } catch (IOException ioe) {
+        IOUtils.closeStream(this);
+        IOUtils.closeStream(blockInFile);
+        throw ioe;
       }
+    }
 
-      byte buf[] = new byte[ bytesPerChecksum + checksumSize ];
-      long offset = (startOffset - (startOffset % bytesPerChecksum));
-      if ( length >= 0 ) {
-        // Make sure endOffset points to end of a checksumed chunk. 
-        long tmpLen = startOffset + length + (startOffset - offset);
-        if ( tmpLen % bytesPerChecksum != 0 ) { 
-          tmpLen += ( bytesPerChecksum - tmpLen % bytesPerChecksum );
+    // close opened files
+    public void close() throws IOException {
+      IOException ioe = null;
+      // close checksum file
+      if (checksumIn != null) {
+        try {
+          checksumIn.close();
+        } catch (IOException e) {
+          ioe = e;
         }
-        if ( tmpLen < endOffset ) {
-          endOffset = tmpLen;
+        checksumIn = null;
+      }
+      // close data file
+      if (blockIn != null) {
+        try {
+          blockIn.close();
+        } catch (IOException e) {
+          ioe = e;
         }
+        blockIn = null;
       }
+      // throw IOException if there is any
+      if (ioe != null) {
+        throw ioe;
+      }
+    }
+    
+    private int sendChunk()
+        throws IOException {
+      int len = (int) Math.min(endOffset - offset, bytesPerChecksum);
+      if (len == 0)
+        return 0;
+      blockIn.readFully(buf, 0, len);
 
-      // seek to the right offsets
-      if ( offset > 0 ) {
-        long checksumSkip = ( offset / bytesPerChecksum ) * checksumSize ;
-        blockInFile.seek(offset);
-        if (checksumSkip > 0) {
-          //Should we use seek() for checksum file as well?
-          IOUtils.skipFully(checksumIn, checksumSkip);
+      if (checksumSize > 0 && checksumIn != null) {
+        try {
+          checksumIn.readFully(buf, len, checksumSize);
+        } catch (IOException e) {
+          LOG.warn(" Could not read checksum for data at offset " + offset
+              + " for block " + block + " got : "
+              + StringUtils.stringifyException(e));
+          IOUtils.closeStream(checksumIn);
+          checksumIn = null;
+          if (corruptChecksumOk) {
+            // Just fill the array with zeros.
+            Arrays.fill(buf, len, len + checksumSize, (byte) 0);
+          } else {
+            throw e;
+          }
         }
       }
-      
-      blockIn = new DataInputStream(new BufferedInputStream(
-                                      new FileInputStream(blockInFile.getFD()), 
-                                      BUFFER_SIZE));
-      
-      if ( targets != null ) {
+
+      out.writeInt(len);
+      out.write(buf, 0, len + checksumSize);
+
+      if (throttler != null) { // rebalancing so throttle
+        throttler.throttle(len + checksumSize);
+      }
+
+      return len;
+    }
+
+    /**
+     * sendBlock() reads the block and its metadata and streams the data to
+     * either a client or to another datanode. 
+     * 
+     * @param out  stream to which the block is written
+     * @return total bytes read, including crc.
+     */
+    long sendBlock(DataOutputStream out, Throttler throttler)
+        throws IOException {
+      if( out == null ) {
+        throw new IOException( "out stream is null" );
+      }
+      this.out = out;
+      this.throttler = throttler;
+
+      long totalRead = 0;
+      try {
+        checksum.writeHeader(out);
+        if ( chunkOffsetOK ) {
+          out.writeLong( offset );
+        }
+
+        while (endOffset > offset) {
+          // Write one data chunk per loop.
+          long len = sendChunk();
+          offset += len;
+          totalRead += len + checksumSize;
+        }
+        out.writeInt(0); // mark the end of block
+        out.flush();
+      } finally {
+        close();
+      }
+
+      return totalRead;
+    }
+  }
+
+  /* A class that receives a block and writes it to its own disk, and in
+   * the meantime may forward it to the next datanode in the pipeline.
+   * If a throttler is provided, streaming throttling is also supported.
+   */
+  private class BlockReceiver implements java.io.Closeable {
+    private Block block; // the block to receive
+    private DataInputStream in = null; // from where data are read
+    private DataChecksum checksum; // checksum for the incoming data
+    private DataOutputStream out = null; // to block file at local disk
+    private DataOutputStream checksumOut = null; // to crc file at local disk
+    private int bytesPerChecksum;
+    private int checksumSize;
+    private byte buf[];
+    private long offset;
+    final private String inAddr;
+    private String mirrorAddr;
+    private DataOutputStream mirrorOut;
+    private Throttler throttler;
+    private int lastLen = -1;
+    private int curLen = -1;
+
+    BlockReceiver(Block block, DataInputStream in, String inAddr)
+        throws IOException {
+      try {
+        this.block = block;
+        this.in = in;
+        this.inAddr = inAddr;
+        this.checksum = DataChecksum.newDataChecksum(in);
+        this.bytesPerChecksum = checksum.getBytesPerChecksum();
+        this.checksumSize = checksum.getChecksumSize();
+        this.buf = new byte[bytesPerChecksum + checksumSize];
+
         //
-        // Header info
+        // Open local disk out
         //
-        out.writeShort( DATA_TRANFER_VERSION );
-        out.writeByte( OP_WRITE_BLOCK );
-        out.writeLong( block.getBlockId() );
-        out.writeInt(targets.length-1);
-        for (int i = 1; i < targets.length; i++) {
-          targets[i].write( out );
+        FSDataset.BlockWriteStreams streams = data.writeToBlock(block);
+        this.out = new DataOutputStream(new BufferedOutputStream(
+          streams.dataOut, BUFFER_SIZE));
+        this.checksumOut = new DataOutputStream(new BufferedOutputStream(
+          streams.checksumOut, BUFFER_SIZE));
+      } catch(IOException ioe) {
+        IOUtils.closeStream(this);
+        throw ioe;
+      }
+    }
+
+    // close files
+    public void close() throws IOException {
+      IOException ioe = null;
+      // close checksum file
+      try {
+        if (checksumOut != null) {
+          checksumOut.close();
+          checksumOut = null;
         }
-      } else {
-        out.writeShort( OP_STATUS_SUCCESS );          
+      } catch(IOException e) {
+        ioe = e;
+      }
+      // close block file
+      try {
+        if (out != null) {
+          out.close();
+          out = null;
+        }
+      } catch (IOException e) {
+        ioe = e;
+      }
+      // disk check
+      if(ioe != null) {
+        checkDiskError(ioe);
+        throw ioe;
+      }
+    }
+    
+    /* receive a chunk: write it to disk & mirror it to another stream */
+    private void receiveChunk( int len ) throws IOException {
+      if (len <= 0 || len > bytesPerChecksum) {
+        throw new IOException("Got wrong length during writeBlock(" + block
+            + ") from " + inAddr + " at offset " + offset + ": " + len
+            + " expected <= " + bytesPerChecksum);
       }
 
-      checksum.writeHeader( out );
-      
-      if ( targets == null ) {
-        out.writeLong( offset );
+      if (lastLen > 0 && lastLen != bytesPerChecksum) {
+        throw new IOException("Got wrong length during receiveBlock(" + block
+          + ") from " + inAddr + " : " + " got " + lastLen + " instead of "
+          + bytesPerChecksum);
       }
-      
-      while ( endOffset >= offset ) {
-        // Write one data chunk per loop.
-        int len = (int) Math.min( endOffset - offset, bytesPerChecksum );
-        if ( len > 0 ) {
-          blockIn.readFully( buf, 0, len );
-          totalRead += len;
-          
-          if ( checksumSize > 0 && checksumIn != null ) {
-            try {
-              checksumIn.readFully( buf, len, checksumSize );
-              totalRead += checksumSize;
-            } catch ( IOException e ) {
-              LOG.warn( " Could not read checksum for data at offset " +
-                        offset + " for block " + block + " got : " + 
-                        StringUtils.stringifyException(e) );
-              IOUtils.closeStream( checksumIn );
-              checksumIn = null;
-              if ( corruptChecksumOk ) {
-                // Just fill the array with zeros.
-                Arrays.fill( buf, len, len + checksumSize, (byte)0 );
-              } else {
-                throw e;
-              }
-            }
-          }
-        }
 
-        out.writeInt( len );
-        out.write( buf, 0, len + checksumSize );
-        
-        if ( offset == endOffset ) {
-          out.flush();
-          // We are not waiting for response from target.
-          break;
+      lastLen = curLen;
+      curLen = len;
+
+      in.readFully(buf, 0, len + checksumSize);
+
+      /*
+       * Verification is not included in the initial design. For now, it at
+       * least catches some bugs. Later, we can include this after showing that
+       * it does not affect performance much.
+       */
+      checksum.update(buf, 0, len);
+
+      if (!checksum.compare(buf, len)) {
+        throw new IOException("Unexpected checksum mismatch "
+            + "while writing " + block + " from " + inAddr);
+      }
+
+      checksum.reset();
+
+      // First write to remote node before writing locally.
+      if (mirrorOut != null) {
+        try {
+          mirrorOut.writeInt(len);
+          mirrorOut.write(buf, 0, len + checksumSize);
+        } catch (IOException ioe) {
+          LOG.info("Exception writing to mirror " + mirrorAddr + "\n"
+              + StringUtils.stringifyException(ioe));
+          //
+          // If stream-copy fails, continue
+          // writing to disk. We shouldn't
+          // interrupt client write.
+          //
+          mirrorOut = null;
         }
-        offset += len;
       }
-    } finally {
-      IOUtils.closeStream( blockInFile );
-      IOUtils.closeStream( checksumIn );
-      IOUtils.closeStream( blockIn );
-      IOUtils.closeStream( out );
+
+      try {
+        out.write(buf, 0, len);
+        // Write checksum
+        checksumOut.write(buf, len, checksumSize);
+        myMetrics.wroteBytes(len);
+      } catch (IOException iex) {
+        checkDiskError(iex);
+        throw iex;
+      }
+
+      if (throttler != null) { // throttle I/O
+        throttler.throttle(len + checksumSize);
+      }
+    }
+
+    public void receiveBlock(DataOutputStream mirrorOut,
+        String mirrorAddr, Throttler throttler) throws IOException {
+
+      this.mirrorOut = mirrorOut;
+      this.mirrorAddr = mirrorAddr;
+      this.throttler = throttler;
+
+      /*
+       * We need an estimate for block size to check if the disk partition has
+       * enough space. For now we just increment FSDataset.reserved by
+       * configured dfs.block.size Other alternative is to include the block
+       * size in the header sent by DFSClient.
+       */
+
+      try {
+        // write data chunk header
+        checksumOut.writeShort(FSDataset.METADATA_VERSION);
+        checksum.writeHeader(checksumOut);
+        if (mirrorOut != null) {
+          checksum.writeHeader(mirrorOut);
+        }
+
+        int len = in.readInt();
+        while (len != 0) {
+          receiveChunk( len );
+          offset += len;
+          len = in.readInt();
+        }
+
+        // flush the mirror out
+        if (mirrorOut != null) {
+          mirrorOut.writeInt(0); // mark the end of the stream
+          mirrorOut.flush();
+        }
+
+        // close the block/crc files
+        close();
+
+        // Finalize the block. Does this fsync()?
+        block.setNumBytes(offset);
+        data.finalizeBlock(block);
+        myMetrics.wroteBlocks(1);
+      } catch (IOException ioe) {
+        IOUtils.closeStream(this);
+        throw ioe;
+      }
     }
-    
-    return totalRead;
   }
 
   /**
@@ -1305,21 +1408,40 @@
     public void run() {
       xmitsInProgress++;
       Socket sock = null;
+      DataOutputStream out = null;
+      BlockSender blockSender = null;
       
       try {
         InetSocketAddress curTarget = 
           createSocketAddr(targets[0].getName());
         sock = new Socket();  
         sock.connect(curTarget, READ_TIMEOUT);
-        sock.setSoTimeout(READ_TIMEOUT);
-        sendBlock( sock, b, 0, -1, targets );
-        LOG.info( "Transmitted block " + b + " to " + curTarget );
-
-      } catch ( IOException ie ) {
-        LOG.warn( "Failed to transfer " + b + " to " + 
-                  targets[0].getName() + " got " + 
-                  StringUtils.stringifyException( ie ) );
+        sock.setSoTimeout(targets.length*READ_TIMEOUT);
+
+        out = new DataOutputStream(new BufferedOutputStream(
+            sock.getOutputStream(), BUFFER_SIZE));
+        blockSender = new BlockSender(b, 0, -1, false, false);
+
+        //
+        // Header info
+        //
+        out.writeShort(DATA_TRANFER_VERSION);
+        out.writeByte(OP_WRITE_BLOCK);
+        out.writeLong(b.getBlockId());
+        // write targets
+        out.writeInt(targets.length - 1);
+        for (int i = 1; i < targets.length; i++) {
+          targets[i].write(out);
+        }
+        // send data & checksum
+        blockSender.sendBlock(out, null);
+        LOG.info("Transmitted block " + b + " to " + curTarget);
+      } catch (IOException ie) {
+        LOG.warn("Failed to transfer " + b + " to " + targets[0].getName()
+            + " got " + StringUtils.stringifyException(ie));
       } finally {
+        IOUtils.closeStream(blockSender);
+        IOUtils.closeStream(out);
         IOUtils.closeSocket(sock);
         xmitsInProgress--;
       }

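The Throttler interface added to DataNode.java above has no implementation in this commit. Here is a minimal sketch of one possible implementation, assuming a fixed bandwidth in bytes per second; it is shown standalone for brevity even though the interface itself is private to DataNode:

    // Hypothetical: sleeps the calling thread whenever the observed
    // average I/O rate exceeds the configured bandwidth.
    class SimpleThrottler implements Throttler {
      private final long bytesPerSec;                 // configured bandwidth
      private final long start = System.currentTimeMillis();
      private long bytesSent = 0;                     // bytes seen so far

      SimpleThrottler(long bytesPerSec) {
        this.bytesPerSec = bytesPerSec;
      }

      public synchronized void throttle(int numOfBytes) {
        bytesSent += numOfBytes;
        long elapsed = System.currentTimeMillis() - start;
        // how long sending bytesSent *should* take at bytesPerSec
        long expected = bytesSent * 1000 / bytesPerSec;
        if (expected > elapsed) {
          try {
            Thread.sleep(expected - elapsed);
          } catch (InterruptedException ignored) {
          }
        }
      }
    }

A BlockSender or BlockReceiver would then be handed such an instance instead of null, for example when serving rebalancing traffic (see the "rebalancing so throttle" comment in sendChunk() above).
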
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?rev=579716&r1=579715&r2=579716&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Wed Sep 26 09:58:48 2007
@@ -97,7 +97,13 @@
    * This should change when serialization of DatanodeInfo, not just
    * when protocol changes. It is not very obvious. 
    */
-  public static final int DATA_TRANFER_VERSION = 5; //Should it be 1?
+  /* Version 6: 
+   * 0 marks the end of a block, not an EMPTY_CHUNK
+   * OP_READ_BLOCK: return OP_STATUS_ERROR if received an invalid block id
+   *                return OP_STATUS_ERROR if received an invalid length
+   * OP_WRITE_BLOCK: return OP_STATUS_ERROR if illegal bytesPerChecksum
+   */
+  public static final int DATA_TRANFER_VERSION = 6; //Should it be 1?
 
   // Return codes for file create
   public static final int OPERATION_FAILED = 0;

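With the version bump, the datanode now reports request errors in-band with an OP_STATUS_ERROR short instead of simply closing the connection, which is what the test changes below verify. The following sketch shows how a client might consume an OP_READ_BLOCK reply under version 6, assuming the field layout produced by BlockSender above; the helper name and the fixed 4-byte CRC32 checksum size are illustrative assumptions:

    // Hypothetical: reads one block reply and returns total bytes read
    // (data plus crc), mirroring what BlockSender.sendBlock() writes.
    static long readBlockReply(java.io.DataInputStream in)
        throws java.io.IOException {
      short status = in.readShort();
      if (status != FSConstants.OP_STATUS_SUCCESS) { // error signalled in-band
        throw new java.io.IOException("read failed, status = " + status);
      }
      byte checksumType = in.readByte();   // DataChecksum header: type ...
      int bytesPerChecksum = in.readInt(); // ... and bytes per checksum
      int checksumSize = 4;                // assuming CHECKSUM_CRC32
      long chunkOffset = in.readLong();    // sent because chunkOffsetOK is true
      long totalRead = 0;
      for (int len = in.readInt(); len != 0; len = in.readInt()) {
        byte[] buf = new byte[len + checksumSize];
        in.readFully(buf);                 // chunk data followed by its crc
        totalRead += len + checksumSize;
      }
      return totalRead;                    // a zero length marked the end
    }
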
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java?rev=579716&r1=579715&r2=579716&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java Wed Sep 26 09:58:48 2007
@@ -171,7 +171,9 @@
     byteBuf.put((byte)DataChecksum.CHECKSUM_CRC32);
     
     byteBuf.putInt(-1-random.nextInt(oneMil));
-    sendRecvData("wrong bytesPerChecksum while writing", true);
+    recvByteBuf.position(0);
+    recvByteBuf.putShort((short)FSConstants.OP_STATUS_ERROR);
+    sendRecvData("wrong bytesPerChecksum while writing", false);
     byteBuf.putInt(checksumPos+1, 512);
     
     byteBuf.putInt(targetPos, -1-random.nextInt(oneMil));
@@ -204,14 +206,11 @@
     byteBuf.putLong(0L);
     int lenPos = byteBuf.position();
     byteBuf.putLong(fileLen);
-    /* We should change DataNode to return ERROR_INVALID instead of closing 
-     * the connection.
-     */
-    sendRecvData("Wrong block ID for read", true); 
+    recvByteBuf.position(0);
+    recvByteBuf.putShort((short)FSConstants.OP_STATUS_ERROR);
+    sendRecvData("Wrong block ID for read", false); 
     byteBuf.putLong(blockPos, firstBlock.getBlockId());
     
-    recvByteBuf.position(0);
-    recvByteBuf.putShort((short)FSConstants.OP_STATUS_ERROR_INVALID);
     byteBuf.putLong(startOffsetPos, -1-random.nextInt(oneMil));
     sendRecvData("Negative start-offset for read", false);
     
@@ -224,7 +223,7 @@
     byteBuf.putLong(lenPos, -1-random.nextInt(oneMil));
     sendRecvData("Negative length for read", false);
     
-    recvByteBuf.putShort(0, (short)FSConstants.OP_STATUS_ERROR_INVALID);
+    recvByteBuf.putShort(0, (short)FSConstants.OP_STATUS_ERROR);
     byteBuf.putLong(lenPos, fileLen+1);
     sendRecvData("Wrong length for read", false);
     byteBuf.putLong(lenPos, fileLen);