You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2016/02/02 20:28:54 UTC
[1/2] hadoop git commit: HDFS-9260. Improve the performance and GC
friendliness of NameNode startup and full block reports (Staffan Friberg via
cmccabe)
Repository: hadoop
Updated Branches:
refs/heads/trunk 2da03b48e -> dd9ebf6ee
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/FoldedTreeSet.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/FoldedTreeSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/FoldedTreeSet.java
new file mode 100644
index 0000000..1c6be1d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/FoldedTreeSet.java
@@ -0,0 +1,1285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import org.apache.hadoop.util.Time;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.SortedSet;
+
+/**
+ * A memory efficient implementation of RBTree. Instead of having a Node for
+ * each entry each node contains an array holding 64 entries.
+ *
+ * Based on the Apache Harmony folded TreeMap.
+ *
+ * @param <E> Entry type
+ */
+public class FoldedTreeSet<E> implements SortedSet<E> {
+
+ private static final boolean RED = true;
+ private static final boolean BLACK = false;
+
+ private final Comparator<E> comparator;
+ private Node<E> root;
+ private int size;
+ private int nodeCount;
+ private int modCount;
+ private Node<E> cachedNode;
+
+ /**
+ * Internal tree node that holds a sorted array of entries.
+ *
+ * @param <E> type of the elements
+ */
+ private static class Node<E> {
+
+ private static final int NODE_SIZE = 64;
+
+ // Tree structure
+ private Node<E> parent, left, right;
+ private boolean color;
+ private final E[] entries;
+ private int leftIndex = 0, rightIndex = -1;
+ private int size = 0;
+ // List for fast ordered iteration
+ private Node<E> prev, next;
+
+ @SuppressWarnings("unchecked")
+ public Node() {
+ entries = (E[]) new Object[NODE_SIZE];
+ }
+
+ public boolean isRed() {
+ return color == RED;
+ }
+
+ public boolean isBlack() {
+ return color == BLACK;
+ }
+
+ public Node<E> getLeftMostNode() {
+ Node<E> node = this;
+ while (node.left != null) {
+ node = node.left;
+ }
+ return node;
+ }
+
+ public Node<E> getRightMostNode() {
+ Node<E> node = this;
+ while (node.right != null) {
+ node = node.right;
+ }
+ return node;
+ }
+
+ public void addEntryLeft(E entry) {
+ assert rightIndex < entries.length;
+ assert !isFull();
+
+ if (leftIndex == 0) {
+ rightIndex++;
+ // Shift entries right/up
+ System.arraycopy(entries, 0, entries, 1, size);
+ } else {
+ leftIndex--;
+ }
+ size++;
+ entries[leftIndex] = entry;
+ }
+
+ public void addEntryRight(E entry) {
+ assert !isFull();
+
+ if (rightIndex == NODE_SIZE - 1) {
+ assert leftIndex > 0;
+ // Shift entries left/down
+ System.arraycopy(entries, leftIndex, entries, --leftIndex, size);
+ } else {
+ rightIndex++;
+ }
+ size++;
+ entries[rightIndex] = entry;
+ }
+
+ public void addEntryAt(E entry, int index) {
+ assert !isFull();
+
+ if (leftIndex == 0 || ((rightIndex != Node.NODE_SIZE - 1)
+ && (rightIndex - index <= index - leftIndex))) {
+ rightIndex++;
+ System.arraycopy(entries, index,
+ entries, index + 1, rightIndex - index);
+ entries[index] = entry;
+ } else {
+ int newLeftIndex = leftIndex - 1;
+ System.arraycopy(entries, leftIndex,
+ entries, newLeftIndex, index - leftIndex);
+ leftIndex = newLeftIndex;
+ entries[index - 1] = entry;
+ }
+ size++;
+ }
+
+ public void addEntriesLeft(Node<E> from) {
+ leftIndex -= from.size;
+ size += from.size;
+ System.arraycopy(from.entries, from.leftIndex,
+ entries, leftIndex, from.size);
+ }
+
+ public void addEntriesRight(Node<E> from) {
+ System.arraycopy(from.entries, from.leftIndex,
+ entries, rightIndex + 1, from.size);
+ size += from.size;
+ rightIndex += from.size;
+ }
+
+ public E insertEntrySlideLeft(E entry, int index) {
+ E pushedEntry = entries[0];
+ System.arraycopy(entries, 1, entries, 0, index - 1);
+ entries[index - 1] = entry;
+ return pushedEntry;
+ }
+
+ public E insertEntrySlideRight(E entry, int index) {
+ E movedEntry = entries[rightIndex];
+ System.arraycopy(entries, index, entries, index + 1, rightIndex - index);
+ entries[index] = entry;
+ return movedEntry;
+ }
+
+ public E removeEntryLeft() {
+ assert !isEmpty();
+ E entry = entries[leftIndex];
+ entries[leftIndex] = null;
+ leftIndex++;
+ size--;
+ return entry;
+ }
+
+ public E removeEntryRight() {
+ assert !isEmpty();
+ E entry = entries[rightIndex];
+ entries[rightIndex] = null;
+ rightIndex--;
+ size--;
+ return entry;
+ }
+
+ public E removeEntryAt(int index) {
+ assert !isEmpty();
+
+ E entry = entries[index];
+ int rightSize = rightIndex - index;
+ int leftSize = index - leftIndex;
+ if (rightSize <= leftSize) {
+ System.arraycopy(entries, index + 1, entries, index, rightSize);
+ entries[rightIndex] = null;
+ rightIndex--;
+ } else {
+ System.arraycopy(entries, leftIndex, entries, leftIndex + 1, leftSize);
+ entries[leftIndex] = null;
+ leftIndex++;
+ }
+ size--;
+ return entry;
+ }
+
+ public boolean isFull() {
+ return size == NODE_SIZE;
+ }
+
+ public boolean isEmpty() {
+ return size == 0;
+ }
+
+ public void clear() {
+ if (leftIndex < rightIndex) {
+ Arrays.fill(entries, leftIndex, rightIndex + 1, null);
+ }
+ size = 0;
+ leftIndex = 0;
+ rightIndex = -1;
+ prev = null;
+ next = null;
+ parent = null;
+ left = null;
+ right = null;
+ color = BLACK;
+ }
+ }
+
+ private static final class TreeSetIterator<E> implements Iterator<E> {
+
+ private final FoldedTreeSet<E> tree;
+ private int iteratorModCount;
+ private Node<E> node;
+ private int index;
+ private E lastEntry;
+ private int lastIndex;
+ private Node<E> lastNode;
+
+ private TreeSetIterator(FoldedTreeSet<E> tree) {
+ this.tree = tree;
+ this.iteratorModCount = tree.modCount;
+ if (!tree.isEmpty()) {
+ this.node = tree.root.getLeftMostNode();
+ this.index = this.node.leftIndex;
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ checkForModification();
+ return node != null;
+ }
+
+ @Override
+ public E next() {
+ if (hasNext()) {
+ lastEntry = node.entries[index];
+ lastIndex = index;
+ lastNode = node;
+ if (++index > node.rightIndex) {
+ node = node.next;
+ if (node != null) {
+ index = node.leftIndex;
+ }
+ }
+ return lastEntry;
+ } else {
+ throw new NoSuchElementException("Iterator exhausted");
+ }
+ }
+
+ @Override
+ public void remove() {
+ if (lastEntry == null) {
+ throw new IllegalStateException("No current element");
+ }
+ checkForModification();
+ if (lastNode.size == 1) {
+ // Safe to remove lastNode, the iterator is on the next node
+ tree.deleteNode(lastNode);
+ } else if (lastNode.leftIndex == lastIndex) {
+ // Safe to remove leftmost entry, the iterator is on the next index
+ lastNode.removeEntryLeft();
+ } else if (lastNode.rightIndex == lastIndex) {
+ // Safe to remove the rightmost entry, the iterator is on the next node
+ lastNode.removeEntryRight();
+ } else {
+ // Remove entry in the middle of the array
+ assert node == lastNode;
+ int oldRIndex = lastNode.rightIndex;
+ lastNode.removeEntryAt(lastIndex);
+ if (oldRIndex > lastNode.rightIndex) {
+ // Entries moved to the left in the array so index must be reset
+ index = lastIndex;
+ }
+ }
+ lastEntry = null;
+ iteratorModCount++;
+ tree.modCount++;
+ tree.size--;
+ }
+
+ private void checkForModification() {
+ if (iteratorModCount != tree.modCount) {
+ throw new ConcurrentModificationException("Tree has been modified "
+ + "outside of iterator");
+ }
+ }
+ }
+
+ /**
+ * Create a new TreeSet that uses the natural ordering of objects. The element
+ * type must implement Comparable.
+ */
+ public FoldedTreeSet() {
+ this(null);
+ }
+
+ /**
+ * Create a new TreeSet that orders the elements using the supplied
+ * Comparator.
+ *
+ * @param comparator Comparator able to compare elements of type E
+ */
+ public FoldedTreeSet(Comparator<E> comparator) {
+ this.comparator = comparator;
+ }
+
+ private Node<E> cachedOrNewNode(E entry) {
+ Node<E> node = (cachedNode != null) ? cachedNode : new Node<E>();
+ cachedNode = null;
+ nodeCount++;
+ // Since BlockIDs are always increasing for new blocks it is best to
+ // add values on the left side to enable quicker inserts on the right
+ node.addEntryLeft(entry);
+ return node;
+ }
+
+ private void cacheAndClear(Node<E> node) {
+ if (cachedNode == null) {
+ node.clear();
+ cachedNode = node;
+ }
+ }
+
+ @Override
+ public Comparator<? super E> comparator() {
+ return comparator;
+ }
+
+ @Override
+ public SortedSet<E> subSet(E fromElement, E toElement) {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public SortedSet<E> headSet(E toElement) {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public SortedSet<E> tailSet(E fromElement) {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public E first() {
+ if (!isEmpty()) {
+ Node<E> node = root.getLeftMostNode();
+ return node.entries[node.leftIndex];
+ }
+ return null;
+ }
+
+ @Override
+ public E last() {
+ if (!isEmpty()) {
+ Node<E> node = root.getRightMostNode();
+ return node.entries[node.rightIndex];
+ }
+ return null;
+ }
+
+ @Override
+ public int size() {
+ return size;
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return root == null;
+ }
+
+ /**
+ * Lookup and return a stored object using a user provided comparator.
+ *
+ * @param obj Lookup key
+ * @param cmp User provided Comparator. The comparator should expect that the
+ * proved obj will always be the first method parameter and any
+ * stored object will be the second parameter.
+ *
+ * @return A matching stored object or null if non is found
+ */
+ public E get(Object obj, Comparator<?> cmp) {
+ Objects.requireNonNull(obj);
+
+ Node<E> node = root;
+ while (node != null) {
+ E[] entries = node.entries;
+
+ int leftIndex = node.leftIndex;
+ int result = compare(obj, entries[leftIndex], cmp);
+ if (result < 0) {
+ node = node.left;
+ } else if (result == 0) {
+ return entries[leftIndex];
+ } else {
+ int rightIndex = node.rightIndex;
+ if (leftIndex != rightIndex) {
+ result = compare(obj, entries[rightIndex], cmp);
+ }
+ if (result == 0) {
+ return entries[rightIndex];
+ } else if (result > 0) {
+ node = node.right;
+ } else {
+ int low = leftIndex + 1;
+ int high = rightIndex - 1;
+ while (low <= high) {
+ int mid = (low + high) >>> 1;
+ result = compare(obj, entries[mid], cmp);
+ if (result > 0) {
+ low = mid + 1;
+ } else if (result < 0) {
+ high = mid - 1;
+ } else {
+ return entries[mid];
+ }
+ }
+ return null;
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Lookup and return a stored object.
+ *
+ * @param entry Lookup entry
+ *
+ * @return A matching stored object or null if non is found
+ */
+ public E get(E entry) {
+ return get(entry, comparator);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public boolean contains(Object obj) {
+ return get((E) obj) != null;
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private static int compare(Object lookup, Object stored, Comparator cmp) {
+ return cmp != null
+ ? cmp.compare(lookup, stored)
+ : ((Comparable<Object>) lookup).compareTo(stored);
+ }
+
+ @Override
+ public Iterator<E> iterator() {
+ return new TreeSetIterator<>(this);
+ }
+
+ @Override
+ public Object[] toArray() {
+ Object[] objects = new Object[size];
+ if (!isEmpty()) {
+ int pos = 0;
+ for (Node<E> node = root.getLeftMostNode(); node != null;
+ pos += node.size, node = node.next) {
+ System.arraycopy(node.entries, node.leftIndex, objects, pos, node.size);
+ }
+ }
+ return objects;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <T> T[] toArray(T[] a) {
+ T[] r = a.length >= size ? a
+ : (T[]) java.lang.reflect.Array
+ .newInstance(a.getClass().getComponentType(), size);
+ if (!isEmpty()) {
+ Node<E> node = root.getLeftMostNode();
+ int pos = 0;
+ while (node != null) {
+ System.arraycopy(node.entries, node.leftIndex, r, pos, node.size);
+ pos += node.size;
+ node = node.next;
+ }
+ if (r.length > pos) {
+ r[pos] = null;
+ }
+ } else if (a.length > 0) {
+ a[0] = null;
+ }
+ return r;
+ }
+
+ /**
+ * Add or replace an entry in the TreeSet.
+ *
+ * @param entry Entry to add or replace/update.
+ *
+ * @return the previous entry, or null if this set did not already contain the
+ * specified entry
+ */
+ public E addOrReplace(E entry) {
+ return add(entry, true);
+ }
+
+ @Override
+ public boolean add(E entry) {
+ return add(entry, false) == null;
+ }
+
+ /**
+ * Internal add method to add a entry to the set.
+ *
+ * @param entry Entry to add
+ * @param replace Should the entry replace an old entry which is equal to the
+ * new entry
+ *
+ * @return null if entry added and didn't exist or the previous value (which
+ * might not have been overwritten depending on the replace parameter)
+ */
+ private E add(E entry, boolean replace) {
+ Objects.requireNonNull(entry);
+
+ // Empty tree
+ if (isEmpty()) {
+ root = cachedOrNewNode(entry);
+ size = 1;
+ modCount++;
+ return null;
+ }
+
+ // Compare right entry first since inserts of comperatively larger entries
+ // is more likely to be inserted. BlockID is always increasing in HDFS.
+ Node<E> node = root;
+ Node<E> prevNode = null;
+ int result = 0;
+ while (node != null) {
+ prevNode = node;
+ E[] entries = node.entries;
+ int rightIndex = node.rightIndex;
+ result = compare(entry, entries[rightIndex], comparator);
+ if (result > 0) {
+ node = node.right;
+ } else if (result == 0) {
+ E prevEntry = entries[rightIndex];
+ if (replace) {
+ entries[rightIndex] = entry;
+ }
+ return prevEntry;
+ } else {
+ int leftIndex = node.leftIndex;
+ if (leftIndex != rightIndex) {
+ result = compare(entry, entries[leftIndex], comparator);
+ }
+ if (result < 0) {
+ node = node.left;
+ } else if (result == 0) {
+ E prevEntry = entries[leftIndex];
+ if (replace) {
+ entries[leftIndex] = entry;
+ }
+ return prevEntry;
+ } else {
+ // Insert in this node
+ int low = leftIndex + 1, high = rightIndex - 1;
+ while (low <= high) {
+ int mid = (low + high) >>> 1;
+ result = compare(entry, entries[mid], comparator);
+ if (result > 0) {
+ low = mid + 1;
+ } else if (result == 0) {
+ E prevEntry = entries[mid];
+ if (replace) {
+ entries[mid] = entry;
+ }
+ return prevEntry;
+ } else {
+ high = mid - 1;
+ }
+ }
+ addElementInNode(node, entry, low);
+ return null;
+ }
+ }
+ }
+
+ assert prevNode != null;
+ size++;
+ modCount++;
+ if (!prevNode.isFull()) {
+ // The previous node still has space
+ if (result < 0) {
+ prevNode.addEntryLeft(entry);
+ } else {
+ prevNode.addEntryRight(entry);
+ }
+ } else if (result < 0) {
+ // The previous node is full, add to adjencent node or a new node
+ if (prevNode.prev != null && !prevNode.prev.isFull()) {
+ prevNode.prev.addEntryRight(entry);
+ } else {
+ attachNodeLeft(prevNode, cachedOrNewNode(entry));
+ }
+ } else if (prevNode.next != null && !prevNode.next.isFull()) {
+ prevNode.next.addEntryLeft(entry);
+ } else {
+ attachNodeRight(prevNode, cachedOrNewNode(entry));
+ }
+ return null;
+ }
+
+ /**
+ * Insert an entry last in the sorted tree. The entry must be the considered
+ * larger than the currently largest entry in the set when doing
+ * current.compareTo(entry), if entry is not the largest entry the method will
+ * fall back on the regular add method.
+ *
+ * @param entry entry to add
+ *
+ * @return True if added, false if already existed in the set
+ */
+ public boolean addSortedLast(E entry) {
+
+ if (isEmpty()) {
+ root = cachedOrNewNode(entry);
+ size = 1;
+ modCount++;
+ return true;
+ } else {
+ Node<E> node = root.getRightMostNode();
+ if (compare(node.entries[node.rightIndex], entry, comparator) < 0) {
+ size++;
+ modCount++;
+ if (!node.isFull()) {
+ node.addEntryRight(entry);
+ } else {
+ attachNodeRight(node, cachedOrNewNode(entry));
+ }
+ return true;
+ }
+ }
+
+ // Fallback on normal add if entry is unsorted
+ return add(entry);
+ }
+
+ private void addElementInNode(Node<E> node, E entry, int index) {
+ size++;
+ modCount++;
+
+ if (!node.isFull()) {
+ node.addEntryAt(entry, index);
+ } else {
+ // Node is full, insert and push old entry
+ Node<E> prev = node.prev;
+ Node<E> next = node.next;
+ if (prev == null) {
+ // First check if we have space in the the next node
+ if (next != null && !next.isFull()) {
+ E movedEntry = node.insertEntrySlideRight(entry, index);
+ next.addEntryLeft(movedEntry);
+ } else {
+ // Since prev is null the left child must be null
+ assert node.left == null;
+ E movedEntry = node.insertEntrySlideLeft(entry, index);
+ Node<E> newNode = cachedOrNewNode(movedEntry);
+ attachNodeLeft(node, newNode);
+ }
+ } else if (!prev.isFull()) {
+ // Prev has space
+ E movedEntry = node.insertEntrySlideLeft(entry, index);
+ prev.addEntryRight(movedEntry);
+ } else if (next == null) {
+ // Since next is null the right child must be null
+ assert node.right == null;
+ E movedEntry = node.insertEntrySlideRight(entry, index);
+ Node<E> newNode = cachedOrNewNode(movedEntry);
+ attachNodeRight(node, newNode);
+ } else if (!next.isFull()) {
+ // Next has space
+ E movedEntry = node.insertEntrySlideRight(entry, index);
+ next.addEntryLeft(movedEntry);
+ } else {
+ // Both prev and next nodes exist and are full
+ E movedEntry = node.insertEntrySlideRight(entry, index);
+ Node<E> newNode = cachedOrNewNode(movedEntry);
+ if (node.right == null) {
+ attachNodeRight(node, newNode);
+ } else {
+ // Since our right node exist,
+ // the left node of our next node must be empty
+ assert next.left == null;
+ attachNodeLeft(next, newNode);
+ }
+ }
+ }
+ }
+
+ private void attachNodeLeft(Node<E> node, Node<E> newNode) {
+ newNode.parent = node;
+ node.left = newNode;
+
+ newNode.next = node;
+ newNode.prev = node.prev;
+ if (newNode.prev != null) {
+ newNode.prev.next = newNode;
+ }
+ node.prev = newNode;
+ balanceInsert(newNode);
+ }
+
+ private void attachNodeRight(Node<E> node, Node<E> newNode) {
+ newNode.parent = node;
+ node.right = newNode;
+
+ newNode.prev = node;
+ newNode.next = node.next;
+ if (newNode.next != null) {
+ newNode.next.prev = newNode;
+ }
+ node.next = newNode;
+ balanceInsert(newNode);
+ }
+
+ /**
+ * Balance the RB Tree after insert.
+ *
+ * @param node Added node
+ */
+ private void balanceInsert(Node<E> node) {
+ node.color = RED;
+
+ while (node != root && node.parent.isRed()) {
+ if (node.parent == node.parent.parent.left) {
+ Node<E> uncle = node.parent.parent.right;
+ if (uncle != null && uncle.isRed()) {
+ node.parent.color = BLACK;
+ uncle.color = BLACK;
+ node.parent.parent.color = RED;
+ node = node.parent.parent;
+ } else {
+ if (node == node.parent.right) {
+ node = node.parent;
+ rotateLeft(node);
+ }
+ node.parent.color = BLACK;
+ node.parent.parent.color = RED;
+ rotateRight(node.parent.parent);
+ }
+ } else {
+ Node<E> uncle = node.parent.parent.left;
+ if (uncle != null && uncle.isRed()) {
+ node.parent.color = BLACK;
+ uncle.color = BLACK;
+ node.parent.parent.color = RED;
+ node = node.parent.parent;
+ } else {
+ if (node == node.parent.left) {
+ node = node.parent;
+ rotateRight(node);
+ }
+ node.parent.color = BLACK;
+ node.parent.parent.color = RED;
+ rotateLeft(node.parent.parent);
+ }
+ }
+ }
+ root.color = BLACK;
+ }
+
+ private void rotateRight(Node<E> node) {
+ Node<E> pivot = node.left;
+ node.left = pivot.right;
+ if (pivot.right != null) {
+ pivot.right.parent = node;
+ }
+ pivot.parent = node.parent;
+ if (node.parent == null) {
+ root = pivot;
+ } else if (node == node.parent.right) {
+ node.parent.right = pivot;
+ } else {
+ node.parent.left = pivot;
+ }
+ pivot.right = node;
+ node.parent = pivot;
+ }
+
+ private void rotateLeft(Node<E> node) {
+ Node<E> pivot = node.right;
+ node.right = pivot.left;
+ if (pivot.left != null) {
+ pivot.left.parent = node;
+ }
+ pivot.parent = node.parent;
+ if (node.parent == null) {
+ root = pivot;
+ } else if (node == node.parent.left) {
+ node.parent.left = pivot;
+ } else {
+ node.parent.right = pivot;
+ }
+ pivot.left = node;
+ node.parent = pivot;
+ }
+
+ /**
+ * Remove object using a provided comparator, and return the removed entry.
+ *
+ * @param obj Lookup entry
+ * @param cmp User provided Comparator. The comparator should expect that the
+ * proved obj will always be the first method parameter and any
+ * stored object will be the second parameter.
+ *
+ * @return The removed entry or null if not found
+ */
+ public E removeAndGet(Object obj, Comparator<?> cmp) {
+ Objects.requireNonNull(obj);
+
+ if (!isEmpty()) {
+ Node<E> node = root;
+ while (node != null) {
+ E[] entries = node.entries;
+ int leftIndex = node.leftIndex;
+ int result = compare(obj, entries[leftIndex], cmp);
+ if (result < 0) {
+ node = node.left;
+ } else if (result == 0) {
+ return removeElementLeft(node);
+ } else {
+ int rightIndex = node.rightIndex;
+ if (leftIndex != rightIndex) {
+ result = compare(obj, entries[rightIndex], cmp);
+ }
+ if (result == 0) {
+ return removeElementRight(node);
+ } else if (result > 0) {
+ node = node.right;
+ } else {
+ int low = leftIndex + 1, high = rightIndex - 1;
+ while (low <= high) {
+ int mid = (low + high) >>> 1;
+ result = compare(obj, entries[mid], cmp);
+ if (result > 0) {
+ low = mid + 1;
+ } else if (result == 0) {
+ return removeElementAt(node, mid);
+ } else {
+ high = mid - 1;
+ }
+ }
+ return null;
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Remove object and return the removed entry.
+ *
+ * @param obj Lookup entry
+ *
+ * @return The removed entry or null if not found
+ */
+ public E removeAndGet(Object obj) {
+ return removeAndGet(obj, comparator);
+ }
+
+ /**
+ * Remove object using a provided comparator.
+ *
+ * @param obj Lookup entry
+ * @param cmp User provided Comparator. The comparator should expect that the
+ * proved obj will always be the first method parameter and any
+ * stored object will be the second parameter.
+ *
+ * @return True if found and removed, else false
+ */
+ public boolean remove(Object obj, Comparator<?> cmp) {
+ return removeAndGet(obj, cmp) != null;
+ }
+
+ @Override
+ public boolean remove(Object obj) {
+ return removeAndGet(obj, comparator) != null;
+ }
+
+ private E removeElementLeft(Node<E> node) {
+ modCount++;
+ size--;
+ E entry = node.removeEntryLeft();
+
+ if (node.isEmpty()) {
+ deleteNode(node);
+ } else if (node.prev != null
+ && (Node.NODE_SIZE - 1 - node.prev.rightIndex) >= node.size) {
+ // Remaining entries fit in the prev node, move them and delete this node
+ node.prev.addEntriesRight(node);
+ deleteNode(node);
+ } else if (node.next != null && node.next.leftIndex >= node.size) {
+ // Remaining entries fit in the next node, move them and delete this node
+ node.next.addEntriesLeft(node);
+ deleteNode(node);
+ } else if (node.prev != null && node.prev.size < node.leftIndex) {
+ // Entries in prev node will fit in this node, move them and delete prev
+ node.addEntriesLeft(node.prev);
+ deleteNode(node.prev);
+ }
+
+ return entry;
+ }
+
+ private E removeElementRight(Node<E> node) {
+ modCount++;
+ size--;
+ E entry = node.removeEntryRight();
+
+ if (node.isEmpty()) {
+ deleteNode(node);
+ } else if (node.prev != null
+ && (Node.NODE_SIZE - 1 - node.prev.rightIndex) >= node.size) {
+ // Remaining entries fit in the prev node, move them and delete this node
+ node.prev.addEntriesRight(node);
+ deleteNode(node);
+ } else if (node.next != null && node.next.leftIndex >= node.size) {
+ // Remaining entries fit in the next node, move them and delete this node
+ node.next.addEntriesLeft(node);
+ deleteNode(node);
+ } else if (node.next != null
+ && node.next.size < (Node.NODE_SIZE - 1 - node.rightIndex)) {
+ // Entries in next node will fit in this node, move them and delete next
+ node.addEntriesRight(node.next);
+ deleteNode(node.next);
+ }
+
+ return entry;
+ }
+
+ private E removeElementAt(Node<E> node, int index) {
+ modCount++;
+ size--;
+ E entry = node.removeEntryAt(index);
+
+ if (node.prev != null
+ && (Node.NODE_SIZE - 1 - node.prev.rightIndex) >= node.size) {
+ // Remaining entries fit in the prev node, move them and delete this node
+ node.prev.addEntriesRight(node);
+ deleteNode(node);
+ } else if (node.next != null && (node.next.leftIndex) >= node.size) {
+ // Remaining entries fit in the next node, move them and delete this node
+ node.next.addEntriesLeft(node);
+ deleteNode(node);
+ } else if (node.prev != null && node.prev.size < node.leftIndex) {
+ // Entries in prev node will fit in this node, move them and delete prev
+ node.addEntriesLeft(node.prev);
+ deleteNode(node.prev);
+ } else if (node.next != null
+ && node.next.size < (Node.NODE_SIZE - 1 - node.rightIndex)) {
+ // Entries in next node will fit in this node, move them and delete next
+ node.addEntriesRight(node.next);
+ deleteNode(node.next);
+ }
+
+ return entry;
+ }
+
+ /**
+ * Delete the node and ensure the tree is balanced.
+ *
+ * @param node node to delete
+ */
+ private void deleteNode(final Node<E> node) {
+ if (node.right == null) {
+ if (node.left != null) {
+ attachToParent(node, node.left);
+ } else {
+ attachNullToParent(node);
+ }
+ } else if (node.left == null) {
+ attachToParent(node, node.right);
+ } else {
+ // node.left != null && node.right != null
+ // node.next should replace node in tree
+ // node.next != null guaranteed since node.left != null
+ // node.next.left == null since node.next.prev is node
+ // node.next.right may be null or non-null
+ Node<E> toMoveUp = node.next;
+ if (toMoveUp.right == null) {
+ attachNullToParent(toMoveUp);
+ } else {
+ attachToParent(toMoveUp, toMoveUp.right);
+ }
+ toMoveUp.left = node.left;
+ if (toMoveUp.left != null) {
+ toMoveUp.left.parent = toMoveUp;
+ }
+ toMoveUp.right = node.right;
+ if (toMoveUp.right != null) {
+ toMoveUp.right.parent = toMoveUp;
+ }
+ attachToParentNoBalance(node, toMoveUp);
+ toMoveUp.color = node.color;
+ }
+
+ // Remove node from ordered list of nodes
+ if (node.prev != null) {
+ node.prev.next = node.next;
+ }
+ if (node.next != null) {
+ node.next.prev = node.prev;
+ }
+
+ nodeCount--;
+ cacheAndClear(node);
+ }
+
+ private void attachToParentNoBalance(Node<E> toDelete, Node<E> toConnect) {
+ Node<E> parent = toDelete.parent;
+ toConnect.parent = parent;
+ if (parent == null) {
+ root = toConnect;
+ } else if (toDelete == parent.left) {
+ parent.left = toConnect;
+ } else {
+ parent.right = toConnect;
+ }
+ }
+
+ private void attachToParent(Node<E> toDelete, Node<E> toConnect) {
+ attachToParentNoBalance(toDelete, toConnect);
+ if (toDelete.isBlack()) {
+ balanceDelete(toConnect);
+ }
+ }
+
+ private void attachNullToParent(Node<E> toDelete) {
+ Node<E> parent = toDelete.parent;
+ if (parent == null) {
+ root = null;
+ } else {
+ if (toDelete == parent.left) {
+ parent.left = null;
+ } else {
+ parent.right = null;
+ }
+ if (toDelete.isBlack()) {
+ balanceDelete(parent);
+ }
+ }
+ }
+
+ /**
+ * Balance tree after removing a node.
+ *
+ * @param node Node to balance after deleting another node
+ */
+ private void balanceDelete(Node<E> node) {
+ while (node != root && node.isBlack()) {
+ if (node == node.parent.left) {
+ Node<E> sibling = node.parent.right;
+ if (sibling == null) {
+ node = node.parent;
+ continue;
+ }
+ if (sibling.isRed()) {
+ sibling.color = BLACK;
+ node.parent.color = RED;
+ rotateLeft(node.parent);
+ sibling = node.parent.right;
+ if (sibling == null) {
+ node = node.parent;
+ continue;
+ }
+ }
+ if ((sibling.left == null || !sibling.left.isRed())
+ && (sibling.right == null || !sibling.right.isRed())) {
+ sibling.color = RED;
+ node = node.parent;
+ } else {
+ if (sibling.right == null || !sibling.right.isRed()) {
+ sibling.left.color = BLACK;
+ sibling.color = RED;
+ rotateRight(sibling);
+ sibling = node.parent.right;
+ }
+ sibling.color = node.parent.color;
+ node.parent.color = BLACK;
+ sibling.right.color = BLACK;
+ rotateLeft(node.parent);
+ node = root;
+ }
+ } else {
+ Node<E> sibling = node.parent.left;
+ if (sibling == null) {
+ node = node.parent;
+ continue;
+ }
+ if (sibling.isRed()) {
+ sibling.color = BLACK;
+ node.parent.color = RED;
+ rotateRight(node.parent);
+ sibling = node.parent.left;
+ if (sibling == null) {
+ node = node.parent;
+ continue;
+ }
+ }
+ if ((sibling.left == null || sibling.left.isBlack())
+ && (sibling.right == null || sibling.right.isBlack())) {
+ sibling.color = RED;
+ node = node.parent;
+ } else {
+ if (sibling.left == null || sibling.left.isBlack()) {
+ sibling.right.color = BLACK;
+ sibling.color = RED;
+ rotateLeft(sibling);
+ sibling = node.parent.left;
+ }
+ sibling.color = node.parent.color;
+ node.parent.color = BLACK;
+ sibling.left.color = BLACK;
+ rotateRight(node.parent);
+ node = root;
+ }
+ }
+ }
+ node.color = BLACK;
+ }
+
+ @Override
+ public boolean containsAll(Collection<?> c) {
+ for (Object entry : c) {
+ if (!contains(entry)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public boolean addAll(Collection<? extends E> c) {
+ boolean modified = false;
+ for (E entry : c) {
+ if (add(entry)) {
+ modified = true;
+ }
+ }
+ return modified;
+ }
+
+ @Override
+ public boolean retainAll(Collection<?> c) {
+ boolean modified = false;
+ Iterator<E> it = iterator();
+ while (it.hasNext()) {
+ if (!c.contains(it.next())) {
+ it.remove();
+ modified = true;
+ }
+ }
+ return modified;
+ }
+
+ @Override
+ public boolean removeAll(Collection<?> c) {
+ boolean modified = false;
+ for (Object entry : c) {
+ if (remove(entry)) {
+ modified = true;
+ }
+ }
+ return modified;
+ }
+
+ @Override
+ public void clear() {
+ modCount++;
+ if (!isEmpty()) {
+ size = 0;
+ nodeCount = 0;
+ cacheAndClear(root);
+ root = null;
+ }
+ }
+
+ /**
+ * Returns the current size divided by the capacity of the tree. A value
+ * between 0.0 and 1.0, where 1.0 means that every allocated node in the tree
+ * is completely full.
+ *
+ * An empty set will return 1.0
+ *
+ * @return the fill ratio of the tree
+ */
+ public double fillRatio() {
+ if (nodeCount > 1) {
+ // Count the last node as completely full since it can't be compacted
+ return (size + (Node.NODE_SIZE - root.getRightMostNode().size))
+ / (double) (nodeCount * Node.NODE_SIZE);
+ }
+ return 1.0;
+ }
+
+ /**
+ * Compact all the entries to use the fewest number of nodes in the tree.
+ *
+ * Having a compact tree minimize memory usage, but can cause inserts to get
+ * slower due to new nodes needs to be allocated as there is no space in any
+ * of the existing nodes anymore for entries added in the middle of the set.
+ *
+ * Useful to do to reduce memory consumption and if the tree is know to not
+ * change after compaction or mainly added to at either extreme.
+ *
+ * @param timeout Maximum time to spend compacting the tree set in
+ * milliseconds.
+ *
+ * @return true if compaction completed, false if aborted
+ */
+ public boolean compact(long timeout) {
+
+ if (!isEmpty()) {
+ long start = Time.monotonicNow();
+ Node<E> node = root.getLeftMostNode();
+ while (node != null) {
+ if (node.prev != null && !node.prev.isFull()) {
+ Node<E> prev = node.prev;
+ int count = Math.min(Node.NODE_SIZE - prev.size, node.size);
+ System.arraycopy(node.entries, node.leftIndex,
+ prev.entries, prev.rightIndex + 1, count);
+ node.leftIndex += count;
+ node.size -= count;
+ prev.rightIndex += count;
+ prev.size += count;
+ }
+ if (node.isEmpty()) {
+ Node<E> temp = node.next;
+ deleteNode(node);
+ node = temp;
+ continue;
+ } else if (!node.isFull()) {
+ if (node.leftIndex != 0) {
+ System.arraycopy(node.entries, node.leftIndex,
+ node.entries, 0, node.size);
+ Arrays.fill(node.entries, node.size, node.rightIndex + 1, null);
+ node.leftIndex = 0;
+ node.rightIndex = node.size - 1;
+ }
+ }
+ node = node.next;
+
+ if (Time.monotonicNow() - start > timeout) {
+ return false;
+ }
+ }
+ }
+
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
index 05a6830..02d5b81 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
@@ -261,6 +261,9 @@ message BlockReportContextProto {
// The block report lease ID, or 0 if we are sending without a lease to
// bypass rate-limiting.
optional uint64 leaseId = 4 [ default = 0 ];
+
+ // True if the reported blocks are sorted by increasing block IDs
+ optional bool sorted = 5 [default = false];
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java
index bf29373..2cc1f7d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java
@@ -228,7 +228,7 @@ public class TestBlockListAsLongs {
request.set(null);
nsInfo.setCapabilities(Capability.STORAGE_BLOCK_REPORT_BUFFERS.getMask());
nn.blockReport(reg, "pool", sbr,
- new BlockReportContext(1, 0, System.nanoTime(), 0L));
+ new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
BlockReportRequestProto proto = request.get();
assertNotNull(proto);
assertTrue(proto.getReports(0).getBlocksList().isEmpty());
@@ -238,7 +238,7 @@ public class TestBlockListAsLongs {
request.set(null);
nsInfo.setCapabilities(Capability.UNKNOWN.getMask());
nn.blockReport(reg, "pool", sbr,
- new BlockReportContext(1, 0, System.nanoTime(), 0L));
+ new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
proto = request.get();
assertNotNull(proto);
assertFalse(proto.getReports(0).getBlocksList().isEmpty());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
index d6213ff..4e7cf3d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
@@ -19,19 +19,11 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.apache.hadoop.hdfs.server.namenode.INodeId.INVALID_INODE_ID;
import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
-import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.junit.Assert;
import org.junit.Test;
@@ -91,84 +83,4 @@ public class TestBlockInfo {
Assert.assertThat(added, is(false));
Assert.assertThat(blockInfos[NUM_BLOCKS/2].getStorageInfo(0), is(storage2));
}
-
- @Test
- public void testBlockListMoveToHead() throws Exception {
- LOG.info("BlockInfo moveToHead tests...");
-
- final int MAX_BLOCKS = 10;
-
- DatanodeStorageInfo dd = DFSTestUtil.createDatanodeStorageInfo("s1", "1.1.1.1");
- ArrayList<Block> blockList = new ArrayList<Block>(MAX_BLOCKS);
- ArrayList<BlockInfo> blockInfoList = new ArrayList<BlockInfo>();
- int headIndex;
- int curIndex;
-
- LOG.info("Building block list...");
- for (int i = 0; i < MAX_BLOCKS; i++) {
- blockList.add(new Block(i, 0, GenerationStamp.LAST_RESERVED_STAMP));
- blockInfoList.add(new BlockInfoContiguous(blockList.get(i), (short) 3));
- dd.addBlock(blockInfoList.get(i));
-
- // index of the datanode should be 0
- assertEquals("Find datanode should be 0", 0, blockInfoList.get(i)
- .findStorageInfo(dd));
- }
-
- // list length should be equal to the number of blocks we inserted
- LOG.info("Checking list length...");
- assertEquals("Length should be MAX_BLOCK", MAX_BLOCKS, dd.numBlocks());
- Iterator<BlockInfo> it = dd.getBlockIterator();
- int len = 0;
- while (it.hasNext()) {
- it.next();
- len++;
- }
- assertEquals("There should be MAX_BLOCK blockInfo's", MAX_BLOCKS, len);
-
- headIndex = dd.getBlockListHeadForTesting().findStorageInfo(dd);
-
- LOG.info("Moving each block to the head of the list...");
- for (int i = 0; i < MAX_BLOCKS; i++) {
- curIndex = blockInfoList.get(i).findStorageInfo(dd);
- headIndex = dd.moveBlockToHead(blockInfoList.get(i), curIndex, headIndex);
- // the moved element must be at the head of the list
- assertEquals("Block should be at the head of the list now.",
- blockInfoList.get(i), dd.getBlockListHeadForTesting());
- }
-
- // move head of the list to the head - this should not change the list
- LOG.info("Moving head to the head...");
-
- BlockInfo temp = dd.getBlockListHeadForTesting();
- curIndex = 0;
- headIndex = 0;
- dd.moveBlockToHead(temp, curIndex, headIndex);
- assertEquals(
- "Moving head to the head of the list shopuld not change the list",
- temp, dd.getBlockListHeadForTesting());
-
- // check all elements of the list against the original blockInfoList
- LOG.info("Checking elements of the list...");
- temp = dd.getBlockListHeadForTesting();
- assertNotNull("Head should not be null", temp);
- int c = MAX_BLOCKS - 1;
- while (temp != null) {
- assertEquals("Expected element is not on the list",
- blockInfoList.get(c--), temp);
- temp = temp.getNext(0);
- }
-
- LOG.info("Moving random blocks to the head of the list...");
- headIndex = dd.getBlockListHeadForTesting().findStorageInfo(dd);
- Random rand = new Random();
- for (int i = 0; i < MAX_BLOCKS; i++) {
- int j = rand.nextInt(MAX_BLOCKS);
- curIndex = blockInfoList.get(j).findStorageInfo(dd);
- headIndex = dd.moveBlockToHead(blockInfoList.get(j), curIndex, headIndex);
- // the moved element must be at the head of the list
- assertEquals("Block should be at the head of the list now.",
- blockInfoList.get(j), dd.getBlockListHeadForTesting());
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 0e4e167..a970d77 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -32,6 +32,7 @@ import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.LinkedList;
import java.util.List;
@@ -75,6 +76,7 @@ import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.INodeId;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.TestINodeFile;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
@@ -806,7 +808,8 @@ public class TestBlockManager {
// Make sure it's the first full report
assertEquals(0, ds.getBlockReportCount());
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
- builder.build(), null, false);
+ builder.build(),
+ new BlockReportContext(1, 0, System.nanoTime(), 0, true), false);
assertEquals(1, ds.getBlockReportCount());
// verify the storage info is correct
@@ -821,6 +824,70 @@ public class TestBlockManager {
(ds) >= 0);
}
+  @Test
+  public void testFullBR() throws Exception {
+    doReturn(true).when(fsn).isRunning();
+
+    DatanodeDescriptor node = nodes.get(0);
+    DatanodeStorageInfo ds = node.getStorageInfos()[0];
+    node.setAlive(true);
+    DatanodeRegistration nodeReg = new DatanodeRegistration(node, null, null, "");
+
+    // register new node
+    bm.getDatanodeManager().registerDatanode(nodeReg);
+    bm.getDatanodeManager().addDatanode(node);
+    assertEquals(node, bm.getDatanodeManager().getDatanode(node));
+    assertEquals(0, ds.getBlockReportCount());
+
+    // Blocks are added in decreasing ID order, so the list starts unsorted.
+    ArrayList<BlockInfo> blocks = new ArrayList<>();
+    for (int id = 24; id > 0; id--) {
+      blocks.add(addBlockToBM(id));
+    }
+
+    // Make sure it's the first full report
+    assertEquals(0, ds.getBlockReportCount());
+    sendFullReportAndVerify(node, ds, blocks, false, 1);
+
+    // Send a second, still unsorted, report
+    sendFullReportAndVerify(node, ds, blocks, false, 2);
+
+    // Sort list and send a report flagged as sorted
+    Collections.sort(blocks);
+    sendFullReportAndVerify(node, ds, blocks, true, 3);
+  }
+
+  /**
+   * Sends a full block report listing {@code blocks} for storage {@code ds}
+   * of {@code node}, then asserts that the storage's block report count equals
+   * {@code expectedReportCount} and that every block is recorded on
+   * {@code ds}.
+   */
+  private void sendFullReportAndVerify(DatanodeDescriptor node,
+      DatanodeStorageInfo ds, List<BlockInfo> blocks, boolean sorted,
+      int expectedReportCount) throws Exception {
+    bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
+        generateReport(blocks),
+        new BlockReportContext(1, 0, System.nanoTime(), 0, sorted),
+        false);
+    assertEquals(expectedReportCount, ds.getBlockReportCount());
+    // verify the storage info is correct
+    for (BlockInfo block : blocks) {
+      assertTrue("Block " + block + " is not recorded on storage " + ds,
+          bm.getStoredBlock(block).findStorageInfo(ds) >= 0);
+    }
+  }
+
+  /**
+   * Builds a block report that lists each entry of {@code blocks}, in list
+   * order, as a finalized replica.
+   */
+  private BlockListAsLongs generateReport(List<BlockInfo> blocks) {
+    BlockListAsLongs.Builder reportBuilder = BlockListAsLongs.builder();
+    for (int i = 0; i < blocks.size(); i++) {
+      BlockInfo current = blocks.get(i);
+      reportBuilder.add(new FinalizedReplica(current, null, null));
+    }
+    return reportBuilder.build();
+  }
+
private BlockInfo addBlockToBM(long blkId) {
Block block = new Block(blkId);
BlockInfo blockInfo = new BlockInfoContiguous(block, (short) 3);
@@ -1061,4 +1128,4 @@ public class TestBlockManager {
cluster.shutdown();
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index c843938..f4e88b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -29,6 +29,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import javax.management.NotCompliantMBeanException;
@@ -566,7 +567,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
Map<Block, BInfo> map = blockMap.get(bpid);
if (map == null) {
- map = new HashMap<Block, BInfo>();
+ map = new TreeMap<>();
blockMap.put(bpid, map);
}
@@ -1206,7 +1207,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override // FsDatasetSpi
public void addBlockPool(String bpid, Configuration conf) {
- Map<Block, BInfo> map = new HashMap<Block, BInfo>();
+ Map<Block, BInfo> map = new TreeMap<>();
blockMap.put(bpid, map);
storage.addBlockPool(bpid);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java
index 27d1cea..61321e4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java
@@ -19,14 +19,23 @@ package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
import java.util.ArrayList;
-
+import java.util.Collections;
+import java.util.Comparator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.*;
-import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -108,12 +117,13 @@ public class TestBlockHasMultipleReplicasOnSameDN {
StorageBlockReport reports[] =
new StorageBlockReport[cluster.getStoragesPerDatanode()];
- ArrayList<Replica> blocks = new ArrayList<Replica>();
+ ArrayList<ReplicaInfo> blocks = new ArrayList<>();
for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
Block localBlock = locatedBlock.getBlock().getLocalBlock();
blocks.add(new FinalizedReplica(localBlock, null, null));
}
+ Collections.sort(blocks);
try (FsDatasetSpi.FsVolumeReferences volumes =
dn.getFSDataset().getFsVolumeReferences()) {
@@ -126,7 +136,7 @@ public class TestBlockHasMultipleReplicasOnSameDN {
// Should not assert!
cluster.getNameNodeRpc().blockReport(dnReg, bpid, reports,
- new BlockReportContext(1, 0, System.nanoTime(), 0L));
+ new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
// Get the block locations once again.
locatedBlocks = client.getLocatedBlocks(filename, 0, BLOCK_SIZE * NUM_BLOCKS);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
index 212d2e6..27029a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
@@ -82,6 +82,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.timeout;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
index 90e000b..66f804c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
@@ -190,7 +191,8 @@ public class TestDataNodeVolumeFailure {
new StorageBlockReport(dnStorage, blockList);
}
- cluster.getNameNodeRpc().blockReport(dnR, bpid, reports, null);
+ cluster.getNameNodeRpc().blockReport(dnR, bpid, reports,
+ new BlockReportContext(1, 0, System.nanoTime(), 0, true));
// verify number of blocks and files...
verify(filename, filesize);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
index aadd9b2..badd593 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
@@ -134,7 +134,7 @@ public class TestDnRespectsBlockReportSplitThreshold {
Mockito.verify(nnSpy, times(cluster.getStoragesPerDatanode())).blockReport(
any(DatanodeRegistration.class),
anyString(),
- captor.capture(), Mockito.<BlockReportContext>anyObject());
+ captor.capture(), Mockito.<BlockReportContext>anyObject());
verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE);
}
@@ -166,7 +166,7 @@ public class TestDnRespectsBlockReportSplitThreshold {
Mockito.verify(nnSpy, times(1)).blockReport(
any(DatanodeRegistration.class),
anyString(),
- captor.capture(), Mockito.<BlockReportContext>anyObject());
+ captor.capture(), Mockito.<BlockReportContext>anyObject());
verifyCapturedArguments(captor, cluster.getStoragesPerDatanode(), BLOCKS_IN_FILE);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
index 67bbefe..791ee20 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
@@ -23,7 +23,6 @@ import java.io.IOException;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
-import org.apache.hadoop.util.Time;
/**
@@ -40,7 +39,7 @@ public class TestNNHandlesBlockReportPerStorage extends BlockReportTestBase {
LOG.info("Sending block report for storage " + report.getStorage().getStorageID());
StorageBlockReport[] singletonReport = { report };
cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport,
- new BlockReportContext(reports.length, i, System.nanoTime(), 0L));
+ new BlockReportContext(reports.length, i, System.nanoTime(), 0L, true));
i++;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java
index fd19ba6..a35fa48 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java
@@ -36,6 +36,6 @@ public class TestNNHandlesCombinedBlockReport extends BlockReportTestBase {
StorageBlockReport[] reports) throws IOException {
LOG.info("Sending combined block reports for " + dnR);
cluster.getNameNodeRpc().blockReport(dnR, poolId, reports,
- new BlockReportContext(1, 0, System.nanoTime(), 0L));
+ new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java
index 00c0f22..f12bc18 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.timeout;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
index 42cb72f..7fa3803 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
@@ -973,7 +973,7 @@ public class NNThroughputBenchmark implements Tool {
new StorageBlockReport(storage, BlockListAsLongs.EMPTY)
};
dataNodeProto.blockReport(dnRegistration, bpid, reports,
- new BlockReportContext(1, 0, System.nanoTime(), 0L));
+ new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
}
/**
@@ -1247,7 +1247,7 @@ public class NNThroughputBenchmark implements Tool {
StorageBlockReport[] report = { new StorageBlockReport(
dn.storage, dn.getBlockReportList()) };
dataNodeProto.blockReport(dn.dnRegistration, bpid, report,
- new BlockReportContext(1, 0, System.nanoTime(), 0L));
+ new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
long end = Time.now();
return end-start;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
index 542c616..dfe9cbe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
@@ -303,7 +304,8 @@ public class TestAddStripedBlocks {
StorageBlockReport[] reports = {new StorageBlockReport(storage,
bll)};
cluster.getNameNodeRpc().blockReport(dn.getDNRegistrationForBP(bpId),
- bpId, reports, null);
+ bpId, reports,
+ new BlockReportContext(1, 0, System.nanoTime(), 0, true));
}
DatanodeStorageInfo[] locs = lastBlock.getUnderConstructionFeature()
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
index 7fd0c30..ff8f81b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
@@ -118,7 +118,7 @@ public class TestDeadDatanode {
BlockListAsLongs.EMPTY) };
try {
dnp.blockReport(reg, poolId, report,
- new BlockReportContext(1, 0, System.nanoTime(), 0L));
+ new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
fail("Expected IOException is not thrown");
} catch (IOException ex) {
// Expected
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/FoldedTreeSetTest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/FoldedTreeSetTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/FoldedTreeSetTest.java
new file mode 100644
index 0000000..d554b1b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/FoldedTreeSetTest.java
@@ -0,0 +1,644 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Random;
+
+/**
+ * Unit tests for {@link FoldedTreeSet}.
+ */
+public class FoldedTreeSetTest {
+
+ private static Random srand; // seeded per run; the seed is logged below
+
+ public FoldedTreeSetTest() {
+ }
+
+ @BeforeClass
+ public static void setUpClass() {
+ long seed = System.nanoTime();
+ System.out.println("This run uses the random seed " + seed);
+ srand = new Random(seed);
+ }
+
+ @AfterClass
+ public static void tearDownClass() {
+ }
+
+ @Before
+ public void setUp() {
+ }
+
+ @After
+ public void tearDown() {
+ }
+
+ /**
+ * Test of comparator method, of class FoldedTreeSet.
+ */
+ @Test
+ public void testComparator() {
+ Comparator<String> comparator = new Comparator<String>() {
+
+ @Override
+ public int compare(String o1, String o2) {
+ return o1.compareTo(o2);
+ }
+ };
+ assertNull(new FoldedTreeSet<>().comparator());
+ assertEquals(comparator, new FoldedTreeSet<>(comparator).comparator());
+
+ FoldedTreeSet<String> set = new FoldedTreeSet<>(comparator);
+ set.add("apa3");
+ set.add("apa2");
+ set.add("apa");
+ set.add("apa5");
+ set.add("apa4");
+ assertEquals(5, set.size());
+ assertEquals("apa", set.get("apa"));
+ }
+
+ /**
+ * Test of first method, of class FoldedTreeSet.
+ */
+ @Test
+ public void testFirst() {
+ FoldedTreeSet<Integer> tree = new FoldedTreeSet<>();
+ for (int i = 0; i < 256; i++) {
+ tree.add(1024 + i);
+ assertEquals(1024, tree.first().intValue());
+ }
+ for (int i = 1; i < 256; i++) {
+ tree.remove(1024 + i);
+ assertEquals(1024, tree.first().intValue());
+ }
+ }
+
+ /**
+ * Test of last method, of class FoldedTreeSet.
+ */
+ @Test
+ public void testLast() {
+ FoldedTreeSet<Integer> tree = new FoldedTreeSet<>();
+ for (int i = 0; i < 256; i++) {
+ tree.add(1024 + i);
+ assertEquals(1024 + i, tree.last().intValue());
+ }
+ for (int i = 0; i < 255; i++) {
+ tree.remove(1024 + i);
+ assertEquals(1279, tree.last().intValue());
+ }
+ }
+
+ /**
+ * Test of size method, of class FoldedTreeSet.
+ */
+ @Test
+ public void testSize() {
+ FoldedTreeSet<String> instance = new FoldedTreeSet<>();
+ String entry = "apa";
+ assertEquals(0, instance.size());
+ instance.add(entry);
+ assertEquals(1, instance.size());
+ instance.remove(entry);
+ assertEquals(0, instance.size());
+ }
+
+ /**
+ * Test of isEmpty method, of class FoldedTreeSet.
+ */
+ @Test
+ public void testIsEmpty() {
+ FoldedTreeSet<String> instance = new FoldedTreeSet<>();
+ assertTrue(instance.isEmpty());
+ instance.add("apa");
+ assertFalse(instance.isEmpty());
+ instance.remove("apa");
+ // Re-query the set: a cached result would hide a broken remove().
+ assertTrue(instance.isEmpty());
+ }
+
+ /**
+ * Test of contains method, of class FoldedTreeSet.
+ */
+ @Test
+ public void testContains() {
+ FoldedTreeSet<String> instance = new FoldedTreeSet<>();
+ String entry = "apa";
+ assertFalse(instance.contains(entry));
+ instance.add(entry);
+ assertTrue(instance.contains(entry));
+ assertFalse(instance.contains(entry + entry));
+ }
+
+ /**
+ * Test of iterator method, of class FoldedTreeSet.
+ */
+ @Test
+ public void testIterator() {
+
+ for (int iter = 0; iter < 10; iter++) {
+ FoldedTreeSet<Holder> set = new FoldedTreeSet<>();
+ long[] longs = new long[64723];
+ for (int i = 0; i < longs.length; i++) {
+ Holder val = new Holder(srand.nextLong());
+ while (set.contains(val)) {
+ val = new Holder(srand.nextLong());
+ }
+ longs[i] = val.getId();
+ set.add(val);
+ }
+ assertEquals(longs.length, set.size());
+ Arrays.sort(longs);
+
+ Iterator<Holder> it = set.iterator();
+ for (int i = 0; i < longs.length; i++) {
+ assertTrue(it.hasNext());
+ Holder val = it.next();
+ assertEquals(longs[i], val.getId());
+ // remove randomly to force non-linear removes
+ if (srand.nextBoolean()) {
+ it.remove();
+ }
+ }
+ }
+ }
+
+ /**
+ * Test of toArray method, of class FoldedTreeSet.
+ */
+ @Test
+ public void testToArray() {
+ FoldedTreeSet<Integer> tree = new FoldedTreeSet<>();
+ ArrayList<Integer> list = new ArrayList<>(256);
+ for (int i = 0; i < 256; i++) {
+ list.add(1024 + i);
+ }
+ tree.addAll(list);
+ assertArrayEquals(list.toArray(), tree.toArray());
+ }
+
+ /**
+ * Test of toArray(T[]) method, of class FoldedTreeSet.
+ */
+ @Test
+ public void testToArray_GenericType() {
+ FoldedTreeSet<Integer> tree = new FoldedTreeSet<>();
+ ArrayList<Integer> list = new ArrayList<>(256);
+ for (int i = 0; i < 256; i++) {
+ list.add(1024 + i);
+ }
+ tree.addAll(list);
+ assertArrayEquals(list.toArray(new Integer[tree.size()]), tree.toArray(new Integer[tree.size()]));
+ assertArrayEquals(list.toArray(new Integer[tree.size() + 100]), tree.toArray(new Integer[tree.size() + 100]));
+ }
+
+ /**
+ * Test of add method, of class FoldedTreeSet.
+ */
+ @Test
+ public void testAdd() {
+ FoldedTreeSet<String> simpleSet = new FoldedTreeSet<>();
+ String entry = "apa";
+ assertTrue(simpleSet.add(entry));
+ assertFalse(simpleSet.add(entry));
+
+ FoldedTreeSet<Integer> intSet = new FoldedTreeSet<>();
+ for (int i = 512; i < 1024; i++) {
+ assertTrue(intSet.add(i));
+ }
+ for (int i = -1024; i < -512; i++) {
+ assertTrue(intSet.add(i));
+ }
+ for (int i = 0; i < 512; i++) {
+ assertTrue(intSet.add(i));
+ }
+ for (int i = -512; i < 0; i++) {
+ assertTrue(intSet.add(i));
+ }
+ assertEquals(2048, intSet.size());
+
+ FoldedTreeSet<Holder> set = new FoldedTreeSet<>();
+ long[] longs = new long[23432];
+ for (int i = 0; i < longs.length; i++) {
+ Holder val = new Holder(srand.nextLong());
+ while (set.contains(val)) {
+ val = new Holder(srand.nextLong());
+ }
+ longs[i] = val.getId();
+ assertTrue(set.add(val));
+ }
+ assertEquals(longs.length, set.size());
+ Arrays.sort(longs);
+
+ Iterator<Holder> it = set.iterator();
+ for (int i = 0; i < longs.length; i++) {
+ assertTrue(it.hasNext());
+ Holder val = it.next();
+ assertEquals(longs[i], val.getId());
+ }
+
+ // Specially constructed adds to exercise all code paths
+ FoldedTreeSet<Integer> specialAdds = new FoldedTreeSet<>();
+ // Fill node with even numbers
+ for (int i = 0; i < 128; i += 2) {
+ assertTrue(specialAdds.add(i));
+ }
+ // Remove left and add left
+ assertTrue(specialAdds.remove(0));
+ assertTrue(specialAdds.add(-1));
+ assertTrue(specialAdds.remove(-1));
+ // Add right and shift everything left
+ assertTrue(specialAdds.add(127));
+ assertTrue(specialAdds.remove(127));
+
+ // Empty at both ends
+ assertTrue(specialAdds.add(0));
+ assertTrue(specialAdds.remove(0));
+ assertTrue(specialAdds.remove(126));
+ // Add in the middle left to slide entries left
+ assertTrue(specialAdds.add(11));
+ assertTrue(specialAdds.remove(11));
+ // Add in the middle right to slide entries right
+ assertTrue(specialAdds.add(99));
+ assertTrue(specialAdds.remove(99));
+ // Add existing entry in the middle of a node
+ assertFalse(specialAdds.add(64));
+ }
+
+ @Test
+ public void testAddOrReplace() {
+ FoldedTreeSet<String> simpleSet = new FoldedTreeSet<>();
+ String entry = "apa";
+ assertNull(simpleSet.addOrReplace(entry));
+ assertEquals(entry, simpleSet.addOrReplace(entry));
+
+ FoldedTreeSet<Integer> intSet = new FoldedTreeSet<>();
+ for (int i = 0; i < 1024; i++) {
+ assertNull(intSet.addOrReplace(i));
+ }
+ for (int i = 0; i < 1024; i++) {
+ assertEquals(i, intSet.addOrReplace(i).intValue());
+ }
+ }
+
+ private static class Holder implements Comparable<Holder> {
+
+ private final long id;
+
+ public Holder(long id) {
+ this.id = id;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ @Override
+ public int compareTo(Holder o) {
+ return id < o.getId() ? -1
+ : id > o.getId() ? 1 : 0;
+ }
+ }
+
+ @Test
+ public void testRemoveWithComparator() {
+ FoldedTreeSet<Holder> set = new FoldedTreeSet<>();
+ long[] longs = new long[98327];
+ for (int i = 0; i < longs.length; i++) {
+ Holder val = new Holder(srand.nextLong());
+ while (set.contains(val)) {
+ val = new Holder(srand.nextLong());
+ }
+ longs[i] = val.getId();
+ set.add(val);
+ }
+ assertEquals(longs.length, set.size());
+ Comparator<Object> cmp = new Comparator<Object>() {
+ @Override
+ public int compare(Object o1, Object o2) {
+ long lookup = (long) o1;
+ long stored = ((Holder) o2).getId();
+ return lookup < stored ? -1
+ : lookup > stored ? 1 : 0;
+ }
+ };
+
+ for (long val : longs) {
+ set.remove(val, cmp);
+ }
+ assertEquals(0, set.size());
+ assertTrue(set.isEmpty());
+ }
+
+ @Test
+ public void testGetWithComparator() {
+ FoldedTreeSet<Holder> set = new FoldedTreeSet<>();
+ long[] longs = new long[32147];
+ for (int i = 0; i < longs.length; i++) {
+ Holder val = new Holder(srand.nextLong());
+ while (set.contains(val)) {
+ val = new Holder(srand.nextLong());
+ }
+ longs[i] = val.getId();
+ set.add(val);
+ }
+ assertEquals(longs.length, set.size());
+ Comparator<Object> cmp = new Comparator<Object>() {
+ @Override
+ public int compare(Object o1, Object o2) {
+ long lookup = (long) o1;
+ long stored = ((Holder) o2).getId();
+ return lookup < stored ? -1
+ : lookup > stored ? 1 : 0;
+ }
+ };
+
+ for (long val : longs) {
+ assertEquals(val, set.get(val, cmp).getId());
+ }
+ }
+
+ @Test
+ public void testGet() {
+ FoldedTreeSet<Holder> set = new FoldedTreeSet<>();
+ long[] longs = new long[43277];
+ for (int i = 0; i < longs.length; i++) {
+ Holder val = new Holder(srand.nextLong());
+ while (set.contains(val)) {
+ val = new Holder(srand.nextLong());
+ }
+ longs[i] = val.getId();
+ set.add(val);
+ }
+ assertEquals(longs.length, set.size());
+
+ for (long val : longs) {
+ assertEquals(val, set.get(new Holder(val)).getId());
+ }
+ }
+
+ /**
+ * Test of remove method, of class FoldedTreeSet.
+ */
+ @Test
+ public void testRemove() {
+ FoldedTreeSet<String> instance = new FoldedTreeSet<>();
+ assertFalse(instance.remove("apa"));
+ instance.add("apa");
+ assertTrue(instance.remove("apa"));
+
+ removeLeft();
+ removeRight();
+ removeAt();
+ removeRandom();
+ }
+
+ private void removeLeft() {
+ FoldedTreeSet<Integer> set = new FoldedTreeSet<>();
+ for (int i = 1; i <= 320; i++) {
+ set.add(i);
+ }
+ for (int i = 193; i < 225; i++) {
+ assertTrue(set.remove(i));
+ assertFalse(set.remove(i));
+ }
+ for (int i = 129; i < 161; i++) {
+ assertTrue(set.remove(i));
+ assertFalse(set.remove(i));
+ }
+ for (int i = 256; i > 224; i--) {
+ assertTrue(set.remove(i));
+ assertFalse(set.remove(i));
+ }
+ for (int i = 257; i < 289; i++) {
+ assertTrue(set.remove(i));
+ assertFalse(set.remove(i));
+ }
+ while (!set.isEmpty()) {
+ assertTrue(set.remove(set.first()));
+ }
+ }
+
+ private void removeRight() {
+ FoldedTreeSet<Integer> set = new FoldedTreeSet<>();
+ for (int i = 1; i <= 320; i++) {
+ set.add(i);
+ }
+ for (int i = 193; i < 225; i++) {
+ assertTrue(set.remove(i));
+ assertFalse(set.remove(i));
+ }
+ for (int i = 192; i > 160; i--) {
+ assertTrue(set.remove(i));
+ assertFalse(set.remove(i));
+ }
+ for (int i = 256; i > 224; i--) {
+ assertTrue(set.remove(i));
+ assertFalse(set.remove(i));
+ }
+ for (int i = 320; i > 288; i--) {
+ assertTrue(set.remove(i));
+ assertFalse(set.remove(i));
+ }
+ while (!set.isEmpty()) {
+ assertTrue(set.remove(set.last()));
+ }
+ }
+
+ private void removeAt() {
+ FoldedTreeSet<Integer> set = new FoldedTreeSet<>();
+ for (int i = 1; i <= 320; i++) {
+ set.add(i);
+ }
+ for (int i = 193; i < 225; i++) {
+ assertTrue(set.remove(i));
+ assertFalse(set.remove(i));
+ }
+ for (int i = 160; i < 192; i++) {
+ assertTrue(set.remove(i));
+ assertFalse(set.remove(i));
+ }
+ for (int i = 225; i < 257; i++) {
+ assertTrue(set.remove(i));
+ assertFalse(set.remove(i));
+ }
+ for (int i = 288; i < 320; i++) {
+ assertTrue(set.remove(i));
+ assertFalse(set.remove(i));
+ }
+ }
+
+ private void removeRandom() {
+ FoldedTreeSet<Integer> set = new FoldedTreeSet<>();
+ int[] integers = new int[2048];
+ for (int i = 0; i < 2048; i++) {
+ int val = srand.nextInt();
+ while (set.contains(val)) {
+ val = srand.nextInt();
+ }
+ integers[i] = val;
+ set.add(val);
+ }
+ assertEquals(2048, set.size());
+
+ for (int val : integers) {
+ assertTrue(set.remove(val));
+ assertFalse(set.remove(val));
+ }
+ assertTrue(set.isEmpty());
+ }
+
+ /**
+ * Test of containsAll method, of class FoldedTreeSet.
+ */
+ @Test
+ public void testContainsAll() {
+ Collection<String> list = Arrays.asList(new String[]{"apa", "apa2", "apa"});
+ FoldedTreeSet<String> instance = new FoldedTreeSet<>();
+ assertFalse(instance.containsAll(list));
+ instance.addAll(list);
+ assertTrue(instance.containsAll(list));
+ }
+
+ /**
+ * Test of addAll method, of class FoldedTreeSet.
+ */
+ @Test
+ public void testAddAll() {
+ Collection<String> list = Arrays.asList(new String[]{"apa", "apa2", "apa"});
+ FoldedTreeSet<String> instance = new FoldedTreeSet<>();
+ assertTrue(instance.addAll(list));
+ assertFalse(instance.addAll(list)); // add same entries again
+ }
+
+ /**
+ * Test of retainAll method, of class FoldedTreeSet.
+ */
+ @Test
+ public void testRetainAll() {
+ Collection<String> list = Arrays.asList(new String[]{"apa", "apa2", "apa"});
+ FoldedTreeSet<String> instance = new FoldedTreeSet<>();
+ instance.addAll(list);
+ assertFalse(instance.retainAll(list));
+ assertEquals(2, instance.size());
+ Collection<String> list2 = Arrays.asList(new String[]{"apa"});
+ assertTrue(instance.retainAll(list2));
+ assertEquals(1, instance.size());
+ }
+
+ /**
+ * Test of removeAll method, of class FoldedTreeSet.
+ */
+ @Test
+ public void testRemoveAll() {
+ Collection<String> list = Arrays.asList(new String[]{"apa", "apa2", "apa"});
+ FoldedTreeSet<String> instance = new FoldedTreeSet<>();
+ assertFalse(instance.removeAll(list));
+ instance.addAll(list);
+ assertTrue(instance.removeAll(list));
+ assertTrue(instance.isEmpty());
+ }
+
+ /**
+ * Test of clear method, of class FoldedTreeSet.
+ */
+ @Test
+ public void testClear() {
+ FoldedTreeSet<String> instance = new FoldedTreeSet<>();
+ instance.clear();
+ assertTrue(instance.isEmpty());
+ instance.add("apa");
+ assertFalse(instance.isEmpty());
+ instance.clear();
+ assertTrue(instance.isEmpty());
+ }
+
+ @Test
+ public void testFillRatio() {
+ FoldedTreeSet<Integer> set = new FoldedTreeSet<>();
+ final int size = 1024;
+ for (int i = 1; i <= size; i++) {
+ set.add(i);
+ assertEquals("Iteration: " + i, 1.0, set.fillRatio(), 0.0);
+ }
+
+ for (int i = 1; i <= size / 2; i++) {
+ set.remove(i * 2);
+ // Need the max since removes from the last node don't
+ // affect the fill ratio
+ assertEquals("Iteration: " + i,
+ Math.max((size - i) / (double) size, 0.53125),
+ set.fillRatio(), 0.0);
+ }
+ }
+
+ @Test
+ public void testCompact() {
+ FoldedTreeSet<Holder> set = new FoldedTreeSet<>();
+ long[] longs = new long[24553];
+ for (int i = 0; i < longs.length; i++) {
+ Holder val = new Holder(srand.nextLong());
+ while (set.contains(val)) {
+ val = new Holder(srand.nextLong());
+ }
+ longs[i] = val.getId();
+ set.add(val);
+ }
+ assertEquals(longs.length, set.size());
+
+ long[] longs2 = new long[longs.length];
+ for (int i = 0; i < longs2.length; i++) {
+ Holder val = new Holder(srand.nextLong());
+ while (set.contains(val)) {
+ val = new Holder(srand.nextLong());
+ }
+ longs2[i] = val.getId();
+ set.add(val);
+ }
+ assertEquals(longs.length + longs2.length, set.size());
+
+ // Create fragmentation
+ for (long val : longs) {
+ assertTrue(set.remove(new Holder(val)));
+ }
+ assertEquals(longs2.length, set.size());
+
+ assertFalse(set.compact(0));
+ assertTrue(set.compact(Long.MAX_VALUE));
+ assertEquals(longs2.length, set.size());
+ for (long val : longs) {
+ assertFalse(set.remove(new Holder(val)));
+ }
+ for (long val : longs2) {
+ assertEquals(val, set.get(new Holder(val)).getId());
+ }
+ }
+}
[2/2] hadoop git commit: HDFS-9260. Improve the performance and GC
friendliness of NameNode startup and full block reports (Staffan Friberg via
cmccabe)
Posted by cm...@apache.org.
HDFS-9260. Improve the performance and GC friendliness of NameNode startup and full block reports (Staffan Friberg via cmccabe)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/dd9ebf6e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/dd9ebf6e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/dd9ebf6e
Branch: refs/heads/trunk
Commit: dd9ebf6eedfd4ff8b3486eae2a446de6b0c7fa8a
Parents: 2da03b4
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Tue Feb 2 11:23:00 2016 -0800
Committer: Colin Patrick Mccabe <cm...@cloudera.com>
Committed: Tue Feb 2 11:23:00 2016 -0800
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 12 +
.../DatanodeProtocolClientSideTranslatorPB.java | 5 +-
.../apache/hadoop/hdfs/protocolPB/PBHelper.java | 5 +-
.../hdfs/server/blockmanagement/BlockInfo.java | 192 +--
.../blockmanagement/BlockInfoContiguous.java | 29 +-
.../blockmanagement/BlockInfoStriped.java | 30 +-
.../server/blockmanagement/BlockManager.java | 458 +++++--
.../hdfs/server/blockmanagement/BlocksMap.java | 66 +-
.../blockmanagement/DatanodeStorageInfo.java | 123 +-
.../hdfs/server/datanode/BPServiceActor.java | 4 +-
.../datanode/fsdataset/impl/ReplicaMap.java | 71 +-
.../server/protocol/BlockReportContext.java | 10 +-
.../hdfs/server/protocol/DatanodeProtocol.java | 1 -
.../apache/hadoop/hdfs/util/FoldedTreeSet.java | 1285 ++++++++++++++++++
.../src/main/proto/DatanodeProtocol.proto | 3 +
.../hdfs/protocol/TestBlockListAsLongs.java | 4 +-
.../server/blockmanagement/TestBlockInfo.java | 88 --
.../blockmanagement/TestBlockManager.java | 71 +-
.../server/datanode/SimulatedFSDataset.java | 5 +-
.../TestBlockHasMultipleReplicasOnSameDN.java | 20 +-
.../datanode/TestDataNodeHotSwapVolumes.java | 1 +
.../datanode/TestDataNodeVolumeFailure.java | 4 +-
...TestDnRespectsBlockReportSplitThreshold.java | 4 +-
.../TestNNHandlesBlockReportPerStorage.java | 3 +-
.../TestNNHandlesCombinedBlockReport.java | 2 +-
.../server/datanode/TestTriggerBlockReport.java | 1 +
.../server/namenode/NNThroughputBenchmark.java | 4 +-
.../server/namenode/TestAddStripedBlocks.java | 4 +-
.../hdfs/server/namenode/TestDeadDatanode.java | 2 +-
.../hadoop/hdfs/util/FoldedTreeSetTest.java | 644 +++++++++
31 files changed, 2565 insertions(+), 589 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 2eac881..38cb3df 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -971,6 +971,9 @@ Release 2.9.0 - UNRELEASED
HDFS-7764. DirectoryScanner shouldn't abort the scan if one directory had
an error (Rakesh R via cmccabe)
+ HDFS-9260. Improve the performance and GC friendliness of NameNode startup
+ and full block reports (Staffan Friberg via cmccabe)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 5217740..76915cb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -219,6 +219,18 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT = 2;
public static final String DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY = "dfs.namenode.replication.max-streams-hard-limit";
public static final int DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT = 4;
+ public static final String DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_INTERVAL_MS_KEY
+ = "dfs.namenode.storageinfo.defragment.interval.ms";
+ public static final int
+ DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_INTERVAL_MS_DEFAULT = 10 * 60 * 1000;
+ public static final String DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_TIMEOUT_MS_KEY
+ = "dfs.namenode.storageinfo.defragment.timeout.ms";
+ public static final int
+ DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_TIMEOUT_MS_DEFAULT = 4;
+ public static final String DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_KEY
+ = "dfs.namenode.storageinfo.defragment.ratio";
+ public static final double
+ DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_DEFAULT = 0.75;
public static final String DFS_WEBHDFS_AUTHENTICATION_FILTER_KEY = "dfs.web.authentication.filter";
/* Phrased as below to avoid javac inlining as a constant, to match the behavior when
this was AuthFilter.class.getName(). Note that if you change the import for AuthFilter, you
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
index 81c23e1..79113dd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
@@ -174,12 +174,13 @@ public class DatanodeProtocolClientSideTranslatorPB implements
@Override
public DatanodeCommand blockReport(DatanodeRegistration registration,
- String poolId, StorageBlockReport[] reports, BlockReportContext context)
+ String poolId, StorageBlockReport[] reports,
+ BlockReportContext context)
throws IOException {
BlockReportRequestProto.Builder builder = BlockReportRequestProto
.newBuilder().setRegistration(PBHelper.convert(registration))
.setBlockPoolId(poolId);
-
+
boolean useBlocksBuffer = registration.getNamespaceInfo()
.isCapabilitySupported(Capability.STORAGE_BLOCK_REPORT_BUFFERS);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 4b6baf2..e70cdf0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -824,8 +824,8 @@ public class PBHelper {
public static BlockReportContext convert(BlockReportContextProto proto) {
- return new BlockReportContext(proto.getTotalRpcs(),
- proto.getCurRpc(), proto.getId(), proto.getLeaseId());
+ return new BlockReportContext(proto.getTotalRpcs(), proto.getCurRpc(),
+ proto.getId(), proto.getLeaseId(), proto.getSorted());
}
public static BlockReportContextProto convert(BlockReportContext context) {
@@ -834,6 +834,7 @@ public class PBHelper {
setCurRpc(context.getCurRpc()).
setId(context.getReportId()).
setLeaseId(context.getLeaseId()).
+ setSorted(context.isSorted()).
build();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
index e9fa123..5da2140 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
@@ -18,8 +18,9 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.io.IOException;
-import java.util.LinkedList;
+import java.util.Iterator;
import java.util.List;
+import java.util.NoSuchElementException;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -55,19 +56,9 @@ public abstract class BlockInfo extends Block
/** For implementing {@link LightWeightGSet.LinkedElement} interface. */
private LightWeightGSet.LinkedElement nextLinkedElement;
- /**
- * This array contains triplets of references. For each i-th storage, the
- * block belongs to triplets[3*i] is the reference to the
- * {@link DatanodeStorageInfo} and triplets[3*i+1] and triplets[3*i+2] are
- * references to the previous and the next blocks, respectively, in the list
- * of blocks belonging to this storage.
- *
- * Using previous and next in Object triplets is done instead of a
- * {@link LinkedList} list to efficiently use memory. With LinkedList the cost
- * per replica is 42 bytes (LinkedList#Entry object per replica) versus 16
- * bytes using the triplets.
- */
- protected Object[] triplets;
+
+ // Storages this block is replicated on
+ protected DatanodeStorageInfo[] storages;
private BlockUnderConstructionFeature uc;
@@ -77,14 +68,14 @@ public abstract class BlockInfo extends Block
* in the block group
*/
public BlockInfo(short size) {
- this.triplets = new Object[3 * size];
+ this.storages = new DatanodeStorageInfo[size];
this.bcId = INVALID_INODE_ID;
this.replication = isStriped() ? 0 : size;
}
public BlockInfo(Block blk, short size) {
super(blk);
- this.triplets = new Object[3*size];
+ this.storages = new DatanodeStorageInfo[size];
this.bcId = INVALID_INODE_ID;
this.replication = isStriped() ? 0 : size;
}
@@ -109,79 +100,52 @@ public abstract class BlockInfo extends Block
return bcId == INVALID_INODE_ID;
}
- public DatanodeDescriptor getDatanode(int index) {
- DatanodeStorageInfo storage = getStorageInfo(index);
- return storage == null ? null : storage.getDatanodeDescriptor();
- }
+ public Iterator<DatanodeStorageInfo> getStorageInfos() {
+ return new Iterator<DatanodeStorageInfo>() {
- DatanodeStorageInfo getStorageInfo(int index) {
- assert this.triplets != null : "BlockInfo is not initialized";
- assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
- return (DatanodeStorageInfo)triplets[index*3];
- }
+ private int index = 0;
- BlockInfo getPrevious(int index) {
- assert this.triplets != null : "BlockInfo is not initialized";
- assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound";
- BlockInfo info = (BlockInfo)triplets[index*3+1];
- assert info == null ||
- info.getClass().getName().startsWith(BlockInfo.class.getName()) :
- "BlockInfo is expected at " + index*3;
- return info;
- }
+ @Override
+ public boolean hasNext() {
+ while (index < storages.length && storages[index] == null) {
+ index++;
+ }
+ return index < storages.length;
+ }
- BlockInfo getNext(int index) {
- assert this.triplets != null : "BlockInfo is not initialized";
- assert index >= 0 && index*3+2 < triplets.length : "Index is out of bound";
- BlockInfo info = (BlockInfo)triplets[index*3+2];
- assert info == null || info.getClass().getName().startsWith(
- BlockInfo.class.getName()) :
- "BlockInfo is expected at " + index*3;
- return info;
+ @Override
+ public DatanodeStorageInfo next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ return storages[index++];
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Sorry. can't remove.");
+ }
+ };
}
- void setStorageInfo(int index, DatanodeStorageInfo storage) {
- assert this.triplets != null : "BlockInfo is not initialized";
- assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
- triplets[index*3] = storage;
+ public DatanodeDescriptor getDatanode(int index) {
+ DatanodeStorageInfo storage = getStorageInfo(index);
+ return storage == null ? null : storage.getDatanodeDescriptor();
}
- /**
- * Return the previous block on the block list for the datanode at
- * position index. Set the previous block on the list to "to".
- *
- * @param index - the datanode index
- * @param to - block to be set to previous on the list of blocks
- * @return current previous block on the list of blocks
- */
- BlockInfo setPrevious(int index, BlockInfo to) {
- assert this.triplets != null : "BlockInfo is not initialized";
- assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound";
- BlockInfo info = (BlockInfo) triplets[index*3+1];
- triplets[index*3+1] = to;
- return info;
+ DatanodeStorageInfo getStorageInfo(int index) {
+ assert this.storages != null : "BlockInfo is not initialized";
+ return storages[index];
}
- /**
- * Return the next block on the block list for the datanode at
- * position index. Set the next block on the list to "to".
- *
- * @param index - the datanode index
- * @param to - block to be set to next on the list of blocks
- * @return current next block on the list of blocks
- */
- BlockInfo setNext(int index, BlockInfo to) {
- assert this.triplets != null : "BlockInfo is not initialized";
- assert index >= 0 && index*3+2 < triplets.length : "Index is out of bound";
- BlockInfo info = (BlockInfo) triplets[index*3+2];
- triplets[index*3+2] = to;
- return info;
+ void setStorageInfo(int index, DatanodeStorageInfo storage) {
+ assert this.storages != null : "BlockInfo is not initialized";
+ this.storages[index] = storage;
}
public int getCapacity() {
- assert this.triplets != null : "BlockInfo is not initialized";
- assert triplets.length % 3 == 0 : "Malformed BlockInfo";
- return triplets.length / 3;
+ assert this.storages != null : "BlockInfo is not initialized";
+ return storages.length;
}
/**
@@ -240,80 +204,6 @@ public abstract class BlockInfo extends Block
return -1;
}
- /**
- * Insert this block into the head of the list of blocks
- * related to the specified DatanodeStorageInfo.
- * If the head is null then form a new list.
- * @return current block as the new head of the list.
- */
- BlockInfo listInsert(BlockInfo head, DatanodeStorageInfo storage) {
- int dnIndex = this.findStorageInfo(storage);
- assert dnIndex >= 0 : "Data node is not found: current";
- assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
- "Block is already in the list and cannot be inserted.";
- this.setPrevious(dnIndex, null);
- this.setNext(dnIndex, head);
- if (head != null) {
- head.setPrevious(head.findStorageInfo(storage), this);
- }
- return this;
- }
-
- /**
- * Remove this block from the list of blocks
- * related to the specified DatanodeStorageInfo.
- * If this block is the head of the list then return the next block as
- * the new head.
- * @return the new head of the list or null if the list becomes
- * empy after deletion.
- */
- BlockInfo listRemove(BlockInfo head, DatanodeStorageInfo storage) {
- if (head == null) {
- return null;
- }
- int dnIndex = this.findStorageInfo(storage);
- if (dnIndex < 0) { // this block is not on the data-node list
- return head;
- }
-
- BlockInfo next = this.getNext(dnIndex);
- BlockInfo prev = this.getPrevious(dnIndex);
- this.setNext(dnIndex, null);
- this.setPrevious(dnIndex, null);
- if (prev != null) {
- prev.setNext(prev.findStorageInfo(storage), next);
- }
- if (next != null) {
- next.setPrevious(next.findStorageInfo(storage), prev);
- }
- if (this == head) { // removing the head
- head = next;
- }
- return head;
- }
-
- /**
- * Remove this block from the list of blocks related to the specified
- * DatanodeDescriptor. Insert it into the head of the list of blocks.
- *
- * @return the new head of the list.
- */
- public BlockInfo moveBlockToHead(BlockInfo head, DatanodeStorageInfo storage,
- int curIndex, int headIndex) {
- if (head == this) {
- return this;
- }
- BlockInfo next = this.setNext(curIndex, head);
- BlockInfo prev = this.setPrevious(curIndex, null);
-
- head.setPrevious(headIndex, this);
- prev.setNext(prev.findStorageInfo(storage), next);
- if (next != null) {
- next.setPrevious(next.findStorageInfo(storage), prev);
- }
- return this;
- }
-
@Override
public int hashCode() {
// Super implementation is sufficient
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
index 746e298..f729c4f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
@@ -35,20 +35,20 @@ public class BlockInfoContiguous extends BlockInfo {
}
/**
- * Ensure that there is enough space to include num more triplets.
- * @return first free triplet index.
+ * Ensure that there is enough space to include num more storages.
+ * @return first free storage index.
*/
private int ensureCapacity(int num) {
- assert this.triplets != null : "BlockInfo is not initialized";
+ assert this.storages != null : "BlockInfo is not initialized";
int last = numNodes();
- if (triplets.length >= (last+num)*3) {
+ if (storages.length >= (last+num)) {
return last;
}
/* Not enough space left. Create a new array. Should normally
* happen only when replication is manually increased by the user. */
- Object[] old = triplets;
- triplets = new Object[(last+num)*3];
- System.arraycopy(old, 0, triplets, 0, last * 3);
+ DatanodeStorageInfo[] old = storages;
+ storages = new DatanodeStorageInfo[(last+num)];
+ System.arraycopy(old, 0, storages, 0, last);
return last;
}
@@ -57,8 +57,6 @@ public class BlockInfoContiguous extends BlockInfo {
// find the last null node
int lastNode = ensureCapacity(1);
setStorageInfo(lastNode, storage);
- setNext(lastNode, null);
- setPrevious(lastNode, null);
return true;
}
@@ -68,25 +66,18 @@ public class BlockInfoContiguous extends BlockInfo {
if (dnIndex < 0) { // the node is not found
return false;
}
- assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
- "Block is still in the list and must be removed first.";
// find the last not null node
int lastNode = numNodes()-1;
- // replace current node triplet by the lastNode one
+ // replace current node entry by the lastNode one
setStorageInfo(dnIndex, getStorageInfo(lastNode));
- setNext(dnIndex, getNext(lastNode));
- setPrevious(dnIndex, getPrevious(lastNode));
- // set the last triplet to null
+ // set the last entry to null
setStorageInfo(lastNode, null);
- setNext(lastNode, null);
- setPrevious(lastNode, null);
return true;
}
@Override
public int numNodes() {
- assert this.triplets != null : "BlockInfo is not initialized";
- assert triplets.length % 3 == 0 : "Malformed BlockInfo";
+ assert this.storages != null : "BlockInfo is not initialized";
for (int idx = getCapacity()-1; idx >= 0; idx--) {
if (getDatanode(idx) != null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
index 20d5858..c6e26ec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
@@ -26,21 +26,20 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
/**
* Subclass of {@link BlockInfo}, presenting a block group in erasure coding.
*
- * We still use triplets to store DatanodeStorageInfo for each block in the
- * block group, as well as the previous/next block in the corresponding
- * DatanodeStorageInfo. For a (m+k) block group, the first (m+k) triplet units
+ * We still use a storage array to store DatanodeStorageInfo for each block in
+ * the block group. For a (m+k) block group, the first (m+k) storage units
* are sorted and strictly mapped to the corresponding block.
*
* Normally each block belonging to group is stored in only one DataNode.
- * However, it is possible that some block is over-replicated. Thus the triplet
+ * However, it is possible that some block is over-replicated. Thus the storage
* array's size can be larger than (m+k). Thus currently we use an extra byte
- * array to record the block index for each triplet.
+ * array to record the block index for each entry.
*/
@InterfaceAudience.Private
public class BlockInfoStriped extends BlockInfo {
private final ErasureCodingPolicy ecPolicy;
/**
- * Always the same size with triplets. Record the block index for each triplet
+ * Always the same size as storages. Records the block index for each entry.
* TODO: actually this is only necessary for over-replicated block. Thus can
* be further optimized to save memory usage.
*/
@@ -104,7 +103,7 @@ public class BlockInfoStriped extends BlockInfo {
return i;
}
}
- // need to expand the triplet size
+ // need to expand the storage size
ensureCapacity(i + 1, true);
return i;
}
@@ -130,8 +129,6 @@ public class BlockInfoStriped extends BlockInfo {
private void addStorage(DatanodeStorageInfo storage, int index,
int blockIndex) {
setStorageInfo(index, storage);
- setNext(index, null);
- setPrevious(index, null);
indices[index] = (byte) blockIndex;
}
@@ -173,26 +170,22 @@ public class BlockInfoStriped extends BlockInfo {
if (dnIndex < 0) { // the node is not found
return false;
}
- assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
- "Block is still in the list and must be removed first.";
- // set the triplet to null
+ // set the entry to null
setStorageInfo(dnIndex, null);
- setNext(dnIndex, null);
- setPrevious(dnIndex, null);
indices[dnIndex] = -1;
return true;
}
private void ensureCapacity(int totalSize, boolean keepOld) {
if (getCapacity() < totalSize) {
- Object[] old = triplets;
+ DatanodeStorageInfo[] old = storages;
byte[] oldIndices = indices;
- triplets = new Object[totalSize * 3];
+ storages = new DatanodeStorageInfo[totalSize];
indices = new byte[totalSize];
initIndices();
if (keepOld) {
- System.arraycopy(old, 0, triplets, 0, old.length);
+ System.arraycopy(old, 0, storages, 0, old.length);
System.arraycopy(oldIndices, 0, indices, 0, oldIndices.length);
}
}
@@ -214,8 +207,7 @@ public class BlockInfoStriped extends BlockInfo {
@Override
public int numNodes() {
- assert this.triplets != null : "BlockInfo is not initialized";
- assert triplets.length % 3 == 0 : "Malformed BlockInfo";
+ assert this.storages != null : "BlockInfo is not initialized";
int num = 0;
for (int idx = getCapacity()-1; idx >= 0; idx--) {
if (getStorageInfo(idx) != null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 587e6b6..25cec8a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
@@ -93,6 +94,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
+import org.apache.hadoop.hdfs.util.FoldedTreeSet;
import org.apache.hadoop.hdfs.util.LightWeightHashSet;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
@@ -106,6 +108,7 @@ import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.LightWeightGSet;
import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.VersionInfo;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -195,7 +198,12 @@ public class BlockManager implements BlockStatsMXBean {
/**replicationRecheckInterval is how often namenode checks for new replication work*/
private final long replicationRecheckInterval;
-
+
+ /** Defragmentation check interval, timeout and fill-ratio threshold. */
+ private final long storageInfoDefragmentInterval;
+ private final long storageInfoDefragmentTimeout;
+ private final double storageInfoDefragmentRatio;
+
/**
* Mapping: Block -> { BlockCollection, datanodes, self ref }
* Updated only in response to client-sent information.
@@ -204,6 +212,10 @@ public class BlockManager implements BlockStatsMXBean {
/** Replication thread. */
final Daemon replicationThread = new Daemon(new ReplicationMonitor());
+
+ /** StorageInfoDefragmenter thread. */
+ private final Daemon storageInfoDefragmenterThread =
+ new Daemon(new StorageInfoDefragmenter());
/** Block report thread for handling async reports. */
private final BlockReportProcessingThread blockReportThread =
@@ -376,7 +388,20 @@ public class BlockManager implements BlockStatsMXBean {
this.replicationRecheckInterval =
conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L;
-
+
+ this.storageInfoDefragmentInterval =
+ conf.getLong(
+ DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_INTERVAL_MS_KEY,
+ DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_INTERVAL_MS_DEFAULT);
+ this.storageInfoDefragmentTimeout =
+ conf.getLong(
+ DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_TIMEOUT_MS_KEY,
+ DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_TIMEOUT_MS_DEFAULT);
+ this.storageInfoDefragmentRatio =
+ conf.getDouble(
+ DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_KEY,
+ DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_DEFAULT);
+
this.encryptDataTransfer =
conf.getBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY,
DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
@@ -508,6 +533,8 @@ public class BlockManager implements BlockStatsMXBean {
datanodeManager.activate(conf);
this.replicationThread.setName("ReplicationMonitor");
this.replicationThread.start();
+ storageInfoDefragmenterThread.setName("StorageInfoMonitor");
+ storageInfoDefragmenterThread.start();
this.blockReportThread.start();
mxBeanName = MBeans.register("NameNode", "BlockStats", this);
bmSafeMode.activate(blockTotal);
@@ -517,8 +544,10 @@ public class BlockManager implements BlockStatsMXBean {
bmSafeMode.close();
try {
replicationThread.interrupt();
+ storageInfoDefragmenterThread.interrupt();
blockReportThread.interrupt();
replicationThread.join(3000);
+ storageInfoDefragmenterThread.join(3000);
blockReportThread.join(3000);
} catch (InterruptedException ie) {
}
@@ -1165,9 +1194,15 @@ public class BlockManager implements BlockStatsMXBean {
/** Remove the blocks associated to the given datanode. */
void removeBlocksAssociatedTo(final DatanodeDescriptor node) {
- final Iterator<BlockInfo> it = node.getBlockIterator();
- while(it.hasNext()) {
- removeStoredBlock(it.next(), node);
+ for (DatanodeStorageInfo storage : node.getStorageInfos()) {
+ final Iterator<BlockInfo> it = storage.getBlockIterator();
+ while (it.hasNext()) {
+ BlockInfo block = it.next();
+ // The block must be removed from the DatanodeStorageInfo via the
+ // iterator to avoid ConcurrentModificationException in the storage
+ it.remove();
+ removeStoredBlock(block, node);
+ }
}
// Remove all pending DN messages referencing this DN.
pendingDNMessages.removeAllMessagesForDatanode(node);
@@ -1183,6 +1218,9 @@ public class BlockManager implements BlockStatsMXBean {
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
while(it.hasNext()) {
BlockInfo block = it.next();
+ // The block must be removed from the DatanodeStorageInfo via the
+ // iterator to avoid ConcurrentModificationException in the storage
+ it.remove();
removeStoredBlock(block, node);
final Block b = getBlockOnStorage(block, storageInfo);
if (b != null) {
@@ -2033,8 +2071,8 @@ public class BlockManager implements BlockStatsMXBean {
*/
public boolean processReport(final DatanodeID nodeID,
final DatanodeStorage storage,
- final BlockListAsLongs newReport, BlockReportContext context,
- boolean lastStorageInRpc) throws IOException {
+ final BlockListAsLongs newReport,
+ BlockReportContext context, boolean lastStorageInRpc) throws IOException {
namesystem.writeLock();
final long startTime = Time.monotonicNow(); //after acquiring write lock
final long endTime;
@@ -2079,7 +2117,8 @@ public class BlockManager implements BlockStatsMXBean {
nodeID.getDatanodeUuid());
processFirstBlockReport(storageInfo, newReport);
} else {
- invalidatedBlocks = processReport(storageInfo, newReport);
+ invalidatedBlocks = processReport(storageInfo, newReport,
+ context != null ? context.isSorted() : false);
}
storageInfo.receivedBlockReport();
@@ -2149,6 +2188,9 @@ public class BlockManager implements BlockStatsMXBean {
// TODO: remove this assumption in case we want to put a block on
// more than one storage on a datanode (and because it's a difficult
// assumption to really enforce)
+ // The block must be removed from the DatanodeStorageInfo via the
+ // iterator to avoid ConcurrentModificationException in the storage
+ iter.remove();
removeStoredBlock(block, zombie.getDatanodeDescriptor());
Block b = getBlockOnStorage(block, zombie);
if (b != null) {
@@ -2238,7 +2280,7 @@ public class BlockManager implements BlockStatsMXBean {
private Collection<Block> processReport(
final DatanodeStorageInfo storageInfo,
- final BlockListAsLongs report) throws IOException {
+ final BlockListAsLongs report, final boolean sorted) throws IOException {
// Normal case:
// Modify the (block-->datanode) map, according to the difference
// between the old and new block report.
@@ -2248,9 +2290,29 @@ public class BlockManager implements BlockStatsMXBean {
Collection<Block> toInvalidate = new LinkedList<>();
Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<>();
Collection<StatefulBlockInfo> toUC = new LinkedList<>();
- reportDiff(storageInfo, report,
- toAdd, toRemove, toInvalidate, toCorrupt, toUC);
-
+
+ Iterable<BlockReportReplica> sortedReport;
+ if (!sorted) {
+ blockLog.warn("BLOCK* processReport: Report from the DataNode ({}) is "
+ + "unsorted. This will cause overhead on the NameNode "
+ + "which needs to sort the Full BR. Please update the "
+ + "DataNode to the same version of Hadoop HDFS as the "
+ + "NameNode ({}).",
+ storageInfo.getDatanodeDescriptor().getDatanodeUuid(),
+ VersionInfo.getVersion());
+ Set<BlockReportReplica> set = new FoldedTreeSet<>();
+ for (BlockReportReplica iblk : report) {
+ set.add(new BlockReportReplica(iblk));
+ }
+ sortedReport = set;
+ } else {
+ sortedReport = report;
+ }
+
+ reportDiffSorted(storageInfo, sortedReport,
+ toAdd, toRemove, toInvalidate, toCorrupt, toUC);
+
+
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
// Process the blocks on each queue
for (StatefulBlockInfo b : toUC) {
@@ -2399,126 +2461,111 @@ public class BlockManager implements BlockStatsMXBean {
}
}
- private void reportDiff(DatanodeStorageInfo storageInfo,
- BlockListAsLongs newReport,
+ private void reportDiffSorted(DatanodeStorageInfo storageInfo,
+ Iterable<BlockReportReplica> newReport,
Collection<BlockInfoToAdd> toAdd, // add to DatanodeDescriptor
Collection<BlockInfo> toRemove, // remove from DatanodeDescriptor
Collection<Block> toInvalidate, // should be removed from DN
Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
Collection<StatefulBlockInfo> toUC) { // add to under-construction list
- // place a delimiter in the list which separates blocks
- // that have been reported from those that have not
- Block delimiterBlock = new Block();
- BlockInfo delimiter = new BlockInfoContiguous(delimiterBlock,
- (short) 1);
- AddBlockResult result = storageInfo.addBlock(delimiter, delimiterBlock);
- assert result == AddBlockResult.ADDED
- : "Delimiting block cannot be present in the node";
- int headIndex = 0; //currently the delimiter is in the head of the list
- int curIndex;
-
- if (newReport == null) {
- newReport = BlockListAsLongs.EMPTY;
- }
- // scan the report and process newly reported blocks
- for (BlockReportReplica iblk : newReport) {
- ReplicaState iState = iblk.getState();
- BlockInfo storedBlock = processReportedBlock(storageInfo,
- iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);
-
- // move block to the head of the list
- if (storedBlock != null &&
- (curIndex = storedBlock.findStorageInfo(storageInfo)) >= 0) {
- headIndex = storageInfo.moveBlockToHead(storedBlock, curIndex, headIndex);
+ // Both the report and this storage's blocks must be sorted by block ID
+ Iterator<BlockInfo> storageBlocksIterator = storageInfo.getBlockIterator();
+ DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor();
+ BlockInfo storageBlock = null;
+
+ for (BlockReportReplica replica : newReport) {
+
+ long replicaID = replica.getBlockId();
+ if (BlockIdManager.isStripedBlockID(replicaID)
+ && (!hasNonEcBlockUsingStripedID ||
+ !blocksMap.containsBlock(replica))) {
+ replicaID = BlockIdManager.convertToStripedID(replicaID);
+ }
+
+ ReplicaState reportedState = replica.getState();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Reported block " + replica
+ + " on " + dn + " size " + replica.getNumBytes()
+ + " replicaState = " + reportedState);
+ }
+
+ if (shouldPostponeBlocksFromFuture
+ && isGenStampInFuture(replica)) {
+ queueReportedBlock(storageInfo, replica, reportedState,
+ QUEUE_REASON_FUTURE_GENSTAMP);
+ continue;
+ }
+
+ if (storageBlock == null && storageBlocksIterator.hasNext()) {
+ storageBlock = storageBlocksIterator.next();
}
+
+ while (true) {
+ int cmp;
+ if (storageBlock == null ||
+ (cmp = Long.compare(replicaID, storageBlock.getBlockId())) < 0) {
+ // Check if block is available in NN but not yet on this storage
+ BlockInfo nnBlock = blocksMap.getStoredBlock(new Block(replicaID));
+ if (nnBlock != null) {
+ reportDiffSortedInner(storageInfo, replica, reportedState,
+ nnBlock, toAdd, toCorrupt, toUC);
+ } else {
+ // Replica not found anywhere so it should be invalidated
+ toInvalidate.add(new Block(replica));
+ }
+ break;
+ } else if (cmp == 0) {
+ // Replica matched current storageBlock
+ reportDiffSortedInner(storageInfo, replica, reportedState,
+ storageBlock, toAdd, toCorrupt, toUC);
+ storageBlock = null;
+ break;
+ } else {
+ // Drop lower-ID stored blocks, then loop to re-examine this replica
+ do {
+ toRemove.add(storageBlock);
+ storageBlock = storageBlocksIterator.hasNext()
+ ? storageBlocksIterator.next() : null;
+ } while (storageBlock != null &&
+ Long.compare(replicaID, storageBlock.getBlockId()) > 0);
+ }
+ }
}
- // collect blocks that have not been reported
- // all of them are next to the delimiter
- Iterator<BlockInfo> it =
- storageInfo.new BlockIterator(delimiter.getNext(0));
- while (it.hasNext()) {
- toRemove.add(it.next());
+ // Iterate any remaining blocks that have not been reported and remove them
+ while (storageBlocksIterator.hasNext()) {
+ toRemove.add(storageBlocksIterator.next());
}
- storageInfo.removeBlock(delimiter);
}
- /**
- * Process a block replica reported by the data-node.
- * No side effects except adding to the passed-in Collections.
- *
- * <ol>
- * <li>If the block is not known to the system (not in blocksMap) then the
- * data-node should be notified to invalidate this block.</li>
- * <li>If the reported replica is valid that is has the same generation stamp
- * and length as recorded on the name-node, then the replica location should
- * be added to the name-node.</li>
- * <li>If the reported replica is not valid, then it is marked as corrupt,
- * which triggers replication of the existing valid replicas.
- * Corrupt replicas are removed from the system when the block
- * is fully replicated.</li>
- * <li>If the reported replica is for a block currently marked "under
- * construction" in the NN, then it should be added to the
- * BlockUnderConstructionFeature's list of replicas.</li>
- * </ol>
- *
- * @param storageInfo DatanodeStorageInfo that sent the report.
- * @param block reported block replica
- * @param reportedState reported replica state
- * @param toAdd add to DatanodeDescriptor
- * @param toInvalidate missing blocks (not in the blocks map)
- * should be removed from the data-node
- * @param toCorrupt replicas with unexpected length or generation stamp;
- * add to corrupt replicas
- * @param toUC replicas of blocks currently under construction
- * @return the up-to-date stored block, if it should be kept.
- * Otherwise, null.
- */
- private BlockInfo processReportedBlock(
+ private void reportDiffSortedInner(
final DatanodeStorageInfo storageInfo,
- final Block block, final ReplicaState reportedState,
+ final BlockReportReplica replica, final ReplicaState reportedState,
+ final BlockInfo storedBlock,
final Collection<BlockInfoToAdd> toAdd,
- final Collection<Block> toInvalidate,
final Collection<BlockToMarkCorrupt> toCorrupt,
final Collection<StatefulBlockInfo> toUC) {
-
- DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor();
- if(LOG.isDebugEnabled()) {
- LOG.debug("Reported block " + block
- + " on " + dn + " size " + block.getNumBytes()
- + " replicaState = " + reportedState);
- }
-
- if (shouldPostponeBlocksFromFuture && isGenStampInFuture(block)) {
- queueReportedBlock(storageInfo, block, reportedState,
- QUEUE_REASON_FUTURE_GENSTAMP);
- return null;
- }
-
- // find block by blockId
- BlockInfo storedBlock = getStoredBlock(block);
- if(storedBlock == null) {
- // If blocksMap does not contain reported block id,
- // the replica should be removed from the data-node.
- toInvalidate.add(new Block(block));
- return null;
- }
+ assert replica != null;
+ assert storedBlock != null;
+
+ DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor();
BlockUCState ucState = storedBlock.getBlockUCState();
-
+
// Block is on the NN
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("In memory blockUCState = " + ucState);
}
// Ignore replicas already scheduled to be removed from the DN
- if(invalidateBlocks.contains(dn, block)) {
- return storedBlock;
+ if (invalidateBlocks.contains(dn, replica)) {
+ return;
}
- BlockToMarkCorrupt c = checkReplicaCorrupt(
- block, reportedState, storedBlock, ucState, dn);
+ BlockToMarkCorrupt c = checkReplicaCorrupt(replica, reportedState,
+ storedBlock, ucState, dn);
if (c != null) {
if (shouldPostponeBlocksFromFuture) {
// If the block is an out-of-date generation stamp or state,
@@ -2532,23 +2579,16 @@ public class BlockManager implements BlockStatsMXBean {
} else {
toCorrupt.add(c);
}
- return storedBlock;
- }
-
- if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
- toUC.add(new StatefulBlockInfo(storedBlock,
- new Block(block), reportedState));
- return storedBlock;
- }
-
- // Add replica if appropriate. If the replica was previously corrupt
- // but now okay, it might need to be updated.
- if (reportedState == ReplicaState.FINALIZED
- && (storedBlock.findStorageInfo(storageInfo) == -1 ||
- corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
- toAdd.add(new BlockInfoToAdd(storedBlock, block));
+ } else if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
+ toUC.add(new StatefulBlockInfo(storedBlock, new Block(replica),
+ reportedState));
+ } else if (reportedState == ReplicaState.FINALIZED &&
+ (storedBlock.findStorageInfo(storageInfo) == -1 ||
+ corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
+ // Add replica if appropriate. If the replica was previously corrupt
+ // but now okay, it might need to be updated.
+ toAdd.add(new BlockInfoToAdd(storedBlock, replica));
}
- return storedBlock;
}
/**
@@ -2774,7 +2814,7 @@ public class BlockManager implements BlockStatsMXBean {
}
// just add it
- AddBlockResult result = storageInfo.addBlock(storedBlock, reported);
+ AddBlockResult result = storageInfo.addBlockInitial(storedBlock, reported);
// Now check for completion of blocks and safe block count
int numCurrentReplica = countLiveNodes(storedBlock);
@@ -3497,40 +3537,75 @@ public class BlockManager implements BlockStatsMXBean {
DatanodeStorageInfo storageInfo, Block block,
ReplicaState reportedState, DatanodeDescriptor delHintNode)
throws IOException {
- // blockReceived reports a finalized block
- Collection<BlockInfoToAdd> toAdd = new LinkedList<>();
- Collection<Block> toInvalidate = new LinkedList<Block>();
- Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
- Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
+
final DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
- processReportedBlock(storageInfo, block, reportedState, toAdd, toInvalidate,
- toCorrupt, toUC);
- // the block is only in one of the to-do lists
- // if it is in none then data-node already has it
- assert toUC.size() + toAdd.size() + toInvalidate.size() + toCorrupt.size() <= 1
- : "The block should be only in one of the lists.";
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Reported block " + block
+ + " on " + node + " size " + block.getNumBytes()
+ + " replicaState = " + reportedState);
+ }
- for (StatefulBlockInfo b : toUC) {
- addStoredBlockUnderConstruction(b, storageInfo);
+ if (shouldPostponeBlocksFromFuture &&
+ isGenStampInFuture(block)) {
+ queueReportedBlock(storageInfo, block, reportedState,
+ QUEUE_REASON_FUTURE_GENSTAMP);
+ return;
}
- long numBlocksLogged = 0;
- for (BlockInfoToAdd b : toAdd) {
- addStoredBlock(b.stored, b.reported, storageInfo, delHintNode,
- numBlocksLogged < maxNumBlocksToLog);
- numBlocksLogged++;
+
+ // find block by blockId
+ BlockInfo storedBlock = getStoredBlock(block);
+ if(storedBlock == null) {
+ // If blocksMap does not contain reported block id,
+ // the replica should be removed from the data-node.
+ blockLog.debug("BLOCK* addBlock: block {} on node {} size {} does not " +
+ "belong to any file", block, node, block.getNumBytes());
+ addToInvalidates(new Block(block), node);
+ return;
}
- if (numBlocksLogged > maxNumBlocksToLog) {
- blockLog.debug("BLOCK* addBlock: logged info for {} of {} reported.",
- maxNumBlocksToLog, numBlocksLogged);
+
+ BlockUCState ucState = storedBlock.getBlockUCState();
+ // Block is on the NN
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("In memory blockUCState = " + ucState);
}
- for (Block b : toInvalidate) {
- blockLog.debug("BLOCK* addBlock: block {} on node {} size {} does not " +
- "belong to any file", b, node, b.getNumBytes());
- addToInvalidates(b, node);
+
+ // Ignore replicas already scheduled to be removed from the DN
+ if(invalidateBlocks.contains(node, block)) {
+ return;
}
- for (BlockToMarkCorrupt b : toCorrupt) {
- markBlockAsCorrupt(b, storageInfo, node);
+
+ BlockToMarkCorrupt c = checkReplicaCorrupt(
+ block, reportedState, storedBlock, ucState, node);
+ if (c != null) {
+ if (shouldPostponeBlocksFromFuture) {
+ // If the block is an out-of-date generation stamp or state,
+ // but we're the standby, we shouldn't treat it as corrupt,
+ // but instead just queue it for later processing.
+ // TODO: Pretty confident this should be s/storedBlock/block below,
+ // since we should be postponing the info of the reported block, not
+ // the stored block. See HDFS-6289 for more context.
+ queueReportedBlock(storageInfo, storedBlock, reportedState,
+ QUEUE_REASON_CORRUPT_STATE);
+ } else {
+ markBlockAsCorrupt(c, storageInfo, node);
+ }
+ return;
+ }
+
+ if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
+ addStoredBlockUnderConstruction(
+ new StatefulBlockInfo(storedBlock, new Block(block), reportedState),
+ storageInfo);
+ return;
+ }
+
+ // Add replica if appropriate. If the replica was previously corrupt
+ // but now okay, it might need to be updated.
+ if (reportedState == ReplicaState.FINALIZED
+ && (storedBlock.findStorageInfo(storageInfo) == -1 ||
+ corruptReplicas.isReplicaCorrupt(storedBlock, node))) {
+ addStoredBlock(storedBlock, block, storageInfo, delHintNode, true);
}
}
@@ -4060,6 +4135,87 @@ public class BlockManager implements BlockStatsMXBean {
}
}
+  /**
+   * Runnable that monitors the fragmentation of the StorageInfo TreeSet and
+   * compacts it when it falls under a certain threshold.
+   *
+   * <p>Each pass first scans every storage under the namesystem read lock and
+   * records those whose fill ratio is below {@code storageInfoDefragmentRatio}.
+   * It then compacts the recorded storages one at a time under the write
+   * lock, sleeping one second between storages.
+   */
+  private class StorageInfoDefragmenter implements Runnable {
+
+    @Override
+    public void run() {
+      while (namesystem.isRunning()) {
+        try {
+          // Check storage efficiency only when active NN is out of safe mode.
+          if (isPopulatingReplQueues()) {
+            scanAndCompactStorages();
+          }
+          Thread.sleep(storageInfoDefragmentInterval);
+        } catch (Throwable t) {
+          if (!namesystem.isRunning()) {
+            LOG.info("Stopping thread.");
+            if (!(t instanceof InterruptedException)) {
+              LOG.info("Received an exception while shutting down.", t);
+            }
+            break;
+          } else if (!checkNSRunning && t instanceof InterruptedException) {
+            LOG.info("Stopping for testing.");
+            break;
+          }
+          LOG.error("Thread received Runtime exception.", t);
+          terminate(1, t);
+        }
+      }
+    }
+
+    /**
+     * Scans all storages for a low TreeSet fill ratio and compacts the ones
+     * that fall below the threshold.
+     *
+     * @throws InterruptedException if interrupted while sleeping between
+     *         compactions
+     */
+    private void scanAndCompactStorages() throws InterruptedException {
+      // Flat list of (datanode UUID, storage ID) pairs queued for compaction.
+      // Storages are re-resolved by ID under the write lock below, because
+      // the read lock is released between the scan and the compaction.
+      ArrayList<String> datanodesAndStorages = new ArrayList<>();
+      for (DatanodeDescriptor node
+          : datanodeManager.getDatanodeListForReport(DatanodeReportType.ALL)) {
+        for (DatanodeStorageInfo storage : node.getStorageInfos()) {
+          // Acquire before try so finally never releases an unheld lock.
+          namesystem.readLock();
+          try {
+            double ratio = storage.treeSetFillRatio();
+            boolean queued = ratio < storageInfoDefragmentRatio;
+            if (queued) {
+              datanodesAndStorages.add(node.getDatanodeUuid());
+              datanodesAndStorages.add(storage.getStorageID());
+            }
+            LOG.info("StorageInfo TreeSet fill ratio {} : {}{}",
+                storage.getStorageID(), ratio,
+                queued ? " (queued for defragmentation)" : "");
+          } finally {
+            namesystem.readUnlock();
+          }
+        }
+      }
+      for (int i = 0; i < datanodesAndStorages.size(); i += 2) {
+        namesystem.writeLock();
+        try {
+          // The datanode may have been removed since the scan. Guard against
+          // a null lookup: an NPE here would propagate to run(), which calls
+          // terminate() while the namesystem is running.
+          DatanodeDescriptor node =
+              datanodeManager.getDatanode(datanodesAndStorages.get(i));
+          DatanodeStorageInfo storage = (node == null) ? null
+              : node.getStorageInfo(datanodesAndStorages.get(i + 1));
+          if (storage != null) {
+            boolean aborted =
+                !storage.treeSetCompact(storageInfoDefragmentTimeout);
+            if (aborted) {
+              // Compaction timed out, reset iterator to continue with
+              // the same storage next iteration.
+              i -= 2;
+            }
+            LOG.info("StorageInfo TreeSet defragmented {} : {}{}",
+                storage.getStorageID(), storage.treeSetFillRatio(),
+                aborted ? " (aborted)" : "");
+          }
+        } finally {
+          namesystem.writeUnlock();
+        }
+        // Wait between each iteration so the write lock is not held
+        // back-to-back.
+        Thread.sleep(1000);
+      }
+    }
+  }
/**
* Compute block replication and block invalidation work that can be scheduled
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
index 47a21fe..71d0598 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
+import java.util.Collections;
import java.util.Iterator;
import org.apache.hadoop.hdfs.protocol.Block;
@@ -30,37 +31,6 @@ import org.apache.hadoop.util.LightWeightGSet;
* the datanodes that store the block.
*/
class BlocksMap {
- private static class StorageIterator implements Iterator<DatanodeStorageInfo> {
- private final BlockInfo blockInfo;
- private int nextIdx = 0;
-
- StorageIterator(BlockInfo blkInfo) {
- this.blockInfo = blkInfo;
- }
-
- @Override
- public boolean hasNext() {
- if (blockInfo == null) {
- return false;
- }
- while (nextIdx < blockInfo.getCapacity() &&
- blockInfo.getDatanode(nextIdx) == null) {
- // note that for striped blocks there may be null in the triplets
- nextIdx++;
- }
- return nextIdx < blockInfo.getCapacity();
- }
-
- @Override
- public DatanodeStorageInfo next() {
- return blockInfo.getStorageInfo(nextIdx++);
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException("Sorry. can't remove.");
- }
- }
/** Constant {@link LightWeightGSet} capacity. */
private final int capacity;
@@ -132,6 +102,16 @@ class BlocksMap {
}
}
+  /**
+   * Tells whether this map holds an entry for the given block.
+   *
+   * @param b the block to look up
+   * @return {@code true} if {@code b} is present in the map,
+   *         {@code false} otherwise
+   */
+  boolean containsBlock(Block b) {
+    return this.blocks.contains(b);
+  }
+
/** Returns the block object if it exists in the map. */
BlockInfo getStoredBlock(Block b) {
return blocks.get(b);
@@ -142,7 +122,9 @@ class BlocksMap {
* returns {@link Iterable} of the storages the block belongs to.
*/
Iterable<DatanodeStorageInfo> getStorages(Block b) {
- return getStorages(blocks.get(b));
+ BlockInfo block = blocks.get(b);
+ return block != null ? getStorages(block)
+ : Collections.<DatanodeStorageInfo>emptyList();
}
/**
@@ -150,12 +132,16 @@ class BlocksMap {
* returns {@link Iterable} of the storages the block belongs to.
*/
Iterable<DatanodeStorageInfo> getStorages(final BlockInfo storedBlock) {
- return new Iterable<DatanodeStorageInfo>() {
- @Override
- public Iterator<DatanodeStorageInfo> iterator() {
- return new StorageIterator(storedBlock);
- }
- };
+ if (storedBlock == null) {
+ return Collections.emptyList();
+ } else {
+ return new Iterable<DatanodeStorageInfo>() {
+ @Override
+ public Iterator<DatanodeStorageInfo> iterator() {
+ return storedBlock.getStorageInfos();
+ }
+ };
+ }
}
/** counts number of containing nodes. Better than using iterator. */
@@ -174,7 +160,7 @@ class BlocksMap {
if (info == null)
return false;
- // remove block from the data-node list and the node from the block info
+ // remove block from the data-node set and the node from the block info
boolean removed = removeBlock(node, info);
if (info.hasNoStorage() // no datanodes left
@@ -185,7 +171,7 @@ class BlocksMap {
}
/**
- * Remove block from the list of blocks belonging to the data-node. Remove
+ * Remove block from the set of blocks belonging to the data-node. Remove
* data-node from the block.
*/
static boolean removeBlock(DatanodeDescriptor dn, BlockInfo b) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index 1f1b24b..c4729ea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.hdfs.util.FoldedTreeSet;
import com.google.common.annotations.VisibleForTesting;
@@ -85,31 +86,6 @@ public class DatanodeStorageInfo {
storageType = storage.getStorageType();
}
- /**
- * Iterates over the list of blocks belonging to the data-node.
- */
- class BlockIterator implements Iterator<BlockInfo> {
- private BlockInfo current;
-
- BlockIterator(BlockInfo head) {
- this.current = head;
- }
-
- public boolean hasNext() {
- return current != null;
- }
-
- public BlockInfo next() {
- BlockInfo res = current;
- current = current.getNext(current.findStorageInfo(DatanodeStorageInfo.this));
- return res;
- }
-
- public void remove() {
- throw new UnsupportedOperationException("Sorry. can't remove.");
- }
- }
-
private final DatanodeDescriptor dn;
private final String storageID;
private StorageType storageType;
@@ -120,8 +96,7 @@ public class DatanodeStorageInfo {
private volatile long remaining;
private long blockPoolUsed;
- private volatile BlockInfo blockList = null;
- private int numBlocks = 0;
+ private final FoldedTreeSet<BlockInfo> blocks = new FoldedTreeSet<>();
// The ID of the last full block report which updated this storage.
private long lastBlockReportId = 0;
@@ -207,7 +182,7 @@ public class DatanodeStorageInfo {
}
boolean areBlocksOnFailedStorage() {
- return getState() == State.FAILED && numBlocks != 0;
+ return getState() == State.FAILED && !blocks.isEmpty();
}
@VisibleForTesting
@@ -234,6 +209,36 @@ public class DatanodeStorageInfo {
long getBlockPoolUsed() {
return blockPoolUsed;
}
+
+  /**
+   * For use during startup. Expects blocks to be added in sorted order
+   * to enable fast insertion into the DatanodeStorageInfo.
+   *
+   * <p>If another storage on the same datanode already holds {@code b}, the
+   * block is removed from that storage first.
+   *
+   * @param b Block to add to DatanodeStorageInfo
+   * @param reportedBlock The reported replica
+   * @return {@link AddBlockResult#ALREADY_EXIST} if {@code b} is already on
+   *         this storage, {@link AddBlockResult#REPLACED} if it was moved
+   *         from another storage on the same datanode, otherwise
+   *         {@link AddBlockResult#ADDED}
+   */
+  public AddBlockResult addBlockInitial(BlockInfo b, Block reportedBlock) {
+    // Find which storage of this datanode, if any, currently holds the block.
+    final DatanodeStorageInfo holder =
+        b.findStorageInfo(getDatanodeDescriptor());
+    if (holder == this) {
+      // Nothing to do: the block is already associated with this storage.
+      return AddBlockResult.ALREADY_EXIST;
+    }
+    AddBlockResult outcome = AddBlockResult.ADDED;
+    if (holder != null) {
+      // The block lives on a sibling storage; detach it from there first.
+      holder.removeBlock(b);
+      outcome = AddBlockResult.REPLACED;
+    }
+    b.addStorage(this, reportedBlock);
+    // Relies on the sorted-input expectation documented above.
+    blocks.addSortedLast(b);
+    return outcome;
+  }
public AddBlockResult addBlock(BlockInfo b, Block reportedBlock) {
// First check whether the block belongs to a different storage
@@ -253,9 +258,8 @@ public class DatanodeStorageInfo {
}
}
- // add to the head of the data-node list
b.addStorage(this, reportedBlock);
- insertToList(b);
+ blocks.add(b);
return result;
}
@@ -263,45 +267,17 @@ public class DatanodeStorageInfo {
return addBlock(b, b);
}
- public void insertToList(BlockInfo b) {
- blockList = b.listInsert(blockList, this);
- numBlocks++;
- }
-
- public boolean removeBlock(BlockInfo b) {
- blockList = b.listRemove(blockList, this);
- if (b.removeStorage(this)) {
- numBlocks--;
- return true;
- } else {
- return false;
- }
+ boolean removeBlock(BlockInfo b) {
+ blocks.remove(b);
+ return b.removeStorage(this);
}
int numBlocks() {
- return numBlocks;
+ return blocks.size();
}
Iterator<BlockInfo> getBlockIterator() {
- return new BlockIterator(blockList);
- }
-
- /**
- * Move block to the head of the list of blocks belonging to the data-node.
- * @return the index of the head of the blockList
- */
- int moveBlockToHead(BlockInfo b, int curIndex, int headIndex) {
- blockList = b.moveBlockToHead(blockList, this, curIndex, headIndex);
- return curIndex;
- }
-
- /**
- * Used for testing only
- * @return the head of the blockList
- */
- @VisibleForTesting
- BlockInfo getBlockListHeadForTesting(){
- return blockList;
+ return blocks.iterator();
}
void updateState(StorageReport r) {
@@ -349,6 +325,27 @@ public class DatanodeStorageInfo {
false, capacity, dfsUsed, remaining, blockPoolUsed);
}
+  /**
+   * Returns the fill ratio of the FoldedTreeSet that holds this storage's
+   * blocks.
+   *
+   * @return the fill ratio reported by the underlying tree set
+   */
+  public double treeSetFillRatio() {
+    return this.blocks.fillRatio();
+  }
+
+  /**
+   * Compacts the FoldedTreeSet that holds this storage's blocks, stopping
+   * early if the time budget runs out.
+   *
+   * @param timeout upper bound, in milliseconds, on the time spent compacting
+   * @return {@code true} if compaction ran to completion, {@code false} if
+   *         it was aborted
+   */
+  public boolean treeSetCompact(long timeout) {
+    final boolean completed = blocks.compact(timeout);
+    return completed;
+  }
+
static Iterable<StorageType> toStorageTypes(
final Iterable<DatanodeStorageInfo> infos) {
return new Iterable<StorageType>() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 1b72961..bc4f2d8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -461,7 +461,7 @@ class BPServiceActor implements Runnable {
// Below split threshold, send all reports in a single message.
DatanodeCommand cmd = bpNamenode.blockReport(
bpRegistration, bpos.getBlockPoolId(), reports,
- new BlockReportContext(1, 0, reportId, fullBrLeaseId));
+ new BlockReportContext(1, 0, reportId, fullBrLeaseId, true));
numRPCs = 1;
numReportsSent = reports.length;
if (cmd != null) {
@@ -474,7 +474,7 @@ class BPServiceActor implements Runnable {
DatanodeCommand cmd = bpNamenode.blockReport(
bpRegistration, bpos.getBlockPoolId(), singleReport,
new BlockReportContext(reports.length, r, reportId,
- fullBrLeaseId));
+ fullBrLeaseId, true));
numReportsSent++;
numRPCs++;
if (cmd != null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
index 6f0b8a7..34c9f2e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
@@ -18,13 +18,14 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.util.Collection;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
-import org.apache.hadoop.util.LightWeightResizableGSet;
+import org.apache.hadoop.hdfs.util.FoldedTreeSet;
/**
* Maintains the replica map.
@@ -33,9 +34,20 @@ class ReplicaMap {
// Object using which this class is synchronized
private final Object mutex;
- // Map of block pool Id to another map of block Id to ReplicaInfo.
- private final Map<String, LightWeightResizableGSet<Block, ReplicaInfo>> map =
- new HashMap<String, LightWeightResizableGSet<Block, ReplicaInfo>>();
+  // Map of block pool Id to a set of ReplicaInfo.
+  private final Map<String, FoldedTreeSet<ReplicaInfo>> map = new HashMap<>();
+
+  /**
+   * Special comparator used to look up a {@link ReplicaInfo} in a
+   * {@link FoldedTreeSet} by block ID alone, without constructing a
+   * {@link Block} key.
+   *
+   * <p>The comparator is asymmetric: {@code o1} must be the lookup key (a
+   * {@link Long} block ID) and {@code o2} a stored {@link Block}. Swapping
+   * the arguments throws {@link ClassCastException}.
+   */
+  private static final Comparator<Object> LONG_AND_BLOCK_COMPARATOR
+      = new Comparator<Object>() {
+
+        @Override
+        public int compare(Object o1, Object o2) {
+          long lookup = (Long) o1;
+          long stored = ((Block) o2).getBlockId();
+          return Long.compare(lookup, stored);
+        }
+      };
ReplicaMap(Object mutex) {
if (mutex == null) {
@@ -92,11 +104,14 @@ class ReplicaMap {
ReplicaInfo get(String bpid, long blockId) {
checkBlockPool(bpid);
synchronized(mutex) {
- LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
- return m != null ? m.get(new Block(blockId)) : null;
+ FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
+ if (set == null) {
+ return null;
+ }
+ return set.get(blockId, LONG_AND_BLOCK_COMPARATOR);
}
}
-
+
/**
* Add a replica's meta information into the map
*
@@ -109,13 +124,13 @@ class ReplicaMap {
checkBlockPool(bpid);
checkBlock(replicaInfo);
synchronized(mutex) {
- LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
- if (m == null) {
+ FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
+ if (set == null) {
// Add an entry for block pool if it does not exist already
- m = new LightWeightResizableGSet<Block, ReplicaInfo>();
- map.put(bpid, m);
+ set = new FoldedTreeSet<>();
+ map.put(bpid, set);
}
- return m.put(replicaInfo);
+ return set.addOrReplace(replicaInfo);
}
}
@@ -138,12 +153,13 @@ class ReplicaMap {
checkBlockPool(bpid);
checkBlock(block);
synchronized(mutex) {
- LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
- if (m != null) {
- ReplicaInfo replicaInfo = m.get(block);
+ FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
+ if (set != null) {
+ ReplicaInfo replicaInfo =
+ set.get(block.getBlockId(), LONG_AND_BLOCK_COMPARATOR);
if (replicaInfo != null &&
block.getGenerationStamp() == replicaInfo.getGenerationStamp()) {
- return m.remove(block);
+ return set.removeAndGet(replicaInfo);
}
}
}
@@ -160,9 +176,9 @@ class ReplicaMap {
ReplicaInfo remove(String bpid, long blockId) {
checkBlockPool(bpid);
synchronized(mutex) {
- LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
- if (m != null) {
- return m.remove(new Block(blockId));
+ FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
+ if (set != null) {
+ return set.removeAndGet(blockId, LONG_AND_BLOCK_COMPARATOR);
}
}
return null;
@@ -174,10 +190,9 @@ class ReplicaMap {
* @return the number of replicas in the map
*/
int size(String bpid) {
- LightWeightResizableGSet<Block, ReplicaInfo> m = null;
synchronized(mutex) {
- m = map.get(bpid);
- return m != null ? m.size() : 0;
+ FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
+ return set != null ? set.size() : 0;
}
}
@@ -192,19 +207,17 @@ class ReplicaMap {
* @return a collection of the replicas belonging to the block pool
*/
Collection<ReplicaInfo> replicas(String bpid) {
- LightWeightResizableGSet<Block, ReplicaInfo> m = null;
- m = map.get(bpid);
- return m != null ? m.values() : null;
+ return map.get(bpid);
}
void initBlockPool(String bpid) {
checkBlockPool(bpid);
synchronized(mutex) {
- LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
- if (m == null) {
+ FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
+ if (set == null) {
// Add an entry for block pool if it does not exist already
- m = new LightWeightResizableGSet<Block, ReplicaInfo>();
- map.put(bpid, m);
+ set = new FoldedTreeSet<>();
+ map.put(bpid, set);
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java
index 5bcd719..94749e2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java
@@ -52,12 +52,16 @@ public class BlockReportContext {
*/
private final long leaseId;
+  /**
+   * Whether the blocks in this report were sent in sorted order.
+   * NOTE(review): the sort key is not visible here (presumably block ID);
+   * confirm against the DataNode-side report builder.
+   */
+  private final boolean sorted;
+
public BlockReportContext(int totalRpcs, int curRpc,
- long reportId, long leaseId) {
+ long reportId, long leaseId,
+ boolean sorted) {
this.totalRpcs = totalRpcs;
this.curRpc = curRpc;
this.reportId = reportId;
this.leaseId = leaseId;
+ this.sorted = sorted;
}
public int getTotalRpcs() {
@@ -75,4 +79,8 @@ public class BlockReportContext {
public long getLeaseId() {
return leaseId;
}
+
+  /**
+   * @return {@code true} if the blocks carried by this report are in sorted
+   *         order, as declared by the sender
+   */
+  public boolean isSorted() {
+    return this.sorted;
+  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
index add4e73..b962855 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
@@ -131,7 +131,6 @@ public interface DatanodeProtocol {
* Each finalized block is represented as 3 longs. Each under-
* construction replica is represented as 4 longs.
* This is done instead of Block[] to reduce memory used by block reports.
- * @param reports report of blocks per storage
* @param context Context information for this block report.
*
* @return - the next command for DN to process.