Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2008/11/14 02:01:21 UTC

svn commit: r713891 - in /hadoop/core/branches/branch-0.18: ./ src/hdfs/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/dfs/

Author: hairong
Date: Thu Nov 13 17:01:21 2008
New Revision: 713891

URL: http://svn.apache.org/viewvc?rev=713891&view=rev
Log:
Merge -r 713887:713888 from trunk to main to move the change of HADOOP-4643 into branch 0.18

Added:
    hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestNodeCount.java
Modified:
    hadoop/core/branches/branch-0.18/CHANGES.txt
    hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSNamesystem.java
    hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
    hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestDatanodeBlockScanner.java

Modified: hadoop/core/branches/branch-0.18/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/CHANGES.txt?rev=713891&r1=713890&r2=713891&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.18/CHANGES.txt Thu Nov 13 17:01:21 2008
@@ -25,6 +25,9 @@
 
     HADOOP-4556. Block went missing. (hairong)
 
+    HADOOP-4643. NameNode should exclude excessive replicas when counting
+    live replicas for a block. (hairong)
+
 Release 0.18.2 - 2008-11-03
 
   BUG FIXES

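The fix is easiest to see in isolation. Before HADOOP-4643, a replica that
had already been scheduled for deletion as excess was still counted as live,
so the NameNode could believe a block was adequately replicated when it was
not. Below is a minimal, self-contained sketch of the corrected accounting;
every type in it is a hypothetical stand-in, not a Hadoop class.

    import java.util.Arrays;
    import java.util.EnumMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical stand-in for the per-replica state the NameNode
    // tracks; not the real DatanodeDescriptor/blocksMap structures.
    public class ReplicaAccountingSketch {
      enum State { LIVE, DECOMMISSIONED, CORRUPT, EXCESS }

      static Map<State, Integer> count(List<State> replicas) {
        Map<State, Integer> counts = new EnumMap<State, Integer>(State.class);
        for (State s : State.values()) counts.put(s, 0);
        for (State s : replicas) counts.put(s, counts.get(s) + 1);
        return counts;
      }

      public static void main(String[] args) {
        // Three physical copies, but one is already scheduled for
        // deletion as excess: only two are live, so a block with
        // replication factor 2 is exactly satisfied, not over-replicated.
        List<State> replicas =
            Arrays.asList(State.LIVE, State.LIVE, State.EXCESS);
        System.out.println(count(replicas));
        // prints {LIVE=2, DECOMMISSIONED=0, CORRUPT=0, EXCESS=1}
      }
    }
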
Modified: hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSNamesystem.java?rev=713891&r1=713890&r2=713891&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSNamesystem.java Thu Nov 13 17:01:21 2008
@@ -169,7 +169,7 @@
   // eventually remove these extras.
   // Mapping: StorageID -> TreeSet<Block>
   //
-  private Map<String, Collection<Block>> excessReplicateMap = 
+  Map<String, Collection<Block>> excessReplicateMap = 
     new TreeMap<String, Collection<Block>>();
 
   //
@@ -2348,7 +2348,7 @@
           replIndex--;
           NameNode.stateChangeLog.info("BLOCK* "
               + "Removing block " + block
-              + " from neededReplications as it does not belong to any file.");
+              + " from neededReplications as it has enough replicas.");
           continue;
         }
 
@@ -2422,26 +2422,30 @@
     int live = 0;
     int decommissioned = 0;
     int corrupt = 0;
+    int excess = 0;
     Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
+    Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(block);
     while(it.hasNext()) {
       DatanodeDescriptor node = it.next();
-      Collection<DatanodeDescriptor> nodes = corruptReplicas.getNodes(block);
-      if ((nodes != null) && (nodes.contains(node)))
+      Collection<Block> excessBlocks = 
+        excessReplicateMap.get(node.getStorageID());
+      if ((nodesCorrupt != null) && (nodesCorrupt.contains(node)))
         corrupt++;
-      else if(!node.isDecommissionInProgress() && !node.isDecommissioned())
-        live++;
-      else
+      else if (node.isDecommissionInProgress() || node.isDecommissioned())
         decommissioned++;
+      else if (excessBlocks != null && excessBlocks.contains(block)) {
+        excess++;
+      } else {
+        live++;
+      }
       containingNodes.add(node);
       // Check if this replica is corrupt
       // If so, do not select the node as src node
-      if ((nodes != null) && nodes.contains(node))
+      if ((nodesCorrupt != null) && nodesCorrupt.contains(node))
         continue;
       if(node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams)
         continue; // already reached replication limit
       // the block must not be scheduled for removal on srcNode
-      Collection<Block> excessBlocks = 
-        excessReplicateMap.get(node.getStorageID());
       if(excessBlocks != null && excessBlocks.contains(block))
         continue;
       // never use already decommissioned nodes
@@ -2461,7 +2465,7 @@
         srcNode = node;
     }
     if(numReplicas != null)
-      numReplicas.initialize(live, decommissioned, corrupt);
+      numReplicas.initialize(live, decommissioned, corrupt, excess);
     return srcNode;
   }
 
@@ -3386,23 +3390,25 @@
   * An immutable object that stores the number of live replicas and
   * the number of decommissioned replicas.
    */
-  private static class NumberReplicas {
+  static class NumberReplicas {
     private int liveReplicas;
     private int decommissionedReplicas;
     private int corruptReplicas;
+    private int excessReplicas;
 
     NumberReplicas() {
-      initialize(0, 0, 0);
+      initialize(0, 0, 0, 0);
     }
 
-    NumberReplicas(int live, int decommissioned, int corrupt) {
-      initialize(live, decommissioned, corrupt);
+    NumberReplicas(int live, int decommissioned, int corrupt, int excess) {
+      initialize(live, decommissioned, corrupt, excess);
     }
 
-    void initialize(int live, int decommissioned, int corrupt) {
+    void initialize(int live, int decommissioned, int corrupt, int excess) {
       liveReplicas = live;
       decommissionedReplicas = decommissioned;
       corruptReplicas = corrupt;
+      excessReplicas = excess;
     }
 
     int liveReplicas() {
@@ -3414,6 +3420,9 @@
     int corruptReplicas() {
       return corruptReplicas;
     }
+    int excessReplicas() {
+      return excessReplicas;
+    }
   } 
 
   /**
@@ -3425,6 +3434,7 @@
     int count = 0;
     int live = 0;
     int corrupt = 0;
+    int excess = 0;
     Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
     while ( nodeIter.hasNext() ) {
       DatanodeDescriptor node = nodeIter.next();
@@ -3434,17 +3444,23 @@
       else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
         count++;
       }
-      else {
-        live++;
+      else {
+        Collection<Block> blocksExcess = 
+          excessReplicateMap.get(node.getStorageID());
+        if (blocksExcess != null && blocksExcess.contains(b)) {
+          excess++;
+        } else {
+          live++;
+        }
       }
     }
-    return new NumberReplicas(live, count, corrupt);
+    return new NumberReplicas(live, count, corrupt, excess);
   }
 
   /**
    * Return the number of nodes that are live and decommissioned.
    */
-  private NumberReplicas countNodes(Block b) {
+  NumberReplicas countNodes(Block b) {
     return countNodes(b, blocksMap.nodeIterator(b));
   }
 

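The ordering of the checks in the rewritten loops above matters: a replica
is classified as corrupt first, then decommissioned, then excess, and only
counts as live when none of those apply. Here is a hedged, self-contained
model of the new countNodes() logic; the parameter names are illustrative
stand-ins for the FSNamesystem fields, not the real API.

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class CountNodesSketch {
      // counts[0]=live, counts[1]=decommissioned,
      // counts[2]=corrupt, counts[3]=excess
      static int[] countNodes(String block,
          List<String> holders,               // storage IDs holding the block
          Set<String> corruptOn,              // storage IDs with a corrupt copy
          Set<String> decommissioning,        // storage IDs being decommissioned
          Map<String, Set<String>> excessMap) // storage ID -> its excess blocks
      {
        int[] counts = new int[4];
        for (String id : holders) {
          Set<String> excess = excessMap.get(id);
          if (excess == null) excess = Collections.emptySet();
          if (corruptOn.contains(id))            counts[2]++;
          else if (decommissioning.contains(id)) counts[1]++;
          else if (excess.contains(block))       counts[3]++; // the HADOOP-4643 case
          else                                   counts[0]++;
        }
        return counts;
      }
    }

Note that excess replicas are still examined later in chooseSourceDatanode
(such a node is skipped as a replication source), but they no longer
inflate the live count.
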
Modified: hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java?rev=713891&r1=713890&r2=713891&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java (original)
+++ hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java Thu Nov 13 17:01:21 2008
@@ -45,7 +45,7 @@
  */
 public class MiniDFSCluster {
 
-  private class DataNodeProperties {
+  public class DataNodeProperties {
     DataNode datanode;
     Configuration conf;
     String[] dnArgs;
@@ -588,50 +588,55 @@
   /*
    * Shutdown a particular datanode
    */
-  boolean stopDataNode(int i) {
+  DataNodeProperties stopDataNode(int i) {
     if (i < 0 || i >= dataNodes.size()) {
-      return false;
+      return null;
     }
-    DataNode dn = dataNodes.remove(i).datanode;
+    DataNodeProperties dnprop = dataNodes.remove(i);
+    DataNode dn = dnprop.datanode;
     System.out.println("MiniDFSCluster Stopping DataNode " + 
                        dn.dnRegistration.getName() +
                        " from a total of " + (dataNodes.size() + 1) + 
                        " datanodes.");
     dn.shutdown();
     numDataNodes--;
-    return true;
+    return dnprop;
   }
 
-  /*
-   * Restart a particular datanode
+  /**
+   * Restart a datanode
+   * @param dnprop the datanode's properties
+   * @return true if restarting is successful
+   * @throws IOException
    */
-  synchronized boolean restartDataNode(int i) throws IOException {
-    if (i < 0 || i >= dataNodes.size()) {
-      return false;
-    }
-    DataNodeProperties dnprop = dataNodes.remove(i);
-    DataNode dn = dnprop.datanode;
+  public synchronized boolean restartDataNode(DataNodeProperties dnprop)
+  throws IOException {
     Configuration conf = dnprop.conf;
     String[] args = dnprop.dnArgs;
-    System.out.println("MiniDFSCluster Restart DataNode " + 
-                       dn.dnRegistration.getName() +
-                       " from a total of " + (dataNodes.size() + 1) + 
-                       " datanodes.");
-    dn.shutdown();
-
-    // recreate new datanode with the same configuration as the one
-    // that was stopped.
     Configuration newconf = new Configuration(conf); // save cloned config
     dataNodes.add(new DataNodeProperties(
                      DataNode.createDataNode(args, conf), 
                      newconf, args));
+    numDataNodes++;
     return true;
+  }
+
+  /*
+   * Restart a particular datanode
+   */
+  synchronized boolean restartDataNode(int i) throws IOException {
+    DataNodeProperties dnprop = stopDataNode(i);
+    if (dnprop == null) {
+      return false;
+    } else {
+      return restartDataNode(dnprop);
+    }
   }
 
   /*
    * Shutdown a datanode by name.
    */
-  synchronized boolean stopDataNode(String name) {
+  public synchronized DataNodeProperties stopDataNode(String name) {
     int i;
     for (i = 0; i < dataNodes.size(); i++) {
       DataNode dn = dataNodes.get(i).datanode;

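The test-harness change above is a refactoring: stopDataNode() now hands
back the stopped node's DataNodeProperties (its DataNode, Configuration,
and startup args) instead of a boolean, so a test can bring the very same
node back later, and restartDataNode(int) is re-expressed as a stop
followed by a restart. A hedged usage sketch follows; it assumes the
Hadoop 0.18 test classpath and lives in org.apache.hadoop.dfs because
stopDataNode(int) is package-private.

    package org.apache.hadoop.dfs;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.dfs.MiniDFSCluster.DataNodeProperties;

    public class StopRestartSketch {
      public static void main(String[] args) throws Exception {
        MiniDFSCluster cluster =
            new MiniDFSCluster(new Configuration(), 2, true, null);
        try {
          // Stop datanode 0, keeping its config and args around ...
          DataNodeProperties dnprop = cluster.stopDataNode(0);
          // ... so exactly the same node can be brought back later.
          if (dnprop != null) {
            cluster.restartDataNode(dnprop);
          }
        } finally {
          cluster.shutdown();
        }
      }
    }
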
Modified: hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestDatanodeBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestDatanodeBlockScanner.java?rev=713891&r1=713890&r2=713891&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestDatanodeBlockScanner.java (original)
+++ hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestDatanodeBlockScanner.java Thu Nov 13 17:01:21 2008
@@ -364,7 +364,7 @@
     // corruptReplicasMap
     corruptReplicaSize = cluster.getNameNode().namesystem.
                           corruptReplicas.numCorruptReplicas(blk);
-    while (corruptReplicaSize != 0) {
+    while (corruptReplicaSize != 0 || replicaCount != numReplicas) {
       try {
         LOG.info("Looping until corrupt replica is invalidated");
         Thread.sleep(1000);

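The widened loop condition waits on two independent events: the corrupt
replica being invalidated and the replacement replica arriving. Either can
happen last, so polling on only one of them can let the test proceed too
early. A minimal sketch of that wait pattern, in modern Java (the
IntSupplier-based helpers are hypothetical, not the test's actual fields):

    import java.util.function.IntSupplier;

    public class WaitForHealthySketch {
      static void awaitHealthy(IntSupplier corruptCount,
                               IntSupplier replicaCount,
                               int expectedReplicas)
          throws InterruptedException {
        // Poll until BOTH conditions hold: no corrupt replicas remain
        // and the replica count has been restored.
        while (corruptCount.getAsInt() != 0
            || replicaCount.getAsInt() != expectedReplicas) {
          Thread.sleep(1000);
        }
      }
    }
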
Added: hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestNodeCount.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestNodeCount.java?rev=713891&view=auto
==============================================================================
--- hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestNodeCount.java (added)
+++ hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/dfs/TestNodeCount.java Thu Nov 13 17:01:21 2008
@@ -0,0 +1,105 @@
+package org.apache.hadoop.dfs;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.dfs.DFSTestUtil;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.dfs.MiniDFSCluster.DataNodeProperties;
+import org.apache.hadoop.dfs.Block;
+import org.apache.hadoop.dfs.FSNamesystem.NumberReplicas;
+
+import junit.framework.TestCase;
+
+/**
+ * Test that the live replica count for a block is correct,
+ * so the NN makes the right decision for under/over-replicated blocks.
+ */
+public class TestNodeCount extends TestCase {
+  public void testNodeCount() throws Exception {
+    // start a mini dfs cluster of 2 nodes
+    final Configuration conf = new Configuration();
+    final short REPLICATION_FACTOR = (short)2;
+    final MiniDFSCluster cluster = 
+      new MiniDFSCluster(conf, REPLICATION_FACTOR, true, null);
+    try {
+      final FSNamesystem namesystem = cluster.getNameNode().namesystem;
+      final FileSystem fs = cluster.getFileSystem();
+      
+      // populate the cluster with a one block file
+      final Path FILE_PATH = new Path("/testfile");
+      DFSTestUtil.createFile(fs, FILE_PATH, 1L, REPLICATION_FACTOR, 1L);
+      DFSTestUtil.waitReplication(fs, FILE_PATH, REPLICATION_FACTOR);
+      Block block = DFSTestUtil.getFirstBlock(fs, FILE_PATH);
+
+      // keep a copy of all datanode descriptors
+      DatanodeDescriptor[] datanodes = (DatanodeDescriptor[])
+         namesystem.heartbeats.toArray(new DatanodeDescriptor[REPLICATION_FACTOR]);
+      
+      // start two new nodes
+      cluster.startDataNodes(conf, 2, true, null, null);
+      cluster.waitActive();
+      
+      // bring down first datanode
+      DatanodeDescriptor datanode = datanodes[0];
+      DataNodeProperties dnprop = cluster.stopDataNode(datanode.getName());
+      // make sure that NN detects that the datanode is down
+      synchronized (namesystem.heartbeats) {
+        datanode.setLastUpdate(0); // mark it dead
+        namesystem.heartbeatCheck();
+      }
+      // the block will be replicated
+      DFSTestUtil.waitReplication(fs, FILE_PATH, REPLICATION_FACTOR);
+
+      // restart the first datanode
+      cluster.restartDataNode(dnprop);
+      cluster.waitActive();
+      
+      // check if excessive replica is detected
+      NumberReplicas num = null;
+      do {
+        num = namesystem.countNodes(block);
+      } while (num.excessReplicas() == 0);
+      
+      // find out a non-excess node
+      Iterator<DatanodeDescriptor> iter = namesystem.blocksMap.nodeIterator(block);
+      DatanodeDescriptor nonExcessDN = null;
+      while (iter.hasNext()) {
+        DatanodeDescriptor dn = iter.next();
+        Collection<Block> blocks = namesystem.excessReplicateMap.get(dn.getStorageID());
+        if (blocks == null || !blocks.contains(block) ) {
+          nonExcessDN = dn;
+          break;
+        }
+      }
+      assertTrue(nonExcessDN!=null);
+      
+      // bring down non excessive datanode
+      dnprop = cluster.stopDataNode(nonExcessDN.getName());
+      // make sure that NN detects that the datanode is down
+      synchronized (namesystem.heartbeats) {
+        nonExcessDN.setLastUpdate(0); // mark it dead
+        namesystem.heartbeatCheck();
+      }
+      
+      // The block should be replicated
+      do {
+        num = namesystem.countNodes(block);
+      } while (num.liveReplicas() != REPLICATION_FACTOR);
+      
+      // restart the non-excess datanode
+      cluster.restartDataNode(dnprop);
+      cluster.waitActive();
+      
+      // check that both excess replicas are detected
+      do {
+        num = namesystem.countNodes(block);
+      } while (num.excessReplicas() != 2);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+}
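
One detail of the new test worth isolating: stopping a datanode process
does not by itself make the NameNode consider the node dead, so the test
forces detection by zeroing the node's last-heartbeat timestamp and running
the heartbeat monitor by hand, under the heartbeats lock. The same idiom as
a standalone sketch (it assumes the 0.18 package-private internals that the
test itself uses):

    package org.apache.hadoop.dfs;

    // Isolated form of the "mark it dead" idiom used twice in
    // TestNodeCount above; a sketch, not production code.
    class MarkDatanodeDeadSketch {
      static void markDead(FSNamesystem namesystem, DatanodeDescriptor dn) {
        synchronized (namesystem.heartbeats) {
          dn.setLastUpdate(0);         // make the last heartbeat look ancient
          namesystem.heartbeatCheck(); // run the monitor's check immediately
        }
      }
    }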