Posted to hdfs-commits@hadoop.apache.org by ss...@apache.org on 2012/10/19 20:49:45 UTC

svn commit: r1400219 [1/2] - in /hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocol/ src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ src/mai...

Author: sseth
Date: Fri Oct 19 18:49:38 2012
New Revision: 1400219

URL: http://svn.apache.org/viewvc?rev=1400219&view=rev
Log:
merge from trunk to branch MR-3902

Added:
    hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/
      - copied from r1400218, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/
    hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/TestPacketReceiver.java
      - copied unchanged from r1400218, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/TestPacketReceiver.java
Removed:
    hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/packages/
Modified:
    hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
    hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
    hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
    hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
    hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
    hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
    hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
    hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/GenerationStamp.java
    hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
    hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
    hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
    hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
    hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LogsPurgeable.java
    hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java
    hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
    hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java
    hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
    hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
    hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
    hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java
    hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
    hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
    hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
    hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java
    hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java
    hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java
    hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java
    hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Oct 19 18:49:38 2012
@@ -147,6 +147,9 @@ Trunk (Unreleased)
     Block Pool Used, Block Pool Used(%) and Failed Volumes.
     (Brahma Reddy Battula via suresh)
 
+    HDFS-4052. BlockManager#invalidateWork should print log outside the lock.
+    (Jing Zhao via suresh)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -224,8 +227,6 @@ Trunk (Unreleased)
     HDFS-3834. Remove unused static fields NAME, DESCRIPTION and Usage from
     Command. (Jing Zhao via suresh)
 
-    HDFS-3678. Edit log files are never being purged from 2NN. (atm)
-
     HADOOP-8158. Interrupting hadoop fs -put from the command line
     causes a LeaseExpiredException. (daryn via harsh)
 
@@ -343,6 +344,8 @@ Release 2.0.3-alpha - Unreleased 
     HDFS-3912. Detect and avoid stale datanodes for writes.
     (Jing Zhao via suresh)
 
+    HDFS-4059. Add number of stale DataNodes to metrics. (Jing Zhao via suresh)
+
   IMPROVEMENTS
   
     HDFS-3925. Prettify PipelineAck#toString() for printing to a log
@@ -388,6 +391,20 @@ Release 2.0.3-alpha - Unreleased 
     HDFS-4036. Remove "throw UnresolvedLinkException" from
     FSDirectory.unprotectedAddFile(..). (Jing Zhao via szetszwo)
 
+    HDFS-2946. HA: Put a cap on the number of completed edits files retained
+    by the NN. (atm)
+
+    HDFS-4029. GenerationStamp should use an AtomicLong. (eli)
+
+    HDFS-4068. DatanodeID and DatanodeInfo member should be private. (eli)
+
+    HDFS-4073. Two minor improvements to FSDirectory.  (Jing Zhao via szetszwo)
+
+    HDFS-4074. Remove the unused default constructor from INode.  (Brandon Li
+    via szetszwo)
+
+    HDFS-4053. Increase the default block size. (eli)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -440,6 +457,26 @@ Release 2.0.3-alpha - Unreleased 
     HDFS-4044. Duplicate ChecksumType definition in HDFS .proto files.
     (Binglin Chang via suresh)
 
+    HDFS-4049. Fix hflush performance regression due to nagling delays
+    (todd)
+
+    HDFS-3678. Edit log files are never being purged from 2NN. (atm)
+
+    HDFS-4058. DirectoryScanner may fail with IOOB if the directory
+    scanning threads return out of volume order. (eli)
+
+    HDFS-3985. Add timeouts to TestMultipleNNDataBlockScanner. (todd via eli)
+
+    HDFS-4061. TestBalancer and TestUnderReplicatedBlocks need timeouts. (eli)
+
+    HDFS-3997. OfflineImageViewer incorrectly passes value of imageVersion when
+    visiting IS_COMPRESSED element. (Mithun Radhakrishnan via atm)
+
+    HDFS-4055. TestAuditLogs is flaky. (Binglin Chang via eli)
+
+    HDFS-4072. On file deletion remove corresponding blocks pending
+    replications. (Jing Zhao via suresh)
+
 Release 2.0.2-alpha - 2012-09-07 
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Oct 19 18:49:38 2012
@@ -31,7 +31,7 @@ import org.apache.hadoop.fs.CommonConfig
 public class DFSConfigKeys extends CommonConfigurationKeys {
 
   public static final String  DFS_BLOCK_SIZE_KEY = "dfs.blocksize";
-  public static final long    DFS_BLOCK_SIZE_DEFAULT = 64*1024*1024;
+  public static final long    DFS_BLOCK_SIZE_DEFAULT = 128*1024*1024;
   public static final String  DFS_REPLICATION_KEY = "dfs.replication";
   public static final short   DFS_REPLICATION_DEFAULT = 3;
   public static final String  DFS_STREAM_BUFFER_SIZE_KEY = "dfs.stream-buffer-size";
@@ -162,6 +162,8 @@ public class DFSConfigKeys extends Commo
   public static final int     DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_DEFAULT = 2;
   public static final String  DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY = "dfs.namenode.num.extra.edits.retained";
   public static final int     DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_DEFAULT = 1000000; //1M
+  public static final String  DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_KEY = "dfs.namenode.max.extra.edits.segments.retained";
+  public static final int     DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_DEFAULT = 10000; // 10k
   public static final String  DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY = "dfs.namenode.min.supported.datanode.version";
   public static final String  DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_DEFAULT = "3.0.0-SNAPSHOT";
 

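The hunks above double dfs.blocksize from 64 MB to 128 MB (HDFS-4053) and add
dfs.namenode.max.extra.edits.segments.retained (HDFS-2946). A minimal sketch,
assuming only the stock Configuration API, of how a deployment could pin the
old 64 MB value if it is not ready for the new default (the class name is
made up):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class LegacyBlockSize {
      public static Configuration withLegacyBlockSize() {
        Configuration conf = new Configuration();
        // DFS_BLOCK_SIZE_DEFAULT is now 128*1024*1024; set the key
        // explicitly to keep the previous 64 MB behavior.
        conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 64L * 1024 * 1024);
        return conf;
      }
    }
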
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java Fri Oct 19 18:49:38 2012
@@ -37,12 +37,12 @@ import org.apache.hadoop.classification.
 public class DatanodeID implements Comparable<DatanodeID> {
   public static final DatanodeID[] EMPTY_ARRAY = {};
 
-  protected String ipAddr;     // IP address
-  protected String hostName;   // hostname
-  protected String storageID;  // unique per cluster storageID
-  protected int xferPort;      // data streaming port
-  protected int infoPort;      // info server port
-  protected int ipcPort;       // IPC server port
+  private String ipAddr;     // IP address
+  private String hostName;   // hostname
+  private String storageID;  // unique per cluster storageID
+  private int xferPort;      // data streaming port
+  private int infoPort;      // info server port
+  private int ipcPort;       // IPC server port
 
   public DatanodeID(DatanodeID from) {
     this(from.getIpAddr(),

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java Fri Oct 19 18:49:38 2012
@@ -37,13 +37,13 @@ import org.apache.hadoop.util.Time;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class DatanodeInfo extends DatanodeID implements Node {
-  protected long capacity;
-  protected long dfsUsed;
-  protected long remaining;
-  protected long blockPoolUsed;
-  protected long lastUpdate;
-  protected int xceiverCount;
-  protected String location = NetworkTopology.DEFAULT_RACK;
+  private long capacity;
+  private long dfsUsed;
+  private long remaining;
+  private long blockPoolUsed;
+  private long lastUpdate;
+  private int xceiverCount;
+  private String location = NetworkTopology.DEFAULT_RACK;
   
   // Datanode administrative states
   public enum AdminStates {
@@ -81,8 +81,7 @@ public class DatanodeInfo extends Datano
     this.lastUpdate = from.getLastUpdate();
     this.xceiverCount = from.getXceiverCount();
     this.location = from.getNetworkLocation();
-    this.adminState = from.adminState;
-    this.hostName = from.hostName;
+    this.adminState = from.getAdminState();
   }
 
   public DatanodeInfo(DatanodeID nodeID) {

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java Fri Oct 19 18:49:38 2012
@@ -53,14 +53,8 @@ public class PacketReceiver implements C
   private final boolean useDirectBuffers;
 
   /**
-   * Internal buffer for reading the length prefixes at the start of
-   * the packet.
-   */
-  private final ByteBuffer lengthPrefixBuf = ByteBuffer.allocate(
-      PacketHeader.PKT_LENGTHS_LEN);
-
-  /**
-   * The entirety of the most recently read packet, excepting the
+   * The entirety of the most recently read packet.
+   * The first PKT_LENGTHS_LEN bytes of this buffer are the
    * length prefixes.
    */
   private ByteBuffer curPacketBuf = null;
@@ -82,6 +76,7 @@ public class PacketReceiver implements C
   
   public PacketReceiver(boolean useDirectBuffers) {
     this.useDirectBuffers = useDirectBuffers;
+    reallocPacketBuf(PacketHeader.PKT_LENGTHS_LEN);
   }
 
   public PacketHeader getHeader() {
@@ -133,11 +128,12 @@ public class PacketReceiver implements C
     //            checksums were not requested
     // DATA       the actual block data
     Preconditions.checkState(curHeader == null || !curHeader.isLastPacketInBlock());
-    
-    lengthPrefixBuf.clear();
-    doReadFully(ch, in, lengthPrefixBuf);
-    lengthPrefixBuf.flip();
-    int payloadLen = lengthPrefixBuf.getInt();
+
+    curPacketBuf.clear();
+    curPacketBuf.limit(PacketHeader.PKT_LENGTHS_LEN);
+    doReadFully(ch, in, curPacketBuf);
+    curPacketBuf.flip();
+    int payloadLen = curPacketBuf.getInt();
     
     if (payloadLen < Ints.BYTES) {
       // The "payload length" includes its own length. Therefore it
@@ -146,7 +142,7 @@ public class PacketReceiver implements C
           payloadLen);
     }
     int dataPlusChecksumLen = payloadLen - Ints.BYTES;
-    int headerLen = lengthPrefixBuf.getShort();
+    int headerLen = curPacketBuf.getShort();
     if (headerLen < 0) {
       throw new IOException("Invalid header length " + headerLen);
     }
@@ -166,13 +162,17 @@ public class PacketReceiver implements C
 
     // Make sure we have space for the whole packet, and
     // read it.
-    reallocPacketBuf(dataPlusChecksumLen + headerLen);
+    reallocPacketBuf(PacketHeader.PKT_LENGTHS_LEN +
+        dataPlusChecksumLen + headerLen);
     curPacketBuf.clear();
-    curPacketBuf.limit(dataPlusChecksumLen + headerLen);
+    curPacketBuf.position(PacketHeader.PKT_LENGTHS_LEN);
+    curPacketBuf.limit(PacketHeader.PKT_LENGTHS_LEN +
+        dataPlusChecksumLen + headerLen);
     doReadFully(ch, in, curPacketBuf);
     curPacketBuf.flip();
+    curPacketBuf.position(PacketHeader.PKT_LENGTHS_LEN);
 
-    // Extract the header from the front of the buffer.
+    // Extract the header from the front of the buffer (after the length prefixes)
     byte[] headerBuf = new byte[headerLen];
     curPacketBuf.get(headerBuf);
     if (curHeader == null) {
@@ -197,10 +197,6 @@ public class PacketReceiver implements C
   public void mirrorPacketTo(DataOutputStream mirrorOut) throws IOException {
     Preconditions.checkState(!useDirectBuffers,
         "Currently only supported for non-direct buffers");
-    assert lengthPrefixBuf.capacity() == PacketHeader.PKT_LENGTHS_LEN;
-    mirrorOut.write(lengthPrefixBuf.array(),
-        lengthPrefixBuf.arrayOffset(),
-        lengthPrefixBuf.capacity());
     mirrorOut.write(curPacketBuf.array(),
         curPacketBuf.arrayOffset(),
         curPacketBuf.remaining());
@@ -223,23 +219,36 @@ public class PacketReceiver implements C
 
   private void reslicePacket(
       int headerLen, int checksumsLen, int dataLen) {
+    // Packet structure (refer to doRead() for details):
+    //   PLEN    HLEN      HEADER     CHECKSUMS  DATA
+    //   32-bit  16-bit   <protobuf>  <variable length>
+    //   |--- lenThroughHeader ----|
+    //   |----------- lenThroughChecksums   ----|
+    //   |------------------- lenThroughData    ------| 
+    int lenThroughHeader = PacketHeader.PKT_LENGTHS_LEN + headerLen;
+    int lenThroughChecksums = lenThroughHeader + checksumsLen;
+    int lenThroughData = lenThroughChecksums + dataLen;
+
     assert dataLen >= 0 : "invalid datalen: " + dataLen;
-    
-    assert curPacketBuf.position() == headerLen;
-    assert checksumsLen + dataLen == curPacketBuf.remaining() :
+    assert curPacketBuf.position() == lenThroughHeader;
+    assert curPacketBuf.limit() == lenThroughData :
       "headerLen= " + headerLen + " clen=" + checksumsLen + " dlen=" + dataLen +
       " rem=" + curPacketBuf.remaining();
-    
-    curPacketBuf.position(headerLen);
-    curPacketBuf.limit(headerLen + checksumsLen);
+
+    // Slice the checksums.
+    curPacketBuf.position(lenThroughHeader);
+    curPacketBuf.limit(lenThroughChecksums);
     curChecksumSlice = curPacketBuf.slice();
 
-    curPacketBuf.position(headerLen + checksumsLen);
-    curPacketBuf.limit(headerLen + checksumsLen + dataLen);
+    // Slice the data.
+    curPacketBuf.position(lenThroughChecksums);
+    curPacketBuf.limit(lenThroughData);
     curDataSlice = curPacketBuf.slice();
     
+    // Reset buffer to point to the entirety of the packet (including
+    // length prefixes)
     curPacketBuf.position(0);
-    curPacketBuf.limit(headerLen + checksumsLen + dataLen);
+    curPacketBuf.limit(lenThroughData);
   }
 
   
@@ -258,12 +267,21 @@ public class PacketReceiver implements C
     // one.
     if (curPacketBuf == null ||
         curPacketBuf.capacity() < atLeastCapacity) {
-      returnPacketBufToPool();
+      ByteBuffer newBuf;
       if (useDirectBuffers) {
-        curPacketBuf = bufferPool.getBuffer(atLeastCapacity);
+        newBuf = bufferPool.getBuffer(atLeastCapacity);
       } else {
-        curPacketBuf = ByteBuffer.allocate(atLeastCapacity);
+        newBuf = ByteBuffer.allocate(atLeastCapacity);
       }
+      // If reallocing an existing buffer, copy the old packet length
+      // prefixes over
+      if (curPacketBuf != null) {
+        curPacketBuf.flip();
+        newBuf.put(curPacketBuf);
+      }
+      
+      returnPacketBufToPool();
+      curPacketBuf = newBuf;
     }
   }
   

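The PacketReceiver change folds the old lengthPrefixBuf into the front of
curPacketBuf, so the length prefixes, header, checksums and data all live in
one buffer and mirrorPacketTo() can forward the packet with a single write.
A short self-contained sketch of the position/limit/slice idiom the new code
relies on; the buffer sizes are invented and PREFIX stands in for
PacketHeader.PKT_LENGTHS_LEN:

    import java.nio.ByteBuffer;

    public class PacketSliceDemo {
      public static void main(String[] args) {
        final int PREFIX = 6;   // stand-in for PacketHeader.PKT_LENGTHS_LEN
        int headerLen = 10, checksumsLen = 8, dataLen = 16;
        ByteBuffer pkt =
            ByteBuffer.allocate(PREFIX + headerLen + checksumsLen + dataLen);

        // Carve non-overlapping views of the single backing buffer, the way
        // reslicePacket() does, without copying any bytes.
        pkt.position(PREFIX + headerLen);
        pkt.limit(PREFIX + headerLen + checksumsLen);
        ByteBuffer checksums = pkt.slice();

        pkt.position(PREFIX + headerLen + checksumsLen);
        pkt.limit(pkt.capacity());
        ByteBuffer data = pkt.slice();

        // Reset to cover the whole packet, length prefixes included, so the
        // entire thing can be mirrored with one write.
        pkt.position(0);
        pkt.limit(pkt.capacity());
        System.out.println(checksums.remaining() + " checksum bytes, "
            + data.remaining() + " data bytes");
      }
    }
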
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Fri Oct 19 18:49:38 2012
@@ -288,7 +288,7 @@ public class BlockManager {
   }
 
   private static BlockTokenSecretManager createBlockTokenSecretManager(
-      final Configuration conf) throws IOException {
+      final Configuration conf) {
     final boolean isEnabled = conf.getBoolean(
         DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, 
         DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT);
@@ -1260,7 +1260,7 @@ public class BlockManager {
           // Move the block-replication into a "pending" state.
           // The reason we use 'pending' is so we can retry
           // replications that fail after an appropriate amount of time.
-          pendingReplications.add(block, targets.length);
+          pendingReplications.increment(block, targets.length);
           if(NameNode.stateChangeLog.isDebugEnabled()) {
             NameNode.stateChangeLog.debug(
                 "BLOCK* block " + block
@@ -1306,8 +1306,11 @@ public class BlockManager {
 
   /**
    * Choose target datanodes according to the replication policy.
-   * @throws IOException if the number of targets < minimum replication.
-   * @see BlockPlacementPolicy#chooseTarget(String, int, DatanodeDescriptor, HashMap, long)
+   * 
+   * @throws IOException
+   *           if the number of targets < minimum replication.
+   * @see BlockPlacementPolicy#chooseTarget(String, int, DatanodeDescriptor,
+   *      List, boolean, HashMap, long)
    */
   public DatanodeDescriptor[] chooseTarget(final String src,
       final int numOfReplicas, final DatanodeDescriptor client,
@@ -1811,7 +1814,7 @@ assert storedBlock.findDatanode(dn) < 0 
 
   /**
    * Queue the given reported block for later processing in the
-   * standby node. {@see PendingDataNodeMessages}.
+   * standby node. @see PendingDataNodeMessages.
    * @param reason a textual reason to report in the debug logs
    */
   private void queueReportedBlock(DatanodeDescriptor dn, Block block,
@@ -1976,14 +1979,15 @@ assert storedBlock.findDatanode(dn) < 0 
   }
   
   /**
-   * Faster version of {@link addStoredBlock()}, intended for use with 
-   * initial block report at startup.  If not in startup safe mode, will
-   * call standard addStoredBlock().
-   * Assumes this method is called "immediately" so there is no need to
-   * refresh the storedBlock from blocksMap.
-   * Doesn't handle underReplication/overReplication, or worry about
+   * Faster version of
+   * {@link #addStoredBlock(BlockInfo, DatanodeDescriptor, DatanodeDescriptor, boolean)}
+   * , intended for use with initial block report at startup. If not in startup
+   * safe mode, will call standard addStoredBlock(). Assumes this method is
+   * called "immediately" so there is no need to refresh the storedBlock from
+   * blocksMap. Doesn't handle underReplication/overReplication, or worry about
    * pendingReplications or corruptReplicas, because it's in startup safe mode.
    * Doesn't log every block, because there are typically millions of them.
+   * 
    * @throws IOException
    */
   private void addStoredBlockImmediate(BlockInfo storedBlock,
@@ -2505,7 +2509,7 @@ assert storedBlock.findDatanode(dn) < 0 
     //
     // Modify the blocks->datanode map and node's map.
     //
-    pendingReplications.remove(block);
+    pendingReplications.decrement(block);
     processAndHandleReportedBlock(node, block, ReplicaState.FINALIZED,
         delHintNode);
   }
@@ -2641,7 +2645,7 @@ assert storedBlock.findDatanode(dn) < 0 
   }
 
   /** 
-   * Simpler, faster form of {@link countNodes()} that only returns the number
+   * Simpler, faster form of {@link #countNodes(Block)} that only returns the number
    * of live nodes.  If in startup safemode (or its 30-sec extension period),
    * then it gains speed by ignoring issues of excess replicas or nodes
    * that are decommissioned or in process of becoming decommissioned.
@@ -2790,6 +2794,8 @@ assert storedBlock.findDatanode(dn) < 0 
     addToInvalidates(block);
     corruptReplicas.removeFromCorruptReplicasMap(block);
     blocksMap.removeBlock(block);
+    // Remove the block from pendingReplications
+    pendingReplications.remove(block);
     if (postponedMisreplicatedBlocks.remove(block)) {
       postponedMisreplicatedBlocksCount--;
     }
@@ -2856,6 +2862,9 @@ assert storedBlock.findDatanode(dn) < 0 
    * @return number of blocks scheduled for removal during this iteration.
    */
   private int invalidateWorkForOneNode(String nodeId) {
+    final List<Block> toInvalidate;
+    final DatanodeDescriptor dn;
+    
     namesystem.writeLock();
     try {
       // blocks should not be replicated or removed if safe mode is on
@@ -2865,10 +2874,23 @@ assert storedBlock.findDatanode(dn) < 0 
       }
       // get blocks to invalidate for the nodeId
       assert nodeId != null;
-      return invalidateBlocks.invalidateWork(nodeId);
+      dn = datanodeManager.getDatanode(nodeId);
+      if (dn == null) {
+        invalidateBlocks.remove(nodeId);
+        return 0;
+      }
+      toInvalidate = invalidateBlocks.invalidateWork(nodeId, dn);
+      if (toInvalidate == null) {
+        return 0;
+      }
     } finally {
       namesystem.writeUnlock();
     }
+    if (NameNode.stateChangeLog.isInfoEnabled()) {
+      NameNode.stateChangeLog.info("BLOCK* " + getClass().getSimpleName()
+          + ": ask " + dn + " to delete " + toInvalidate);
+    }
+    return toInvalidate.size();
   }
 
   boolean blockHasEnoughRacks(Block b) {

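This is the HDFS-4052 change noted in CHANGES.txt: invalidateWorkForOneNode()
now collects dn and toInvalidate under the namesystem write lock but builds
and emits the log line only after writeUnlock(), so appender I/O no longer
extends the lock hold time. A stripped-down sketch of the pattern, assuming a
plain ReentrantReadWriteLock in place of the namesystem lock and String in
place of Block:

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class LogOutsideLock {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      private List<String> pendingDeletes = Arrays.asList("blk_1", "blk_2");

      int invalidateWork() {
        final List<String> toInvalidate;
        lock.writeLock().lock();
        try {
          // Only the shared-state mutation happens under the lock.
          toInvalidate = pendingDeletes;
          pendingDeletes = null;
          if (toInvalidate == null) {
            return 0;
          }
        } finally {
          lock.writeLock().unlock();
        }
        // String building and appender I/O run after the lock is released.
        System.out.println("BLOCK* ask datanode to delete " + toInvalidate);
        return toInvalidate.size();
      }
    }
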
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java Fri Oct 19 18:49:38 2012
@@ -276,11 +276,11 @@ public class DatanodeDescriptor extends 
   }
 
   public void resetBlocks() {
-    this.capacity = 0;
-    this.remaining = 0;
-    this.blockPoolUsed = 0;
-    this.dfsUsed = 0;
-    this.xceiverCount = 0;
+    setCapacity(0);
+    setRemaining(0);
+    setBlockPoolUsed(0);
+    setDfsUsed(0);
+    setXceiverCount(0);
     this.blockList = null;
     this.invalidateBlocks.clear();
     this.volumeFailures = 0;
@@ -303,15 +303,15 @@ public class DatanodeDescriptor extends 
    */
   public void updateHeartbeat(long capacity, long dfsUsed, long remaining,
       long blockPoolUsed, int xceiverCount, int volFailures) {
-    this.capacity = capacity;
-    this.dfsUsed = dfsUsed;
-    this.remaining = remaining;
-    this.blockPoolUsed = blockPoolUsed;
-    this.lastUpdate = Time.now();
-    this.xceiverCount = xceiverCount;
+    setCapacity(capacity);
+    setRemaining(remaining);
+    setBlockPoolUsed(blockPoolUsed);
+    setDfsUsed(dfsUsed);
+    setXceiverCount(xceiverCount);
+    setLastUpdate(Time.now());    
     this.volumeFailures = volFailures;
     this.heartbeatedSinceFailover = true;
-    rollBlocksScheduled(lastUpdate);
+    rollBlocksScheduled(getLastUpdate());
   }
 
   /**

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Fri Oct 19 18:49:38 2012
@@ -567,7 +567,7 @@ public class DatanodeManager {
   /**
    * Decommission the node if it is in exclude list.
    */
-  private void checkDecommissioning(DatanodeDescriptor nodeReg, String ipAddr) { 
+  private void checkDecommissioning(DatanodeDescriptor nodeReg) { 
     // If the registered node is in exclude list, then decommission it
     if (inExcludedHostsList(nodeReg)) {
       startDecommission(nodeReg);
@@ -713,7 +713,7 @@ public class DatanodeManager {
         
       // also treat the registration message as a heartbeat
       heartbeatManager.register(nodeS);
-      checkDecommissioning(nodeS, dnAddress);
+      checkDecommissioning(nodeS);
       return;
     } 
 
@@ -733,7 +733,7 @@ public class DatanodeManager {
       = new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK);
     resolveNetworkLocation(nodeDescr);
     addDatanode(nodeDescr);
-    checkDecommissioning(nodeDescr, dnAddress);
+    checkDecommissioning(nodeDescr);
     
     // also treat the registration message as a heartbeat
     // no need to update its timestamp
@@ -885,7 +885,7 @@ public class DatanodeManager {
    * @return Return the current number of stale DataNodes (detected by
    * HeartbeatManager). 
    */
-  int getNumStaleNodes() {
+  public int getNumStaleNodes() {
     return this.numStaleNodes;
   }
 

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java Fri Oct 19 18:49:38 2012
@@ -134,26 +134,7 @@ class InvalidateBlocks {
     return new ArrayList<String>(node2blocks.keySet());
   }
 
-  /** Invalidate work for the storage. */
-  int invalidateWork(final String storageId) {
-    final DatanodeDescriptor dn = datanodeManager.getDatanode(storageId);
-    if (dn == null) {
-      remove(storageId);
-      return 0;
-    }
-    final List<Block> toInvalidate = invalidateWork(storageId, dn);
-    if (toInvalidate == null) {
-      return 0;
-    }
-
-    if (NameNode.stateChangeLog.isInfoEnabled()) {
-      NameNode.stateChangeLog.info("BLOCK* " + getClass().getSimpleName()
-          + ": ask " + dn + " to delete " + toInvalidate);
-    }
-    return toInvalidate.size();
-  }
-
-  private synchronized List<Block> invalidateWork(
+  synchronized List<Block> invalidateWork(
       final String storageId, final DatanodeDescriptor dn) {
     final LightWeightHashSet<Block> set = node2blocks.get(storageId);
     if (set == null) {

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java Fri Oct 19 18:49:38 2012
@@ -72,7 +72,7 @@ class PendingReplicationBlocks {
   /**
    * Add a block to the list of pending Replications
    */
-  void add(Block block, int numReplicas) {
+  void increment(Block block, int numReplicas) {
     synchronized (pendingReplications) {
       PendingBlockInfo found = pendingReplications.get(block);
       if (found == null) {
@@ -89,7 +89,7 @@ class PendingReplicationBlocks {
    * Decrement the number of pending replication requests
    * for this block.
    */
-  void remove(Block block) {
+  void decrement(Block block) {
     synchronized (pendingReplications) {
       PendingBlockInfo found = pendingReplications.get(block);
       if (found != null) {
@@ -104,6 +104,16 @@ class PendingReplicationBlocks {
     }
   }
 
+  /**
+   * Remove the record about the given block from pendingReplications.
+   * @param block The given block whose pending replication requests need to be
+   *              removed
+   */
+  void remove(Block block) {
+    synchronized (pendingReplications) {
+      pendingReplications.remove(block);
+    }
+  }
 
   public void clear() {
     synchronized (pendingReplications) {

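The renames make the map's semantics explicit: increment() and decrement()
adjust the pending-replica count for a block, while the new remove() drops
the record outright, which BlockManager.removeBlock() now does on file
deletion (HDFS-4072). A toy version of the counting-map idiom, with String
standing in for Block:

    import java.util.HashMap;
    import java.util.Map;

    public class PendingCounts {
      private final Map<String, Integer> pending =
          new HashMap<String, Integer>();

      synchronized void increment(String block, int n) {
        Integer cur = pending.get(block);
        pending.put(block, cur == null ? n : cur + n);
      }

      synchronized void decrement(String block) {
        Integer cur = pending.get(block);
        if (cur != null) {
          if (cur <= 1) {
            pending.remove(block);   // last pending replication finished
          } else {
            pending.put(block, cur - 1);
          }
        }
      }

      synchronized void remove(String block) {
        pending.remove(block);       // drop the record, e.g. on file delete
      }
    }
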
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/GenerationStamp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/GenerationStamp.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/GenerationStamp.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/GenerationStamp.java Fri Oct 19 18:49:38 2012
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.common;
 
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 
 /****************************************************************
@@ -35,7 +37,7 @@ public class GenerationStamp implements 
    */
   public static final long GRANDFATHER_GENERATION_STAMP = 0;
 
-  private volatile long genstamp;
+  private AtomicLong genstamp = new AtomicLong();
 
   /**
    * Create a new instance, initialized to FIRST_VALID_STAMP.
@@ -48,35 +50,36 @@ public class GenerationStamp implements 
    * Create a new instance, initialized to the specified value.
    */
   GenerationStamp(long stamp) {
-    this.genstamp = stamp;
+    genstamp.set(stamp);
   }
 
   /**
    * Returns the current generation stamp
    */
   public long getStamp() {
-    return this.genstamp;
+    return genstamp.get();
   }
 
   /**
    * Sets the current generation stamp
    */
   public void setStamp(long stamp) {
-    this.genstamp = stamp;
+    genstamp.set(stamp);
   }
 
   /**
    * First increments the counter and then returns the stamp 
    */
-  public synchronized long nextStamp() {
-    this.genstamp++;
-    return this.genstamp;
+  public long nextStamp() {
+    return genstamp.incrementAndGet();
   }
 
   @Override // Comparable
   public int compareTo(GenerationStamp that) {
-    return this.genstamp < that.genstamp ? -1 :
-           this.genstamp > that.genstamp ? 1 : 0;
+    long stamp1 = this.genstamp.get();
+    long stamp2 = that.genstamp.get();
+    return stamp1 < stamp2 ? -1 :
+           stamp1 > stamp2 ? 1 : 0;
   }
 
   @Override // Object
@@ -89,6 +92,7 @@ public class GenerationStamp implements 
 
   @Override // Object
   public int hashCode() {
-    return (int) (genstamp^(genstamp>>>32));
+    long stamp = genstamp.get();
+    return (int) (stamp^(stamp>>>32));
   }
 }

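Swapping the volatile long plus synchronized nextStamp() for an AtomicLong
(HDFS-4029) keeps the increment-and-read atomic without taking a monitor. A
self-contained sketch of the equivalence:

    import java.util.concurrent.atomic.AtomicLong;

    public class StampDemo {
      private final AtomicLong genstamp = new AtomicLong(1000);

      public long nextStamp() {
        // incrementAndGet() is a single atomic read-modify-write, so no
        // synchronized block is needed around the ++ and the read.
        return genstamp.incrementAndGet();
      }

      public static void main(String[] args) {
        StampDemo d = new StampDemo();
        System.out.println(d.nextStamp());   // 1001
        System.out.println(d.nextStamp());   // 1002
      }
    }
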
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java Fri Oct 19 18:49:38 2012
@@ -431,16 +431,16 @@ public class DirectoryScanner implements
   private Map<String, ScanInfo[]> getDiskReport() {
     // First get list of data directories
     final List<? extends FsVolumeSpi> volumes = dataset.getVolumes();
-    ArrayList<ScanInfoPerBlockPool> dirReports =
-      new ArrayList<ScanInfoPerBlockPool>(volumes.size());
-    
+
+    // Use an array since the threads may return out of order and
+    // compilersInProgress#keySet may return out of order as well.
+    ScanInfoPerBlockPool[] dirReports = new ScanInfoPerBlockPool[volumes.size()];
+
     Map<Integer, Future<ScanInfoPerBlockPool>> compilersInProgress =
       new HashMap<Integer, Future<ScanInfoPerBlockPool>>();
+
     for (int i = 0; i < volumes.size(); i++) {
-      if (!isValid(dataset, volumes.get(i))) {
-        // volume is invalid
-        dirReports.add(i, null);
-      } else {
+      if (isValid(dataset, volumes.get(i))) {
         ReportCompiler reportCompiler =
           new ReportCompiler(volumes.get(i));
         Future<ScanInfoPerBlockPool> result = 
@@ -452,7 +452,7 @@ public class DirectoryScanner implements
     for (Entry<Integer, Future<ScanInfoPerBlockPool>> report :
         compilersInProgress.entrySet()) {
       try {
-        dirReports.add(report.getKey(), report.getValue().get());
+        dirReports[report.getKey()] = report.getValue().get();
       } catch (Exception ex) {
         LOG.error("Error compiling report", ex);
         // Propagate ex to DataBlockScanner to deal with
@@ -465,7 +465,7 @@ public class DirectoryScanner implements
     for (int i = 0; i < volumes.size(); i++) {
       if (isValid(dataset, volumes.get(i))) {
         // volume is still valid
-        list.addAll(dirReports.get(i));
+        list.addAll(dirReports[i]);
       }
     }
 

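The DirectoryScanner fix (HDFS-4058): ArrayList.add(int, E) only accepts
indices up to size(), so report futures completing out of volume order could
trigger an IndexOutOfBoundsException, whereas a pre-sized array indexed by
volume number is insensitive to completion order. A small demonstration of
the failure mode and the fix:

    import java.util.ArrayList;
    import java.util.List;

    public class OutOfOrderDemo {
      public static void main(String[] args) {
        // Capacity 3, but size is still 0, so add(2, ...) throws.
        List<String> list = new ArrayList<String>(3);
        try {
          list.add(2, "volume-2 report");   // arrives first -> IOOB
        } catch (IndexOutOfBoundsException e) {
          System.out.println("ArrayList: " + e);
        }

        // A fixed-size array indexed by volume accepts any arrival order.
        String[] reports = new String[3];
        reports[2] = "volume-2 report";
        reports[0] = "volume-0 report";
        reports[1] = "volume-1 report";
      }
    }
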
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Fri Oct 19 18:49:38 2012
@@ -290,13 +290,18 @@ public class FSDirectory implements Clos
     try {
       newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE);
     } catch (IOException e) {
+      if(NameNode.stateChangeLog.isDebugEnabled()) {
+        NameNode.stateChangeLog.debug(
+            "DIR* FSDirectory.unprotectedAddFile: exception when add " + path
+                + " to the file system", e);
+      }
       return null;
     }
     return newNode;
   }
 
   INodeDirectory addToParent(byte[] src, INodeDirectory parentINode,
-      INode newNode, boolean propagateModTime) throws UnresolvedLinkException {
+      INode newNode, boolean propagateModTime) {
     // NOTE: This does not update space counts for parents
     INodeDirectory newParent = null;
     writeLock();

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Fri Oct 19 18:49:38 2012
@@ -1174,6 +1174,11 @@ public class FSEditLog implements LogsPu
       // TODO: are we sure this is OK?
     }
   }
+  
+  public void selectInputStreams(Collection<EditLogInputStream> streams,
+      long fromTxId, boolean inProgressOk) {
+    journalSet.selectInputStreams(streams, fromTxId, inProgressOk);
+  }
 
   public Collection<EditLogInputStream> selectInputStreams(
       long fromTxId, long toAtLeastTxId) throws IOException {
@@ -1191,7 +1196,7 @@ public class FSEditLog implements LogsPu
       long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery,
       boolean inProgressOk) throws IOException {
     List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
-    journalSet.selectInputStreams(streams, fromTxId, inProgressOk);
+    selectInputStreams(streams, fromTxId, inProgressOk);
 
     try {
       checkForGaps(streams, fromTxId, toAtLeastTxId, inProgressOk);

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Oct 19 18:49:38 2012
@@ -4677,6 +4677,13 @@ public class FSNamesystem implements Nam
   public int getNumDeadDataNodes() {
     return getBlockManager().getDatanodeManager().getNumDeadDataNodes();
   }
+  
+  @Override // FSNamesystemMBean
+  @Metric({"StaleDataNodes", 
+    "Number of datanodes marked stale due to delayed heartbeat"})
+  public int getNumStaleDataNodes() {
+    return getBlockManager().getDatanodeManager().getNumStaleNodes();
+  }
 
   /**
    * Sets the generation stamp for this filesystem

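The new getNumStaleDataNodes() gauge (HDFS-4059) is published through the
metrics2 annotation mechanism: a getter tagged @Metric on a registered source
is sampled at each metrics snapshot. A hedged sketch of the same wiring
outside FSNamesystem; the source name and the explicit register() call here
are illustrative, since FSNamesystem itself is registered elsewhere in the
NameNode startup path:

    import org.apache.hadoop.metrics2.annotation.Metric;
    import org.apache.hadoop.metrics2.annotation.Metrics;
    import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;

    @Metrics(context = "dfs")
    public class StaleNodesSource {
      private volatile int staleNodes;

      @Metric({"StaleDataNodes",
        "Number of datanodes marked stale due to delayed heartbeat"})
      public int getNumStaleDataNodes() {
        return staleNodes;
      }

      public static void main(String[] args) {
        DefaultMetricsSystem.initialize("demo");
        DefaultMetricsSystem.instance().register(
            "StaleNodesSource", "demo metrics source", new StaleNodesSource());
      }
    }
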
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java Fri Oct 19 18:49:38 2012
@@ -247,6 +247,11 @@ public class FileJournalManager implemen
     LOG.debug(this + ": selecting input streams starting at " + fromTxId + 
         (inProgressOk ? " (inProgress ok) " : " (excluding inProgress) ") +
         "from among " + elfs.size() + " candidate file(s)");
+    addStreamsToCollectionFromFiles(elfs, streams, fromTxId, inProgressOk);
+  }
+  
+  static void addStreamsToCollectionFromFiles(Collection<EditLogFile> elfs,
+      Collection<EditLogInputStream> streams, long fromTxId, boolean inProgressOk) {
     for (EditLogFile elf : elfs) {
       if (elf.isInProgress()) {
         if (!inProgressOk) {

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java Fri Oct 19 18:49:38 2012
@@ -96,13 +96,6 @@ abstract class INode implements Comparab
     }
   }
 
-  protected INode() {
-    name = null;
-    parent = null;
-    modificationTime = 0;
-    accessTime = 0;
-  }
-
   INode(PermissionStatus permissions, long mTime, long atime) {
     this.name = null;
     this.parent = null;

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java Fri Oct 19 18:49:38 2012
@@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.na
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.Collection;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -56,21 +55,6 @@ public interface JournalManager extends 
    */
   void finalizeLogSegment(long firstTxId, long lastTxId) throws IOException;
 
-   /**
-   * Get a list of edit log input streams.  The list will start with the
-   * stream that contains fromTxnId, and continue until the end of the journal
-   * being managed.
-   * 
-   * @param fromTxnId the first transaction id we want to read
-   * @param inProgressOk whether or not in-progress streams should be returned
-   *
-   * @return a list of streams
-   * @throws IOException if the underlying storage has an error or is otherwise
-   * inaccessible
-   */
-  void selectInputStreams(Collection<EditLogInputStream> streams,
-      long fromTxnId, boolean inProgressOk) throws IOException;
-
   /**
    * Set the amount of memory that this stream should use to buffer edits
    */

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LogsPurgeable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LogsPurgeable.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LogsPurgeable.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LogsPurgeable.java Fri Oct 19 18:49:38 2012
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
+import java.util.Collection;
 
 /**
  * Interface used to abstract over classes which manage edit logs that may need
@@ -33,5 +34,20 @@ interface LogsPurgeable {
    * @throws IOException in the event of error
    */
   public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException;
-
+  
+  /**
+   * Get a list of edit log input streams.  The list will start with the
+   * stream that contains fromTxnId, and continue until the end of the journal
+   * being managed.
+   * 
+   * @param fromTxId the first transaction id we want to read
+   * @param inProgressOk whether or not in-progress streams should be returned
+   *
+   * @return a list of streams
+   * @throws IOException if the underlying storage has an error or is otherwise
+   * inaccessible
+   */
+  void selectInputStreams(Collection<EditLogInputStream> streams,
+      long fromTxId, boolean inProgressOk) throws IOException;
+  
 }

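Moving selectInputStreams() from JournalManager onto LogsPurgeable lets
NNStorageRetentionManager enumerate edit segments from any purgeable log
source before deciding what to purge. A minimal sketch of the shape of that
contract, with String standing in for EditLogInputStream and the type names
invented for illustration:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;

    interface Purgeable {
      void purgeLogsOlderThan(long minTxIdToKeep) throws IOException;
      void selectInputStreams(Collection<String> streams, long fromTxId,
          boolean inProgressOk) throws IOException;
    }

    class RetentionDriver {
      static void purge(Purgeable logs, long purgeFrom) throws IOException {
        List<String> segments = new ArrayList<String>();
        // Enumerate candidate segments first, then purge; the retention
        // manager uses the list to cap how many segments survive.
        logs.selectInputStreams(segments, purgeFrom, false);
        logs.purgeLogsOlderThan(purgeFrom);
      }
    }
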
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java Fri Oct 19 18:49:38 2012
@@ -19,7 +19,9 @@ package org.apache.hadoop.hdfs.server.na
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.TreeSet;
 
@@ -32,6 +34,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.util.MD5FileUtils;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
@@ -48,6 +51,7 @@ public class NNStorageRetentionManager {
   
   private final int numCheckpointsToRetain;
   private final long numExtraEditsToRetain;
+  private final int maxExtraEditsSegmentsToRetain;
   private static final Log LOG = LogFactory.getLog(
       NNStorageRetentionManager.class);
   private final NNStorage storage;
@@ -65,6 +69,9 @@ public class NNStorageRetentionManager {
     this.numExtraEditsToRetain = conf.getLong(
         DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY,
         DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_DEFAULT);
+    this.maxExtraEditsSegmentsToRetain = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_KEY,
+        DFSConfigKeys.DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_DEFAULT);
     Preconditions.checkArgument(numCheckpointsToRetain > 0,
         "Must retain at least one checkpoint");
     Preconditions.checkArgument(numExtraEditsToRetain >= 0,
@@ -94,7 +101,39 @@ public class NNStorageRetentionManager {
     // provide a "cushion" of older txns that we keep, which is
     // handy for HA, where a remote node may not have as many
     // new images.
-    long purgeLogsFrom = Math.max(0, minImageTxId + 1 - numExtraEditsToRetain);
+    //
+    // First, determine the target number of extra transactions to retain based
+    // on the configured amount.
+    long minimumRequiredTxId = minImageTxId + 1;
+    long purgeLogsFrom = Math.max(0, minimumRequiredTxId - numExtraEditsToRetain);
+    
+    ArrayList<EditLogInputStream> editLogs = new ArrayList<EditLogInputStream>();
+    purgeableLogs.selectInputStreams(editLogs, purgeLogsFrom, false);
+    Collections.sort(editLogs, new Comparator<EditLogInputStream>() {
+      @Override
+      public int compare(EditLogInputStream a, EditLogInputStream b) {
+        return ComparisonChain.start()
+            .compare(a.getFirstTxId(), b.getFirstTxId())
+            .compare(a.getLastTxId(), b.getLastTxId())
+            .result();
+      }
+    });
+    
+    // Next, adjust the number of transactions to retain if doing so would mean
+    // keeping too many segments around.
+    while (editLogs.size() > maxExtraEditsSegmentsToRetain) {
+      purgeLogsFrom = editLogs.get(0).getFirstTxId();
+      editLogs.remove(0);
+    }
+    
+    // Finally, ensure that we're not trying to purge any transactions that we
+    // actually need.
+    if (purgeLogsFrom > minimumRequiredTxId) {
+      throw new AssertionError("Should not purge more edits than required to "
+          + "restore: " + purgeLogsFrom + " should be <= "
+          + minimumRequiredTxId);
+    }
+    
     purgeableLogs.purgeLogsOlderThan(purgeLogsFrom);
   }
   

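The retention computation is now two-phase: first a transaction-count cushion
(numExtraEditsToRetain), then a cap on how many whole edit segments that
cushion may span (HDFS-2946), advancing purgeLogsFrom past the oldest
segments until the cap holds. A compact sketch of the arithmetic, with
long[]{firstTxId, lastTxId} standing in for EditLogInputStream and the
segment list assumed already sorted by first transaction id:

    import java.util.ArrayList;
    import java.util.List;

    public class RetentionDemo {
      static long purgeFrom(long minImageTxId, long numExtraEditsToRetain,
          int maxSegments, List<long[]> sortedSegments) {
        long minimumRequiredTxId = minImageTxId + 1;
        long purgeLogsFrom =
            Math.max(0, minimumRequiredTxId - numExtraEditsToRetain);

        // Drop the oldest segments until no more than maxSegments remain,
        // moving the purge point forward as each one is discarded.
        List<long[]> segs = new ArrayList<long[]>(sortedSegments);
        while (segs.size() > maxSegments) {
          purgeLogsFrom = segs.get(0)[0];
          segs.remove(0);
        }

        // Never purge transactions still required to restore the namespace.
        if (purgeLogsFrom > minimumRequiredTxId) {
          throw new AssertionError("would purge required edits: "
              + purgeLogsFrom + " should be <= " + minimumRequiredTxId);
        }
        return purgeLogsFrom;
      }
    }
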
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java Fri Oct 19 18:49:38 2012
@@ -751,6 +751,24 @@ public class SecondaryNameNode implement
           }
         }
       }
+
+      @Override
+      public void selectInputStreams(Collection<EditLogInputStream> streams,
+          long fromTxId, boolean inProgressOk) {
+        Iterator<StorageDirectory> iter = storage.dirIterator();
+        while (iter.hasNext()) {
+          StorageDirectory dir = iter.next();
+          List<EditLogFile> editFiles;
+          try {
+            editFiles = FileJournalManager.matchEditLogs(
+                dir.getCurrentDir());
+          } catch (IOException ioe) {
+            throw new RuntimeException(ioe);
+          }
+          FileJournalManager.addStreamsToCollectionFromFiles(editFiles, streams,
+              fromTxId, inProgressOk);
+        }
+      }
       
     }
     

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java Fri Oct 19 18:49:38 2012
@@ -112,4 +112,10 @@ public interface FSNamesystemMBean {
    * @return number of dead data nodes
    */
   public int getNumDeadDataNodes();
+  
+  /**
+   * Number of stale data nodes
+   * @return number of stale data nodes
+   */
+  public int getNumStaleDataNodes();
 }

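getNumStaleDataNodes() joins the existing live/dead counters on this MBean, so the stale count also becomes visible over JMX. A sketch of reading the new attribute from inside the NameNode JVM; the ObjectName below is an assumption based on Hadoop's usual bean naming, and the getter getNumStaleDataNodes surfaces as the attribute NumStaleDataNodes:

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class StaleNodesProbe {
      public static void main(String[] args) throws Exception {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        // Assumed registration name; verify against the running NameNode.
        ObjectName name =
            new ObjectName("Hadoop:service=NameNode,name=FSNamesystemState");
        Object stale = mbs.getAttribute(name, "NumStaleDataNodes");
        System.out.println("Stale datanodes: " + stale);
      }
    }
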
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java Fri Oct 19 18:49:38 2012
@@ -82,9 +82,9 @@ public class DatanodeRegistration extend
   public String toString() {
     return getClass().getSimpleName()
       + "(" + getIpAddr()
-      + ", storageID=" + storageID
-      + ", infoPort=" + infoPort
-      + ", ipcPort=" + ipcPort
+      + ", storageID=" + getStorageID()
+      + ", infoPort=" + getInfoPort()
+      + ", ipcPort=" + getIpcPort()
       + ", storageInfo=" + storageInfo
       + ")";
   }

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java Fri Oct 19 18:49:38 2012
@@ -165,7 +165,7 @@ class ImageLoaderCurrent implements Imag
 
       if (LayoutVersion.supports(Feature.FSIMAGE_COMPRESSION, imageVersion)) {
         boolean isCompressed = in.readBoolean();
-        v.visit(ImageElement.IS_COMPRESSED, imageVersion);
+        v.visit(ImageElement.IS_COMPRESSED, String.valueOf(isCompressed));
         if (isCompressed) {
           String codecClassName = Text.readString(in);
           v.visit(ImageElement.COMPRESS_CODEC, codecClassName);

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Fri Oct 19 18:49:38 2012
@@ -358,7 +358,7 @@
 
 <property>
   <name>dfs.blocksize</name>
-  <value>67108864</value>
+  <value>134217728</value>
   <description>
       The default block size for new files, in bytes.
       You can use the following suffix (case insensitive):
@@ -660,6 +660,20 @@
   edits in order to start again.
   Typically each edit is on the order of a few hundred bytes, so the default
   of 1 million edits should be on the order of hundreds of MBs or low GBs.
+
+  NOTE: Fewer extra edits may be retained than the value specified for this
+  setting if retaining them would require keeping more segments than the
+  number configured by dfs.namenode.max.extra.edits.segments.retained.
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.max.extra.edits.segments.retained</name>
+  <value>10000</value>
+  <description>The maximum number of extra edit log segments that should be retained
+  beyond what is minimally necessary for a NN restart. Used in conjunction with
+  dfs.namenode.num.extra.edits.retained, this configuration property caps the
+  number of extra edits files at a reasonable value.
   </description>
 </property>
 

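The two retention properties act as independent caps: purging stops at whichever limit, transactions or segments, bites first. A small sketch of setting both programmatically, reusing the DFSConfigKeys constants this commit references:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class RetentionConfigExample {
      public static Configuration tunedConf() {
        Configuration conf = new HdfsConfiguration();
        // Keep up to 1M extra transactions behind the oldest needed image...
        conf.setLong(
            DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY, 1000000);
        // ...but never spread across more than 10000 extra segments.
        conf.setLong(
            DFSConfigKeys.DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_KEY,
            10000);
        return conf;
      }
    }
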
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java Fri Oct 19 18:49:38 2012
@@ -20,45 +20,40 @@ package org.apache.hadoop.hdfs;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
-import org.apache.log4j.Level;
+import org.apache.hadoop.metrics2.util.Quantile;
+import org.apache.hadoop.metrics2.util.SampleQuantiles;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 import org.junit.Test;
 
+import com.google.common.base.Stopwatch;
+
 /**
  * This class tests hflushing concurrently from many threads.
  */
 public class TestMultiThreadedHflush {
   static final int blockSize = 1024*1024;
-  static final int numBlocks = 10;
-  static final int fileSize = numBlocks * blockSize + 1;
 
   private static final int NUM_THREADS = 10;
   private static final int WRITE_SIZE = 517;
   private static final int NUM_WRITES_PER_THREAD = 1000;
   
   private byte[] toWrite = null;
-
-  {
-    ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
-    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
-    ((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL);
-    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
-    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
-    ((Log4JLogger)InterDatanodeProtocol.LOG).getLogger().setLevel(Level.ALL);
-  }
+  
+  private final SampleQuantiles quantiles = new SampleQuantiles(
+      new Quantile[] {
+        new Quantile(0.50, 0.050),
+        new Quantile(0.75, 0.025), new Quantile(0.90, 0.010),
+        new Quantile(0.95, 0.005), new Quantile(0.99, 0.001) });
 
   /*
    * creates a file but does not close it
@@ -104,8 +99,11 @@ public class TestMultiThreadedHflush {
     }
 
     private void doAWrite() throws IOException {
+      Stopwatch sw = new Stopwatch().start();
       stm.write(toWrite);
       stm.hflush();
+      long micros = sw.elapsedTime(TimeUnit.MICROSECONDS);
+      quantiles.insert(micros);
     }
   }
 
@@ -115,14 +113,28 @@ public class TestMultiThreadedHflush {
    * They all finish before the file is closed.
    */
   @Test
-  public void testMultipleHflushers() throws Exception {
+  public void testMultipleHflushersRepl1() throws Exception {
+    doTestMultipleHflushers(1);
+  }
+  
+  @Test
+  public void testMultipleHflushersRepl3() throws Exception {
+    doTestMultipleHflushers(3);
+  }
+  
+  private void doTestMultipleHflushers(int repl) throws Exception {
     Configuration conf = new Configuration();
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(repl)
+        .build();
 
     FileSystem fs = cluster.getFileSystem();
     Path p = new Path("/multiple-hflushers.dat");
     try {
-      doMultithreadedWrites(conf, p, NUM_THREADS, WRITE_SIZE, NUM_WRITES_PER_THREAD);
+      doMultithreadedWrites(conf, p, NUM_THREADS, WRITE_SIZE,
+          NUM_WRITES_PER_THREAD, repl);
+      System.out.println("Latency quantiles (in microseconds):\n" +
+          quantiles);
     } finally {
       fs.close();
       cluster.shutdown();
@@ -200,13 +212,13 @@ public class TestMultiThreadedHflush {
   }
 
   public void doMultithreadedWrites(
-    Configuration conf, Path p, int numThreads, int bufferSize, int numWrites)
-    throws Exception {
+      Configuration conf, Path p, int numThreads, int bufferSize, int numWrites,
+      int replication) throws Exception {
     initBuffer(bufferSize);
 
     // create a new file.
     FileSystem fs = p.getFileSystem(conf);
-    FSDataOutputStream stm = createFile(fs, p, 1);
+    FSDataOutputStream stm = createFile(fs, p, replication);
     System.out.println("Created file simpleFlush.dat");
 
     // There have been a couple issues with flushing empty buffers, so do
@@ -240,20 +252,41 @@ public class TestMultiThreadedHflush {
   }
 
   public static void main(String args[]) throws Exception {
-    if (args.length != 1) {
-      System.err.println(
-        "usage: " + TestMultiThreadedHflush.class.getSimpleName() +
-        " <path to test file> ");
-      System.exit(1);
+    System.exit(ToolRunner.run(new CLIBenchmark(), args));
+  }
+  
+  private static class CLIBenchmark extends Configured implements Tool {
+    public int run(String args[]) throws Exception {
+      if (args.length != 1) {
+        System.err.println(
+          "usage: " + TestMultiThreadedHflush.class.getSimpleName() +
+          " <path to test file> ");
+        System.err.println(
+            "Configurations settable by -D options:\n" +
+            "  num.threads [default 10] - how many threads to run\n" +
+            "  write.size [default 511] - bytes per write\n" +
+            "  num.writes [default 50000] - how many writes to perform");
+        System.exit(1);
+      }
+      TestMultiThreadedHflush test = new TestMultiThreadedHflush();
+      Configuration conf = getConf();
+      Path p = new Path(args[0]);
+      
+      int numThreads = conf.getInt("num.threads", 10);
+      int writeSize = conf.getInt("write.size", 511);
+      int numWrites = conf.getInt("num.writes", 50000);
+      int replication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
+          DFSConfigKeys.DFS_REPLICATION_DEFAULT);
+      
+      Stopwatch sw = new Stopwatch().start();
+      test.doMultithreadedWrites(conf, p, numThreads, writeSize, numWrites,
+          replication);
+      sw.stop();
+  
+      System.out.println("Finished in " + sw.elapsedMillis() + "ms");
+      System.out.println("Latency quantiles (in microseconds):\n" +
+          test.quantiles);
+      return 0;
     }
-    TestMultiThreadedHflush test = new TestMultiThreadedHflush();
-    Configuration conf = new Configuration();
-    Path p = new Path(args[0]);
-    long st = System.nanoTime();
-    test.doMultithreadedWrites(conf, p, 10, 511, 50000);
-    long et = System.nanoTime();
-
-    System.out.println("Finished in " + ((et - st) / 1000000) + "ms");
   }
-
 }

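The rewritten benchmark replaces a single wall-clock figure with per-operation latency quantiles. A standalone sketch of the same measurement pattern, assuming the era's Guava Stopwatch API (new Stopwatch().start(), elapsedTime(TimeUnit)) and the metrics2 SampleQuantiles class the test imports; the timed sleep merely stands in for write() plus hflush():

    import java.util.Random;
    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.metrics2.util.Quantile;
    import org.apache.hadoop.metrics2.util.SampleQuantiles;

    import com.google.common.base.Stopwatch;

    public class QuantileTimingSketch {
      public static void main(String[] args) throws Exception {
        // Quantile(q, epsilon): estimate the q-th percentile within epsilon.
        SampleQuantiles quantiles = new SampleQuantiles(new Quantile[] {
            new Quantile(0.50, 0.050), new Quantile(0.90, 0.010),
            new Quantile(0.99, 0.001) });
        Random r = new Random();
        for (int i = 0; i < 10000; i++) {
          Stopwatch sw = new Stopwatch().start();
          Thread.sleep(0, r.nextInt(100000));  // stand-in for the timed work
          quantiles.insert(sw.elapsedTime(TimeUnit.MICROSECONDS));
        }
        // SampleQuantiles#toString() renders the current estimates.
        System.out.println("Latency quantiles (in microseconds):\n" + quantiles);
      }
    }
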
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java Fri Oct 19 18:49:38 2012
@@ -372,7 +372,7 @@ public class TestBalancer {
    * Test parse method in Balancer#Cli class with threshold value out of
    * boundaries.
    */
-  @Test
+  @Test(timeout=100000)
   public void testBalancerCliParseWithThresholdOutOfBoundaries() {
     String parameters[] = new String[] { "-threshold", "0" };
     String reason = "IllegalArgumentException is expected when threshold value"
@@ -394,7 +394,7 @@ public class TestBalancer {
   
   /** Test a cluster with even distribution, 
    * then a new empty node is added to the cluster*/
-  @Test
+  @Test(timeout=100000)
   public void testBalancer0() throws Exception {
     testBalancer0Internal(new HdfsConfiguration());
   }
@@ -406,7 +406,7 @@ public class TestBalancer {
   }
 
   /** Test unevenly distributed cluster */
-  @Test
+  @Test(timeout=100000)
   public void testBalancer1() throws Exception {
     testBalancer1Internal(new HdfsConfiguration());
   }
@@ -419,7 +419,7 @@ public class TestBalancer {
         new String[] {RACK0, RACK1});
   }
   
-  @Test
+  @Test(timeout=100000)
   public void testBalancer2() throws Exception {
     testBalancer2Internal(new HdfsConfiguration());
   }
@@ -467,8 +467,7 @@ public class TestBalancer {
   /**
    * Test parse method in Balancer#Cli class with wrong number of params
    */
-
-  @Test
+  @Test(timeout=100000)
   public void testBalancerCliParseWithWrongParams() {
     String parameters[] = new String[] { "-threshold" };
     String reason =

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java Fri Oct 19 18:49:38 2012
@@ -191,4 +191,12 @@ public class BlockManagerTestUtil {
         "Must use default policy, got %s", bpp.getClass());
     ((BlockPlacementPolicyDefault)bpp).setPreferLocalNode(prefer);
   }
+  
+  /**
+   * Call the heartbeat check function of the HeartbeatManager
+   * @param bm the BlockManager to manipulate
+   */
+  public static void checkHeartbeat(BlockManager bm) {
+    bm.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
+  }
 }

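checkHeartbeat() pairs naturally with DataNodeTestUtils.setHeartbeatsDisabledForTests(), which the TestPendingReplication change below also uses: freeze the datanode heartbeats, then force the HeartbeatManager to re-evaluate liveness on demand rather than waiting for its background thread. An illustrative fragment (not a complete test), assuming a running MiniDFSCluster named cluster:

    // Freeze heartbeats so node state only changes when the test says so.
    for (DataNode dn : cluster.getDataNodes()) {
      DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
    }
    // Force an immediate liveness re-check.
    BlockManagerTestUtil.checkHeartbeat(
        cluster.getNamesystem().getBlockManager());
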
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java Fri Oct 19 18:49:38 2012
@@ -20,14 +20,30 @@ package org.apache.hadoop.hdfs.server.bl
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.junit.Test;
 
 /**
- * This class tests the internals of PendingReplicationBlocks.java
+ * This class tests the internals of PendingReplicationBlocks.java,
+ * as well as how PendingReplicationBlocks behaves within BlockManager
  */
 public class TestPendingReplication {
   final static int TIMEOUT = 3;     // 3 seconds
+  private static final int DFS_REPLICATION_INTERVAL = 1;
+  // Number of datanodes in the cluster
+  private static final int DATANODE_COUNT = 5;
 
   @Test
   public void testPendingReplication() {
@@ -40,7 +56,7 @@ public class TestPendingReplication {
     //
     for (int i = 0; i < 10; i++) {
       Block block = new Block(i, i, 0);
-      pendingReplications.add(block, i);
+      pendingReplications.increment(block, i);
     }
     assertEquals("Size of pendingReplications ",
                  10, pendingReplications.size());
@@ -50,15 +66,15 @@ public class TestPendingReplication {
     // remove one item and reinsert it
     //
     Block blk = new Block(8, 8, 0);
-    pendingReplications.remove(blk);             // removes one replica
+    pendingReplications.decrement(blk);             // removes one replica
     assertEquals("pendingReplications.getNumReplicas ",
                  7, pendingReplications.getNumReplicas(blk));
 
     for (int i = 0; i < 7; i++) {
-      pendingReplications.remove(blk);           // removes all replicas
+      pendingReplications.decrement(blk);           // removes all replicas
     }
     assertTrue(pendingReplications.size() == 9);
-    pendingReplications.add(blk, 8);
+    pendingReplications.increment(blk, 8);
     assertTrue(pendingReplications.size() == 10);
 
     //
@@ -86,7 +102,7 @@ public class TestPendingReplication {
 
     for (int i = 10; i < 15; i++) {
       Block block = new Block(i, i, 0);
-      pendingReplications.add(block, i);
+      pendingReplications.increment(block, i);
     }
     assertTrue(pendingReplications.size() == 15);
 
@@ -116,4 +132,70 @@ public class TestPendingReplication {
     }
     pendingReplications.stop();
   }
+  
+  /**
+   * Test that BlockManager correctly removes the corresponding pending
+   * replication records when a file is deleted
+   * 
+   * @throws Exception
+   */
+  @Test
+  public void testPendingAndInvalidate() throws Exception {
+    final Configuration CONF = new HdfsConfiguration();
+    CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
+    CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
+        DFS_REPLICATION_INTERVAL);
+    CONF.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 
+        DFS_REPLICATION_INTERVAL);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(
+        DATANODE_COUNT).build();
+    cluster.waitActive();
+    
+    FSNamesystem namesystem = cluster.getNamesystem();
+    BlockManager bm = namesystem.getBlockManager();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    try {
+      // 1. create a file
+      Path filePath = new Path("/tmp.txt");
+      DFSTestUtil.createFile(fs, filePath, 1024, (short) 3, 0L);
+      
+      // 2. disable the heartbeats
+      for (DataNode dn : cluster.getDataNodes()) {
+        DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
+      }
+      
+      // 3. mark a couple of blocks as corrupt
+      LocatedBlock block = NameNodeAdapter.getBlockLocations(
+          cluster.getNameNode(), filePath.toString(), 0, 1).get(0);
+      cluster.getNamesystem().writeLock();
+      try {
+        bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0],
+            "TEST");
+        bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[1],
+            "TEST");
+      } finally {
+        cluster.getNamesystem().writeUnlock();
+      }
+      BlockManagerTestUtil.computeAllPendingWork(bm);
+      BlockManagerTestUtil.updateState(bm);
+      assertEquals(bm.getPendingReplicationBlocksCount(), 1L);
+      assertEquals(bm.pendingReplications.getNumReplicas(block.getBlock()
+          .getLocalBlock()), 2);
+      
+      // 4. delete the file
+      fs.delete(filePath, true);
+      // Retry at most 10 times, sleeping 1s between attempts. Note that 10s
+      // is much less than the default pending-record timeout (5~10 min).
+      int retries = 10; 
+      long pendingNum = bm.getPendingReplicationBlocksCount();
+      while (pendingNum != 0 && retries-- > 0) {
+        Thread.sleep(1000);  // let NN do the deletion
+        BlockManagerTestUtil.updateState(bm);
+        pendingNum = bm.getPendingReplicationBlocksCount();
+      }
+      assertEquals(pendingNum, 0L);
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }

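The delete-then-poll loop at the end of testPendingAndInvalidate (step 4) is a recurring pattern for asynchronous NameNode work: re-check a counter on a fixed interval with a bounded retry budget kept well below the operation's own timeout. A hypothetical helper generalizing that loop:

    import java.util.concurrent.Callable;
    import java.util.concurrent.TimeoutException;

    public final class WaitUtil {
      /** Poll the condition once per second until it holds or the retry
       *  budget runs out. */
      public static void waitFor(Callable<Boolean> condition, int maxRetries)
          throws Exception {
        for (int i = 0; i < maxRetries; i++) {
          if (condition.call()) {
            return;
          }
          Thread.sleep(1000);
        }
        throw new TimeoutException(
            "condition not met after " + maxRetries + "s");
      }
    }
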
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java Fri Oct 19 18:49:38 2012
@@ -30,7 +30,7 @@ import org.apache.hadoop.hdfs.protocol.E
 import org.junit.Test;
 
 public class TestUnderReplicatedBlocks {
-  @Test
+  @Test(timeout=300000) // 5 min timeout
   public void testSetrepIncWithUnderReplicatedBlocks() throws Exception {
     Configuration conf = new HdfsConfiguration();
     final short REPLICATION_FACTOR = 2;

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java Fri Oct 19 18:49:38 2012
@@ -74,7 +74,7 @@ public class TestMultipleNNDataBlockScan
     }
   }
   
-  @Test
+  @Test(timeout=120000)
   public void testDataBlockScanner() throws IOException, InterruptedException {
     setUp();
     try {
@@ -97,7 +97,7 @@ public class TestMultipleNNDataBlockScan
     }
   }
   
-  @Test
+  @Test(timeout=120000)
   public void testBlockScannerAfterRefresh() throws IOException,
       InterruptedException {
     setUp();
@@ -149,7 +149,7 @@ public class TestMultipleNNDataBlockScan
     }
   }
   
-  @Test
+  @Test(timeout=120000)
   public void testBlockScannerAfterRestart() throws IOException,
       InterruptedException {
     setUp();
@@ -176,7 +176,7 @@ public class TestMultipleNNDataBlockScan
     }
   }
   
-  @Test
+  @Test(timeout=120000)
   public void test2NNBlockRescanInterval() throws IOException {
     ((Log4JLogger)BlockPoolSliceScanner.LOG).getLogger().setLevel(Level.ALL);
     Configuration conf = new HdfsConfiguration();
@@ -206,7 +206,7 @@ public class TestMultipleNNDataBlockScan
    * 
    * @throws Exception
    */
-  @Test
+  @Test(timeout=120000)
   public void testBlockRescanInterval() throws IOException {
     ((Log4JLogger)BlockPoolSliceScanner.LOG).getLogger().setLevel(Level.ALL);
     Configuration conf = new HdfsConfiguration();

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java Fri Oct 19 18:49:38 2012
@@ -117,7 +117,7 @@ public class TestAuditLogs {
     int val = istream.read();
     istream.close();
     verifyAuditLogs(true);
-    assertTrue("failed to read from file", val > 0);
+    assertTrue("failed to read from file", val >= 0);
   }
 
   /** test that allowed stat puts proper entry in audit log */
@@ -168,7 +168,7 @@ public class TestAuditLogs {
     istream.close();
 
     verifyAuditLogsRepeat(true, 3);
-    assertTrue("failed to read from file", val > 0);
+    assertTrue("failed to read from file", val >= 0);
   }
 
   /** test that stat via webhdfs puts proper entry in audit log */

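The relaxed assertions above follow from the InputStream.read() contract: the single-byte read() returns the byte as an int in [0, 255], with -1 reserved for end of stream, so a file whose first byte happens to be zero would have failed the old val > 0 check despite a successful read. A tiny demonstration:

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;

    public class ReadContractSketch {
      public static void main(String[] args) throws Exception {
        InputStream in = new ByteArrayInputStream(new byte[] { 0x00 });
        System.out.println(in.read());  // 0  -- valid data, fails "val > 0"
        System.out.println(in.read());  // -1 -- end of stream
      }
    }
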
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java Fri Oct 19 18:49:38 2012
@@ -22,6 +22,7 @@ import static org.apache.hadoop.hdfs.ser
 import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getInProgressEditsFileName;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -196,6 +197,35 @@ public class TestNNStorageRetentionManag
     runTest(tc);
   }
   
+  @Test
+  public void testRetainExtraLogsLimitedSegments() throws IOException {
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY,
+        150);
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_KEY, 2);
+    TestCaseDescription tc = new TestCaseDescription();
+    tc.addRoot("/foo1", NameNodeDirType.IMAGE);
+    tc.addRoot("/foo2", NameNodeDirType.EDITS);
+    tc.addImage("/foo1/current/" + getImageFileName(100), true);
+    tc.addImage("/foo1/current/" + getImageFileName(200), true);
+    tc.addImage("/foo1/current/" + getImageFileName(300), false);
+    tc.addImage("/foo1/current/" + getImageFileName(400), false);
+
+    tc.addLog("/foo2/current/" + getFinalizedEditsFileName(1, 100), true);
+    // Without lowering the max segments to retain, we'd retain all segments
+    // going back to txid 150 (300 - 150).
+    tc.addLog("/foo2/current/" + getFinalizedEditsFileName(101, 175), true);
+    tc.addLog("/foo2/current/" + getFinalizedEditsFileName(176, 200), true);
+    tc.addLog("/foo2/current/" + getFinalizedEditsFileName(201, 225), true);
+    tc.addLog("/foo2/current/" + getFinalizedEditsFileName(226, 240), true);
+    // Only retain 2 extra segments. The 301-400 segment is considered required,
+    // not extra.
+    tc.addLog("/foo2/current/" + getFinalizedEditsFileName(241, 275), false);
+    tc.addLog("/foo2/current/" + getFinalizedEditsFileName(276, 300), false);
+    tc.addLog("/foo2/current/" + getFinalizedEditsFileName(301, 400), false);
+    tc.addLog("/foo2/current/" + getInProgressEditsFileName(401), false);
+    runTest(tc);
+  }
+  
   private void runTest(TestCaseDescription tc) throws IOException {
     StoragePurger mockPurger =
       Mockito.mock(NNStorageRetentionManager.StoragePurger.class);
@@ -287,8 +317,10 @@ public class TestNNStorageRetentionManag
       return mockStorageForDirs(sds.toArray(new StorageDirectory[0]));
     }
     
+    @SuppressWarnings("unchecked")
     public FSEditLog mockEditLog(StoragePurger purger) {
       final List<JournalManager> jms = Lists.newArrayList();
+      final JournalSet journalSet = new JournalSet(0);
       for (FakeRoot root : dirRoots.values()) {
         if (!root.type.isOfType(NameNodeDirType.EDITS)) continue;
         
@@ -297,6 +329,7 @@ public class TestNNStorageRetentionManag
             root.mockStorageDir(), null);
         fjm.purger = purger;
         jms.add(fjm);
+        journalSet.add(fjm, false);
       }
 
       FSEditLog mockLog = Mockito.mock(FSEditLog.class);
@@ -314,6 +347,18 @@ public class TestNNStorageRetentionManag
           return null;
         }
       }).when(mockLog).purgeLogsOlderThan(Mockito.anyLong());
+      
+      Mockito.doAnswer(new Answer<Void>() {
+        
+        @Override
+        public Void answer(InvocationOnMock invocation) throws Throwable {
+          Object[] args = invocation.getArguments();
+          journalSet.selectInputStreams((Collection<EditLogInputStream>)args[0],
+              (long)((Long)args[1]), (boolean)((Boolean)args[2]));
+          return null;
+        }
+      }).when(mockLog).selectInputStreams(Mockito.anyCollection(),
+          Mockito.anyLong(), Mockito.anyBoolean());
       return mockLog;
     }
   }