You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by st...@apache.org on 2011/10/23 16:15:09 UTC

svn commit: r1187889 - in /hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/

Author: stevel
Date: Sun Oct 23 14:15:09 2011
New Revision: 1187889

URL: http://svn.apache.org/viewvc?rev=1187889&view=rev
Log:
HDFS-2485

Added:
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java
Modified:
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1187889&r1=1187888&r2=1187889&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Sun Oct 23 14:15:09 2011
@@ -704,6 +704,9 @@ Release 0.23.0 - Unreleased
 
     HDFS-2471. Add federation documentation. (suresh)
 
+    HDFS-2485. Improve code layout and constants in UnderReplicatedBlocks
+    (stevel)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java?rev=1187889&r1=1187888&r2=1187889&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java Sun Oct 23 14:15:09 2011
@@ -26,19 +26,66 @@ import java.util.TreeSet;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 
-/** Keep track of under replication blocks.
- * Blocks have replication priority, with priority 0 indicating the highest
- * Blocks have only one replicas has the highest
+/**
+ * Keep prioritized queues of under replicated blocks.
+ * Blocks have replication priority, with priority {@link #QUEUE_HIGHEST_PRIORITY}
+ * indicating the highest priority.
+ * </p>
+ * Having a prioritised queue allows the {@link BlockManager} to select
+ * which blocks to replicate first -it tries to give priority to data
+ * that is most at risk or considered most valuable.
+ *
+ * <p/>
+ * The policy for choosing which priority to give added blocks
+ * is implemented in {@link #getPriority(Block, int, int, int)}.
+ * </p>
+ * <p>The queue order is as follows:</p>
+ * <ol>
+ *   <li>{@link #QUEUE_HIGHEST_PRIORITY}: the blocks that must be replicated
+ *   first. That is blocks with only one copy, or blocks with zero live
+ *   copies but a copy in a node being decommissioned. These blocks
+ *   are at risk of loss if the disk or server on which they
+ *   remain fails.</li>
+ *   <li>{@link #QUEUE_VERY_UNDER_REPLICATED}: blocks that are very
+ *   under-replicated compared to their expected values. Currently
+ *   that means the ratio of the ratio of actual:expected means that
+ *   there is <i>less than</i> 1:3.</li>. These blocks may not be at risk,
+ *   but they are clearly considered "important".
+ *   <li>{@link #QUEUE_UNDER_REPLICATED}: blocks that are also under
+ *   replicated, and the ratio of actual:expected is good enough that
+ *   they do not need to go into the {@link #QUEUE_VERY_UNDER_REPLICATED}
+ *   queue.</li>
+ *   <li>{@link #QUEUE_REPLICAS_BADLY_DISTRIBUTED}: there are as least as
+ *   many copies of a block as required, but the blocks are not adequately
+ *   distributed. Loss of a rack/switch could take all copies off-line.</li>
+ *   <li>{@link #QUEUE_WITH_CORRUPT_BLOCKS} This is for blocks that are corrupt
+ *   and for which there are no-non-corrupt copies (currently) available.
+ *   The policy here is to keep those corrupt blocks replicated, but give
+ *   blocks that are not corrupt higher priority.</li>
+ * </ol>
  */
 class UnderReplicatedBlocks implements Iterable<Block> {
+  /** The total number of queues : {@value} */
   static final int LEVEL = 5;
+  /** The queue with the highest priority: {@value} */
+  static final int QUEUE_HIGHEST_PRIORITY = 0;
+  /** The queue for blocks that are way below their expected value : {@value} */
+  static final int QUEUE_VERY_UNDER_REPLICATED = 1;
+  /** The queue for "normally" under-replicated blocks: {@value} */
+  static final int QUEUE_UNDER_REPLICATED = 2;
+  /** The queue for blocks that have the right number of replicas,
+   * but which the block manager felt were badly distributed: {@value}
+   */
+  static final int QUEUE_REPLICAS_BADLY_DISTRIBUTED = 3;
+  /** The queue for corrupt blocks: {@value} */
   static final int QUEUE_WITH_CORRUPT_BLOCKS = 4;
+  /** the queues themselves */
   private final List<NavigableSet<Block>> priorityQueues
-      = new ArrayList<NavigableSet<Block>>();
-      
+      = new ArrayList<NavigableSet<Block>>(LEVEL);
+
   /** Create an object. */
   UnderReplicatedBlocks() {
-    for(int i=0; i<LEVEL; i++) {
+    for (int i = 0; i < LEVEL; i++) {
       priorityQueues.add(new TreeSet<Block>());
     }
   }
@@ -47,7 +94,7 @@ class UnderReplicatedBlocks implements I
    * Empty the queues.
    */
   void clear() {
-    for(int i=0; i<LEVEL; i++) {
+    for (int i = 0; i < LEVEL; i++) {
       priorityQueues.get(i).clear();
     }
   }
@@ -55,7 +102,7 @@ class UnderReplicatedBlocks implements I
   /** Return the total number of under replication blocks */
   synchronized int size() {
     int size = 0;
-    for (int i=0; i<LEVEL; i++) {
+    for (int i = 0; i < LEVEL; i++) {
       size += priorityQueues.get(i).size();
     }
     return size;
@@ -64,12 +111,14 @@ class UnderReplicatedBlocks implements I
   /** Return the number of under replication blocks excluding corrupt blocks */
   synchronized int getUnderReplicatedBlockCount() {
     int size = 0;
-    for (int i=0; i<QUEUE_WITH_CORRUPT_BLOCKS; i++) {
-      size += priorityQueues.get(i).size();
+    for (int i = 0; i < LEVEL; i++) {
+      if (i != QUEUE_WITH_CORRUPT_BLOCKS) {
+        size += priorityQueues.get(i).size();
+      }
     }
     return size;
   }
-  
+
   /** Return the number of corrupt blocks */
   synchronized int getCorruptBlockSize() {
     return priorityQueues.get(QUEUE_WITH_CORRUPT_BLOCKS).size();
@@ -77,47 +126,58 @@ class UnderReplicatedBlocks implements I
   
   /** Check if a block is in the neededReplication queue */
   synchronized boolean contains(Block block) {
-    for(NavigableSet<Block> set : priorityQueues) {
-      if(set.contains(block)) { return true; }
+    for (NavigableSet<Block> set : priorityQueues) {
+      if (set.contains(block)) {
+        return true;
+      }
     }
     return false;
   }
-      
+
   /** Return the priority of a block
-   * @param block a under replication block
+   * @param block a under replicated block
    * @param curReplicas current number of replicas of the block
    * @param expectedReplicas expected number of replicas of the block
+   * @return the priority for the blocks, between 0 and ({@link #LEVEL}-1)
    */
-  private int getPriority(Block block, 
+  private int getPriority(Block block,
                           int curReplicas, 
                           int decommissionedReplicas,
                           int expectedReplicas) {
     assert curReplicas >= 0 : "Negative replicas!";
     if (curReplicas >= expectedReplicas) {
-      return 3; // Block doesn't have enough racks
-    } else if(curReplicas==0) {
-      // If there are zero non-decommissioned replica but there are
+      // Block has enough copies, but not enough racks
+      return QUEUE_REPLICAS_BADLY_DISTRIBUTED;
+    } else if (curReplicas == 0) {
+      // If there are zero non-decommissioned replicas but there are
       // some decommissioned replicas, then assign them highest priority
       if (decommissionedReplicas > 0) {
-        return 0;
+        return QUEUE_HIGHEST_PRIORITY;
       }
-      return QUEUE_WITH_CORRUPT_BLOCKS; // keep these blocks in needed replication.
-    } else if(curReplicas==1) {
-      return 0; // highest priority
-    } else if(curReplicas*3<expectedReplicas) {
-      return 1;
+      //all we have are corrupt blocks
+      return QUEUE_WITH_CORRUPT_BLOCKS;
+    } else if (curReplicas == 1) {
+      //only on replica -risk of loss
+      // highest priority
+      return QUEUE_HIGHEST_PRIORITY;
+    } else if ((curReplicas * 3) < expectedReplicas) {
+      //there is less than a third as many blocks as requested;
+      //this is considered very under-replicated
+      return QUEUE_VERY_UNDER_REPLICATED;
     } else {
-      return 2;
+      //add to the normal queue for under replicated blocks
+      return QUEUE_UNDER_REPLICATED;
     }
   }
-      
+
   /** add a block to a under replication queue according to its priority
    * @param block a under replication block
    * @param curReplicas current number of replicas of the block
+   * @param decomissionedReplicas the number of decommissioned replicas
    * @param expectedReplicas expected number of replicas of the block
+   * @return true if the block was added to a queue.
    */
-  synchronized boolean add(
-                           Block block,
+  synchronized boolean add(Block block,
                            int curReplicas, 
                            int decomissionedReplicas,
                            int expectedReplicas) {
@@ -129,7 +189,7 @@ class UnderReplicatedBlocks implements I
         NameNode.stateChangeLog.debug(
           "BLOCK* NameSystem.UnderReplicationBlock.add:"
           + block
-          + " has only "+curReplicas
+          + " has only " + curReplicas
           + " replicas and need " + expectedReplicas
           + " replicas so is added to neededReplications"
           + " at priority level " + priLevel);
@@ -149,8 +209,22 @@ class UnderReplicatedBlocks implements I
                                oldExpectedReplicas);
     return remove(block, priLevel);
   }
-      
-  /** remove a block from a under replication queue given a priority*/
+
+  /**
+   * Remove a block from the under replication queues.
+   *
+   * The priLevel parameter is a hint of which queue to query
+   * first: if negative or &gt;= {@link #LEVEL} this shortcutting
+   * is not attmpted.
+   *
+   * If the block is not found in the nominated queue, an attempt is made to
+   * remove it from all queues.
+   *
+   * <i>Warning:</i> This is not a synchronized method.
+   * @param block block to remove
+   * @param priLevel expected privilege level
+   * @return true if the block was found and removed from one of the priority queues
+   */
   boolean remove(Block block, int priLevel) {
     if(priLevel >= 0 && priLevel < LEVEL 
         && priorityQueues.get(priLevel).remove(block)) {
@@ -164,8 +238,8 @@ class UnderReplicatedBlocks implements I
     } else {
       // Try to remove the block from all queues if the block was
       // not found in the queue for the given priority level.
-      for(int i=0; i<LEVEL; i++) {
-        if(priorityQueues.get(i).remove(block)) {
+      for (int i = 0; i < LEVEL; i++) {
+        if (priorityQueues.get(i).remove(block)) {
           if(NameNode.stateChangeLog.isDebugEnabled()) {
             NameNode.stateChangeLog.debug(
               "BLOCK* NameSystem.UnderReplicationBlock.remove: "
@@ -178,9 +252,24 @@ class UnderReplicatedBlocks implements I
     }
     return false;
   }
-      
-  /** update the priority level of a block */
-  synchronized void update(Block block, int curReplicas, 
+
+  /**
+   * Recalculate and potentially update the priority level of a block.
+   *
+   * If the block priority has changed from before an attempt is made to
+   * remove it from the block queue. Regardless of whether or not the block
+   * is in the block queue of (recalculate) priority, an attempt is made
+   * to add it to that queue. This ensures that the block will be
+   * in its expected priority queue (and only that queue) by the end of the
+   * method call.
+   * @param block a under replicated block
+   * @param curReplicas current number of replicas of the block
+   * @param decommissionedReplicas  the number of decommissioned replicas
+   * @param curExpectedReplicas expected number of replicas of the block
+   * @param curReplicasDelta the change in the replicate count from before
+   * @param expectedReplicasDelta the change in the expected replica count from before
+   */
+  synchronized void update(Block block, int curReplicas,
                            int decommissionedReplicas,
                            int curExpectedReplicas,
                            int curReplicasDelta, int expectedReplicasDelta) {
@@ -206,7 +295,7 @@ class UnderReplicatedBlocks implements I
         NameNode.stateChangeLog.debug(
           "BLOCK* NameSystem.UnderReplicationBlock.update:"
           + block
-          + " has only "+curReplicas
+          + " has only "+ curReplicas
           + " replicas and needs " + curExpectedReplicas
           + " replicas so is added to neededReplications"
           + " at priority level " + curPri);
@@ -218,17 +307,24 @@ class UnderReplicatedBlocks implements I
   synchronized BlockIterator iterator(int level) {
     return new BlockIterator(level);
   }
-    
+
   /** return an iterator of all the under replication blocks */
+  @Override
   public synchronized BlockIterator iterator() {
     return new BlockIterator();
   }
-  
+
+  /**
+   * An iterator over blocks.
+   */
   class BlockIterator implements Iterator<Block> {
     private int level;
     private boolean isIteratorForLevel = false;
     private List<Iterator<Block>> iterators = new ArrayList<Iterator<Block>>();
 
+    /**
+     * Construct an iterator over all queues.
+     */
     private BlockIterator() {
       level=0;
       for(int i=0; i<LEVEL; i++) {
@@ -236,6 +332,10 @@ class UnderReplicatedBlocks implements I
       }
     }
 
+    /**
+     * Constrict an iterator for a single queue level
+     * @param l the priority level to iterate over
+     */
     private BlockIterator(int l) {
       level = l;
       isIteratorForLevel = true;
@@ -243,8 +343,9 @@ class UnderReplicatedBlocks implements I
     }
 
     private void update() {
-      if (isIteratorForLevel)
+      if (isIteratorForLevel) {
         return;
+      }
       while(level< LEVEL-1 && !iterators.get(level).hasNext()) {
         level++;
       }
@@ -252,30 +353,33 @@ class UnderReplicatedBlocks implements I
 
     @Override
     public Block next() {
-      if (isIteratorForLevel)
+      if (isIteratorForLevel) {
         return iterators.get(0).next();
+      }
       update();
       return iterators.get(level).next();
     }
 
     @Override
     public boolean hasNext() {
-      if (isIteratorForLevel)
+      if (isIteratorForLevel) {
         return iterators.get(0).hasNext();
+      }
       update();
       return iterators.get(level).hasNext();
     }
 
     @Override
     public void remove() {
-      if (isIteratorForLevel) 
+      if (isIteratorForLevel) {
         iterators.get(0).remove();
-      else
+      } else {
         iterators.get(level).remove();
+      }
     }
 
     int getPriority() {
       return level;
     }
-  }  
+  }
 }

Added: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java?rev=1187889&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java (added)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java Sun Oct 23 14:15:09 2011
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestUnderReplicatedBlockQueues extends Assert {
+
+  /**
+   * Test that adding blocks with different replication counts puts them
+   * into different queues
+   * @throws Throwable if something goes wrong
+   */
+  @Test
+  public void testBlockPriorities() throws Throwable {
+    UnderReplicatedBlocks queues = new UnderReplicatedBlocks();
+    Block block1 = new Block(1);
+    Block block2 = new Block(2);
+    Block block_very_under_replicated = new Block(3);
+    Block block_corrupt = new Block(4);
+
+    //add a block with a single entry
+    assertAdded(queues, block1, 1, 0, 3);
+
+    assertEquals(1, queues.getUnderReplicatedBlockCount());
+    assertEquals(1, queues.size());
+    assertInLevel(queues, block1, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY);
+    //repeated additions fail
+    assertFalse(queues.add(block1, 1, 0, 3));
+
+    //add a second block with two replicas
+    assertAdded(queues, block2, 2, 0, 3);
+    assertEquals(2, queues.getUnderReplicatedBlockCount());
+    assertEquals(2, queues.size());
+    assertInLevel(queues, block2, UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED);
+    //now try to add a block that is corrupt
+    assertAdded(queues, block_corrupt, 0, 0, 3);
+    assertEquals(3, queues.size());
+    assertEquals(2, queues.getUnderReplicatedBlockCount());
+    assertEquals(1, queues.getCorruptBlockSize());
+    assertInLevel(queues, block_corrupt,
+                  UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
+
+    //insert a very under-replicated block
+    assertAdded(queues, block_very_under_replicated, 4, 0, 25);
+    assertInLevel(queues, block_very_under_replicated,
+                  UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED);
+
+  }
+
+  private void assertAdded(UnderReplicatedBlocks queues,
+                           Block block,
+                           int curReplicas,
+                           int decomissionedReplicas,
+                           int expectedReplicas) {
+    assertTrue("Failed to add " + block,
+               queues.add(block,
+                          curReplicas,
+                          decomissionedReplicas,
+                          expectedReplicas));
+  }
+
+  /**
+   * Determine whether or not a block is in a level without changing the API.
+   * Instead get the per-level iterator and run though it looking for a match.
+   * If the block is not found, an assertion is thrown.
+   *
+   * This is inefficient, but this is only a test case.
+   * @param queues queues to scan
+   * @param block block to look for
+   * @param level level to select
+   */
+  private void assertInLevel(UnderReplicatedBlocks queues,
+                             Block block,
+                             int level) {
+    UnderReplicatedBlocks.BlockIterator bi = queues.iterator(level);
+    while (bi.hasNext()) {
+      Block next = bi.next();
+      if (block.equals(next)) {
+        return;
+      }
+    }
+    fail("Block " + block + " not found in level " + level);
+  }
+}