You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2012/11/17 04:25:21 UTC

svn commit: r1410690 - in /hadoop/common/branches/branch-1: ./ src/core/org/apache/hadoop/net/ src/hdfs/org/apache/hadoop/hdfs/server/namenode/ src/test/org/apache/hadoop/hdfs/ src/test/org/apache/hadoop/hdfs/server/namenode/

Author: szetszwo
Date: Sat Nov 17 03:25:18 2012
New Revision: 1410690

URL: http://svn.apache.org/viewvc?rev=1410690&view=rev
Log:
HDFS-3941. Backport HDFS-3498 and HDFS-3601: Support replica removal in BlockPlacementPolicy and make BlockPlacementPolicyDefault extensible for reusing code in subclasses, and add BlockPlacementPolicyWithNodeGroup to support block placement with 4-layer network topology.  Contributed by Junping Du and Jing Zhao

Added:
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyWithNodeGroup.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicyWithNodeGroup.java
Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/DFSTestUtil.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1410690&r1=1410689&r2=1410690&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Sat Nov 17 03:25:18 2012
@@ -36,6 +36,12 @@ Release 1.2.0 - unreleased
     class pluggable and add NetworkTopologyWithNodeGroup, a 4-layer
     implementation of NetworkTopology.  (Junping Du and Jing Zhao via szetszwo)
 
+    HDFS-3941. Backport HDFS-3498 and HDFS-3601: Support replica removal in
+    BlockPlacementPolicy and make BlockPlacementPolicyDefault extensible for
+    reusing code in subclasses, and add BlockPlacementPolicyWithNodeGroup to
+    support block placement with 4-layer network topology.  (Junping Du and
+    Jing Zhao via szetszwo)
+
   IMPROVEMENTS
 
     HDFS-3515. Port HDFS-1457 to branch-1. (eli)

Modified: hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java?rev=1410690&r1=1410689&r2=1410690&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java (original)
+++ hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java Sat Nov 17 03:25:18 2012
@@ -732,6 +732,31 @@ public class NetworkTopology {
     }
     return tree.toString();
   }
+  
+  /**
+   * Divide networklocation string into two parts by last separator, and get the
+   * first part here.
+   * 
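+   * @param networkLocation location string; must contain at least one separator
+   * @return the part of networkLocation before its last separator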
+   */
+  public static String getFirstHalf(String networkLocation) {
+    int index = networkLocation.lastIndexOf(NodeBase.PATH_SEPARATOR_STR);
+    return networkLocation.substring(0, index);
+  }
+
+  /**
+   * Divide networklocation string into two parts by last separator, and get the
+   * second part here.
+   * 
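+   * @param networkLocation location string; must contain at least one separator
+   * @return the part of networkLocation from its last separator (inclusive) on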
+   */
+  public static String getLastHalf(String networkLocation) {
+    int index = networkLocation.lastIndexOf(NodeBase.PATH_SEPARATOR_STR);
+    return networkLocation.substring(index);
+  }
+
 
   /* swap two array items */
   static protected void swap(Node[] nodes, int i, int j) {

Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java?rev=1410690&r1=1410689&r2=1410690&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java Sat Nov 17 03:25:18 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.na
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
@@ -215,5 +216,80 @@ public abstract class BlockPlacementPoli
                         excludedNodes,
                         blocksize);
   }
+  
+  /**
+   * Adjust rackmap, moreThanOne, and exactlyOne after removing replica on cur.
+   *
+   * @param rackMap a map from rack to replica
+   * @param moreThanOne The List of replica nodes on rack which has more than 
+   *        one replica
+   * @param exactlyOne The List of replica nodes on rack with only one replica
+   * @param cur current replica to remove
+   */
+  public void adjustSetsWithChosenReplica(final Map<String, 
+      List<DatanodeDescriptor>> rackMap,
+      final List<DatanodeDescriptor> moreThanOne,
+      final List<DatanodeDescriptor> exactlyOne, final DatanodeInfo cur) {
+    
+    String rack = getRack(cur);
+    final List<DatanodeDescriptor> datanodes = rackMap.get(rack);
+    datanodes.remove(cur);
+    if (datanodes.isEmpty()) {
+      rackMap.remove(rack);
+    }
+    if (moreThanOne.remove(cur)) {
+      if (datanodes.size() == 1) {
+        moreThanOne.remove(datanodes.get(0));
+        exactlyOne.add(datanodes.get(0));
+      }
+    } else {
+      exactlyOne.remove(cur);
+    }
+  }
+
+  /**
+   * Get rack string from a data node
+   * @param datanode
+   * @return rack of data node
+   */
+  protected String getRack(final DatanodeInfo datanode) {
+    return datanode.getNetworkLocation();
+  }
+  
+  /**
+   * Split data nodes into two sets, one set includes nodes on rack with
+   * more than one  replica, the other set contains the remaining nodes.
+   * 
+   * @param dataNodes the replica nodes to group by rack
+   * @param rackMap a map from rack to datanodes
+   * @param moreThanOne contains nodes on rack with more than one replica
+   * @param exactlyOne contains the remaining nodes
+   */
+  public void splitNodesWithRack(
+      Collection<DatanodeDescriptor> dataNodes,
+      final Map<String, List<DatanodeDescriptor>> rackMap,
+      final List<DatanodeDescriptor> moreThanOne,
+      final List<DatanodeDescriptor> exactlyOne) {
+    for(DatanodeDescriptor node : dataNodes) {
+      final String rackName = getRack(node);
+      List<DatanodeDescriptor> datanodeList = rackMap.get(rackName);
+      if (datanodeList == null) {
+        datanodeList = new ArrayList<DatanodeDescriptor>();
+        rackMap.put(rackName, datanodeList);
+      }
+      datanodeList.add(node);
+    }
+    
+    // split nodes into two sets
+    for(List<DatanodeDescriptor> datanodeList : rackMap.values()) {
+      if (datanodeList.size() == 1) {
+        // exactlyOne contains nodes on rack with only one replica
+        exactlyOne.add(datanodeList.get(0));
+      } else {
+        // moreThanOne contains nodes on rack with more than one replica
+        moreThanOne.addAll(datanodeList);
+      }
+    }
+  }
 
 }
\ No newline at end of file

Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java?rev=1410690&r1=1410689&r2=1410690&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java Sat Nov 17 03:25:18 2012
@@ -47,8 +47,8 @@ import org.apache.hadoop.net.NodeBase;
  */
 @InterfaceAudience.Private
 public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
-  private boolean considerLoad; 
-  private NetworkTopology clusterMap;
+  protected boolean considerLoad; 
+  protected NetworkTopology clusterMap;
   private FSClusterStats stats;
   private long staleInterval;   // interval used to identify stale DataNodes
 
@@ -132,6 +132,7 @@ public class BlockPlacementPolicyDefault
       new ArrayList<DatanodeDescriptor>(chosenNodes);
     for (Node node:chosenNodes) {
       excludedNodes.put(node, node);
+      adjustExcludedNodes(excludedNodes, node);
     }
       
     if (!clusterMap.contains(writer)) {
@@ -225,7 +226,7 @@ public class BlockPlacementPolicyDefault
    * choose a node on the same rack
    * @return the chosen node
    */
-  private DatanodeDescriptor chooseLocalNode(DatanodeDescriptor localMachine,
+  protected DatanodeDescriptor chooseLocalNode(DatanodeDescriptor localMachine,
       HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
       List<DatanodeDescriptor> results, boolean avoidStaleNodes)
     throws NotEnoughReplicasException {
@@ -256,7 +257,7 @@ public class BlockPlacementPolicyDefault
    * in the cluster.
    * @return the chosen node
    */
-  private DatanodeDescriptor chooseLocalRack(DatanodeDescriptor localMachine,
+  protected DatanodeDescriptor chooseLocalRack(DatanodeDescriptor localMachine,
       HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
       List<DatanodeDescriptor> results, boolean avoidStaleNodes)
     throws NotEnoughReplicasException {
@@ -303,8 +304,7 @@ public class BlockPlacementPolicyDefault
    * if not enough nodes are available, choose the remaining ones 
    * from the local rack
    */
-    
-  private void chooseRemoteRack(int numOfReplicas,
+  protected void chooseRemoteRack(int numOfReplicas,
                                 DatanodeDescriptor localMachine,
                                 HashMap<Node, Node> excludedNodes,
                                 long blocksize,
@@ -328,7 +328,7 @@ public class BlockPlacementPolicyDefault
   /* Randomly choose one target from <i>nodes</i>.
    * @return the chosen node
    */
-  private DatanodeDescriptor chooseRandom(String nodes,
+  protected DatanodeDescriptor chooseRandom(String nodes,
                                           HashMap<Node, Node> excludedNodes,
                                           long blocksize,
                                           int maxNodesPerRack,
@@ -342,11 +342,12 @@ public class BlockPlacementPolicyDefault
         (DatanodeDescriptor)(clusterMap.chooseRandom(nodes));
 
       Node oldNode = excludedNodes.put(chosenNode, chosenNode);
-      if (oldNode == null) { // choosendNode was not in the excluded list
+      if (oldNode == null) { // chosenNode was not in the excluded list
         numOfAvailableNodes--;
         if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results,
             avoidStaleNodes)) {
           results.add(chosenNode);
+          adjustExcludedNodes(excludedNodes, chosenNode);
           return chosenNode;
         }
       }
@@ -358,7 +359,7 @@ public class BlockPlacementPolicyDefault
     
   /* Randomly choose <i>numOfReplicas</i> targets from <i>nodes</i>.
    */
-  private void chooseRandom(int numOfReplicas,
+  protected void chooseRandom(int numOfReplicas,
                             String nodes,
                             HashMap<Node, Node> excludedNodes,
                             long blocksize,
@@ -380,6 +381,7 @@ public class BlockPlacementPolicyDefault
             avoidStaleNodes)) {
           numOfReplicas--;
           results.add(chosenNode);
+          adjustExcludedNodes(excludedNodes, chosenNode);
         }
       }
     }
@@ -389,7 +391,22 @@ public class BlockPlacementPolicyDefault
                                            "Not able to place enough replicas");
     }
   }
-    
+   
+  /**
+   * After choosing a node to place replica, adjust excluded nodes accordingly.
+   * It does nothing here as chosenNode is already put into excludedNodes,
+   * but it can be overridden in a subclass to put more related nodes into
+   * excludedNodes. Nothing is returned; a subclass that adds nodes does so
+   * by updating the excludedNodes map in place.
+   * 
+   * @param excludedNodes map of nodes excluded from selection
+   * @param chosenNode the node just chosen to hold a replica
+   */
+  protected void adjustExcludedNodes(HashMap<Node, Node> excludedNodes,
+      Node chosenNode) {
+    // do nothing
+  }
+  
   /* judge if a node is a good target.
    * return true if <i>node</i> has enough space, 
    * does not have too much load, and the rack does not have too many nodes
@@ -417,7 +434,7 @@ public class BlockPlacementPolicyDefault
    *         does not have too much load, 
    *         and the rack does not have too many nodes.
    */
-  private boolean isGoodTarget(DatanodeDescriptor node,
+  protected boolean isGoodTarget(DatanodeDescriptor node,
                                long blockSize, int maxTargetPerLoc,
                                boolean considerLoad,
                                List<DatanodeDescriptor> results,
@@ -548,8 +565,7 @@ public class BlockPlacementPolicyDefault
 
     // pick replica from the first Set. If first is empty, then pick replicas
     // from second set.
-    Iterator<DatanodeDescriptor> iter =
-          first.isEmpty() ? second.iterator() : first.iterator();
+    Iterator<DatanodeDescriptor> iter = pickupReplicaSet(first, second);
 
     // pick node with least free space
     while (iter.hasNext() ) {
@@ -562,5 +578,19 @@ public class BlockPlacementPolicyDefault
     }
     return cur;
   }
+  
+  /**
+   * Pick up replica node set for deleting replica as over-replicated. First set
+   * contains replica nodes on rack with more than one replica while second set
+   * contains remaining replica nodes. So pick up first set if not empty. If
+   * first is empty, then pick second.
+   */
+  protected Iterator<DatanodeDescriptor> pickupReplicaSet(
+      Collection<DatanodeDescriptor> first,
+      Collection<DatanodeDescriptor> second) {
+    Iterator<DatanodeDescriptor> iter = first.isEmpty() ? second.iterator()
+        : first.iterator();
+    return iter;
+  }
 }
 

Added: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyWithNodeGroup.java?rev=1410690&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyWithNodeGroup.java (added)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyWithNodeGroup.java Sat Nov 17 03:25:18 2012
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.NetworkTopologyWithNodeGroup;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
+
+/** The class is responsible for choosing the desired number of targets
+ * for placing block replicas on environment with node-group layer.
+ * The replica placement strategy is adjusted to:
+ * If the writer is on a datanode, the 1st replica is placed on the local 
+ *     node (or local node-group), otherwise a random datanode. 
+ * The 2nd replica is placed on a datanode that is on a different rack with 1st
+ *     replica node. 
+ * The 3rd replica is placed on a datanode which is on a different node-group
+ *     but the same rack as the second replica node.
+ */
+public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefault {
+
+  BlockPlacementPolicyWithNodeGroup(Configuration conf,  FSClusterStats stats,
+      NetworkTopology clusterMap) {
+    initialize(conf, stats, clusterMap);
+  }
+
+  BlockPlacementPolicyWithNodeGroup() {
+  }
+
+  @Override
+  public void initialize(Configuration conf,  FSClusterStats stats,
+          NetworkTopology clusterMap) {
+    super.initialize(conf, stats, clusterMap);
+  }
+
+  /** choose local node of localMachine as the target.
+   * if localMachine is not available, choose a node on the same nodegroup or 
+   * rack instead.
+   * @return the chosen node
+   */
+  @Override
+  protected DatanodeDescriptor chooseLocalNode(DatanodeDescriptor localMachine,
+      HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
+      List<DatanodeDescriptor> results, boolean avoidStaleNodes)
+        throws NotEnoughReplicasException {
+    // if no local machine, randomly choose one node
+    if (localMachine == null)
+      return chooseRandom(NodeBase.ROOT, excludedNodes, 
+          blocksize, maxNodesPerRack, results, avoidStaleNodes);
+
+    // otherwise try local machine first
+    Node oldNode = excludedNodes.put(localMachine, localMachine);
+    if (oldNode == null) { // was not in the excluded list
+      if (isGoodTarget(localMachine, blocksize,
+          maxNodesPerRack, false, results, avoidStaleNodes)) {
+        results.add(localMachine);
+        // Nodes under same nodegroup should be excluded.
+        addNodeGroupToExcludedNodes(excludedNodes,
+            localMachine.getNetworkLocation());
+        return localMachine;
+      }
+    } 
+
+    // try a node on local node group
+    DatanodeDescriptor chosenNode = chooseLocalNodeGroup(
+        (NetworkTopologyWithNodeGroup)clusterMap, localMachine, excludedNodes, 
+        blocksize, maxNodesPerRack, results, avoidStaleNodes);
+    if (chosenNode != null) {
+      return chosenNode;
+    }
+    // try a node on local rack
+    return chooseLocalRack(localMachine, excludedNodes, 
+        blocksize, maxNodesPerRack, results, avoidStaleNodes);
+  }
+
+  @Override
+  protected void adjustExcludedNodes(HashMap<Node, Node> excludedNodes,
+      Node chosenNode) {
+    // as a node-group aware implementation, make sure no two replicas are
+    // placed on the same node group.
+    addNodeGroupToExcludedNodes(excludedNodes, chosenNode.getNetworkLocation());
+  }
+
+  // add all nodes under specific nodegroup to excludedNodes.
+  private void addNodeGroupToExcludedNodes(HashMap<Node, Node> excludedNodes,
+      String nodeGroup) {
+    List<Node> leafNodes = clusterMap.getLeaves(nodeGroup);
+    for (Node node : leafNodes) {
+      excludedNodes.put(node, node);
+    }
+  }
+
+  @Override
+  protected DatanodeDescriptor chooseLocalRack(DatanodeDescriptor localMachine,
+      HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
+      List<DatanodeDescriptor> results, boolean avoidStaleNodes)
+      throws NotEnoughReplicasException {
+    // no local machine, so choose a random machine
+    if (localMachine == null) {
+      return chooseRandom(NodeBase.ROOT, excludedNodes, 
+                          blocksize, maxNodesPerRack, results, 
+                          avoidStaleNodes);
+    }
+
+    // choose one from the local rack, but off-nodegroup
+    try {
+      return chooseRandom(NetworkTopology.getFirstHalf(
+                              localMachine.getNetworkLocation()),
+                          excludedNodes, blocksize, 
+                          maxNodesPerRack, results, 
+                          avoidStaleNodes);
+    } catch (NotEnoughReplicasException e1) {
+      // find the second replica
+      DatanodeDescriptor newLocal=null;
+      for(Iterator<DatanodeDescriptor> iter=results.iterator();
+          iter.hasNext();) {
+        DatanodeDescriptor nextNode = iter.next();
+        if (nextNode != localMachine) {
+          newLocal = nextNode;
+          break;
+        }
+      }
+      if (newLocal != null) {
+        try {
+          return chooseRandom(
+              clusterMap.getRack(newLocal.getNetworkLocation()), excludedNodes,
+              blocksize, maxNodesPerRack, results, avoidStaleNodes);
+        } catch(NotEnoughReplicasException e2) {
+          //otherwise randomly choose one from the network
+          return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+              maxNodesPerRack, results, avoidStaleNodes);
+        }
+      } else {
+        //otherwise randomly choose one from the network
+        return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+            maxNodesPerRack, results, avoidStaleNodes);
+      }
+    }
+  }
+
+  @Override
+  protected void chooseRemoteRack(int numOfReplicas,
+      DatanodeDescriptor localMachine, HashMap<Node, Node> excludedNodes,
+      long blocksize, int maxReplicasPerRack, List<DatanodeDescriptor> results,
+      boolean avoidStaleNodes) throws NotEnoughReplicasException {
+    int oldNumOfReplicas = results.size();
+    // randomly choose one node from remote racks
+    try {
+      chooseRandom(
+          numOfReplicas,
+          "~" + NetworkTopology.getFirstHalf(localMachine.getNetworkLocation()),
+          excludedNodes, blocksize, maxReplicasPerRack, results,
+          avoidStaleNodes);
+    } catch (NotEnoughReplicasException e) {
+      chooseRandom(numOfReplicas - (results.size() - oldNumOfReplicas),
+          localMachine.getNetworkLocation(), excludedNodes, blocksize,
+          maxReplicasPerRack, results, avoidStaleNodes);
+    }
+  }
+
+  /* choose one node from the nodegroup that <i>localMachine</i> is on.
+   * if no such node is available, choose one node from the nodegroup where
+   * a second replica is on.
+   * if still no such node is available, choose a random node in the cluster.
+   * @return the chosen node
+   */
+  private DatanodeDescriptor chooseLocalNodeGroup(
+      NetworkTopologyWithNodeGroup clusterMap, DatanodeDescriptor localMachine,
+      HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
+      List<DatanodeDescriptor> results, boolean avoidStaleNodes)
+      throws NotEnoughReplicasException {
+    // no local machine, so choose a random machine
+    if (localMachine == null) {
+      return chooseRandom(NodeBase.ROOT, excludedNodes, 
+      blocksize, maxNodesPerRack, results, avoidStaleNodes);
+    }
+
+    // choose one from the local node group
+    try {
+      return chooseRandom(
+          clusterMap.getNodeGroup(localMachine.getNetworkLocation()),
+          excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
+    } catch (NotEnoughReplicasException e1) {
+      // find the second replica
+      DatanodeDescriptor newLocal=null;
+      for(Iterator<DatanodeDescriptor> iter=results.iterator();
+        iter.hasNext();) {
+        DatanodeDescriptor nextNode = iter.next();
+        if (nextNode != localMachine) {
+          newLocal = nextNode;
+          break;
+        }
+      }
+      if (newLocal != null) {
+        try {
+          return chooseRandom(
+              clusterMap.getNodeGroup(newLocal.getNetworkLocation()),
+              excludedNodes, blocksize, maxNodesPerRack, results,
+              avoidStaleNodes);
+        } catch(NotEnoughReplicasException e2) {
+          //otherwise randomly choose one from the network
+          return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+              maxNodesPerRack, results, avoidStaleNodes);
+        }
+      } else {
+        //otherwise randomly choose one from the network
+        return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+            maxNodesPerRack, results, avoidStaleNodes);
+      }
+    }
+  }
+
+  @Override
+  protected String getRack(final DatanodeInfo cur) {
+    String nodeGroupString = cur.getNetworkLocation();
+    return NetworkTopology.getFirstHalf(nodeGroupString);
+  }
+
+  /**
+   * Pick up replica node set for deleting replica as over-replicated. 
+   * First set contains replica nodes on rack with more than one
+   * replica while second set contains remaining replica nodes.
+   * If first is not empty, divide first set into two subsets:
+   *   moreThanOne contains nodes on nodegroup with more than one replica
+   *   exactlyOne contains the remaining nodes in first set
+   * then pick up moreThanOne if not empty, otherwise exactlyOne.
+   * If first is empty, then pick second.
+   */
+  @Override
+  public Iterator<DatanodeDescriptor> pickupReplicaSet(
+      Collection<DatanodeDescriptor> first,
+      Collection<DatanodeDescriptor> second) {
+    // If no replica within same rack, return directly.
+    if (first.isEmpty()) {
+      return second.iterator();
+    }
+    // Split data nodes in the first set into two sets, 
+    // moreThanOne contains nodes on nodegroup with more than one replica
+    // exactlyOne contains the remaining nodes
+    Map<String, List<DatanodeDescriptor>> nodeGroupMap = 
+        new HashMap<String, List<DatanodeDescriptor>>();
+    
+    for(DatanodeDescriptor node : first) {
+      final String nodeGroupName = 
+          NetworkTopology.getLastHalf(node.getNetworkLocation());
+      List<DatanodeDescriptor> datanodeList = 
+          nodeGroupMap.get(nodeGroupName);
+      if (datanodeList == null) {
+        datanodeList = new ArrayList<DatanodeDescriptor>();
+        nodeGroupMap.put(nodeGroupName, datanodeList);
+      }
+      datanodeList.add(node);
+    }
+    
+    final List<DatanodeDescriptor> moreThanOne = new ArrayList<DatanodeDescriptor>();
+    final List<DatanodeDescriptor> exactlyOne = new ArrayList<DatanodeDescriptor>();
+    // split nodes into two sets
+    for(List<DatanodeDescriptor> datanodeList : nodeGroupMap.values()) {
+      if (datanodeList.size() == 1 ) {
+        // exactlyOne contains nodes on nodegroup with exactly one replica
+        exactlyOne.add(datanodeList.get(0));
+      } else {
+        // moreThanOne contains nodes on nodegroup with more than one replica
+        moreThanOne.addAll(datanodeList);
+      }
+    }
+    
+    Iterator<DatanodeDescriptor> iter =
+        moreThanOne.isEmpty() ? exactlyOne.iterator() : moreThanOne.iterator();
+    return iter;
+  } 
+}
\ No newline at end of file

Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1410690&r1=1410689&r2=1410690&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Sat Nov 17 03:25:18 2012
@@ -45,7 +45,6 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.Random;
 import java.util.Set;
@@ -4142,35 +4141,15 @@ public class FSNamesystem implements FSC
                               DatanodeDescriptor delNodeHint) {
     INodeFile inode = blocksMap.getINode(b);
     // first form a rack to datanodes map and
-    HashMap<String, ArrayList<DatanodeDescriptor>> rackMap =
-      new HashMap<String, ArrayList<DatanodeDescriptor>>();
-    for (Iterator<DatanodeDescriptor> iter = nonExcess.iterator();
-         iter.hasNext();) {
-      DatanodeDescriptor node = iter.next();
-      String rackName = node.getNetworkLocation();
-      ArrayList<DatanodeDescriptor> datanodeList = rackMap.get(rackName);
-      if(datanodeList==null) {
-        datanodeList = new ArrayList<DatanodeDescriptor>();
-      }
-      datanodeList.add(node);
-      rackMap.put(rackName, datanodeList);
-    }
-    
+    final Map<String, List<DatanodeDescriptor>> rackMap = 
+        new HashMap<String, List<DatanodeDescriptor>>();
+    final List<DatanodeDescriptor> moreThanOne = new ArrayList<DatanodeDescriptor>();
+    final List<DatanodeDescriptor> exactlyOne = new ArrayList<DatanodeDescriptor>();
+
     // split nodes into two sets
-    // priSet contains nodes on rack with more than one replica
-    // remains contains the remaining nodes
-    ArrayList<DatanodeDescriptor> priSet = new ArrayList<DatanodeDescriptor>();
-    ArrayList<DatanodeDescriptor> remains = new ArrayList<DatanodeDescriptor>();
-    for( Iterator<Entry<String, ArrayList<DatanodeDescriptor>>> iter = 
-      rackMap.entrySet().iterator(); iter.hasNext(); ) {
-      Entry<String, ArrayList<DatanodeDescriptor>> rackEntry = iter.next();
-      ArrayList<DatanodeDescriptor> datanodeList = rackEntry.getValue(); 
-      if( datanodeList.size() == 1 ) {
-        remains.add(datanodeList.get(0));
-      } else {
-        priSet.addAll(datanodeList);
-      }
-    }
+    // moreThanOne contains nodes on rack with more than one replica
+    // exactlyOne contains the remaining nodes
+    replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne, exactlyOne);
     
     // pick one node to delete that favors the delete hint
     // otherwise pick one with least space from priSet if it is not empty
@@ -4180,29 +4159,19 @@ public class FSNamesystem implements FSC
       DatanodeInfo cur = null;
 
       // check if we can del delNodeHint
-      if (firstOne && delNodeHint !=null && nonExcess.contains(delNodeHint) &&
-            (priSet.contains(delNodeHint) || (addedNode != null && !priSet.contains(addedNode))) ) {
+      if (firstOne && delNodeHint !=null && nonExcess.contains(delNodeHint)
+          && (moreThanOne.contains(delNodeHint)
+                || (addedNode != null && !moreThanOne.contains(addedNode))) ) {
           cur = delNodeHint;
       } else { // regular excessive replica removal
-        cur = replicator.chooseReplicaToDelete(inode, b, replication, priSet, remains);          
+        cur = replicator.chooseReplicaToDelete(inode, b, replication,
+                       moreThanOne, exactlyOne);
       }
 
       firstOne = false;
-      // adjust rackmap, priSet, and remains
-      String rack = cur.getNetworkLocation();
-      ArrayList<DatanodeDescriptor> datanodes = rackMap.get(rack);
-      datanodes.remove(cur);
-      if(datanodes.isEmpty()) {
-        rackMap.remove(rack);
-      }
-      if( priSet.remove(cur) ) {
-        if (datanodes.size() == 1) {
-          priSet.remove(datanodes.get(0));
-          remains.add(datanodes.get(0));
-        }
-      } else {
-        remains.remove(cur);
-      }
+      // adjust rackmap, moreThanOne, and exactlyOne
+      replicator.adjustSetsWithChosenReplica(rackMap, moreThanOne,
+          exactlyOne, cur);
 
       nonExcess.remove(cur);
 

Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1410690&r1=1410689&r2=1410690&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/DFSTestUtil.java Sat Nov 17 03:25:18 2012
@@ -31,21 +31,24 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.namenode.DatanodeDescriptor;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem.Statistics;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 
 /** Utilities for HDFS tests */
 public class DFSTestUtil {
@@ -372,4 +375,9 @@ public class DFSTestUtil {
 
     return result;
   }  
+  
+  public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
+      String rackLocation) {
+    return new DatanodeDescriptor(new DatanodeID(ipAddr), rackLocation);
+  }
 }

Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java?rev=1410690&r1=1410689&r2=1410690&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java Sat Nov 17 03:25:18 2012
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import junit.framework.TestCase;
 
@@ -682,5 +683,46 @@ public class TestReplicationPolicy exten
     blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
     assertEquals(blocksReplWorkMultiplier, 3);
   }
-  
+    
+  /**
+   * Test that chooseReplicaToDelete picks replicas based on block locality
+   * and free space.
+   */
+  public void testChooseReplicaToDelete() throws Exception {
+    List<DatanodeDescriptor> replicaNodeList = new ArrayList<DatanodeDescriptor>();
+    final Map<String, List<DatanodeDescriptor>> rackMap = new HashMap<String, List<DatanodeDescriptor>>();
+
+    dataNodes[0].setRemaining(4 * 1024 * 1024);
+    replicaNodeList.add(dataNodes[0]);
+
+    dataNodes[1].setRemaining(3 * 1024 * 1024);
+    replicaNodeList.add(dataNodes[1]);
+
+    dataNodes[2].setRemaining(2 * 1024 * 1024);
+    replicaNodeList.add(dataNodes[2]);
+
+    dataNodes[5].setRemaining(1 * 1024 * 1024);
+    replicaNodeList.add(dataNodes[5]);
+
+    List<DatanodeDescriptor> first = new ArrayList<DatanodeDescriptor>();
+    List<DatanodeDescriptor> second = new ArrayList<DatanodeDescriptor>();
+    replicator.splitNodesWithRack(replicaNodeList, rackMap, first, second);
+    // dataNodes[0] and dataNodes[1] are in first set as their rack has two
+    // replica nodes, while dataNodes[2] and dataNodes[5] are in second set.
+    assertEquals(2, first.size());
+    assertEquals(2, second.size());
+    DatanodeDescriptor chosenNode = replicator.chooseReplicaToDelete(null,
+        null, (short) 3, first, second);
+    // Within first set, dataNodes[1] with less free space
+    assertEquals(chosenNode, dataNodes[1]);
+
+    replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosenNode);
+    assertEquals(0, first.size());
+    assertEquals(3, second.size());
+    // Within second set, dataNodes[5] with less free space
+    chosenNode = replicator.chooseReplicaToDelete(null, null, (short) 2, first,
+        second);
+    assertEquals(chosenNode, dataNodes[5]);
+  }
+
 }

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicyWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicyWithNodeGroup.java?rev=1410690&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicyWithNodeGroup.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicyWithNodeGroup.java Sat Nov 17 03:25:18 2012
@@ -0,0 +1,485 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
+import org.junit.Test;
+
+public class TestReplicationPolicyWithNodeGroup {
+  private static final int BLOCK_SIZE = 1024;
+  private static final int NUM_OF_DATANODES = 8;
+  private static final Configuration CONF = new Configuration();
+  private static final NetworkTopology cluster;
+  private static final NameNode namenode;
+  private static final BlockPlacementPolicy replicator;
+  private static final String filename = "/dummyfile.txt";
+
+  private final static DatanodeDescriptor dataNodes[] = new DatanodeDescriptor[] {
+      DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/d1/r1/n1"),
+      DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/d1/r1/n1"),
+      DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/d1/r1/n2"),
+      DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/d1/r2/n3"),
+      DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/d1/r2/n3"),
+      DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/d1/r2/n4"),
+      DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/d2/r3/n5"),
+      DFSTestUtil.getDatanodeDescriptor("8.8.8.8", "/d2/r3/n6")
+  };
+
+  private final static DatanodeDescriptor NODE = 
+      new DatanodeDescriptor(DFSTestUtil.getDatanodeDescriptor("9.9.9.9", "/d2/r4/n7"));
+
+  static {
+    try {
+      FileSystem.setDefaultUri(CONF, "hdfs://localhost:0");
+      CONF.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
+      // Set properties to make HDFS aware of NodeGroup.
+      CONF.set("dfs.block.replicator.classname", 
+          "org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyWithNodeGroup");
+      CONF.set("net.topology.impl", 
+          "org.apache.hadoop.net.NetworkTopologyWithNodeGroup");
+      NameNode.format(CONF);
+      namenode = new NameNode(CONF);
+    } catch (IOException e) {
+      e.printStackTrace();
+      throw (RuntimeException)new RuntimeException().initCause(e);
+    }
+    FSNamesystem fsNamesystem = FSNamesystem.getFSNamesystem();
+    replicator = fsNamesystem.replicator;
+    cluster = fsNamesystem.clusterMap;
+    // construct network topology
+    for(int i=0; i<NUM_OF_DATANODES; i++) {
+      cluster.add(dataNodes[i]);
+    }
+    setupDataNodeCapacity();
+  }
+
+  private static void setupDataNodeCapacity() {
+    for(int i=0; i<NUM_OF_DATANODES; i++) {
+      dataNodes[i].updateHeartbeat(
+          2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+          2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0);
+    }
+  }
+
+  /**
+   * In this testcase, client is dataNodes[0]. So the 1st replica should be
+   * placed on dataNodes[0], the 2nd replica should be placed on 
+   * different rack and third should be placed on different node (and node group)
+   * of rack chosen for 2nd node.
+   * The only exception is when the <i>numOfReplicas</i> is 2,
+   * the 1st is on dataNodes[0] and the 2nd is on a different rack.
+   * @throws Exception
+   */
+  @Test
+  public void testChooseTarget1() throws Exception {
+    dataNodes[0].updateHeartbeat(
+        2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 
+        FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 4); // overloaded
+
+    DatanodeDescriptor[] targets;
+    targets = replicator.chooseTarget(filename, 0, dataNodes[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(targets.length, 0);
+
+    targets = replicator.chooseTarget(filename, 1, dataNodes[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(targets.length, 1);
+    assertEquals(targets[0], dataNodes[0]);
+
+    targets = replicator.chooseTarget(filename, 2, dataNodes[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(targets.length, 2);
+    assertEquals(targets[0], dataNodes[0]);
+    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+
+    targets = replicator.chooseTarget(filename, 3, dataNodes[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(targets.length, 3);
+    assertEquals(targets[0], dataNodes[0]);
+    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+    assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
+    assertFalse(cluster.isOnSameNodeGroup(targets[1], targets[2]));
+
+    targets = replicator.chooseTarget(filename, 4, dataNodes[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(targets.length, 4);
+    assertEquals(targets[0], dataNodes[0]);
+    assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
+               cluster.isOnSameRack(targets[2], targets[3]));
+    assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
+    // Make sure no two replicas are on the same nodegroup
+    verifyNoTwoTargetsOnSameNodeGroup(targets);
+
+    dataNodes[0].updateHeartbeat(
+        2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+        FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0); 
+  }
+
+  private void verifyNoTwoTargetsOnSameNodeGroup(DatanodeDescriptor[] targets) {
+    Set<String> nodeGroupSet = new HashSet<String>();
+    for (DatanodeDescriptor target: targets) {
+      nodeGroupSet.add(target.getNetworkLocation());
+    }
+    assertEquals(nodeGroupSet.size(), targets.length);
+  }
+
+  /**
+   * In this testcase, client is dataNodes[0], but the dataNodes[1] is
+   * not allowed to be chosen. So the 1st replica should be
+   * placed on dataNodes[0], the 2nd replica should be placed on a different
+   * rack, the 3rd should be on same rack as the 2nd replica but in different
+   * node group, and the rest should be placed on a third rack.
+   * @throws Exception
+   */
+  @Test
+  public void testChooseTarget2() throws Exception { 
+    HashMap<Node, Node> excludedNodes;
+    DatanodeDescriptor[] targets;
+    BlockPlacementPolicyDefault repl = (BlockPlacementPolicyDefault)replicator;
+    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
+
+    excludedNodes = new HashMap<Node, Node>();
+    excludedNodes.put(dataNodes[1], dataNodes[1]); 
+    targets = repl.chooseTarget(4, dataNodes[0], chosenNodes, 
+        excludedNodes, BLOCK_SIZE);
+    assertEquals(targets.length, 4);
+    assertEquals(targets[0], dataNodes[0]);
+    assertTrue(cluster.isNodeGroupAware());
+    // Make sure no replicas are on the same nodegroup 
+    for (int i=1;i<4;i++) {
+      assertFalse(cluster.isOnSameNodeGroup(targets[0], targets[i]));
+    }
+    assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
+               cluster.isOnSameRack(targets[2], targets[3]));
+    assertFalse(cluster.isOnSameRack(targets[1], targets[3]));
+  }
+
+  /**
+   * In this testcase, client is dataNodes[0], but dataNodes[0] is not qualified
+   * to be chosen. So the 1st replica should be placed on dataNodes[1], 
+   * the 2nd replica should be placed on a different rack,
+   * the 3rd replica should be placed on the same rack as the 2nd replica but in different nodegroup,
+   * and the rest should be placed on the third rack.
+   * @throws Exception
+   */
+  @Test
+  public void testChooseTarget3() throws Exception {
+    // make data node 0 not qualified to be chosen
+    dataNodes[0].updateHeartbeat(
+        2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+        (FSConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0); // no space
+
+    DatanodeDescriptor[] targets;
+    targets = replicator.chooseTarget(filename, 0, dataNodes[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(targets.length, 0);
+
+    targets = replicator.chooseTarget(filename, 1, dataNodes[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(targets.length, 1);
+    assertEquals(targets[0], dataNodes[1]);
+
+    targets = replicator.chooseTarget(filename, 2, dataNodes[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(targets.length, 2);
+    assertEquals(targets[0], dataNodes[1]);
+    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+
+    targets = replicator.chooseTarget(filename, 3, dataNodes[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(targets.length, 3);
+    assertEquals(targets[0], dataNodes[1]);
+    assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
+    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+
+    targets = replicator.chooseTarget(filename, 4, dataNodes[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(targets.length, 4);
+    assertEquals(targets[0], dataNodes[1]);
+    assertTrue(cluster.isNodeGroupAware());
+    verifyNoTwoTargetsOnSameNodeGroup(targets);
+    assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
+               cluster.isOnSameRack(targets[2], targets[3]));
+
+    dataNodes[0].updateHeartbeat(
+        2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+        FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0); 
+  }
+
+  /**
+   * In this testcase, client is dataNodes[0], but none of the nodes on rack 1
+   * is qualified to be chosen. So the 1st replica should be placed on either
+   * rack 2 or rack 3. 
+   * the 2nd replica should be placed on a different rack,
+   * the 3rd replica should be placed on the same rack as the 1st replica, but 
+   * in different node group.
+   * @throws Exception
+   */
+  @Test
+  public void testChooseTarget4() throws Exception {
+    // make data nodes 0-2 not qualified to be chosen: not enough disk space
+    for(int i=0; i<3; i++) {
+      dataNodes[i].updateHeartbeat(
+          2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+          (FSConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0);
+    }
+
+    DatanodeDescriptor[] targets;
+    targets = replicator.chooseTarget(filename, 0, dataNodes[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(targets.length, 0);
+
+    targets = replicator.chooseTarget(filename, 1, dataNodes[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(targets.length, 1);
+    assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
+
+    targets = replicator.chooseTarget(filename, 2, dataNodes[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(targets.length, 2);
+    assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
+    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+
+    targets = replicator.chooseTarget(filename, 3, dataNodes[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(targets.length, 3);
+    for(int i=0; i<3; i++) {
+      assertFalse(cluster.isOnSameRack(targets[i], dataNodes[0]));
+    }
+    verifyNoTwoTargetsOnSameNodeGroup(targets);
+    assertTrue(cluster.isOnSameRack(targets[0], targets[1]) ||
+               cluster.isOnSameRack(targets[1], targets[2]));
+    assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
+  }
+
+  /**
+   * In this testcase, the client is a node outside of the file system.
+   * So the 1st replica can be placed on any node,
+   * the 2nd replica should be placed on a different rack, and
+   * the 3rd replica should be placed on the same rack as the 2nd replica.
+   * @throws Exception
+   */
+  @Test
+  public void testChooseTarget5() throws Exception {
+    setupDataNodeCapacity();
+    DatanodeDescriptor[] targets;
+    targets = replicator.chooseTarget(filename, 0, NODE,
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(targets.length, 0);
+
+    targets = replicator.chooseTarget(filename, 1, NODE,
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(targets.length, 1);
+
+    targets = replicator.chooseTarget(filename, 2, NODE,
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(targets.length, 2);
+    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+
+    targets = replicator.chooseTarget(filename, 3, NODE,
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(targets.length, 3);
+    assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
+    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+    verifyNoTwoTargetsOnSameNodeGroup(targets);
+  }
+
+  /**
+   * This testcase tests re-replication, when dataNodes[0] is already chosen.
+   * So the 1st replica can be placed on a random rack.
+   * The 2nd replica should be placed on a different node and nodegroup but the same rack as
+   * the 1st replica. The 3rd replica can be placed randomly.
+   * @throws Exception
+   */
+  @Test
+  public void testRereplicate1() throws Exception {
+    setupDataNodeCapacity();
+    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
+    chosenNodes.add(dataNodes[0]);
+    DatanodeDescriptor[] targets;
+    
+    targets = replicator.chooseTarget(filename,
+                                      0, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    assertEquals(targets.length, 0);
+    
+    targets = replicator.chooseTarget(filename,
+                                      1, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    assertEquals(targets.length, 1);
+    assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
+    
+    targets = replicator.chooseTarget(filename,
+                                      2, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    assertEquals(targets.length, 2);
+    assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
+    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+    
+    targets = replicator.chooseTarget(filename,
+                                      3, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    assertEquals(targets.length, 3);
+    assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
+    assertFalse(cluster.isOnSameNodeGroup(dataNodes[0], targets[0]));
+    assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
+  }
+
+  /**
+   * This testcase tests re-replication, 
+   * when dataNodes[0] and dataNodes[1] are already chosen.
+   * So the 1st replica should be placed on a rack other than rack 1.
+   * The remaining replicas can be placed randomly.
+   * @throws Exception
+   */
+  @Test
+  public void testRereplicate2() throws Exception {
+    setupDataNodeCapacity();
+    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
+    chosenNodes.add(dataNodes[0]);
+    chosenNodes.add(dataNodes[1]);
+
+    DatanodeDescriptor[] targets;
+    targets = replicator.chooseTarget(filename,
+                                      0, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    assertEquals(targets.length, 0);
+
+    targets = replicator.chooseTarget(filename,
+                                      1, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    assertEquals(targets.length, 1);
+    assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
+
+    targets = replicator.chooseTarget(filename,
+                                      2, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    assertEquals(targets.length, 2);
+    assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]) && 
+        cluster.isOnSameRack(dataNodes[0], targets[1]));
+  }
+
+  /**
+   * This testcase tests re-replication, 
+   * when dataNodes[0] and dataNodes[3] are already chosen.
+   * So the 1st replica should be placed on the rack where the writer resides.
+   * The remaining replicas can be placed randomly.
+   * @throws Exception
+   */
+  @Test
+  public void testRereplicate3() throws Exception {
+    setupDataNodeCapacity();
+    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
+    chosenNodes.add(dataNodes[0]);
+    chosenNodes.add(dataNodes[3]);
+
+    DatanodeDescriptor[] targets;
+    targets = replicator.chooseTarget(filename,
+                                      0, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    assertEquals(targets.length, 0);
+
+    targets = replicator.chooseTarget(filename,
+                                      1, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    assertEquals(targets.length, 1);
+    assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
+    assertFalse(cluster.isOnSameRack(dataNodes[3], targets[0]));
+
+    targets = replicator.chooseTarget(filename,
+                               1, dataNodes[3], chosenNodes, BLOCK_SIZE);
+    assertEquals(targets.length, 1);
+    assertTrue(cluster.isOnSameRack(dataNodes[3], targets[0]));
+    assertFalse(cluster.isOnSameNodeGroup(dataNodes[3], targets[0]));
+    assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
+
+    targets = replicator.chooseTarget(filename,
+                                      2, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    assertEquals(targets.length, 2);
+    assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
+    assertFalse(cluster.isOnSameNodeGroup(dataNodes[0], targets[0]));
+    targets = replicator.chooseTarget(filename,
+                               2, dataNodes[3], chosenNodes, BLOCK_SIZE);
+    assertEquals(targets.length, 2);
+    assertTrue(cluster.isOnSameRack(dataNodes[3], targets[0]));
+  }
+  
+  /**
+   * Test that chooseReplicaToDelete picks replicas based on
+   * block locality and free space.
+   */
+  @Test
+  public void testChooseReplicaToDelete() throws Exception {
+    List<DatanodeDescriptor> replicaNodeList = 
+        new ArrayList<DatanodeDescriptor>();
+    final Map<String, List<DatanodeDescriptor>> rackMap = 
+        new HashMap<String, List<DatanodeDescriptor>>();
+    dataNodes[0].setRemaining(4*1024*1024);
+    replicaNodeList.add(dataNodes[0]);
+
+    dataNodes[1].setRemaining(3*1024*1024);
+    replicaNodeList.add(dataNodes[1]);
+
+    dataNodes[2].setRemaining(2*1024*1024);
+    replicaNodeList.add(dataNodes[2]);
+
+    dataNodes[5].setRemaining(1*1024*1024);
+    replicaNodeList.add(dataNodes[5]);
+
+    List<DatanodeDescriptor> first = new ArrayList<DatanodeDescriptor>();
+    List<DatanodeDescriptor> second = new ArrayList<DatanodeDescriptor>();
+    replicator.splitNodesWithRack(
+        replicaNodeList, rackMap, first, second);
+    assertEquals(3, first.size());
+    assertEquals(1, second.size());
+    DatanodeDescriptor chosenNode = replicator.chooseReplicaToDelete(
+        null, null, (short)3, first, second);
+    // Within first set {dataNodes[0], dataNodes[1], dataNodes[2]}, 
+    // dataNodes[0] and dataNodes[1] are in the same nodegroup, 
+    // but dataNodes[1] is chosen as it has less free space
+    assertEquals(chosenNode, dataNodes[1]);
+
+    replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosenNode);
+    assertEquals(2, first.size());
+    assertEquals(1, second.size());
+    // Within first set {dataNodes[0], dataNodes[2]}, dataNodes[2] is chosen
+    // as it has less free space
+    chosenNode = replicator.chooseReplicaToDelete(
+        null, null, (short)2, first, second);
+    assertEquals(chosenNode, dataNodes[2]);
+
+    replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosenNode);
+    assertEquals(0, first.size());
+    assertEquals(2, second.size());
+    // Within second set, dataNodes[5] with less free space
+    chosenNode = replicator.chooseReplicaToDelete(
+        null, null, (short)1, first, second);
+    assertEquals(chosenNode, dataNodes[5]);
+  }
+
+}