Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2016/02/02 20:28:54 UTC

[1/2] hadoop git commit: HDFS-9260. Improve the performance and GC friendliness of NameNode startup and full block reports (Staffan Friberg via cmccabe)

Repository: hadoop
Updated Branches:
  refs/heads/trunk 2da03b48e -> dd9ebf6ee


http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/FoldedTreeSet.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/FoldedTreeSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/FoldedTreeSet.java
new file mode 100644
index 0000000..1c6be1d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/FoldedTreeSet.java
@@ -0,0 +1,1285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import org.apache.hadoop.util.Time;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.SortedSet;
+
+/**
+ * A memory-efficient red-black tree implementation. Instead of having a Node
+ * for each entry, each node contains an array holding 64 entries.
+ *
+ * Based on the Apache Harmony folded TreeMap.
+ *
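+ * A minimal usage sketch (plain Long elements for illustration; any
+ * Comparable type, or one paired with a Comparator, works the same way):
+ * <pre>
+ *   FoldedTreeSet&lt;Long&gt; set = new FoldedTreeSet&lt;&gt;();
+ *   set.addSortedLast(1L); // fast path when keys arrive in increasing order
+ *   set.add(0L);           // out-of-order keys fall back to a normal insert
+ *   Long stored = set.get(1L);
+ * </pre>
+ *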
+ * @param <E> Entry type
+ */
+public class FoldedTreeSet<E> implements SortedSet<E> {
+
+  private static final boolean RED = true;
+  private static final boolean BLACK = false;
+
+  private final Comparator<E> comparator;
+  private Node<E> root;
+  private int size;
+  private int nodeCount;
+  private int modCount;
+  private Node<E> cachedNode;
+
+  /**
+   * Internal tree node that holds a sorted array of entries.
+   *
+   * @param <E> type of the elements
+   */
+  private static class Node<E> {
+
+    private static final int NODE_SIZE = 64;
+
+    // Tree structure
+    private Node<E> parent, left, right;
+    private boolean color;
+    private final E[] entries;
+    private int leftIndex = 0, rightIndex = -1;
+    private int size = 0;
+    // List for fast ordered iteration
+    private Node<E> prev, next;
+
+    @SuppressWarnings("unchecked")
+    public Node() {
+      entries = (E[]) new Object[NODE_SIZE];
+    }
+
+    public boolean isRed() {
+      return color == RED;
+    }
+
+    public boolean isBlack() {
+      return color == BLACK;
+    }
+
+    public Node<E> getLeftMostNode() {
+      Node<E> node = this;
+      while (node.left != null) {
+        node = node.left;
+      }
+      return node;
+    }
+
+    public Node<E> getRightMostNode() {
+      Node<E> node = this;
+      while (node.right != null) {
+        node = node.right;
+      }
+      return node;
+    }
+
+    public void addEntryLeft(E entry) {
+      assert rightIndex < entries.length;
+      assert !isFull();
+
+      if (leftIndex == 0) {
+        rightIndex++;
+        // Shift entries right/up
+        System.arraycopy(entries, 0, entries, 1, size);
+      } else {
+        leftIndex--;
+      }
+      size++;
+      entries[leftIndex] = entry;
+    }
+
+    public void addEntryRight(E entry) {
+      assert !isFull();
+
+      if (rightIndex == NODE_SIZE - 1) {
+        assert leftIndex > 0;
+        // Shift entries left/down
+        System.arraycopy(entries, leftIndex, entries, --leftIndex, size);
+      } else {
+        rightIndex++;
+      }
+      size++;
+      entries[rightIndex] = entry;
+    }
+
+    public void addEntryAt(E entry, int index) {
+      assert !isFull();
+
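+      // Insert on whichever side needs fewer entries moved: shift the right
+      // segment up if it is shorter (or the left side has no free slot),
+      // otherwise shift the left segment down.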
+      if (leftIndex == 0 || ((rightIndex != Node.NODE_SIZE - 1)
+                             && (rightIndex - index <= index - leftIndex))) {
+        rightIndex++;
+        System.arraycopy(entries, index,
+                         entries, index + 1, rightIndex - index);
+        entries[index] = entry;
+      } else {
+        int newLeftIndex = leftIndex - 1;
+        System.arraycopy(entries, leftIndex,
+                         entries, newLeftIndex, index - leftIndex);
+        leftIndex = newLeftIndex;
+        entries[index - 1] = entry;
+      }
+      size++;
+    }
+
+    public void addEntriesLeft(Node<E> from) {
+      leftIndex -= from.size;
+      size += from.size;
+      System.arraycopy(from.entries, from.leftIndex,
+                       entries, leftIndex, from.size);
+    }
+
+    public void addEntriesRight(Node<E> from) {
+      System.arraycopy(from.entries, from.leftIndex,
+                       entries, rightIndex + 1, from.size);
+      size += from.size;
+      rightIndex += from.size;
+    }
+
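+    /**
+     * Insert an entry into a full node by sliding the entries before index
+     * one step to the left; the entry pushed out at position 0 is returned
+     * so the caller can move it to the previous node.
+     */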
+    public E insertEntrySlideLeft(E entry, int index) {
+      E pushedEntry = entries[0];
+      System.arraycopy(entries, 1, entries, 0, index - 1);
+      entries[index - 1] = entry;
+      return pushedEntry;
+    }
+
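+    /**
+     * Insert an entry into a full node by sliding the entries from index one
+     * step to the right; the entry pushed out at rightIndex is returned so
+     * the caller can move it to the next node.
+     */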
+    public E insertEntrySlideRight(E entry, int index) {
+      E movedEntry = entries[rightIndex];
+      System.arraycopy(entries, index, entries, index + 1, rightIndex - index);
+      entries[index] = entry;
+      return movedEntry;
+    }
+
+    public E removeEntryLeft() {
+      assert !isEmpty();
+      E entry = entries[leftIndex];
+      entries[leftIndex] = null;
+      leftIndex++;
+      size--;
+      return entry;
+    }
+
+    public E removeEntryRight() {
+      assert !isEmpty();
+      E entry = entries[rightIndex];
+      entries[rightIndex] = null;
+      rightIndex--;
+      size--;
+      return entry;
+    }
+
+    public E removeEntryAt(int index) {
+      assert !isEmpty();
+
+      E entry = entries[index];
+      int rightSize = rightIndex - index;
+      int leftSize = index - leftIndex;
+      if (rightSize <= leftSize) {
+        System.arraycopy(entries, index + 1, entries, index, rightSize);
+        entries[rightIndex] = null;
+        rightIndex--;
+      } else {
+        System.arraycopy(entries, leftIndex, entries, leftIndex + 1, leftSize);
+        entries[leftIndex] = null;
+        leftIndex++;
+      }
+      size--;
+      return entry;
+    }
+
+    public boolean isFull() {
+      return size == NODE_SIZE;
+    }
+
+    public boolean isEmpty() {
+      return size == 0;
+    }
+
+    public void clear() {
+      if (leftIndex < rightIndex) {
+        Arrays.fill(entries, leftIndex, rightIndex + 1, null);
+      }
+      size = 0;
+      leftIndex = 0;
+      rightIndex = -1;
+      prev = null;
+      next = null;
+      parent = null;
+      left = null;
+      right = null;
+      color = BLACK;
+    }
+  }
+
+  private static final class TreeSetIterator<E> implements Iterator<E> {
+
+    private final FoldedTreeSet<E> tree;
+    private int iteratorModCount;
+    private Node<E> node;
+    private int index;
+    private E lastEntry;
+    private int lastIndex;
+    private Node<E> lastNode;
+
+    private TreeSetIterator(FoldedTreeSet<E> tree) {
+      this.tree = tree;
+      this.iteratorModCount = tree.modCount;
+      if (!tree.isEmpty()) {
+        this.node = tree.root.getLeftMostNode();
+        this.index = this.node.leftIndex;
+      }
+    }
+
+    @Override
+    public boolean hasNext() {
+      checkForModification();
+      return node != null;
+    }
+
+    @Override
+    public E next() {
+      if (hasNext()) {
+        lastEntry = node.entries[index];
+        lastIndex = index;
+        lastNode = node;
+        if (++index > node.rightIndex) {
+          node = node.next;
+          if (node != null) {
+            index = node.leftIndex;
+          }
+        }
+        return lastEntry;
+      } else {
+        throw new NoSuchElementException("Iterator exhausted");
+      }
+    }
+
+    @Override
+    public void remove() {
+      if (lastEntry == null) {
+        throw new IllegalStateException("No current element");
+      }
+      checkForModification();
+      if (lastNode.size == 1) {
+        // Safe to remove lastNode, the iterator is on the next node
+        tree.deleteNode(lastNode);
+      } else if (lastNode.leftIndex == lastIndex) {
+        // Safe to remove leftmost entry, the iterator is on the next index
+        lastNode.removeEntryLeft();
+      } else if (lastNode.rightIndex == lastIndex) {
+        // Safe to remove the rightmost entry, the iterator is on the next node
+        lastNode.removeEntryRight();
+      } else {
+        // Remove entry in the middle of the array
+        assert node == lastNode;
+        int oldRIndex = lastNode.rightIndex;
+        lastNode.removeEntryAt(lastIndex);
+        if (oldRIndex > lastNode.rightIndex) {
+          // Entries moved to the left in the array so index must be reset
+          index = lastIndex;
+        }
+      }
+      lastEntry = null;
+      iteratorModCount++;
+      tree.modCount++;
+      tree.size--;
+    }
+
+    private void checkForModification() {
+      if (iteratorModCount != tree.modCount) {
+        throw new ConcurrentModificationException("Tree has been modified "
+                                                  + "outside of iterator");
+      }
+    }
+  }
+
+  /**
+   * Create a new TreeSet that uses the natural ordering of objects. The element
+   * type must implement Comparable.
+   */
+  public FoldedTreeSet() {
+    this(null);
+  }
+
+  /**
+   * Create a new TreeSet that orders the elements using the supplied
+   * Comparator.
+   *
+   * @param comparator Comparator able to compare elements of type E
+   */
+  public FoldedTreeSet(Comparator<E> comparator) {
+    this.comparator = comparator;
+  }
+
+  private Node<E> cachedOrNewNode(E entry) {
+    Node<E> node = (cachedNode != null) ? cachedNode : new Node<E>();
+    cachedNode = null;
+    nodeCount++;
+    // Since BlockIDs are always increasing for new blocks, it is best to
+    // add values on the left side to enable quicker inserts on the right.
+    node.addEntryLeft(entry);
+    return node;
+  }
+
+  private void cacheAndClear(Node<E> node) {
+    if (cachedNode == null) {
+      node.clear();
+      cachedNode = node;
+    }
+  }
+
+  @Override
+  public Comparator<? super E> comparator() {
+    return comparator;
+  }
+
+  @Override
+  public SortedSet<E> subSet(E fromElement, E toElement) {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public SortedSet<E> headSet(E toElement) {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public SortedSet<E> tailSet(E fromElement) {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public E first() {
+    if (!isEmpty()) {
+      Node<E> node = root.getLeftMostNode();
+      return node.entries[node.leftIndex];
+    }
+    return null;
+  }
+
+  @Override
+  public E last() {
+    if (!isEmpty()) {
+      Node<E> node = root.getRightMostNode();
+      return node.entries[node.rightIndex];
+    }
+    return null;
+  }
+
+  @Override
+  public int size() {
+    return size;
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return root == null;
+  }
+
+  /**
+   * Look up and return a stored object using a user-provided comparator.
+   *
+   * @param obj Lookup key
+   * @param cmp User-provided Comparator. The comparator should expect that the
+   *            provided obj will always be the first method parameter and any
+   *            stored object will be the second parameter.
+   *
+   * @return A matching stored object, or null if none is found
+   */
+  public E get(Object obj, Comparator<?> cmp) {
+    Objects.requireNonNull(obj);
+
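+    // Each node holds a contiguous sorted range. Probe the node's smallest
+    // and largest entries first; only when the key falls strictly between
+    // them is the node's interior binary searched.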
+    Node<E> node = root;
+    while (node != null) {
+      E[] entries = node.entries;
+
+      int leftIndex = node.leftIndex;
+      int result = compare(obj, entries[leftIndex], cmp);
+      if (result < 0) {
+        node = node.left;
+      } else if (result == 0) {
+        return entries[leftIndex];
+      } else {
+        int rightIndex = node.rightIndex;
+        if (leftIndex != rightIndex) {
+          result = compare(obj, entries[rightIndex], cmp);
+        }
+        if (result == 0) {
+          return entries[rightIndex];
+        } else if (result > 0) {
+          node = node.right;
+        } else {
+          int low = leftIndex + 1;
+          int high = rightIndex - 1;
+          while (low <= high) {
+            int mid = (low + high) >>> 1;
+            result = compare(obj, entries[mid], cmp);
+            if (result > 0) {
+              low = mid + 1;
+            } else if (result < 0) {
+              high = mid - 1;
+            } else {
+              return entries[mid];
+            }
+          }
+          return null;
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Look up and return a stored object.
+   *
+   * @param entry Lookup entry
+   *
+   * @return A matching stored object, or null if none is found
+   */
+  public E get(E entry) {
+    return get(entry, comparator);
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public boolean contains(Object obj) {
+    return get((E) obj) != null;
+  }
+
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  private static int compare(Object lookup, Object stored, Comparator cmp) {
+    return cmp != null
+           ? cmp.compare(lookup, stored)
+           : ((Comparable<Object>) lookup).compareTo(stored);
+  }
+
+  @Override
+  public Iterator<E> iterator() {
+    return new TreeSetIterator<>(this);
+  }
+
+  @Override
+  public Object[] toArray() {
+    Object[] objects = new Object[size];
+    if (!isEmpty()) {
+      int pos = 0;
+      for (Node<E> node = root.getLeftMostNode(); node != null;
+          pos += node.size, node = node.next) {
+        System.arraycopy(node.entries, node.leftIndex, objects, pos, node.size);
+      }
+    }
+    return objects;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public <T> T[] toArray(T[] a) {
+    T[] r = a.length >= size ? a
+            : (T[]) java.lang.reflect.Array
+        .newInstance(a.getClass().getComponentType(), size);
+    if (!isEmpty()) {
+      Node<E> node = root.getLeftMostNode();
+      int pos = 0;
+      while (node != null) {
+        System.arraycopy(node.entries, node.leftIndex, r, pos, node.size);
+        pos += node.size;
+        node = node.next;
+      }
+      if (r.length > pos) {
+        r[pos] = null;
+      }
+    } else if (a.length > 0) {
+      a[0] = null;
+    }
+    return r;
+  }
+
+  /**
+   * Add or replace an entry in the TreeSet.
+   *
+   * @param entry Entry to add or replace/update.
+   *
+   * @return the previous entry, or null if this set did not already contain the
+   *         specified entry
+   */
+  public E addOrReplace(E entry) {
+    return add(entry, true);
+  }
+
+  @Override
+  public boolean add(E entry) {
+    return add(entry, false) == null;
+  }
+
+  /**
+   * Internal add method to add an entry to the set.
+   *
+   * @param entry   Entry to add
+   * @param replace Should the entry replace an old entry which is equal to the
+   *                new entry
+   *
+   * @return null if the entry was added and didn't exist, or the previous
+   *         value (which might not have been overwritten, depending on the
+   *         replace parameter)
+   */
+  private E add(E entry, boolean replace) {
+    Objects.requireNonNull(entry);
+
+    // Empty tree
+    if (isEmpty()) {
+      root = cachedOrNewNode(entry);
+      size = 1;
+      modCount++;
+      return null;
+    }
+
+    // Compare against the right entry first, since comparatively larger
+    // entries are more likely to be inserted. BlockIDs are always increasing
+    // in HDFS.
+    Node<E> node = root;
+    Node<E> prevNode = null;
+    int result = 0;
+    while (node != null) {
+      prevNode = node;
+      E[] entries = node.entries;
+      int rightIndex = node.rightIndex;
+      result = compare(entry, entries[rightIndex], comparator);
+      if (result > 0) {
+        node = node.right;
+      } else if (result == 0) {
+        E prevEntry = entries[rightIndex];
+        if (replace) {
+          entries[rightIndex] = entry;
+        }
+        return prevEntry;
+      } else {
+        int leftIndex = node.leftIndex;
+        if (leftIndex != rightIndex) {
+          result = compare(entry, entries[leftIndex], comparator);
+        }
+        if (result < 0) {
+          node = node.left;
+        } else if (result == 0) {
+          E prevEntry = entries[leftIndex];
+          if (replace) {
+            entries[leftIndex] = entry;
+          }
+          return prevEntry;
+        } else {
+          // Insert in this node
+          int low = leftIndex + 1, high = rightIndex - 1;
+          while (low <= high) {
+            int mid = (low + high) >>> 1;
+            result = compare(entry, entries[mid], comparator);
+            if (result > 0) {
+              low = mid + 1;
+            } else if (result == 0) {
+              E prevEntry = entries[mid];
+              if (replace) {
+                entries[mid] = entry;
+              }
+              return prevEntry;
+            } else {
+              high = mid - 1;
+            }
+          }
+          addElementInNode(node, entry, low);
+          return null;
+        }
+      }
+    }
+
+    assert prevNode != null;
+    size++;
+    modCount++;
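+    // The search walked off the tree, so the entry belongs in prevNode or an
+    // adjacent node; prefer existing free space over allocating a new node.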
+    if (!prevNode.isFull()) {
+      // The previous node still has space
+      if (result < 0) {
+        prevNode.addEntryLeft(entry);
+      } else {
+        prevNode.addEntryRight(entry);
+      }
+    } else if (result < 0) {
+      // The previous node is full, add to an adjacent node or a new node
+      if (prevNode.prev != null && !prevNode.prev.isFull()) {
+        prevNode.prev.addEntryRight(entry);
+      } else {
+        attachNodeLeft(prevNode, cachedOrNewNode(entry));
+      }
+    } else if (prevNode.next != null && !prevNode.next.isFull()) {
+      prevNode.next.addEntryLeft(entry);
+    } else {
+      attachNodeRight(prevNode, cachedOrNewNode(entry));
+    }
+    return null;
+  }
+
+  /**
+   * Insert an entry last in the sorted tree. The entry must compare larger
+   * than the currently largest entry in the set when doing
+   * current.compareTo(entry); if entry is not the largest, the method will
+   * fall back on the regular add method.
+   *
+   * @param entry entry to add
+   *
+   * @return True if added, false if already existed in the set
+   */
+  public boolean addSortedLast(E entry) {
+
+    if (isEmpty()) {
+      root = cachedOrNewNode(entry);
+      size = 1;
+      modCount++;
+      return true;
+    } else {
+      Node<E> node = root.getRightMostNode();
+      if (compare(node.entries[node.rightIndex], entry, comparator) < 0) {
+        size++;
+        modCount++;
+        if (!node.isFull()) {
+          node.addEntryRight(entry);
+        } else {
+          attachNodeRight(node, cachedOrNewNode(entry));
+        }
+        return true;
+      }
+    }
+
+    // Fall back on normal add if entry is unsorted
+    return add(entry);
+  }
+
+  private void addElementInNode(Node<E> node, E entry, int index) {
+    size++;
+    modCount++;
+
+    if (!node.isFull()) {
+      node.addEntryAt(entry, index);
+    } else {
+      // Node is full, insert and push old entry
+      Node<E> prev = node.prev;
+      Node<E> next = node.next;
+      if (prev == null) {
+        // First check if we have space in the next node
+        if (next != null && !next.isFull()) {
+          E movedEntry = node.insertEntrySlideRight(entry, index);
+          next.addEntryLeft(movedEntry);
+        } else {
+          // Since prev is null the left child must be null
+          assert node.left == null;
+          E movedEntry = node.insertEntrySlideLeft(entry, index);
+          Node<E> newNode = cachedOrNewNode(movedEntry);
+          attachNodeLeft(node, newNode);
+        }
+      } else if (!prev.isFull()) {
+        // Prev has space
+        E movedEntry = node.insertEntrySlideLeft(entry, index);
+        prev.addEntryRight(movedEntry);
+      } else if (next == null) {
+        // Since next is null the right child must be null
+        assert node.right == null;
+        E movedEntry = node.insertEntrySlideRight(entry, index);
+        Node<E> newNode = cachedOrNewNode(movedEntry);
+        attachNodeRight(node, newNode);
+      } else if (!next.isFull()) {
+        // Next has space
+        E movedEntry = node.insertEntrySlideRight(entry, index);
+        next.addEntryLeft(movedEntry);
+      } else {
+        // Both prev and next nodes exist and are full
+        E movedEntry = node.insertEntrySlideRight(entry, index);
+        Node<E> newNode = cachedOrNewNode(movedEntry);
+        if (node.right == null) {
+          attachNodeRight(node, newNode);
+        } else {
+          // Since our right child exists,
+          // the left child of our next node must be null
+          assert next.left == null;
+          attachNodeLeft(next, newNode);
+        }
+      }
+    }
+  }
+
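+  /**
+   * Attach newNode as the left child of node and splice it into the doubly
+   * linked node list just before node, then rebalance the tree.
+   */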
+  private void attachNodeLeft(Node<E> node, Node<E> newNode) {
+    newNode.parent = node;
+    node.left = newNode;
+
+    newNode.next = node;
+    newNode.prev = node.prev;
+    if (newNode.prev != null) {
+      newNode.prev.next = newNode;
+    }
+    node.prev = newNode;
+    balanceInsert(newNode);
+  }
+
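+  /**
+   * Attach newNode as the right child of node and splice it into the doubly
+   * linked node list just after node, then rebalance the tree.
+   */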
+  private void attachNodeRight(Node<E> node, Node<E> newNode) {
+    newNode.parent = node;
+    node.right = newNode;
+
+    newNode.prev = node;
+    newNode.next = node.next;
+    if (newNode.next != null) {
+      newNode.next.prev = newNode;
+    }
+    node.next = newNode;
+    balanceInsert(newNode);
+  }
+
+  /**
+   * Balance the RB Tree after insert.
+   *
+   * @param node Added node
+   */
+  private void balanceInsert(Node<E> node) {
+    node.color = RED;
+
+    while (node != root && node.parent.isRed()) {
+      if (node.parent == node.parent.parent.left) {
+        Node<E> uncle = node.parent.parent.right;
+        if (uncle != null && uncle.isRed()) {
+          node.parent.color = BLACK;
+          uncle.color = BLACK;
+          node.parent.parent.color = RED;
+          node = node.parent.parent;
+        } else {
+          if (node == node.parent.right) {
+            node = node.parent;
+            rotateLeft(node);
+          }
+          node.parent.color = BLACK;
+          node.parent.parent.color = RED;
+          rotateRight(node.parent.parent);
+        }
+      } else {
+        Node<E> uncle = node.parent.parent.left;
+        if (uncle != null && uncle.isRed()) {
+          node.parent.color = BLACK;
+          uncle.color = BLACK;
+          node.parent.parent.color = RED;
+          node = node.parent.parent;
+        } else {
+          if (node == node.parent.left) {
+            node = node.parent;
+            rotateRight(node);
+          }
+          node.parent.color = BLACK;
+          node.parent.parent.color = RED;
+          rotateLeft(node.parent.parent);
+        }
+      }
+    }
+    root.color = BLACK;
+  }
+
+  private void rotateRight(Node<E> node) {
+    Node<E> pivot = node.left;
+    node.left = pivot.right;
+    if (pivot.right != null) {
+      pivot.right.parent = node;
+    }
+    pivot.parent = node.parent;
+    if (node.parent == null) {
+      root = pivot;
+    } else if (node == node.parent.right) {
+      node.parent.right = pivot;
+    } else {
+      node.parent.left = pivot;
+    }
+    pivot.right = node;
+    node.parent = pivot;
+  }
+
+  private void rotateLeft(Node<E> node) {
+    Node<E> pivot = node.right;
+    node.right = pivot.left;
+    if (pivot.left != null) {
+      pivot.left.parent = node;
+    }
+    pivot.parent = node.parent;
+    if (node.parent == null) {
+      root = pivot;
+    } else if (node == node.parent.left) {
+      node.parent.left = pivot;
+    } else {
+      node.parent.right = pivot;
+    }
+    pivot.left = node;
+    node.parent = pivot;
+  }
+
+  /**
+   * Remove object using a provided comparator, and return the removed entry.
+   *
+   * @param obj Lookup entry
+   * @param cmp User-provided Comparator. The comparator should expect that the
+   *            provided obj will always be the first method parameter and any
+   *            stored object will be the second parameter.
+   *
+   * @return The removed entry or null if not found
+   */
+  public E removeAndGet(Object obj, Comparator<?> cmp) {
+    Objects.requireNonNull(obj);
+
+    if (!isEmpty()) {
+      Node<E> node = root;
+      while (node != null) {
+        E[] entries = node.entries;
+        int leftIndex = node.leftIndex;
+        int result = compare(obj, entries[leftIndex], cmp);
+        if (result < 0) {
+          node = node.left;
+        } else if (result == 0) {
+          return removeElementLeft(node);
+        } else {
+          int rightIndex = node.rightIndex;
+          if (leftIndex != rightIndex) {
+            result = compare(obj, entries[rightIndex], cmp);
+          }
+          if (result == 0) {
+            return removeElementRight(node);
+          } else if (result > 0) {
+            node = node.right;
+          } else {
+            int low = leftIndex + 1, high = rightIndex - 1;
+            while (low <= high) {
+              int mid = (low + high) >>> 1;
+              result = compare(obj, entries[mid], cmp);
+              if (result > 0) {
+                low = mid + 1;
+              } else if (result == 0) {
+                return removeElementAt(node, mid);
+              } else {
+                high = mid - 1;
+              }
+            }
+            return null;
+          }
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Remove object and return the removed entry.
+   *
+   * @param obj Lookup entry
+   *
+   * @return The removed entry or null if not found
+   */
+  public E removeAndGet(Object obj) {
+    return removeAndGet(obj, comparator);
+  }
+
+  /**
+   * Remove object using a provided comparator.
+   *
+   * @param obj Lookup entry
+   * @param cmp User-provided Comparator. The comparator should expect that the
+   *            provided obj will always be the first method parameter and any
+   *            stored object will be the second parameter.
+   *
+   * @return True if found and removed, else false
+   */
+  public boolean remove(Object obj, Comparator<?> cmp) {
+    return removeAndGet(obj, cmp) != null;
+  }
+
+  @Override
+  public boolean remove(Object obj) {
+    return removeAndGet(obj, comparator) != null;
+  }
+
+  private E removeElementLeft(Node<E> node) {
+    modCount++;
+    size--;
+    E entry = node.removeEntryLeft();
+
+    if (node.isEmpty()) {
+      deleteNode(node);
+    } else if (node.prev != null
+               && (Node.NODE_SIZE - 1 - node.prev.rightIndex) >= node.size) {
+      // Remaining entries fit in the prev node, move them and delete this node
+      node.prev.addEntriesRight(node);
+      deleteNode(node);
+    } else if (node.next != null && node.next.leftIndex >= node.size) {
+      // Remaining entries fit in the next node, move them and delete this node
+      node.next.addEntriesLeft(node);
+      deleteNode(node);
+    } else if (node.prev != null && node.prev.size < node.leftIndex) {
+      // Entries in prev node will fit in this node, move them and delete prev
+      node.addEntriesLeft(node.prev);
+      deleteNode(node.prev);
+    }
+
+    return entry;
+  }
+
+  private E removeElementRight(Node<E> node) {
+    modCount++;
+    size--;
+    E entry = node.removeEntryRight();
+
+    if (node.isEmpty()) {
+      deleteNode(node);
+    } else if (node.prev != null
+               && (Node.NODE_SIZE - 1 - node.prev.rightIndex) >= node.size) {
+      // Remaining entries fit in the prev node, move them and delete this node
+      node.prev.addEntriesRight(node);
+      deleteNode(node);
+    } else if (node.next != null && node.next.leftIndex >= node.size) {
+      // Remaining entries fit in the next node, move them and delete this node
+      node.next.addEntriesLeft(node);
+      deleteNode(node);
+    } else if (node.next != null
+               && node.next.size < (Node.NODE_SIZE - 1 - node.rightIndex)) {
+      // Entries in next node will fit in this node, move them and delete next
+      node.addEntriesRight(node.next);
+      deleteNode(node.next);
+    }
+
+    return entry;
+  }
+
+  private E removeElementAt(Node<E> node, int index) {
+    modCount++;
+    size--;
+    E entry = node.removeEntryAt(index);
+
+    if (node.prev != null
+        && (Node.NODE_SIZE - 1 - node.prev.rightIndex) >= node.size) {
+      // Remaining entries fit in the prev node, move them and delete this node
+      node.prev.addEntriesRight(node);
+      deleteNode(node);
+    } else if (node.next != null && (node.next.leftIndex) >= node.size) {
+      // Remaining entries fit in the next node, move them and delete this node
+      node.next.addEntriesLeft(node);
+      deleteNode(node);
+    } else if (node.prev != null && node.prev.size < node.leftIndex) {
+      // Entries in prev node will fit in this node, move them and delete prev
+      node.addEntriesLeft(node.prev);
+      deleteNode(node.prev);
+    } else if (node.next != null
+               && node.next.size < (Node.NODE_SIZE - 1 - node.rightIndex)) {
+      // Entries in next node will fit in this node, move them and delete next
+      node.addEntriesRight(node.next);
+      deleteNode(node.next);
+    }
+
+    return entry;
+  }
+
+  /**
+   * Delete the node and ensure the tree is balanced.
+   *
+   * @param node node to delete
+   */
+  private void deleteNode(final Node<E> node) {
+    if (node.right == null) {
+      if (node.left != null) {
+        attachToParent(node, node.left);
+      } else {
+        attachNullToParent(node);
+      }
+    } else if (node.left == null) {
+      attachToParent(node, node.right);
+    } else {
+      // node.left != null && node.right != null
+      // node.next should replace node in tree
+      // node.next != null guaranteed since node.left != null
+      // node.next.left == null since node.next.prev is node
+      // node.next.right may be null or non-null
+      Node<E> toMoveUp = node.next;
+      if (toMoveUp.right == null) {
+        attachNullToParent(toMoveUp);
+      } else {
+        attachToParent(toMoveUp, toMoveUp.right);
+      }
+      toMoveUp.left = node.left;
+      if (toMoveUp.left != null) {
+        toMoveUp.left.parent = toMoveUp;
+      }
+      toMoveUp.right = node.right;
+      if (toMoveUp.right != null) {
+        toMoveUp.right.parent = toMoveUp;
+      }
+      attachToParentNoBalance(node, toMoveUp);
+      toMoveUp.color = node.color;
+    }
+
+    // Remove node from ordered list of nodes
+    if (node.prev != null) {
+      node.prev.next = node.next;
+    }
+    if (node.next != null) {
+      node.next.prev = node.prev;
+    }
+
+    nodeCount--;
+    cacheAndClear(node);
+  }
+
+  private void attachToParentNoBalance(Node<E> toDelete, Node<E> toConnect) {
+    Node<E> parent = toDelete.parent;
+    toConnect.parent = parent;
+    if (parent == null) {
+      root = toConnect;
+    } else if (toDelete == parent.left) {
+      parent.left = toConnect;
+    } else {
+      parent.right = toConnect;
+    }
+  }
+
+  private void attachToParent(Node<E> toDelete, Node<E> toConnect) {
+    attachToParentNoBalance(toDelete, toConnect);
+    if (toDelete.isBlack()) {
+      balanceDelete(toConnect);
+    }
+  }
+
+  private void attachNullToParent(Node<E> toDelete) {
+    Node<E> parent = toDelete.parent;
+    if (parent == null) {
+      root = null;
+    } else {
+      if (toDelete == parent.left) {
+        parent.left = null;
+      } else {
+        parent.right = null;
+      }
+      if (toDelete.isBlack()) {
+        balanceDelete(parent);
+      }
+    }
+  }
+
+  /**
+   * Balance tree after removing a node.
+   *
+   * @param node Node to balance after deleting another node
+   */
+  private void balanceDelete(Node<E> node) {
+    while (node != root && node.isBlack()) {
+      if (node == node.parent.left) {
+        Node<E> sibling = node.parent.right;
+        if (sibling == null) {
+          node = node.parent;
+          continue;
+        }
+        if (sibling.isRed()) {
+          sibling.color = BLACK;
+          node.parent.color = RED;
+          rotateLeft(node.parent);
+          sibling = node.parent.right;
+          if (sibling == null) {
+            node = node.parent;
+            continue;
+          }
+        }
+        if ((sibling.left == null || !sibling.left.isRed())
+            && (sibling.right == null || !sibling.right.isRed())) {
+          sibling.color = RED;
+          node = node.parent;
+        } else {
+          if (sibling.right == null || !sibling.right.isRed()) {
+            sibling.left.color = BLACK;
+            sibling.color = RED;
+            rotateRight(sibling);
+            sibling = node.parent.right;
+          }
+          sibling.color = node.parent.color;
+          node.parent.color = BLACK;
+          sibling.right.color = BLACK;
+          rotateLeft(node.parent);
+          node = root;
+        }
+      } else {
+        Node<E> sibling = node.parent.left;
+        if (sibling == null) {
+          node = node.parent;
+          continue;
+        }
+        if (sibling.isRed()) {
+          sibling.color = BLACK;
+          node.parent.color = RED;
+          rotateRight(node.parent);
+          sibling = node.parent.left;
+          if (sibling == null) {
+            node = node.parent;
+            continue;
+          }
+        }
+        if ((sibling.left == null || sibling.left.isBlack())
+            && (sibling.right == null || sibling.right.isBlack())) {
+          sibling.color = RED;
+          node = node.parent;
+        } else {
+          if (sibling.left == null || sibling.left.isBlack()) {
+            sibling.right.color = BLACK;
+            sibling.color = RED;
+            rotateLeft(sibling);
+            sibling = node.parent.left;
+          }
+          sibling.color = node.parent.color;
+          node.parent.color = BLACK;
+          sibling.left.color = BLACK;
+          rotateRight(node.parent);
+          node = root;
+        }
+      }
+    }
+    node.color = BLACK;
+  }
+
+  @Override
+  public boolean containsAll(Collection<?> c) {
+    for (Object entry : c) {
+      if (!contains(entry)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public boolean addAll(Collection<? extends E> c) {
+    boolean modified = false;
+    for (E entry : c) {
+      if (add(entry)) {
+        modified = true;
+      }
+    }
+    return modified;
+  }
+
+  @Override
+  public boolean retainAll(Collection<?> c) {
+    boolean modified = false;
+    Iterator<E> it = iterator();
+    while (it.hasNext()) {
+      if (!c.contains(it.next())) {
+        it.remove();
+        modified = true;
+      }
+    }
+    return modified;
+  }
+
+  @Override
+  public boolean removeAll(Collection<?> c) {
+    boolean modified = false;
+    for (Object entry : c) {
+      if (remove(entry)) {
+        modified = true;
+      }
+    }
+    return modified;
+  }
+
+  @Override
+  public void clear() {
+    modCount++;
+    if (!isEmpty()) {
+      size = 0;
+      nodeCount = 0;
+      cacheAndClear(root);
+      root = null;
+    }
+  }
+
+  /**
+   * Returns the current size divided by the capacity of the tree. A value
+   * between 0.0 and 1.0, where 1.0 means that every allocated node in the tree
+   * is completely full.
+   *
+   * An empty set will return 1.0.
+   *
+   * @return the fill ratio of the tree
+   */
+  public double fillRatio() {
+    if (nodeCount > 1) {
+      // Count the last node as completely full since it can't be compacted
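+      // e.g. 138 entries in 3 nodes with 10 in the rightmost node:
+      // (138 + (64 - 10)) / (3 * 64) = 1.0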
+      return (size + (Node.NODE_SIZE - root.getRightMostNode().size))
+             / (double) (nodeCount * Node.NODE_SIZE);
+    }
+    return 1.0;
+  }
+
+  /**
+   * Compact all the entries to use the fewest possible nodes in the tree.
+   *
+   * Having a compact tree minimizes memory usage, but can make inserts slower
+   * because new nodes need to be allocated, as there is no longer space in any
+   * of the existing nodes for entries added in the middle of the set.
+   *
+   * Useful for reducing memory consumption when the tree is known not to
+   * change after compaction, or is mainly appended to at either extreme.
+   *
+   * @param timeout Maximum time to spend compacting the tree set in
+   *                milliseconds.
+   *
+   * @return true if compaction completed, false if aborted
+   */
+  public boolean compact(long timeout) {
+
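+    // Walk the nodes in order, pulling entries into free space in the
+    // previous node, deleting nodes that become empty, and left-aligning
+    // the entries of nodes that remain partially filled.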
+    if (!isEmpty()) {
+      long start = Time.monotonicNow();
+      Node<E> node = root.getLeftMostNode();
+      while (node != null) {
+        if (node.prev != null && !node.prev.isFull()) {
+          Node<E> prev = node.prev;
+          int count = Math.min(Node.NODE_SIZE - prev.size, node.size);
+          System.arraycopy(node.entries, node.leftIndex,
+                           prev.entries, prev.rightIndex + 1, count);
+          node.leftIndex += count;
+          node.size -= count;
+          prev.rightIndex += count;
+          prev.size += count;
+        }
+        if (node.isEmpty()) {
+          Node<E> temp = node.next;
+          deleteNode(node);
+          node = temp;
+          continue;
+        } else if (!node.isFull()) {
+          if (node.leftIndex != 0) {
+            System.arraycopy(node.entries, node.leftIndex,
+                             node.entries, 0, node.size);
+            Arrays.fill(node.entries, node.size, node.rightIndex + 1, null);
+            node.leftIndex = 0;
+            node.rightIndex = node.size - 1;
+          }
+        }
+        node = node.next;
+
+        if (Time.monotonicNow() - start > timeout) {
+          return false;
+        }
+      }
+    }
+
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
index 05a6830..02d5b81 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
@@ -261,6 +261,9 @@ message BlockReportContextProto  {
   // The block report lease ID, or 0 if we are sending without a lease to
   // bypass rate-limiting.
   optional uint64 leaseId = 4 [ default = 0 ];
+
+  // True if the reported blocks are sorted by increasing block IDs
+  optional bool sorted = 5 [default = false];
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java
index bf29373..2cc1f7d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java
@@ -228,7 +228,7 @@ public class TestBlockListAsLongs {
     request.set(null);
     nsInfo.setCapabilities(Capability.STORAGE_BLOCK_REPORT_BUFFERS.getMask());
     nn.blockReport(reg, "pool", sbr,
-        new BlockReportContext(1, 0, System.nanoTime(), 0L));
+        new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
     BlockReportRequestProto proto = request.get();
     assertNotNull(proto);
     assertTrue(proto.getReports(0).getBlocksList().isEmpty());
@@ -238,7 +238,7 @@ public class TestBlockListAsLongs {
     request.set(null);
     nsInfo.setCapabilities(Capability.UNKNOWN.getMask());
     nn.blockReport(reg, "pool", sbr,
-        new BlockReportContext(1, 0, System.nanoTime(), 0L));
+        new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
     proto = request.get();
     assertNotNull(proto);
     assertFalse(proto.getReports(0).getBlocksList().isEmpty());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
index d6213ff..4e7cf3d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
@@ -19,19 +19,11 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import static org.apache.hadoop.hdfs.server.namenode.INodeId.INVALID_INODE_ID;
 import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
-import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.junit.Assert;
 import org.junit.Test;
@@ -91,84 +83,4 @@ public class TestBlockInfo {
     Assert.assertThat(added, is(false));
     Assert.assertThat(blockInfos[NUM_BLOCKS/2].getStorageInfo(0), is(storage2));
   }
-
-  @Test
-  public void testBlockListMoveToHead() throws Exception {
-    LOG.info("BlockInfo moveToHead tests...");
-
-    final int MAX_BLOCKS = 10;
-
-    DatanodeStorageInfo dd = DFSTestUtil.createDatanodeStorageInfo("s1", "1.1.1.1");
-    ArrayList<Block> blockList = new ArrayList<Block>(MAX_BLOCKS);
-    ArrayList<BlockInfo> blockInfoList = new ArrayList<BlockInfo>();
-    int headIndex;
-    int curIndex;
-
-    LOG.info("Building block list...");
-    for (int i = 0; i < MAX_BLOCKS; i++) {
-      blockList.add(new Block(i, 0, GenerationStamp.LAST_RESERVED_STAMP));
-      blockInfoList.add(new BlockInfoContiguous(blockList.get(i), (short) 3));
-      dd.addBlock(blockInfoList.get(i));
-
-      // index of the datanode should be 0
-      assertEquals("Find datanode should be 0", 0, blockInfoList.get(i)
-          .findStorageInfo(dd));
-    }
-
-    // list length should be equal to the number of blocks we inserted
-    LOG.info("Checking list length...");
-    assertEquals("Length should be MAX_BLOCK", MAX_BLOCKS, dd.numBlocks());
-    Iterator<BlockInfo> it = dd.getBlockIterator();
-    int len = 0;
-    while (it.hasNext()) {
-      it.next();
-      len++;
-    }
-    assertEquals("There should be MAX_BLOCK blockInfo's", MAX_BLOCKS, len);
-
-    headIndex = dd.getBlockListHeadForTesting().findStorageInfo(dd);
-
-    LOG.info("Moving each block to the head of the list...");
-    for (int i = 0; i < MAX_BLOCKS; i++) {
-      curIndex = blockInfoList.get(i).findStorageInfo(dd);
-      headIndex = dd.moveBlockToHead(blockInfoList.get(i), curIndex, headIndex);
-      // the moved element must be at the head of the list
-      assertEquals("Block should be at the head of the list now.",
-          blockInfoList.get(i), dd.getBlockListHeadForTesting());
-    }
-
-    // move head of the list to the head - this should not change the list
-    LOG.info("Moving head to the head...");
-
-    BlockInfo temp = dd.getBlockListHeadForTesting();
-    curIndex = 0;
-    headIndex = 0;
-    dd.moveBlockToHead(temp, curIndex, headIndex);
-    assertEquals(
-        "Moving head to the head of the list shopuld not change the list",
-        temp, dd.getBlockListHeadForTesting());
-
-    // check all elements of the list against the original blockInfoList
-    LOG.info("Checking elements of the list...");
-    temp = dd.getBlockListHeadForTesting();
-    assertNotNull("Head should not be null", temp);
-    int c = MAX_BLOCKS - 1;
-    while (temp != null) {
-      assertEquals("Expected element is not on the list",
-          blockInfoList.get(c--), temp);
-      temp = temp.getNext(0);
-    }
-
-    LOG.info("Moving random blocks to the head of the list...");
-    headIndex = dd.getBlockListHeadForTesting().findStorageInfo(dd);
-    Random rand = new Random();
-    for (int i = 0; i < MAX_BLOCKS; i++) {
-      int j = rand.nextInt(MAX_BLOCKS);
-      curIndex = blockInfoList.get(j).findStorageInfo(dd);
-      headIndex = dd.moveBlockToHead(blockInfoList.get(j), curIndex, headIndex);
-      // the moved element must be at the head of the list
-      assertEquals("Block should be at the head of the list now.",
-          blockInfoList.get(j), dd.getBlockListHeadForTesting());
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 0e4e167..a970d77 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -32,6 +32,7 @@ import static org.mockito.Mockito.verify;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -75,6 +76,7 @@ import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.INodeId;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.namenode.TestINodeFile;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
@@ -806,7 +808,8 @@ public class TestBlockManager {
     // Make sure it's the first full report
     assertEquals(0, ds.getBlockReportCount());
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        builder.build(), null, false);
+        builder.build(),
+        new BlockReportContext(1, 0, System.nanoTime(), 0, true), false);
     assertEquals(1, ds.getBlockReportCount());
 
     // verify the storage info is correct
@@ -821,6 +824,70 @@ public class TestBlockManager {
         (ds) >= 0);
   }
 
+  @Test
+  public void testFullBR() throws Exception {
+    doReturn(true).when(fsn).isRunning();
+
+    DatanodeDescriptor node = nodes.get(0);
+    DatanodeStorageInfo ds = node.getStorageInfos()[0];
+    node.setAlive(true);
+    DatanodeRegistration nodeReg =  new DatanodeRegistration(node, null, null, "");
+
+    // register new node
+    bm.getDatanodeManager().registerDatanode(nodeReg);
+    bm.getDatanodeManager().addDatanode(node);
+    assertEquals(node, bm.getDatanodeManager().getDatanode(node));
+    assertEquals(0, ds.getBlockReportCount());
+
+    ArrayList<BlockInfo> blocks = new ArrayList<>();
+    for (int id = 24; id > 0; id--) {
+      blocks.add(addBlockToBM(id));
+    }
+
+    // Make sure it's the first full report
+    assertEquals(0, ds.getBlockReportCount());
+    bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
+                     generateReport(blocks),
+                     new BlockReportContext(1, 0, System.nanoTime(), 0, false),
+                     false);
+    assertEquals(1, ds.getBlockReportCount());
+    // verify the storage info is correct
+    for (BlockInfo block : blocks) {
+      assertTrue(bm.getStoredBlock(block).findStorageInfo(ds) >= 0);
+    }
+
+    // Send unsorted report
+    bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
+                     generateReport(blocks),
+                     new BlockReportContext(1, 0, System.nanoTime(), 0, false),
+                     false);
+    assertEquals(2, ds.getBlockReportCount());
+    // verify the storage info is correct
+    for (BlockInfo block : blocks) {
+      assertTrue(bm.getStoredBlock(block).findStorageInfo(ds) >= 0);
+    }
+
+    // Sort list and send a sorted report
+    Collections.sort(blocks);
+    bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
+                     generateReport(blocks),
+                     new BlockReportContext(1, 0, System.nanoTime(), 0, true),
+                     false);
+    assertEquals(3, ds.getBlockReportCount());
+    // verify the storage info is correct
+    for (BlockInfo block : blocks) {
+      assertTrue(bm.getStoredBlock(block).findStorageInfo(ds) >= 0);
+    }
+  }
+
+  private BlockListAsLongs generateReport(List<BlockInfo> blocks) {
+    BlockListAsLongs.Builder builder = BlockListAsLongs.builder();
+    for (BlockInfo block : blocks) {
+      builder.add(new FinalizedReplica(block, null, null));
+    }
+    return builder.build();
+  }
+
   private BlockInfo addBlockToBM(long blkId) {
     Block block = new Block(blkId);
     BlockInfo blockInfo = new BlockInfoContiguous(block, (short) 3);
@@ -1061,4 +1128,4 @@ public class TestBlockManager {
       cluster.shutdown();
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index c843938..f4e88b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -29,6 +29,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 
 import javax.management.NotCompliantMBeanException;
@@ -566,7 +567,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
       }
       Map<Block, BInfo> map = blockMap.get(bpid);
       if (map == null) {
-        map = new HashMap<Block, BInfo>();
+        map = new TreeMap<>();
         blockMap.put(bpid, map);
       }
       
@@ -1206,7 +1207,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
 
   @Override // FsDatasetSpi
   public void addBlockPool(String bpid, Configuration conf) {
-    Map<Block, BInfo> map = new HashMap<Block, BInfo>();
+    Map<Block, BInfo> map = new TreeMap<>();
     blockMap.put(bpid, map);
     storage.addBlockPool(bpid);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java
index 27d1cea..61321e4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java
@@ -19,14 +19,23 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.IOException;
 import java.util.ArrayList;
-
+import java.util.Collections;
+import java.util.Comparator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.*;
-import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -108,12 +117,13 @@ public class TestBlockHasMultipleReplicasOnSameDN {
     StorageBlockReport reports[] =
         new StorageBlockReport[cluster.getStoragesPerDatanode()];
 
-    ArrayList<Replica> blocks = new ArrayList<Replica>();
+    ArrayList<ReplicaInfo> blocks = new ArrayList<>();
 
     for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
       Block localBlock = locatedBlock.getBlock().getLocalBlock();
       blocks.add(new FinalizedReplica(localBlock, null, null));
     }
+    Collections.sort(blocks);
 
     try (FsDatasetSpi.FsVolumeReferences volumes =
       dn.getFSDataset().getFsVolumeReferences()) {
@@ -126,7 +136,7 @@ public class TestBlockHasMultipleReplicasOnSameDN {
 
     // Should not assert!
     cluster.getNameNodeRpc().blockReport(dnReg, bpid, reports,
-        new BlockReportContext(1, 0, System.nanoTime(), 0L));
+        new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
 
     // Get the block locations once again.
     locatedBlocks = client.getLocatedBlocks(filename, 0, BLOCK_SIZE * NUM_BLOCKS);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
index 212d2e6..27029a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
@@ -82,6 +82,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.timeout;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
index 90e000b..66f804c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
@@ -190,7 +191,8 @@ public class TestDataNodeVolumeFailure {
             new StorageBlockReport(dnStorage, blockList);
     }
     
-    cluster.getNameNodeRpc().blockReport(dnR, bpid, reports, null);
+    cluster.getNameNodeRpc().blockReport(dnR, bpid, reports,
+        new BlockReportContext(1, 0, System.nanoTime(), 0, true));
 
     // verify number of blocks and files...
     verify(filename, filesize);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
index aadd9b2..badd593 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
@@ -134,7 +134,7 @@ public class TestDnRespectsBlockReportSplitThreshold {
     Mockito.verify(nnSpy, times(cluster.getStoragesPerDatanode())).blockReport(
         any(DatanodeRegistration.class),
         anyString(),
-        captor.capture(),  Mockito.<BlockReportContext>anyObject());
+        captor.capture(), Mockito.<BlockReportContext>anyObject());
 
     verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE);
   }
@@ -166,7 +166,7 @@ public class TestDnRespectsBlockReportSplitThreshold {
     Mockito.verify(nnSpy, times(1)).blockReport(
         any(DatanodeRegistration.class),
         anyString(),
-        captor.capture(),  Mockito.<BlockReportContext>anyObject());
+        captor.capture(), Mockito.<BlockReportContext>anyObject());
 
     verifyCapturedArguments(captor, cluster.getStoragesPerDatanode(), BLOCKS_IN_FILE);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
index 67bbefe..791ee20 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
-import org.apache.hadoop.util.Time;
 
 
 /**
@@ -40,7 +39,7 @@ public class TestNNHandlesBlockReportPerStorage extends BlockReportTestBase {
       LOG.info("Sending block report for storage " + report.getStorage().getStorageID());
       StorageBlockReport[] singletonReport = { report };
       cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport,
-          new BlockReportContext(reports.length, i, System.nanoTime(), 0L));
+          new BlockReportContext(reports.length, i, System.nanoTime(), 0L, true));
       i++;
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java
index fd19ba6..a35fa48 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java
@@ -36,6 +36,6 @@ public class TestNNHandlesCombinedBlockReport extends BlockReportTestBase {
                                   StorageBlockReport[] reports) throws IOException {
     LOG.info("Sending combined block reports for " + dnR);
     cluster.getNameNodeRpc().blockReport(dnR, poolId, reports,
-        new BlockReportContext(1, 0, System.nanoTime(), 0L));
+        new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java
index 00c0f22..f12bc18 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.timeout;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
index 42cb72f..7fa3803 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
@@ -973,7 +973,7 @@ public class NNThroughputBenchmark implements Tool {
           new StorageBlockReport(storage, BlockListAsLongs.EMPTY)
       };
       dataNodeProto.blockReport(dnRegistration, bpid, reports,
-              new BlockReportContext(1, 0, System.nanoTime(), 0L));
+              new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
     }
 
     /**
@@ -1247,7 +1247,7 @@ public class NNThroughputBenchmark implements Tool {
       StorageBlockReport[] report = { new StorageBlockReport(
           dn.storage, dn.getBlockReportList()) };
       dataNodeProto.blockReport(dn.dnRegistration, bpid, report,
-          new BlockReportContext(1, 0, System.nanoTime(), 0L));
+          new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
       long end = Time.now();
       return end-start;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
index 542c616..dfe9cbe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
@@ -303,7 +304,8 @@ public class TestAddStripedBlocks {
       StorageBlockReport[] reports = {new StorageBlockReport(storage,
           bll)};
       cluster.getNameNodeRpc().blockReport(dn.getDNRegistrationForBP(bpId),
-          bpId, reports, null);
+          bpId, reports,
+          new BlockReportContext(1, 0, System.nanoTime(), 0, true));
     }
 
     DatanodeStorageInfo[] locs = lastBlock.getUnderConstructionFeature()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
index 7fd0c30..ff8f81b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
@@ -118,7 +118,7 @@ public class TestDeadDatanode {
         BlockListAsLongs.EMPTY) };
     try {
       dnp.blockReport(reg, poolId, report,
-          new BlockReportContext(1, 0, System.nanoTime(), 0L));
+          new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
       fail("Expected IOException is not thrown");
     } catch (IOException ex) {
       // Expected

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/FoldedTreeSetTest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/FoldedTreeSetTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/FoldedTreeSetTest.java
new file mode 100644
index 0000000..d554b1b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/FoldedTreeSetTest.java
@@ -0,0 +1,644 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Random;
+
+/**
+ * Tests for FoldedTreeSet.
+ */
+public class FoldedTreeSetTest {
+
+  private static Random srand;
+
+  public FoldedTreeSetTest() {
+  }
+
+  @BeforeClass
+  public static void setUpClass() {
+    long seed = System.nanoTime();
+    System.out.println("This run uses the random seed " + seed);
+    srand = new Random(seed);
+  }
+
+  @AfterClass
+  public static void tearDownClass() {
+  }
+
+  @Before
+  public void setUp() {
+  }
+
+  @After
+  public void tearDown() {
+  }
+
+  /**
+   * Test of comparator method, of class FoldedTreeSet.
+   */
+  @Test
+  public void testComparator() {
+    Comparator<String> comparator = new Comparator<String>() {
+
+      @Override
+      public int compare(String o1, String o2) {
+        return o1.compareTo(o2);
+      }
+    };
+    assertEquals(null, new FoldedTreeSet<>().comparator());
+    assertEquals(comparator, new FoldedTreeSet<>(comparator).comparator());
+
+    FoldedTreeSet<String> set = new FoldedTreeSet<>(comparator);
+    set.add("apa3");
+    set.add("apa2");
+    set.add("apa");
+    set.add("apa5");
+    set.add("apa4");
+    assertEquals(5, set.size());
+    assertEquals("apa", set.get("apa"));
+  }
+
+  /**
+   * Test of first method, of class FoldedTreeSet.
+   */
+  @Test
+  public void testFirst() {
+    FoldedTreeSet<Integer> tree = new FoldedTreeSet<>();
+    for (int i = 0; i < 256; i++) {
+      tree.add(1024 + i);
+      assertEquals(1024, tree.first().intValue());
+    }
+    for (int i = 1; i < 256; i++) {
+      tree.remove(1024 + i);
+      assertEquals(1024, tree.first().intValue());
+    }
+  }
+
+  /**
+   * Test of last method, of class FoldedTreeSet.
+   */
+  @Test
+  public void testLast() {
+    FoldedTreeSet<Integer> tree = new FoldedTreeSet<>();
+    for (int i = 0; i < 256; i++) {
+      tree.add(1024 + i);
+      assertEquals(1024 + i, tree.last().intValue());
+    }
+    for (int i = 0; i < 255; i++) {
+      tree.remove(1024 + i);
+      assertEquals(1279, tree.last().intValue());
+    }
+  }
+
+  /**
+   * Test of size method, of class FoldedTreeSet.
+   */
+  @Test
+  public void testSize() {
+    FoldedTreeSet<String> instance = new FoldedTreeSet<>();
+    String entry = "apa";
+    assertEquals(0, instance.size());
+    instance.add(entry);
+    assertEquals(1, instance.size());
+    instance.remove(entry);
+    assertEquals(0, instance.size());
+  }
+
+  /**
+   * Test of isEmpty method, of class FoldedTreeSet.
+   */
+  @Test
+  public void testIsEmpty() {
+    FoldedTreeSet<String> instance = new FoldedTreeSet<>();
+    boolean expResult = true;
+    boolean result = instance.isEmpty();
+    assertEquals(expResult, result);
+    instance.add("apa");
+    instance.remove("apa");
+    assertEquals(expResult, instance.isEmpty());
+  }
+
+  /**
+   * Test of contains method, of class FoldedTreeSet.
+   */
+  @Test
+  public void testContains() {
+    FoldedTreeSet<String> instance = new FoldedTreeSet<>();
+    String entry = "apa";
+    assertEquals(false, instance.contains(entry));
+    instance.add(entry);
+    assertEquals(true, instance.contains(entry));
+    assertEquals(false, instance.contains(entry + entry));
+  }
+
+  /**
+   * Test of iterator method, of class FoldedTreeSet.
+   */
+  @Test
+  public void testIterator() {
+
+    for (int iter = 0; iter < 10; iter++) {
+      FoldedTreeSet<Holder> set = new FoldedTreeSet<>();
+      long[] longs = new long[64723];
+      for (int i = 0; i < longs.length; i++) {
+        Holder val = new Holder(srand.nextLong());
+        while (set.contains(val)) {
+          val = new Holder(srand.nextLong());
+        }
+        longs[i] = val.getId();
+        set.add(val);
+      }
+      assertEquals(longs.length, set.size());
+      Arrays.sort(longs);
+
+      Iterator<Holder> it = set.iterator();
+      for (int i = 0; i < longs.length; i++) {
+        assertTrue(it.hasNext());
+        Holder val = it.next();
+        assertEquals(longs[i], val.getId());
+        // remove randomly to force non-linear removes
+        if (srand.nextBoolean()) {
+          it.remove();
+        }
+      }
+    }
+  }
+
+  /**
+   * Test of toArray method, of class FoldedTreeSet.
+   */
+  @Test
+  public void testToArray() {
+    FoldedTreeSet<Integer> tree = new FoldedTreeSet<>();
+    ArrayList<Integer> list = new ArrayList<>(256);
+    for (int i = 0; i < 256; i++) {
+      list.add(1024 + i);
+    }
+    tree.addAll(list);
+    assertArrayEquals(list.toArray(), tree.toArray());
+  }
+
+  /**
+   * Test of toArray(T[]) method, of class FoldedTreeSet.
+   */
+  @Test
+  public void testToArray_GenericType() {
+    FoldedTreeSet<Integer> tree = new FoldedTreeSet<>();
+    ArrayList<Integer> list = new ArrayList<>(256);
+    for (int i = 0; i < 256; i++) {
+      list.add(1024 + i);
+    }
+    tree.addAll(list);
+    assertArrayEquals(list.toArray(new Integer[tree.size()]),
+                      tree.toArray(new Integer[tree.size()]));
+    assertArrayEquals(list.toArray(new Integer[tree.size() + 100]),
+                      tree.toArray(new Integer[tree.size() + 100]));
+  }
+
+  /**
+   * Test of add method, of class FoldedTreeSet.
+   */
+  @Test
+  public void testAdd() {
+    FoldedTreeSet<String> simpleSet = new FoldedTreeSet<>();
+    String entry = "apa";
+    assertTrue(simpleSet.add(entry));
+    assertFalse(simpleSet.add(entry));
+
+    FoldedTreeSet<Integer> intSet = new FoldedTreeSet<>();
+    for (int i = 512; i < 1024; i++) {
+      assertTrue(intSet.add(i));
+    }
+    for (int i = -1024; i < -512; i++) {
+      assertTrue(intSet.add(i));
+    }
+    for (int i = 0; i < 512; i++) {
+      assertTrue(intSet.add(i));
+    }
+    for (int i = -512; i < 0; i++) {
+      assertTrue(intSet.add(i));
+    }
+    assertEquals(2048, intSet.size());
+
+    FoldedTreeSet<Holder> set = new FoldedTreeSet<>();
+    long[] longs = new long[23432];
+    for (int i = 0; i < longs.length; i++) {
+      Holder val = new Holder(srand.nextLong());
+      while (set.contains(val)) {
+        val = new Holder(srand.nextLong());
+      }
+      longs[i] = val.getId();
+      assertTrue(set.add(val));
+    }
+    assertEquals(longs.length, set.size());
+    Arrays.sort(longs);
+
+    Iterator<Holder> it = set.iterator();
+    for (int i = 0; i < longs.length; i++) {
+      assertTrue(it.hasNext());
+      Holder val = it.next();
+      assertEquals(longs[i], val.getId());
+    }
+
+    // Specially constructed adds to exercise all code paths
+    FoldedTreeSet<Integer> specialAdds = new FoldedTreeSet<>();
+    // Fill node with even numbers
+    for (int i = 0; i < 128; i += 2) {
+      assertTrue(specialAdds.add(i));
+    }
+    // Remove left and add left
+    assertTrue(specialAdds.remove(0));
+    assertTrue(specialAdds.add(-1));
+    assertTrue(specialAdds.remove(-1));
+    // Add right and shift everything left
+    assertTrue(specialAdds.add(127));
+    assertTrue(specialAdds.remove(127));
+
+    // Empty at both ends
+    assertTrue(specialAdds.add(0));
+    assertTrue(specialAdds.remove(0));
+    assertTrue(specialAdds.remove(126));
+    // Add in the middle left to slide entries left
+    assertTrue(specialAdds.add(11));
+    assertTrue(specialAdds.remove(11));
+    // Add in the middle right to slide entries right
+    assertTrue(specialAdds.add(99));
+    assertTrue(specialAdds.remove(99));
+    // Add existing entry in the middle of a node
+    assertFalse(specialAdds.add(64));
+  }
+
+  @Test
+  public void testAddOrReplace() {
+    FoldedTreeSet<String> simpleSet = new FoldedTreeSet<>();
+    String entry = "apa";
+    assertNull(simpleSet.addOrReplace(entry));
+    assertEquals(entry, simpleSet.addOrReplace(entry));
+
+    FoldedTreeSet<Integer> intSet = new FoldedTreeSet<>();
+    for (int i = 0; i < 1024; i++) {
+      assertNull(intSet.addOrReplace(i));
+    }
+    for (int i = 0; i < 1024; i++) {
+      assertEquals(i, intSet.addOrReplace(i).intValue());
+    }
+  }
+
+  private static class Holder implements Comparable<Holder> {
+
+    private final long id;
+
+    public Holder(long id) {
+      this.id = id;
+    }
+
+    public long getId() {
+      return id;
+    }
+
+    @Override
+    public int compareTo(Holder o) {
+      return id < o.getId() ? -1
+             : id > o.getId() ? 1 : 0;
+    }
+  }
+
+  @Test
+  public void testRemoveWithComparator() {
+    FoldedTreeSet<Holder> set = new FoldedTreeSet<>();
+    long[] longs = new long[98327];
+    for (int i = 0; i < longs.length; i++) {
+      Holder val = new Holder(srand.nextLong());
+      while (set.contains(val)) {
+        val = new Holder(srand.nextLong());
+      }
+      longs[i] = val.getId();
+      set.add(val);
+    }
+    assertEquals(longs.length, set.size());
+    Comparator<Object> cmp = new Comparator<Object>() {
+      @Override
+      public int compare(Object o1, Object o2) {
+        long lookup = (long) o1;
+        long stored = ((Holder) o2).getId();
+        return lookup < stored ? -1
+               : lookup > stored ? 1 : 0;
+      }
+    };
+
+    for (long val : longs) {
+      set.remove(val, cmp);
+    }
+    assertEquals(0, set.size());
+    assertTrue(set.isEmpty());
+  }
+
+  @Test
+  public void testGetWithComparator() {
+    FoldedTreeSet<Holder> set = new FoldedTreeSet<>();
+    long[] longs = new long[32147];
+    for (int i = 0; i < longs.length; i++) {
+      Holder val = new Holder(srand.nextLong());
+      while (set.contains(val)) {
+        val = new Holder(srand.nextLong());
+      }
+      longs[i] = val.getId();
+      set.add(val);
+    }
+    assertEquals(longs.length, set.size());
+    Comparator<Object> cmp = new Comparator<Object>() {
+      @Override
+      public int compare(Object o1, Object o2) {
+        long lookup = (long) o1;
+        long stored = ((Holder) o2).getId();
+        return lookup < stored ? -1
+               : lookup > stored ? 1 : 0;
+      }
+    };
+
+    for (long val : longs) {
+      assertEquals(val, set.get(val, cmp).getId());
+    }
+  }
+
+  @Test
+  public void testGet() {
+    FoldedTreeSet<Holder> set = new FoldedTreeSet<>();
+    long[] longs = new long[43277];
+    for (int i = 0; i < longs.length; i++) {
+      Holder val = new Holder(srand.nextLong());
+      while (set.contains(val)) {
+        val = new Holder(srand.nextLong());
+      }
+      longs[i] = val.getId();
+      set.add(val);
+    }
+    assertEquals(longs.length, set.size());
+
+    for (long val : longs) {
+      assertEquals(val, set.get(new Holder(val)).getId());
+    }
+  }
+
+  /**
+   * Test of remove method, of class FoldedTreeSet.
+   */
+  @Test
+  public void testRemove() {
+    FoldedTreeSet<String> instance = new FoldedTreeSet<>();
+    assertEquals(false, instance.remove("apa"));
+    instance.add("apa");
+    assertEquals(true, instance.remove("apa"));
+
+    removeLeft();
+    removeRight();
+    removeAt();
+    removeRandom();
+  }
+
+  public void removeLeft() {
+    FoldedTreeSet<Integer> set = new FoldedTreeSet<>();
+    for (int i = 1; i <= 320; i++) {
+      set.add(i);
+    }
+    for (int i = 193; i < 225; i++) {
+      assertEquals(true, set.remove(i));
+      assertEquals(false, set.remove(i));
+    }
+    for (int i = 129; i < 161; i++) {
+      assertEquals(true, set.remove(i));
+      assertEquals(false, set.remove(i));
+    }
+    for (int i = 256; i > 224; i--) {
+      assertEquals(true, set.remove(i));
+      assertEquals(false, set.remove(i));
+    }
+    for (int i = 257; i < 289; i++) {
+      assertEquals(true, set.remove(i));
+      assertEquals(false, set.remove(i));
+    }
+    while (!set.isEmpty()) {
+      assertTrue(set.remove(set.first()));
+    }
+  }
+
+  public void removeRight() {
+    FoldedTreeSet<Integer> set = new FoldedTreeSet<>();
+    for (int i = 1; i <= 320; i++) {
+      set.add(i);
+    }
+    for (int i = 193; i < 225; i++) {
+      assertEquals(true, set.remove(i));
+      assertEquals(false, set.remove(i));
+    }
+    for (int i = 192; i > 160; i--) {
+      assertEquals(true, set.remove(i));
+      assertEquals(false, set.remove(i));
+    }
+    for (int i = 256; i > 224; i--) {
+      assertEquals(true, set.remove(i));
+      assertEquals(false, set.remove(i));
+    }
+    for (int i = 320; i > 288; i--) {
+      assertEquals(true, set.remove(i));
+      assertEquals(false, set.remove(i));
+    }
+    while (!set.isEmpty()) {
+      assertTrue(set.remove(set.last()));
+    }
+  }
+
+  public void removeAt() {
+    FoldedTreeSet<Integer> set = new FoldedTreeSet<>();
+    for (int i = 1; i <= 320; i++) {
+      set.add(i);
+    }
+    for (int i = 193; i < 225; i++) {
+      assertEquals(true, set.remove(i));
+      assertEquals(false, set.remove(i));
+    }
+    for (int i = 160; i < 192; i++) {
+      assertEquals(true, set.remove(i));
+      assertEquals(false, set.remove(i));
+    }
+    for (int i = 225; i < 257; i++) {
+      assertEquals(true, set.remove(i));
+      assertEquals(false, set.remove(i));
+    }
+    for (int i = 288; i < 320; i++) {
+      assertEquals(true, set.remove(i));
+      assertEquals(false, set.remove(i));
+    }
+  }
+
+  public void removeRandom() {
+    FoldedTreeSet<Integer> set = new FoldedTreeSet<>();
+    int[] integers = new int[2048];
+    for (int i = 0; i < 2048; i++) {
+      int val = srand.nextInt();
+      while (set.contains(val)) {
+        val = srand.nextInt();
+      }
+      integers[i] = val;
+      set.add(val);
+    }
+    assertEquals(2048, set.size());
+
+    for (int val : integers) {
+      assertEquals(true, set.remove(val));
+      assertEquals(false, set.remove(val));
+    }
+    assertEquals(true, set.isEmpty());
+  }
+
+  /**
+   * Test of containsAll method, of class FoldedTreeSet.
+   */
+  @Test
+  public void testContainsAll() {
+    Collection<String> list = Arrays.asList(new String[]{"apa", "apa2", "apa"});
+    FoldedTreeSet<String> instance = new FoldedTreeSet<>();
+    assertEquals(false, instance.containsAll(list));
+    instance.addAll(list);
+    assertEquals(true, instance.containsAll(list));
+  }
+
+  /**
+   * Test of addAll method, of class FoldedTreeSet.
+   */
+  @Test
+  public void testAddAll() {
+    Collection<String> list = Arrays.asList(new String[]{"apa", "apa2", "apa"});
+    FoldedTreeSet<String> instance = new FoldedTreeSet<>();
+    assertEquals(true, instance.addAll(list));
+    assertEquals(false, instance.addAll(list)); // add same entries again
+  }
+
+  /**
+   * Test of retainAll method, of class FoldedTreeSet.
+   */
+  @Test
+  public void testRetainAll() {
+    Collection<String> list = Arrays.asList(new String[]{"apa", "apa2", "apa"});
+    FoldedTreeSet<String> instance = new FoldedTreeSet<>();
+    instance.addAll(list);
+    assertEquals(false, instance.retainAll(list));
+    assertEquals(2, instance.size());
+    Collection<String> list2 = Arrays.asList(new String[]{"apa"});
+    assertEquals(true, instance.retainAll(list2));
+    assertEquals(1, instance.size());
+  }
+
+  /**
+   * Test of removeAll method, of class FoldedTreeSet.
+   */
+  @Test
+  public void testRemoveAll() {
+    Collection<String> list = Arrays.asList(new String[]{"apa", "apa2", "apa"});
+    FoldedTreeSet<String> instance = new FoldedTreeSet<>();
+    assertEquals(false, instance.removeAll(list));
+    instance.addAll(list);
+    assertEquals(true, instance.removeAll(list));
+    assertEquals(true, instance.isEmpty());
+  }
+
+  /**
+   * Test of clear method, of class FoldedTreeSet.
+   */
+  @Test
+  public void testClear() {
+    FoldedTreeSet<String> instance = new FoldedTreeSet<>();
+    instance.clear();
+    assertEquals(true, instance.isEmpty());
+    instance.add("apa");
+    assertEquals(false, instance.isEmpty());
+    instance.clear();
+    assertEquals(true, instance.isEmpty());
+  }
+
+  @Test
+  public void testFillRatio() {
+    FoldedTreeSet<Integer> set = new FoldedTreeSet<>();
+    final int size = 1024;
+    for (int i = 1; i <= size; i++) {
+      set.add(i);
+      assertEquals("Iteration: " + i, 1.0, set.fillRatio(), 0.0);
+    }
+
+    for (int i = 1; i <= size / 2; i++) {
+      set.remove(i * 2);
+      // Need the max since removes from the last node don't affect
+      // the fill ratio
+      assertEquals("Iteration: " + i,
+                   Math.max((size - i) / (double) size, 0.53125),
+                   set.fillRatio(), 0.0);
+    }
+  }
+
+  @Test
+  public void testCompact() {
+    FoldedTreeSet<Holder> set = new FoldedTreeSet<>();
+    long[] longs = new long[24553];
+    for (int i = 0; i < longs.length; i++) {
+      Holder val = new Holder(srand.nextLong());
+      while (set.contains(val)) {
+        val = new Holder(srand.nextLong());
+      }
+      longs[i] = val.getId();
+      set.add(val);
+    }
+    assertEquals(longs.length, set.size());
+
+    long[] longs2 = new long[longs.length];
+    for (int i = 0; i < longs2.length; i++) {
+      Holder val = new Holder(srand.nextLong());
+      while (set.contains(val)) {
+        val = new Holder(srand.nextLong());
+      }
+      longs2[i] = val.getId();
+      set.add(val);
+    }
+    assertEquals(longs.length + longs2.length, set.size());
+
+    // Create fragmentation
+    for (long val : longs) {
+      assertTrue(set.remove(new Holder(val)));
+    }
+    assertEquals(longs2.length, set.size());
+
+    assertFalse(set.compact(0));
+    assertTrue(set.compact(Long.MAX_VALUE));
+    assertEquals(longs2.length, set.size());
+    for (long val : longs) {
+      assertFalse(set.remove(new Holder(val)));
+    }
+    for (long val : longs2) {
+      assertEquals(val, set.get(new Holder(val)).getId());
+    }
+  }
+}

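Taken together, these tests outline the API that FoldedTreeSet adds beyond SortedSet: get() returns the stored instance, get() and remove() accept a lookup key plus a Comparator bridging key and element types, addOrReplace() swaps in a new instance, and fillRatio()/compact() expose and repair node fragmentation. A condensed usage sketch under those assumptions, calling only methods the tests above exercise (the tests treat compact()'s argument as a millisecond budget, passing 0 and Long.MAX_VALUE):

import java.util.Comparator;
import org.apache.hadoop.hdfs.util.FoldedTreeSet;

public class FoldedTreeSetSketch {
  public static void main(String[] args) {
    FoldedTreeSet<Long> set = new FoldedTreeSet<>();
    for (long i = 0; i < 1024; i++) {
      set.add(i);
    }

    // get() hands back the stored element, letting callers canonicalize
    // equal instances instead of keeping duplicates alive.
    Long stored = set.get(42L);

    // Lookups can go through a comparator between a raw key and the
    // stored type, as testGetWithComparator does with long vs. Holder.
    Comparator<Object> byValue = new Comparator<Object>() {
      @Override
      public int compare(Object key, Object element) {
        long lookup = (long) key;
        long held = (Long) element;
        return lookup < held ? -1 : lookup > held ? 1 : 0;
      }
    };
    Long found = set.get(7L, byValue);

    // Heavy removal fragments the 64-entry nodes; fillRatio() reports
    // occupancy and compact() folds sparse nodes back together.
    for (long i = 0; i < 1024; i += 2) {
      set.remove(i);
    }
    if (set.fillRatio() < 0.75) {
      set.compact(Long.MAX_VALUE); // effectively unlimited time budget
    }
    System.out.println(stored + " " + found + " size=" + set.size());
  }
}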

[2/2] hadoop git commit: HDFS-9260. Improve the performance and GC friendliness of NameNode startup and full block reports (Staffan Friberg via cmccabe)

Posted by cm...@apache.org.
HDFS-9260. Improve the performance and GC friendliness of NameNode startup and full block reports (Staffan Friberg via cmccabe)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/dd9ebf6e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/dd9ebf6e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/dd9ebf6e

Branch: refs/heads/trunk
Commit: dd9ebf6eedfd4ff8b3486eae2a446de6b0c7fa8a
Parents: 2da03b4
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Tue Feb 2 11:23:00 2016 -0800
Committer: Colin Patrick Mccabe <cm...@cloudera.com>
Committed: Tue Feb 2 11:23:00 2016 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |    3 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   12 +
 .../DatanodeProtocolClientSideTranslatorPB.java |    5 +-
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java |    5 +-
 .../hdfs/server/blockmanagement/BlockInfo.java  |  192 +--
 .../blockmanagement/BlockInfoContiguous.java    |   29 +-
 .../blockmanagement/BlockInfoStriped.java       |   30 +-
 .../server/blockmanagement/BlockManager.java    |  458 +++++--
 .../hdfs/server/blockmanagement/BlocksMap.java  |   66 +-
 .../blockmanagement/DatanodeStorageInfo.java    |  123 +-
 .../hdfs/server/datanode/BPServiceActor.java    |    4 +-
 .../datanode/fsdataset/impl/ReplicaMap.java     |   71 +-
 .../server/protocol/BlockReportContext.java     |   10 +-
 .../hdfs/server/protocol/DatanodeProtocol.java  |    1 -
 .../apache/hadoop/hdfs/util/FoldedTreeSet.java  | 1285 ++++++++++++++++++
 .../src/main/proto/DatanodeProtocol.proto       |    3 +
 .../hdfs/protocol/TestBlockListAsLongs.java     |    4 +-
 .../server/blockmanagement/TestBlockInfo.java   |   88 --
 .../blockmanagement/TestBlockManager.java       |   71 +-
 .../server/datanode/SimulatedFSDataset.java     |    5 +-
 .../TestBlockHasMultipleReplicasOnSameDN.java   |   20 +-
 .../datanode/TestDataNodeHotSwapVolumes.java    |    1 +
 .../datanode/TestDataNodeVolumeFailure.java     |    4 +-
 ...TestDnRespectsBlockReportSplitThreshold.java |    4 +-
 .../TestNNHandlesBlockReportPerStorage.java     |    3 +-
 .../TestNNHandlesCombinedBlockReport.java       |    2 +-
 .../server/datanode/TestTriggerBlockReport.java |    1 +
 .../server/namenode/NNThroughputBenchmark.java  |    4 +-
 .../server/namenode/TestAddStripedBlocks.java   |    4 +-
 .../hdfs/server/namenode/TestDeadDatanode.java  |    2 +-
 .../hadoop/hdfs/util/FoldedTreeSetTest.java     |  644 +++++++++
 31 files changed, 2565 insertions(+), 589 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 2eac881..38cb3df 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -971,6 +971,9 @@ Release 2.9.0 - UNRELEASED
     HDFS-7764. DirectoryScanner shouldn't abort the scan if one directory had
     an error (Rakesh R via cmccabe)
 
+    HDFS-9260. Improve the performance and GC friendliness of NameNode startup
+    and full block reports (Staffan Friberg via cmccabe)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 5217740..76915cb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -219,6 +219,18 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT = 2;
   public static final String  DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY = "dfs.namenode.replication.max-streams-hard-limit";
   public static final int     DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT = 4;
+  public static final String DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_INTERVAL_MS_KEY
+      = "dfs.namenode.storageinfo.defragment.interval.ms";
+  public static final int
+      DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_INTERVAL_MS_DEFAULT = 10 * 60 * 1000;
+  public static final String DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_TIMEOUT_MS_KEY
+      = "dfs.namenode.storageinfo.defragment.timeout.ms";
+  public static final int
+      DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_TIMEOUT_MS_DEFAULT = 4;
+  public static final String DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_KEY
+      = "dfs.namenode.storageinfo.defragment.ratio";
+  public static final double
+      DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_DEFAULT = 0.75;
   public static final String  DFS_WEBHDFS_AUTHENTICATION_FILTER_KEY = "dfs.web.authentication.filter";
   /* Phrased as below to avoid javac inlining as a constant, to match the behavior when
      this was AuthFilter.class.getName(). Note that if you change the import for AuthFilter, you

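These three keys feed the StorageInfoDefragmenter thread added in the BlockManager hunk further down: how often to sample the FoldedTreeSet fill ratio, how long one defragmentation pass may run, and the fill ratio below which a storage's tree gets compacted. A hedged sketch of reading them the way the BlockManager constructor does (getLong appears in that hunk; getDouble for the ratio is an assumption, since the hunk is cut off); the class name is hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class DefragmentSettingsSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    long intervalMs = conf.getLong(
        DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_INTERVAL_MS_KEY,
        DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_INTERVAL_MS_DEFAULT);
    long timeoutMs = conf.getLong(
        DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_TIMEOUT_MS_KEY,
        DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_TIMEOUT_MS_DEFAULT);
    double ratio = conf.getDouble(
        DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_KEY,
        DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_DEFAULT);
    // Defaults from the hunk above: check every 10 minutes, spend at
    // most 4 ms per pass, defragment when node fill drops below 0.75.
    System.out.println(intervalMs + " ms / " + timeoutMs + " ms / " + ratio);
  }
}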
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
index 81c23e1..79113dd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
@@ -174,12 +174,13 @@ public class DatanodeProtocolClientSideTranslatorPB implements
 
   @Override
   public DatanodeCommand blockReport(DatanodeRegistration registration,
-      String poolId, StorageBlockReport[] reports, BlockReportContext context)
+      String poolId, StorageBlockReport[] reports,
+      BlockReportContext context)
         throws IOException {
     BlockReportRequestProto.Builder builder = BlockReportRequestProto
         .newBuilder().setRegistration(PBHelper.convert(registration))
         .setBlockPoolId(poolId);
-    
+
     boolean useBlocksBuffer = registration.getNamespaceInfo()
         .isCapabilitySupported(Capability.STORAGE_BLOCK_REPORT_BUFFERS);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 4b6baf2..e70cdf0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -824,8 +824,8 @@ public class PBHelper {
 
 
   public static BlockReportContext convert(BlockReportContextProto proto) {
-    return new BlockReportContext(proto.getTotalRpcs(),
-        proto.getCurRpc(), proto.getId(), proto.getLeaseId());
+    return new BlockReportContext(proto.getTotalRpcs(), proto.getCurRpc(),
+        proto.getId(), proto.getLeaseId(), proto.getSorted());
   }
 
   public static BlockReportContextProto convert(BlockReportContext context) {
@@ -834,6 +834,7 @@ public class PBHelper {
         setCurRpc(context.getCurRpc()).
         setId(context.getReportId()).
         setLeaseId(context.getLeaseId()).
+        setSorted(context.isSorted()).
         build();
   }
 

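With both converters updated, the sorted flag survives serialization in either direction. A small round-trip sketch using only the two convert() overloads shown above; it assumes the generated BlockReportContextProto lives in DatanodeProtocolProtos, like the other generated protos in hadoop-hdfs:

import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;

public class SortedFlagRoundTrip {
  public static void main(String[] args) {
    BlockReportContext in =
        new BlockReportContext(1, 0, System.nanoTime(), 0L, true);
    // context -> proto -> context: the new setSorted()/getSorted() pair
    // is what carries isSorted() across the conversion unchanged.
    BlockReportContextProto proto = PBHelper.convert(in);
    BlockReportContext out = PBHelper.convert(proto);
    System.out.println("sorted survives: "
        + (out.isSorted() == in.isSorted()));
  }
}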
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
index e9fa123..5da2140 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
@@ -18,8 +18,9 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.io.IOException;
-import java.util.LinkedList;
+import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -55,19 +56,9 @@ public abstract class BlockInfo extends Block
   /** For implementing {@link LightWeightGSet.LinkedElement} interface. */
   private LightWeightGSet.LinkedElement nextLinkedElement;
 
-  /**
-   * This array contains triplets of references. For each i-th storage, the
-   * block belongs to triplets[3*i] is the reference to the
-   * {@link DatanodeStorageInfo} and triplets[3*i+1] and triplets[3*i+2] are
-   * references to the previous and the next blocks, respectively, in the list
-   * of blocks belonging to this storage.
-   *
-   * Using previous and next in Object triplets is done instead of a
-   * {@link LinkedList} list to efficiently use memory. With LinkedList the cost
-   * per replica is 42 bytes (LinkedList#Entry object per replica) versus 16
-   * bytes using the triplets.
-   */
-  protected Object[] triplets;
+
+  // Storages on which this block is replicated.
+  protected DatanodeStorageInfo[] storages;
 
   private BlockUnderConstructionFeature uc;
 
@@ -77,14 +68,14 @@ public abstract class BlockInfo extends Block
    *             in the block group
    */
   public BlockInfo(short size) {
-    this.triplets = new Object[3 * size];
+    this.storages = new DatanodeStorageInfo[size];
     this.bcId = INVALID_INODE_ID;
     this.replication = isStriped() ? 0 : size;
   }
 
   public BlockInfo(Block blk, short size) {
     super(blk);
-    this.triplets = new Object[3*size];
+    this.storages = new DatanodeStorageInfo[size];
     this.bcId = INVALID_INODE_ID;
     this.replication = isStriped() ? 0 : size;
   }
@@ -109,79 +100,52 @@ public abstract class BlockInfo extends Block
     return bcId == INVALID_INODE_ID;
   }
 
-  public DatanodeDescriptor getDatanode(int index) {
-    DatanodeStorageInfo storage = getStorageInfo(index);
-    return storage == null ? null : storage.getDatanodeDescriptor();
-  }
+  public Iterator<DatanodeStorageInfo> getStorageInfos() {
+    return new Iterator<DatanodeStorageInfo>() {
 
-  DatanodeStorageInfo getStorageInfo(int index) {
-    assert this.triplets != null : "BlockInfo is not initialized";
-    assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
-    return (DatanodeStorageInfo)triplets[index*3];
-  }
+      private int index = 0;
 
-  BlockInfo getPrevious(int index) {
-    assert this.triplets != null : "BlockInfo is not initialized";
-    assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound";
-    BlockInfo info = (BlockInfo)triplets[index*3+1];
-    assert info == null ||
-        info.getClass().getName().startsWith(BlockInfo.class.getName()) :
-        "BlockInfo is expected at " + index*3;
-    return info;
-  }
+      @Override
+      public boolean hasNext() {
+        while (index < storages.length && storages[index] == null) {
+          index++;
+        }
+        return index < storages.length;
+      }
 
-  BlockInfo getNext(int index) {
-    assert this.triplets != null : "BlockInfo is not initialized";
-    assert index >= 0 && index*3+2 < triplets.length : "Index is out of bound";
-    BlockInfo info = (BlockInfo)triplets[index*3+2];
-    assert info == null || info.getClass().getName().startsWith(
-        BlockInfo.class.getName()) :
-        "BlockInfo is expected at " + index*3;
-    return info;
+      @Override
+      public DatanodeStorageInfo next() {
+        if (!hasNext()) {
+          throw new NoSuchElementException();
+        }
+        return storages[index++];
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException("remove is not supported");
+      }
+    };
   }
 
-  void setStorageInfo(int index, DatanodeStorageInfo storage) {
-    assert this.triplets != null : "BlockInfo is not initialized";
-    assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
-    triplets[index*3] = storage;
+  public DatanodeDescriptor getDatanode(int index) {
+    DatanodeStorageInfo storage = getStorageInfo(index);
+    return storage == null ? null : storage.getDatanodeDescriptor();
   }
 
-  /**
-   * Return the previous block on the block list for the datanode at
-   * position index. Set the previous block on the list to "to".
-   *
-   * @param index - the datanode index
-   * @param to - block to be set to previous on the list of blocks
-   * @return current previous block on the list of blocks
-   */
-  BlockInfo setPrevious(int index, BlockInfo to) {
-    assert this.triplets != null : "BlockInfo is not initialized";
-    assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound";
-    BlockInfo info = (BlockInfo) triplets[index*3+1];
-    triplets[index*3+1] = to;
-    return info;
+  DatanodeStorageInfo getStorageInfo(int index) {
+    assert this.storages != null : "BlockInfo is not initialized";
+    return storages[index];
   }
 
-  /**
-   * Return the next block on the block list for the datanode at
-   * position index. Set the next block on the list to "to".
-   *
-   * @param index - the datanode index
-   * @param to - block to be set to next on the list of blocks
-   * @return current next block on the list of blocks
-   */
-  BlockInfo setNext(int index, BlockInfo to) {
-    assert this.triplets != null : "BlockInfo is not initialized";
-    assert index >= 0 && index*3+2 < triplets.length : "Index is out of bound";
-    BlockInfo info = (BlockInfo) triplets[index*3+2];
-    triplets[index*3+2] = to;
-    return info;
+  void setStorageInfo(int index, DatanodeStorageInfo storage) {
+    assert this.storages != null : "BlockInfo is not initialized";
+    this.storages[index] = storage;
   }
 
   public int getCapacity() {
-    assert this.triplets != null : "BlockInfo is not initialized";
-    assert triplets.length % 3 == 0 : "Malformed BlockInfo";
-    return triplets.length / 3;
+    assert this.storages != null : "BlockInfo is not initialized";
+    return storages.length;
   }
 
   /**
@@ -240,80 +204,6 @@ public abstract class BlockInfo extends Block
     return -1;
   }
 
-  /**
-   * Insert this block into the head of the list of blocks
-   * related to the specified DatanodeStorageInfo.
-   * If the head is null then form a new list.
-   * @return current block as the new head of the list.
-   */
-  BlockInfo listInsert(BlockInfo head, DatanodeStorageInfo storage) {
-    int dnIndex = this.findStorageInfo(storage);
-    assert dnIndex >= 0 : "Data node is not found: current";
-    assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
-        "Block is already in the list and cannot be inserted.";
-    this.setPrevious(dnIndex, null);
-    this.setNext(dnIndex, head);
-    if (head != null) {
-      head.setPrevious(head.findStorageInfo(storage), this);
-    }
-    return this;
-  }
-
-  /**
-   * Remove this block from the list of blocks
-   * related to the specified DatanodeStorageInfo.
-   * If this block is the head of the list then return the next block as
-   * the new head.
-   * @return the new head of the list or null if the list becomes
-   * empy after deletion.
-   */
-  BlockInfo listRemove(BlockInfo head, DatanodeStorageInfo storage) {
-    if (head == null) {
-      return null;
-    }
-    int dnIndex = this.findStorageInfo(storage);
-    if (dnIndex < 0) { // this block is not on the data-node list
-      return head;
-    }
-
-    BlockInfo next = this.getNext(dnIndex);
-    BlockInfo prev = this.getPrevious(dnIndex);
-    this.setNext(dnIndex, null);
-    this.setPrevious(dnIndex, null);
-    if (prev != null) {
-      prev.setNext(prev.findStorageInfo(storage), next);
-    }
-    if (next != null) {
-      next.setPrevious(next.findStorageInfo(storage), prev);
-    }
-    if (this == head) { // removing the head
-      head = next;
-    }
-    return head;
-  }
-
-  /**
-   * Remove this block from the list of blocks related to the specified
-   * DatanodeDescriptor. Insert it into the head of the list of blocks.
-   *
-   * @return the new head of the list.
-   */
-  public BlockInfo moveBlockToHead(BlockInfo head, DatanodeStorageInfo storage,
-      int curIndex, int headIndex) {
-    if (head == this) {
-      return this;
-    }
-    BlockInfo next = this.setNext(curIndex, head);
-    BlockInfo prev = this.setPrevious(curIndex, null);
-
-    head.setPrevious(headIndex, this);
-    prev.setNext(prev.findStorageInfo(storage), next);
-    if (next != null) {
-      next.setPrevious(next.findStorageInfo(storage), prev);
-    }
-    return this;
-  }
-
   @Override
   public int hashCode() {
     // Super implementation is sufficient

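This hunk is the structural heart of the patch: the triplets Object[] (each storage reference followed by prev/next pointers stitching the block into a per-storage linked list) collapses into a plain DatanodeStorageInfo[], and callers walk it through an iterator that skips the null slots removeStorage() leaves behind. A self-contained reduction of that null-skipping pattern over a generic array; SparseArrayIterator is illustrative, not a class in this patch:

import java.util.Iterator;
import java.util.NoSuchElementException;

public class SparseArrayIterator<T> implements Iterator<T> {
  private final T[] slots;
  private int index = 0;

  public SparseArrayIterator(T[] slots) {
    this.slots = slots;
  }

  @Override
  public boolean hasNext() {
    // Skip empty slots: the array is compacted lazily, so removals may
    // leave nulls between live entries, as in BlockInfo.storages.
    while (index < slots.length && slots[index] == null) {
      index++;
    }
    return index < slots.length;
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return slots[index++];
  }

  @Override
  public void remove() {
    throw new UnsupportedOperationException("remove is not supported");
  }

  public static void main(String[] args) {
    Integer[] sparse = {1, null, 2, null, 3};
    Iterator<Integer> it = new SparseArrayIterator<>(sparse);
    while (it.hasNext()) {
      System.out.print(it.next() + " "); // prints: 1 2 3
    }
  }
}

The prev/next bookkeeping is not relocated inside BlockInfo; judging by the FoldedTreeSet import in the BlockManager hunk below and the large DatanodeStorageInfo.java change in the diffstat, per-storage block membership moves into sorted sets instead.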
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
index 746e298..f729c4f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
@@ -35,20 +35,20 @@ public class BlockInfoContiguous extends BlockInfo {
   }
 
   /**
-   * Ensure that there is enough  space to include num more triplets.
-   * @return first free triplet index.
+   * Ensure that there is enough space to include num more storages.
+   * @return first free storage index.
    */
   private int ensureCapacity(int num) {
-    assert this.triplets != null : "BlockInfo is not initialized";
+    assert this.storages != null : "BlockInfo is not initialized";
     int last = numNodes();
-    if (triplets.length >= (last+num)*3) {
+    if (storages.length >= (last+num)) {
       return last;
     }
     /* Not enough space left. Create a new array. Should normally
      * happen only when replication is manually increased by the user. */
-    Object[] old = triplets;
-    triplets = new Object[(last+num)*3];
-    System.arraycopy(old, 0, triplets, 0, last * 3);
+    DatanodeStorageInfo[] old = storages;
+    storages = new DatanodeStorageInfo[(last+num)];
+    System.arraycopy(old, 0, storages, 0, last);
     return last;
   }
 
@@ -57,8 +57,6 @@ public class BlockInfoContiguous extends BlockInfo {
     // find the last null node
     int lastNode = ensureCapacity(1);
     setStorageInfo(lastNode, storage);
-    setNext(lastNode, null);
-    setPrevious(lastNode, null);
     return true;
   }
 
@@ -68,25 +66,18 @@ public class BlockInfoContiguous extends BlockInfo {
     if (dnIndex < 0) { // the node is not found
       return false;
     }
-    assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
-        "Block is still in the list and must be removed first.";
     // find the last not null node
     int lastNode = numNodes()-1;
-    // replace current node triplet by the lastNode one
+    // replace current node entry by the lastNode one
     setStorageInfo(dnIndex, getStorageInfo(lastNode));
-    setNext(dnIndex, getNext(lastNode));
-    setPrevious(dnIndex, getPrevious(lastNode));
-    // set the last triplet to null
+    // set the last entry to null
     setStorageInfo(lastNode, null);
-    setNext(lastNode, null);
-    setPrevious(lastNode, null);
     return true;
   }
 
   @Override
   public int numNodes() {
-    assert this.triplets != null : "BlockInfo is not initialized";
-    assert triplets.length % 3 == 0 : "Malformed BlockInfo";
+    assert this.storages != null : "BlockInfo is not initialized";
 
     for (int idx = getCapacity()-1; idx >= 0; idx--) {
       if (getDatanode(idx) != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
index 20d5858..c6e26ec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
@@ -26,21 +26,20 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 /**
  * Subclass of {@link BlockInfo}, presenting a block group in erasure coding.
  *
- * We still use triplets to store DatanodeStorageInfo for each block in the
- * block group, as well as the previous/next block in the corresponding
- * DatanodeStorageInfo. For a (m+k) block group, the first (m+k) triplet units
+ * We still use a storage array to store DatanodeStorageInfo for each block in
+ * the block group. For an (m+k) block group, the first (m+k) storage units
  * are sorted and strictly mapped to the corresponding block.
  *
  * Normally each block belonging to group is stored in only one DataNode.
- * However, it is possible that some block is over-replicated. Thus the triplet
+ * However, it is possible that some block is over-replicated. Thus the storage
  * array's size can be larger than (m+k). Thus currently we use an extra byte
- * array to record the block index for each triplet.
+ * array to record the block index for each entry.
  */
 @InterfaceAudience.Private
 public class BlockInfoStriped extends BlockInfo {
   private final ErasureCodingPolicy ecPolicy;
   /**
-   * Always the same size with triplets. Record the block index for each triplet
+   * Always the same size as the storage array. Records the block index for each entry.
    * TODO: actually this is only necessary for over-replicated block. Thus can
    * be further optimized to save memory usage.
    */
@@ -104,7 +103,7 @@ public class BlockInfoStriped extends BlockInfo {
         return i;
       }
     }
-    // need to expand the triplet size
+    // need to expand the storage array
     ensureCapacity(i + 1, true);
     return i;
   }
@@ -130,8 +129,6 @@ public class BlockInfoStriped extends BlockInfo {
   private void addStorage(DatanodeStorageInfo storage, int index,
       int blockIndex) {
     setStorageInfo(index, storage);
-    setNext(index, null);
-    setPrevious(index, null);
     indices[index] = (byte) blockIndex;
   }
 
@@ -173,26 +170,22 @@ public class BlockInfoStriped extends BlockInfo {
     if (dnIndex < 0) { // the node is not found
       return false;
     }
-    assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
-        "Block is still in the list and must be removed first.";
-    // set the triplet to null
+    // set the entry to null
     setStorageInfo(dnIndex, null);
-    setNext(dnIndex, null);
-    setPrevious(dnIndex, null);
     indices[dnIndex] = -1;
     return true;
   }
 
   private void ensureCapacity(int totalSize, boolean keepOld) {
     if (getCapacity() < totalSize) {
-      Object[] old = triplets;
+      DatanodeStorageInfo[] old = storages;
       byte[] oldIndices = indices;
-      triplets = new Object[totalSize * 3];
+      storages = new DatanodeStorageInfo[totalSize];
       indices = new byte[totalSize];
       initIndices();
 
       if (keepOld) {
-        System.arraycopy(old, 0, triplets, 0, old.length);
+        System.arraycopy(old, 0, storages, 0, old.length);
         System.arraycopy(oldIndices, 0, indices, 0, oldIndices.length);
       }
     }
@@ -214,8 +207,7 @@ public class BlockInfoStriped extends BlockInfo {
 
   @Override
   public int numNodes() {
-    assert this.triplets != null : "BlockInfo is not initialized";
-    assert triplets.length % 3 == 0 : "Malformed BlockInfo";
+    assert this.storages != null : "BlockInfo is not initialized";
     int num = 0;
     for (int idx = getCapacity()-1; idx >= 0; idx--) {
       if (getStorageInfo(idx) != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 587e6b6..25cec8a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
@@ -93,6 +94,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
 import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
+import org.apache.hadoop.hdfs.util.FoldedTreeSet;
 import org.apache.hadoop.hdfs.util.LightWeightHashSet;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 
@@ -106,6 +108,7 @@ import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.LightWeightGSet;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.VersionInfo;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -195,7 +198,12 @@ public class BlockManager implements BlockStatsMXBean {
 
   /**replicationRecheckInterval is how often namenode checks for new replication work*/
   private final long replicationRecheckInterval;
-  
+
+  /** Check interval, timeout and fill-ratio threshold for StorageInfo defragmentation. */
+  private final long storageInfoDefragmentInterval;
+  private final long storageInfoDefragmentTimeout;
+  private final double storageInfoDefragmentRatio;
+
   /**
    * Mapping: Block -> { BlockCollection, datanodes, self ref }
    * Updated only in response to client-sent information.
@@ -204,6 +212,10 @@ public class BlockManager implements BlockStatsMXBean {
 
   /** Replication thread. */
   final Daemon replicationThread = new Daemon(new ReplicationMonitor());
+
+  /** StorageInfoDefragmenter thread. */
+  private final Daemon storageInfoDefragmenterThread =
+      new Daemon(new StorageInfoDefragmenter());
   
   /** Block report thread for handling async reports. */
   private final BlockReportProcessingThread blockReportThread =
@@ -376,7 +388,20 @@ public class BlockManager implements BlockStatsMXBean {
     this.replicationRecheckInterval = 
       conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 
                   DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L;
-    
+
+    this.storageInfoDefragmentInterval =
+      conf.getLong(
+          DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_INTERVAL_MS_KEY,
+          DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_INTERVAL_MS_DEFAULT);
+    this.storageInfoDefragmentTimeout =
+      conf.getLong(
+          DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_TIMEOUT_MS_KEY,
+          DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_TIMEOUT_MS_DEFAULT);
+    this.storageInfoDefragmentRatio =
+      conf.getDouble(
+          DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_KEY,
+          DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_DEFAULT);
+
     this.encryptDataTransfer =
         conf.getBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY,
             DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
@@ -508,6 +533,8 @@ public class BlockManager implements BlockStatsMXBean {
     datanodeManager.activate(conf);
     this.replicationThread.setName("ReplicationMonitor");
     this.replicationThread.start();
+    storageInfoDefragmenterThread.setName("StorageInfoMonitor");
+    storageInfoDefragmenterThread.start();
     this.blockReportThread.start();
     mxBeanName = MBeans.register("NameNode", "BlockStats", this);
     bmSafeMode.activate(blockTotal);
@@ -517,8 +544,10 @@ public class BlockManager implements BlockStatsMXBean {
     bmSafeMode.close();
     try {
       replicationThread.interrupt();
+      storageInfoDefragmenterThread.interrupt();
       blockReportThread.interrupt();
       replicationThread.join(3000);
+      storageInfoDefragmenterThread.join(3000);
       blockReportThread.join(3000);
     } catch (InterruptedException ie) {
     }
@@ -1165,9 +1194,15 @@ public class BlockManager implements BlockStatsMXBean {
    
   /** Remove the blocks associated to the given datanode. */
   void removeBlocksAssociatedTo(final DatanodeDescriptor node) {
-    final Iterator<BlockInfo> it = node.getBlockIterator();
-    while(it.hasNext()) {
-      removeStoredBlock(it.next(), node);
+    for (DatanodeStorageInfo storage : node.getStorageInfos()) {
+      final Iterator<BlockInfo> it = storage.getBlockIterator();
+      while (it.hasNext()) {
+        BlockInfo block = it.next();
+        // The block must be removed via the iterator to avoid a
+        // ConcurrentModificationException in the underlying storage set
+        it.remove();
+        removeStoredBlock(block, node);
+      }
     }
     // Remove all pending DN messages referencing this DN.
     pendingDNMessages.removeAllMessagesForDatanode(node);
@@ -1183,6 +1218,9 @@ public class BlockManager implements BlockStatsMXBean {
     DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
     while(it.hasNext()) {
       BlockInfo block = it.next();
+      // The block must be removed via the iterator to avoid a
+      // ConcurrentModificationException in the underlying storage set
+      it.remove();
       removeStoredBlock(block, node);
       final Block b = getBlockOnStorage(block, storageInfo);
       if (b != null) {
@@ -2033,8 +2071,8 @@ public class BlockManager implements BlockStatsMXBean {
    */
   public boolean processReport(final DatanodeID nodeID,
       final DatanodeStorage storage,
-      final BlockListAsLongs newReport, BlockReportContext context,
-      boolean lastStorageInRpc) throws IOException {
+      final BlockListAsLongs newReport,
+      BlockReportContext context, boolean lastStorageInRpc) throws IOException {
     namesystem.writeLock();
     final long startTime = Time.monotonicNow(); //after acquiring write lock
     final long endTime;
@@ -2079,7 +2117,8 @@ public class BlockManager implements BlockStatsMXBean {
             nodeID.getDatanodeUuid());
         processFirstBlockReport(storageInfo, newReport);
       } else {
-        invalidatedBlocks = processReport(storageInfo, newReport);
+        invalidatedBlocks = processReport(storageInfo, newReport,
+            context != null ? context.isSorted() : false);
       }
       
       storageInfo.receivedBlockReport();
@@ -2149,6 +2188,9 @@ public class BlockManager implements BlockStatsMXBean {
       // TODO: remove this assumption in case we want to put a block on
       // more than one storage on a datanode (and because it's a difficult
       // assumption to really enforce)
+      // The block must be removed via the iterator to avoid a
+      // ConcurrentModificationException in the underlying storage set
+      iter.remove();
       removeStoredBlock(block, zombie.getDatanodeDescriptor());
       Block b = getBlockOnStorage(block, zombie);
       if (b != null) {
@@ -2238,7 +2280,7 @@ public class BlockManager implements BlockStatsMXBean {
   
   private Collection<Block> processReport(
       final DatanodeStorageInfo storageInfo,
-      final BlockListAsLongs report) throws IOException {
+      final BlockListAsLongs report, final boolean sorted) throws IOException {
     // Normal case:
     // Modify the (block-->datanode) map, according to the difference
     // between the old and new block report.
@@ -2248,9 +2290,29 @@ public class BlockManager implements BlockStatsMXBean {
     Collection<Block> toInvalidate = new LinkedList<>();
     Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<>();
     Collection<StatefulBlockInfo> toUC = new LinkedList<>();
-    reportDiff(storageInfo, report,
-        toAdd, toRemove, toInvalidate, toCorrupt, toUC);
-   
+
+    Iterable<BlockReportReplica> sortedReport;
+    if (!sorted) {
+      blockLog.warn("BLOCK* processReport: Report from the DataNode ({}) is "
+                    + "unsorted. This will cause overhead on the NameNode "
+                    + "which needs to sort the Full BR. Please update the "
+                    + "DataNode to the same version of Hadoop HDFS as the "
+                    + "NameNode ({}).",
+                    storageInfo.getDatanodeDescriptor().getDatanodeUuid(),
+                    VersionInfo.getVersion());
+      Set<BlockReportReplica> set = new FoldedTreeSet<>();
+      for (BlockReportReplica iblk : report) {
+        set.add(new BlockReportReplica(iblk));
+      }
+      sortedReport = set;
+    } else {
+      sortedReport = report;
+    }
+
+    reportDiffSorted(storageInfo, sortedReport,
+                     toAdd, toRemove, toInvalidate, toCorrupt, toUC);
+
+
     DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
     // Process the blocks on each queue
     for (StatefulBlockInfo b : toUC) { 
@@ -2399,126 +2461,111 @@ public class BlockManager implements BlockStatsMXBean {
     }
   }
 
-  private void reportDiff(DatanodeStorageInfo storageInfo, 
-      BlockListAsLongs newReport,
+  private void reportDiffSorted(DatanodeStorageInfo storageInfo,
+      Iterable<BlockReportReplica> newReport,
       Collection<BlockInfoToAdd> toAdd,     // add to DatanodeDescriptor
       Collection<BlockInfo> toRemove,       // remove from DatanodeDescriptor
       Collection<Block> toInvalidate,       // should be removed from DN
       Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
       Collection<StatefulBlockInfo> toUC) { // add to under-construction list
 
-    // place a delimiter in the list which separates blocks 
-    // that have been reported from those that have not
-    Block delimiterBlock = new Block();
-    BlockInfo delimiter = new BlockInfoContiguous(delimiterBlock,
-        (short) 1);
-    AddBlockResult result = storageInfo.addBlock(delimiter, delimiterBlock);
-    assert result == AddBlockResult.ADDED 
-        : "Delimiting block cannot be present in the node";
-    int headIndex = 0; //currently the delimiter is in the head of the list
-    int curIndex;
-
-    if (newReport == null) {
-      newReport = BlockListAsLongs.EMPTY;
-    }
-    // scan the report and process newly reported blocks
-    for (BlockReportReplica iblk : newReport) {
-      ReplicaState iState = iblk.getState();
-      BlockInfo storedBlock = processReportedBlock(storageInfo,
-          iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);
-
-      // move block to the head of the list
-      if (storedBlock != null &&
-          (curIndex = storedBlock.findStorageInfo(storageInfo)) >= 0) {
-        headIndex = storageInfo.moveBlockToHead(storedBlock, curIndex, headIndex);
+    // Both the reported blocks and the storage's stored blocks must be sorted
+    Iterator<BlockInfo> storageBlocksIterator = storageInfo.getBlockIterator();
+    DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor();
+    BlockInfo storageBlock = null;
+
+    for (BlockReportReplica replica : newReport) {
+
+      long replicaID = replica.getBlockId();
+      if (BlockIdManager.isStripedBlockID(replicaID)
+          && (!hasNonEcBlockUsingStripedID ||
+              !blocksMap.containsBlock(replica))) {
+        replicaID = BlockIdManager.convertToStripedID(replicaID);
+      }
+
+      ReplicaState reportedState = replica.getState();
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Reported block " + replica
+                  + " on " + dn + " size " + replica.getNumBytes()
+                  + " replicaState = " + reportedState);
+      }
+
+      if (shouldPostponeBlocksFromFuture
+          && isGenStampInFuture(replica)) {
+        queueReportedBlock(storageInfo, replica, reportedState,
+                           QUEUE_REASON_FUTURE_GENSTAMP);
+        continue;
+      }
+
+      if (storageBlock == null && storageBlocksIterator.hasNext()) {
+        storageBlock = storageBlocksIterator.next();
       }
+
+      do {
+        int cmp;
+        if (storageBlock == null ||
+            (cmp = Long.compare(replicaID, storageBlock.getBlockId())) < 0) {
+          // Check if block is available in NN but not yet on this storage
+          BlockInfo nnBlock = blocksMap.getStoredBlock(new Block(replicaID));
+          if (nnBlock != null) {
+            reportDiffSortedInner(storageInfo, replica, reportedState,
+                                  nnBlock, toAdd, toCorrupt, toUC);
+          } else {
+            // Replica not found anywhere so it should be invalidated
+            toInvalidate.add(new Block(replica));
+          }
+          break;
+        } else if (cmp == 0) {
+          // Replica matched the current storage block
+          reportDiffSortedInner(storageInfo, replica, reportedState,
+                                storageBlock, toAdd, toCorrupt, toUC);
+          storageBlock = null;
+        } else {
+          // Replica has a higher ID than storageBlock:
+          // remove all stored blocks with IDs lower than the replica's
+          do {
+            toRemove.add(storageBlock);
+            storageBlock = storageBlocksIterator.hasNext()
+                           ? storageBlocksIterator.next() : null;
+          } while (storageBlock != null &&
+                   Long.compare(replicaID, storageBlock.getBlockId()) > 0);
+        }
+      } while (storageBlock != null);
     }
 
-    // collect blocks that have not been reported
-    // all of them are next to the delimiter
-    Iterator<BlockInfo> it =
-        storageInfo.new BlockIterator(delimiter.getNext(0));
-    while (it.hasNext()) {
-      toRemove.add(it.next());
+    // Iterate over any remaining blocks that have not been reported and remove them
+    while (storageBlocksIterator.hasNext()) {
+      toRemove.add(storageBlocksIterator.next());
     }
-    storageInfo.removeBlock(delimiter);
   }
 
-  /**
-   * Process a block replica reported by the data-node.
-   * No side effects except adding to the passed-in Collections.
-   * 
-   * <ol>
-   * <li>If the block is not known to the system (not in blocksMap) then the
-   * data-node should be notified to invalidate this block.</li>
-   * <li>If the reported replica is valid that is has the same generation stamp
-   * and length as recorded on the name-node, then the replica location should
-   * be added to the name-node.</li>
-   * <li>If the reported replica is not valid, then it is marked as corrupt,
-   * which triggers replication of the existing valid replicas.
-   * Corrupt replicas are removed from the system when the block
-   * is fully replicated.</li>
-   * <li>If the reported replica is for a block currently marked "under
-   * construction" in the NN, then it should be added to the 
-   * BlockUnderConstructionFeature's list of replicas.</li>
-   * </ol>
-   * 
-   * @param storageInfo DatanodeStorageInfo that sent the report.
-   * @param block reported block replica
-   * @param reportedState reported replica state
-   * @param toAdd add to DatanodeDescriptor
-   * @param toInvalidate missing blocks (not in the blocks map)
-   *        should be removed from the data-node
-   * @param toCorrupt replicas with unexpected length or generation stamp;
-   *        add to corrupt replicas
-   * @param toUC replicas of blocks currently under construction
-   * @return the up-to-date stored block, if it should be kept.
-   *         Otherwise, null.
-   */
-  private BlockInfo processReportedBlock(
+  private void reportDiffSortedInner(
       final DatanodeStorageInfo storageInfo,
-      final Block block, final ReplicaState reportedState,
+      final BlockReportReplica replica, final ReplicaState reportedState,
+      final BlockInfo storedBlock,
       final Collection<BlockInfoToAdd> toAdd,
-      final Collection<Block> toInvalidate, 
       final Collection<BlockToMarkCorrupt> toCorrupt,
       final Collection<StatefulBlockInfo> toUC) {
-    
-    DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor();
 
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Reported block " + block
-          + " on " + dn + " size " + block.getNumBytes()
-          + " replicaState = " + reportedState);
-    }
-  
-    if (shouldPostponeBlocksFromFuture && isGenStampInFuture(block)) {
-      queueReportedBlock(storageInfo, block, reportedState,
-          QUEUE_REASON_FUTURE_GENSTAMP);
-      return null;
-    }
-    
-    // find block by blockId
-    BlockInfo storedBlock = getStoredBlock(block);
-    if(storedBlock == null) {
-      // If blocksMap does not contain reported block id,
-      // the replica should be removed from the data-node.
-      toInvalidate.add(new Block(block));
-      return null;
-    }
+    assert replica != null;
+    assert storedBlock != null;
+
+    DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor();
     BlockUCState ucState = storedBlock.getBlockUCState();
-    
+
     // Block is on the NN
-    if(LOG.isDebugEnabled()) {
+    if (LOG.isDebugEnabled()) {
       LOG.debug("In memory blockUCState = " + ucState);
     }
 
     // Ignore replicas already scheduled to be removed from the DN
-    if(invalidateBlocks.contains(dn, block)) {
-      return storedBlock;
+    if (invalidateBlocks.contains(dn, replica)) {
+      return;
     }
 
-    BlockToMarkCorrupt c = checkReplicaCorrupt(
-        block, reportedState, storedBlock, ucState, dn);
+    BlockToMarkCorrupt c = checkReplicaCorrupt(replica, reportedState,
+                                               storedBlock, ucState, dn);
     if (c != null) {
       if (shouldPostponeBlocksFromFuture) {
         // If the block is an out-of-date generation stamp or state,
@@ -2532,23 +2579,16 @@ public class BlockManager implements BlockStatsMXBean {
       } else {
         toCorrupt.add(c);
       }
-      return storedBlock;
-    }
-
-    if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
-      toUC.add(new StatefulBlockInfo(storedBlock,
-          new Block(block), reportedState));
-      return storedBlock;
-    }
-
-    // Add replica if appropriate. If the replica was previously corrupt
-    // but now okay, it might need to be updated.
-    if (reportedState == ReplicaState.FINALIZED
-        && (storedBlock.findStorageInfo(storageInfo) == -1 ||
-            corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
-      toAdd.add(new BlockInfoToAdd(storedBlock, block));
+    } else if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
+      toUC.add(new StatefulBlockInfo(storedBlock, new Block(replica),
+          reportedState));
+    } else if (reportedState == ReplicaState.FINALIZED &&
+               (storedBlock.findStorageInfo(storageInfo) == -1 ||
+                corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
+      // Add replica if appropriate. If the replica was previously corrupt
+      // but now okay, it might need to be updated.
+      toAdd.add(new BlockInfoToAdd(storedBlock, replica));
     }
-    return storedBlock;
   }
 
   /**
@@ -2774,7 +2814,7 @@ public class BlockManager implements BlockStatsMXBean {
     }
 
     // just add it
-    AddBlockResult result = storageInfo.addBlock(storedBlock, reported);
+    AddBlockResult result = storageInfo.addBlockInitial(storedBlock, reported);
 
     // Now check for completion of blocks and safe block count
     int numCurrentReplica = countLiveNodes(storedBlock);
@@ -3497,40 +3537,75 @@ public class BlockManager implements BlockStatsMXBean {
       DatanodeStorageInfo storageInfo, Block block,
       ReplicaState reportedState, DatanodeDescriptor delHintNode)
       throws IOException {
-    // blockReceived reports a finalized block
-    Collection<BlockInfoToAdd> toAdd = new LinkedList<>();
-    Collection<Block> toInvalidate = new LinkedList<Block>();
-    Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
-    Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
+
     final DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
 
-    processReportedBlock(storageInfo, block, reportedState, toAdd, toInvalidate,
-        toCorrupt, toUC);
-    // the block is only in one of the to-do lists
-    // if it is in none then data-node already has it
-    assert toUC.size() + toAdd.size() + toInvalidate.size() + toCorrupt.size() <= 1
-      : "The block should be only in one of the lists.";
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Reported block " + block
+          + " on " + node + " size " + block.getNumBytes()
+          + " replicaState = " + reportedState);
+    }
 
-    for (StatefulBlockInfo b : toUC) { 
-      addStoredBlockUnderConstruction(b, storageInfo);
+    if (shouldPostponeBlocksFromFuture &&
+        isGenStampInFuture(block)) {
+      queueReportedBlock(storageInfo, block, reportedState,
+          QUEUE_REASON_FUTURE_GENSTAMP);
+      return;
     }
-    long numBlocksLogged = 0;
-    for (BlockInfoToAdd b : toAdd) {
-      addStoredBlock(b.stored, b.reported, storageInfo, delHintNode,
-          numBlocksLogged < maxNumBlocksToLog);
-      numBlocksLogged++;
+
+    // find block by blockId
+    BlockInfo storedBlock = getStoredBlock(block);
+    if (storedBlock == null) {
+      // If blocksMap does not contain reported block id,
+      // the replica should be removed from the data-node.
+      blockLog.debug("BLOCK* addBlock: block {} on node {} size {} does not " +
+          "belong to any file", block, node, block.getNumBytes());
+      addToInvalidates(new Block(block), node);
+      return;
     }
-    if (numBlocksLogged > maxNumBlocksToLog) {
-      blockLog.debug("BLOCK* addBlock: logged info for {} of {} reported.",
-          maxNumBlocksToLog, numBlocksLogged);
+
+    BlockUCState ucState = storedBlock.getBlockUCState();
+    // Block is on the NN
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("In memory blockUCState = " + ucState);
     }
-    for (Block b : toInvalidate) {
-      blockLog.debug("BLOCK* addBlock: block {} on node {} size {} does not " +
-          "belong to any file", b, node, b.getNumBytes());
-      addToInvalidates(b, node);
+
+    // Ignore replicas already scheduled to be removed from the DN
+    if (invalidateBlocks.contains(node, block)) {
+      return;
     }
-    for (BlockToMarkCorrupt b : toCorrupt) {
-      markBlockAsCorrupt(b, storageInfo, node);
+
+    BlockToMarkCorrupt c = checkReplicaCorrupt(
+        block, reportedState, storedBlock, ucState, node);
+    if (c != null) {
+      if (shouldPostponeBlocksFromFuture) {
+        // If the block is an out-of-date generation stamp or state,
+        // but we're the standby, we shouldn't treat it as corrupt,
+        // but instead just queue it for later processing.
+        // TODO: Pretty confident this should be s/storedBlock/block below,
+        // since we should be postponing the info of the reported block, not
+        // the stored block. See HDFS-6289 for more context.
+        queueReportedBlock(storageInfo, storedBlock, reportedState,
+            QUEUE_REASON_CORRUPT_STATE);
+      } else {
+        markBlockAsCorrupt(c, storageInfo, node);
+      }
+      return;
+    }
+
+    if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
+      addStoredBlockUnderConstruction(
+          new StatefulBlockInfo(storedBlock, new Block(block), reportedState),
+          storageInfo);
+      return;
+    }
+
+    // Add replica if appropriate. If the replica was previously corrupt
+    // but now okay, it might need to be updated.
+    if (reportedState == ReplicaState.FINALIZED
+        && (storedBlock.findStorageInfo(storageInfo) == -1 ||
+            corruptReplicas.isReplicaCorrupt(storedBlock, node))) {
+      addStoredBlock(storedBlock, block, storageInfo, delHintNode, true);
     }
   }
 
@@ -4060,6 +4135,87 @@ public class BlockManager implements BlockStatsMXBean {
     }
   }
 
+  /**
+   * Runnable that monitors the fragmentation of the StorageInfo TreeSets and
+   * compacts them when their fill ratio falls below a configured threshold.
+   */
+  private class StorageInfoDefragmenter implements Runnable {
+
+    @Override
+    public void run() {
+      while (namesystem.isRunning()) {
+        try {
+          // Check storage efficiency only when active NN is out of safe mode.
+          if (isPopulatingReplQueues()) {
+            scanAndCompactStorages();
+          }
+          Thread.sleep(storageInfoDefragmentInterval);
+        } catch (Throwable t) {
+          if (!namesystem.isRunning()) {
+            LOG.info("Stopping thread.");
+            if (!(t instanceof InterruptedException)) {
+              LOG.info("Received an exception while shutting down.", t);
+            }
+            break;
+          } else if (!checkNSRunning && t instanceof InterruptedException) {
+            LOG.info("Stopping for testing.");
+            break;
+          }
+          LOG.error("Thread received Runtime exception.", t);
+          terminate(1, t);
+        }
+      }
+    }
+
+    private void scanAndCompactStorages() throws InterruptedException {
+      ArrayList<String> datanodesAndStorages = new ArrayList<>();
+      for (DatanodeDescriptor node
+          : datanodeManager.getDatanodeListForReport(DatanodeReportType.ALL)) {
+        for (DatanodeStorageInfo storage : node.getStorageInfos()) {
+          namesystem.readLock();
+          try {
+            double ratio = storage.treeSetFillRatio();
+            if (ratio < storageInfoDefragmentRatio) {
+              datanodesAndStorages.add(node.getDatanodeUuid());
+              datanodesAndStorages.add(storage.getStorageID());
+            }
+            LOG.info("StorageInfo TreeSet fill ratio {} : {}{}",
+                     storage.getStorageID(), ratio,
+                     (ratio < storageInfoDefragmentRatio)
+                     ? " (queued for defragmentation)" : "");
+          } finally {
+            namesystem.readUnlock();
+          }
+        }
+      }
+      if (!datanodesAndStorages.isEmpty()) {
+        for (int i = 0; i < datanodesAndStorages.size(); i += 2) {
+          namesystem.writeLock();
+          try {
+            DatanodeStorageInfo storage = datanodeManager.
+                getDatanode(datanodesAndStorages.get(i)).
+                getStorageInfo(datanodesAndStorages.get(i + 1));
+            if (storage != null) {
+              boolean aborted =
+                  !storage.treeSetCompact(storageInfoDefragmentTimeout);
+              if (aborted) {
+                // Compaction timed out; back up the index so the same
+                // storage is retried on the next iteration.
+                i -= 2;
+              }
+              LOG.info("StorageInfo TreeSet defragmented {} : {}{}",
+                       storage.getStorageID(), storage.treeSetFillRatio(),
+                       aborted ? " (aborted)" : "");
+            }
+          } finally {
+            namesystem.writeUnlock();
+          }
+          // Wait between each iteration
+          Thread.sleep(1000);
+        }
+      }
+    }
+  }
 
   /**
    * Compute block replication and block invalidation work that can be scheduled

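The heart of the BlockManager change is reportDiffSorted, which replaces the old head-of-list delimiter trick with a single merge pass over two sorted sequences: the reported replicas and the storage's stored blocks. A condensed, self-contained sketch of that merge (block IDs stand in for replicas, and the toAdd/toRemove lists stand in for the full set of queues; all names here are illustrative, not the patch's):

  import java.util.Iterator;
  import java.util.List;

  class SortedReportDiff {
    static void mergeDiff(Iterable<Long> sortedReport, Iterator<Long> stored,
                          List<Long> toAdd, List<Long> toRemove) {
      Long storedId = stored.hasNext() ? stored.next() : null;
      for (long reported : sortedReport) {
        // Drain stored blocks with smaller IDs: present on the NN side
        // but absent from the report, so they are queued for removal.
        while (storedId != null && storedId < reported) {
          toRemove.add(storedId);
          storedId = stored.hasNext() ? stored.next() : null;
        }
        if (storedId != null && storedId == reported) {
          storedId = stored.hasNext() ? stored.next() : null; // match, keep
        } else {
          toAdd.add(reported); // reported but not yet stored here
        }
      }
      while (storedId != null) { // trailing blocks never reported
        toRemove.add(storedId);
        storedId = stored.hasNext() ? stored.next() : null;
      }
    }
  }

Because both sides are sorted, the pass is O(n + m), which is what makes an unsorted report (see the warning above) comparatively expensive: it must first be loaded into a FoldedTreeSet to restore the ordering.
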
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
index 47a21fe..71d0598 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import java.util.Collections;
 import java.util.Iterator;
 
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -30,37 +31,6 @@ import org.apache.hadoop.util.LightWeightGSet;
  * the datanodes that store the block.
  */
 class BlocksMap {
-  private static class StorageIterator implements Iterator<DatanodeStorageInfo> {
-    private final BlockInfo blockInfo;
-    private int nextIdx = 0;
-      
-    StorageIterator(BlockInfo blkInfo) {
-      this.blockInfo = blkInfo;
-    }
-
-    @Override
-    public boolean hasNext() {
-      if (blockInfo == null) {
-        return false;
-      }
-      while (nextIdx < blockInfo.getCapacity() &&
-          blockInfo.getDatanode(nextIdx) == null) {
-        // note that for striped blocks there may be null in the triplets
-        nextIdx++;
-      }
-      return nextIdx < blockInfo.getCapacity();
-    }
-
-    @Override
-    public DatanodeStorageInfo next() {
-      return blockInfo.getStorageInfo(nextIdx++);
-    }
-
-    @Override
-    public void remove()  {
-      throw new UnsupportedOperationException("Sorry. can't remove.");
-    }
-  }
 
   /** Constant {@link LightWeightGSet} capacity. */
   private final int capacity;
@@ -132,6 +102,16 @@ class BlocksMap {
     }
   }
 
+  /**
+   * Check if BlocksMap contains the block.
+   *
+   * @param b Block to check
+   * @return true if block is in the map, otherwise false
+   */
+  boolean containsBlock(Block b) {
+    return blocks.contains(b);
+  }
+
   /** Returns the block object if it exists in the map. */
   BlockInfo getStoredBlock(Block b) {
     return blocks.get(b);
@@ -142,7 +122,9 @@ class BlocksMap {
    * returns {@link Iterable} of the storages the block belongs to.
    */
   Iterable<DatanodeStorageInfo> getStorages(Block b) {
-    return getStorages(blocks.get(b));
+    BlockInfo block = blocks.get(b);
+    return block != null ? getStorages(block)
+           : Collections.<DatanodeStorageInfo>emptyList();
   }
 
   /**
@@ -150,12 +132,16 @@ class BlocksMap {
    * returns {@link Iterable} of the storages the block belongs to.
    */
   Iterable<DatanodeStorageInfo> getStorages(final BlockInfo storedBlock) {
-    return new Iterable<DatanodeStorageInfo>() {
-      @Override
-      public Iterator<DatanodeStorageInfo> iterator() {
-        return new StorageIterator(storedBlock);
-      }
-    };
+    if (storedBlock == null) {
+      return Collections.emptyList();
+    } else {
+      return new Iterable<DatanodeStorageInfo>() {
+        @Override
+        public Iterator<DatanodeStorageInfo> iterator() {
+          return storedBlock.getStorageInfos();
+        }
+      };
+    }
   }
 
   /** counts number of containing nodes. Better than using iterator. */
@@ -174,7 +160,7 @@ class BlocksMap {
     if (info == null)
       return false;
 
-    // remove block from the data-node list and the node from the block info
+    // remove block from the data-node set and the node from the block info
     boolean removed = removeBlock(node, info);
 
     if (info.hasNoStorage()    // no datanodes left
@@ -185,7 +171,7 @@ class BlocksMap {
   }
 
   /**
-   * Remove block from the list of blocks belonging to the data-node. Remove
+   * Remove block from the set of blocks belonging to the data-node. Remove
    * data-node from the block.
    */
   static boolean removeBlock(DatanodeDescriptor dn, BlockInfo b) {

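One behavioral detail in the BlocksMap change: getStorages(Block) and getStorages(BlockInfo) now return an empty Iterable when the block is unknown, so callers can iterate without a null check. The same null-safe pattern in isolation (a sketch; names are illustrative):

  import java.util.Collections;
  import java.util.List;
  import java.util.Map;

  class MapUtil {
    static <K, V> Iterable<V> valuesOrEmpty(Map<K, List<V>> map, K key) {
      List<V> found = map.get(key);
      // An empty Iterable lets callers use for-each without a null check.
      return found != null ? found : Collections.<V>emptyList();
    }
  }
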
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index 1f1b24b..c4729ea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.hdfs.util.FoldedTreeSet;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -85,31 +86,6 @@ public class DatanodeStorageInfo {
     storageType = storage.getStorageType();
   }
 
-  /**
-   * Iterates over the list of blocks belonging to the data-node.
-   */
-  class BlockIterator implements Iterator<BlockInfo> {
-    private BlockInfo current;
-
-    BlockIterator(BlockInfo head) {
-      this.current = head;
-    }
-
-    public boolean hasNext() {
-      return current != null;
-    }
-
-    public BlockInfo next() {
-      BlockInfo res = current;
-      current = current.getNext(current.findStorageInfo(DatanodeStorageInfo.this));
-      return res;
-    }
-
-    public void remove() {
-      throw new UnsupportedOperationException("Sorry. can't remove.");
-    }
-  }
-
   private final DatanodeDescriptor dn;
   private final String storageID;
   private StorageType storageType;
@@ -120,8 +96,7 @@ public class DatanodeStorageInfo {
   private volatile long remaining;
   private long blockPoolUsed;
 
-  private volatile BlockInfo blockList = null;
-  private int numBlocks = 0;
+  private final FoldedTreeSet<BlockInfo> blocks = new FoldedTreeSet<>();
 
   // The ID of the last full block report which updated this storage.
   private long lastBlockReportId = 0;
@@ -207,7 +182,7 @@ public class DatanodeStorageInfo {
   }
 
   boolean areBlocksOnFailedStorage() {
-    return getState() == State.FAILED && numBlocks != 0;
+    return getState() == State.FAILED && !blocks.isEmpty();
   }
 
   @VisibleForTesting
@@ -234,6 +209,36 @@ public class DatanodeStorageInfo {
   long getBlockPoolUsed() {
     return blockPoolUsed;
   }
+  /**
+   * For use during startup. Expects blocks to be added in sorted order
+   * to enable fast insertion into the DatanodeStorageInfo.
+   *
+   * @param b Block to add to DatanodeStorageInfo
+   * @param reportedBlock The reported replica
+   * @return Enum describing whether the block was added, replaced or already existed
+   */
+  public AddBlockResult addBlockInitial(BlockInfo b, Block reportedBlock) {
+    // First check whether the block belongs to a different storage
+    // on the same DN.
+    AddBlockResult result = AddBlockResult.ADDED;
+    DatanodeStorageInfo otherStorage =
+        b.findStorageInfo(getDatanodeDescriptor());
+
+    if (otherStorage != null) {
+      if (otherStorage != this) {
+        // The block belongs to a different storage. Remove it first.
+        otherStorage.removeBlock(b);
+        result = AddBlockResult.REPLACED;
+      } else {
+        // The block is already associated with this storage.
+        return AddBlockResult.ALREADY_EXIST;
+      }
+    }
+
+    b.addStorage(this, reportedBlock);
+    blocks.addSortedLast(b);
+    return result;
+  }
 
   public AddBlockResult addBlock(BlockInfo b, Block reportedBlock) {
     // First check whether the block belongs to a different storage
@@ -253,9 +258,8 @@ public class DatanodeStorageInfo {
       }
     }
 
-    // add to the head of the data-node list
     b.addStorage(this, reportedBlock);
-    insertToList(b);
+    blocks.add(b);
     return result;
   }
 
@@ -263,45 +267,17 @@ public class DatanodeStorageInfo {
     return addBlock(b, b);
   }
 
-  public void insertToList(BlockInfo b) {
-    blockList = b.listInsert(blockList, this);
-    numBlocks++;
-  }
-
-  public boolean removeBlock(BlockInfo b) {
-    blockList = b.listRemove(blockList, this);
-    if (b.removeStorage(this)) {
-      numBlocks--;
-      return true;
-    } else {
-      return false;
-    }
+  boolean removeBlock(BlockInfo b) {
+    blocks.remove(b);
+    return b.removeStorage(this);
   }
 
   int numBlocks() {
-    return numBlocks;
+    return blocks.size();
   }
   
   Iterator<BlockInfo> getBlockIterator() {
-    return new BlockIterator(blockList);
-  }
-
-  /**
-   * Move block to the head of the list of blocks belonging to the data-node.
-   * @return the index of the head of the blockList
-   */
-  int moveBlockToHead(BlockInfo b, int curIndex, int headIndex) {
-    blockList = b.moveBlockToHead(blockList, this, curIndex, headIndex);
-    return curIndex;
-  }
-
-  /**
-   * Used for testing only
-   * @return the head of the blockList
-   */
-  @VisibleForTesting
-  BlockInfo getBlockListHeadForTesting(){
-    return blockList;
+    return blocks.iterator();
   }
 
   void updateState(StorageReport r) {
@@ -349,6 +325,27 @@ public class DatanodeStorageInfo {
         false, capacity, dfsUsed, remaining, blockPoolUsed);
   }
 
+  /**
+   * The fill ratio of the underlying TreeSet holding blocks.
+   *
+   * @return the fill ratio of the tree
+   */
+  public double treeSetFillRatio() {
+    return blocks.fillRatio();
+  }
+
+  /**
+   * Compact the underlying TreeSet holding blocks.
+   *
+   * @param timeout Maximum time to spend compacting the tree set in
+   *                milliseconds.
+   *
+   * @return true if compaction completed, false if aborted
+   */
+  public boolean treeSetCompact(long timeout) {
+    return blocks.compact(timeout);
+  }
+
   static Iterable<StorageType> toStorageTypes(
       final Iterable<DatanodeStorageInfo> infos) {
     return new Iterable<StorageType>() {

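DatanodeStorageInfo now stores its blocks in a FoldedTreeSet, and the split between addBlock and addBlockInitial exists because startup-time reports are sorted: addSortedLast can append at the tail instead of paying a full tree search per insert. A rough usage sketch (assuming a no-arg FoldedTreeSet over Comparable elements, as the patch itself constructs; the wrapper class and loop are illustrative):

  import org.apache.hadoop.hdfs.util.FoldedTreeSet;

  class SortedLoad {
    static FoldedTreeSet<Long> load(long[] sortedIds) {
      FoldedTreeSet<Long> blocks = new FoldedTreeSet<>();
      for (long id : sortedIds) {
        // addSortedLast appends at the tail, skipping the tree search a
        // general add() performs; it assumes ascending insertion order.
        blocks.addSortedLast(id);
      }
      return blocks;
    }
  }
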
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 1b72961..bc4f2d8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -461,7 +461,7 @@ class BPServiceActor implements Runnable {
         // Below split threshold, send all reports in a single message.
         DatanodeCommand cmd = bpNamenode.blockReport(
             bpRegistration, bpos.getBlockPoolId(), reports,
-              new BlockReportContext(1, 0, reportId, fullBrLeaseId));
+              new BlockReportContext(1, 0, reportId, fullBrLeaseId, true));
         numRPCs = 1;
         numReportsSent = reports.length;
         if (cmd != null) {
@@ -474,7 +474,7 @@ class BPServiceActor implements Runnable {
           DatanodeCommand cmd = bpNamenode.blockReport(
               bpRegistration, bpos.getBlockPoolId(), singleReport,
               new BlockReportContext(reports.length, r, reportId,
-                  fullBrLeaseId));
+                  fullBrLeaseId, true));
           numReportsSent++;
           numRPCs++;
           if (cmd != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
index 6f0b8a7..34c9f2e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
@@ -18,13 +18,14 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
-import org.apache.hadoop.util.LightWeightResizableGSet;
+import org.apache.hadoop.hdfs.util.FoldedTreeSet;
 
 /**
  * Maintains the replica map. 
@@ -33,9 +34,20 @@ class ReplicaMap {
   // Object using which this class is synchronized
   private final Object mutex;
   
-  // Map of block pool Id to another map of block Id to ReplicaInfo.
-  private final Map<String, LightWeightResizableGSet<Block, ReplicaInfo>> map =
-    new HashMap<String, LightWeightResizableGSet<Block, ReplicaInfo>>();
+  // Map of block pool Id to a set of ReplicaInfo.
+  private final Map<String, FoldedTreeSet<ReplicaInfo>> map = new HashMap<>();
+
+  // Special comparator used to compare Long to Block ID in the TreeSet.
+  private static final Comparator<Object> LONG_AND_BLOCK_COMPARATOR
+      = new Comparator<Object>() {
+
+        @Override
+        public int compare(Object o1, Object o2) {
+          long lookup = (long) o1;
+          long stored = ((Block) o2).getBlockId();
+          return lookup > stored ? 1 : lookup < stored ? -1 : 0;
+        }
+      };
 
   ReplicaMap(Object mutex) {
     if (mutex == null) {
@@ -92,11 +104,14 @@ class ReplicaMap {
   ReplicaInfo get(String bpid, long blockId) {
     checkBlockPool(bpid);
     synchronized(mutex) {
-      LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
-      return m != null ? m.get(new Block(blockId)) : null;
+      FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
+      if (set == null) {
+        return null;
+      }
+      return set.get(blockId, LONG_AND_BLOCK_COMPARATOR);
     }
   }
-  
+
   /**
    * Add a replica's meta information into the map 
    * 
@@ -109,13 +124,13 @@ class ReplicaMap {
     checkBlockPool(bpid);
     checkBlock(replicaInfo);
     synchronized(mutex) {
-      LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
-      if (m == null) {
+      FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
+      if (set == null) {
         // Add an entry for block pool if it does not exist already
-        m = new LightWeightResizableGSet<Block, ReplicaInfo>();
-        map.put(bpid, m);
+        set = new FoldedTreeSet<>();
+        map.put(bpid, set);
       }
-      return  m.put(replicaInfo);
+      return set.addOrReplace(replicaInfo);
     }
   }
 
@@ -138,12 +153,13 @@ class ReplicaMap {
     checkBlockPool(bpid);
     checkBlock(block);
     synchronized(mutex) {
-      LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
-      if (m != null) {
-        ReplicaInfo replicaInfo = m.get(block);
+      FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
+      if (set != null) {
+        ReplicaInfo replicaInfo =
+            set.get(block.getBlockId(), LONG_AND_BLOCK_COMPARATOR);
         if (replicaInfo != null &&
             block.getGenerationStamp() == replicaInfo.getGenerationStamp()) {
-          return m.remove(block);
+          return set.removeAndGet(replicaInfo);
         }
       }
     }
@@ -160,9 +176,9 @@ class ReplicaMap {
   ReplicaInfo remove(String bpid, long blockId) {
     checkBlockPool(bpid);
     synchronized(mutex) {
-      LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
-      if (m != null) {
-        return m.remove(new Block(blockId));
+      FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
+      if (set != null) {
+        return set.removeAndGet(blockId, LONG_AND_BLOCK_COMPARATOR);
       }
     }
     return null;
@@ -174,10 +190,9 @@ class ReplicaMap {
    * @return the number of replicas in the map
    */
   int size(String bpid) {
-    LightWeightResizableGSet<Block, ReplicaInfo> m = null;
     synchronized(mutex) {
-      m = map.get(bpid);
-      return m != null ? m.size() : 0;
+      FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
+      return set != null ? set.size() : 0;
     }
   }
   
@@ -192,19 +207,17 @@ class ReplicaMap {
    * @return a collection of the replicas belonging to the block pool
    */
   Collection<ReplicaInfo> replicas(String bpid) {
-    LightWeightResizableGSet<Block, ReplicaInfo> m = null;
-    m = map.get(bpid);
-    return m != null ? m.values() : null;
+    return map.get(bpid);
   }
 
   void initBlockPool(String bpid) {
     checkBlockPool(bpid);
     synchronized(mutex) {
-      LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
-      if (m == null) {
+      FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
+      if (set == null) {
         // Add an entry for block pool if it does not exist already
-        m = new LightWeightResizableGSet<Block, ReplicaInfo>();
-        map.put(bpid, m);
+        set = new FoldedTreeSet<>();
+        map.put(bpid, set);
       }
     }
   }

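The mixed-type comparator at the top of ReplicaMap is what lets get(bpid, blockId) and remove(bpid, blockId) search the FoldedTreeSet by a raw long ID without allocating a temporary Block. The same trick in isolation (the Replica class and names here are illustrative):

  import java.util.Comparator;

  class ReplicaLookup {
    static final class Replica {
      final long blockId;
      Replica(long blockId) { this.blockId = blockId; }
    }

    // Compares a boxed Long lookup key against stored Replica objects,
    // the same shape as the patch's LONG_AND_BLOCK_COMPARATOR.
    static final Comparator<Object> LONG_VS_REPLICA = new Comparator<Object>() {
      @Override
      public int compare(Object o1, Object o2) {
        long lookup = (long) o1;
        long stored = ((Replica) o2).blockId;
        return Long.compare(lookup, stored);
      }
    };
  }
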
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java
index 5bcd719..94749e2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java
@@ -52,12 +52,16 @@ public class BlockReportContext {
    */
   private final long leaseId;
 
+  private final boolean sorted;
+
   public BlockReportContext(int totalRpcs, int curRpc,
-                            long reportId, long leaseId) {
+                            long reportId, long leaseId,
+                            boolean sorted) {
     this.totalRpcs = totalRpcs;
     this.curRpc = curRpc;
     this.reportId = reportId;
     this.leaseId = leaseId;
+    this.sorted = sorted;
   }
 
   public int getTotalRpcs() {
@@ -75,4 +79,8 @@ public class BlockReportContext {
   public long getLeaseId() {
     return leaseId;
   }
+
+  public boolean isSorted() {
+    return sorted;
+  }
 }

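On the NameNode side the new flag is consumed defensively, since DataNodes running older software send either no context or an unsorted report; processReport above falls back with a guard along these lines:

  // Older DataNodes may send a null context; treat their reports as unsorted.
  boolean sorted = (context != null) && context.isSorted();
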
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
index add4e73..b962855 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
@@ -131,7 +131,6 @@ public interface DatanodeProtocol {
    *     Each finalized block is represented as 3 longs. Each under-
    *     construction replica is represented as 4 longs.
    *     This is done instead of Block[] to reduce memory used by block reports.
-   * @param reports report of blocks per storage
    * @param context Context information for this block report.
    *
    * @return - the next command for DN to process.