Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2013/06/03 06:35:04 UTC

svn commit: r1488844 - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/ src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/

Author: szetszwo
Date: Mon Jun  3 04:35:04 2013
New Revision: 1488844

URL: http://svn.apache.org/r1488844
Log:
svn merge -c 1353807 from trunk for HDFS-3498. Support replica removal in BlockPlacementPolicy and make BlockPlacementPolicyDefault extensible for reusing code in subclasses.

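The gist of HDFS-3498 is extensibility: the replica-removal bookkeeping moves out of BlockManager into overridable methods on BlockPlacementPolicy, and the choose* helpers and several fields of BlockPlacementPolicyDefault become protected. As a minimal sketch of the kind of subclass this enables (the class name and the inverted preference below are invented for illustration, and the sketch assumes the subclass sits in the org.apache.hadoop.hdfs.server.blockmanagement package so the package-private constructors are visible):

    package org.apache.hadoop.hdfs.server.blockmanagement;

    import java.util.Collection;
    import java.util.Iterator;

    // Hypothetical subclass reusing all of BlockPlacementPolicyDefault and
    // overriding only the over-replication hook this patch introduces.
    // Illustration only; not part of this commit.
    public class SketchPlacementPolicy extends BlockPlacementPolicyDefault {
      @Override
      protected Iterator<DatanodeDescriptor> pickupReplicaSet(
          Collection<DatanodeDescriptor> first,
          Collection<DatanodeDescriptor> second) {
        // Invert the default preference: consider replicas on single-replica
        // racks before racks holding more than one replica. (The default
        // prefers the first set in order to preserve rack diversity.)
        return second.isEmpty() ? first.iterator() : second.iterator();
      }
    }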
Modified:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/   (props changed)
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/   (props changed)
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1353807

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1488844&r1=1488843&r2=1488844&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Jun  3 04:35:04 2013
@@ -1032,6 +1032,10 @@ Release 2.0.3-alpha - 2013-02-06
 
     HDFS-3131. Improve TestStorageRestore. (Brandon Li via atm)
 
+    HDFS-3498. Support replica removal in BlockPlacementPolicy and make
+    BlockPlacementPolicyDefault extensible for reusing code in subclasses.
+    (Junping Du via szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-3429. DataNode reads checksums even if client does not need them (todd)

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1353807

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1488844&r1=1488843&r2=1488844&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Mon Jun  3 04:35:04 2013
@@ -2422,30 +2422,14 @@ assert storedBlock.findDatanode(dn) < 0 
     BlockCollection bc = getBlockCollection(b);
     final Map<String, List<DatanodeDescriptor>> rackMap
         = new HashMap<String, List<DatanodeDescriptor>>();
-    for(final Iterator<DatanodeDescriptor> iter = nonExcess.iterator();
-        iter.hasNext(); ) {
-      final DatanodeDescriptor node = iter.next();
-      final String rackName = node.getNetworkLocation();
-      List<DatanodeDescriptor> datanodeList = rackMap.get(rackName);
-      if (datanodeList == null) {
-        datanodeList = new ArrayList<DatanodeDescriptor>();
-        rackMap.put(rackName, datanodeList);
-      }
-      datanodeList.add(node);
-    }
+    final List<DatanodeDescriptor> moreThanOne = new ArrayList<DatanodeDescriptor>();
+    final List<DatanodeDescriptor> exactlyOne = new ArrayList<DatanodeDescriptor>();
     
     // split nodes into two sets
-    // priSet contains nodes on rack with more than one replica
-    // remains contains the remaining nodes
-    final List<DatanodeDescriptor> priSet = new ArrayList<DatanodeDescriptor>();
-    final List<DatanodeDescriptor> remains = new ArrayList<DatanodeDescriptor>();
-    for(List<DatanodeDescriptor> datanodeList : rackMap.values()) {
-      if (datanodeList.size() == 1 ) {
-        remains.add(datanodeList.get(0));
-      } else {
-        priSet.addAll(datanodeList);
-      }
-    }
+    // moreThanOne contains nodes on rack with more than one replica
+    // exactlyOne contains the remaining nodes
+    replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne,
+        exactlyOne);
     
     // pick one node to delete that favors the delete hint
     // otherwise pick one with least space from priSet if it is not empty
@@ -2455,30 +2439,18 @@ assert storedBlock.findDatanode(dn) < 0 
       // check if we can delete delNodeHint
       final DatanodeInfo cur;
       if (firstOne && delNodeHint !=null && nonExcess.contains(delNodeHint)
-          && (priSet.contains(delNodeHint)
-              || (addedNode != null && !priSet.contains(addedNode))) ) {
+          && (moreThanOne.contains(delNodeHint)
+              || (addedNode != null && !moreThanOne.contains(addedNode))) ) {
         cur = delNodeHint;
       } else { // regular excessive replica removal
         cur = replicator.chooseReplicaToDelete(bc, b, replication,
-            priSet, remains);
+            moreThanOne, exactlyOne);
       }
       firstOne = false;
 
-      // adjust rackmap, priSet, and remains
-      String rack = cur.getNetworkLocation();
-      final List<DatanodeDescriptor> datanodes = rackMap.get(rack);
-      datanodes.remove(cur);
-      if (datanodes.isEmpty()) {
-        rackMap.remove(rack);
-      }
-      if (priSet.remove(cur)) {
-        if (datanodes.size() == 1) {
-          priSet.remove(datanodes.get(0));
-          remains.add(datanodes.get(0));
-        }
-      } else {
-        remains.remove(cur);
-      }
+      // adjust rackMap, moreThanOne, and exactlyOne
+      replicator.adjustSetsWithChosenReplica(rackMap, moreThanOne,
+          exactlyOne, cur);
 
       nonExcess.remove(cur);
       addToExcessReplicate(cur, b);

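The splitNodesWithRack call replacing the deleted inline loop above performs the same partitioning: group replicas by rack, then put nodes whose rack holds more than one replica into one list and the rest into another. A self-contained sketch of that partitioning, with strings standing in for DatanodeDescriptors (all names here are invented for illustration):

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class SplitByRackSketch {
      public static void main(String[] args) {
        // node -> rack, mirroring DatanodeDescriptor.getNetworkLocation().
        Map<String, String> nodeToRack = new LinkedHashMap<String, String>();
        nodeToRack.put("dn1", "/d1/r1");
        nodeToRack.put("dn2", "/d1/r1");
        nodeToRack.put("dn3", "/d1/r2");
        nodeToRack.put("dn4", "/d2/r3");

        // First loop of splitNodesWithRack: group replicas by rack.
        Map<String, List<String>> rackMap = new LinkedHashMap<String, List<String>>();
        for (Map.Entry<String, String> e : nodeToRack.entrySet()) {
          List<String> list = rackMap.get(e.getValue());
          if (list == null) {
            list = new ArrayList<String>();
            rackMap.put(e.getValue(), list);
          }
          list.add(e.getKey());
        }

        // Second loop: racks with one replica feed exactlyOne, the rest moreThanOne.
        List<String> moreThanOne = new ArrayList<String>();
        List<String> exactlyOne = new ArrayList<String>();
        for (List<String> rackNodes : rackMap.values()) {
          if (rackNodes.size() == 1) {
            exactlyOne.add(rackNodes.get(0));
          } else {
            moreThanOne.addAll(rackNodes);
          }
        }
        System.out.println(moreThanOne); // [dn1, dn2]
        System.out.println(exactlyOne);  // [dn3, dn4]
      }
    }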
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java?rev=1488844&r1=1488843&r2=1488844&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java Mon Jun  3 04:35:04 2013
@@ -21,12 +21,14 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
 import org.apache.hadoop.net.NetworkTopology;
@@ -201,4 +203,79 @@ public abstract class BlockPlacementPoli
     return replicator;
   }
 
+  /**
+   * Adjust rackMap, moreThanOne, and exactlyOne after removing the replica on cur.
+   *
+   * @param rackMap a map from rack to its replica nodes
+   * @param moreThanOne the list of replica nodes on racks that have more 
+   *        than one replica
+   * @param exactlyOne the list of replica nodes on racks with only one replica
+   * @param cur the replica to remove
+   */
+  public void adjustSetsWithChosenReplica(final Map<String, 
+      List<DatanodeDescriptor>> rackMap,
+      final List<DatanodeDescriptor> moreThanOne,
+      final List<DatanodeDescriptor> exactlyOne, final DatanodeInfo cur) {
+    
+    String rack = getRack(cur);
+    final List<DatanodeDescriptor> datanodes = rackMap.get(rack);
+    datanodes.remove(cur);
+    if (datanodes.isEmpty()) {
+      rackMap.remove(rack);
+    }
+    if (moreThanOne.remove(cur)) {
+      if (datanodes.size() == 1) {
+        moreThanOne.remove(datanodes.get(0));
+        exactlyOne.add(datanodes.get(0));
+      }
+    } else {
+      exactlyOne.remove(cur);
+    }
+  }
+
+  /**
+   * Get the rack string of a data node.
+   * @param datanode the data node
+   * @return the rack of the data node
+   */
+  protected String getRack(final DatanodeInfo datanode) {
+    return datanode.getNetworkLocation();
+  }
+  
+  /**
+   * Split data nodes into two sets: one set includes nodes on racks with
+   * more than one replica, the other set contains the remaining nodes.
+   * 
+   * @param dataNodes the data nodes to be split
+   * @param rackMap a map from rack to datanodes
+   * @param moreThanOne contains nodes on racks with more than one replica
+   * @param exactlyOne contains the remaining nodes
+   */
+  public void splitNodesWithRack(
+      Collection<DatanodeDescriptor> dataNodes,
+      final Map<String, List<DatanodeDescriptor>> rackMap,
+      final List<DatanodeDescriptor> moreThanOne,
+      final List<DatanodeDescriptor> exactlyOne) {
+    for(DatanodeDescriptor node : dataNodes) {
+      final String rackName = getRack(node);
+      List<DatanodeDescriptor> datanodeList = rackMap.get(rackName);
+      if (datanodeList == null) {
+        datanodeList = new ArrayList<DatanodeDescriptor>();
+        rackMap.put(rackName, datanodeList);
+      }
+      datanodeList.add(node);
+    }
+    
+    // split nodes into two sets
+    for(List<DatanodeDescriptor> datanodeList : rackMap.values()) {
+      if (datanodeList.size() == 1) {
+        // exactlyOne contains nodes on rack with only one replica
+        exactlyOne.add(datanodeList.get(0));
+      } else {
+        // moreThanOne contains nodes on rack with more than one replica
+        moreThanOne.addAll(datanodeList);
+      }
+    }
+  }
+
 }

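The subtle step in adjustSetsWithChosenReplica above is the migration: when deleting a replica leaves its rack with a single survivor, the survivor must move from moreThanOne to exactlyOne so the next deletion round still sees consistent sets. A self-contained sketch of that bookkeeping, again with invented string stand-ins:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class AdjustSetsSketch {
      public static void main(String[] args) {
        // Two replicas on /d1/r1 and one on /d1/r2.
        Map<String, List<String>> rackMap = new LinkedHashMap<String, List<String>>();
        rackMap.put("/d1/r1", new ArrayList<String>(Arrays.asList("dn1", "dn2")));
        rackMap.put("/d1/r2", new ArrayList<String>(Arrays.asList("dn3")));
        List<String> moreThanOne = new ArrayList<String>(Arrays.asList("dn1", "dn2"));
        List<String> exactlyOne = new ArrayList<String>(Arrays.asList("dn3"));

        String cur = "dn2";             // replica chosen for deletion
        String rack = "/d1/r1";         // what getRack(cur) would return
        List<String> datanodes = rackMap.get(rack);
        datanodes.remove(cur);
        if (datanodes.isEmpty()) {
          rackMap.remove(rack);
        }
        if (moreThanOne.remove(cur)) {
          if (datanodes.size() == 1) {
            // The rack dropped to a single replica: migrate the survivor.
            moreThanOne.remove(datanodes.get(0));
            exactlyOne.add(datanodes.get(0));
          }
        } else {
          exactlyOne.remove(cur);
        }
        System.out.println(moreThanOne); // []
        System.out.println(exactlyOne);  // [dn3, dn1]
      }
    }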
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1488844&r1=1488843&r2=1488844&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Mon Jun  3 04:35:04 2013
@@ -57,17 +57,17 @@ public class BlockPlacementPolicyDefault
     "For more information, please enable DEBUG log level on "
     + LOG.getClass().getName();
 
-  private boolean considerLoad; 
+  protected boolean considerLoad; 
   private boolean preferLocalNode = true;
-  private NetworkTopology clusterMap;
+  protected NetworkTopology clusterMap;
   private FSClusterStats stats;
-  private long heartbeatInterval;   // interval for DataNode heartbeats
+  protected long heartbeatInterval;   // interval for DataNode heartbeats
   private long staleInterval;   // interval used to identify stale DataNodes
   
   /**
    * A miss of that many heartbeats is tolerated for replica deletion policy.
    */
-  private int tolerateHeartbeatMultiplier;
+  protected int tolerateHeartbeatMultiplier;
 
   BlockPlacementPolicyDefault(Configuration conf,  FSClusterStats stats,
                            NetworkTopology clusterMap) {
@@ -95,7 +95,7 @@ public class BlockPlacementPolicyDefault
         DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
   }
 
-  private ThreadLocal<StringBuilder> threadLocalBuilder =
+  protected ThreadLocal<StringBuilder> threadLocalBuilder =
     new ThreadLocal<StringBuilder>() {
     @Override
     protected StringBuilder initialValue() {
@@ -319,7 +319,7 @@ public class BlockPlacementPolicyDefault
    * choose a node on the same rack
    * @return the chosen node
    */
-  private DatanodeDescriptor chooseLocalNode(
+  protected DatanodeDescriptor chooseLocalNode(
                                              DatanodeDescriptor localMachine,
                                              HashMap<Node, Node> excludedNodes,
                                              long blocksize,
@@ -354,7 +354,7 @@ public class BlockPlacementPolicyDefault
    * in the cluster.
    * @return the chosen node
    */
-  private DatanodeDescriptor chooseLocalRack(
+  protected DatanodeDescriptor chooseLocalRack(
                                              DatanodeDescriptor localMachine,
                                              HashMap<Node, Node> excludedNodes,
                                              long blocksize,
@@ -406,7 +406,7 @@ public class BlockPlacementPolicyDefault
    * from the local rack
    */
     
-  private void chooseRemoteRack(int numOfReplicas,
+  protected void chooseRemoteRack(int numOfReplicas,
                                 DatanodeDescriptor localMachine,
                                 HashMap<Node, Node> excludedNodes,
                                 long blocksize,
@@ -430,7 +430,7 @@ public class BlockPlacementPolicyDefault
   /* Randomly choose one target from <i>nodes</i>.
    * @return the chosen node
    */
-  private DatanodeDescriptor chooseRandom(
+  protected DatanodeDescriptor chooseRandom(
                                           String nodes,
                                           HashMap<Node, Node> excludedNodes,
                                           long blocksize,
@@ -476,7 +476,7 @@ public class BlockPlacementPolicyDefault
     
   /* Randomly choose <i>numOfReplicas</i> targets from <i>nodes</i>.
    */
-  private void chooseRandom(int numOfReplicas,
+  protected void chooseRandom(int numOfReplicas,
                             String nodes,
                             HashMap<Node, Node> excludedNodes,
                             long blocksize,
@@ -551,7 +551,7 @@ public class BlockPlacementPolicyDefault
    *         does not have too much load, 
    *         and the rack does not have too many nodes.
    */
-  private boolean isGoodTarget(DatanodeDescriptor node,
+  protected boolean isGoodTarget(DatanodeDescriptor node,
                                long blockSize, int maxTargetPerRack,
                                boolean considerLoad,
                                List<DatanodeDescriptor> results,                           
@@ -699,8 +699,7 @@ public class BlockPlacementPolicyDefault
 
     // pick replica from the first Set. If first is empty, then pick replicas
     // from second set.
-    Iterator<DatanodeDescriptor> iter =
-          first.isEmpty() ? second.iterator() : first.iterator();
+    Iterator<DatanodeDescriptor> iter = pickupReplicaSet(first, second);
 
     // Pick the node with the oldest heartbeat or with the least free space,
     // if all heartbeats are within the tolerable heartbeat interval
@@ -719,6 +718,20 @@ public class BlockPlacementPolicyDefault
     }
     return oldestHeartbeatNode != null ? oldestHeartbeatNode : minSpaceNode;
   }
+
+  /**
+   * Pick the replica node set from which a replica will be deleted when
+   * over-replicated. The first set contains replica nodes on racks with
+   * more than one replica, while the second set contains the remaining
+   * replica nodes. Pick the first set if it is not empty; otherwise pick the second.
+   */
+  protected Iterator<DatanodeDescriptor> pickupReplicaSet(
+      Collection<DatanodeDescriptor> first,
+      Collection<DatanodeDescriptor> second) {
+    Iterator<DatanodeDescriptor> iter =
+        first.isEmpty() ? second.iterator() : first.iterator();
+    return iter;
+  }
   
   @VisibleForTesting
   void setPreferLocalNode(boolean prefer) {

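The rule that consumes whichever set pickupReplicaSet returns is unchanged by this patch: prefer a node whose last heartbeat falls outside the tolerated window (tolerateHeartbeatMultiplier * heartbeatInterval in the fields made protected above); if every heartbeat is recent, fall back to the node with the least free space. A self-contained restatement with an invented Node stand-in (the real code works on DatanodeDescriptor):

    import java.util.Arrays;
    import java.util.List;

    public class ChooseReplicaSketch {
      static class Node {
        final String name; final long lastHeartbeatMs; final long remainingBytes;
        Node(String name, long hb, long rem) {
          this.name = name; this.lastHeartbeatMs = hb; this.remainingBytes = rem;
        }
      }

      // Oldest heartbeat outside the tolerated window wins; otherwise least space.
      static Node choose(List<Node> candidates, long nowMs, long windowMs) {
        Node oldestHeartbeat = null;
        Node minSpace = null;
        for (Node n : candidates) {
          if (nowMs - n.lastHeartbeatMs > windowMs
              && (oldestHeartbeat == null
                  || n.lastHeartbeatMs < oldestHeartbeat.lastHeartbeatMs)) {
            oldestHeartbeat = n;
          }
          if (minSpace == null || n.remainingBytes < minSpace.remainingBytes) {
            minSpace = n;
          }
        }
        return oldestHeartbeat != null ? oldestHeartbeat : minSpace;
      }

      public static void main(String[] args) {
        long now = 600000L;
        // Both heartbeats are recent, so the node with less free space is chosen.
        List<Node> nodes = Arrays.asList(
            new Node("dn1", now - 10000L, 4L * 1024 * 1024),
            new Node("dn2", now - 20000L, 1L * 1024 * 1024));
        System.out.println(choose(nodes, now, 60000L).name); // dn2
      }
    }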
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java?rev=1488844&r1=1488843&r2=1488844&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java Mon Jun  3 04:35:04 2013
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
@@ -78,7 +79,7 @@ public class TestReplicationPolicy {
         DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/d1/r2"),
         DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/d1/r2"),
         DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/d2/r3"),
-        DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/d2/r3")        
+        DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/d2/r3")
       };
 
     FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
@@ -952,4 +953,50 @@ public class TestReplicationPolicy {
     exception.expect(IllegalArgumentException.class);
     blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
   }
+  
+  /**
+   * Test that chooseReplicaToDelete picks replicas based on
+   * block locality and free space.
+   */
+  @Test
+  public void testChooseReplicaToDelete() throws Exception {
+    List<DatanodeDescriptor> replicaNodeList = new 
+        ArrayList<DatanodeDescriptor>();
+    final Map<String, List<DatanodeDescriptor>> rackMap
+        = new HashMap<String, List<DatanodeDescriptor>>();
+    
+    dataNodes[0].setRemaining(4*1024*1024);
+    replicaNodeList.add(dataNodes[0]);
+    
+    dataNodes[1].setRemaining(3*1024*1024);
+    replicaNodeList.add(dataNodes[1]);
+    
+    dataNodes[2].setRemaining(2*1024*1024);
+    replicaNodeList.add(dataNodes[2]);
+    
+    dataNodes[5].setRemaining(1*1024*1024);
+    replicaNodeList.add(dataNodes[5]);
+    
+    List<DatanodeDescriptor> first = new ArrayList<DatanodeDescriptor>();
+    List<DatanodeDescriptor> second = new ArrayList<DatanodeDescriptor>();
+    replicator.splitNodesWithRack(
+        replicaNodeList, rackMap, first, second);
+    // dataNodes[0] and dataNodes[1] are in the first set as their rack has
+    // two replica nodes, while dataNodes[2] and dataNodes[5] are in the second set.
+    assertEquals(2, first.size());
+    assertEquals(2, second.size());
+    DatanodeDescriptor chosenNode = replicator.chooseReplicaToDelete(
+        null, null, (short)3, first, second);
+    // Within the first set, dataNodes[1] has the least free space.
+    assertEquals(chosenNode, dataNodes[1]);
+
+    replicator.adjustSetsWithChosenReplica(
+        rackMap, first, second, chosenNode);
+    assertEquals(0, first.size());
+    assertEquals(3, second.size());
+    // Within the second set, dataNodes[5] has the least free space.
+    chosenNode = replicator.chooseReplicaToDelete(
+        null, null, (short)2, first, second);
+    assertEquals(chosenNode, dataNodes[5]);
+  }
 }