You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2009/08/31 01:45:55 UTC

svn commit: r809439 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/

Author: hairong
Date: Sun Aug 30 23:45:55 2009
New Revision: 809439

URL: http://svn.apache.org/viewvc?rev=809439&view=rev
Log:
HDFS-15. All replicas end up on 1 rack. Contributed by Jitendra Nath Pandey.

Added:
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java
Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=809439&r1=809438&r2=809439&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Sun Aug 30 23:45:55 2009
@@ -182,6 +182,8 @@
     src/test/mapred-site.xml for mapreduce tests to run.  (Amareshwari
     Sriramadasu via szetszwo)
  
+    HDFS-15. All replicas end up on 1 rack. (Jitendra Nath Pandey via hairong)
+ 
 Release 0.20.1 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java?rev=809439&r1=809438&r2=809439&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java Sun Aug 30 23:45:55 2009
@@ -105,6 +105,9 @@
   // Default number of replicas
   int defaultReplication;
 
+  // variable to enable check for enough racks 
+  boolean shouldCheckForEnoughRacks = true;
+
   /**
    * Last block index used for replication work.
    */
@@ -155,10 +158,13 @@
                             + " must be less than dfs.replication.max = "
                             + maxReplication);
     this.maxReplicationStreams = conf.getInt("dfs.max-repl-streams", 2);
+    this.shouldCheckForEnoughRacks = conf.get("topology.script.file.name") == null ? false
+                                                                             : true;
     FSNamesystem.LOG.info("defaultReplication = " + defaultReplication);
     FSNamesystem.LOG.info("maxReplication = " + maxReplication);
     FSNamesystem.LOG.info("minReplication = " + minReplication);
     FSNamesystem.LOG.info("maxReplicationStreams = " + maxReplicationStreams);
+    FSNamesystem.LOG.info("shouldCheckForEnoughRacks = " + shouldCheckForEnoughRacks);
   }
 
   void activate() {
@@ -613,6 +619,7 @@
     int requiredReplication, numEffectiveReplicas;
     List<DatanodeDescriptor> containingNodes;
     DatanodeDescriptor srcNode;
+    int additionalReplRequired;
 
     synchronized (namesystem) {
       synchronized (neededReplications) {
@@ -624,6 +631,7 @@
           replIndex--;
           return false;
         }
+
         requiredReplication = fileINode.getReplication();
 
         // get a source data-node
@@ -640,21 +648,32 @@
         // do not schedule more if enough replicas is already pending
         numEffectiveReplicas = numReplicas.liveReplicas() +
                                 pendingReplications.getNumReplicas(block);
-        if(numEffectiveReplicas >= requiredReplication) {
-          neededReplications.remove(block, priority); // remove from neededReplications
-          replIndex--;
-          NameNode.stateChangeLog.info("BLOCK* "
-              + "Removing block " + block
-              + " from neededReplications as it has enough replicas.");
-          return false;
+      
+        if (numEffectiveReplicas >= requiredReplication) {
+          if ( (pendingReplications.getNumReplicas(block) > 0) ||
+               (blockHasEnoughRacks(block)) ) {
+            neededReplications.remove(block, priority); // remove from neededReplications
+            replIndex--;
+            NameNode.stateChangeLog.info("BLOCK* "
+                + "Removing block " + block
+                + " from neededReplications as it has enough replicas.");
+            return false;
+          }
         }
+
+        if (numReplicas.liveReplicas() < requiredReplication) {
+          additionalReplRequired = requiredReplication - numEffectiveReplicas;
+        } else {
+          additionalReplRequired = 1; //Needed on a new rack
+        }
+
       }
     }
 
     // choose replication targets: NOT HOLDING THE GLOBAL LOCK
-    DatanodeDescriptor targets[] = replicator.chooseTarget(
-        requiredReplication - numEffectiveReplicas,
-        srcNode, containingNodes, null, block.getNumBytes());
+    DatanodeDescriptor targets[] = 
+                       replicator.chooseTarget(additionalReplRequired,
+                       srcNode, containingNodes, null, block.getNumBytes());
     if(targets.length == 0)
       return false;
 
@@ -675,13 +694,25 @@
         NumberReplicas numReplicas = countNodes(block);
         numEffectiveReplicas = numReplicas.liveReplicas() +
         pendingReplications.getNumReplicas(block);
-        if(numEffectiveReplicas >= requiredReplication) {
-          neededReplications.remove(block, priority); // remove from neededReplications
-          replIndex--;
-          NameNode.stateChangeLog.info("BLOCK* "
-              + "Removing block " + block
-              + " from neededReplications as it has enough replicas.");
-          return false;
+
+        if (numEffectiveReplicas >= requiredReplication) {
+          if ( (pendingReplications.getNumReplicas(block) > 0) ||
+               (blockHasEnoughRacks(block)) ) {
+            neededReplications.remove(block, priority); // remove from neededReplications
+            replIndex--;
+            NameNode.stateChangeLog.info("BLOCK* "
+                + "Removing block " + block
+                + " from neededReplications as it has enough replicas.");
+            return false;
+          }
+        }
+
+        if ( (numReplicas.liveReplicas() >= requiredReplication) &&
+             (!blockHasEnoughRacks(block)) ) {
+          if (srcNode.getNetworkLocation().equals(targets[0].getNetworkLocation())) {
+            //No use continuing, unless a new rack in this case
+            return false;
+          }
         }
 
         // Add block to the to be replicated list
@@ -803,10 +834,13 @@
       synchronized (namesystem) {
         for (int i = 0; i < timedOutItems.length; i++) {
           NumberReplicas num = countNodes(timedOutItems[i]);
-          neededReplications.add(timedOutItems[i],
-                                 num.liveReplicas(),
-                                 num.decommissionedReplicas(),
-                                 getReplication(timedOutItems[i]));
+          if (isNeededReplication(timedOutItems[i], getReplication(timedOutItems[i]),
+                                 num.liveReplicas())) {
+            neededReplications.add(timedOutItems[i],
+                                   num.liveReplicas(),
+                                   num.decommissionedReplicas(),
+                                   getReplication(timedOutItems[i]));
+          }
         }
       }
       /* If we know the target datanodes where the replication timedout,
@@ -1058,9 +1092,11 @@
         NumberReplicas num = countNodes(block);
         int numCurrentReplica = num.liveReplicas();
         // add to under-replicated queue if need to be
-        if (neededReplications.add(block, numCurrentReplica, num
-            .decommissionedReplicas(), expectedReplication)) {
-          nrUnderReplicated++;
+        if (isNeededReplication(block, expectedReplication, numCurrentReplica)) {
+          if (neededReplications.add(block, numCurrentReplica, num
+              .decommissionedReplicas(), expectedReplication)) {
+            nrUnderReplicated++;
+          }
         }
 
         if (numCurrentReplica > expectedReplication) {
@@ -1239,8 +1275,11 @@
         NumberReplicas num = countNodes(block);
         int curReplicas = num.liveReplicas();
         int curExpectedReplicas = getReplication(block);
-        if (curExpectedReplicas > curReplicas) {
-          status = true;
+        if (isNeededReplication(block, curExpectedReplicas, curReplicas)) {
+          if (curExpectedReplicas > curReplicas) {
+            //Set to true only if strictly under-replicated
+            status = true;
+          }
           if (!neededReplications.contains(block) &&
             pendingReplications.getNumReplicas(block) == 0) {
             //
@@ -1293,16 +1332,23 @@
     synchronized (namesystem) {
       NumberReplicas repl = countNodes(block);
       int curExpectedReplicas = getReplication(block);
-      neededReplications.update(block, repl.liveReplicas(), repl
-          .decommissionedReplicas(), curExpectedReplicas, curReplicasDelta,
-          expectedReplicasDelta);
+      if (isNeededReplication(block, curExpectedReplicas, repl.liveReplicas())) {
+        neededReplications.update(block, repl.liveReplicas(), repl
+            .decommissionedReplicas(), curExpectedReplicas, curReplicasDelta,
+            expectedReplicasDelta);
+      } else {
+        int oldReplicas = repl.liveReplicas()-curReplicasDelta;
+        int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
+        neededReplications.remove(block, oldReplicas, repl.decommissionedReplicas(),
+                                  oldExpectedReplicas);
+      }
     }
   }
 
   void checkReplication(Block block, int numExpectedReplicas) {
     // filter out containingNodes that are marked for decommission.
     NumberReplicas number = countNodes(block);
-    if (number.liveReplicas() < numExpectedReplicas) {
+    if (isNeededReplication(block, numExpectedReplicas, number.liveReplicas())) { 
       neededReplications.add(block,
                              number.liveReplicas(),
                              number.decommissionedReplicas,
@@ -1384,7 +1430,68 @@
       return blocksToInvalidate.size();
     }
   }
+  
+  // Counts the distinct racks (network locations) that hold a usable replica
+  // of block b. Replicas on datanodes that are decommissioning or already
+  // decommissioned are not counted, and replicas recorded as corrupt are
+  // ignored as well.
+  int getNumberOfRacks(Block b) {
+    Collection<DatanodeDescriptor> corrupt = corruptReplicas.getNodes(b);
+    HashSet<String> racks = new HashSet<String>(0);
+    Iterator<DatanodeDescriptor> nodes = blocksMap.nodeIterator(b);
+    while (nodes.hasNext()) {
+      DatanodeDescriptor node = nodes.next();
+      if (node.isDecommissionInProgress() || node.isDecommissioned()) {
+        continue; // replica on a node that is being retired
+      }
+      if (corrupt != null && corrupt.contains(node)) {
+        continue; // replica recorded as corrupt
+      }
+      // Adding a rack that is already in the set leaves the set unchanged.
+      racks.add(node.getNetworkLocation());
+    }
+    return racks.size();
+  }
 
+  // Reports whether block b is spread over enough racks. Always true when
+  // the rack check is switched off. Otherwise a single usable replica is
+  // enough for expected replication 1, and usable replicas on two different
+  // network locations are required for anything larger.
+  boolean blockHasEnoughRacks(Block b) {
+    if (!this.shouldCheckForEnoughRacks) {
+      return true;
+    }
+    Collection<DatanodeDescriptor> corrupt = corruptReplicas.getNodes(b);
+    int expected = getReplication(b);
+    String firstRack = null;
+    Iterator<DatanodeDescriptor> nodes = blocksMap.nodeIterator(b);
+    while (nodes.hasNext()) {
+      DatanodeDescriptor node = nodes.next();
+      boolean retiring = node.isDecommissionInProgress() || node.isDecommissioned();
+      if (retiring || (corrupt != null && corrupt.contains(node))) {
+        continue; // not usable: node is being retired or replica is corrupt
+      }
+      if (expected == 1) {
+        return true;
+      }
+      String rack = node.getNetworkLocation();
+      if (firstRack == null) {
+        firstRack = rack;
+      } else if (!firstRack.equals(rack)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  // Tells whether block b still needs replication work: true unless it has at
+  // least expectedReplication current replicas and also spans enough racks.
+  boolean isNeededReplication(Block b, int expectedReplication, int curReplicas) {
+    // The rack check is only evaluated once the replica count is sufficient.
+    boolean satisfied = (curReplicas >= expectedReplication) && blockHasEnoughRacks(b);
+    return !satisfied;
+  }
+  
   long getMissingBlocksCount() {
     // not locking
     return Math.max(missingBlocksInPrevIter, missingBlocksInCurIter);

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java?rev=809439&r1=809438&r2=809439&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java Sun Aug 30 23:45:55 2009
@@ -26,7 +26,7 @@
  * Blocks have only one replicas has the highest
  */
 class UnderReplicatedBlocks implements Iterable<Block> {
-  static final int LEVEL = 3;
+  static final int LEVEL = 4;
   private List<TreeSet<Block>> priorityQueues = new ArrayList<TreeSet<Block>>();
       
   /* constructor */
@@ -53,7 +53,7 @@
     }
     return size;
   }
-        
+
   /* Check if a block is in the neededReplication queue */
   synchronized boolean contains(Block block) {
     for(TreeSet<Block> set:priorityQueues) {
@@ -71,8 +71,10 @@
                           int curReplicas, 
                           int decommissionedReplicas,
                           int expectedReplicas) {
-    if (curReplicas<0 || curReplicas>=expectedReplicas) {
-      return LEVEL; // no need to replicate
+    if (curReplicas<0) {
+      return LEVEL;
+    } else if (curReplicas>=expectedReplicas) {
+      return 3; // Block doesn't have enough racks
     } else if(curReplicas==0) {
       // If there are zero non-decommissioned replica but there are
       // some decommissioned replicas, then assign them highest priority
@@ -99,7 +101,7 @@
                            int curReplicas, 
                            int decomissionedReplicas,
                            int expectedReplicas) {
-    if(curReplicas<0 || expectedReplicas <= curReplicas) {
+    if(curReplicas<0) {
       return false;
     }
     int priLevel = getPriority(block, curReplicas, decomissionedReplicas,

Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java?rev=809439&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java Sun Aug 30 23:45:55 2009
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.log4j.Level;
+
+import junit.framework.TestCase;
+
+public class TestBlocksWithNotEnoughRacks extends TestCase {
+  // Set the NameNode state-change logger to Level.ALL when this class loads.
+  static {
+    ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL) ;
+  }
+
+  // Brings up a MiniDFSCluster whose three rack entries are all /rack1, writes
+  // a file with replication 3, then starts one more datanode on /rack2. Expects
+  // the block on 2 racks, 3 live replicas, and an empty neededReplications.
+  public void testSufficientlyReplicatedBlocksWithNotEnoughRacks() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setLong("dfs.heartbeat.interval", 1L);
+    conf.setInt("dfs.replication.interval", 1);
+    conf.set("topology.script.file.name", "xyz");
+    final short REPLICATION_FACTOR = 3;
+    final String FILE_NAME = "/testFile";
+    final Path FILE_PATH = new Path(FILE_NAME);
+    // Every rack entry handed to the cluster is the same rack
+    String racks[] = {"/rack1","/rack1","/rack1",} ;
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, REPLICATION_FACTOR, true, racks);
+    try {
+      // create the test file with replication factor 3 and wait for that replication
+      final FileSystem fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, FILE_PATH, 1L, REPLICATION_FACTOR, 1L);
+      DFSTestUtil.waitReplication(fs, FILE_PATH, REPLICATION_FACTOR);
+      
+      Block b = DFSTestUtil.getFirstBlock(fs, FILE_PATH);
+      final FSNamesystem namesystem = cluster.getNamesystem();
+      int numRacks = namesystem.blockManager.getNumberOfRacks(b);
+      // NOTE(review): the numRacks value read above is overwritten below before any use
+      //Add a new datanode on a different rack
+      String newRacks[] = {"/rack2"} ;
+      cluster.startDataNodes(conf, 1, true, null, newRacks);
+      // NOTE(review): fixed 5s wait; presumably long enough for re-replication - confirm
+      Thread.sleep(5000);
+            
+      numRacks = namesystem.blockManager.getNumberOfRacks(b);
+      NumberReplicas number = namesystem.blockManager.countNodes(b);
+      int curReplicas = number.liveReplicas();
+
+      System.out.println("curReplicas = " + curReplicas);
+      System.out.println("numRacks = " + numRacks);
+      System.out.println("Size = " + namesystem.blockManager.neededReplications.size());
+
+      assertEquals(2,numRacks);
+      assertTrue(curReplicas == REPLICATION_FACTOR);
+      assertEquals(0,namesystem.blockManager.neededReplications.size());
+    } finally {
+      cluster.shutdown();
+    }
+    
+  }
+  // Same setup, but adds two /rack2 datanodes and raises replication to 5 before checking.
+  public void testUnderReplicatedNotEnoughRacks() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setLong("dfs.heartbeat.interval", 1L);
+    conf.setInt("dfs.replication.interval", 1);
+    conf.setInt("dfs.replication.pending.timeout.sec", 1);
+    conf.set("topology.script.file.name", "xyz");
+    short REPLICATION_FACTOR = 3;
+    final String FILE_NAME = "/testFile";
+    final Path FILE_PATH = new Path(FILE_NAME);
+    // Every rack entry handed to the cluster is the same rack
+    String racks[] = {"/rack1","/rack1","/rack1",} ;
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, REPLICATION_FACTOR, true, racks);
+    try {
+      // create the test file with replication factor 3 and wait for that replication
+      final FileSystem fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, FILE_PATH, 1L, REPLICATION_FACTOR, 1L);
+      DFSTestUtil.waitReplication(fs, FILE_PATH, REPLICATION_FACTOR);
+      
+      Block b = DFSTestUtil.getFirstBlock(fs, FILE_PATH);
+      final FSNamesystem namesystem = cluster.getNamesystem();
+      int numRacks = namesystem.blockManager.getNumberOfRacks(b);
+      // NOTE(review): the numRacks value read above is overwritten below before any use
+      //Add two datanodes on a different rack, then raise the file's replication to 5
+      String newRacks[] = {"/rack2","/rack2"} ;
+      cluster.startDataNodes(conf, 2, true, null, newRacks);
+      REPLICATION_FACTOR = 5;
+      namesystem.setReplication(FILE_NAME, REPLICATION_FACTOR); 
+      Thread.sleep(30000);
+      // NOTE(review): fixed 30s wait above; presumably covers the extra replication - confirm
+
+      numRacks = namesystem.blockManager.getNumberOfRacks(b);
+      NumberReplicas number = namesystem.blockManager.countNodes(b);
+      int curReplicas = number.liveReplicas();
+
+      System.out.println("curReplicas = " + curReplicas);
+      System.out.println("numRacks = " + numRacks);
+      System.out.println("Size = " + namesystem.blockManager.neededReplications.size());
+
+      assertEquals(2,numRacks);
+      assertTrue(curReplicas == REPLICATION_FACTOR);
+      assertEquals(0,namesystem.blockManager.neededReplications.size());
+    } finally {
+      cluster.shutdown();
+    }
+    
+  }
+}