You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2016/02/02 20:28:55 UTC

[2/2] hadoop git commit: HDFS-9260. Improve the performance and GC friendliness of NameNode startup and full block reports (Staffan Friberg via cmccabe)

HDFS-9260. Improve the performance and GC friendliness of NameNode startup and full block reports (Staffan Friberg via cmccabe)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/dd9ebf6e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/dd9ebf6e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/dd9ebf6e

Branch: refs/heads/trunk
Commit: dd9ebf6eedfd4ff8b3486eae2a446de6b0c7fa8a
Parents: 2da03b4
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Tue Feb 2 11:23:00 2016 -0800
Committer: Colin Patrick Mccabe <cm...@cloudera.com>
Committed: Tue Feb 2 11:23:00 2016 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |    3 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   12 +
 .../DatanodeProtocolClientSideTranslatorPB.java |    5 +-
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java |    5 +-
 .../hdfs/server/blockmanagement/BlockInfo.java  |  192 +--
 .../blockmanagement/BlockInfoContiguous.java    |   29 +-
 .../blockmanagement/BlockInfoStriped.java       |   30 +-
 .../server/blockmanagement/BlockManager.java    |  458 +++++--
 .../hdfs/server/blockmanagement/BlocksMap.java  |   66 +-
 .../blockmanagement/DatanodeStorageInfo.java    |  123 +-
 .../hdfs/server/datanode/BPServiceActor.java    |    4 +-
 .../datanode/fsdataset/impl/ReplicaMap.java     |   71 +-
 .../server/protocol/BlockReportContext.java     |   10 +-
 .../hdfs/server/protocol/DatanodeProtocol.java  |    1 -
 .../apache/hadoop/hdfs/util/FoldedTreeSet.java  | 1285 ++++++++++++++++++
 .../src/main/proto/DatanodeProtocol.proto       |    3 +
 .../hdfs/protocol/TestBlockListAsLongs.java     |    4 +-
 .../server/blockmanagement/TestBlockInfo.java   |   88 --
 .../blockmanagement/TestBlockManager.java       |   71 +-
 .../server/datanode/SimulatedFSDataset.java     |    5 +-
 .../TestBlockHasMultipleReplicasOnSameDN.java   |   20 +-
 .../datanode/TestDataNodeHotSwapVolumes.java    |    1 +
 .../datanode/TestDataNodeVolumeFailure.java     |    4 +-
 ...TestDnRespectsBlockReportSplitThreshold.java |    4 +-
 .../TestNNHandlesBlockReportPerStorage.java     |    3 +-
 .../TestNNHandlesCombinedBlockReport.java       |    2 +-
 .../server/datanode/TestTriggerBlockReport.java |    1 +
 .../server/namenode/NNThroughputBenchmark.java  |    4 +-
 .../server/namenode/TestAddStripedBlocks.java   |    4 +-
 .../hdfs/server/namenode/TestDeadDatanode.java  |    2 +-
 .../hadoop/hdfs/util/FoldedTreeSetTest.java     |  644 +++++++++
 31 files changed, 2565 insertions(+), 589 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 2eac881..38cb3df 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -971,6 +971,9 @@ Release 2.9.0 - UNRELEASED
     HDFS-7764. DirectoryScanner shouldn't abort the scan if one directory had
     an error (Rakesh R via cmccabe)
 
+    HDFS-9260. Improve the performance and GC friendliness of NameNode startup
+    and full block reports (Staffan Friberg via cmccabe)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 5217740..76915cb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -219,6 +219,18 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT = 2;
   public static final String  DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY = "dfs.namenode.replication.max-streams-hard-limit";
   public static final int     DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT = 4;
+  public static final String DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_INTERVAL_MS_KEY
+      = "dfs.namenode.storageinfo.defragment.interval.ms";
+  public static final int
+      DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_INTERVAL_MS_DEFAULT = 10 * 60 * 1000;
+  public static final String DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_TIMEOUT_MS_KEY
+      = "dfs.namenode.storageinfo.defragment.timeout.ms";
+  public static final int
+      DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_TIMEOUT_MS_DEFAULT = 4;
+  public static final String DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_KEY
+      = "dfs.namenode.storageinfo.defragment.ratio";
+  public static final double
+      DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_DEFAULT = 0.75;
   public static final String  DFS_WEBHDFS_AUTHENTICATION_FILTER_KEY = "dfs.web.authentication.filter";
   /* Phrased as below to avoid javac inlining as a constant, to match the behavior when
      this was AuthFilter.class.getName(). Note that if you change the import for AuthFilter, you

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
index 81c23e1..79113dd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
@@ -174,12 +174,13 @@ public class DatanodeProtocolClientSideTranslatorPB implements
 
   @Override
   public DatanodeCommand blockReport(DatanodeRegistration registration,
-      String poolId, StorageBlockReport[] reports, BlockReportContext context)
+      String poolId, StorageBlockReport[] reports,
+      BlockReportContext context)
         throws IOException {
     BlockReportRequestProto.Builder builder = BlockReportRequestProto
         .newBuilder().setRegistration(PBHelper.convert(registration))
         .setBlockPoolId(poolId);
-    
+
     boolean useBlocksBuffer = registration.getNamespaceInfo()
         .isCapabilitySupported(Capability.STORAGE_BLOCK_REPORT_BUFFERS);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 4b6baf2..e70cdf0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -824,8 +824,8 @@ public class PBHelper {
 
 
   public static BlockReportContext convert(BlockReportContextProto proto) {
-    return new BlockReportContext(proto.getTotalRpcs(),
-        proto.getCurRpc(), proto.getId(), proto.getLeaseId());
+    return new BlockReportContext(proto.getTotalRpcs(), proto.getCurRpc(),
+        proto.getId(), proto.getLeaseId(), proto.getSorted());
   }
 
   public static BlockReportContextProto convert(BlockReportContext context) {
@@ -834,6 +834,7 @@ public class PBHelper {
         setCurRpc(context.getCurRpc()).
         setId(context.getReportId()).
         setLeaseId(context.getLeaseId()).
+        setSorted(context.isSorted()).
         build();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
index e9fa123..5da2140 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
@@ -18,8 +18,9 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.io.IOException;
-import java.util.LinkedList;
+import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -55,19 +56,9 @@ public abstract class BlockInfo extends Block
   /** For implementing {@link LightWeightGSet.LinkedElement} interface. */
   private LightWeightGSet.LinkedElement nextLinkedElement;
 
-  /**
-   * This array contains triplets of references. For each i-th storage, the
-   * block belongs to triplets[3*i] is the reference to the
-   * {@link DatanodeStorageInfo} and triplets[3*i+1] and triplets[3*i+2] are
-   * references to the previous and the next blocks, respectively, in the list
-   * of blocks belonging to this storage.
-   *
-   * Using previous and next in Object triplets is done instead of a
-   * {@link LinkedList} list to efficiently use memory. With LinkedList the cost
-   * per replica is 42 bytes (LinkedList#Entry object per replica) versus 16
-   * bytes using the triplets.
-   */
-  protected Object[] triplets;
+
+  // Storages this block is replicated on
+  protected DatanodeStorageInfo[] storages;
 
   private BlockUnderConstructionFeature uc;
 
@@ -77,14 +68,14 @@ public abstract class BlockInfo extends Block
    *             in the block group
    */
   public BlockInfo(short size) {
-    this.triplets = new Object[3 * size];
+    this.storages = new DatanodeStorageInfo[size];
     this.bcId = INVALID_INODE_ID;
     this.replication = isStriped() ? 0 : size;
   }
 
   public BlockInfo(Block blk, short size) {
     super(blk);
-    this.triplets = new Object[3*size];
+    this.storages = new DatanodeStorageInfo[size];
     this.bcId = INVALID_INODE_ID;
     this.replication = isStriped() ? 0 : size;
   }
@@ -109,79 +100,52 @@ public abstract class BlockInfo extends Block
     return bcId == INVALID_INODE_ID;
   }
 
-  public DatanodeDescriptor getDatanode(int index) {
-    DatanodeStorageInfo storage = getStorageInfo(index);
-    return storage == null ? null : storage.getDatanodeDescriptor();
-  }
+  public Iterator<DatanodeStorageInfo> getStorageInfos() {
+    return new Iterator<DatanodeStorageInfo>() {
 
-  DatanodeStorageInfo getStorageInfo(int index) {
-    assert this.triplets != null : "BlockInfo is not initialized";
-    assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
-    return (DatanodeStorageInfo)triplets[index*3];
-  }
+      private int index = 0;
 
-  BlockInfo getPrevious(int index) {
-    assert this.triplets != null : "BlockInfo is not initialized";
-    assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound";
-    BlockInfo info = (BlockInfo)triplets[index*3+1];
-    assert info == null ||
-        info.getClass().getName().startsWith(BlockInfo.class.getName()) :
-        "BlockInfo is expected at " + index*3;
-    return info;
-  }
+      @Override
+      public boolean hasNext() {
+        while (index < storages.length && storages[index] == null) {
+          index++;
+        }
+        return index < storages.length;
+      }
 
-  BlockInfo getNext(int index) {
-    assert this.triplets != null : "BlockInfo is not initialized";
-    assert index >= 0 && index*3+2 < triplets.length : "Index is out of bound";
-    BlockInfo info = (BlockInfo)triplets[index*3+2];
-    assert info == null || info.getClass().getName().startsWith(
-        BlockInfo.class.getName()) :
-        "BlockInfo is expected at " + index*3;
-    return info;
+      @Override
+      public DatanodeStorageInfo next() {
+        if (!hasNext()) {
+          throw new NoSuchElementException();
+        }
+        return storages[index++];
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException("Sorry. can't remove.");
+      }
+    };
   }
 
-  void setStorageInfo(int index, DatanodeStorageInfo storage) {
-    assert this.triplets != null : "BlockInfo is not initialized";
-    assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
-    triplets[index*3] = storage;
+  public DatanodeDescriptor getDatanode(int index) {
+    DatanodeStorageInfo storage = getStorageInfo(index);
+    return storage == null ? null : storage.getDatanodeDescriptor();
   }
 
-  /**
-   * Return the previous block on the block list for the datanode at
-   * position index. Set the previous block on the list to "to".
-   *
-   * @param index - the datanode index
-   * @param to - block to be set to previous on the list of blocks
-   * @return current previous block on the list of blocks
-   */
-  BlockInfo setPrevious(int index, BlockInfo to) {
-    assert this.triplets != null : "BlockInfo is not initialized";
-    assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound";
-    BlockInfo info = (BlockInfo) triplets[index*3+1];
-    triplets[index*3+1] = to;
-    return info;
+  DatanodeStorageInfo getStorageInfo(int index) {
+    assert this.storages != null : "BlockInfo is not initialized";
+    return storages[index];
   }
 
-  /**
-   * Return the next block on the block list for the datanode at
-   * position index. Set the next block on the list to "to".
-   *
-   * @param index - the datanode index
-   * @param to - block to be set to next on the list of blocks
-   * @return current next block on the list of blocks
-   */
-  BlockInfo setNext(int index, BlockInfo to) {
-    assert this.triplets != null : "BlockInfo is not initialized";
-    assert index >= 0 && index*3+2 < triplets.length : "Index is out of bound";
-    BlockInfo info = (BlockInfo) triplets[index*3+2];
-    triplets[index*3+2] = to;
-    return info;
+  void setStorageInfo(int index, DatanodeStorageInfo storage) {
+    assert this.storages != null : "BlockInfo is not initialized";
+    this.storages[index] = storage;
   }
 
   public int getCapacity() {
-    assert this.triplets != null : "BlockInfo is not initialized";
-    assert triplets.length % 3 == 0 : "Malformed BlockInfo";
-    return triplets.length / 3;
+    assert this.storages != null : "BlockInfo is not initialized";
+    return storages.length;
   }
 
   /**
@@ -240,80 +204,6 @@ public abstract class BlockInfo extends Block
     return -1;
   }
 
-  /**
-   * Insert this block into the head of the list of blocks
-   * related to the specified DatanodeStorageInfo.
-   * If the head is null then form a new list.
-   * @return current block as the new head of the list.
-   */
-  BlockInfo listInsert(BlockInfo head, DatanodeStorageInfo storage) {
-    int dnIndex = this.findStorageInfo(storage);
-    assert dnIndex >= 0 : "Data node is not found: current";
-    assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
-        "Block is already in the list and cannot be inserted.";
-    this.setPrevious(dnIndex, null);
-    this.setNext(dnIndex, head);
-    if (head != null) {
-      head.setPrevious(head.findStorageInfo(storage), this);
-    }
-    return this;
-  }
-
-  /**
-   * Remove this block from the list of blocks
-   * related to the specified DatanodeStorageInfo.
-   * If this block is the head of the list then return the next block as
-   * the new head.
-   * @return the new head of the list or null if the list becomes
-   * empy after deletion.
-   */
-  BlockInfo listRemove(BlockInfo head, DatanodeStorageInfo storage) {
-    if (head == null) {
-      return null;
-    }
-    int dnIndex = this.findStorageInfo(storage);
-    if (dnIndex < 0) { // this block is not on the data-node list
-      return head;
-    }
-
-    BlockInfo next = this.getNext(dnIndex);
-    BlockInfo prev = this.getPrevious(dnIndex);
-    this.setNext(dnIndex, null);
-    this.setPrevious(dnIndex, null);
-    if (prev != null) {
-      prev.setNext(prev.findStorageInfo(storage), next);
-    }
-    if (next != null) {
-      next.setPrevious(next.findStorageInfo(storage), prev);
-    }
-    if (this == head) { // removing the head
-      head = next;
-    }
-    return head;
-  }
-
-  /**
-   * Remove this block from the list of blocks related to the specified
-   * DatanodeDescriptor. Insert it into the head of the list of blocks.
-   *
-   * @return the new head of the list.
-   */
-  public BlockInfo moveBlockToHead(BlockInfo head, DatanodeStorageInfo storage,
-      int curIndex, int headIndex) {
-    if (head == this) {
-      return this;
-    }
-    BlockInfo next = this.setNext(curIndex, head);
-    BlockInfo prev = this.setPrevious(curIndex, null);
-
-    head.setPrevious(headIndex, this);
-    prev.setNext(prev.findStorageInfo(storage), next);
-    if (next != null) {
-      next.setPrevious(next.findStorageInfo(storage), prev);
-    }
-    return this;
-  }
-
   @Override
   public int hashCode() {
     // Super implementation is sufficient

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
index 746e298..f729c4f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
@@ -35,20 +35,20 @@ public class BlockInfoContiguous extends BlockInfo {
   }
 
   /**
-   * Ensure that there is enough  space to include num more triplets.
-   * @return first free triplet index.
+   * Ensure that there is enough space to include num more storages.
+   * @return first free storage index.
    */
   private int ensureCapacity(int num) {
-    assert this.triplets != null : "BlockInfo is not initialized";
+    assert this.storages != null : "BlockInfo is not initialized";
     int last = numNodes();
-    if (triplets.length >= (last+num)*3) {
+    if (storages.length >= (last+num)) {
       return last;
     }
     /* Not enough space left. Create a new array. Should normally
      * happen only when replication is manually increased by the user. */
-    Object[] old = triplets;
-    triplets = new Object[(last+num)*3];
-    System.arraycopy(old, 0, triplets, 0, last * 3);
+    DatanodeStorageInfo[] old = storages;
+    storages = new DatanodeStorageInfo[(last+num)];
+    System.arraycopy(old, 0, storages, 0, last);
     return last;
   }
 
@@ -57,8 +57,6 @@ public class BlockInfoContiguous extends BlockInfo {
     // find the last null node
     int lastNode = ensureCapacity(1);
     setStorageInfo(lastNode, storage);
-    setNext(lastNode, null);
-    setPrevious(lastNode, null);
     return true;
   }
 
@@ -68,25 +66,18 @@ public class BlockInfoContiguous extends BlockInfo {
     if (dnIndex < 0) { // the node is not found
       return false;
     }
-    assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
-        "Block is still in the list and must be removed first.";
     // find the last not null node
     int lastNode = numNodes()-1;
-    // replace current node triplet by the lastNode one
+    // replace current node entry by the lastNode one
     setStorageInfo(dnIndex, getStorageInfo(lastNode));
-    setNext(dnIndex, getNext(lastNode));
-    setPrevious(dnIndex, getPrevious(lastNode));
-    // set the last triplet to null
+    // set the last entry to null
     setStorageInfo(lastNode, null);
-    setNext(lastNode, null);
-    setPrevious(lastNode, null);
     return true;
   }
 
   @Override
   public int numNodes() {
-    assert this.triplets != null : "BlockInfo is not initialized";
-    assert triplets.length % 3 == 0 : "Malformed BlockInfo";
+    assert this.storages != null : "BlockInfo is not initialized";
 
     for (int idx = getCapacity()-1; idx >= 0; idx--) {
       if (getDatanode(idx) != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
index 20d5858..c6e26ec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
@@ -26,21 +26,20 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 /**
  * Subclass of {@link BlockInfo}, presenting a block group in erasure coding.
  *
- * We still use triplets to store DatanodeStorageInfo for each block in the
- * block group, as well as the previous/next block in the corresponding
- * DatanodeStorageInfo. For a (m+k) block group, the first (m+k) triplet units
+ * We still use a storage array to store DatanodeStorageInfo for each block in
+ * the block group. For a (m+k) block group, the first (m+k) storage units
  * are sorted and strictly mapped to the corresponding block.
  *
  * Normally each block belonging to group is stored in only one DataNode.
- * However, it is possible that some block is over-replicated. Thus the triplet
+ * However, it is possible that some block is over-replicated. Thus the storage
  * array's size can be larger than (m+k). Thus currently we use an extra byte
- * array to record the block index for each triplet.
+ * array to record the block index for each entry.
  */
 @InterfaceAudience.Private
 public class BlockInfoStriped extends BlockInfo {
   private final ErasureCodingPolicy ecPolicy;
   /**
-   * Always the same size with triplets. Record the block index for each triplet
+   * Always the same size as storages. Records the block index for each entry.
    * TODO: actually this is only necessary for over-replicated block. Thus can
    * be further optimized to save memory usage.
    */
@@ -104,7 +103,7 @@ public class BlockInfoStriped extends BlockInfo {
         return i;
       }
     }
-    // need to expand the triplet size
+    // need to expand the storage size
     ensureCapacity(i + 1, true);
     return i;
   }
@@ -130,8 +129,6 @@ public class BlockInfoStriped extends BlockInfo {
   private void addStorage(DatanodeStorageInfo storage, int index,
       int blockIndex) {
     setStorageInfo(index, storage);
-    setNext(index, null);
-    setPrevious(index, null);
     indices[index] = (byte) blockIndex;
   }
 
@@ -173,26 +170,22 @@ public class BlockInfoStriped extends BlockInfo {
     if (dnIndex < 0) { // the node is not found
       return false;
     }
-    assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
-        "Block is still in the list and must be removed first.";
-    // set the triplet to null
+    // set the entry to null
     setStorageInfo(dnIndex, null);
-    setNext(dnIndex, null);
-    setPrevious(dnIndex, null);
     indices[dnIndex] = -1;
     return true;
   }
 
   private void ensureCapacity(int totalSize, boolean keepOld) {
     if (getCapacity() < totalSize) {
-      Object[] old = triplets;
+      DatanodeStorageInfo[] old = storages;
       byte[] oldIndices = indices;
-      triplets = new Object[totalSize * 3];
+      storages = new DatanodeStorageInfo[totalSize];
       indices = new byte[totalSize];
       initIndices();
 
       if (keepOld) {
-        System.arraycopy(old, 0, triplets, 0, old.length);
+        System.arraycopy(old, 0, storages, 0, old.length);
         System.arraycopy(oldIndices, 0, indices, 0, oldIndices.length);
       }
     }
@@ -214,8 +207,7 @@ public class BlockInfoStriped extends BlockInfo {
 
   @Override
   public int numNodes() {
-    assert this.triplets != null : "BlockInfo is not initialized";
-    assert triplets.length % 3 == 0 : "Malformed BlockInfo";
+    assert this.storages != null : "BlockInfo is not initialized";
     int num = 0;
     for (int idx = getCapacity()-1; idx >= 0; idx--) {
       if (getStorageInfo(idx) != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 587e6b6..25cec8a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
@@ -93,6 +94,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
 import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
+import org.apache.hadoop.hdfs.util.FoldedTreeSet;
 import org.apache.hadoop.hdfs.util.LightWeightHashSet;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 
@@ -106,6 +108,7 @@ import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.LightWeightGSet;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.VersionInfo;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -195,7 +198,12 @@ public class BlockManager implements BlockStatsMXBean {
 
   /**replicationRecheckInterval is how often namenode checks for new replication work*/
   private final long replicationRecheckInterval;
-  
+
+  /** Interval, timeout and ratio settings for storageinfo defragmentation. */
+  private final long storageInfoDefragmentInterval;
+  private final long storageInfoDefragmentTimeout;
+  private final double storageInfoDefragmentRatio;
+
   /**
    * Mapping: Block -> { BlockCollection, datanodes, self ref }
    * Updated only in response to client-sent information.
@@ -204,6 +212,10 @@ public class BlockManager implements BlockStatsMXBean {
 
   /** Replication thread. */
   final Daemon replicationThread = new Daemon(new ReplicationMonitor());
+
+  /** StorageInfoDefragmenter thread. */
+  private final Daemon storageInfoDefragmenterThread =
+      new Daemon(new StorageInfoDefragmenter());
   
   /** Block report thread for handling async reports. */
   private final BlockReportProcessingThread blockReportThread =
@@ -376,7 +388,20 @@ public class BlockManager implements BlockStatsMXBean {
     this.replicationRecheckInterval = 
       conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 
                   DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L;
-    
+
+    this.storageInfoDefragmentInterval =
+      conf.getLong(
+          DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_INTERVAL_MS_KEY,
+          DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_INTERVAL_MS_DEFAULT);
+    this.storageInfoDefragmentTimeout =
+      conf.getLong(
+          DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_TIMEOUT_MS_KEY,
+          DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_TIMEOUT_MS_DEFAULT);
+    this.storageInfoDefragmentRatio =
+      conf.getDouble(
+          DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_KEY,
+          DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_DEFAULT);
+
     this.encryptDataTransfer =
         conf.getBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY,
             DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
@@ -508,6 +533,8 @@ public class BlockManager implements BlockStatsMXBean {
     datanodeManager.activate(conf);
     this.replicationThread.setName("ReplicationMonitor");
     this.replicationThread.start();
+    storageInfoDefragmenterThread.setName("StorageInfoMonitor");
+    storageInfoDefragmenterThread.start();
     this.blockReportThread.start();
     mxBeanName = MBeans.register("NameNode", "BlockStats", this);
     bmSafeMode.activate(blockTotal);
@@ -517,8 +544,10 @@ public class BlockManager implements BlockStatsMXBean {
     bmSafeMode.close();
     try {
       replicationThread.interrupt();
+      storageInfoDefragmenterThread.interrupt();
       blockReportThread.interrupt();
       replicationThread.join(3000);
+      storageInfoDefragmenterThread.join(3000);
       blockReportThread.join(3000);
     } catch (InterruptedException ie) {
     }
@@ -1165,9 +1194,15 @@ public class BlockManager implements BlockStatsMXBean {
    
   /** Remove the blocks associated to the given datanode. */
   void removeBlocksAssociatedTo(final DatanodeDescriptor node) {
-    final Iterator<BlockInfo> it = node.getBlockIterator();
-    while(it.hasNext()) {
-      removeStoredBlock(it.next(), node);
+    for (DatanodeStorageInfo storage : node.getStorageInfos()) {
+      final Iterator<BlockInfo> it = storage.getBlockIterator();
+      while (it.hasNext()) {
+        BlockInfo block = it.next();
+        // The block must be removed using the iterator to avoid a
+        // ConcurrentModificationException in the underlying storage
+        it.remove();
+        removeStoredBlock(block, node);
+      }
     }
     // Remove all pending DN messages referencing this DN.
     pendingDNMessages.removeAllMessagesForDatanode(node);
@@ -1183,6 +1218,9 @@ public class BlockManager implements BlockStatsMXBean {
     DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
     while(it.hasNext()) {
       BlockInfo block = it.next();
+      // The block must be removed via the iterator to avoid a
+      // ConcurrentModificationException in the underlying storage
+      it.remove();
       removeStoredBlock(block, node);
       final Block b = getBlockOnStorage(block, storageInfo);
       if (b != null) {
@@ -2033,8 +2071,8 @@ public class BlockManager implements BlockStatsMXBean {
    */
   public boolean processReport(final DatanodeID nodeID,
       final DatanodeStorage storage,
-      final BlockListAsLongs newReport, BlockReportContext context,
-      boolean lastStorageInRpc) throws IOException {
+      final BlockListAsLongs newReport,
+      BlockReportContext context, boolean lastStorageInRpc) throws IOException {
     namesystem.writeLock();
     final long startTime = Time.monotonicNow(); //after acquiring write lock
     final long endTime;
@@ -2079,7 +2117,8 @@ public class BlockManager implements BlockStatsMXBean {
             nodeID.getDatanodeUuid());
         processFirstBlockReport(storageInfo, newReport);
       } else {
-        invalidatedBlocks = processReport(storageInfo, newReport);
+        invalidatedBlocks = processReport(storageInfo, newReport,
+            context != null ? context.isSorted() : false);
       }
       
       storageInfo.receivedBlockReport();
@@ -2149,6 +2188,9 @@ public class BlockManager implements BlockStatsMXBean {
       // TODO: remove this assumption in case we want to put a block on
       // more than one storage on a datanode (and because it's a difficult
       // assumption to really enforce)
+      // The block must be removed via the iterator to avoid a
+      // ConcurrentModificationException in the underlying storage
+      iter.remove();
       removeStoredBlock(block, zombie.getDatanodeDescriptor());
       Block b = getBlockOnStorage(block, zombie);
       if (b != null) {
@@ -2238,7 +2280,7 @@ public class BlockManager implements BlockStatsMXBean {
   
   private Collection<Block> processReport(
       final DatanodeStorageInfo storageInfo,
-      final BlockListAsLongs report) throws IOException {
+      final BlockListAsLongs report, final boolean sorted) throws IOException {
     // Normal case:
     // Modify the (block-->datanode) map, according to the difference
     // between the old and new block report.
@@ -2248,9 +2290,29 @@ public class BlockManager implements BlockStatsMXBean {
     Collection<Block> toInvalidate = new LinkedList<>();
     Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<>();
     Collection<StatefulBlockInfo> toUC = new LinkedList<>();
-    reportDiff(storageInfo, report,
-        toAdd, toRemove, toInvalidate, toCorrupt, toUC);
-   
+
+    Iterable<BlockReportReplica> sortedReport;
+    if (!sorted) {
+      blockLog.warn("BLOCK* processReport: Report from the DataNode ({}) is "
+                    + "unsorted. This will cause overhead on the NameNode "
+                    + "which needs to sort the Full BR. Please update the "
+                    + "DataNode to the same version of Hadoop HDFS as the "
+                    + "NameNode ({}).",
+                    storageInfo.getDatanodeDescriptor().getDatanodeUuid(),
+                    VersionInfo.getVersion());
+      Set<BlockReportReplica> set = new FoldedTreeSet<>();
+      for (BlockReportReplica iblk : report) {
+        set.add(new BlockReportReplica(iblk));
+      }
+      sortedReport = set;
+    } else {
+      sortedReport = report;
+    }
+
+    reportDiffSorted(storageInfo, sortedReport,
+                     toAdd, toRemove, toInvalidate, toCorrupt, toUC);
+
+
     DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
     // Process the blocks on each queue
     for (StatefulBlockInfo b : toUC) { 
@@ -2399,126 +2461,111 @@ public class BlockManager implements BlockStatsMXBean {
     }
   }
 
-  private void reportDiff(DatanodeStorageInfo storageInfo, 
-      BlockListAsLongs newReport,
+  private void reportDiffSorted(DatanodeStorageInfo storageInfo,
+      Iterable<BlockReportReplica> newReport,
       Collection<BlockInfoToAdd> toAdd,     // add to DatanodeDescriptor
       Collection<BlockInfo> toRemove,       // remove from DatanodeDescriptor
       Collection<Block> toInvalidate,       // should be removed from DN
       Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
       Collection<StatefulBlockInfo> toUC) { // add to under-construction list
 
-    // place a delimiter in the list which separates blocks 
-    // that have been reported from those that have not
-    Block delimiterBlock = new Block();
-    BlockInfo delimiter = new BlockInfoContiguous(delimiterBlock,
-        (short) 1);
-    AddBlockResult result = storageInfo.addBlock(delimiter, delimiterBlock);
-    assert result == AddBlockResult.ADDED 
-        : "Delimiting block cannot be present in the node";
-    int headIndex = 0; //currently the delimiter is in the head of the list
-    int curIndex;
-
-    if (newReport == null) {
-      newReport = BlockListAsLongs.EMPTY;
-    }
-    // scan the report and process newly reported blocks
-    for (BlockReportReplica iblk : newReport) {
-      ReplicaState iState = iblk.getState();
-      BlockInfo storedBlock = processReportedBlock(storageInfo,
-          iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);
-
-      // move block to the head of the list
-      if (storedBlock != null &&
-          (curIndex = storedBlock.findStorageInfo(storageInfo)) >= 0) {
-        headIndex = storageInfo.moveBlockToHead(storedBlock, curIndex, headIndex);
+    // Both the reported blocks and the storage's blocks must be sorted by ID
+    Iterator<BlockInfo> storageBlocksIterator = storageInfo.getBlockIterator();
+    DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor();
+    BlockInfo storageBlock = null;
+
+    for (BlockReportReplica replica : newReport) {
+
+      long replicaID = replica.getBlockId();
+      if (BlockIdManager.isStripedBlockID(replicaID)
+          && (!hasNonEcBlockUsingStripedID ||
+              !blocksMap.containsBlock(replica))) {
+        replicaID = BlockIdManager.convertToStripedID(replicaID);
+      }
+
+      ReplicaState reportedState = replica.getState();
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Reported block " + replica
+                  + " on " + dn + " size " + replica.getNumBytes()
+                  + " replicaState = " + reportedState);
+      }
+
+      if (shouldPostponeBlocksFromFuture
+          && isGenStampInFuture(replica)) {
+        queueReportedBlock(storageInfo, replica, reportedState,
+                           QUEUE_REASON_FUTURE_GENSTAMP);
+        continue;
+      }
+
+      if (storageBlock == null && storageBlocksIterator.hasNext()) {
+        storageBlock = storageBlocksIterator.next();
       }
+
+      do {
+        int cmp;
+        if (storageBlock == null ||
+            (cmp = Long.compare(replicaID, storageBlock.getBlockId())) < 0) {
+          // Check if block is available in NN but not yet on this storage
+          BlockInfo nnBlock = blocksMap.getStoredBlock(new Block(replicaID));
+          if (nnBlock != null) {
+            reportDiffSortedInner(storageInfo, replica, reportedState,
+                                  nnBlock, toAdd, toCorrupt, toUC);
+          } else {
+            // Replica not found anywhere so it should be invalidated
+            toInvalidate.add(new Block(replica));
+          }
+          break;
+        } else if (cmp == 0) {
+          // Replica matched current storageblock
+          reportDiffSortedInner(storageInfo, replica, reportedState,
+                                storageBlock, toAdd, toCorrupt, toUC);
+          storageBlock = null;
+        } else {
+          // Replica has higher ID than storageBlock
+          // Remove all stored blocks with IDs lower than replica
+          do {
+            toRemove.add(storageBlock);
+            storageBlock = storageBlocksIterator.hasNext()
+                           ? storageBlocksIterator.next() : null;
+          } while (storageBlock != null &&
+                   Long.compare(replicaID, storageBlock.getBlockId()) > 0);
+        }
+      } while (storageBlock != null);
     }
 
-    // collect blocks that have not been reported
-    // all of them are next to the delimiter
-    Iterator<BlockInfo> it =
-        storageInfo.new BlockIterator(delimiter.getNext(0));
-    while (it.hasNext()) {
-      toRemove.add(it.next());
+    // Iterate any remaining blocks that have not been reported and remove them
+    while (storageBlocksIterator.hasNext()) {
+      toRemove.add(storageBlocksIterator.next());
     }
-    storageInfo.removeBlock(delimiter);
   }
 
-  /**
-   * Process a block replica reported by the data-node.
-   * No side effects except adding to the passed-in Collections.
-   * 
-   * <ol>
-   * <li>If the block is not known to the system (not in blocksMap) then the
-   * data-node should be notified to invalidate this block.</li>
-   * <li>If the reported replica is valid that is has the same generation stamp
-   * and length as recorded on the name-node, then the replica location should
-   * be added to the name-node.</li>
-   * <li>If the reported replica is not valid, then it is marked as corrupt,
-   * which triggers replication of the existing valid replicas.
-   * Corrupt replicas are removed from the system when the block
-   * is fully replicated.</li>
-   * <li>If the reported replica is for a block currently marked "under
-   * construction" in the NN, then it should be added to the 
-   * BlockUnderConstructionFeature's list of replicas.</li>
-   * </ol>
-   * 
-   * @param storageInfo DatanodeStorageInfo that sent the report.
-   * @param block reported block replica
-   * @param reportedState reported replica state
-   * @param toAdd add to DatanodeDescriptor
-   * @param toInvalidate missing blocks (not in the blocks map)
-   *        should be removed from the data-node
-   * @param toCorrupt replicas with unexpected length or generation stamp;
-   *        add to corrupt replicas
-   * @param toUC replicas of blocks currently under construction
-   * @return the up-to-date stored block, if it should be kept.
-   *         Otherwise, null.
-   */
-  private BlockInfo processReportedBlock(
+  private void reportDiffSortedInner(
       final DatanodeStorageInfo storageInfo,
-      final Block block, final ReplicaState reportedState,
+      final BlockReportReplica replica, final ReplicaState reportedState,
+      final BlockInfo storedBlock,
       final Collection<BlockInfoToAdd> toAdd,
-      final Collection<Block> toInvalidate, 
       final Collection<BlockToMarkCorrupt> toCorrupt,
       final Collection<StatefulBlockInfo> toUC) {
-    
-    DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor();
 
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Reported block " + block
-          + " on " + dn + " size " + block.getNumBytes()
-          + " replicaState = " + reportedState);
-    }
-  
-    if (shouldPostponeBlocksFromFuture && isGenStampInFuture(block)) {
-      queueReportedBlock(storageInfo, block, reportedState,
-          QUEUE_REASON_FUTURE_GENSTAMP);
-      return null;
-    }
-    
-    // find block by blockId
-    BlockInfo storedBlock = getStoredBlock(block);
-    if(storedBlock == null) {
-      // If blocksMap does not contain reported block id,
-      // the replica should be removed from the data-node.
-      toInvalidate.add(new Block(block));
-      return null;
-    }
+    assert replica != null;
+    assert storedBlock != null;
+
+    DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor();
     BlockUCState ucState = storedBlock.getBlockUCState();
-    
+
     // Block is on the NN
-    if(LOG.isDebugEnabled()) {
+    if (LOG.isDebugEnabled()) {
       LOG.debug("In memory blockUCState = " + ucState);
     }
 
     // Ignore replicas already scheduled to be removed from the DN
-    if(invalidateBlocks.contains(dn, block)) {
-      return storedBlock;
+    if (invalidateBlocks.contains(dn, replica)) {
+      return;
     }
 
-    BlockToMarkCorrupt c = checkReplicaCorrupt(
-        block, reportedState, storedBlock, ucState, dn);
+    BlockToMarkCorrupt c = checkReplicaCorrupt(replica, reportedState,
+                                               storedBlock, ucState, dn);
     if (c != null) {
       if (shouldPostponeBlocksFromFuture) {
         // If the block is an out-of-date generation stamp or state,
@@ -2532,23 +2579,16 @@ public class BlockManager implements BlockStatsMXBean {
       } else {
         toCorrupt.add(c);
       }
-      return storedBlock;
-    }
-
-    if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
-      toUC.add(new StatefulBlockInfo(storedBlock,
-          new Block(block), reportedState));
-      return storedBlock;
-    }
-
-    // Add replica if appropriate. If the replica was previously corrupt
-    // but now okay, it might need to be updated.
-    if (reportedState == ReplicaState.FINALIZED
-        && (storedBlock.findStorageInfo(storageInfo) == -1 ||
-            corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
-      toAdd.add(new BlockInfoToAdd(storedBlock, block));
+    } else if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
+      toUC.add(new StatefulBlockInfo(storedBlock, new Block(replica),
+          reportedState));
+    } else if (reportedState == ReplicaState.FINALIZED &&
+               (storedBlock.findStorageInfo(storageInfo) == -1 ||
+                corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
+      // Add replica if appropriate. If the replica was previously corrupt
+      // but now okay, it might need to be updated.
+      toAdd.add(new BlockInfoToAdd(storedBlock, replica));
     }
-    return storedBlock;
   }
 
   /**
@@ -2774,7 +2814,7 @@ public class BlockManager implements BlockStatsMXBean {
     }
 
     // just add it
-    AddBlockResult result = storageInfo.addBlock(storedBlock, reported);
+    AddBlockResult result = storageInfo.addBlockInitial(storedBlock, reported);
 
     // Now check for completion of blocks and safe block count
     int numCurrentReplica = countLiveNodes(storedBlock);
@@ -3497,40 +3537,75 @@ public class BlockManager implements BlockStatsMXBean {
       DatanodeStorageInfo storageInfo, Block block,
       ReplicaState reportedState, DatanodeDescriptor delHintNode)
       throws IOException {
-    // blockReceived reports a finalized block
-    Collection<BlockInfoToAdd> toAdd = new LinkedList<>();
-    Collection<Block> toInvalidate = new LinkedList<Block>();
-    Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
-    Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
+
     final DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
 
-    processReportedBlock(storageInfo, block, reportedState, toAdd, toInvalidate,
-        toCorrupt, toUC);
-    // the block is only in one of the to-do lists
-    // if it is in none then data-node already has it
-    assert toUC.size() + toAdd.size() + toInvalidate.size() + toCorrupt.size() <= 1
-      : "The block should be only in one of the lists.";
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Reported block " + block
+          + " on " + node + " size " + block.getNumBytes()
+          + " replicaState = " + reportedState);
+    }
 
-    for (StatefulBlockInfo b : toUC) { 
-      addStoredBlockUnderConstruction(b, storageInfo);
+    if (shouldPostponeBlocksFromFuture &&
+        isGenStampInFuture(block)) {
+      queueReportedBlock(storageInfo, block, reportedState,
+          QUEUE_REASON_FUTURE_GENSTAMP);
+      return;
     }
-    long numBlocksLogged = 0;
-    for (BlockInfoToAdd b : toAdd) {
-      addStoredBlock(b.stored, b.reported, storageInfo, delHintNode,
-          numBlocksLogged < maxNumBlocksToLog);
-      numBlocksLogged++;
+
+    // find block by blockId
+    BlockInfo storedBlock = getStoredBlock(block);
+    if(storedBlock == null) {
+      // If blocksMap does not contain reported block id,
+      // the replica should be removed from the data-node.
+      blockLog.debug("BLOCK* addBlock: block {} on node {} size {} does not " +
+          "belong to any file", block, node, block.getNumBytes());
+      addToInvalidates(new Block(block), node);
+      return;
     }
-    if (numBlocksLogged > maxNumBlocksToLog) {
-      blockLog.debug("BLOCK* addBlock: logged info for {} of {} reported.",
-          maxNumBlocksToLog, numBlocksLogged);
+
+    BlockUCState ucState = storedBlock.getBlockUCState();
+    // Block is on the NN
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("In memory blockUCState = " + ucState);
     }
-    for (Block b : toInvalidate) {
-      blockLog.debug("BLOCK* addBlock: block {} on node {} size {} does not " +
-          "belong to any file", b, node, b.getNumBytes());
-      addToInvalidates(b, node);
+
+    // Ignore replicas already scheduled to be removed from the DN
+    if(invalidateBlocks.contains(node, block)) {
+      return;
     }
-    for (BlockToMarkCorrupt b : toCorrupt) {
-      markBlockAsCorrupt(b, storageInfo, node);
+
+    BlockToMarkCorrupt c = checkReplicaCorrupt(
+        block, reportedState, storedBlock, ucState, node);
+    if (c != null) {
+      if (shouldPostponeBlocksFromFuture) {
+        // If the block is an out-of-date generation stamp or state,
+        // but we're the standby, we shouldn't treat it as corrupt,
+        // but instead just queue it for later processing.
+        // TODO: Pretty confident this should be s/storedBlock/block below,
+        // since we should be postponing the info of the reported block, not
+        // the stored block. See HDFS-6289 for more context.
+        queueReportedBlock(storageInfo, storedBlock, reportedState,
+            QUEUE_REASON_CORRUPT_STATE);
+      } else {
+        markBlockAsCorrupt(c, storageInfo, node);
+      }
+      return;
+    }
+
+    if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
+      addStoredBlockUnderConstruction(
+          new StatefulBlockInfo(storedBlock, new Block(block), reportedState),
+          storageInfo);
+      return;
+    }
+
+    // Add replica if appropriate. If the replica was previously corrupt
+    // but now okay, it might need to be updated.
+    if (reportedState == ReplicaState.FINALIZED
+        && (storedBlock.findStorageInfo(storageInfo) == -1 ||
+            corruptReplicas.isReplicaCorrupt(storedBlock, node))) {
+      addStoredBlock(storedBlock, block, storageInfo, delHintNode, true);
     }
   }
 
@@ -4060,6 +4135,87 @@ public class BlockManager implements BlockStatsMXBean {
     }
   }
 
+  /**
+   * Runnable that monitors the fill ratio of each StorageInfo TreeSet and
+   * compacts it when the ratio falls below the configured threshold.
+   */
+  private class StorageInfoDefragmenter implements Runnable {
+
+    @Override
+    public void run() {
+      while (namesystem.isRunning()) {
+        try {
+          // Check storage efficiency only when active NN is out of safe mode.
+          if (isPopulatingReplQueues()) {
+            scanAndCompactStorages();
+          }
+          Thread.sleep(storageInfoDefragmentInterval);
+        } catch (Throwable t) {
+          if (!namesystem.isRunning()) {
+            LOG.info("Stopping thread.");
+            if (!(t instanceof InterruptedException)) {
+              LOG.info("Received an exception while shutting down.", t);
+            }
+            break;
+          } else if (!checkNSRunning && t instanceof InterruptedException) {
+            LOG.info("Stopping for testing.");
+            break;
+          }
+          LOG.error("Thread received Runtime exception.", t);
+          terminate(1, t);
+        }
+      }
+    }
+
+    private void scanAndCompactStorages() throws InterruptedException {
+      ArrayList<String> datanodesAndStorages = new ArrayList<>();
+      for (DatanodeDescriptor node
+          : datanodeManager.getDatanodeListForReport(DatanodeReportType.ALL)) {
+        for (DatanodeStorageInfo storage : node.getStorageInfos()) {
+          try {
+            namesystem.readLock();
+            double ratio = storage.treeSetFillRatio();
+            if (ratio < storageInfoDefragmentRatio) {
+              datanodesAndStorages.add(node.getDatanodeUuid());
+              datanodesAndStorages.add(storage.getStorageID());
+            }
+            LOG.info("StorageInfo TreeSet fill ratio {} : {}{}",
+                     storage.getStorageID(), ratio,
+                     (ratio < storageInfoDefragmentRatio)
+                     ? " (queued for defragmentation)" : "");
+          } finally {
+            namesystem.readUnlock();
+          }
+        }
+      }
+      if (!datanodesAndStorages.isEmpty()) {
+        for (int i = 0; i < datanodesAndStorages.size(); i += 2) {
+          namesystem.writeLock();
+          try {
+            DatanodeStorageInfo storage = datanodeManager.
+                getDatanode(datanodesAndStorages.get(i)).
+                getStorageInfo(datanodesAndStorages.get(i + 1));
+            if (storage != null) {
+              boolean aborted =
+                  !storage.treeSetCompact(storageInfoDefragmentTimeout);
+              if (aborted) {
+                // Compaction timed out, reset iterator to continue with
+                // the same storage next iteration.
+                i -= 2;
+              }
+              LOG.info("StorageInfo TreeSet defragmented {} : {}{}",
+                       storage.getStorageID(), storage.treeSetFillRatio(),
+                       aborted ? " (aborted)" : "");
+            }
+          } finally {
+            namesystem.writeUnlock();
+          }
+          // Wait between each iteration
+          Thread.sleep(1000);
+        }
+      }
+    }
+  }
 
   /**
    * Compute block replication and block invalidation work that can be scheduled

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
index 47a21fe..71d0598 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import java.util.Collections;
 import java.util.Iterator;
 
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -30,37 +31,6 @@ import org.apache.hadoop.util.LightWeightGSet;
  * the datanodes that store the block.
  */
 class BlocksMap {
-  private static class StorageIterator implements Iterator<DatanodeStorageInfo> {
-    private final BlockInfo blockInfo;
-    private int nextIdx = 0;
-      
-    StorageIterator(BlockInfo blkInfo) {
-      this.blockInfo = blkInfo;
-    }
-
-    @Override
-    public boolean hasNext() {
-      if (blockInfo == null) {
-        return false;
-      }
-      while (nextIdx < blockInfo.getCapacity() &&
-          blockInfo.getDatanode(nextIdx) == null) {
-        // note that for striped blocks there may be null in the triplets
-        nextIdx++;
-      }
-      return nextIdx < blockInfo.getCapacity();
-    }
-
-    @Override
-    public DatanodeStorageInfo next() {
-      return blockInfo.getStorageInfo(nextIdx++);
-    }
-
-    @Override
-    public void remove()  {
-      throw new UnsupportedOperationException("Sorry. can't remove.");
-    }
-  }
 
   /** Constant {@link LightWeightGSet} capacity. */
   private final int capacity;
@@ -132,6 +102,16 @@ class BlocksMap {
     }
   }
 
+  /**
+   * Check if BlocksMap contains the block.
+   *
+   * @param b Block to check
+   * @return true if block is in the map, otherwise false
+   */
+  boolean containsBlock(Block b) {
+    return blocks.contains(b);
+  }
+
   /** Returns the block object if it exists in the map. */
   BlockInfo getStoredBlock(Block b) {
     return blocks.get(b);
@@ -142,7 +122,9 @@ class BlocksMap {
    * returns {@link Iterable} of the storages the block belongs to.
    */
   Iterable<DatanodeStorageInfo> getStorages(Block b) {
-    return getStorages(blocks.get(b));
+    BlockInfo block = blocks.get(b);
+    return block != null ? getStorages(block)
+           : Collections.<DatanodeStorageInfo>emptyList();
   }
 
   /**
@@ -150,12 +132,16 @@ class BlocksMap {
    * returns {@link Iterable} of the storages the block belongs to.
    */
   Iterable<DatanodeStorageInfo> getStorages(final BlockInfo storedBlock) {
-    return new Iterable<DatanodeStorageInfo>() {
-      @Override
-      public Iterator<DatanodeStorageInfo> iterator() {
-        return new StorageIterator(storedBlock);
-      }
-    };
+    if (storedBlock == null) {
+      return Collections.emptyList();
+    } else {
+      return new Iterable<DatanodeStorageInfo>() {
+        @Override
+        public Iterator<DatanodeStorageInfo> iterator() {
+          return storedBlock.getStorageInfos();
+        }
+      };
+    }
   }
 
   /** counts number of containing nodes. Better than using iterator. */
@@ -174,7 +160,7 @@ class BlocksMap {
     if (info == null)
       return false;
 
-    // remove block from the data-node list and the node from the block info
+    // remove block from the data-node set and the node from the block info
     boolean removed = removeBlock(node, info);
 
     if (info.hasNoStorage()    // no datanodes left
@@ -185,7 +171,7 @@ class BlocksMap {
   }
 
   /**
-   * Remove block from the list of blocks belonging to the data-node. Remove
+   * Remove block from the set of blocks belonging to the data-node. Remove
    * data-node from the block.
    */
   static boolean removeBlock(DatanodeDescriptor dn, BlockInfo b) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index 1f1b24b..c4729ea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.hdfs.util.FoldedTreeSet;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -85,31 +86,6 @@ public class DatanodeStorageInfo {
     storageType = storage.getStorageType();
   }
 
-  /**
-   * Iterates over the list of blocks belonging to the data-node.
-   */
-  class BlockIterator implements Iterator<BlockInfo> {
-    private BlockInfo current;
-
-    BlockIterator(BlockInfo head) {
-      this.current = head;
-    }
-
-    public boolean hasNext() {
-      return current != null;
-    }
-
-    public BlockInfo next() {
-      BlockInfo res = current;
-      current = current.getNext(current.findStorageInfo(DatanodeStorageInfo.this));
-      return res;
-    }
-
-    public void remove() {
-      throw new UnsupportedOperationException("Sorry. can't remove.");
-    }
-  }
-
   private final DatanodeDescriptor dn;
   private final String storageID;
   private StorageType storageType;
@@ -120,8 +96,7 @@ public class DatanodeStorageInfo {
   private volatile long remaining;
   private long blockPoolUsed;
 
-  private volatile BlockInfo blockList = null;
-  private int numBlocks = 0;
+  private final FoldedTreeSet<BlockInfo> blocks = new FoldedTreeSet<>();
 
   // The ID of the last full block report which updated this storage.
   private long lastBlockReportId = 0;
@@ -207,7 +182,7 @@ public class DatanodeStorageInfo {
   }
 
   boolean areBlocksOnFailedStorage() {
-    return getState() == State.FAILED && numBlocks != 0;
+    return getState() == State.FAILED && !blocks.isEmpty();
   }
 
   @VisibleForTesting
@@ -234,6 +209,36 @@ public class DatanodeStorageInfo {
   long getBlockPoolUsed() {
     return blockPoolUsed;
   }
+  /**
+   * For use during startup. Expects blocks to be added in sorted order
+   * to enable fast insertion into the DatanodeStorageInfo.
+   *
+   * @param b Block to add to DatanodeStorageInfo
+   * @param reportedBlock The reported replica
+   * @return Enum describing if block was added, replaced or already existed
+   */
+  public AddBlockResult addBlockInitial(BlockInfo b, Block reportedBlock) {
+    // First check whether the block belongs to a different storage
+    // on the same DN.
+    AddBlockResult result = AddBlockResult.ADDED;
+    DatanodeStorageInfo otherStorage =
+        b.findStorageInfo(getDatanodeDescriptor());
+
+    if (otherStorage != null) {
+      if (otherStorage != this) {
+        // The block belongs to a different storage. Remove it first.
+        otherStorage.removeBlock(b);
+        result = AddBlockResult.REPLACED;
+      } else {
+        // The block is already associated with this storage.
+        return AddBlockResult.ALREADY_EXIST;
+      }
+    }
+
+    b.addStorage(this, reportedBlock);
+    blocks.addSortedLast(b);
+    return result;
+  }
 
   public AddBlockResult addBlock(BlockInfo b, Block reportedBlock) {
     // First check whether the block belongs to a different storage
@@ -253,9 +258,8 @@ public class DatanodeStorageInfo {
       }
     }
 
-    // add to the head of the data-node list
     b.addStorage(this, reportedBlock);
-    insertToList(b);
+    blocks.add(b);
     return result;
   }
 
@@ -263,45 +267,17 @@ public class DatanodeStorageInfo {
     return addBlock(b, b);
   }
 
-  public void insertToList(BlockInfo b) {
-    blockList = b.listInsert(blockList, this);
-    numBlocks++;
-  }
-
-  public boolean removeBlock(BlockInfo b) {
-    blockList = b.listRemove(blockList, this);
-    if (b.removeStorage(this)) {
-      numBlocks--;
-      return true;
-    } else {
-      return false;
-    }
+  boolean removeBlock(BlockInfo b) {
+    blocks.remove(b);
+    return b.removeStorage(this);
   }
 
   int numBlocks() {
-    return numBlocks;
+    return blocks.size();
   }
   
   Iterator<BlockInfo> getBlockIterator() {
-    return new BlockIterator(blockList);
-  }
-
-  /**
-   * Move block to the head of the list of blocks belonging to the data-node.
-   * @return the index of the head of the blockList
-   */
-  int moveBlockToHead(BlockInfo b, int curIndex, int headIndex) {
-    blockList = b.moveBlockToHead(blockList, this, curIndex, headIndex);
-    return curIndex;
-  }
-
-  /**
-   * Used for testing only
-   * @return the head of the blockList
-   */
-  @VisibleForTesting
-  BlockInfo getBlockListHeadForTesting(){
-    return blockList;
+    return blocks.iterator();
   }
 
   void updateState(StorageReport r) {
@@ -349,6 +325,27 @@ public class DatanodeStorageInfo {
         false, capacity, dfsUsed, remaining, blockPoolUsed);
   }
 
+  /**
+   * The fill ratio of the underlying TreeSet holding blocks.
+   *
+   * @return the fill ratio of the tree
+   */
+  public double treeSetFillRatio() {
+    return blocks.fillRatio();
+  }
+
+  /**
+   * Compact the underlying TreeSet holding blocks.
+   *
+   * @param timeout Maximum time to spend compacting the tree set in
+   *                milliseconds.
+   *
+   * @return true if compaction completed, false if aborted
+   */
+  public boolean treeSetCompact(long timeout) {
+    return blocks.compact(timeout);
+  }
+
   static Iterable<StorageType> toStorageTypes(
       final Iterable<DatanodeStorageInfo> infos) {
     return new Iterable<StorageType>() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 1b72961..bc4f2d8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -461,7 +461,7 @@ class BPServiceActor implements Runnable {
         // Below split threshold, send all reports in a single message.
         DatanodeCommand cmd = bpNamenode.blockReport(
             bpRegistration, bpos.getBlockPoolId(), reports,
-              new BlockReportContext(1, 0, reportId, fullBrLeaseId));
+              new BlockReportContext(1, 0, reportId, fullBrLeaseId, true));
         numRPCs = 1;
         numReportsSent = reports.length;
         if (cmd != null) {
@@ -474,7 +474,7 @@ class BPServiceActor implements Runnable {
           DatanodeCommand cmd = bpNamenode.blockReport(
               bpRegistration, bpos.getBlockPoolId(), singleReport,
               new BlockReportContext(reports.length, r, reportId,
-                  fullBrLeaseId));
+                  fullBrLeaseId, true));
           numReportsSent++;
           numRPCs++;
           if (cmd != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
index 6f0b8a7..34c9f2e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
@@ -18,13 +18,14 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
-import org.apache.hadoop.util.LightWeightResizableGSet;
+import org.apache.hadoop.hdfs.util.FoldedTreeSet;
 
 /**
  * Maintains the replica map. 
@@ -33,9 +34,20 @@ class ReplicaMap {
   // Object using which this class is synchronized
   private final Object mutex;
   
-  // Map of block pool Id to another map of block Id to ReplicaInfo.
-  private final Map<String, LightWeightResizableGSet<Block, ReplicaInfo>> map =
-    new HashMap<String, LightWeightResizableGSet<Block, ReplicaInfo>>();
+  // Map of block pool Id to a set of ReplicaInfo.
+  private final Map<String, FoldedTreeSet<ReplicaInfo>> map = new HashMap<>();
+
+  // Asymmetric comparator: o1 is the Long lookup ID, o2 is a stored Block.
+  private static final Comparator<Object> LONG_AND_BLOCK_COMPARATOR
+      = new Comparator<Object>() {
+
+        @Override
+        public int compare(Object o1, Object o2) {
+          long lookup = (long) o1;
+          long stored = ((Block) o2).getBlockId();
+          return lookup > stored ? 1 : lookup < stored ? -1 : 0;
+        }
+      };
 
   ReplicaMap(Object mutex) {
     if (mutex == null) {
@@ -92,11 +104,14 @@ class ReplicaMap {
   ReplicaInfo get(String bpid, long blockId) {
     checkBlockPool(bpid);
     synchronized(mutex) {
-      LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
-      return m != null ? m.get(new Block(blockId)) : null;
+      FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
+      if (set == null) {
+        return null;
+      }
+      return set.get(blockId, LONG_AND_BLOCK_COMPARATOR);
     }
   }
-  
+
   /**
    * Add a replica's meta information into the map 
    * 
@@ -109,13 +124,13 @@ class ReplicaMap {
     checkBlockPool(bpid);
     checkBlock(replicaInfo);
     synchronized(mutex) {
-      LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
-      if (m == null) {
+      FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
+      if (set == null) {
         // Add an entry for block pool if it does not exist already
-        m = new LightWeightResizableGSet<Block, ReplicaInfo>();
-        map.put(bpid, m);
+        set = new FoldedTreeSet<>();
+        map.put(bpid, set);
       }
-      return  m.put(replicaInfo);
+      return set.addOrReplace(replicaInfo);
     }
   }
 
@@ -138,12 +153,13 @@ class ReplicaMap {
     checkBlockPool(bpid);
     checkBlock(block);
     synchronized(mutex) {
-      LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
-      if (m != null) {
-        ReplicaInfo replicaInfo = m.get(block);
+      FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
+      if (set != null) {
+        ReplicaInfo replicaInfo =
+            set.get(block.getBlockId(), LONG_AND_BLOCK_COMPARATOR);
         if (replicaInfo != null &&
             block.getGenerationStamp() == replicaInfo.getGenerationStamp()) {
-          return m.remove(block);
+          return set.removeAndGet(replicaInfo);
         }
       }
     }
@@ -160,9 +176,9 @@ class ReplicaMap {
   ReplicaInfo remove(String bpid, long blockId) {
     checkBlockPool(bpid);
     synchronized(mutex) {
-      LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
-      if (m != null) {
-        return m.remove(new Block(blockId));
+      FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
+      if (set != null) {
+        return set.removeAndGet(blockId, LONG_AND_BLOCK_COMPARATOR);
       }
     }
     return null;
@@ -174,10 +190,9 @@ class ReplicaMap {
    * @return the number of replicas in the map
    */
   int size(String bpid) {
-    LightWeightResizableGSet<Block, ReplicaInfo> m = null;
     synchronized(mutex) {
-      m = map.get(bpid);
-      return m != null ? m.size() : 0;
+      FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
+      return set != null ? set.size() : 0;
     }
   }
   
@@ -192,19 +207,17 @@ class ReplicaMap {
    * @return a collection of the replicas belonging to the block pool
    */
   Collection<ReplicaInfo> replicas(String bpid) {
-    LightWeightResizableGSet<Block, ReplicaInfo> m = null;
-    m = map.get(bpid);
-    return m != null ? m.values() : null;
+    return map.get(bpid);
   }
 
   void initBlockPool(String bpid) {
     checkBlockPool(bpid);
     synchronized(mutex) {
-      LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
-      if (m == null) {
+      FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
+      if (set == null) {
         // Add an entry for block pool if it does not exist already
-        m = new LightWeightResizableGSet<Block, ReplicaInfo>();
-        map.put(bpid, m);
+        set = new FoldedTreeSet<>();
+        map.put(bpid, set);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java
index 5bcd719..94749e2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java
@@ -52,12 +52,16 @@ public class BlockReportContext {
    */
   private final long leaseId;
 
+  private final boolean sorted;
+
   public BlockReportContext(int totalRpcs, int curRpc,
-                            long reportId, long leaseId) {
+                            long reportId, long leaseId,
+                            boolean sorted) {
     this.totalRpcs = totalRpcs;
     this.curRpc = curRpc;
     this.reportId = reportId;
     this.leaseId = leaseId;
+    this.sorted = sorted;
   }
 
   public int getTotalRpcs() {
@@ -75,4 +79,8 @@ public class BlockReportContext {
   public long getLeaseId() {
     return leaseId;
   }
+
+  public boolean isSorted() {
+    return sorted;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
index add4e73..b962855 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
@@ -131,7 +131,6 @@ public interface DatanodeProtocol {
    *     Each finalized block is represented as 3 longs. Each under-
    *     construction replica is represented as 4 longs.
    *     This is done instead of Block[] to reduce memory used by block reports.
-   * @param reports report of blocks per storage
    * @param context Context information for this block report.
    *
    * @return - the next command for DN to process.