You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by fe...@apache.org on 2021/06/17 03:08:46 UTC

[hadoop] branch trunk updated: HDFS-13671. Namenode deletes large dir slowly caused by FoldedTreeSet#removeAndGet (#3065)

This is an automated email from the ASF dual-hosted git repository.

ferhui pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 56d2497  HDFS-13671. Namenode deletes large dir slowly caused by FoldedTreeSet#removeAndGet (#3065)
56d2497 is described below

commit 56d249759fcee4d9c44b3e8f37ef316abab7e91f
Author: hust_hhb <28...@qq.com>
AuthorDate: Thu Jun 17 11:08:29 2021 +0800

    HDFS-13671. Namenode deletes large dir slowly caused by FoldedTreeSet#removeAndGet (#3065)
---
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |   12 -
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java    |    5 +-
 .../hdfs/server/blockmanagement/BlockInfo.java     |  189 ++-
 .../blockmanagement/BlockInfoContiguous.java       |   29 +-
 .../server/blockmanagement/BlockInfoStriped.java   |   33 +-
 .../hdfs/server/blockmanagement/BlockManager.java  |  453 +++----
 .../hdfs/server/blockmanagement/BlocksMap.java     |   66 +-
 .../blockmanagement/DatanodeStorageInfo.java       |  127 +-
 .../hdfs/server/datanode/BPServiceActor.java       |    4 +-
 .../hdfs/server/datanode/DirectoryScanner.java     |    4 +-
 .../server/datanode/fsdataset/FsDatasetSpi.java    |    7 +-
 .../datanode/fsdataset/impl/FsDatasetImpl.java     |    7 +-
 .../server/datanode/fsdataset/impl/ReplicaMap.java |   81 +-
 .../hadoop/hdfs/server/namenode/FSNamesystem.java  |    7 +-
 .../hdfs/server/protocol/BlockReportContext.java   |   10 +-
 .../hdfs/server/protocol/DatanodeProtocol.java     |    1 +
 .../org/apache/hadoop/hdfs/util/FoldedTreeSet.java | 1285 --------------------
 .../src/main/proto/DatanodeProtocol.proto          |    3 +-
 .../src/main/resources/hdfs-default.xml            |   26 -
 .../org/apache/hadoop/hdfs/TestCrcCorruption.java  |    2 +-
 .../hadoop/hdfs/TestReconstructStripedFile.java    |    2 +-
 .../hadoop/hdfs/protocol/TestBlockListAsLongs.java |    4 +-
 .../hdfs/server/blockmanagement/TestBlockInfo.java |   88 ++
 .../server/blockmanagement/TestBlockManager.java   |   70 +-
 .../blockmanagement/TestBlockReportLease.java      |    5 +-
 ...tReconstructStripedBlocksWithRackAwareness.java |    9 +-
 .../hdfs/server/datanode/SimulatedFSDataset.java   |    2 +-
 .../TestBlockHasMultipleReplicasOnSameDN.java      |    7 +-
 .../hdfs/server/datanode/TestLargeBlockReport.java |    5 +-
 .../TestNNHandlesBlockReportPerStorage.java        |    2 +-
 .../datanode/TestNNHandlesCombinedBlockReport.java |    2 +-
 .../datanode/extdataset/ExternalDatasetImpl.java   |    2 +-
 .../datanode/fsdataset/impl/TestFsDatasetImpl.java |   38 +-
 .../server/namenode/NNThroughputBenchmark.java     |    4 +-
 .../hdfs/server/namenode/TestAddStripedBlocks.java |    4 +-
 .../hdfs/server/namenode/TestDeadDatanode.java     |    2 +-
 .../apache/hadoop/hdfs/util/FoldedTreeSetTest.java |  644 ----------
 37 files changed, 621 insertions(+), 2620 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 77355be..bc371ea 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -291,18 +291,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT = 2;
   public static final String  DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY = "dfs.namenode.replication.max-streams-hard-limit";
   public static final int     DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT = 4;
-  public static final String DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_INTERVAL_MS_KEY
-      = "dfs.namenode.storageinfo.defragment.interval.ms";
-  public static final int
-      DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_INTERVAL_MS_DEFAULT = 10 * 60 * 1000;
-  public static final String DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_TIMEOUT_MS_KEY
-      = "dfs.namenode.storageinfo.defragment.timeout.ms";
-  public static final int
-      DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_TIMEOUT_MS_DEFAULT = 4;
-  public static final String DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_KEY
-      = "dfs.namenode.storageinfo.defragment.ratio";
-  public static final double
-      DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_DEFAULT = 0.75;
   public static final String DFS_NAMENODE_BLOCKREPORT_QUEUE_SIZE_KEY
       = "dfs.namenode.blockreport.queue.size";
   public static final int    DFS_NAMENODE_BLOCKREPORT_QUEUE_SIZE_DEFAULT
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index dff5fa5..690ad0c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -967,8 +967,8 @@ public class PBHelper {
 
 
   public static BlockReportContext convert(BlockReportContextProto proto) {
-    return new BlockReportContext(proto.getTotalRpcs(), proto.getCurRpc(),
-        proto.getId(), proto.getLeaseId(), proto.getSorted());
+    return new BlockReportContext(proto.getTotalRpcs(),
+        proto.getCurRpc(), proto.getId(), proto.getLeaseId());
   }
 
   public static BlockReportContextProto convert(BlockReportContext context) {
@@ -977,7 +977,6 @@ public class PBHelper {
         setCurRpc(context.getCurRpc()).
         setId(context.getReportId()).
         setLeaseId(context.getLeaseId()).
-        setSorted(context.isSorted()).
         build();
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
index c6a7bb5..b8047a8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
@@ -19,8 +19,8 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
-import java.util.NoSuchElementException;
 
 import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -57,9 +57,19 @@ public abstract class BlockInfo extends Block
   /** For implementing {@link LightWeightGSet.LinkedElement} interface. */
   private LightWeightGSet.LinkedElement nextLinkedElement;
 
-
-  // Storages this block is replicated on
-  protected DatanodeStorageInfo[] storages;
+  /**
+   * This array contains triplets of references. For the i-th storage that
+   * the block belongs to, triplets[3*i] is the reference to the
+   * {@link DatanodeStorageInfo} and triplets[3*i+1] and triplets[3*i+2] are
+   * references to the previous and the next blocks, respectively, in the list
+   * of blocks belonging to this storage.
+   *
+   * Using previous and next in Object triplets is done instead of a
+   * {@link LinkedList} list to efficiently use memory. With LinkedList the cost
+   * per replica is 42 bytes (LinkedList#Entry object per replica) versus 16
+   * bytes using the triplets.
+   */
+  protected Object[] triplets;
 
   private BlockUnderConstructionFeature uc;
 
@@ -69,14 +79,14 @@ public abstract class BlockInfo extends Block
    *             in the block group
    */
   public BlockInfo(short size) {
-    this.storages = new DatanodeStorageInfo[size];
+    this.triplets = new Object[3 * size];
     this.bcId = INVALID_INODE_ID;
     this.replication = isStriped() ? 0 : size;
   }
 
   public BlockInfo(Block blk, short size) {
     super(blk);
-    this.storages = new DatanodeStorageInfo[size];
+    this.triplets = new Object[3*size];
     this.bcId = INVALID_INODE_ID;
     this.replication = isStriped() ? 0 : size;
   }
@@ -106,31 +116,7 @@ public abstract class BlockInfo extends Block
   }
 
   public Iterator<DatanodeStorageInfo> getStorageInfos() {
-    return new Iterator<DatanodeStorageInfo>() {
-
-      private int index = 0;
-
-      @Override
-      public boolean hasNext() {
-        while (index < storages.length && storages[index] == null) {
-          index++;
-        }
-        return index < storages.length;
-      }
-
-      @Override
-      public DatanodeStorageInfo next() {
-        if (!hasNext()) {
-          throw new NoSuchElementException();
-        }
-        return storages[index++];
-      }
-
-      @Override
-      public void remove() {
-        throw new UnsupportedOperationException("Sorry. can't remove.");
-      }
-    };
+    return new BlocksMap.StorageIterator(this);
   }
 
   public DatanodeDescriptor getDatanode(int index) {
@@ -139,18 +125,73 @@ public abstract class BlockInfo extends Block
   }
 
   DatanodeStorageInfo getStorageInfo(int index) {
-    assert this.storages != null : "BlockInfo is not initialized";
-    return storages[index];
+    assert this.triplets != null : "BlockInfo is not initialized";
+    assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
+    return (DatanodeStorageInfo)triplets[index*3];
+  }
+
+  BlockInfo getPrevious(int index) {
+    assert this.triplets != null : "BlockInfo is not initialized";
+    assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound";
+    BlockInfo info = (BlockInfo)triplets[index*3+1];
+    assert info == null ||
+        info.getClass().getName().startsWith(BlockInfo.class.getName()) :
+        "BlockInfo is expected at " + index*3;
+    return info;
+  }
+
+  BlockInfo getNext(int index) {
+    assert this.triplets != null : "BlockInfo is not initialized";
+    assert index >= 0 && index*3+2 < triplets.length : "Index is out of bound";
+    BlockInfo info = (BlockInfo)triplets[index*3+2];
+    assert info == null || info.getClass().getName().startsWith(
+        BlockInfo.class.getName()) :
+        "BlockInfo is expected at " + index*3;
+    return info;
   }
 
   void setStorageInfo(int index, DatanodeStorageInfo storage) {
-    assert this.storages != null : "BlockInfo is not initialized";
-    this.storages[index] = storage;
+    assert this.triplets != null : "BlockInfo is not initialized";
+    assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
+    triplets[index*3] = storage;
+  }
+
+  /**
+   * Return the previous block on the block list for the datanode at
+   * position index. Set the previous block on the list to "to".
+   *
+   * @param index - the datanode index
+   * @param to - block to be set to previous on the list of blocks
+   * @return current previous block on the list of blocks
+   */
+  BlockInfo setPrevious(int index, BlockInfo to) {
+    assert this.triplets != null : "BlockInfo is not initialized";
+    assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound";
+    BlockInfo info = (BlockInfo) triplets[index*3+1];
+    triplets[index*3+1] = to;
+    return info;
+  }
+
+  /**
+   * Return the next block on the block list for the datanode at
+   * position index. Set the next block on the list to "to".
+   *
+   * @param index - the datanode index
+   * @param to - block to be set to next on the list of blocks
+   * @return current next block on the list of blocks
+   */
+  BlockInfo setNext(int index, BlockInfo to) {
+    assert this.triplets != null : "BlockInfo is not initialized";
+    assert index >= 0 && index*3+2 < triplets.length : "Index is out of bound";
+    BlockInfo info = (BlockInfo) triplets[index*3+2];
+    triplets[index*3+2] = to;
+    return info;
   }
 
   public int getCapacity() {
-    assert this.storages != null : "BlockInfo is not initialized";
-    return storages.length;
+    assert this.triplets != null : "BlockInfo is not initialized";
+    assert triplets.length % 3 == 0 : "Malformed BlockInfo";
+    return triplets.length / 3;
   }
 
   /**
@@ -227,6 +268,80 @@ public abstract class BlockInfo extends Block
     return -1;
   }
 
+  /**
+   * Insert this block into the head of the list of blocks
+   * related to the specified DatanodeStorageInfo.
+   * If the head is null then form a new list.
+   * @return current block as the new head of the list.
+   */
+  BlockInfo listInsert(BlockInfo head, DatanodeStorageInfo storage) {
+    int dnIndex = this.findStorageInfo(storage);
+    assert dnIndex >= 0 : "Data node is not found: current";
+    assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
+        "Block is already in the list and cannot be inserted.";
+    this.setPrevious(dnIndex, null);
+    this.setNext(dnIndex, head);
+    if (head != null) {
+      head.setPrevious(head.findStorageInfo(storage), this);
+    }
+    return this;
+  }
+
+  /**
+   * Remove this block from the list of blocks
+   * related to the specified DatanodeStorageInfo.
+   * If this block is the head of the list then return the next block as
+   * the new head.
+   * @return the new head of the list or null if the list becomes
+   * empty after deletion.
+   */
+  BlockInfo listRemove(BlockInfo head, DatanodeStorageInfo storage) {
+    if (head == null) {
+      return null;
+    }
+    int dnIndex = this.findStorageInfo(storage);
+    if (dnIndex < 0) { // this block is not on the data-node list
+      return head;
+    }
+
+    BlockInfo next = this.getNext(dnIndex);
+    BlockInfo prev = this.getPrevious(dnIndex);
+    this.setNext(dnIndex, null);
+    this.setPrevious(dnIndex, null);
+    if (prev != null) {
+      prev.setNext(prev.findStorageInfo(storage), next);
+    }
+    if (next != null) {
+      next.setPrevious(next.findStorageInfo(storage), prev);
+    }
+    if (this == head) { // removing the head
+      head = next;
+    }
+    return head;
+  }
+
+  /**
+   * Remove this block from the list of blocks related to the specified
+   * DatanodeStorageInfo. Insert it into the head of the list of blocks.
+   *
+   * @return the new head of the list.
+   */
+  public BlockInfo moveBlockToHead(BlockInfo head, DatanodeStorageInfo storage,
+      int curIndex, int headIndex) {
+    if (head == this) {
+      return this;
+    }
+    BlockInfo next = this.setNext(curIndex, head);
+    BlockInfo prev = this.setPrevious(curIndex, null);
+
+    head.setPrevious(headIndex, this);
+    prev.setNext(prev.findStorageInfo(storage), next);
+    if (next != null) {
+      next.setPrevious(next.findStorageInfo(storage), prev);
+    }
+    return this;
+  }
+
   @Override
   public int hashCode() {
     // Super implementation is sufficient
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
index f830678..d68b72d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
@@ -38,20 +38,20 @@ public class BlockInfoContiguous extends BlockInfo {
   }
 
   /**
-   * Ensure that there is enough  space to include num more storages.
-   * @return first free storage index.
+   * Ensure that there is enough space to include num more triplets.
+   * @return first free triplet index.
    */
   private int ensureCapacity(int num) {
-    assert this.storages != null : "BlockInfo is not initialized";
+    assert this.triplets != null : "BlockInfo is not initialized";
     int last = numNodes();
-    if (storages.length >= (last+num)) {
+    if (triplets.length >= (last+num)*3) {
       return last;
     }
     /* Not enough space left. Create a new array. Should normally
      * happen only when replication is manually increased by the user. */
-    DatanodeStorageInfo[] old = storages;
-    storages = new DatanodeStorageInfo[(last+num)];
-    System.arraycopy(old, 0, storages, 0, last);
+    Object[] old = triplets;
+    triplets = new Object[(last+num)*3];
+    System.arraycopy(old, 0, triplets, 0, last * 3);
     return last;
   }
 
@@ -63,6 +63,8 @@ public class BlockInfoContiguous extends BlockInfo {
     // find the last null node
     int lastNode = ensureCapacity(1);
     setStorageInfo(lastNode, storage);
+    setNext(lastNode, null);
+    setPrevious(lastNode, null);
     return true;
   }
 
@@ -72,12 +74,18 @@ public class BlockInfoContiguous extends BlockInfo {
     if (dnIndex < 0) { // the node is not found
       return false;
     }
+    assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
+        "Block is still in the list and must be removed first.";
     // find the last not null node
     int lastNode = numNodes()-1;
-    // replace current node entry by the lastNode one
+    // replace current node triplet by the lastNode one
     setStorageInfo(dnIndex, getStorageInfo(lastNode));
-    // set the last entry to null
+    setNext(dnIndex, getNext(lastNode));
+    setPrevious(dnIndex, getPrevious(lastNode));
+    // set the last triplet to null
     setStorageInfo(lastNode, null);
+    setNext(lastNode, null);
+    setPrevious(lastNode, null);
     return true;
   }
 
@@ -96,7 +104,8 @@ public class BlockInfoContiguous extends BlockInfo {
 
   @Override
   public int numNodes() {
-    assert this.storages != null : "BlockInfo is not initialized";
+    assert this.triplets != null : "BlockInfo is not initialized";
+    assert triplets.length % 3 == 0 : "Malformed BlockInfo";
 
     for (int idx = getCapacity()-1; idx >= 0; idx--) {
       if (getDatanode(idx) != null) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
index 5a13341..42d0471 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
@@ -32,20 +32,21 @@ import java.util.NoSuchElementException;
 /**
  * Subclass of {@link BlockInfo}, presenting a block group in erasure coding.
  *
- * We still use a storage array to store DatanodeStorageInfo for each block in
- * the block group. For a (m+k) block group, the first (m+k) storage units
+ * We still use triplets to store DatanodeStorageInfo for each block in the
+ * block group, as well as the previous/next block in the corresponding
+ * DatanodeStorageInfo. For a (m+k) block group, the first (m+k) triplet units
  * are sorted and strictly mapped to the corresponding block.
  *
  * Normally each block belonging to group is stored in only one DataNode.
- * However, it is possible that some block is over-replicated. Thus the storage
+ * However, it is possible that some block is over-replicated. Thus the triplet
  * array's size can be larger than (m+k). Thus currently we use an extra byte
- * array to record the block index for each entry.
+ * array to record the block index for each triplet.
  */
 @InterfaceAudience.Private
 public class BlockInfoStriped extends BlockInfo {
   private final ErasureCodingPolicy ecPolicy;
   /**
-   * Always the same size with storage. Record the block index for each entry
+   * Always has one entry per triplet. Record the block index for each triplet
    * TODO: actually this is only necessary for over-replicated block. Thus can
    * be further optimized to save memory usage.
    */
@@ -109,7 +110,7 @@ public class BlockInfoStriped extends BlockInfo {
         return i;
       }
     }
-    // need to expand the storage size
+    // need to expand the triplet size
     ensureCapacity(i + 1, true);
     return i;
   }
@@ -141,6 +142,8 @@ public class BlockInfoStriped extends BlockInfo {
   private void addStorage(DatanodeStorageInfo storage, int index,
       int blockIndex) {
     setStorageInfo(index, storage);
+    setNext(index, null);
+    setPrevious(index, null);
     indices[index] = (byte) blockIndex;
   }
 
@@ -183,22 +186,26 @@ public class BlockInfoStriped extends BlockInfo {
     if (dnIndex < 0) { // the node is not found
       return false;
     }
-    // set the entry to null
+    assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
+        "Block is still in the list and must be removed first.";
+    // set the triplet to null
     setStorageInfo(dnIndex, null);
+    setNext(dnIndex, null);
+    setPrevious(dnIndex, null);
     indices[dnIndex] = -1;
     return true;
   }
 
   private void ensureCapacity(int totalSize, boolean keepOld) {
     if (getCapacity() < totalSize) {
-      DatanodeStorageInfo[] old = storages;
+      Object[] old = triplets;
       byte[] oldIndices = indices;
-      storages = new DatanodeStorageInfo[totalSize];
+      triplets = new Object[totalSize * 3];
       indices = new byte[totalSize];
       initIndices();
 
       if (keepOld) {
-        System.arraycopy(old, 0, storages, 0, old.length);
+        System.arraycopy(old, 0, triplets, 0, old.length);
         System.arraycopy(oldIndices, 0, indices, 0, oldIndices.length);
       }
     }
@@ -225,7 +232,8 @@ public class BlockInfoStriped extends BlockInfo {
 
   @Override
   public int numNodes() {
-    assert this.storages != null : "BlockInfo is not initialized";
+    assert this.triplets != null : "BlockInfo is not initialized";
+    assert triplets.length % 3 == 0 : "Malformed BlockInfo";
     int num = 0;
     for (int idx = getCapacity()-1; idx >= 0; idx--) {
       if (getStorageInfo(idx) != null) {
@@ -304,7 +312,8 @@ public class BlockInfoStriped extends BlockInfo {
               throw new NoSuchElementException();
             }
             int i = index++;
-            return new StorageAndBlockIndex(storages[i], indices[i]);
+            return new StorageAndBlockIndex(
+                (DatanodeStorageInfo) triplets[i * 3], indices[i]);
           }
 
           @Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index cc7b93f..ead915f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -35,6 +35,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
@@ -68,7 +69,6 @@ import org.apache.hadoop.hdfs.protocol.BlockType;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@@ -110,7 +110,6 @@ import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
-import org.apache.hadoop.hdfs.util.FoldedTreeSet;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.server.namenode.CacheManager;
 
@@ -125,7 +124,6 @@ import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.LightWeightGSet;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
-import org.apache.hadoop.util.VersionInfo;
 
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
@@ -312,11 +310,6 @@ public class BlockManager implements BlockStatsMXBean {
   private int replQueueResetToHeadThreshold;
   private int replQueueCallsSinceReset = 0;
 
-  /** How often to check and the limit for the storageinfo efficiency. */
-  private final long storageInfoDefragmentInterval;
-  private final long storageInfoDefragmentTimeout;
-  private final double storageInfoDefragmentRatio;
-
   /**
    * Mapping: Block {@literal ->} { BlockCollection, datanodes, self ref }
    * Updated only in response to client-sent information.
@@ -331,10 +324,6 @@ public class BlockManager implements BlockStatsMXBean {
    * {@link #redundancyThread} has run at least one full iteration.
    */
   private final AtomicLong lastRedundancyCycleTS = new AtomicLong(-1);
-  /** StorageInfoDefragmenter thread. */
-  private final Daemon storageInfoDefragmenterThread =
-      new Daemon(new StorageInfoDefragmenter());
-  
   /** Block report thread for handling async reports. */
   private final BlockReportProcessingThread blockReportThread;
 
@@ -548,19 +537,6 @@ public class BlockManager implements BlockStatsMXBean {
         DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT,
         TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
 
-    this.storageInfoDefragmentInterval =
-      conf.getLong(
-          DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_INTERVAL_MS_KEY,
-          DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_INTERVAL_MS_DEFAULT);
-    this.storageInfoDefragmentTimeout =
-      conf.getLong(
-          DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_TIMEOUT_MS_KEY,
-          DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_TIMEOUT_MS_DEFAULT);
-    this.storageInfoDefragmentRatio =
-      conf.getDouble(
-          DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_KEY,
-          DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_DEFAULT);
-
     this.encryptDataTransfer =
         conf.getBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY,
             DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
@@ -749,8 +725,6 @@ public class BlockManager implements BlockStatsMXBean {
     datanodeManager.activate(conf);
     this.redundancyThread.setName("RedundancyMonitor");
     this.redundancyThread.start();
-    storageInfoDefragmenterThread.setName("StorageInfoMonitor");
-    storageInfoDefragmenterThread.start();
     this.blockReportThread.start();
     mxBeanName = MBeans.register("NameNode", "BlockStats", this);
     bmSafeMode.activate(blockTotal);
@@ -763,10 +737,8 @@ public class BlockManager implements BlockStatsMXBean {
     bmSafeMode.close();
     try {
       redundancyThread.interrupt();
-      storageInfoDefragmenterThread.interrupt();
       blockReportThread.interrupt();
       redundancyThread.join(3000);
-      storageInfoDefragmenterThread.join(3000);
       blockReportThread.join(3000);
     } catch (InterruptedException ie) {
     }
@@ -1715,18 +1687,9 @@ public class BlockManager implements BlockStatsMXBean {
   /** Remove the blocks associated to the given datanode. */
   void removeBlocksAssociatedTo(final DatanodeDescriptor node) {
     providedStorageMap.removeDatanode(node);
-    for (DatanodeStorageInfo storage : node.getStorageInfos()) {
-      final Iterator<BlockInfo> it = storage.getBlockIterator();
-      //add the BlockInfos to a new collection as the
-      //returned iterator is not modifiable.
-      Collection<BlockInfo> toRemove = new ArrayList<>();
-      while (it.hasNext()) {
-        toRemove.add(it.next());
-      }
-
-      for (BlockInfo b : toRemove) {
-        removeStoredBlock(b, node);
-      }
+    final Iterator<BlockInfo> it = node.getBlockIterator();
+    while(it.hasNext()) {
+      removeStoredBlock(it.next(), node);
     }
     // Remove all pending DN messages referencing this DN.
     pendingDNMessages.removeAllMessagesForDatanode(node);
@@ -1740,11 +1703,8 @@ public class BlockManager implements BlockStatsMXBean {
     assert namesystem.hasWriteLock();
     final Iterator<BlockInfo> it = storageInfo.getBlockIterator();
     DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
-    Collection<BlockInfo> toRemove = new ArrayList<>();
-    while (it.hasNext()) {
-      toRemove.add(it.next());
-    }
-    for (BlockInfo block : toRemove) {
+    while(it.hasNext()) {
+      BlockInfo block = it.next();
       removeStoredBlock(block, node);
       final Block b = getBlockOnStorage(block, storageInfo);
       if (b != null) {
@@ -1908,7 +1868,7 @@ public class BlockManager implements BlockStatsMXBean {
         // stale storage due to failover or any other reason.
         corruptReplicas.removeFromCorruptReplicasMap(b.getStored(), node);
         BlockInfoStriped blk = (BlockInfoStriped) getStoredBlock(b.getStored());
-        blk.removeStorage(storageInfo);
+        storageInfo.removeBlock(blk);
       }
       // the block is over-replicated so invalidate the replicas immediately
       invalidateBlock(b, node, numberOfReplicas);
@@ -2824,7 +2784,7 @@ public class BlockManager implements BlockStatsMXBean {
         // Block reports for provided storage are not
         // maintained by DN heartbeats
         if (!StorageType.PROVIDED.equals(storageInfo.getStorageType())) {
-          invalidatedBlocks = processReport(storageInfo, newReport, context);
+          invalidatedBlocks = processReport(storageInfo, newReport);
         }
       }
       storageInfo.receivedBlockReport();
@@ -2921,8 +2881,7 @@ public class BlockManager implements BlockStatsMXBean {
   
   Collection<Block> processReport(
       final DatanodeStorageInfo storageInfo,
-      final BlockListAsLongs report,
-      BlockReportContext context) throws IOException {
+      final BlockListAsLongs report) throws IOException {
     // Normal case:
     // Modify the (block-->datanode) map, according to the difference
     // between the old and new block report.
@@ -2932,36 +2891,8 @@ public class BlockManager implements BlockStatsMXBean {
     Collection<Block> toInvalidate = new ArrayList<>();
     Collection<BlockToMarkCorrupt> toCorrupt = new ArrayList<>();
     Collection<StatefulBlockInfo> toUC = new ArrayList<>();
-
-    boolean sorted = false;
-    String strBlockReportId = "";
-    if (context != null) {
-      sorted = context.isSorted();
-      strBlockReportId = Long.toHexString(context.getReportId());
-    }
-
-    Iterable<BlockReportReplica> sortedReport;
-    if (!sorted) {
-      blockLog.warn("BLOCK* processReport 0x{}: Report from the DataNode ({}) "
-                    + "is unsorted. This will cause overhead on the NameNode "
-                    + "which needs to sort the Full BR. Please update the "
-                    + "DataNode to the same version of Hadoop HDFS as the "
-                    + "NameNode ({}).",
-                    strBlockReportId,
-                    storageInfo.getDatanodeDescriptor().getDatanodeUuid(),
-                    VersionInfo.getVersion());
-      Set<BlockReportReplica> set = new FoldedTreeSet<>();
-      for (BlockReportReplica iblk : report) {
-        set.add(new BlockReportReplica(iblk));
-      }
-      sortedReport = set;
-    } else {
-      sortedReport = report;
-    }
-
-    reportDiffSorted(storageInfo, sortedReport,
-                     toAdd, toRemove, toInvalidate, toCorrupt, toUC);
-
+    reportDiff(storageInfo, report,
+                 toAdd, toRemove, toInvalidate, toCorrupt, toUC);
 
     DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
     // Process the blocks on each queue
@@ -2978,8 +2909,8 @@ public class BlockManager implements BlockStatsMXBean {
       numBlocksLogged++;
     }
     if (numBlocksLogged > maxNumBlocksToLog) {
-      blockLog.info("BLOCK* processReport 0x{}: logged info for {} of {} " +
-          "reported.", strBlockReportId, maxNumBlocksToLog, numBlocksLogged);
+      blockLog.info("BLOCK* processReport: logged info for {} of {} " +
+          "reported.", maxNumBlocksToLog, numBlocksLogged);
     }
     for (Block b : toInvalidate) {
       addToInvalidates(b, node);
@@ -3111,106 +3042,127 @@ public class BlockManager implements BlockStatsMXBean {
     }
   }
 
-  private void reportDiffSorted(DatanodeStorageInfo storageInfo,
-      Iterable<BlockReportReplica> newReport,
+  private void reportDiff(DatanodeStorageInfo storageInfo,
+      BlockListAsLongs newReport,
       Collection<BlockInfoToAdd> toAdd,     // add to DatanodeDescriptor
       Collection<BlockInfo> toRemove,       // remove from DatanodeDescriptor
       Collection<Block> toInvalidate,       // should be removed from DN
       Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
       Collection<StatefulBlockInfo> toUC) { // add to under-construction list
 
-    // The blocks must be sorted and the storagenodes blocks must be sorted
-    Iterator<BlockInfo> storageBlocksIterator = storageInfo.getBlockIterator();
+    // place a delimiter in the list which separates blocks
+    // that have been reported from those that have not
     DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor();
-    BlockInfo storageBlock = null;
-
-    for (BlockReportReplica replica : newReport) {
-
-      long replicaID = replica.getBlockId();
-      if (BlockIdManager.isStripedBlockID(replicaID)
-          && (!hasNonEcBlockUsingStripedID ||
-              !blocksMap.containsBlock(replica))) {
-        replicaID = BlockIdManager.convertToStripedID(replicaID);
-      }
-
-      ReplicaState reportedState = replica.getState();
-
-      LOG.debug("Reported block {} on {} size {} replicaState = {}",
-          replica, dn, replica.getNumBytes(), reportedState);
-
-      if (shouldPostponeBlocksFromFuture
-          && isGenStampInFuture(replica)) {
-        queueReportedBlock(storageInfo, replica, reportedState,
-                           QUEUE_REASON_FUTURE_GENSTAMP);
-        continue;
-      }
-
-      if (storageBlock == null && storageBlocksIterator.hasNext()) {
-        storageBlock = storageBlocksIterator.next();
-      }
-
-      do {
-        int cmp;
-        if (storageBlock == null ||
-            (cmp = Long.compare(replicaID, storageBlock.getBlockId())) < 0) {
-          // Check if block is available in NN but not yet on this storage
-          BlockInfo nnBlock = blocksMap.getStoredBlock(new Block(replicaID));
-          if (nnBlock != null) {
-            reportDiffSortedInner(storageInfo, replica, reportedState,
-                                  nnBlock, toAdd, toCorrupt, toUC);
-          } else {
-            // Replica not found anywhere so it should be invalidated
-            toInvalidate.add(new Block(replica));
-          }
-          break;
-        } else if (cmp == 0) {
-          // Replica matched current storageblock
-          reportDiffSortedInner(storageInfo, replica, reportedState,
-                                storageBlock, toAdd, toCorrupt, toUC);
-          storageBlock = null;
-        } else {
-          // replica has higher ID than storedBlock
-          // Remove all stored blocks with IDs lower than replica
-          do {
-            toRemove.add(storageBlock);
-            storageBlock = storageBlocksIterator.hasNext()
-                           ? storageBlocksIterator.next() : null;
-          } while (storageBlock != null &&
-                   Long.compare(replicaID, storageBlock.getBlockId()) > 0);
+    Block delimiterBlock = new Block();
+    BlockInfo delimiter = new BlockInfoContiguous(delimiterBlock,
+        (short) 1);
+    AddBlockResult result = storageInfo.addBlock(delimiter, delimiterBlock);
+    assert result == AddBlockResult.ADDED
+        : "Delimiting block cannot be present in the node";
+    int headIndex = 0; //currently the delimiter is in the head of the list
+    int curIndex;
+
+    if (newReport == null) {
+      newReport = BlockListAsLongs.EMPTY;
+    }
+    // scan the report and process newly reported blocks
+    for (BlockReportReplica iblk : newReport) {
+      ReplicaState iState = iblk.getState();
+      LOG.debug("Reported block {} on {} size {} replicaState = {}", iblk, dn,
+          iblk.getNumBytes(), iState);
+      BlockInfo storedBlock = processReportedBlock(storageInfo,
+          iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);
+
+      // move block to the head of the list
+      if (storedBlock != null) {
+        curIndex = storedBlock.findStorageInfo(storageInfo);
+        if (curIndex >= 0) {
+          headIndex =
+              storageInfo.moveBlockToHead(storedBlock, curIndex, headIndex);
         }
-      } while (storageBlock != null);
+      }
     }
 
-    // Iterate any remaining blocks that have not been reported and remove them
-    while (storageBlocksIterator.hasNext()) {
-      toRemove.add(storageBlocksIterator.next());
+    // collect blocks that have not been reported
+    // all of them are next to the delimiter
+    Iterator<BlockInfo> it =
+        storageInfo.new BlockIterator(delimiter.getNext(0));
+    while (it.hasNext()) {
+      toRemove.add(it.next());
     }
+    storageInfo.removeBlock(delimiter);
   }
 
-  private void reportDiffSortedInner(
+  /**
+   * Process a block replica reported by the data-node.
+   * Only adds to the passed-in Collections or queues the block for later.
+   *
+   * <ol>
+   * <li>If the block is not known to the system (not in blocksMap) then the
+   * data-node should be notified to invalidate this block.</li>
+   * <li>If the reported replica is valid, that is, it has the same generation
+   * stamp and length as recorded on the name-node, then the replica location
+   * should be added to the name-node.</li>
+   * <li>If the reported replica is not valid, then it is marked as corrupt,
+   * which triggers replication of the existing valid replicas.
+   * Corrupt replicas are removed from the system when the block
+   * is fully replicated.</li>
+   * <li>If the reported replica is for a block currently marked "under
+   * construction" in the NN, then it should be added to the
+   * BlockUnderConstructionFeature's list of replicas.</li>
+   * </ol>
+   *
+   * @param storageInfo DatanodeStorageInfo that sent the report.
+   * @param block reported block replica
+   * @param reportedState reported replica state
+   * @param toAdd add to DatanodeDescriptor
+   * @param toInvalidate missing blocks (not in the blocks map)
+   *        should be removed from the data-node
+   * @param toCorrupt replicas with unexpected length or generation stamp;
+   *        add to corrupt replicas
+   * @param toUC replicas of blocks currently under construction
+   * @return the up-to-date stored block, if it should be kept.
+   *         Otherwise, null.
+   */
+  private BlockInfo processReportedBlock(
       final DatanodeStorageInfo storageInfo,
-      final BlockReportReplica replica, final ReplicaState reportedState,
-      final BlockInfo storedBlock,
+      final Block block, final ReplicaState reportedState,
       final Collection<BlockInfoToAdd> toAdd,
+      final Collection<Block> toInvalidate,
       final Collection<BlockToMarkCorrupt> toCorrupt,
       final Collection<StatefulBlockInfo> toUC) {
 
-    assert replica != null;
-    assert storedBlock != null;
-
     DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor();
+
+    LOG.debug("Reported block {} on {} size {} replicaState = {}", block, dn,
+        block.getNumBytes(), reportedState);
+
+    if (shouldPostponeBlocksFromFuture && isGenStampInFuture(block)) {
+      queueReportedBlock(storageInfo, block, reportedState,
+          QUEUE_REASON_FUTURE_GENSTAMP);
+      return null;
+    }
+
+    // find block by blockId
+    BlockInfo storedBlock = getStoredBlock(block);
+    if(storedBlock == null) {
+      // If blocksMap does not contain reported block id,
+      // the replica should be removed from the data-node.
+      toInvalidate.add(new Block(block));
+      return null;
+    }
     BlockUCState ucState = storedBlock.getBlockUCState();
 
     // Block is on the NN
     LOG.debug("In memory blockUCState = {}", ucState);
 
     // Ignore replicas already scheduled to be removed from the DN
-    if (invalidateBlocks.contains(dn, replica)) {
-      return;
+    if(invalidateBlocks.contains(dn, block)) {
+      return storedBlock;
     }
 
-    BlockToMarkCorrupt c = checkReplicaCorrupt(replica, reportedState,
-                                               storedBlock, ucState, dn);
+    BlockToMarkCorrupt c = checkReplicaCorrupt(
+        block, reportedState, storedBlock, ucState, dn);
     if (c != null) {
       if (shouldPostponeBlocksFromFuture) {
         // If the block is an out-of-date generation stamp or state,
@@ -3220,21 +3172,28 @@ public class BlockManager implements BlockStatsMXBean {
         // comes from the IBR / FBR and hence what we should use to compare
         // against the memory state.
         // See HDFS-6289 and HDFS-15422 for more context.
-        queueReportedBlock(storageInfo, replica, reportedState,
+        queueReportedBlock(storageInfo, block, reportedState,
             QUEUE_REASON_CORRUPT_STATE);
       } else {
         toCorrupt.add(c);
       }
-    } else if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
-      toUC.add(new StatefulBlockInfo(storedBlock, new Block(replica),
-          reportedState));
-    } else if (reportedState == ReplicaState.FINALIZED &&
-               (storedBlock.findStorageInfo(storageInfo) == -1 ||
-                corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
-      // Add replica if appropriate. If the replica was previously corrupt
-      // but now okay, it might need to be updated.
-      toAdd.add(new BlockInfoToAdd(storedBlock, new Block(replica)));
+      return storedBlock;
     }
+
+    if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
+      toUC.add(new StatefulBlockInfo(storedBlock,
+          new Block(block), reportedState));
+      return storedBlock;
+    }
+
+    // Add replica if appropriate. If the replica was previously corrupt
+    // but now okay, it might need to be updated.
+    if (reportedState == ReplicaState.FINALIZED
+        && (storedBlock.findStorageInfo(storageInfo) == -1 ||
+        corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
+      toAdd.add(new BlockInfoToAdd(storedBlock, new Block(block)));
+    }
+    return storedBlock;
   }
 
   /**
@@ -3477,7 +3436,7 @@ public class BlockManager implements BlockStatsMXBean {
     }
 
     // just add it
-    AddBlockResult result = storageInfo.addBlockInitial(storedBlock, reported);
+    AddBlockResult result = storageInfo.addBlock(storedBlock, reported);
 
     // Now check for completion of blocks and safe block count
     int numCurrentReplica = countLiveNodes(storedBlock);
@@ -4242,6 +4201,12 @@ public class BlockManager implements BlockStatsMXBean {
       DatanodeStorageInfo storageInfo, Block block,
       ReplicaState reportedState, DatanodeDescriptor delHintNode)
       throws IOException {
+    // blockReceived reports a finalized block
+    Collection<BlockInfoToAdd> toAdd = new LinkedList<>();
+    Collection<Block> toInvalidate = new LinkedList<Block>();
+    Collection<BlockToMarkCorrupt> toCorrupt =
+        new LinkedList<BlockToMarkCorrupt>();
+    Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
 
     final DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
 
@@ -4255,58 +4220,33 @@ public class BlockManager implements BlockStatsMXBean {
       return false;
     }
 
-    // find block by blockId
-    BlockInfo storedBlock = getStoredBlock(block);
-    if(storedBlock == null) {
-      // If blocksMap does not contain reported block id,
-      // the replica should be removed from the data-node.
-      blockLog.debug("BLOCK* addBlock: block {} on node {} size {} does not " +
-          "belong to any file", block, node, block.getNumBytes());
-      addToInvalidates(new Block(block), node);
-      return true;
-    }
-
-    BlockUCState ucState = storedBlock.getBlockUCState();
-    // Block is on the NN
-    LOG.debug("In memory blockUCState = {}", ucState);
+    processReportedBlock(storageInfo, block, reportedState, toAdd, toInvalidate,
+        toCorrupt, toUC);
+    // the block is in at most one of the to-do lists;
+    // if it is in none then the data-node already has it
+    assert toUC.size() + toAdd.size() + toInvalidate.size() + toCorrupt
+        .size() <= 1 : "The block should be only in one of the lists.";
 
-    // Ignore replicas already scheduled to be removed from the DN
-    if(invalidateBlocks.contains(node, block)) {
-      return true;
+    for (StatefulBlockInfo b : toUC) {
+      addStoredBlockUnderConstruction(b, storageInfo);
     }
-
-    BlockToMarkCorrupt c = checkReplicaCorrupt(
-        block, reportedState, storedBlock, ucState, node);
-    if (c != null) {
-      if (shouldPostponeBlocksFromFuture) {
-        // If the block is an out-of-date generation stamp or state,
-        // but we're the standby, we shouldn't treat it as corrupt,
-        // but instead just queue it for later processing.
-        // Storing the reported block for later processing, as that is what
-        // comes from the IBR / FBR and hence what we should use to compare
-        // against the memory state.
-        // See HDFS-6289 and HDFS-15422 for more context.
-        queueReportedBlock(storageInfo, block, reportedState,
-            QUEUE_REASON_CORRUPT_STATE);
-      } else {
-        markBlockAsCorrupt(c, storageInfo, node);
-      }
-      return true;
+    long numBlocksLogged = 0;
+    for (BlockInfoToAdd b : toAdd) {
+      addStoredBlock(b.stored, b.reported, storageInfo, delHintNode,
+          numBlocksLogged < maxNumBlocksToLog);
+      numBlocksLogged++;
     }
-
-    if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
-      addStoredBlockUnderConstruction(
-          new StatefulBlockInfo(storedBlock, new Block(block), reportedState),
-          storageInfo);
-      return true;
+    if (numBlocksLogged > maxNumBlocksToLog) {
+      blockLog.debug("BLOCK* addBlock: logged info for {} of {} reported.",
+          maxNumBlocksToLog, numBlocksLogged);
     }
-
-    // Add replica if appropriate. If the replica was previously corrupt
-    // but now okay, it might need to be updated.
-    if (reportedState == ReplicaState.FINALIZED
-        && (storedBlock.findStorageInfo(storageInfo) == -1 ||
-            corruptReplicas.isReplicaCorrupt(storedBlock, node))) {
-      addStoredBlock(storedBlock, block, storageInfo, delHintNode, true);
+    for (Block b : toInvalidate) {
+      blockLog.debug("BLOCK* addBlock: block {} on node {} size {} does not " +
+          "belong to any file", b, node, b.getNumBytes());
+      addToInvalidates(b, node);
+    }
+    for (BlockToMarkCorrupt b : toCorrupt) {
+      markBlockAsCorrupt(b, storageInfo, node);
     }
     return true;
   }
@@ -5006,91 +4946,6 @@ public class BlockManager implements BlockStatsMXBean {
     }
   }
 
-  /**
-   * Runnable that monitors the fragmentation of the StorageInfo TreeSet and
-   * compacts it when it falls under a certain threshold.
-   */
-  private class StorageInfoDefragmenter implements Runnable {
-
-    @Override
-    public void run() {
-      while (namesystem.isRunning()) {
-        try {
-          // Check storage efficiency only when active NN is out of safe mode.
-          if (isPopulatingReplQueues()) {
-            scanAndCompactStorages();
-          }
-          Thread.sleep(storageInfoDefragmentInterval);
-        } catch (Throwable t) {
-          if (!namesystem.isRunning()) {
-            LOG.info("Stopping thread.");
-            if (!(t instanceof InterruptedException)) {
-              LOG.info("Received an exception while shutting down.", t);
-            }
-            break;
-          } else if (!checkNSRunning && t instanceof InterruptedException) {
-            LOG.info("Stopping for testing.");
-            break;
-          }
-          LOG.error("Thread received Runtime exception.", t);
-          terminate(1, t);
-        }
-      }
-    }
-
-    private void scanAndCompactStorages() throws InterruptedException {
-      ArrayList<String> datanodesAndStorages = new ArrayList<>();
-      for (DatanodeDescriptor node
-          : datanodeManager.getDatanodeListForReport(DatanodeReportType.ALL)) {
-        for (DatanodeStorageInfo storage : node.getStorageInfos()) {
-          try {
-            namesystem.readLock();
-            double ratio = storage.treeSetFillRatio();
-            if (ratio < storageInfoDefragmentRatio) {
-              datanodesAndStorages.add(node.getDatanodeUuid());
-              datanodesAndStorages.add(storage.getStorageID());
-            }
-            LOG.debug("StorageInfo TreeSet fill ratio {} : {}{}",
-                     storage.getStorageID(), ratio,
-                     (ratio < storageInfoDefragmentRatio)
-                     ? " (queued for defragmentation)" : "");
-          } finally {
-            namesystem.readUnlock();
-          }
-        }
-      }
-      if (!datanodesAndStorages.isEmpty()) {
-        for (int i = 0; i < datanodesAndStorages.size(); i += 2) {
-          namesystem.writeLock();
-          try {
-            final DatanodeDescriptor dn = datanodeManager.
-                getDatanode(datanodesAndStorages.get(i));
-            if (dn == null) {
-              continue;
-            }
-            final DatanodeStorageInfo storage = dn.
-                getStorageInfo(datanodesAndStorages.get(i + 1));
-            if (storage != null) {
-              boolean aborted =
-                  !storage.treeSetCompact(storageInfoDefragmentTimeout);
-              if (aborted) {
-                // Compaction timed out, reset iterator to continue with
-                // the same storage next iteration.
-                i -= 2;
-              }
-              LOG.info("StorageInfo TreeSet defragmented {} : {}{}",
-                       storage.getStorageID(), storage.treeSetFillRatio(),
-                       aborted ? " (aborted)" : "");
-            }
-          } finally {
-            namesystem.writeUnlock();
-          }
-          // Wait between each iteration
-          Thread.sleep(1000);
-        }
-      }
-    }
-  }
 
   /**
    * Compute block replication and block invalidation work that can be scheduled
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
index a96c815..9deeb41 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.concurrent.atomic.LongAdder;
 
@@ -32,6 +31,37 @@ import org.apache.hadoop.util.LightWeightGSet;
  * the datanodes that store the block.
  */
 class BlocksMap {
+  public static class StorageIterator implements Iterator<DatanodeStorageInfo> {
+    private final BlockInfo blockInfo;
+    private int nextIdx = 0;
+
+    StorageIterator(BlockInfo blkInfo) {
+      this.blockInfo = blkInfo;
+    }
+
+    @Override
+    public boolean hasNext() {
+      if (blockInfo == null) {
+        return false;
+      }
+      while (nextIdx < blockInfo.getCapacity() &&
+          blockInfo.getDatanode(nextIdx) == null) {
+        // note that for striped blocks there may be null in the triplets
+        nextIdx++;
+      }
+      return nextIdx < blockInfo.getCapacity();
+    }
+
+    @Override
+    public DatanodeStorageInfo next() {
+      return blockInfo.getStorageInfo(nextIdx++);
+    }
+
+    @Override
+    public void remove()  {
+      throw new UnsupportedOperationException("Sorry. can't remove.");
+    }
+  }
 
   /** Constant {@link LightWeightGSet} capacity. */
   private final int capacity;
@@ -111,16 +141,6 @@ class BlocksMap {
     }
   }
 
-  /**
-   * Check if BlocksMap contains the block.
-   *
-   * @param b Block to check
-   * @return true if block is in the map, otherwise false
-   */
-  boolean containsBlock(Block b) {
-    return blocks.contains(b);
-  }
-
   /** Returns the block object if it exists in the map. */
   BlockInfo getStoredBlock(Block b) {
     return blocks.get(b);
@@ -131,9 +151,7 @@ class BlocksMap {
    * returns {@link Iterable} of the storages the block belongs to.
    */
   Iterable<DatanodeStorageInfo> getStorages(Block b) {
-    BlockInfo block = blocks.get(b);
-    return block != null ? getStorages(block)
-           : Collections.<DatanodeStorageInfo>emptyList();
+    return getStorages(blocks.get(b));
   }
 
   /**
@@ -141,16 +159,12 @@ class BlocksMap {
    * returns {@link Iterable} of the storages the block belongs to.
    */
   Iterable<DatanodeStorageInfo> getStorages(final BlockInfo storedBlock) {
-    if (storedBlock == null) {
-      return Collections.emptyList();
-    } else {
-      return new Iterable<DatanodeStorageInfo>() {
-        @Override
-        public Iterator<DatanodeStorageInfo> iterator() {
-          return storedBlock.getStorageInfos();
-        }
-      };
-    }
+    return new Iterable<DatanodeStorageInfo>() {
+      @Override
+      public Iterator<DatanodeStorageInfo> iterator() {
+        return new StorageIterator(storedBlock);
+      }
+    };
   }
 
   /** counts number of containing nodes. Better than using iterator. */
@@ -169,7 +183,7 @@ class BlocksMap {
     if (info == null)
       return false;
 
-    // remove block from the data-node set and the node from the block info
+    // remove block from the data-node list and the node from the block info
     boolean removed = removeBlock(node, info);
 
     if (info.hasNoStorage()    // no datanodes left
@@ -181,7 +195,7 @@ class BlocksMap {
   }
 
   /**
-   * Remove block from the set of blocks belonging to the data-node. Remove
+   * Remove block from the list of blocks belonging to the data-node. Remove
    * data-node from the block.
    */
   static boolean removeBlock(DatanodeDescriptor dn, BlockInfo b) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index 8e46a26..188ea11 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
@@ -28,7 +27,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
-import org.apache.hadoop.hdfs.util.FoldedTreeSet;
 
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 
@@ -87,6 +85,32 @@ public class DatanodeStorageInfo {
     storageType = storage.getStorageType();
   }
 
+  /**
+   * Iterates over the list of blocks belonging to the data-node.
+   */
+  class BlockIterator implements Iterator<BlockInfo> {
+    private BlockInfo current;
+
+    BlockIterator(BlockInfo head) {
+      this.current = head;
+    }
+
+    public boolean hasNext() {
+      return current != null;
+    }
+
+    public BlockInfo next() {
+      BlockInfo res = current;
+      current =
+          current.getNext(current.findStorageInfo(DatanodeStorageInfo.this));
+      return res;
+    }
+
+    public void remove() {
+      throw new UnsupportedOperationException("Sorry. can't remove.");
+    }
+  }
+
   private final DatanodeDescriptor dn;
   private final String storageID;
   private StorageType storageType;
@@ -98,7 +122,8 @@ public class DatanodeStorageInfo {
   private volatile long remaining;
   private long blockPoolUsed;
 
-  private final FoldedTreeSet<BlockInfo> blocks = new FoldedTreeSet<>();
+  private volatile BlockInfo blockList = null;
+  private int numBlocks = 0;
 
   /** The number of block reports received */
   private int blockReportCount = 0;
@@ -182,7 +207,7 @@ public class DatanodeStorageInfo {
   }
 
   boolean areBlocksOnFailedStorage() {
-    return getState() == State.FAILED && !blocks.isEmpty();
+    return getState() == State.FAILED && numBlocks != 0;
   }
 
   @VisibleForTesting
@@ -213,36 +238,6 @@ public class DatanodeStorageInfo {
   long getBlockPoolUsed() {
     return blockPoolUsed;
   }
-  /**
-   * For use during startup. Expects block to be added in sorted order
-   * to enable fast insert in to the DatanodeStorageInfo
-   *
-   * @param b Block to add to DatanodeStorageInfo
-   * @param reportedBlock The reported replica
-   * @return Enum describing if block was added, replaced or already existed
-   */
-  public AddBlockResult addBlockInitial(BlockInfo b, Block reportedBlock) {
-    // First check whether the block belongs to a different storage
-    // on the same DN.
-    AddBlockResult result = AddBlockResult.ADDED;
-    DatanodeStorageInfo otherStorage =
-        b.findStorageInfo(getDatanodeDescriptor());
-
-    if (otherStorage != null) {
-      if (otherStorage != this) {
-        // The block belongs to a different storage. Remove it first.
-        otherStorage.removeBlock(b);
-        result = AddBlockResult.REPLACED;
-      } else {
-        // The block is already associated with this storage.
-        return AddBlockResult.ALREADY_EXIST;
-      }
-    }
-
-    b.addStorage(this, reportedBlock);
-    blocks.addSortedLast(b);
-    return result;
-  }
 
   public AddBlockResult addBlock(BlockInfo b, Block reportedBlock) {
     // First check whether the block belongs to a different storage
@@ -262,8 +257,9 @@ public class DatanodeStorageInfo {
       }
     }
 
+    // add to the head of the data-node list
     b.addStorage(this, reportedBlock);
-    blocks.add(b);
+    insertToList(b);
     return result;
   }
 
@@ -271,21 +267,45 @@ public class DatanodeStorageInfo {
     return addBlock(b, b);
   }
 
+  public void insertToList(BlockInfo b) {
+    blockList = b.listInsert(blockList, this);
+    numBlocks++;
+  }
   boolean removeBlock(BlockInfo b) {
-    blocks.remove(b);
-    return b.removeStorage(this);
+    blockList = b.listRemove(blockList, this);
+    if (b.removeStorage(this)) {
+      numBlocks--;
+      return true;
+    } else {
+      return false;
+    }
   }
 
   int numBlocks() {
-    return blocks.size();
+    return numBlocks;
   }
-  
+
+  Iterator<BlockInfo> getBlockIterator() {
+    return new BlockIterator(blockList);
+  }
+
   /**
-   * @return iterator to an unmodifiable set of blocks
-   * related to this {@link DatanodeStorageInfo}
+   * Move block to the head of the list of blocks belonging to the data-node.
+   * @return the index of the head of the blockList
    */
-  Iterator<BlockInfo> getBlockIterator() {
-    return Collections.unmodifiableSet(blocks).iterator();
+  int moveBlockToHead(BlockInfo b, int curIndex, int headIndex) {
+    blockList = b.moveBlockToHead(blockList, this, curIndex, headIndex);
+    return curIndex;
+  }
+
+
+  /**
+   * Used for testing only.
+   * @return the head of the blockList
+   */
+  @VisibleForTesting
+  BlockInfo getBlockListHeadForTesting(){
+    return blockList;
   }
 
   void updateState(StorageReport r) {
@@ -344,27 +364,6 @@ public class DatanodeStorageInfo {
         false, capacity, dfsUsed, remaining, blockPoolUsed, nonDfsUsed);
   }
 
-  /**
-   * The fill ratio of the underlying TreeSet holding blocks.
-   *
-   * @return the fill ratio of the tree
-   */
-  public double treeSetFillRatio() {
-    return blocks.fillRatio();
-  }
-
-  /**
-   * Compact the underlying TreeSet holding blocks.
-   *
-   * @param timeout Maximum time to spend compacting the tree set in
-   *                milliseconds.
-   *
-   * @return true if compaction completed, false if aborted
-   */
-  public boolean treeSetCompact(long timeout) {
-    return blocks.compact(timeout);
-  }
-
   static Iterable<StorageType> toStorageTypes(
       final Iterable<DatanodeStorageInfo> infos) {
     return new Iterable<StorageType>() {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 2d24c0f..5d3b1ba 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -413,7 +413,7 @@ class BPServiceActor implements Runnable {
         // Below split threshold, send all reports in a single message.
         DatanodeCommand cmd = bpNamenode.blockReport(
             bpRegistration, bpos.getBlockPoolId(), reports,
-              new BlockReportContext(1, 0, reportId, fullBrLeaseId, true));
+            new BlockReportContext(1, 0, reportId, fullBrLeaseId));
         blockReportSizes.add(
             calculateBlockReportPBSize(useBlocksBuffer, reports));
         numRPCs = 1;
@@ -428,7 +428,7 @@ class BPServiceActor implements Runnable {
           DatanodeCommand cmd = bpNamenode.blockReport(
               bpRegistration, bpos.getBlockPoolId(), singleReport,
               new BlockReportContext(reports.length, r, reportId,
-                  fullBrLeaseId, true));
+                  fullBrLeaseId));
           blockReportSizes.add(
               calculateBlockReportPBSize(useBlocksBuffer, singleReport));
           numReportsSent++;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index 7a8569d..eef1732 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -515,8 +515,8 @@ public class DirectoryScanner implements Runnable {
       Collection<ScanInfo> diffRecord = new ArrayList<>();
 
       statsRecord.totalBlocks = blockpoolReport.size();
-      final List<ReplicaInfo> bl;
-      bl = dataset.getSortedFinalizedBlocks(bpid);
+      final List<ReplicaInfo> bl = dataset.getFinalizedBlocks(bpid);
+      Collections.sort(bl); // Sort based on blockId
 
       int d = 0; // index for blockpoolReport
       int m = 0; // index for memReprot
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 7536847..f162ea1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -238,17 +238,16 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
   VolumeFailureSummary getVolumeFailureSummary();
 
   /**
-   * Gets a sorted list of references to the finalized blocks for the given
-   * block pool. The list is sorted by blockID.
+   * Gets a list of references to the finalized blocks for the given block pool.
    * <p>
    * Callers of this function should call
    * {@link FsDatasetSpi#acquireDatasetLock} to avoid blocks' status being
    * changed during list iteration.
    * </p>
    * @return a list of references to the finalized blocks for the given block
-   *         pool. The list is sorted by blockID.
+   *         pool.
    */
-  List<ReplicaInfo> getSortedFinalizedBlocks(String bpid);
+  List<ReplicaInfo> getFinalizedBlocks(String bpid);
 
   /**
    * Check whether the in-memory block record matches the block on the disk,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 3a4c3b0..fb5d11e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -2148,18 +2148,17 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   }
 
   /**
-   * Gets a list of references to the finalized blocks for the given block pool,
-   * sorted by blockID.
+   * Gets a list of references to the finalized blocks for the given block pool.
    * <p>
    * Callers of this function should call
    * {@link FsDatasetSpi#acquireDatasetLock()} to avoid blocks' status being
    * changed during list iteration.
    * </p>
    * @return a list of references to the finalized blocks for the given block
-   *         pool. The list is sorted by blockID.
+   *         pool.
    */
   @Override
-  public List<ReplicaInfo> getSortedFinalizedBlocks(String bpid) {
+  public List<ReplicaInfo> getFinalizedBlocks(String bpid) {
     try (AutoCloseableLock lock = datasetReadLock.acquire()) {
       final List<ReplicaInfo> finalized = new ArrayList<ReplicaInfo>(
           volumeMap.size(bpid));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
index 5dfcc77..c1d103e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -26,7 +25,7 @@ import java.util.concurrent.locks.ReadWriteLock;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
-import org.apache.hadoop.hdfs.util.FoldedTreeSet;
+import org.apache.hadoop.util.LightWeightResizableGSet;
 import org.apache.hadoop.util.AutoCloseableLock;
 
 /**
@@ -37,20 +36,9 @@ class ReplicaMap {
   private final AutoCloseableLock readLock;
   private final AutoCloseableLock writeLock;
   
-  // Map of block pool Id to a set of ReplicaInfo.
-  private final Map<String, FoldedTreeSet<ReplicaInfo>> map = new HashMap<>();
-
-  // Special comparator used to compare Long to Block ID in the TreeSet.
-  private static final Comparator<Object> LONG_AND_BLOCK_COMPARATOR
-      = new Comparator<Object>() {
-
-        @Override
-        public int compare(Object o1, Object o2) {
-          long lookup = (long) o1;
-          long stored = ((Block) o2).getBlockId();
-          return lookup > stored ? 1 : lookup < stored ? -1 : 0;
-        }
-      };
+  // Map of block pool Id to another map of block Id to ReplicaInfo.
+  private final Map<String, LightWeightResizableGSet<Block, ReplicaInfo>> map =
+      new HashMap<>();
 
   ReplicaMap(AutoCloseableLock readLock, AutoCloseableLock writeLock) {
     if (readLock == null || writeLock == null) {
@@ -113,11 +101,8 @@ class ReplicaMap {
   ReplicaInfo get(String bpid, long blockId) {
     checkBlockPool(bpid);
     try (AutoCloseableLock l = readLock.acquire()) {
-      FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
-      if (set == null) {
-        return null;
-      }
-      return set.get(blockId, LONG_AND_BLOCK_COMPARATOR);
+      LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
+      return m != null ? m.get(new Block(blockId)) : null;
     }
   }
 
@@ -133,13 +118,13 @@ class ReplicaMap {
     checkBlockPool(bpid);
     checkBlock(replicaInfo);
     try (AutoCloseableLock l = writeLock.acquire()) {
-      FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
-      if (set == null) {
+      LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
+      if (m == null) {
         // Add an entry for block pool if it does not exist already
-        set = new FoldedTreeSet<>();
-        map.put(bpid, set);
+        m = new LightWeightResizableGSet<Block, ReplicaInfo>();
+        map.put(bpid, m);
       }
-      return set.addOrReplace(replicaInfo);
+      return  m.put(replicaInfo);
     }
   }
 
@@ -151,18 +136,17 @@ class ReplicaMap {
     checkBlockPool(bpid);
     checkBlock(replicaInfo);
     try (AutoCloseableLock l = writeLock.acquire()) {
-      FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
-      if (set == null) {
+      LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
+      if (m == null) {
         // Add an entry for block pool if it does not exist already
-        set = new FoldedTreeSet<>();
-        map.put(bpid, set);
+        m = new LightWeightResizableGSet<Block, ReplicaInfo>();
+        map.put(bpid, m);
       }
-      ReplicaInfo oldReplicaInfo = set.get(replicaInfo.getBlockId(),
-          LONG_AND_BLOCK_COMPARATOR);
+      ReplicaInfo oldReplicaInfo = m.get(replicaInfo);
       if (oldReplicaInfo != null) {
         return oldReplicaInfo;
       } else {
-        set.addOrReplace(replicaInfo);
+        m.put(replicaInfo);
       }
       return replicaInfo;
     }
@@ -201,13 +185,12 @@ class ReplicaMap {
     checkBlockPool(bpid);
     checkBlock(block);
     try (AutoCloseableLock l = writeLock.acquire()) {
-      FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
-      if (set != null) {
-        ReplicaInfo replicaInfo =
-            set.get(block.getBlockId(), LONG_AND_BLOCK_COMPARATOR);
+      LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
+      if (m != null) {
+        ReplicaInfo replicaInfo = m.get(block);
         if (replicaInfo != null &&
             block.getGenerationStamp() == replicaInfo.getGenerationStamp()) {
-          return set.removeAndGet(replicaInfo);
+          return m.remove(block);
         }
       }
     }
@@ -224,9 +207,9 @@ class ReplicaMap {
   ReplicaInfo remove(String bpid, long blockId) {
     checkBlockPool(bpid);
     try (AutoCloseableLock l = writeLock.acquire()) {
-      FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
-      if (set != null) {
-        return set.removeAndGet(blockId, LONG_AND_BLOCK_COMPARATOR);
+      LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
+      if (m != null) {
+        return m.remove(new Block(blockId));
       }
     }
     return null;
@@ -239,8 +222,8 @@ class ReplicaMap {
    */
   int size(String bpid) {
     try (AutoCloseableLock l = readLock.acquire()) {
-      FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
-      return set != null ? set.size() : 0;
+      LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
+      return m != null ? m.size() : 0;
     }
   }
   
@@ -255,17 +238,19 @@ class ReplicaMap {
    * @return a collection of the replicas belonging to the block pool
    */
   Collection<ReplicaInfo> replicas(String bpid) {
-    return map.get(bpid);
+    LightWeightResizableGSet<Block, ReplicaInfo> m = null;
+    m = map.get(bpid);
+    return m != null ? m.values() : null;
   }
 
   void initBlockPool(String bpid) {
     checkBlockPool(bpid);
     try (AutoCloseableLock l = writeLock.acquire()) {
-      FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
-      if (set == null) {
+      LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
+      if (m == null) {
         // Add an entry for block pool if it does not exist already
-        set = new FoldedTreeSet<>();
-        map.put(bpid, set);
+        m = new LightWeightResizableGSet<Block, ReplicaInfo>();
+        map.put(bpid, m);
       }
     }
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 6db856b..32da3c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -1997,7 +1997,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     LightWeightHashSet<Long> openFileIds = new LightWeightHashSet<>();
     for (DatanodeDescriptor dataNode :
         blockManager.getDatanodeManager().getDatanodes()) {
-      for (long ucFileId : dataNode.getLeavingServiceStatus().getOpenFiles()) {
+      // Sort open files
+      LightWeightHashSet<Long> dnOpenFiles =
+          dataNode.getLeavingServiceStatus().getOpenFiles();
+      Long[] dnOpenFileIds = new Long[dnOpenFiles.size()];
+      Arrays.sort(dnOpenFiles.toArray(dnOpenFileIds));
+      for (Long ucFileId : dnOpenFileIds) {
         INode ucFile = getFSDirectory().getInode(ucFileId);
         if (ucFile == null || ucFileId <= prevId ||
             openFileIds.contains(ucFileId)) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java
index 94749e2..5bcd719 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java
@@ -52,16 +52,12 @@ public class BlockReportContext {
    */
   private final long leaseId;
 
-  private final boolean sorted;
-
   public BlockReportContext(int totalRpcs, int curRpc,
-                            long reportId, long leaseId,
-                            boolean sorted) {
+                            long reportId, long leaseId) {
     this.totalRpcs = totalRpcs;
     this.curRpc = curRpc;
     this.reportId = reportId;
     this.leaseId = leaseId;
-    this.sorted = sorted;
   }
 
   public int getTotalRpcs() {
@@ -79,8 +75,4 @@ public class BlockReportContext {
   public long getLeaseId() {
     return leaseId;
   }
-
-  public boolean isSorted() {
-    return sorted;
-  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
index 5680ef3..24cd7aa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
@@ -140,6 +140,7 @@ public interface DatanodeProtocol {
    *     Each finalized block is represented as 3 longs. Each under-
    *     construction replica is represented as 4 longs.
    *     This is done instead of Block[] to reduce memory used by block reports.
+   * @param reports report of blocks per storage
    * @param context Context information for this block report.
    *
    * @return - the next command for DN to process.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/FoldedTreeSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/FoldedTreeSet.java
deleted file mode 100644
index 1c6be1d..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/FoldedTreeSet.java
+++ /dev/null
@@ -1,1285 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.util;
-
-import org.apache.hadoop.util.Time;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.ConcurrentModificationException;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.Objects;
-import java.util.SortedSet;
-
-/**
- * A memory efficient implementation of RBTree. Instead of having a Node for
- * each entry each node contains an array holding 64 entries.
- *
- * Based on the Apache Harmony folded TreeMap.
- *
- * @param <E> Entry type
- */
-public class FoldedTreeSet<E> implements SortedSet<E> {
-
-  private static final boolean RED = true;
-  private static final boolean BLACK = false;
-
-  private final Comparator<E> comparator;
-  private Node<E> root;
-  private int size;
-  private int nodeCount;
-  private int modCount;
-  private Node<E> cachedNode;
-
-  /**
-   * Internal tree node that holds a sorted array of entries.
-   *
-   * @param <E> type of the elements
-   */
-  private static class Node<E> {
-
-    private static final int NODE_SIZE = 64;
-
-    // Tree structure
-    private Node<E> parent, left, right;
-    private boolean color;
-    private final E[] entries;
-    private int leftIndex = 0, rightIndex = -1;
-    private int size = 0;
-    // List for fast ordered iteration
-    private Node<E> prev, next;
-
-    @SuppressWarnings("unchecked")
-    public Node() {
-      entries = (E[]) new Object[NODE_SIZE];
-    }
-
-    public boolean isRed() {
-      return color == RED;
-    }
-
-    public boolean isBlack() {
-      return color == BLACK;
-    }
-
-    public Node<E> getLeftMostNode() {
-      Node<E> node = this;
-      while (node.left != null) {
-        node = node.left;
-      }
-      return node;
-    }
-
-    public Node<E> getRightMostNode() {
-      Node<E> node = this;
-      while (node.right != null) {
-        node = node.right;
-      }
-      return node;
-    }
-
-    public void addEntryLeft(E entry) {
-      assert rightIndex < entries.length;
-      assert !isFull();
-
-      if (leftIndex == 0) {
-        rightIndex++;
-        // Shift entries right/up
-        System.arraycopy(entries, 0, entries, 1, size);
-      } else {
-        leftIndex--;
-      }
-      size++;
-      entries[leftIndex] = entry;
-    }
-
-    public void addEntryRight(E entry) {
-      assert !isFull();
-
-      if (rightIndex == NODE_SIZE - 1) {
-        assert leftIndex > 0;
-        // Shift entries left/down
-        System.arraycopy(entries, leftIndex, entries, --leftIndex, size);
-      } else {
-        rightIndex++;
-      }
-      size++;
-      entries[rightIndex] = entry;
-    }
-
-    public void addEntryAt(E entry, int index) {
-      assert !isFull();
-
-      if (leftIndex == 0 || ((rightIndex != Node.NODE_SIZE - 1)
-                             && (rightIndex - index <= index - leftIndex))) {
-        rightIndex++;
-        System.arraycopy(entries, index,
-                         entries, index + 1, rightIndex - index);
-        entries[index] = entry;
-      } else {
-        int newLeftIndex = leftIndex - 1;
-        System.arraycopy(entries, leftIndex,
-                         entries, newLeftIndex, index - leftIndex);
-        leftIndex = newLeftIndex;
-        entries[index - 1] = entry;
-      }
-      size++;
-    }
-
-    public void addEntriesLeft(Node<E> from) {
-      leftIndex -= from.size;
-      size += from.size;
-      System.arraycopy(from.entries, from.leftIndex,
-                       entries, leftIndex, from.size);
-    }
-
-    public void addEntriesRight(Node<E> from) {
-      System.arraycopy(from.entries, from.leftIndex,
-                       entries, rightIndex + 1, from.size);
-      size += from.size;
-      rightIndex += from.size;
-    }
-
-    public E insertEntrySlideLeft(E entry, int index) {
-      E pushedEntry = entries[0];
-      System.arraycopy(entries, 1, entries, 0, index - 1);
-      entries[index - 1] = entry;
-      return pushedEntry;
-    }
-
-    public E insertEntrySlideRight(E entry, int index) {
-      E movedEntry = entries[rightIndex];
-      System.arraycopy(entries, index, entries, index + 1, rightIndex - index);
-      entries[index] = entry;
-      return movedEntry;
-    }
-
-    public E removeEntryLeft() {
-      assert !isEmpty();
-      E entry = entries[leftIndex];
-      entries[leftIndex] = null;
-      leftIndex++;
-      size--;
-      return entry;
-    }
-
-    public E removeEntryRight() {
-      assert !isEmpty();
-      E entry = entries[rightIndex];
-      entries[rightIndex] = null;
-      rightIndex--;
-      size--;
-      return entry;
-    }
-
-    public E removeEntryAt(int index) {
-      assert !isEmpty();
-
-      E entry = entries[index];
-      int rightSize = rightIndex - index;
-      int leftSize = index - leftIndex;
-      if (rightSize <= leftSize) {
-        System.arraycopy(entries, index + 1, entries, index, rightSize);
-        entries[rightIndex] = null;
-        rightIndex--;
-      } else {
-        System.arraycopy(entries, leftIndex, entries, leftIndex + 1, leftSize);
-        entries[leftIndex] = null;
-        leftIndex++;
-      }
-      size--;
-      return entry;
-    }
-
-    public boolean isFull() {
-      return size == NODE_SIZE;
-    }
-
-    public boolean isEmpty() {
-      return size == 0;
-    }
-
-    public void clear() {
-      if (leftIndex < rightIndex) {
-        Arrays.fill(entries, leftIndex, rightIndex + 1, null);
-      }
-      size = 0;
-      leftIndex = 0;
-      rightIndex = -1;
-      prev = null;
-      next = null;
-      parent = null;
-      left = null;
-      right = null;
-      color = BLACK;
-    }
-  }
-
-  private static final class TreeSetIterator<E> implements Iterator<E> {
-
-    private final FoldedTreeSet<E> tree;
-    private int iteratorModCount;
-    private Node<E> node;
-    private int index;
-    private E lastEntry;
-    private int lastIndex;
-    private Node<E> lastNode;
-
-    private TreeSetIterator(FoldedTreeSet<E> tree) {
-      this.tree = tree;
-      this.iteratorModCount = tree.modCount;
-      if (!tree.isEmpty()) {
-        this.node = tree.root.getLeftMostNode();
-        this.index = this.node.leftIndex;
-      }
-    }
-
-    @Override
-    public boolean hasNext() {
-      checkForModification();
-      return node != null;
-    }
-
-    @Override
-    public E next() {
-      if (hasNext()) {
-        lastEntry = node.entries[index];
-        lastIndex = index;
-        lastNode = node;
-        if (++index > node.rightIndex) {
-          node = node.next;
-          if (node != null) {
-            index = node.leftIndex;
-          }
-        }
-        return lastEntry;
-      } else {
-        throw new NoSuchElementException("Iterator exhausted");
-      }
-    }
-
-    @Override
-    public void remove() {
-      if (lastEntry == null) {
-        throw new IllegalStateException("No current element");
-      }
-      checkForModification();
-      if (lastNode.size == 1) {
-        // Safe to remove lastNode, the iterator is on the next node
-        tree.deleteNode(lastNode);
-      } else if (lastNode.leftIndex == lastIndex) {
-        // Safe to remove leftmost entry, the iterator is on the next index
-        lastNode.removeEntryLeft();
-      } else if (lastNode.rightIndex == lastIndex) {
-        // Safe to remove the rightmost entry, the iterator is on the next node
-        lastNode.removeEntryRight();
-      } else {
-        // Remove entry in the middle of the array
-        assert node == lastNode;
-        int oldRIndex = lastNode.rightIndex;
-        lastNode.removeEntryAt(lastIndex);
-        if (oldRIndex > lastNode.rightIndex) {
-          // Entries moved to the left in the array so index must be reset
-          index = lastIndex;
-        }
-      }
-      lastEntry = null;
-      iteratorModCount++;
-      tree.modCount++;
-      tree.size--;
-    }
-
-    private void checkForModification() {
-      if (iteratorModCount != tree.modCount) {
-        throw new ConcurrentModificationException("Tree has been modified "
-                                                  + "outside of iterator");
-      }
-    }
-  }
-
-  /**
-   * Create a new TreeSet that uses the natural ordering of objects. The element
-   * type must implement Comparable.
-   */
-  public FoldedTreeSet() {
-    this(null);
-  }
-
-  /**
-   * Create a new TreeSet that orders the elements using the supplied
-   * Comparator.
-   *
-   * @param comparator Comparator able to compare elements of type E
-   */
-  public FoldedTreeSet(Comparator<E> comparator) {
-    this.comparator = comparator;
-  }
-
-  private Node<E> cachedOrNewNode(E entry) {
-    Node<E> node = (cachedNode != null) ? cachedNode : new Node<E>();
-    cachedNode = null;
-    nodeCount++;
-    // Since BlockIDs are always increasing for new blocks it is best to
-    // add values on the left side to enable quicker inserts on the right
-    node.addEntryLeft(entry);
-    return node;
-  }
-
-  private void cacheAndClear(Node<E> node) {
-    if (cachedNode == null) {
-      node.clear();
-      cachedNode = node;
-    }
-  }
-
-  @Override
-  public Comparator<? super E> comparator() {
-    return comparator;
-  }
-
-  @Override
-  public SortedSet<E> subSet(E fromElement, E toElement) {
-    throw new UnsupportedOperationException("Not supported yet.");
-  }
-
-  @Override
-  public SortedSet<E> headSet(E toElement) {
-    throw new UnsupportedOperationException("Not supported yet.");
-  }
-
-  @Override
-  public SortedSet<E> tailSet(E fromElement) {
-    throw new UnsupportedOperationException("Not supported yet.");
-  }
-
-  @Override
-  public E first() {
-    if (!isEmpty()) {
-      Node<E> node = root.getLeftMostNode();
-      return node.entries[node.leftIndex];
-    }
-    return null;
-  }
-
-  @Override
-  public E last() {
-    if (!isEmpty()) {
-      Node<E> node = root.getRightMostNode();
-      return node.entries[node.rightIndex];
-    }
-    return null;
-  }
-
-  @Override
-  public int size() {
-    return size;
-  }
-
-  @Override
-  public boolean isEmpty() {
-    return root == null;
-  }
-
-  /**
-   * Lookup and return a stored object using a user provided comparator.
-   *
-   * @param obj Lookup key
-   * @param cmp User provided Comparator. The comparator should expect that the
-   *            proved obj will always be the first method parameter and any
-   *            stored object will be the second parameter.
-   *
-   * @return A matching stored object or null if non is found
-   */
-  public E get(Object obj, Comparator<?> cmp) {
-    Objects.requireNonNull(obj);
-
-    Node<E> node = root;
-    while (node != null) {
-      E[] entries = node.entries;
-
-      int leftIndex = node.leftIndex;
-      int result = compare(obj, entries[leftIndex], cmp);
-      if (result < 0) {
-        node = node.left;
-      } else if (result == 0) {
-        return entries[leftIndex];
-      } else {
-        int rightIndex = node.rightIndex;
-        if (leftIndex != rightIndex) {
-          result = compare(obj, entries[rightIndex], cmp);
-        }
-        if (result == 0) {
-          return entries[rightIndex];
-        } else if (result > 0) {
-          node = node.right;
-        } else {
-          int low = leftIndex + 1;
-          int high = rightIndex - 1;
-          while (low <= high) {
-            int mid = (low + high) >>> 1;
-            result = compare(obj, entries[mid], cmp);
-            if (result > 0) {
-              low = mid + 1;
-            } else if (result < 0) {
-              high = mid - 1;
-            } else {
-              return entries[mid];
-            }
-          }
-          return null;
-        }
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Lookup and return a stored object.
-   *
-   * @param entry Lookup entry
-   *
-   * @return A matching stored object or null if non is found
-   */
-  public E get(E entry) {
-    return get(entry, comparator);
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public boolean contains(Object obj) {
-    return get((E) obj) != null;
-  }
-
-  @SuppressWarnings({"unchecked", "rawtypes"})
-  private static int compare(Object lookup, Object stored, Comparator cmp) {
-    return cmp != null
-           ? cmp.compare(lookup, stored)
-           : ((Comparable<Object>) lookup).compareTo(stored);
-  }
-
-  @Override
-  public Iterator<E> iterator() {
-    return new TreeSetIterator<>(this);
-  }
-
-  @Override
-  public Object[] toArray() {
-    Object[] objects = new Object[size];
-    if (!isEmpty()) {
-      int pos = 0;
-      for (Node<E> node = root.getLeftMostNode(); node != null;
-          pos += node.size, node = node.next) {
-        System.arraycopy(node.entries, node.leftIndex, objects, pos, node.size);
-      }
-    }
-    return objects;
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public <T> T[] toArray(T[] a) {
-    T[] r = a.length >= size ? a
-            : (T[]) java.lang.reflect.Array
-        .newInstance(a.getClass().getComponentType(), size);
-    if (!isEmpty()) {
-      Node<E> node = root.getLeftMostNode();
-      int pos = 0;
-      while (node != null) {
-        System.arraycopy(node.entries, node.leftIndex, r, pos, node.size);
-        pos += node.size;
-        node = node.next;
-      }
-      if (r.length > pos) {
-        r[pos] = null;
-      }
-    } else if (a.length > 0) {
-      a[0] = null;
-    }
-    return r;
-  }
-
-  /**
-   * Add or replace an entry in the TreeSet.
-   *
-   * @param entry Entry to add or replace/update.
-   *
-   * @return the previous entry, or null if this set did not already contain the
-   *         specified entry
-   */
-  public E addOrReplace(E entry) {
-    return add(entry, true);
-  }
-
-  @Override
-  public boolean add(E entry) {
-    return add(entry, false) == null;
-  }
-
-  /**
-   * Internal add method to add a entry to the set.
-   *
-   * @param entry   Entry to add
-   * @param replace Should the entry replace an old entry which is equal to the
-   *                new entry
-   *
-   * @return null if entry added and didn't exist or the previous value (which
-   *         might not have been overwritten depending on the replace parameter)
-   */
-  private E add(E entry, boolean replace) {
-    Objects.requireNonNull(entry);
-
-    // Empty tree
-    if (isEmpty()) {
-      root = cachedOrNewNode(entry);
-      size = 1;
-      modCount++;
-      return null;
-    }
-
-    // Compare right entry first since inserts of comperatively larger entries
-    // is more likely to be inserted. BlockID is always increasing in HDFS.
-    Node<E> node = root;
-    Node<E> prevNode = null;
-    int result = 0;
-    while (node != null) {
-      prevNode = node;
-      E[] entries = node.entries;
-      int rightIndex = node.rightIndex;
-      result = compare(entry, entries[rightIndex], comparator);
-      if (result > 0) {
-        node = node.right;
-      } else if (result == 0) {
-        E prevEntry = entries[rightIndex];
-        if (replace) {
-          entries[rightIndex] = entry;
-        }
-        return prevEntry;
-      } else {
-        int leftIndex = node.leftIndex;
-        if (leftIndex != rightIndex) {
-          result = compare(entry, entries[leftIndex], comparator);
-        }
-        if (result < 0) {
-          node = node.left;
-        } else if (result == 0) {
-          E prevEntry = entries[leftIndex];
-          if (replace) {
-            entries[leftIndex] = entry;
-          }
-          return prevEntry;
-        } else {
-          // Insert in this node
-          int low = leftIndex + 1, high = rightIndex - 1;
-          while (low <= high) {
-            int mid = (low + high) >>> 1;
-            result = compare(entry, entries[mid], comparator);
-            if (result > 0) {
-              low = mid + 1;
-            } else if (result == 0) {
-              E prevEntry = entries[mid];
-              if (replace) {
-                entries[mid] = entry;
-              }
-              return prevEntry;
-            } else {
-              high = mid - 1;
-            }
-          }
-          addElementInNode(node, entry, low);
-          return null;
-        }
-      }
-    }
-
-    assert prevNode != null;
-    size++;
-    modCount++;
-    if (!prevNode.isFull()) {
-      // The previous node still has space
-      if (result < 0) {
-        prevNode.addEntryLeft(entry);
-      } else {
-        prevNode.addEntryRight(entry);
-      }
-    } else if (result < 0) {
-      // The previous node is full, add to adjencent node or a new node
-      if (prevNode.prev != null && !prevNode.prev.isFull()) {
-        prevNode.prev.addEntryRight(entry);
-      } else {
-        attachNodeLeft(prevNode, cachedOrNewNode(entry));
-      }
-    } else if (prevNode.next != null && !prevNode.next.isFull()) {
-      prevNode.next.addEntryLeft(entry);
-    } else {
-      attachNodeRight(prevNode, cachedOrNewNode(entry));
-    }
-    return null;
-  }
-
-  /**
-   * Insert an entry last in the sorted tree. The entry must be the considered
-   * larger than the currently largest entry in the set when doing
-   * current.compareTo(entry), if entry is not the largest entry the method will
-   * fall back on the regular add method.
-   *
-   * @param entry entry to add
-   *
-   * @return True if added, false if already existed in the set
-   */
-  public boolean addSortedLast(E entry) {
-
-    if (isEmpty()) {
-      root = cachedOrNewNode(entry);
-      size = 1;
-      modCount++;
-      return true;
-    } else {
-      Node<E> node = root.getRightMostNode();
-      if (compare(node.entries[node.rightIndex], entry, comparator) < 0) {
-        size++;
-        modCount++;
-        if (!node.isFull()) {
-          node.addEntryRight(entry);
-        } else {
-          attachNodeRight(node, cachedOrNewNode(entry));
-        }
-        return true;
-      }
-    }
-
-    // Fallback on normal add if entry is unsorted
-    return add(entry);
-  }
-
-  private void addElementInNode(Node<E> node, E entry, int index) {
-    size++;
-    modCount++;
-
-    if (!node.isFull()) {
-      node.addEntryAt(entry, index);
-    } else {
-      // Node is full, insert and push old entry
-      Node<E> prev = node.prev;
-      Node<E> next = node.next;
-      if (prev == null) {
-        // First check if we have space in the next node
-        if (next != null && !next.isFull()) {
-          E movedEntry = node.insertEntrySlideRight(entry, index);
-          next.addEntryLeft(movedEntry);
-        } else {
-          // Since prev is null the left child must be null
-          assert node.left == null;
-          E movedEntry = node.insertEntrySlideLeft(entry, index);
-          Node<E> newNode = cachedOrNewNode(movedEntry);
-          attachNodeLeft(node, newNode);
-        }
-      } else if (!prev.isFull()) {
-        // Prev has space
-        E movedEntry = node.insertEntrySlideLeft(entry, index);
-        prev.addEntryRight(movedEntry);
-      } else if (next == null) {
-        // Since next is null the right child must be null
-        assert node.right == null;
-        E movedEntry = node.insertEntrySlideRight(entry, index);
-        Node<E> newNode = cachedOrNewNode(movedEntry);
-        attachNodeRight(node, newNode);
-      } else if (!next.isFull()) {
-        // Next has space
-        E movedEntry = node.insertEntrySlideRight(entry, index);
-        next.addEntryLeft(movedEntry);
-      } else {
-        // Both prev and next nodes exist and are full
-        E movedEntry = node.insertEntrySlideRight(entry, index);
-        Node<E> newNode = cachedOrNewNode(movedEntry);
-        if (node.right == null) {
-          attachNodeRight(node, newNode);
-        } else {
-          // Since our right node exists,
-          // the left node of our next node must be empty
-          assert next.left == null;
-          attachNodeLeft(next, newNode);
-        }
-      }
-    }
-  }
-
-  private void attachNodeLeft(Node<E> node, Node<E> newNode) {
-    newNode.parent = node;
-    node.left = newNode;
-
-    newNode.next = node;
-    newNode.prev = node.prev;
-    if (newNode.prev != null) {
-      newNode.prev.next = newNode;
-    }
-    node.prev = newNode;
-    balanceInsert(newNode);
-  }
-
-  private void attachNodeRight(Node<E> node, Node<E> newNode) {
-    newNode.parent = node;
-    node.right = newNode;
-
-    newNode.prev = node;
-    newNode.next = node.next;
-    if (newNode.next != null) {
-      newNode.next.prev = newNode;
-    }
-    node.next = newNode;
-    balanceInsert(newNode);
-  }
-
-  /**
-   * Balance the RB Tree after insert.
-   *
-   * @param node Added node
-   */
-  private void balanceInsert(Node<E> node) {
-    node.color = RED;
-
-    while (node != root && node.parent.isRed()) {
-      if (node.parent == node.parent.parent.left) {
-        Node<E> uncle = node.parent.parent.right;
-        if (uncle != null && uncle.isRed()) {
-          node.parent.color = BLACK;
-          uncle.color = BLACK;
-          node.parent.parent.color = RED;
-          node = node.parent.parent;
-        } else {
-          if (node == node.parent.right) {
-            node = node.parent;
-            rotateLeft(node);
-          }
-          node.parent.color = BLACK;
-          node.parent.parent.color = RED;
-          rotateRight(node.parent.parent);
-        }
-      } else {
-        Node<E> uncle = node.parent.parent.left;
-        if (uncle != null && uncle.isRed()) {
-          node.parent.color = BLACK;
-          uncle.color = BLACK;
-          node.parent.parent.color = RED;
-          node = node.parent.parent;
-        } else {
-          if (node == node.parent.left) {
-            node = node.parent;
-            rotateRight(node);
-          }
-          node.parent.color = BLACK;
-          node.parent.parent.color = RED;
-          rotateLeft(node.parent.parent);
-        }
-      }
-    }
-    root.color = BLACK;
-  }
-
-  private void rotateRight(Node<E> node) {
-    Node<E> pivot = node.left;
-    node.left = pivot.right;
-    if (pivot.right != null) {
-      pivot.right.parent = node;
-    }
-    pivot.parent = node.parent;
-    if (node.parent == null) {
-      root = pivot;
-    } else if (node == node.parent.right) {
-      node.parent.right = pivot;
-    } else {
-      node.parent.left = pivot;
-    }
-    pivot.right = node;
-    node.parent = pivot;
-  }
-
-  private void rotateLeft(Node<E> node) {
-    Node<E> pivot = node.right;
-    node.right = pivot.left;
-    if (pivot.left != null) {
-      pivot.left.parent = node;
-    }
-    pivot.parent = node.parent;
-    if (node.parent == null) {
-      root = pivot;
-    } else if (node == node.parent.left) {
-      node.parent.left = pivot;
-    } else {
-      node.parent.right = pivot;
-    }
-    pivot.left = node;
-    node.parent = pivot;
-  }
-
-  /**
-   * Remove object using a provided comparator, and return the removed entry.
-   *
-   * @param obj Lookup entry
-   * @param cmp User provided Comparator. The comparator should expect that the
-   *            provided obj will always be the first method parameter and any
-   *            stored object will be the second parameter.
-   *
-   * @return The removed entry or null if not found
-   */
-  public E removeAndGet(Object obj, Comparator<?> cmp) {
-    Objects.requireNonNull(obj);
-
-    if (!isEmpty()) {
-      Node<E> node = root;
-      while (node != null) {
-        E[] entries = node.entries;
-        int leftIndex = node.leftIndex;
-        int result = compare(obj, entries[leftIndex], cmp);
-        if (result < 0) {
-          node = node.left;
-        } else if (result == 0) {
-          return removeElementLeft(node);
-        } else {
-          int rightIndex = node.rightIndex;
-          if (leftIndex != rightIndex) {
-            result = compare(obj, entries[rightIndex], cmp);
-          }
-          if (result == 0) {
-            return removeElementRight(node);
-          } else if (result > 0) {
-            node = node.right;
-          } else {
-            int low = leftIndex + 1, high = rightIndex - 1;
-            while (low <= high) {
-              int mid = (low + high) >>> 1;
-              result = compare(obj, entries[mid], cmp);
-              if (result > 0) {
-                low = mid + 1;
-              } else if (result == 0) {
-                return removeElementAt(node, mid);
-              } else {
-                high = mid - 1;
-              }
-            }
-            return null;
-          }
-        }
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Remove object and return the removed entry.
-   *
-   * @param obj Lookup entry
-   *
-   * @return The removed entry or null if not found
-   */
-  public E removeAndGet(Object obj) {
-    return removeAndGet(obj, comparator);
-  }
-
-  /**
-   * Remove object using a provided comparator.
-   *
-   * @param obj Lookup entry
-   * @param cmp User provided Comparator. The comparator should expect that the
-   *            provided obj will always be the first method parameter and any
-   *            stored object will be the second parameter.
-   *
-   * @return True if found and removed, else false
-   */
-  public boolean remove(Object obj, Comparator<?> cmp) {
-    return removeAndGet(obj, cmp) != null;
-  }
-
-  @Override
-  public boolean remove(Object obj) {
-    return removeAndGet(obj, comparator) != null;
-  }
-
-  private E removeElementLeft(Node<E> node) {
-    modCount++;
-    size--;
-    E entry = node.removeEntryLeft();
-
-    if (node.isEmpty()) {
-      deleteNode(node);
-    } else if (node.prev != null
-               && (Node.NODE_SIZE - 1 - node.prev.rightIndex) >= node.size) {
-      // Remaining entries fit in the prev node, move them and delete this node
-      node.prev.addEntriesRight(node);
-      deleteNode(node);
-    } else if (node.next != null && node.next.leftIndex >= node.size) {
-      // Remaining entries fit in the next node, move them and delete this node
-      node.next.addEntriesLeft(node);
-      deleteNode(node);
-    } else if (node.prev != null && node.prev.size < node.leftIndex) {
-      // Entries in prev node will fit in this node, move them and delete prev
-      node.addEntriesLeft(node.prev);
-      deleteNode(node.prev);
-    }
-
-    return entry;
-  }
-
-  private E removeElementRight(Node<E> node) {
-    modCount++;
-    size--;
-    E entry = node.removeEntryRight();
-
-    if (node.isEmpty()) {
-      deleteNode(node);
-    } else if (node.prev != null
-               && (Node.NODE_SIZE - 1 - node.prev.rightIndex) >= node.size) {
-      // Remaining entries fit in the prev node, move them and delete this node
-      node.prev.addEntriesRight(node);
-      deleteNode(node);
-    } else if (node.next != null && node.next.leftIndex >= node.size) {
-      // Remaining entries fit in the next node, move them and delete this node
-      node.next.addEntriesLeft(node);
-      deleteNode(node);
-    } else if (node.next != null
-               && node.next.size < (Node.NODE_SIZE - 1 - node.rightIndex)) {
-      // Entries in next node will fit in this node, move them and delete next
-      node.addEntriesRight(node.next);
-      deleteNode(node.next);
-    }
-
-    return entry;
-  }
-
-  private E removeElementAt(Node<E> node, int index) {
-    modCount++;
-    size--;
-    E entry = node.removeEntryAt(index);
-
-    if (node.prev != null
-        && (Node.NODE_SIZE - 1 - node.prev.rightIndex) >= node.size) {
-      // Remaining entries fit in the prev node, move them and delete this node
-      node.prev.addEntriesRight(node);
-      deleteNode(node);
-    } else if (node.next != null && (node.next.leftIndex) >= node.size) {
-      // Remaining entries fit in the next node, move them and delete this node
-      node.next.addEntriesLeft(node);
-      deleteNode(node);
-    } else if (node.prev != null && node.prev.size < node.leftIndex) {
-      // Entries in prev node will fit in this node, move them and delete prev
-      node.addEntriesLeft(node.prev);
-      deleteNode(node.prev);
-    } else if (node.next != null
-               && node.next.size < (Node.NODE_SIZE - 1 - node.rightIndex)) {
-      // Entries in next node will fit in this node, move them and delete next
-      node.addEntriesRight(node.next);
-      deleteNode(node.next);
-    }
-
-    return entry;
-  }
-
-  /**
-   * Delete the node and ensure the tree is balanced.
-   *
-   * @param node node to delete
-   */
-  private void deleteNode(final Node<E> node) {
-    if (node.right == null) {
-      if (node.left != null) {
-        attachToParent(node, node.left);
-      } else {
-        attachNullToParent(node);
-      }
-    } else if (node.left == null) {
-      attachToParent(node, node.right);
-    } else {
-      // node.left != null && node.right != null
-      // node.next should replace node in tree
-      // node.next != null guaranteed since node.left != null
-      // node.next.left == null since node.next.prev is node
-      // node.next.right may be null or non-null
-      Node<E> toMoveUp = node.next;
-      if (toMoveUp.right == null) {
-        attachNullToParent(toMoveUp);
-      } else {
-        attachToParent(toMoveUp, toMoveUp.right);
-      }
-      toMoveUp.left = node.left;
-      if (toMoveUp.left != null) {
-        toMoveUp.left.parent = toMoveUp;
-      }
-      toMoveUp.right = node.right;
-      if (toMoveUp.right != null) {
-        toMoveUp.right.parent = toMoveUp;
-      }
-      attachToParentNoBalance(node, toMoveUp);
-      toMoveUp.color = node.color;
-    }
-
-    // Remove node from ordered list of nodes
-    if (node.prev != null) {
-      node.prev.next = node.next;
-    }
-    if (node.next != null) {
-      node.next.prev = node.prev;
-    }
-
-    nodeCount--;
-    cacheAndClear(node);
-  }
-
-  private void attachToParentNoBalance(Node<E> toDelete, Node<E> toConnect) {
-    Node<E> parent = toDelete.parent;
-    toConnect.parent = parent;
-    if (parent == null) {
-      root = toConnect;
-    } else if (toDelete == parent.left) {
-      parent.left = toConnect;
-    } else {
-      parent.right = toConnect;
-    }
-  }
-
-  private void attachToParent(Node<E> toDelete, Node<E> toConnect) {
-    attachToParentNoBalance(toDelete, toConnect);
-    if (toDelete.isBlack()) {
-      balanceDelete(toConnect);
-    }
-  }
-
-  private void attachNullToParent(Node<E> toDelete) {
-    Node<E> parent = toDelete.parent;
-    if (parent == null) {
-      root = null;
-    } else {
-      if (toDelete == parent.left) {
-        parent.left = null;
-      } else {
-        parent.right = null;
-      }
-      if (toDelete.isBlack()) {
-        balanceDelete(parent);
-      }
-    }
-  }
-
-  /**
-   * Balance tree after removing a node.
-   *
-   * @param node Node to balance after deleting another node
-   */
-  private void balanceDelete(Node<E> node) {
-    while (node != root && node.isBlack()) {
-      if (node == node.parent.left) {
-        Node<E> sibling = node.parent.right;
-        if (sibling == null) {
-          node = node.parent;
-          continue;
-        }
-        if (sibling.isRed()) {
-          sibling.color = BLACK;
-          node.parent.color = RED;
-          rotateLeft(node.parent);
-          sibling = node.parent.right;
-          if (sibling == null) {
-            node = node.parent;
-            continue;
-          }
-        }
-        if ((sibling.left == null || !sibling.left.isRed())
-            && (sibling.right == null || !sibling.right.isRed())) {
-          sibling.color = RED;
-          node = node.parent;
-        } else {
-          if (sibling.right == null || !sibling.right.isRed()) {
-            sibling.left.color = BLACK;
-            sibling.color = RED;
-            rotateRight(sibling);
-            sibling = node.parent.right;
-          }
-          sibling.color = node.parent.color;
-          node.parent.color = BLACK;
-          sibling.right.color = BLACK;
-          rotateLeft(node.parent);
-          node = root;
-        }
-      } else {
-        Node<E> sibling = node.parent.left;
-        if (sibling == null) {
-          node = node.parent;
-          continue;
-        }
-        if (sibling.isRed()) {
-          sibling.color = BLACK;
-          node.parent.color = RED;
-          rotateRight(node.parent);
-          sibling = node.parent.left;
-          if (sibling == null) {
-            node = node.parent;
-            continue;
-          }
-        }
-        if ((sibling.left == null || sibling.left.isBlack())
-            && (sibling.right == null || sibling.right.isBlack())) {
-          sibling.color = RED;
-          node = node.parent;
-        } else {
-          if (sibling.left == null || sibling.left.isBlack()) {
-            sibling.right.color = BLACK;
-            sibling.color = RED;
-            rotateLeft(sibling);
-            sibling = node.parent.left;
-          }
-          sibling.color = node.parent.color;
-          node.parent.color = BLACK;
-          sibling.left.color = BLACK;
-          rotateRight(node.parent);
-          node = root;
-        }
-      }
-    }
-    node.color = BLACK;
-  }
-
-  @Override
-  public boolean containsAll(Collection<?> c) {
-    for (Object entry : c) {
-      if (!contains(entry)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  @Override
-  public boolean addAll(Collection<? extends E> c) {
-    boolean modified = false;
-    for (E entry : c) {
-      if (add(entry)) {
-        modified = true;
-      }
-    }
-    return modified;
-  }
-
-  @Override
-  public boolean retainAll(Collection<?> c) {
-    boolean modified = false;
-    Iterator<E> it = iterator();
-    while (it.hasNext()) {
-      if (!c.contains(it.next())) {
-        it.remove();
-        modified = true;
-      }
-    }
-    return modified;
-  }
-
-  @Override
-  public boolean removeAll(Collection<?> c) {
-    boolean modified = false;
-    for (Object entry : c) {
-      if (remove(entry)) {
-        modified = true;
-      }
-    }
-    return modified;
-  }
-
-  @Override
-  public void clear() {
-    modCount++;
-    if (!isEmpty()) {
-      size = 0;
-      nodeCount = 0;
-      cacheAndClear(root);
-      root = null;
-    }
-  }
-
-  /**
-   * Returns the current size divided by the capacity of the tree. A value
-   * between 0.0 and 1.0, where 1.0 means that every allocated node in the tree
-   * is completely full.
-   *
-   * An empty set will return 1.0
-   *
-   * @return the fill ratio of the tree
-   */
-  public double fillRatio() {
-    if (nodeCount > 1) {
-      // Count the last node as completely full since it can't be compacted
-      return (size + (Node.NODE_SIZE - root.getRightMostNode().size))
-             / (double) (nodeCount * Node.NODE_SIZE);
-    }
-    return 1.0;
-  }
-
-  /**
-   * Compact all the entries to use the fewest number of nodes in the tree.
-   *
-   * Having a compact tree minimizes memory usage, but can cause inserts to get
-   * slower because new nodes need to be allocated when there is no space left
-   * in any existing node for entries added in the middle of the set.
-   *
-   * Useful for reducing memory consumption when the tree is known not to
-   * change after compaction, or is mainly added to at either extreme.
-   *
-   * @param timeout Maximum time to spend compacting the tree set in
-   *                milliseconds.
-   *
-   * @return true if compaction completed, false if aborted
-   */
-  public boolean compact(long timeout) {
-
-    if (!isEmpty()) {
-      long start = Time.monotonicNow();
-      Node<E> node = root.getLeftMostNode();
-      while (node != null) {
-        if (node.prev != null && !node.prev.isFull()) {
-          Node<E> prev = node.prev;
-          int count = Math.min(Node.NODE_SIZE - prev.size, node.size);
-          System.arraycopy(node.entries, node.leftIndex,
-                           prev.entries, prev.rightIndex + 1, count);
-          node.leftIndex += count;
-          node.size -= count;
-          prev.rightIndex += count;
-          prev.size += count;
-        }
-        if (node.isEmpty()) {
-          Node<E> temp = node.next;
-          deleteNode(node);
-          node = temp;
-          continue;
-        } else if (!node.isFull()) {
-          if (node.leftIndex != 0) {
-            System.arraycopy(node.entries, node.leftIndex,
-                             node.entries, 0, node.size);
-            Arrays.fill(node.entries, node.size, node.rightIndex + 1, null);
-            node.leftIndex = 0;
-            node.rightIndex = node.size - 1;
-          }
-        }
-        node = node.next;
-
-        if (Time.monotonicNow() - start > timeout) {
-          return false;
-        }
-      }
-    }
-
-    return true;
-  }
-}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
index 0e24130..4a98f2d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
@@ -257,8 +257,9 @@ message BlockReportContextProto  {
   // bypass rate-limiting.
   optional uint64 leaseId = 4 [ default = 0 ];
 
+  // for compatibility, field number 5 should not be reused, see HDFS-13671.
   // True if the reported blocks are sorted by increasing block IDs
-  optional bool sorted = 5 [default = false];
+  // optional bool sorted = 5 [default = false];
 }
 
 /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 6e8e36e..e540a67 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -5196,32 +5196,6 @@
 </property>
 
 <property>
-  <name>dfs.namenode.storageinfo.defragment.timeout.ms</name>
-  <value>4</value>
-  <description>
-    Timeout value in ms for the StorageInfo compaction run.
-  </description>
-</property>
-
-<property>
-  <name>dfs.namenode.storageinfo.defragment.interval.ms</name>
-  <value>600000</value>
-  <description>
-    The thread for checking the StorageInfo for defragmentation will
-    run periodically.  The time between runs is determined by this
-    property.
-  </description>
-</property>
-
-<property>
-  <name>dfs.namenode.storageinfo.defragment.ratio</name>
-  <value>0.75</value>
-  <description>
-    The defragmentation threshold for the StorageInfo.
-  </description>
-</property>
-
-<property>
   <name>dfs.namenode.snapshot.capture.openfiles</name>
   <value>false</value>
   <description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestCrcCorruption.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestCrcCorruption.java
index df6a7dc..917f0db 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestCrcCorruption.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestCrcCorruption.java
@@ -173,7 +173,7 @@ public class TestCrcCorruption {
       final DataNode dn = cluster.getDataNodes().get(dnIdx);
       final String bpid = cluster.getNamesystem().getBlockPoolId();
       List<ReplicaInfo> replicas =
-          dn.getFSDataset().getSortedFinalizedBlocks(bpid);
+          dn.getFSDataset().getFinalizedBlocks(bpid);
       assertTrue("Replicas do not exist", !replicas.isEmpty());
 
       for (int idx = 0; idx < replicas.size(); idx++) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
index 67df6d8..fa3c1aa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
@@ -568,7 +568,7 @@ public class TestReconstructStripedFile {
     writeFile(fs, "/ec-xmits-weight", fileLen);
 
     DataNode dn = cluster.getDataNodes().get(0);
-    int corruptBlocks = dn.getFSDataset().getSortedFinalizedBlocks(
+    int corruptBlocks = dn.getFSDataset().getFinalizedBlocks(
         cluster.getNameNode().getNamesystem().getBlockPoolId()).size();
     int expectedXmits = corruptBlocks * expectedWeight;
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java
index 0aff861a1..17b3939 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java
@@ -240,7 +240,7 @@ public class TestBlockListAsLongs {
     request.set(null);
     nsInfo.setCapabilities(Capability.STORAGE_BLOCK_REPORT_BUFFERS.getMask());
     nn.blockReport(reg, "pool", sbr,
-        new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
+        new BlockReportContext(1, 0, System.nanoTime(), 0L));
     BlockReportRequestProto proto = request.get();
     assertNotNull(proto);
     assertTrue(proto.getReports(0).getBlocksList().isEmpty());
@@ -253,7 +253,7 @@ public class TestBlockListAsLongs {
     StorageBlockReport[] obp = new StorageBlockReport[] {
         new StorageBlockReport(new DatanodeStorage("s1"), blockList) };
     nn.blockReport(reg, "pool", obp,
-        new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
+        new BlockReportContext(1, 0, System.nanoTime(), 0L));
     proto = request.get();
     assertNotNull(proto);
     assertFalse(proto.getReports(0).getBlocksList().isEmpty());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
index 3c5c5d9..70f13eb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
@@ -19,6 +19,12 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import static org.apache.hadoop.hdfs.server.namenode.INodeId.INVALID_INODE_ID;
 import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Random;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -28,6 +34,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.junit.Assert;
 import org.junit.Test;
@@ -133,4 +140,85 @@ public class TestBlockInfo {
         "storageID", "127.0.0.1");
     blockInfo1.addStorage(storage, blockInfo2);
   }
+
+  @Test
+  public void testBlockListMoveToHead() throws Exception {
+    LOG.info("BlockInfo moveToHead tests...");
+
+    final int maxBlocks = 10;
+
+    DatanodeStorageInfo dd =
+        DFSTestUtil.createDatanodeStorageInfo("s1", "1.1.1.1");
+    ArrayList<Block> blockList = new ArrayList<Block>(maxBlocks);
+    ArrayList<BlockInfo> blockInfoList = new ArrayList<BlockInfo>();
+    int headIndex;
+    int curIndex;
+
+    LOG.info("Building block list...");
+    for (int i = 0; i < maxBlocks; i++) {
+      blockList.add(new Block(i, 0, GenerationStamp.LAST_RESERVED_STAMP));
+      blockInfoList.add(new BlockInfoContiguous(blockList.get(i), (short) 3));
+      dd.addBlock(blockInfoList.get(i));
+
+      // index of the datanode should be 0
+      assertEquals("Find datanode should be 0", 0, blockInfoList.get(i)
+          .findStorageInfo(dd));
+    }
+
+    // list length should be equal to the number of blocks we inserted
+    LOG.info("Checking list length...");
+    assertEquals("Length should be MAX_BLOCK", maxBlocks, dd.numBlocks());
+    Iterator<BlockInfo> it = dd.getBlockIterator();
+    int len = 0;
+    while (it.hasNext()) {
+      it.next();
+      len++;
+    }
+    assertEquals("There should be MAX_BLOCK blockInfo's", maxBlocks, len);
+
+    headIndex = dd.getBlockListHeadForTesting().findStorageInfo(dd);
+
+    LOG.info("Moving each block to the head of the list...");
+    for (int i = 0; i < maxBlocks; i++) {
+      curIndex = blockInfoList.get(i).findStorageInfo(dd);
+      headIndex = dd.moveBlockToHead(blockInfoList.get(i), curIndex, headIndex);
+      // the moved element must be at the head of the list
+      assertEquals("Block should be at the head of the list now.",
+          blockInfoList.get(i), dd.getBlockListHeadForTesting());
+    }
+
+    // move head of the list to the head - this should not change the list
+    LOG.info("Moving head to the head...");
+
+    BlockInfo temp = dd.getBlockListHeadForTesting();
+    curIndex = 0;
+    headIndex = 0;
+    dd.moveBlockToHead(temp, curIndex, headIndex);
+    assertEquals(
+        "Moving head to the head of the list shopuld not change the list",
+        temp, dd.getBlockListHeadForTesting());
+
+    // check all elements of the list against the original blockInfoList
+    LOG.info("Checking elements of the list...");
+    temp = dd.getBlockListHeadForTesting();
+    assertNotNull("Head should not be null", temp);
+    int c = maxBlocks - 1;
+    while (temp != null) {
+      assertEquals("Expected element is not on the list",
+          blockInfoList.get(c--), temp);
+      temp = temp.getNext(0);
+    }
+
+    LOG.info("Moving random blocks to the head of the list...");
+    headIndex = dd.getBlockListHeadForTesting().findStorageInfo(dd);
+    Random rand = new Random();
+    for (int i = 0; i < maxBlocks; i++) {
+      int j = rand.nextInt(maxBlocks);
+      curIndex = blockInfoList.get(j).findStorageInfo(dd);
+      headIndex = dd.moveBlockToHead(blockInfoList.get(j), curIndex, headIndex);
+      // the moved element must be at the head of the list
+      assertEquals("Block should be at the head of the list now.",
+          blockInfoList.get(j), dd.getBlockListHeadForTesting());
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index a8fd71b..d5e0a99 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -65,7 +65,6 @@ import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.namenode.TestINodeFile;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
-import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
@@ -97,7 +96,6 @@ import java.io.InputStreamReader;
 import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -1037,8 +1035,7 @@ public class TestBlockManager {
     // Make sure it's the first full report
     assertEquals(0, ds.getBlockReportCount());
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        builder.build(),
-        new BlockReportContext(1, 0, System.nanoTime(), 0, true));
+        builder.build(), null);
     assertEquals(1, ds.getBlockReportCount());
 
     // verify the storage info is correct
@@ -1105,67 +1102,6 @@ public class TestBlockManager {
   }
 
   @Test
-  public void testFullBR() throws Exception {
-    doReturn(true).when(fsn).isRunning();
-
-    DatanodeDescriptor node = nodes.get(0);
-    DatanodeStorageInfo ds = node.getStorageInfos()[0];
-    node.setAlive(true);
-    DatanodeRegistration nodeReg =  new DatanodeRegistration(node, null, null, "");
-
-    // register new node
-    bm.getDatanodeManager().registerDatanode(nodeReg);
-    bm.getDatanodeManager().addDatanode(node);
-    assertEquals(node, bm.getDatanodeManager().getDatanode(node));
-    assertEquals(0, ds.getBlockReportCount());
-
-    ArrayList<BlockInfo> blocks = new ArrayList<>();
-    for (int id = 24; id > 0; id--) {
-      blocks.add(addBlockToBM(id));
-    }
-
-    // Make sure it's the first full report
-    assertEquals(0, ds.getBlockReportCount());
-    bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-                     generateReport(blocks),
-                     new BlockReportContext(1, 0, System.nanoTime(), 0, false));
-    assertEquals(1, ds.getBlockReportCount());
-    // verify the storage info is correct
-    for (BlockInfo block : blocks) {
-      assertTrue(bm.getStoredBlock(block).findStorageInfo(ds) >= 0);
-    }
-
-    // Send unsorted report
-    bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-                     generateReport(blocks),
-                     new BlockReportContext(1, 0, System.nanoTime(), 0, false));
-    assertEquals(2, ds.getBlockReportCount());
-    // verify the storage info is correct
-    for (BlockInfo block : blocks) {
-      assertTrue(bm.getStoredBlock(block).findStorageInfo(ds) >= 0);
-    }
-
-    // Sort list and send a sorted report
-    Collections.sort(blocks);
-    bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-                     generateReport(blocks),
-                     new BlockReportContext(1, 0, System.nanoTime(), 0, true));
-    assertEquals(3, ds.getBlockReportCount());
-    // verify the storage info is correct
-    for (BlockInfo block : blocks) {
-      assertTrue(bm.getStoredBlock(block).findStorageInfo(ds) >= 0);
-    }
-  }
-
-  private BlockListAsLongs generateReport(List<BlockInfo> blocks) {
-    BlockListAsLongs.Builder builder = BlockListAsLongs.builder();
-    for (BlockInfo block : blocks) {
-      builder.add(new FinalizedReplica(block, null, null));
-    }
-    return builder.build();
-  }
-
-  @Test
   public void testUCBlockNotConsideredMissing() throws Exception {
     DatanodeDescriptor node = nodes.get(0);
     DatanodeStorageInfo ds = node.getStorageInfos()[0];
@@ -1695,8 +1631,8 @@ public class TestBlockManager {
     LocatedBlock lb = DFSTestUtil.getAllBlocks(dfs, file).get(0);
     BlockInfo blockInfo =
         blockManager.getStoredBlock(lb.getBlock().getLocalBlock());
-    Iterator<DatanodeStorageInfo> itr = blockInfo.getStorageInfos();
     LOG.info("Block " + blockInfo + " storages: ");
+    Iterator<DatanodeStorageInfo> itr = blockInfo.getStorageInfos();
     while (itr.hasNext()) {
       DatanodeStorageInfo dn = itr.next();
       LOG.info(" Rack: " + dn.getDatanodeDescriptor().getNetworkLocation()
@@ -1971,4 +1907,4 @@ public class TestBlockManager {
     // validateReconstructionWork return false, need to perform resetTargets().
     assertNull(work.getTargets());
   }
-}
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportLease.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportLease.java
index 40408b1..a5acc14 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportLease.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportLease.java
@@ -97,14 +97,13 @@ public class TestBlockReportLease {
       DelayAnswer delayer = new DelayAnswer(BlockManager.LOG);
       doAnswer(delayer).when(spyBlockManager).processReport(
           any(DatanodeStorageInfo.class),
-          any(BlockListAsLongs.class),
-          any(BlockReportContext.class));
+          any(BlockListAsLongs.class));
 
       ExecutorService pool = Executors.newFixedThreadPool(1);
 
       // Trigger sendBlockReport
       BlockReportContext brContext = new BlockReportContext(1, 0,
-          rand.nextLong(), hbResponse.getFullBlockReportLeaseId(), true);
+          rand.nextLong(), hbResponse.getFullBlockReportLeaseId());
       Future<DatanodeCommand> sendBRfuturea = pool.submit(() -> {
         // Build every storage with 100 blocks for sending report
         DatanodeStorage[] datanodeStorages
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java
index 43f3243..19b8450 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java
@@ -46,6 +46,7 @@ import org.slf4j.event.Level;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Set;
 
 public class TestReconstructStripedBlocksWithRackAwareness {
@@ -172,7 +173,9 @@ public class TestReconstructStripedBlocksWithRackAwareness {
 
     // we now should have 9 internal blocks distributed in 5 racks
     Set<String> rackSet = new HashSet<>();
-    for (DatanodeStorageInfo storage : blockInfo.storages) {
+    Iterator<DatanodeStorageInfo> it = blockInfo.getStorageInfos();
+    while (it.hasNext()){
+      DatanodeStorageInfo storage = it.next();
       rackSet.add(storage.getDatanodeDescriptor().getNetworkLocation());
     }
     Assert.assertEquals("rackSet size is wrong: " + rackSet, dataBlocks - 1,
@@ -203,7 +206,9 @@ public class TestReconstructStripedBlocksWithRackAwareness {
     // check if redundancy monitor correctly schedule the reconstruction work.
     boolean scheduled = false;
     for (int i = 0; i < 5; i++) { // retry 5 times
-      for (DatanodeStorageInfo storage : blockInfo.storages) {
+      it = blockInfo.getStorageInfos();
+      while (it.hasNext()){
+        DatanodeStorageInfo storage = it.next();
         if (storage != null) {
           DatanodeDescriptor dn = storage.getDatanodeDescriptor();
           Assert.assertEquals("Block to be erasure coded is wrong for datanode:"
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index c4398ab..b56fc81 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -1516,7 +1516,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   }
 
   @Override
-  public List<ReplicaInfo> getSortedFinalizedBlocks(String bpid) {
+  public List<ReplicaInfo> getFinalizedBlocks(String bpid) {
     throw new UnsupportedOperationException();
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java
index fb65d0e..958b3e4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java
@@ -19,8 +19,6 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -118,13 +116,12 @@ public class TestBlockHasMultipleReplicasOnSameDN {
     StorageBlockReport reports[] =
         new StorageBlockReport[cluster.getStoragesPerDatanode()];
 
-    ArrayList<ReplicaInfo> blocks = new ArrayList<>();
+    ArrayList<Replica> blocks = new ArrayList<>();
 
     for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
       Block localBlock = locatedBlock.getBlock().getLocalBlock();
       blocks.add(new FinalizedReplica(localBlock, null, null));
     }
-    Collections.sort(blocks);
 
     try (FsDatasetSpi.FsVolumeReferences volumes =
       dn.getFSDataset().getFsVolumeReferences()) {
@@ -137,7 +134,7 @@ public class TestBlockHasMultipleReplicasOnSameDN {
 
     // Should not assert!
     cluster.getNameNodeRpc().blockReport(dnReg, bpid, reports,
-        new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
+        new BlockReportContext(1, 0, System.nanoTime(), 0L));
 
     // Get the block locations once again.
     locatedBlocks = client.getLocatedBlocks(filename, 0, BLOCK_SIZE * NUM_BLOCKS);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestLargeBlockReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestLargeBlockReport.java
index 21e264a..1ea52a0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestLargeBlockReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestLargeBlockReport.java
@@ -58,7 +58,6 @@ public class TestLargeBlockReport {
   private DatanodeStorage dnStorage;
   private final long reportId = 1;
   private final long fullBrLeaseId = 0;
-  private final boolean sorted = true;
 
   @BeforeClass
   public static void init() {
@@ -84,7 +83,7 @@ public class TestLargeBlockReport {
     StorageBlockReport[] reports = createReports(6000000);
     try {
       nnProxy.blockReport(bpRegistration, bpId, reports,
-          new BlockReportContext(1, 0, reportId, fullBrLeaseId, sorted));
+          new BlockReportContext(1, 0, reportId, fullBrLeaseId));
       fail("Should have failed because of the too long RPC data length");
     } catch (Exception e) {
       // Expected.  We can't reliably assert anything about the exception type
@@ -99,7 +98,7 @@ public class TestLargeBlockReport {
     initCluster();
     StorageBlockReport[] reports = createReports(6000000);
     nnProxy.blockReport(bpRegistration, bpId, reports,
-        new BlockReportContext(1, 0, reportId, fullBrLeaseId, sorted));
+        new BlockReportContext(1, 0, reportId, fullBrLeaseId));
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
index 791ee20..b1742cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
@@ -39,7 +39,7 @@ public class TestNNHandlesBlockReportPerStorage extends BlockReportTestBase {
       LOG.info("Sending block report for storage " + report.getStorage().getStorageID());
       StorageBlockReport[] singletonReport = { report };
       cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport,
-          new BlockReportContext(reports.length, i, System.nanoTime(), 0L, true));
+          new BlockReportContext(reports.length, i, System.nanoTime(), 0L));
       i++;
     }
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java
index a35fa48..fd19ba6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java
@@ -36,6 +36,6 @@ public class TestNNHandlesCombinedBlockReport extends BlockReportTestBase {
                                   StorageBlockReport[] reports) throws IOException {
     LOG.info("Sending combined block reports for " + dnR);
     cluster.getNameNodeRpc().blockReport(dnR, poolId, reports,
-        new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
+        new BlockReportContext(1, 0, System.nanoTime(), 0L));
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index 2c27cc1..3fbd4de 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -91,7 +91,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
   }
 
   @Override
-  public List<ReplicaInfo> getSortedFinalizedBlocks(String bpid) {
+  public List<ReplicaInfo> getFinalizedBlocks(String bpid) {
     return null;
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index dbb9548..9b659d9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -88,7 +88,6 @@ import java.io.OutputStreamWriter;
 import java.io.Writer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.HashSet;
 import java.util.List;
@@ -774,41 +773,6 @@ public class TestFsDatasetImpl {
 
     FsDatasetTestUtil.assertFileLockReleased(badDir.toString());
   }
-
-  @Test
-  /**
-   * This test is here primarily to catch any case where the datanode replica
-   * map structure is changed to a new structure which is not sorted and hence
-   * reading the blocks from it directly would not be sorted.
-   */
-  public void testSortedFinalizedBlocksAreSorted() throws IOException {
-    this.conf = new HdfsConfiguration();
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
-    try {
-      cluster.waitActive();
-      DataNode dn = cluster.getDataNodes().get(0);
-
-      FsDatasetSpi<?> ds = DataNodeTestUtils.getFSDataset(dn);
-      ds.addBlockPool(BLOCKPOOL, conf);
-
-      // Load 1000 blocks with random blockIDs
-      for (int i=0; i<=1000; i++) {
-        ExtendedBlock eb = new ExtendedBlock(
-            BLOCKPOOL, new Random().nextInt(), 1000, 1000 + i);
-        cluster.getFsDatasetTestUtils(0).createFinalizedReplica(eb);
-      }
-      // Get the sorted blocks and validate the arrayList is sorted
-      List<ReplicaInfo> replicaList = ds.getSortedFinalizedBlocks(BLOCKPOOL);
-      for (int i=0; i<replicaList.size() - 1; i++) {
-        if (replicaList.get(i).compareTo(replicaList.get(i+1)) > 0) {
-          // Not sorted so fail the test
-          fail("ArrayList is not sorted, and it should be");
-        }
-      }
-    } finally {
-      cluster.shutdown();
-    }
-  }
   
   @Test
   public void testDeletingBlocks() throws IOException {
@@ -1766,7 +1730,7 @@ public class TestFsDatasetImpl {
       DataNode dn = cluster.getDataNodes().get(0);
       FsDatasetSpi fsdataset = dn.getFSDataset();
       List<ReplicaInfo> replicaInfos =
-          fsdataset.getSortedFinalizedBlocks(bpid);
+          fsdataset.getFinalizedBlocks(bpid);
       assertEquals(1, replicaInfos.size());
 
       ReplicaInfo replicaInfo = replicaInfos.get(0);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
index 542f149..e861a34 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
@@ -958,7 +958,7 @@ public class NNThroughputBenchmark implements Tool {
           new StorageBlockReport(storage, BlockListAsLongs.EMPTY)
       };
       dataNodeProto.blockReport(dnRegistration, bpid, reports,
-              new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
+              new BlockReportContext(1, 0, System.nanoTime(), 0L));
     }
 
     /**
@@ -1240,7 +1240,7 @@ public class NNThroughputBenchmark implements Tool {
       StorageBlockReport[] report = { new StorageBlockReport(
           dn.storage, dn.getBlockReportList()) };
       dataNodeProto.blockReport(dn.dnRegistration, bpid, report,
-          new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
+          new BlockReportContext(1, 0, System.nanoTime(), 0L));
       long end = Time.now();
       return end-start;
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
index ef91fc3..aec6811 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
@@ -45,7 +45,6 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
-import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
@@ -351,8 +350,7 @@ public class TestAddStripedBlocks {
       StorageBlockReport[] reports = {new StorageBlockReport(storage,
           bll)};
       cluster.getNameNodeRpc().blockReport(dn.getDNRegistrationForBP(bpId),
-          bpId, reports,
-          new BlockReportContext(1, 0, System.nanoTime(), 0, true));
+          bpId, reports, null);
     }
 
     DatanodeStorageInfo[] locs = lastBlock.getUnderConstructionFeature()
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
index 16f4de9..01ea148 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
@@ -127,7 +127,7 @@ public class TestDeadDatanode {
         BlockListAsLongs.EMPTY) };
     try {
       dnp.blockReport(reg, poolId, report,
-          new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
+          new BlockReportContext(1, 0, System.nanoTime(), 0L));
       fail("Expected IOException is not thrown");
     } catch (IOException ex) {
       // Expected
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/FoldedTreeSetTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/FoldedTreeSetTest.java
deleted file mode 100644
index d554b1b..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/FoldedTreeSetTest.java
+++ /dev/null
@@ -1,644 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.util;
-
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.Random;
-
-/**
- * Test of TreeSet
- */
-public class FoldedTreeSetTest {
-
-  private static Random srand;
-
-  public FoldedTreeSetTest() {
-  }
-
-  @BeforeClass
-  public static void setUpClass() {
-    long seed = System.nanoTime();
-    System.out.println("This run uses the random seed " + seed);
-    srand = new Random(seed);
-  }
-
-  @AfterClass
-  public static void tearDownClass() {
-  }
-
-  @Before
-  public void setUp() {
-  }
-
-  @After
-  public void tearDown() {
-  }
-
-  /**
-   * Test of comparator method, of class TreeSet.
-   */
-  @Test
-  public void testComparator() {
-    Comparator<String> comparator = new Comparator<String>() {
-
-      @Override
-      public int compare(String o1, String o2) {
-        return o1.compareTo(o2);
-      }
-    };
-    assertEquals(null, new FoldedTreeSet<>().comparator());
-    assertEquals(comparator, new FoldedTreeSet<>(comparator).comparator());
-
-    FoldedTreeSet<String> set = new FoldedTreeSet<>(comparator);
-    set.add("apa3");
-    set.add("apa2");
-    set.add("apa");
-    set.add("apa5");
-    set.add("apa4");
-    assertEquals(5, set.size());
-    assertEquals("apa", set.get("apa"));
-  }
-
-  /**
-   * Test of first method, of class TreeSet.
-   */
-  @Test
-  public void testFirst() {
-    FoldedTreeSet<Integer> tree = new FoldedTreeSet<>();
-    for (int i = 0; i < 256; i++) {
-      tree.add(1024 + i);
-      assertEquals(1024, tree.first().intValue());
-    }
-    for (int i = 1; i < 256; i++) {
-      tree.remove(1024 + i);
-      assertEquals(1024, tree.first().intValue());
-    }
-  }
-
-  /**
-   * Test of last method, of class TreeSet.
-   */
-  @Test
-  public void testLast() {
-    FoldedTreeSet<Integer> tree = new FoldedTreeSet<>();
-    for (int i = 0; i < 256; i++) {
-      tree.add(1024 + i);
-      assertEquals(1024 + i, tree.last().intValue());
-    }
-    for (int i = 0; i < 255; i++) {
-      tree.remove(1024 + i);
-      assertEquals(1279, tree.last().intValue());
-    }
-  }
-
-  /**
-   * Test of size method, of class TreeSet.
-   */
-  @Test
-  public void testSize() {
-    FoldedTreeSet<String> instance = new FoldedTreeSet<>();
-    String entry = "apa";
-    assertEquals(0, instance.size());
-    instance.add(entry);
-    assertEquals(1, instance.size());
-    instance.remove(entry);
-    assertEquals(0, instance.size());
-  }
-
-  /**
-   * Test of isEmpty method, of class TreeSet.
-   */
-  @Test
-  public void testIsEmpty() {
-    FoldedTreeSet<String> instance = new FoldedTreeSet<>();
-    boolean expResult = true;
-    boolean result = instance.isEmpty();
-    assertEquals(expResult, result);
-    instance.add("apa");
-    instance.remove("apa");
-    assertEquals(expResult, result);
-  }
-
-  /**
-   * Test of contains method, of class TreeSet.
-   */
-  @Test
-  public void testContains() {
-    FoldedTreeSet<String> instance = new FoldedTreeSet<>();
-    String entry = "apa";
-    assertEquals(false, instance.contains(entry));
-    instance.add(entry);
-    assertEquals(true, instance.contains(entry));
-    assertEquals(false, instance.contains(entry + entry));
-  }
-
-  /**
-   * Test of iterator method, of class TreeSet.
-   */
-  @Test
-  public void testIterator() {
-
-    for (int iter = 0; iter < 10; iter++) {
-      FoldedTreeSet<Holder> set = new FoldedTreeSet<>();
-      long[] longs = new long[64723];
-      for (int i = 0; i < longs.length; i++) {
-        Holder val = new Holder(srand.nextLong());
-        while (set.contains(val)) {
-          val = new Holder(srand.nextLong());
-        }
-        longs[i] = val.getId();
-        set.add(val);
-      }
-      assertEquals(longs.length, set.size());
-      Arrays.sort(longs);
-
-      Iterator<Holder> it = set.iterator();
-      for (int i = 0; i < longs.length; i++) {
-        assertTrue(it.hasNext());
-        Holder val = it.next();
-        assertEquals(longs[i], val.getId());
-        // remove randomly to force non linear removes
-        if (srand.nextBoolean()) {
-          it.remove();
-        }
-      }
-    }
-  }
-
-  /**
-   * Test of toArray method, of class TreeSet.
-   */
-  @Test
-  public void testToArray() {
-    FoldedTreeSet<Integer> tree = new FoldedTreeSet<>();
-    ArrayList<Integer> list = new ArrayList<>(256);
-    for (int i = 0; i < 256; i++) {
-      list.add(1024 + i);
-    }
-    tree.addAll(list);
-    assertArrayEquals(list.toArray(), tree.toArray());
-  }
-
-  /**
-   * Test of toArray method, of class TreeSet.
-   */
-  @Test
-  public void testToArray_GenericType() {
-    FoldedTreeSet<Integer> tree = new FoldedTreeSet<>();
-    ArrayList<Integer> list = new ArrayList<>(256);
-    for (int i = 0; i < 256; i++) {
-      list.add(1024 + i);
-    }
-    tree.addAll(list);
-    assertArrayEquals(list.toArray(new Integer[tree.size()]), tree.toArray(new Integer[tree.size()]));
-    assertArrayEquals(list.toArray(new Integer[tree.size() + 100]), tree.toArray(new Integer[tree.size() + 100]));
-  }
-
-  /**
-   * Test of add method, of class TreeSet.
-   */
-  @Test
-  public void testAdd() {
-    FoldedTreeSet<String> simpleSet = new FoldedTreeSet<>();
-    String entry = "apa";
-    assertTrue(simpleSet.add(entry));
-    assertFalse(simpleSet.add(entry));
-
-    FoldedTreeSet<Integer> intSet = new FoldedTreeSet<>();
-    for (int i = 512; i < 1024; i++) {
-      assertTrue(intSet.add(i));
-    }
-    for (int i = -1024; i < -512; i++) {
-      assertTrue(intSet.add(i));
-    }
-    for (int i = 0; i < 512; i++) {
-      assertTrue(intSet.add(i));
-    }
-    for (int i = -512; i < 0; i++) {
-      assertTrue(intSet.add(i));
-    }
-    assertEquals(2048, intSet.size());
-
-    FoldedTreeSet<Holder> set = new FoldedTreeSet<>();
-    long[] longs = new long[23432];
-    for (int i = 0; i < longs.length; i++) {
-      Holder val = new Holder(srand.nextLong());
-      while (set.contains(val)) {
-        val = new Holder(srand.nextLong());
-      }
-      longs[i] = val.getId();
-      assertTrue(set.add(val));
-    }
-    assertEquals(longs.length, set.size());
-    Arrays.sort(longs);
-
-    Iterator<Holder> it = set.iterator();
-    for (int i = 0; i < longs.length; i++) {
-      assertTrue(it.hasNext());
-      Holder val = it.next();
-      assertEquals(longs[i], val.getId());
-    }
-
-    // Specially constructed adds to exercise all code paths
-    FoldedTreeSet<Integer> specialAdds = new FoldedTreeSet<>();
-    // Fill node with even numbers
-    for (int i = 0; i < 128; i += 2) {
-      assertTrue(specialAdds.add(i));
-    }
-    // Remove left and add left
-    assertTrue(specialAdds.remove(0));
-    assertTrue(specialAdds.add(-1));
-    assertTrue(specialAdds.remove(-1));
-    // Add right and shift everything left
-    assertTrue(specialAdds.add(127));
-    assertTrue(specialAdds.remove(127));
-
-    // Empty at both ends
-    assertTrue(specialAdds.add(0));
-    assertTrue(specialAdds.remove(0));
-    assertTrue(specialAdds.remove(126));
-    // Add in the middle left to slide entries left
-    assertTrue(specialAdds.add(11));
-    assertTrue(specialAdds.remove(11));
-    // Add in the middle right to slide entries right
-    assertTrue(specialAdds.add(99));
-    assertTrue(specialAdds.remove(99));
-    // Add existing entry in the middle of a node
-    assertFalse(specialAdds.add(64));
-  }
-
-  @Test
-  public void testAddOrReplace() {
-    FoldedTreeSet<String> simpleSet = new FoldedTreeSet<>();
-    String entry = "apa";
-    assertNull(simpleSet.addOrReplace(entry));
-    assertEquals(entry, simpleSet.addOrReplace(entry));
-
-    FoldedTreeSet<Integer> intSet = new FoldedTreeSet<>();
-    for (int i = 0; i < 1024; i++) {
-      assertNull(intSet.addOrReplace(i));
-    }
-    for (int i = 0; i < 1024; i++) {
-      assertEquals(i, intSet.addOrReplace(i).intValue());
-    }
-  }
-
-  private static class Holder implements Comparable<Holder> {
-
-    private final long id;
-
-    public Holder(long id) {
-      this.id = id;
-    }
-
-    public long getId() {
-      return id;
-    }
-
-    @Override
-    public int compareTo(Holder o) {
-      return id < o.getId() ? -1
-             : id > o.getId() ? 1 : 0;
-    }
-  }
-
-  @Test
-  public void testRemoveWithComparator() {
-    FoldedTreeSet<Holder> set = new FoldedTreeSet<>();
-    long[] longs = new long[98327];
-    for (int i = 0; i < longs.length; i++) {
-      Holder val = new Holder(srand.nextLong());
-      while (set.contains(val)) {
-        val = new Holder(srand.nextLong());
-      }
-      longs[i] = val.getId();
-      set.add(val);
-    }
-    assertEquals(longs.length, set.size());
-    Comparator<Object> cmp = new Comparator<Object>() {
-      @Override
-      public int compare(Object o1, Object o2) {
-        long lookup = (long) o1;
-        long stored = ((Holder) o2).getId();
-        return lookup < stored ? -1
-               : lookup > stored ? 1 : 0;
-      }
-    };
-
-    for (long val : longs) {
-      set.remove(val, cmp);
-    }
-    assertEquals(0, set.size());
-    assertTrue(set.isEmpty());
-  }
-
-  @Test
-  public void testGetWithComparator() {
-    FoldedTreeSet<Holder> set = new FoldedTreeSet<>();
-    long[] longs = new long[32147];
-    for (int i = 0; i < longs.length; i++) {
-      Holder val = new Holder(srand.nextLong());
-      while (set.contains(val)) {
-        val = new Holder(srand.nextLong());
-      }
-      longs[i] = val.getId();
-      set.add(val);
-    }
-    assertEquals(longs.length, set.size());
-    Comparator<Object> cmp = new Comparator<Object>() {
-      @Override
-      public int compare(Object o1, Object o2) {
-        long lookup = (long) o1;
-        long stored = ((Holder) o2).getId();
-        return lookup < stored ? -1
-               : lookup > stored ? 1 : 0;
-      }
-    };
-
-    for (long val : longs) {
-      assertEquals(val, set.get(val, cmp).getId());
-    }
-  }
-
-  @Test
-  public void testGet() {
-    FoldedTreeSet<Holder> set = new FoldedTreeSet<>();
-    long[] longs = new long[43277];
-    for (int i = 0; i < longs.length; i++) {
-      Holder val = new Holder(srand.nextLong());
-      while (set.contains(val)) {
-        val = new Holder(srand.nextLong());
-      }
-      longs[i] = val.getId();
-      set.add(val);
-    }
-    assertEquals(longs.length, set.size());
-
-    for (long val : longs) {
-      assertEquals(val, set.get(new Holder(val)).getId());
-    }
-  }
-
-  /**
-   * Test of remove method, of class TreeSet.
-   */
-  @Test
-  public void testRemove() {
-    FoldedTreeSet<String> instance = new FoldedTreeSet<>();
-    assertEquals(false, instance.remove("apa"));
-    instance.add("apa");
-    assertEquals(true, instance.remove("apa"));
-
-    removeLeft();
-    removeRight();
-    removeAt();
-    removeRandom();
-  }
-
-  public void removeLeft() {
-    FoldedTreeSet<Integer> set = new FoldedTreeSet<>();
-    for (int i = 1; i <= 320; i++) {
-      set.add(i);
-    }
-    for (int i = 193; i < 225; i++) {
-      assertEquals(true, set.remove(i));
-      assertEquals(false, set.remove(i));
-    }
-    for (int i = 129; i < 161; i++) {
-      assertEquals(true, set.remove(i));
-      assertEquals(false, set.remove(i));
-    }
-    for (int i = 256; i > 224; i--) {
-      assertEquals(true, set.remove(i));
-      assertEquals(false, set.remove(i));
-    }
-    for (int i = 257; i < 289; i++) {
-      assertEquals(true, set.remove(i));
-      assertEquals(false, set.remove(i));
-    }
-    while (!set.isEmpty()) {
-      assertTrue(set.remove(set.first()));
-    }
-  }
-
-  public void removeRight() {
-    FoldedTreeSet<Integer> set = new FoldedTreeSet<>();
-    for (int i = 1; i <= 320; i++) {
-      set.add(i);
-    }
-    for (int i = 193; i < 225; i++) {
-      assertEquals(true, set.remove(i));
-      assertEquals(false, set.remove(i));
-    }
-    for (int i = 192; i > 160; i--) {
-      assertEquals(true, set.remove(i));
-      assertEquals(false, set.remove(i));
-    }
-    for (int i = 256; i > 224; i--) {
-      assertEquals(true, set.remove(i));
-      assertEquals(false, set.remove(i));
-    }
-    for (int i = 320; i > 288; i--) {
-      assertEquals(true, set.remove(i));
-      assertEquals(false, set.remove(i));
-    }
-    while (!set.isEmpty()) {
-      assertTrue(set.remove(set.last()));
-    }
-  }
-
-  public void removeAt() {
-    FoldedTreeSet<Integer> set = new FoldedTreeSet<>();
-    for (int i = 1; i <= 320; i++) {
-      set.add(i);
-    }
-    for (int i = 193; i < 225; i++) {
-      assertEquals(true, set.remove(i));
-      assertEquals(false, set.remove(i));
-    }
-    for (int i = 160; i < 192; i++) {
-      assertEquals(true, set.remove(i));
-      assertEquals(false, set.remove(i));
-    }
-    for (int i = 225; i < 257; i++) {
-      assertEquals(true, set.remove(i));
-      assertEquals(false, set.remove(i));
-    }
-    for (int i = 288; i < 320; i++) {
-      assertEquals(true, set.remove(i));
-      assertEquals(false, set.remove(i));
-    }
-  }
-
-  public void removeRandom() {
-    FoldedTreeSet<Integer> set = new FoldedTreeSet<>();
-    int[] integers = new int[2048];
-    for (int i = 0; i < 2048; i++) {
-      int val = srand.nextInt();
-      while (set.contains(val)) {
-        val = srand.nextInt();
-      }
-      integers[i] = val;
-      set.add(val);
-    }
-    assertEquals(2048, set.size());
-
-    for (int val : integers) {
-      assertEquals(true, set.remove(val));
-      assertEquals(false, set.remove(val));
-    }
-    assertEquals(true, set.isEmpty());
-  }
-
-  /**
-   * Test of containsAll method, of class TreeSet.
-   */
-  @Test
-  public void testContainsAll() {
-    Collection<String> list = Arrays.asList(new String[]{"apa", "apa2", "apa"});
-    FoldedTreeSet<String> instance = new FoldedTreeSet<>();
-    assertEquals(false, instance.containsAll(list));
-    instance.addAll(list);
-    assertEquals(true, instance.containsAll(list));
-  }
-
-  /**
-   * Test of addAll method, of class TreeSet.
-   */
-  @Test
-  public void testAddAll() {
-    Collection<String> list = Arrays.asList(new String[]{"apa", "apa2", "apa"});
-    FoldedTreeSet<String> instance = new FoldedTreeSet<>();
-    assertEquals(true, instance.addAll(list));
-    assertEquals(false, instance.addAll(list)); // add same entries again
-  }
-
-  /**
-   * Test of retainAll method, of class TreeSet.
-   */
-  @Test
-  public void testRetainAll() {
-    Collection<String> list = Arrays.asList(new String[]{"apa", "apa2", "apa"});
-    FoldedTreeSet<String> instance = new FoldedTreeSet<>();
-    instance.addAll(list);
-    assertEquals(false, instance.retainAll(list));
-    assertEquals(2, instance.size());
-    Collection<String> list2 = Arrays.asList(new String[]{"apa"});
-    assertEquals(true, instance.retainAll(list2));
-    assertEquals(1, instance.size());
-  }
-
-  /**
-   * Test of removeAll method, of class TreeSet.
-   */
-  @Test
-  public void testRemoveAll() {
-    Collection<String> list = Arrays.asList(new String[]{"apa", "apa2", "apa"});
-    FoldedTreeSet<String> instance = new FoldedTreeSet<>();
-    assertEquals(false, instance.removeAll(list));
-    instance.addAll(list);
-    assertEquals(true, instance.removeAll(list));
-    assertEquals(true, instance.isEmpty());
-  }
-
-  /**
-   * Test of clear method, of class TreeSet.
-   */
-  @Test
-  public void testClear() {
-    FoldedTreeSet<String> instance = new FoldedTreeSet<>();
-    instance.clear();
-    assertEquals(true, instance.isEmpty());
-    instance.add("apa");
-    assertEquals(false, instance.isEmpty());
-    instance.clear();
-    assertEquals(true, instance.isEmpty());
-  }
-
-  @Test
-  public void testFillRatio() {
-    FoldedTreeSet<Integer> set = new FoldedTreeSet<>();
-    final int size = 1024;
-    for (int i = 1; i <= size; i++) {
-      set.add(i);
-      assertEquals("Iteration: " + i, 1.0, set.fillRatio(), 0.0);
-    }
-
-    for (int i = 1; i <= size / 2; i++) {
-      set.remove(i * 2);
-      // Need the max since all the removes from the last node doesn't
-      // affect the fill ratio
-      assertEquals("Iteration: " + i,
-                   Math.max((size - i) / (double) size, 0.53125),
-                   set.fillRatio(), 0.0);
-    }
-  }
-
-  @Test
-  public void testCompact() {
-    FoldedTreeSet<Holder> set = new FoldedTreeSet<>();
-    long[] longs = new long[24553];
-    for (int i = 0; i < longs.length; i++) {
-      Holder val = new Holder(srand.nextLong());
-      while (set.contains(val)) {
-        val = new Holder(srand.nextLong());
-      }
-      longs[i] = val.getId();
-      set.add(val);
-    }
-    assertEquals(longs.length, set.size());
-
-    long[] longs2 = new long[longs.length];
-    for (int i = 0; i < longs2.length; i++) {
-      Holder val = new Holder(srand.nextLong());
-      while (set.contains(val)) {
-        val = new Holder(srand.nextLong());
-      }
-      longs2[i] = val.getId();
-      set.add(val);
-    }
-    assertEquals(longs.length + longs2.length, set.size());
-
-    // Create fragementation
-    for (long val : longs) {
-      assertTrue(set.remove(new Holder(val)));
-    }
-    assertEquals(longs2.length, set.size());
-
-    assertFalse(set.compact(0));
-    assertTrue(set.compact(Long.MAX_VALUE));
-    assertEquals(longs2.length, set.size());
-    for (long val : longs) {
-      assertFalse(set.remove(new Holder(val)));
-    }
-    for (long val : longs2) {
-      assertEquals(val, set.get(new Holder(val)).getId());
-    }
-  }
-}

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org