You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by at...@apache.org on 2011/10/06 03:17:05 UTC

svn commit: r1179484 [1/2] - in /hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocol/ src/main/java/org/apache/hadoop/hdfs/server/datano...

Author: atm
Date: Thu Oct  6 01:16:48 2011
New Revision: 1179484

URL: http://svn.apache.org/viewvc?rev=1179484&view=rev
Log:
Merging trunk to HDFS-1623 branch.

Added:
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
      - copied unchanged from r1179483, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/packages/deb/init.d/hadoop-secondarynamenode
      - copied unchanged from r1179483, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/packages/deb/init.d/hadoop-secondarynamenode
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/packages/rpm/init.d/hadoop-secondarynamenode
      - copied unchanged from r1179483, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/packages/rpm/init.d/hadoop-secondarynamenode
Modified:
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/   (props changed)
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/   (props changed)
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/native/   (props changed)
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/packages/deb/hadoop.control/preinst
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/packages/rpm/spec/hadoop-hdfs.spec
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/   (props changed)
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/   (props changed)
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary/   (props changed)
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs/   (props changed)
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestQuota.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestJsonUtil.java

Propchange: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Oct  6 01:16:48 2011
@@ -1,4 +1,4 @@
-/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:1152502-1177128
+/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:1152502-1179483
 /hadoop/core/branches/branch-0.19/hdfs:713112
 /hadoop/hdfs/branches/HDFS-1052:987665-1095512
 /hadoop/hdfs/branches/HDFS-265:796829-820463

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1179484&r1=1179483&r2=1179484&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Oct  6 01:16:48 2011
@@ -19,9 +19,13 @@ Trunk (unreleased changes)
     HDFS-2340. Support getFileBlockLocations and getDelegationToken in webhdfs.
     (szetszwo)
 
+    HDFS-2348. Support getContentSummary and getFileChecksum in webhdfs.
+    (szetszwo)
+
   IMPROVEMENTS
 
-    HADOOP-7524 Change RPC to allow multiple protocols including multuple versions of the same protocol (sanjay Radia)
+    HADOOP-7524 Change RPC to allow multiple protocols including multiple
+    versions of the same protocol (sanjay Radia)
 
     HDFS-1620. Rename HdfsConstants -> HdfsServerConstants, FSConstants ->
                HdfsConstants. (Harsh J Chouraria via atm)
@@ -50,6 +54,13 @@ Trunk (unreleased changes)
     HDFS-2355. Federation: enable using the same configuration file across 
     all the nodes in the cluster. (suresh)
 
+    HDFS-2371. Refactor BlockSender.java for better readability. (suresh)
+
+    HDFS-2158. Add JournalSet to manage the set of journals. (jitendra)
+
+    HDFS-2395. Add a root element in the JSON responses of webhdfs.
+    (szetszwo)
+
   BUG FIXES
     HDFS-2287. TestParallelRead has a small off-by-one bug. (todd)
 
@@ -83,6 +94,9 @@ Trunk (unreleased changes)
 
     HDFS-2361. hftp is broken, fixed username checks in JspHelper. (jitendra)
 
+    HDFS-2298. Fix TestDfsOverAvroRpc by changing ClientProtocol to
+    not include multiple methods of the same name. (cutting)
+
 Release 0.23.0 - Unreleased
 
   INCOMPATIBLE CHANGES
@@ -1145,6 +1159,39 @@ Release 0.23.0 - Unreleased
                (todd)
     HDFS-2027. Image inspector should return finalized logs before unfinalized
                logs. (todd)
+    HDFS-2074. Determine edit log validity by truly reading and validating
+               transactions. (todd)
+    HDFS-2085. Finalize in-progress edit logs at startup. (todd)
+    HDFS-2026. SecondaryNameNode should properly handle the case where the
+               NameNode is reformatted. (todd)
+    HDFS-2077. Address checkpoint upload when one of the storage dirs is failed
+               (todd)
+    HDFS-2078. NameNode should not clear directory when restoring removed
+               storage. (todd)
+    HDFS-2088. Move edits log archiving logic into FSEditLog/JournalManager
+               (todd)
+    HDFS-2093. Handle case where an entirely empty log is left during NN crash
+               (todd)
+    HDFS-2102. Zero-pad edits filename to make them lexically sortable. (Ivan
+               Kelly via todd)
+    HDFS-2010. Fix NameNode to exit if all edit streams become inaccessible.
+               (atm via todd)
+    HDFS-2123. Checkpoint interval should be based on txn count, not size.
+               (todd)
+    HDFS-1979. Fix backupnode for new edits/image layout. (todd)
+    HDFS-2101. Fix remaining unit tests for new storage filenames. (todd)
+    HDFS-2133. Address remaining TODOs and pre-merge cleanup on HDFS-1073
+               branch.  (todd)
+    HDFS-1780. Reduce need to rewrite FSImage on startup. (todd)
+    HDFS-2104. Add a flag to the 2NN to format its checkpoint dirs on startup.
+               (todd)
+    HDFS-2135. Fix regression of HDFS-1955 in HDFS-1073 branch. (todd)
+    HDFS-2160. Fix CreateEditsLog test tool in HDFS-1073 branch. (todd)
+    HDFS-2168. Reenable TestEditLog.testFailedOpen and fix exposed bug. (todd)
+    HDFS-2169. Clean up TestCheckpoint and remove TODOs (todd)
+    HDFS-2170. Address remaining TODOs in HDFS-1073 branch. (todd)
+    HDFS-2172. Address findbugs and javadoc warnings in HDFS-1073 branch. 
+               (todd)
 
 Release 0.22.0 - Unreleased
 

Propchange: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Oct  6 01:16:48 2011
@@ -1,4 +1,4 @@
-/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:1159757-1177128
+/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:1159757-1179483
 /hadoop/core/branches/branch-0.19/hdfs/src/java:713112
 /hadoop/core/branches/branch-0.19/hdfs/src/main/java:713112
 /hadoop/core/trunk/src/hdfs:776175-785643,785929-786278

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1179484&r1=1179483&r2=1179484&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Thu Oct  6 01:16:48 2011
@@ -874,13 +874,13 @@ public class DFSClient implements java.i
   }
   /**
    * Rename file or directory.
-   * @see ClientProtocol#rename(String, String, Options.Rename...)
+   * @see ClientProtocol#rename2(String, String, Options.Rename...)
    */
   public void rename(String src, String dst, Options.Rename... options)
       throws IOException {
     checkOpen();
     try {
-      namenode.rename(src, dst, options);
+      namenode.rename2(src, dst, options);
     } catch(RemoteException re) {
       throw re.unwrapRemoteException(AccessControlException.class,
                                      DSQuotaExceededException.class,

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1179484&r1=1179483&r2=1179484&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Thu Oct  6 01:16:48 2011
@@ -67,9 +67,9 @@ public interface ClientProtocol extends 
    * Compared to the previous version the following changes have been introduced:
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
-   * 68: Add Balancer Bandwidth Command protocol
+   * 69: Eliminate overloaded method names.
    */
-  public static final long versionID = 68L;
+  public static final long versionID = 69L;
   
   ///////////////////////////////////////
   // File contents
@@ -419,7 +419,7 @@ public interface ClientProtocol extends 
    *           <code>dst</code> contains a symlink
    * @throws IOException If an I/O error occurred
    */
-  public void rename(String src, String dst, Options.Rename... options)
+  public void rename2(String src, String dst, Options.Rename... options)
       throws AccessControlException, DSQuotaExceededException,
       FileAlreadyExistsException, FileNotFoundException,
       NSQuotaExceededException, ParentNotDirectoryException, SafeModeException,
@@ -428,21 +428,6 @@ public interface ClientProtocol extends 
   /**
    * Delete the given file or directory from the file system.
    * <p>
-   * Any blocks belonging to the deleted files will be garbage-collected.
-   * 
-   * @param src existing name.
-   * @return true only if the existing file or directory was actually removed 
-   * from the file system. 
-   * @throws UnresolvedLinkException if <code>src</code> contains a symlink. 
-   * @deprecated use {@link #delete(String, boolean)} istead.
-   */
-  @Deprecated
-  public boolean delete(String src) 
-      throws IOException, UnresolvedLinkException;
-
-  /**
-   * Delete the given file or directory from the file system.
-   * <p>
    * same as delete but provides a way to avoid accidentally 
    * deleting non empty directories programmatically. 
    * @param src existing name

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java?rev=1179484&r1=1179483&r2=1179484&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java Thu Oct  6 01:16:48 2011
@@ -404,7 +404,7 @@ class BlockPoolSliceScanner {
         adjustThrottler();
         
         blockSender = new BlockSender(block, 0, -1, false, false, true,
-            datanode);
+            datanode, null);
 
         DataOutputStream out = 
                 new DataOutputStream(new IOUtils.NullOutputStream());

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1179484&r1=1179483&r2=1179484&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Thu Oct  6 01:16:48 2011
@@ -41,191 +41,230 @@ import org.apache.hadoop.util.DataChecks
 
 /**
  * Reads a block from the disk and sends it to a recipient.
+ * 
+ * Data is sent from the BlockSender in the following format:
+ * <br><b>Data format:</b> <pre>
+ *    +--------------------------------------------------+
+ *    | ChecksumHeader | Sequence of data PACKETS...     |
+ *    +--------------------------------------------------+ 
+ * </pre>   
+ * <b>ChecksumHeader format:</b> <pre>
+ *    +--------------------------------------------------+
+ *    | 1 byte CHECKSUM_TYPE | 4 byte BYTES_PER_CHECKSUM |
+ *    +--------------------------------------------------+ 
+ * </pre>   
+ * An empty packet is sent to mark the end of block and read completion.
+ * 
+ *  PACKET Contains a packet header, checksum and data. Amount of data
+ *  carried is set by BUFFER_SIZE.
+ *  <pre>
+ *    +-----------------------------------------------------+
+ *    | 4 byte packet length (excluding packet header)      |
+ *    +-----------------------------------------------------+
+ *    | 8 byte offset in the block | 8 byte sequence number |
+ *    +-----------------------------------------------------+
+ *    | 1 byte isLastPacketInBlock                          |
+ *    +-----------------------------------------------------+
+ *    | 4 byte Length of actual data                        |
+ *    +-----------------------------------------------------+
+ *    | x byte checksum data. x is defined below            |
+ *    +-----------------------------------------------------+
+ *    | actual data ......                                  |
+ *    +-----------------------------------------------------+
+ *    
+ *    Data is made of Chunks. Each chunk is of length <= BYTES_PER_CHECKSUM.
+ *    A checksum is calculated for each chunk.
+ *    
+ *    x = (length of data + BYTES_PER_CHECKSUM - 1)/BYTES_PER_CHECKSUM *
+ *        CHECKSUM_SIZE
+ *        
+ *    CHECKSUM_SIZE depends on CHECKSUM_TYPE (usually, 4 for CRC32) 
+ *    </pre>
+ *  
+ *  The client reads data until it receives a packet with 
+ *  "LastPacketInBlock" set to true or with a zero length. If there is 
+ *  no checksum error, it replies to DataNode with OP_STATUS_CHECKSUM_OK: 
+ *  <pre>
+ *    +------------------------------+
+ *    | 2 byte OP_STATUS_CHECKSUM_OK |
+ *    +------------------------------+
+ *  </pre>
  */
 class BlockSender implements java.io.Closeable {
-  public static final Log LOG = DataNode.LOG;
+  static final Log LOG = DataNode.LOG;
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
-  
-  private ExtendedBlock block; // the block to read from
-
-  /** the replica to read from */
-  private final Replica replica;
-  /** The visible length of a replica. */
-  private final long replicaVisibleLength;
-
-  private static final boolean is32Bit = System.getProperty("sun.arch.data.model").equals("32");
-
-  private InputStream blockIn; // data stream
-  private long blockInPosition = -1; // updated while using transferTo().
-  private DataInputStream checksumIn; // checksum datastream
-  private DataChecksum checksum; // checksum stream
-  private long offset; // starting position to read
-  private long endOffset; // ending position
-  private int bytesPerChecksum; // chunk size
-  private int checksumSize; // checksum size
-  private boolean corruptChecksumOk; // if need to verify checksum
-  private boolean chunkOffsetOK; // if need to send chunk offset
-  private long seqno; // sequence number of packet
-
-  private boolean transferToAllowed = true;
-  // set once entire requested byte range has been sent to the client
-  private boolean sentEntireByteRange;
-  private boolean verifyChecksum; //if true, check is verified while reading
-  private DataTransferThrottler throttler;
-  private final String clientTraceFmt; // format of client trace log message
-
+  private static final boolean is32Bit = 
+      System.getProperty("sun.arch.data.model").equals("32");
   /**
    * Minimum buffer used while sending data to clients. Used only if
    * transferTo() is enabled. 64KB is not that large. It could be larger, but
    * not sure if there will be much more improvement.
    */
   private static final int MIN_BUFFER_WITH_TRANSFERTO = 64*1024;
+  private static final int TRANSFERTO_BUFFER_SIZE = Math.max(
+      HdfsConstants.IO_FILE_BUFFER_SIZE, MIN_BUFFER_WITH_TRANSFERTO);
+  
+  /** the block to read from */
+  private final ExtendedBlock block;
+  /** the replica to read from */
+  private final Replica replica;
+  /** The visible length of a replica. */
+  private final long replicaVisibleLength;
+  /** Stream to read block data from */
+  private InputStream blockIn;
+  /** updated while using transferTo() */
+  private long blockInPosition = -1;
+  /** Stream to read checksum */
+  private DataInputStream checksumIn;
+  /** Checksum utility */
+  private final DataChecksum checksum;
+  /** Starting position to read */
+  private long offset;
+  /** Position of last byte to read from block file */
+  private final long endOffset;
+  /** Number of bytes in chunk used for computing checksum */
+  private final int chunkSize;
+  /** Number of checksum bytes computed for a chunk */
+  private final int checksumSize;
+  /** If true, failure to read checksum is ignored */
+  private final boolean corruptChecksumOk;
+  /** true if chunk offset is needed to be sent in Checksum header */
+  private final boolean chunkOffsetOK;
+  /** Sequence number of packet being sent */
+  private long seqno;
+  /** Set to true if transferTo is allowed for sending data to the client */
+  private final boolean transferToAllowed;
+  /** Set to true once entire requested byte range has been sent to the client */
+  private boolean sentEntireByteRange;
+  /** When true, verify checksum while reading from checksum file */
+  private final boolean verifyChecksum;
+  /** Format used to print client trace log messages */
+  private final String clientTraceFmt;
   private volatile ChunkChecksum lastChunkChecksum = null;
-
   
-  BlockSender(ExtendedBlock block, long startOffset, long length,
-              boolean corruptChecksumOk, boolean chunkOffsetOK,
-              boolean verifyChecksum, DataNode datanode) throws IOException {
-    this(block, startOffset, length, corruptChecksumOk, chunkOffsetOK,
-         verifyChecksum, datanode, null);
-  }
-
+  /**
+   * Constructor
+   * 
+   * @param block Block that is being read
+   * @param startOffset starting offset to read from
+   * @param length length of data to read
+   * @param corruptChecksumOk if true, failure to read checksum is ignored
+   * @param chunkOffsetOK need to send chunk offset in checksum header
+   * @param verifyChecksum verify checksum while reading the data
+   * @param datanode datanode from which the block is being read
+   * @param clientTraceFmt format string used to print client trace logs
+   * @throws IOException
+   */
   BlockSender(ExtendedBlock block, long startOffset, long length,
               boolean corruptChecksumOk, boolean chunkOffsetOK,
               boolean verifyChecksum, DataNode datanode, String clientTraceFmt)
       throws IOException {
     try {
       this.block = block;
+      this.chunkOffsetOK = chunkOffsetOK;
+      this.corruptChecksumOk = corruptChecksumOk;
+      this.verifyChecksum = verifyChecksum;
+      this.clientTraceFmt = clientTraceFmt;
+      
       synchronized(datanode.data) { 
-        this.replica = datanode.data.getReplica(block.getBlockPoolId(), 
-            block.getBlockId());
-        if (replica == null) {
-          throw new ReplicaNotFoundException(block);
-        }
+        this.replica = getReplica(block, datanode);
         this.replicaVisibleLength = replica.getVisibleLength();
       }
-      long minEndOffset = startOffset + length;
-      // if this is a write in progress
+      // if there is a write in progress
       ChunkChecksum chunkChecksum = null;
       if (replica instanceof ReplicaBeingWritten) {
-        for (int i = 0; i < 30 && replica.getBytesOnDisk() < minEndOffset; i++) {
-          try {
-            Thread.sleep(100);
-          } catch (InterruptedException ie) {
-            throw new IOException(ie);
-          }
-        }
-
-        long currentBytesOnDisk = replica.getBytesOnDisk();
-        
-        if (currentBytesOnDisk < minEndOffset) {
-          throw new IOException(String.format(
-            "need %d bytes, but only %d bytes available",
-            minEndOffset,
-            currentBytesOnDisk
-          ));
-        }
-
+        long minEndOffset = startOffset + length;
+        waitForMinLength((ReplicaBeingWritten)replica, minEndOffset);
         ReplicaInPipeline rip = (ReplicaInPipeline) replica;
         chunkChecksum = rip.getLastChecksumAndDataLen();
       }
 
       if (replica.getGenerationStamp() < block.getGenerationStamp()) {
-        throw new IOException(
-            "replica.getGenerationStamp() < block.getGenerationStamp(), block="
+        throw new IOException("Replica gen stamp < block genstamp, block="
             + block + ", replica=" + replica);
       }
       if (replicaVisibleLength < 0) {
-        throw new IOException("The replica is not readable, block="
+        throw new IOException("Replica is not readable, block="
             + block + ", replica=" + replica);
       }
       if (DataNode.LOG.isDebugEnabled()) {
         DataNode.LOG.debug("block=" + block + ", replica=" + replica);
       }
-      
-      this.chunkOffsetOK = chunkOffsetOK;
-      this.corruptChecksumOk = corruptChecksumOk;
-      this.verifyChecksum = verifyChecksum;
 
       // transferToFully() fails on 32 bit platforms for block sizes >= 2GB,
       // use normal transfer in those cases
       this.transferToAllowed = datanode.transferToAllowed &&
-        (!is32Bit || length < (long) Integer.MAX_VALUE);
-      this.clientTraceFmt = clientTraceFmt;
+        (!is32Bit || length <= Integer.MAX_VALUE);
 
-      if ( !corruptChecksumOk || datanode.data.metaFileExists(block) ) {
+      DataChecksum csum;
+      if (!corruptChecksumOk || datanode.data.metaFileExists(block)) {
         checksumIn = new DataInputStream(new BufferedInputStream(datanode.data
             .getMetaDataInputStream(block), HdfsConstants.IO_FILE_BUFFER_SIZE));
 
         // read and handle the common header here. For now just a version
-       BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
-       short version = header.getVersion();
-
+        BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
+        short version = header.getVersion();
         if (version != FSDataset.METADATA_VERSION) {
           LOG.warn("Wrong version (" + version + ") for metadata file for "
               + block + " ignoring ...");
         }
-        checksum = header.getChecksum();
+        csum = header.getChecksum();
       } else {
         LOG.warn("Could not find metadata file for " + block);
         // This only decides the buffer size. Use BUFFER_SIZE?
-        checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_NULL,
+        csum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_NULL,
             16 * 1024);
       }
 
-      /* If bytesPerChecksum is very large, then the metadata file
-       * is mostly corrupted. For now just truncate bytesPerchecksum to
-       * blockLength.
-       */        
-      bytesPerChecksum = checksum.getBytesPerChecksum();
-      if (bytesPerChecksum > 10*1024*1024 && bytesPerChecksum > replicaVisibleLength) {
-        checksum = DataChecksum.newDataChecksum(checksum.getChecksumType(),
+      /*
+       * If chunkSize is very large, then the metadata file is most likely
+       * corrupted. For now just truncate bytesPerChecksum to blockLength.
+       */       
+      int size = csum.getBytesPerChecksum();
+      if (size > 10*1024*1024 && size > replicaVisibleLength) {
+        csum = DataChecksum.newDataChecksum(csum.getChecksumType(),
             Math.max((int)replicaVisibleLength, 10*1024*1024));
-        bytesPerChecksum = checksum.getBytesPerChecksum();        
+        size = csum.getBytesPerChecksum();        
       }
+      chunkSize = size;
+      checksum = csum;
       checksumSize = checksum.getChecksumSize();
-
-      if (length < 0) {
-        length = replicaVisibleLength;
-      }
+      length = length < 0 ? replicaVisibleLength : length;
 
       // end is either last byte on disk or the length for which we have a 
       // checksum
-      if (chunkChecksum != null) {
-        endOffset = chunkChecksum.getDataLength();
-      } else {
-        endOffset = replica.getBytesOnDisk();
-      }
-      
-      if (startOffset < 0 || startOffset > endOffset
-          || (length + startOffset) > endOffset) {
+      long end = chunkChecksum != null ? chunkChecksum.getDataLength()
+          : replica.getBytesOnDisk();
+      if (startOffset < 0 || startOffset > end
+          || (length + startOffset) > end) {
         String msg = " Offset " + startOffset + " and length " + length
-        + " don't match block " + block + " ( blockLen " + endOffset + " )";
+        + " don't match block " + block + " ( blockLen " + end + " )";
         LOG.warn(datanode.getDNRegistrationForBP(block.getBlockPoolId()) +
             ":sendBlock() : " + msg);
         throw new IOException(msg);
       }
       
-      offset = (startOffset - (startOffset % bytesPerChecksum));
+      // Ensure read offset is positioned at the beginning of a chunk
+      offset = startOffset - (startOffset % chunkSize);
       if (length >= 0) {
-        // Make sure endOffset points to end of a checksumed chunk.
+        // Ensure endOffset points to end of chunk.
         long tmpLen = startOffset + length;
-        if (tmpLen % bytesPerChecksum != 0) {
-          tmpLen += (bytesPerChecksum - tmpLen % bytesPerChecksum);
+        if (tmpLen % chunkSize != 0) {
+          tmpLen += (chunkSize - tmpLen % chunkSize);
         }
-        if (tmpLen < endOffset) {
+        if (tmpLen < end) {
           // will use on-disk checksum here since the end is a stable chunk
-          endOffset = tmpLen;
+          end = tmpLen;
         } else if (chunkChecksum != null) {
-          //in last chunk which is changing. flag that we need to use in-memory 
-          // checksum 
+          // last chunk is changing. flag that we need to use in-memory checksum 
           this.lastChunkChecksum = chunkChecksum;
         }
       }
+      endOffset = end;
 
       // seek to the right offsets
       if (offset > 0) {
-        long checksumSkip = (offset / bytesPerChecksum) * checksumSize;
+        long checksumSkip = (offset / chunkSize) * checksumSize;
         // note blockInStream is seeked when created below
         if (checksumSkip > 0) {
           // Should we use seek() for checksum file as well?
@@ -237,7 +276,6 @@ class BlockSender implements java.io.Clo
       if (DataNode.LOG.isDebugEnabled()) {
         DataNode.LOG.debug("replica=" + replica);
       }
-
       blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
     } catch (IOException ioe) {
       IOUtils.closeStream(this);
@@ -251,19 +289,17 @@ class BlockSender implements java.io.Clo
    */
   public void close() throws IOException {
     IOException ioe = null;
-    // close checksum file
     if(checksumIn!=null) {
       try {
-        checksumIn.close();
+        checksumIn.close(); // close checksum file
       } catch (IOException e) {
         ioe = e;
       }
       checksumIn = null;
-    }
-    // close data file
+    }   
     if(blockIn!=null) {
       try {
-        blockIn.close();
+        blockIn.close(); // close data file
       } catch (IOException e) {
         ioe = e;
       }
@@ -274,7 +310,41 @@ class BlockSender implements java.io.Clo
       throw ioe;
     }
   }
-
+  
+  private static Replica getReplica(ExtendedBlock block, DataNode datanode)
+      throws ReplicaNotFoundException {
+    Replica replica = datanode.data.getReplica(block.getBlockPoolId(),
+        block.getBlockId());
+    if (replica == null) {
+      throw new ReplicaNotFoundException(block);
+    }
+    return replica;
+  }
+  
+  /**
+   * Poll the rbw replica until it has at least {@code len} bytes on disk.
+   * @param rbw replica that is being written to
+   * @param len minimum length to reach
+   * @throws IOException if interrupted, or if len is not reached in time
+   */
+  private static void waitForMinLength(ReplicaBeingWritten rbw, long len)
+      throws IOException {
+    // Up to 30 polls, 100 ms apart (about 3 seconds in total)
+    for (int i = 0; i < 30 && rbw.getBytesOnDisk() < len; i++) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt(); // keep the interrupt visible
+        throw new IOException(ie);
+      }
+    }
+    long bytesOnDisk = rbw.getBytesOnDisk();
+    if (bytesOnDisk < len) {
+      throw new IOException(String.format(
+          "Need %d bytes, but only %d bytes available", len, bytesOnDisk));
+    }
+  }
+  
   /**
    * Converts an IOExcpetion (not subclasses) to SocketException.
    * This is typically done to indicate to upper layers that the error 
@@ -296,54 +366,43 @@ class BlockSender implements java.io.Clo
   }
 
   /**
-   * Sends upto maxChunks chunks of data.
+   * @param datalen Length of data 
+   * @return number of chunks for data of given size
+   */
+  private int numberOfChunks(long datalen) {
+    return (int) ((datalen + chunkSize - 1)/chunkSize);
+  }
+  
+  /**
+   * Sends a packet with up to maxChunks chunks of data.
    * 
-   * When blockInPosition is >= 0, assumes 'out' is a 
-   * {@link SocketOutputStream} and tries 
-   * {@link SocketOutputStream#transferToFully(FileChannel, long, int)} to
-   * send data (and updates blockInPosition).
-   */
-  private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out) 
-                         throws IOException {
-    // Sends multiple chunks in one packet with a single write().
-
-    int len = (int) Math.min(endOffset - offset,
-                             (((long) bytesPerChecksum) * ((long) maxChunks)));
-    int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum;
-    int packetLen = len + numChunks*checksumSize + 4;
-    boolean lastDataPacket = offset + len == endOffset && len > 0;
-    pkt.clear();
-
+   * @param pkt buffer used for writing packet data
+   * @param maxChunks maximum number of chunks to send
+   * @param out stream to send data to
+   * @param transferTo use transferTo to send data
+   * @param throttler used for throttling data transfer bandwidth
+   */
+  private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
+      boolean transferTo, DataTransferThrottler throttler) throws IOException {
+    int dataLen = (int) Math.min(endOffset - offset,
+                             (chunkSize * (long) maxChunks));
+    
+    int numChunks = numberOfChunks(dataLen); // Number of chunks be sent in the packet
+    int checksumDataLen = numChunks * checksumSize;
+    int packetLen = dataLen + checksumDataLen + 4;
+    boolean lastDataPacket = offset + dataLen == endOffset && dataLen > 0;
 
-    PacketHeader header = new PacketHeader(
-      packetLen, offset, seqno, (len == 0), len);
-    header.putInBuffer(pkt);
+    writePacketHeader(pkt, dataLen, packetLen);
 
     int checksumOff = pkt.position();
-    int checksumLen = numChunks * checksumSize;
     byte[] buf = pkt.array();
     
     if (checksumSize > 0 && checksumIn != null) {
-      try {
-        checksumIn.readFully(buf, checksumOff, checksumLen);
-      } catch (IOException e) {
-        LOG.warn(" Could not read or failed to veirfy checksum for data"
-            + " at offset " + offset + " for block " + block, e);
-        IOUtils.closeStream(checksumIn);
-        checksumIn = null;
-        if (corruptChecksumOk) {
-          if (checksumOff < checksumLen) {
-            // Just fill the array with zeros.
-            Arrays.fill(buf, checksumOff, checksumLen, (byte) 0);
-          }
-        } else {
-          throw e;
-        }
-      }
+      readChecksum(buf, checksumOff, checksumDataLen);
 
       // write in progress that we need to use to get last checksum
       if (lastDataPacket && lastChunkChecksum != null) {
-        int start = checksumOff + checksumLen - checksumSize;
+        int start = checksumOff + checksumDataLen - checksumSize;
         byte[] updatedChecksum = lastChunkChecksum.getChecksum();
         
         if (updatedChecksum != null) {
@@ -352,52 +411,28 @@ class BlockSender implements java.io.Clo
       }
     }
     
-    int dataOff = checksumOff + checksumLen;
-    
-    if (blockInPosition < 0) {
-      //normal transfer
-      IOUtils.readFully(blockIn, buf, dataOff, len);
+    int dataOff = checksumOff + checksumDataLen;
+    if (!transferTo) { // normal transfer
+      IOUtils.readFully(blockIn, buf, dataOff, dataLen);
 
       if (verifyChecksum) {
-        int dOff = dataOff;
-        int cOff = checksumOff;
-        int dLeft = len;
-
-        for (int i=0; i<numChunks; i++) {
-          checksum.reset();
-          int dLen = Math.min(dLeft, bytesPerChecksum);
-          checksum.update(buf, dOff, dLen);
-          if (!checksum.compare(buf, cOff)) {
-            long failedPos = offset + len -dLeft;
-            throw new ChecksumException("Checksum failed at " + 
-                                        failedPos, failedPos);
-          }
-          dLeft -= dLen;
-          dOff += dLen;
-          cOff += checksumSize;
-        }
+        verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff);
       }
-      //writing is done below (mainly to handle IOException)
     }
     
     try {
-      if (blockInPosition >= 0) {
-        //use transferTo(). Checks on out and blockIn are already done. 
-
+      if (transferTo) {
         SocketOutputStream sockOut = (SocketOutputStream)out;
-        //first write the packet
-        sockOut.write(buf, 0, dataOff);
+        sockOut.write(buf, 0, dataOff); // First write checksum
+        
         // no need to flush. since we know out is not a buffered stream. 
-
         sockOut.transferToFully(((FileInputStream)blockIn).getChannel(), 
-                                blockInPosition, len);
-
-        blockInPosition += len;
-      } else {
+                                blockInPosition, dataLen);
+        blockInPosition += dataLen;
+      } else { 
         // normal transfer
-        out.write(buf, 0, dataOff + len);
+        out.write(buf, 0, dataOff + dataLen);
       }
-      
     } catch (IOException e) {
       /* Exception while writing to the client. Connection closure from
        * the other end is mostly the case and we do not care much about
@@ -419,9 +454,72 @@ class BlockSender implements java.io.Clo
       throttler.throttle(packetLen);
     }
 
-    return len;
+    return dataLen;
   }
-
+  
+  /**
+   * Read checksums from the checksum stream into the given buffer.
+   * @param buf buffer to read the checksum into
+   * @param checksumOffset offset at which to write the checksum into buf
+   * @param checksumLen number of checksum bytes to read
+   * @throws IOException if the read fails and corruptChecksumOk is not set
+   */
+  private void readChecksum(byte[] buf, final int checksumOffset,
+      final int checksumLen) throws IOException {
+    if (checksumSize <= 0 || checksumIn == null) {
+      return; // nothing to read; checksumIn is nulled after a failed read
+    }
+    try {
+      checksumIn.readFully(buf, checksumOffset, checksumLen);
+    } catch (IOException e) {
+      LOG.warn(" Could not read or failed to veirfy checksum for data"
+          + " at offset " + offset + " for block " + block, e);
+      IOUtils.closeStream(checksumIn);
+      checksumIn = null;
+      if (corruptChecksumOk) {
+        if (checksumLen > 0) {
+          Arrays.fill(buf, checksumOffset, checksumOffset + checksumLen,
+              (byte) 0); // end index is exclusive: zero the whole region
+        }
+      } else {
+        throw e;
+      }
+    }
+  }
+  
+  /**
+   * Compute checksum for chunks and verify the checksum that is read from
+   * the metadata file is correct.
+   * 
+   * @param buf buffer that has checksum and data
+   * @param dataOffset position where data is written in the buf
+   * @param datalen length of data
+   * @param numChunks number of chunks corresponding to data
+   * @param checksumOffset offset where checksum is written in the buf
+   * @throws ChecksumException on failed checksum verification
+   */
+  public void verifyChecksum(final byte[] buf, final int dataOffset,
+      final int datalen, final int numChunks, final int checksumOffset)
+      throws ChecksumException {
+    int dOff = dataOffset;
+    int cOff = checksumOffset;
+    int dLeft = datalen;
+
+    for (int i = 0; i < numChunks; i++) {
+      checksum.reset();
+      int dLen = Math.min(dLeft, chunkSize);
+      checksum.update(buf, dOff, dLen);
+      if (!checksum.compare(buf, cOff)) {
+        long failedPos = offset + datalen - dLeft;
+        throw new ChecksumException("Checksum failed at " + failedPos,
+            failedPos);
+      }
+      dLeft -= dLen;
+      dOff += dLen;
+      cOff += checksumSize;
+    }
+  }
+  
   /**
    * sendBlock() is used to read block and its metadata and stream the data to
    * either a client or to another datanode. 
@@ -433,70 +531,54 @@ class BlockSender implements java.io.Clo
    *        {@link SocketOutputStream#transferToFully(FileChannel, 
    *        long, int)}.
    * @param throttler for sending data.
-   * @return total bytes reads, including crc.
+   * @return total bytes read, including checksum data.
    */
   long sendBlock(DataOutputStream out, OutputStream baseStream, 
                  DataTransferThrottler throttler) throws IOException {
-    if( out == null ) {
+    if (out == null) {
       throw new IOException( "out stream is null" );
     }
-    this.throttler = throttler;
-
-    long initialOffset = offset;
+    final long initialOffset = offset;
     long totalRead = 0;
     OutputStream streamForSendChunks = out;
     
     final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
     try {
-      try {
-        checksum.writeHeader(out);
-        if ( chunkOffsetOK ) {
-          out.writeLong( offset );
-        }
-        out.flush();
-      } catch (IOException e) { //socket error
-        throw ioeToSocketException(e);
-      }
+      writeChecksumHeader(out);
       
       int maxChunksPerPacket;
       int pktSize = PacketHeader.PKT_HEADER_LEN;
-      
-      if (transferToAllowed && !verifyChecksum && 
-          baseStream instanceof SocketOutputStream && 
-          blockIn instanceof FileInputStream) {
-        
+      boolean transferTo = transferToAllowed && !verifyChecksum
+          && baseStream instanceof SocketOutputStream
+          && blockIn instanceof FileInputStream;
+      if (transferTo) {
         FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();
-        
-        // blockInPosition also indicates sendChunks() uses transferTo.
         blockInPosition = fileChannel.position();
         streamForSendChunks = baseStream;
+        maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE);
         
-        // assure a mininum buffer size.
-        maxChunksPerPacket = (Math.max(HdfsConstants.IO_FILE_BUFFER_SIZE, 
-                                       MIN_BUFFER_WITH_TRANSFERTO)
-                              + bytesPerChecksum - 1)/bytesPerChecksum;
-        
-        // allocate smaller buffer while using transferTo(). 
+        // Smaller packet size to only hold checksum when doing transferTo
         pktSize += checksumSize * maxChunksPerPacket;
       } else {
-        maxChunksPerPacket = Math.max(1, (HdfsConstants.IO_FILE_BUFFER_SIZE
-            + bytesPerChecksum - 1) / bytesPerChecksum);
-        pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket;
+        maxChunksPerPacket = Math.max(1,
+            numberOfChunks(HdfsConstants.IO_FILE_BUFFER_SIZE));
+        // Packet size includes both checksum and data
+        pktSize += (chunkSize + checksumSize) * maxChunksPerPacket;
       }
 
       ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
 
       while (endOffset > offset) {
-        long len = sendChunks(pktBuf, maxChunksPerPacket, 
-                              streamForSendChunks);
+        long len = sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks,
+            transferTo, throttler);
         offset += len;
-        totalRead += len + ((len + bytesPerChecksum - 1)/bytesPerChecksum*
-                            checksumSize);
+        totalRead += len + (numberOfChunks(len) * checksumSize);
         seqno++;
       }
       try {
         // send an empty packet to mark the end of the block
-        sendChunks(pktBuf, maxChunksPerPacket, streamForSendChunks);        
+        sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks, transferTo,
+            throttler);
         out.flush();
       } catch (IOException e) { //socket error
         throw ioeToSocketException(e);
@@ -506,14 +588,39 @@ class BlockSender implements java.io.Clo
     } finally {
       if (clientTraceFmt != null) {
         final long endTime = System.nanoTime();
-        ClientTraceLog.info(String.format(clientTraceFmt, totalRead, initialOffset, endTime - startTime));
+        ClientTraceLog.info(String.format(clientTraceFmt, totalRead,
+            initialOffset, endTime - startTime));
       }
       close();
     }
-
     return totalRead;
   }
   
+  /**
+   * Write checksum header to the output stream
+   */
+  private void writeChecksumHeader(DataOutputStream out) throws IOException {
+    try {
+      checksum.writeHeader(out);
+      if (chunkOffsetOK) {
+        out.writeLong(offset);
+      }
+      out.flush();
+    } catch (IOException e) { //socket error
+      throw ioeToSocketException(e);
+    }
+  }
+    
+  /**
+   * Write packet header into {@code pkt}
+   */
+  private void writePacketHeader(ByteBuffer pkt, int dataLen, int packetLen) {
+    pkt.clear();
+    PacketHeader header = new PacketHeader(packetLen, offset, seqno,
+        (dataLen == 0), dataLen);
+    header.putInBuffer(pkt);
+  }
+  
   boolean didSendEntireByteRange() {
     return sentEntireByteRange;
   }

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1179484&r1=1179483&r2=1179484&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Thu Oct  6 01:16:48 2011
@@ -2058,7 +2058,7 @@ public class DataNode extends Configured
         out = new DataOutputStream(new BufferedOutputStream(baseStream,
             HdfsConstants.SMALL_BUFFER_SIZE));
         blockSender = new BlockSender(b, 0, b.getNumBytes(), 
-            false, false, false, DataNode.this);
+            false, false, false, DataNode.this, null);
         DatanodeInfo srcNode = new DatanodeInfo(bpReg);
 
         //

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1179484&r1=1179483&r2=1179484&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Thu Oct  6 01:16:48 2011
@@ -597,7 +597,7 @@ class DataXceiver extends Receiver imple
     try {
       // check if the block exists or not
       blockSender = new BlockSender(block, 0, -1, false, false, false, 
-          datanode);
+          datanode, null);
 
       // set up response stream
       OutputStream baseStream = NetUtils.getOutputStream(

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java?rev=1179484&r1=1179483&r2=1179484&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java Thu Oct  6 01:16:48 2011
@@ -46,10 +46,12 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.web.JsonUtil;
 import org.apache.hadoop.hdfs.web.ParamFilter;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
@@ -219,13 +221,13 @@ public class DatanodeWebHdfsMethods {
 
     final String fullpath = path.getAbsolutePath();
     final DataNode datanode = (DataNode)context.getAttribute("datanode");
+    final Configuration conf = new Configuration(datanode.getConf());
+    final InetSocketAddress nnRpcAddr = NameNode.getAddress(conf);
+    final DFSClient dfsclient = new DFSClient(nnRpcAddr, conf);
 
     switch(op.getValue()) {
     case OPEN:
     {
-      final Configuration conf = new Configuration(datanode.getConf());
-      final InetSocketAddress nnRpcAddr = NameNode.getAddress(conf);
-      final DFSClient dfsclient = new DFSClient(nnRpcAddr, conf);
       final int b = bufferSize.getValue(conf);
       final DFSDataInputStream in = new DFSClient.DFSDataInputStream(
           dfsclient.open(fullpath, b, true));
@@ -244,6 +246,12 @@ public class DatanodeWebHdfsMethods {
       };
       return Response.ok(streaming).type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
+    case GETFILECHECKSUM:
+    {
+      final MD5MD5CRC32FileChecksum checksum = dfsclient.getFileChecksum(fullpath);
+      final String js = JsonUtil.toJsonString(checksum);
+      return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
+    }
     default:
       throw new UnsupportedOperationException(op + " is not supported");
     }

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java?rev=1179484&r1=1179483&r2=1179484&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java Thu Oct  6 01:16:48 2011
@@ -54,7 +54,6 @@ class EditLogBackupOutputStream extends 
     this.nnRegistration = nnReg;
     InetSocketAddress bnAddress =
       NetUtils.createSocketAddr(bnRegistration.getAddress());
-    Storage.LOG.info("EditLogBackupOutputStream connects to: " + bnAddress);
     try {
       this.backupNode =
         RPC.getProxy(JournalProtocol.class,
@@ -67,16 +66,6 @@ class EditLogBackupOutputStream extends 
     this.out = new DataOutputBuffer(DEFAULT_BUFFER_SIZE);
   }
   
-  @Override // JournalStream
-  public String getName() {
-    return bnRegistration.getAddress();
-  }
-
-  @Override // JournalStream
-  public JournalType getType() {
-    return JournalType.BACKUP;
-  }
-
   @Override // EditLogOutputStream
   void write(FSEditLogOp op) throws IOException {
     doubleBuf.writeOp(op);
@@ -142,16 +131,6 @@ class EditLogBackupOutputStream extends 
   }
 
   /**
-   * There is no persistent storage. Therefore length is 0.<p>
-   * Length is used to check when it is large enough to start a checkpoint.
-   * This criteria should not be used for backup streams.
-   */
-  @Override // EditLogOutputStream
-  long length() throws IOException {
-    return 0;
-  }
-
-  /**
    * Get backup node registration.
    */
   NamenodeRegistration getRegistration() {

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java?rev=1179484&r1=1179483&r2=1179484&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java Thu Oct  6 01:16:48 2011
@@ -37,9 +37,7 @@ import com.google.common.annotations.Vis
  * stores edits in a local file.
  */
 class EditLogFileOutputStream extends EditLogOutputStream {
-  private static Log LOG = LogFactory.getLog(EditLogFileOutputStream.class);;
-
-  private static int EDITS_FILE_HEADER_SIZE_BYTES = Integer.SIZE / Byte.SIZE;
+  private static Log LOG = LogFactory.getLog(EditLogFileOutputStream.class);
 
   private File file;
   private FileOutputStream fp; // file stream for storing edit logs
@@ -73,16 +71,6 @@ class EditLogFileOutputStream extends Ed
     fc.position(fc.size());
   }
 
-  @Override // JournalStream
-  public String getName() {
-    return file.getPath();
-  }
-
-  @Override // JournalStream
-  public JournalType getType() {
-    return JournalType.FILE;
-  }
-
   /** {@inheritDoc} */
   @Override
   void write(FSEditLogOp op) throws IOException {
@@ -176,7 +164,10 @@ class EditLogFileOutputStream extends Ed
     if (fp == null) {
       throw new IOException("Trying to use aborted output stream");
     }
-    
+    if (doubleBuf.isFlushed()) {
+      LOG.info("Nothing to flush");
+      return;
+    }
     preallocate(); // preallocate file if necessary
     doubleBuf.flushTo(fp);
     fc.force(false); // metadata updates not needed because of preallocation
@@ -190,16 +181,6 @@ class EditLogFileOutputStream extends Ed
   public boolean shouldForceSync() {
     return doubleBuf.shouldForceSync();
   }
-  
-  /**
-   * Return the size of the current edit log including buffered data.
-   */
-  @Override
-  long length() throws IOException {
-    // file size - header size + size of both buffers
-    return fc.size() - EDITS_FILE_HEADER_SIZE_BYTES + 
-      doubleBuf.countBufferedBytes();
-  }
 
   // allocate a big chunk of data
   private void preallocate() throws IOException {

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java?rev=1179484&r1=1179483&r2=1179484&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java Thu Oct  6 01:16:48 2011
@@ -18,23 +18,20 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
-import java.util.zip.Checksum;
 
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.Writable;
 
 /**
  * A generic abstract class to support journaling of edits logs into 
  * a persistent storage.
  */
-abstract class EditLogOutputStream implements JournalStream {
+abstract class EditLogOutputStream {
   // these are statistics counters
   private long numSync;        // number of sync(s) to disk
   private long totalTimeSync;  // total time to sync
 
-  EditLogOutputStream() throws IOException {
+  EditLogOutputStream() {
     numSync = totalTimeSync = 0;
   }
 
@@ -106,12 +103,6 @@ abstract class EditLogOutputStream imple
   }
 
   /**
-   * Return the size of the current edits log.
-   * Length is used to check when it is large enough to start a checkpoint.
-   */
-  abstract long length() throws IOException;
-
-  /**
    * Implement the policy when to automatically sync the buffered edits log
    * The buffered edits can be flushed when the buffer becomes full or
    * a certain period of time is elapsed.
@@ -132,12 +123,7 @@ abstract class EditLogOutputStream imple
   /**
    * Return number of calls to {@link #flushAndSync()}
    */
-  long getNumSync() {
+  protected long getNumSync() {
     return numSync;
   }
-
-  @Override // Object
-  public String toString() {
-    return getName();
-  }
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1179484&r1=1179483&r2=1179484&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Thu Oct  6 01:16:48 2011
@@ -17,12 +17,12 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
-import java.util.SortedSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,25 +34,17 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import static org.apache.hadoop.hdfs.server.common.Util.now;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*;
+import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.JournalManager.CorruptionException;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
-import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
-import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableListMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimaps;
-import com.google.common.collect.Sets;
-
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*;
 
 /**
  * FSEditLog maintains a log of the namespace modifications.
@@ -62,9 +54,6 @@ import org.apache.hadoop.hdfs.server.nam
 @InterfaceStability.Evolving
 public class FSEditLog  {
 
-  static final String NO_JOURNAL_STREAMS_WARNING = "!!! WARNING !!!" +
-      " File system changes are not persistent. No journal streams.";
-
   static final Log LOG = LogFactory.getLog(FSEditLog.class);
 
   /**
@@ -82,10 +71,11 @@ public class FSEditLog  {
     CLOSED;
   }  
   private State state = State.UNINITIALIZED;
+  
+  //initialize
+  final private JournalSet journalSet;
+  private EditLogOutputStream editLogStream = null;
 
-
-  private List<JournalAndStream> journals = Lists.newArrayList();
-    
   // a monotonically increasing counter that represents transactionIds.
   private long txid = 0;
 
@@ -137,15 +127,15 @@ public class FSEditLog  {
     this.storage = storage;
     metrics = NameNode.getNameNodeMetrics();
     lastPrintTime = now();
-    
+
+    this.journalSet = new JournalSet();
     for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.EDITS)) {
-      journals.add(new JournalAndStream(new FileJournalManager(sd)));
+      journalSet.add(new FileJournalManager(sd));
     }
     
-    if (journals.isEmpty()) {
+    if (journalSet.isEmpty()) {
       LOG.error("No edits directories configured!");
-    }
-    
+    } 
     state = State.BETWEEN_LOG_SEGMENTS;
   }
   
@@ -172,9 +162,8 @@ public class FSEditLog  {
       LOG.warn("Closing log when already closed", new Exception());
       return;
     }
-    
     if (state == State.IN_SEGMENT) {
-      assert !journals.isEmpty();
+      assert editLogStream != null;
       waitForSyncToFinish();
       endCurrentLogSegment(true);
     }
@@ -193,20 +182,14 @@ public class FSEditLog  {
       // wait if an automatic sync is scheduled
       waitIfAutoSyncScheduled();
       
-      if (journals.isEmpty()) {
-        throw new java.lang.IllegalStateException(NO_JOURNAL_STREAMS_WARNING);
-      }
-      
       long start = beginTransaction();
       op.setTransactionId(txid);
 
-      mapJournalsAndReportErrors(new JournalClosure() {
-        @Override 
-        public void apply(JournalAndStream jas) throws IOException {
-          if (!jas.isActive()) return;
-          jas.stream.write(op);
-        }
-      }, "logging edit");
+      try {
+        editLogStream.write(op);
+      } catch (IOException ex) {
+        // All journals failed, it is handled in logSync.
+      }
 
       endTransaction(start);
       
@@ -251,14 +234,7 @@ public class FSEditLog  {
    * @return true if any of the edit stream says that it should sync
    */
   private boolean shouldForceSync() {
-    for (JournalAndStream jas : journals) {
-      if (!jas.isActive()) continue;
-
-      if (jas.getCurrentStream().shouldForceSync()) {
-        return true;
-      }
-    }
-    return false;
+    return editLogStream.shouldForceSync();
   }
   
   private long beginTransaction() {
@@ -322,7 +298,7 @@ public class FSEditLog  {
    * NOTE: this should be done while holding the FSNamesystem lock, or
    * else more operations can start writing while this is in progress.
    */
-  void logSyncAll() throws IOException {
+  void logSyncAll() {
     // Record the most recent transaction ID as our own id
     synchronized (this) {
       TransactionId id = myTransactionId.get();
@@ -366,74 +342,73 @@ public class FSEditLog  {
     // Fetch the transactionId of this thread. 
     long mytxid = myTransactionId.get().txid;
     
-    List<JournalAndStream> candidateJournals =
-      Lists.newArrayListWithCapacity(journals.size());
-    List<JournalAndStream> badJournals = Lists.newArrayList();
-    
     boolean sync = false;
     try {
+      EditLogOutputStream logStream = null;
       synchronized (this) {
         try {
-        printStatistics(false);
-  
-        // if somebody is already syncing, then wait
-        while (mytxid > synctxid && isSyncRunning) {
-          try {
-            wait(1000);
-          } catch (InterruptedException ie) { 
+          printStatistics(false);
+
+          // if somebody is already syncing, then wait
+          while (mytxid > synctxid && isSyncRunning) {
+            try {
+              wait(1000);
+            } catch (InterruptedException ie) {
+            }
           }
-        }
   
-        //
-        // If this transaction was already flushed, then nothing to do
-        //
-        if (mytxid <= synctxid) {
-          numTransactionsBatchedInSync++;
-          if (metrics != null) // Metrics is non-null only when used inside name node
-            metrics.incrTransactionsBatchedInSync();
-          return;
-        }
+          //
+          // If this transaction was already flushed, then nothing to do
+          //
+          if (mytxid <= synctxid) {
+            numTransactionsBatchedInSync++;
+            if (metrics != null) {
+              // Metrics is non-null only when used inside name node
+              metrics.incrTransactionsBatchedInSync();
+            }
+            return;
+          }
      
-        // now, this thread will do the sync
-        syncStart = txid;
-        isSyncRunning = true;
-        sync = true;
-  
-        // swap buffers
-        assert !journals.isEmpty() : "no editlog streams";
-        
-        for (JournalAndStream jas : journals) {
-          if (!jas.isActive()) continue;
+          // now, this thread will do the sync
+          syncStart = txid;
+          isSyncRunning = true;
+          sync = true;
+  
+          // swap buffers
           try {
-            jas.getCurrentStream().setReadyToFlush();
-            candidateJournals.add(jas);
-          } catch (IOException ie) {
-            LOG.error("Unable to get ready to flush.", ie);
-            badJournals.add(jas);
+            if (journalSet.isEmpty()) {
+              throw new IOException("No journals available to flush");
+            }
+            editLogStream.setReadyToFlush();
+          } catch (IOException e) {
+            LOG.fatal("Could not sync any journal to persistent storage. "
+                + "Unsynced transactions: " + (txid - synctxid),
+                new Exception());
+            runtime.exit(1);
           }
-        }
         } finally {
           // Prevent RuntimeException from blocking other log edit write 
           doneWithAutoSyncScheduling();
         }
+        //editLogStream may become null,
+        //so store a local variable for flush.
+        logStream = editLogStream;
       }
-  
+      
       // do the sync
       long start = now();
-      for (JournalAndStream jas : candidateJournals) {
-        if (!jas.isActive()) continue;
-        try {
-          jas.getCurrentStream().flush();
-        } catch (IOException ie) {
-          LOG.error("Unable to sync edit log.", ie);
-          //
-          // remember the streams that encountered an error.
-          //
-          badJournals.add(jas);
+      try {
+        if (logStream != null) {
+          logStream.flush();
+        }
+      } catch (IOException ex) {
+        synchronized (this) {
+          LOG.fatal("Could not sync any journal to persistent storage. "
+              + "Unsynced transactions: " + (txid - synctxid), new Exception());
+          runtime.exit(1);
         }
       }
       long elapsed = now() - start;
-      disableAndReportErrorOnJournals(badJournals);
   
       if (metrics != null) { // Metrics non-null only when used inside name node
         metrics.addSync(elapsed);
@@ -443,13 +418,6 @@ public class FSEditLog  {
       // Prevent RuntimeException from blocking other log edit sync 
       synchronized (this) {
         if (sync) {
-          if (badJournals.size() >= journals.size()) {
-            LOG.fatal("Could not sync any journal to persistent storage. " +
-                "Unsynced transactions: " + (txid - synctxid),
-                new Exception());
-            runtime.exit(1);
-          }
-
           synctxid = syncStart;
           isSyncRunning = false;
         }
@@ -466,9 +434,6 @@ public class FSEditLog  {
     if (lastPrintTime + 60000 > now && !force) {
       return;
     }
-    if (journals.isEmpty()) {
-      return;
-    }
     lastPrintTime = now;
     StringBuilder buf = new StringBuilder();
     buf.append("Number of transactions: ");
@@ -478,20 +443,9 @@ public class FSEditLog  {
     buf.append("Number of transactions batched in Syncs: ");
     buf.append(numTransactionsBatchedInSync);
     buf.append(" Number of syncs: ");
-    for (JournalAndStream jas : journals) {
-      if (!jas.isActive()) continue;
-      buf.append(jas.getCurrentStream().getNumSync());
-      break;
-    }
-
+    buf.append(editLogStream.getNumSync());
     buf.append(" SyncTimes(ms): ");
-
-    for (JournalAndStream jas : journals) {
-      if (!jas.isActive()) continue;
-      EditLogOutputStream eStream = jas.getCurrentStream();
-      buf.append(eStream.getTotalSyncTime());
-      buf.append(" ");
-    }
+    buf.append(journalSet.getSyncTimes());
     LOG.info(buf);
   }
 
@@ -664,7 +618,6 @@ public class FSEditLog  {
    * log delegation token to edit log
    * @param id DelegationTokenIdentifier
    * @param expiryTime of the token
-   * @return
    */
   void logGetDelegationToken(DelegationTokenIdentifier id,
       long expiryTime) {
@@ -703,24 +656,11 @@ public class FSEditLog  {
   }
   
   /**
-   * @return the number of active (non-failed) journals
-   */
-  private int countActiveJournals() {
-    int count = 0;
-    for (JournalAndStream jas : journals) {
-      if (jas.isActive()) {
-        count++;
-      }
-    }
-    return count;
-  }
-  
-  /**
    * Used only by unit tests.
    */
   @VisibleForTesting
   List<JournalAndStream> getJournals() {
-    return journals;
+    return journalSet.getAllJournalStreams();
   }
   
   /**
@@ -734,62 +674,9 @@ public class FSEditLog  {
   /**
    * Return a manifest of what finalized edit logs are available
    */
-  public synchronized RemoteEditLogManifest getEditLogManifest(
-      long fromTxId) throws IOException {
-    // Collect RemoteEditLogs available from each FileJournalManager
-    List<RemoteEditLog> allLogs = Lists.newArrayList();
-    for (JournalAndStream j : journals) {
-      if (j.getManager() instanceof FileJournalManager) {
-        FileJournalManager fjm = (FileJournalManager)j.getManager();
-        try {
-          allLogs.addAll(fjm.getRemoteEditLogs(fromTxId));
-        } catch (Throwable t) {
-          LOG.warn("Cannot list edit logs in " + fjm, t);
-        }
-      }
-    }
-    
-    // Group logs by their starting txid
-    ImmutableListMultimap<Long, RemoteEditLog> logsByStartTxId =
-      Multimaps.index(allLogs, RemoteEditLog.GET_START_TXID);
-    long curStartTxId = fromTxId;
-
-    List<RemoteEditLog> logs = Lists.newArrayList();
-    while (true) {
-      ImmutableList<RemoteEditLog> logGroup = logsByStartTxId.get(curStartTxId);
-      if (logGroup.isEmpty()) {
-        // we have a gap in logs - for example because we recovered some old
-        // storage directory with ancient logs. Clear out any logs we've
-        // accumulated so far, and then skip to the next segment of logs
-        // after the gap.
-        SortedSet<Long> startTxIds = Sets.newTreeSet(logsByStartTxId.keySet());
-        startTxIds = startTxIds.tailSet(curStartTxId);
-        if (startTxIds.isEmpty()) {
-          break;
-        } else {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Found gap in logs at " + curStartTxId + ": " +
-                "not returning previous logs in manifest.");
-          }
-          logs.clear();
-          curStartTxId = startTxIds.first();
-          continue;
-        }
-      }
-
-      // Find the one that extends the farthest forward
-      RemoteEditLog bestLog = Collections.max(logGroup);
-      logs.add(bestLog);
-      // And then start looking from after that point
-      curStartTxId = bestLog.getEndTxId() + 1;
-    }
-    RemoteEditLogManifest ret = new RemoteEditLogManifest(logs);
-    
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Generated manifest for logs since " + fromTxId + ":"
-          + ret);      
-    }
-    return ret;
+  public synchronized RemoteEditLogManifest getEditLogManifest(long fromTxId)
+      throws IOException {
+    return journalSet.getEditLogManifest(fromTxId);
   }
  
   /**
@@ -832,14 +719,9 @@ public class FSEditLog  {
     // See HDFS-2174.
     storage.attemptRestoreRemovedStorage();
     
-    mapJournalsAndReportErrors(new JournalClosure() {
-      @Override
-      public void apply(JournalAndStream jas) throws IOException {
-        jas.startLogSegment(segmentTxId);
-      }
-    }, "starting log segment " + segmentTxId);
-
-    if (countActiveJournals() == 0) {
+    try {
+      editLogStream = journalSet.startLogSegment(segmentTxId);
+    } catch (IOException ex) {
       throw new IOException("Unable to start log segment " +
           segmentTxId + ": no journals successfully started.");
     }
@@ -873,14 +755,12 @@ public class FSEditLog  {
     
     final long lastTxId = getLastWrittenTxId();
     
-    mapJournalsAndReportErrors(new JournalClosure() {
-      @Override
-      public void apply(JournalAndStream jas) throws IOException {
-        if (jas.isActive()) {
-          jas.close(lastTxId);
-        }
-      }
-    }, "ending log segment");
+    try {
+      journalSet.finalizeLogSegment(curSegmentTxId, lastTxId);
+      editLogStream = null;
+    } catch (IOException e) {
+      //All journals have failed, it will be handled in logSync.
+    }
     
     state = State.BETWEEN_LOG_SEGMENTS;
   }
@@ -889,14 +769,15 @@ public class FSEditLog  {
    * Abort all current logs. Called from the backup node.
    */
   synchronized void abortCurrentLogSegment() {
-    mapJournalsAndReportErrors(new JournalClosure() {
-      
-      @Override
-      public void apply(JournalAndStream jas) throws IOException {
-        jas.abort();
+    try {
+      //Check for null, as abort can be called any time.
+      if (editLogStream != null) {
+        editLogStream.abort();
+        editLogStream = null;
       }
-    }, "aborting all streams");
-    state = State.BETWEEN_LOG_SEGMENTS;
+    } catch (IOException e) {
+      LOG.warn("All journals failed to abort", e);
+    }
   }
 
   /**
@@ -912,13 +793,12 @@ public class FSEditLog  {
         "cannot purge logs older than txid " + minTxIdToKeep +
         " when current segment starts at " + curSegmentTxId;
     }
-    
-    mapJournalsAndReportErrors(new JournalClosure() {
-      @Override
-      public void apply(JournalAndStream jas) throws IOException {
-        jas.manager.purgeLogsOlderThan(minTxIdToKeep);
-      }
-    }, "purging logs older than " + minTxIdToKeep);
+
+    try {
+      journalSet.purgeLogsOlderThan(minTxIdToKeep);
+    } catch (IOException ex) {
+      //All journals have failed, it will be handled in logSync.
+    }
   }
 
   
@@ -946,9 +826,7 @@ public class FSEditLog  {
 
   // sets the initial capacity of the flush buffer.
   public void setOutputBufferCapacity(int size) {
-    for (JournalAndStream jas : journals) {
-      jas.manager.setOutputBufferCapacity(size);
-    }
+      journalSet.setOutputBufferCapacity(size);
   }
 
   /**
@@ -969,7 +847,7 @@ public class FSEditLog  {
     if(bnReg.isRole(NamenodeRole.CHECKPOINT))
       return; // checkpoint node does not stream edits
     
-    JournalAndStream jas = findBackupJournalAndStream(bnReg);
+    JournalManager jas = findBackupJournal(bnReg);
     if (jas != null) {
       // already registered
       LOG.info("Backup node " + bnReg + " re-registers");
@@ -978,35 +856,29 @@ public class FSEditLog  {
     
     LOG.info("Registering new backup node: " + bnReg);
     BackupJournalManager bjm = new BackupJournalManager(bnReg, nnReg);
-    journals.add(new JournalAndStream(bjm));
+    journalSet.add(bjm);
   }
   
-  synchronized void releaseBackupStream(NamenodeRegistration registration) {
-    for (Iterator<JournalAndStream> iter = journals.iterator();
-         iter.hasNext();) {
-      JournalAndStream jas = iter.next();
-      if (jas.manager instanceof BackupJournalManager &&
-          ((BackupJournalManager)jas.manager).matchesRegistration(
-              registration)) {
-        jas.abort();        
-        LOG.info("Removing backup journal " + jas);
-        iter.remove();
-      }
+  synchronized void releaseBackupStream(NamenodeRegistration registration)
+      throws IOException {
+    BackupJournalManager bjm = this.findBackupJournal(registration);
+    if (bjm != null) {
+      LOG.info("Removing backup journal " + bjm);
+      journalSet.remove(bjm);
     }
   }
   
   /**
    * Find the JournalAndStream associated with this BackupNode.
+   * 
    * @return null if it cannot be found
    */
-  private synchronized JournalAndStream findBackupJournalAndStream(
+  private synchronized BackupJournalManager findBackupJournal(
       NamenodeRegistration bnReg) {
-    for (JournalAndStream jas : journals) {
-      if (jas.manager instanceof BackupJournalManager) {
-        BackupJournalManager bjm = (BackupJournalManager)jas.manager;
-        if (bjm.matchesRegistration(bnReg)) {
-          return jas;
-        }
+    for (JournalManager bjm : journalSet.getJournalManagers()) {
+      if ((bjm instanceof BackupJournalManager)
+          && ((BackupJournalManager) bjm).matchesRegistration(bnReg)) {
+        return (BackupJournalManager) bjm;
       }
     }
     return null;
@@ -1018,124 +890,24 @@ public class FSEditLog  {
    */   
   synchronized void logEdit(final int length, final byte[] data) {
     long start = beginTransaction();
-    
-    mapJournalsAndReportErrors(new JournalClosure() {
-      @Override
-      public void apply(JournalAndStream jas) throws IOException {
-        if (jas.isActive()) {
-          jas.getCurrentStream().writeRaw(data, 0, length); // TODO writeRaw
-        }
-      }      
-    }, "Logging edit");
-
-    endTransaction(start);
-  }
 
-  //// Iteration across journals
-  private interface JournalClosure {
-    public void apply(JournalAndStream jas) throws IOException;
-  }
-
-  /**
-   * Apply the given function across all of the journal managers, disabling
-   * any for which the closure throws an IOException.
-   * @param status message used for logging errors (e.g. "opening journal")
-   */
-  private void mapJournalsAndReportErrors(
-      JournalClosure closure, String status) {
-    List<JournalAndStream> badJAS = Lists.newLinkedList();
-    for (JournalAndStream jas : journals) {
-      try {
-        closure.apply(jas);
-      } catch (Throwable t) {
-        LOG.error("Error " + status + " (journal " + jas + ")", t);
-        badJAS.add(jas);
-      }
-    }
-
-    disableAndReportErrorOnJournals(badJAS);
-  }
-  
-  /**
-   * Called when some journals experience an error in some operation.
-   * This propagates errors to the storage level.
-   */
-  private void disableAndReportErrorOnJournals(List<JournalAndStream> badJournals) {
-    if (badJournals == null || badJournals.isEmpty()) {
-      return; // nothing to do
-    }
- 
-    for (JournalAndStream j : badJournals) {
-      LOG.error("Disabling journal " + j);
-      j.abort();
-    }
-  }
-
-  /**
-   * Find the best editlog input stream to read from txid. In this case
-   * best means the editlog which has the largest continuous range of 
-   * transactions starting from the transaction id, fromTxId.
-   *
-   * If a journal throws an CorruptionException while reading from a txn id,
-   * it means that it has more transactions, but can't find any from fromTxId. 
-   * If this is the case and no other journal has transactions, we should throw
-   * an exception as it means more transactions exist, we just can't load them.
-   *
-   * @param fromTxId Transaction id to start from.
-   * @return a edit log input stream with tranactions fromTxId 
-   *         or null if no more exist
-   */
-  private EditLogInputStream selectStream(long fromTxId) 
-      throws IOException {
-    JournalManager bestjm = null;
-    long bestjmNumTxns = 0;
-    CorruptionException corruption = null;
-
-    for (JournalAndStream jas : journals) {
-      JournalManager candidate = jas.getManager();
-      long candidateNumTxns = 0;
-      try {
-        candidateNumTxns = candidate.getNumberOfTransactions(fromTxId);
-      } catch (CorruptionException ce) {
-        corruption = ce;
-      } catch (IOException ioe) {
-        LOG.warn("Error reading number of transactions from " + candidate);
-        continue; // error reading disk, just skip
-      }
-      
-      if (candidateNumTxns > bestjmNumTxns) {
-        bestjm = candidate;
-        bestjmNumTxns = candidateNumTxns;
-      }
-    }
-    
-    
-    if (bestjm == null) {
-      /**
-       * If all candidates either threw a CorruptionException or
-       * found 0 transactions, then a gap exists. 
-       */
-      if (corruption != null) {
-        throw new IOException("Gap exists in logs from " 
-                              + fromTxId, corruption);
-      } else {
-        return null;
-      }
+    try {
+      editLogStream.writeRaw(data, 0, length);
+    } catch (IOException ex) {
+      // All journals have failed, it will be handled in logSync.
     }
-
-    return bestjm.getInputStream(fromTxId);
+    endTransaction(start);
   }
 
   /**
    * Run recovery on all journals to recover any unclosed segments
    */
   void recoverUnclosedStreams() {
-    mapJournalsAndReportErrors(new JournalClosure() {
-        @Override
-        public void apply(JournalAndStream jas) throws IOException {
-          jas.manager.recoverUnfinalizedSegments();
-        }
-      }, "recovering unclosed streams");
+    try {
+      journalSet.recoverUnfinalizedSegments();
+    } catch (IOException ex) {
+      // All journals have failed, it is handled in logSync.
+    }
   }
 
   /**
@@ -1143,23 +915,16 @@ public class FSEditLog  {
    * @param fromTxId first transaction in the selected streams
    * @param toAtLeast the selected streams must contain this transaction
    */
-  Collection<EditLogInputStream> selectInputStreams(long fromTxId, long toAtLeastTxId) 
-      throws IOException {
-    List<EditLogInputStream> streams = Lists.newArrayList();
-    
-    boolean gapFound = false;
-    EditLogInputStream stream = selectStream(fromTxId);
+  Collection<EditLogInputStream> selectInputStreams(long fromTxId,
+      long toAtLeastTxId) throws IOException {
+    List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
+    EditLogInputStream stream = journalSet.getInputStream(fromTxId);
     while (stream != null) {
       fromTxId = stream.getLastTxId() + 1;
       streams.add(stream);
-      try {
-        stream = selectStream(fromTxId);
-      } catch (IOException ioe) {
-        gapFound = true;
-        break;
-      }
+      stream = journalSet.getInputStream(fromTxId);
     }
-    if (fromTxId <= toAtLeastTxId || gapFound) {
+    if (fromTxId <= toAtLeastTxId) {
       closeAllStreams(streams);
       throw new IOException("No non-corrupt logs for txid " 
                             + fromTxId);
@@ -1176,75 +941,4 @@ public class FSEditLog  {
       IOUtils.closeStream(s);
     }
   }
-
-  /**
-   * Container for a JournalManager paired with its currently
-   * active stream.
-   * 
-   * If a Journal gets disabled due to an error writing to its
-   * stream, then the stream will be aborted and set to null.
-   */
-  static class JournalAndStream {
-    private final JournalManager manager;
-    private EditLogOutputStream stream;
-    private long segmentStartsAtTxId = HdfsConstants.INVALID_TXID;
-    
-    private JournalAndStream(JournalManager manager) {
-      this.manager = manager;
-    }
-
-    private void startLogSegment(long txId) throws IOException {
-      Preconditions.checkState(stream == null);
-      stream = manager.startLogSegment(txId);
-      segmentStartsAtTxId = txId;
-    }
-
-    private void close(long lastTxId) throws IOException {
-      Preconditions.checkArgument(lastTxId >= segmentStartsAtTxId,
-          "invalid segment: lastTxId %s >= " +
-          "segment starting txid %s", lastTxId, segmentStartsAtTxId);
-          
-      if (stream == null) return;
-      stream.close();
-      manager.finalizeLogSegment(segmentStartsAtTxId, lastTxId);
-      stream = null;
-    }
-    
-    @VisibleForTesting
-    void abort() {
-      if (stream == null) return;
-      try {
-        stream.abort();
-      } catch (IOException ioe) {
-        LOG.error("Unable to abort stream " + stream, ioe);
-      }
-      stream = null;
-      segmentStartsAtTxId = HdfsConstants.INVALID_TXID;
-    }
-
-    private boolean isActive() {
-      return stream != null;
-    }
-
-    @VisibleForTesting
-    EditLogOutputStream getCurrentStream() {
-      return stream;
-    }
-    
-    @Override
-    public String toString() {
-      return "JournalAndStream(mgr=" + manager +
-        ", " + "stream=" + stream + ")";
-    }
-
-    @VisibleForTesting
-    void setCurrentStreamForTests(EditLogOutputStream stream) {
-      this.stream = stream;
-    }
-    
-    @VisibleForTesting
-    JournalManager getManager() {
-      return manager;
-    }
-  }
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java?rev=1179484&r1=1179483&r2=1179484&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java Thu Oct  6 01:16:48 2011
@@ -23,7 +23,6 @@ import org.apache.commons.logging.LogFac
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
-import java.util.HashMap;
 import java.util.Comparator;
 import java.util.Collections;
 import java.util.regex.Matcher;

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java?rev=1179484&r1=1179483&r2=1179484&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java Thu Oct  6 01:16:48 2011
@@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.na
 
 import java.io.IOException;
 
-import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger;
 
 /**
  * A JournalManager is responsible for managing a single place of storing