Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2012/07/05 03:31:58 UTC

svn commit: r1357442 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/

Author: szetszwo
Date: Thu Jul  5 01:31:57 2012
New Revision: 1357442

URL: http://svn.apache.org/viewvc?rev=1357442&view=rev
Log:
HDFS-3601. Add BlockPlacementPolicyWithNodeGroup to support block placement with 4-layer network topology.  Contributed by Junping Du

Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
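
[Editor's note] For context, the new policy is opt-in. Below is a minimal
sketch of how a deployment might enable it, based on the configuration keys
used in the new test added by this commit ("dfs.block.replicator.classname"
is set there as a raw string); treat this as an illustration, not a
supported public API:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class NodeGroupPolicyConfig {
      public static Configuration create() {
        Configuration conf = new HdfsConfiguration();
        // Use the node-group-aware placement policy added by this commit.
        conf.set("dfs.block.replicator.classname",
            "org.apache.hadoop.hdfs.server.blockmanagement."
            + "BlockPlacementPolicyWithNodeGroup");
        // Use the node-group-aware topology so the 4th layer is resolved.
        conf.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
            "org.apache.hadoop.net.NetworkTopologyWithNodeGroup");
        return conf;
      }
    }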

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1357442&r1=1357441&r2=1357442&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Jul  5 01:31:57 2012
@@ -13,6 +13,9 @@ Trunk (unreleased changes)
 
     HDFS-3125. Add JournalService to enable Journal Daemon. (suresh)
 
+    HDFS-3601. Add BlockPlacementPolicyWithNodeGroup to support block placement
+    with 4-layer network topology.  (Junping Du via szetszwo)
+
   IMPROVEMENTS
 
     HDFS-1620. Rename HdfsConstants -> HdfsServerConstants, FSConstants ->

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1357442&r1=1357441&r2=1357442&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Thu Jul  5 01:31:57 2012
@@ -148,6 +148,7 @@ public class BlockPlacementPolicyDefault
       new ArrayList<DatanodeDescriptor>(chosenNodes);
     for (Node node:chosenNodes) {
       excludedNodes.put(node, node);
+      adjustExcludedNodes(excludedNodes, node);
     }
       
     if (!clusterMap.contains(writer)) {
@@ -359,10 +360,11 @@ public class BlockPlacementPolicyDefault
         (DatanodeDescriptor)(clusterMap.chooseRandom(nodes));
 
       Node oldNode = excludedNodes.put(chosenNode, chosenNode);
-      if (oldNode == null) { // choosendNode was not in the excluded list
+      if (oldNode == null) { // chosenNode was not in the excluded list
         numOfAvailableNodes--;
         if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
           results.add(chosenNode);
+          adjustExcludedNodes(excludedNodes, chosenNode);
           return chosenNode;
         } else {
           badTarget = true;
@@ -409,6 +411,7 @@ public class BlockPlacementPolicyDefault
         if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
           numOfReplicas--;
           results.add(chosenNode);
+          adjustExcludedNodes(excludedNodes, chosenNode);
         } else {
           badTarget = true;
         }
@@ -426,7 +429,21 @@ public class BlockPlacementPolicyDefault
       throw new NotEnoughReplicasException(detail);
     }
   }
-    
+  
+  /**
+   * After choosing a node to place a replica, adjust excluded nodes accordingly.
+   * This default implementation does nothing, as chosenNode has already been
+   * put into excludedNodes, but a subclass can override it to put more
+   * related nodes into excludedNodes.
+   * 
+   * @param excludedNodes the map of nodes excluded from placement
+   * @param chosenNode the node that was just chosen as a target
+   */
+  protected void adjustExcludedNodes(HashMap<Node, Node> excludedNodes,
+      Node chosenNode) {
+    // do nothing here.
+  }
+
   /* judge if a node is a good target.
    * return true if <i>node</i> has enough space, 
    * does not have too much load, and the rack does not have too many nodes

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java?rev=1357442&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java Thu Jul  5 01:31:57 2012
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.NetworkTopologyWithNodeGroup;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
+
+/** The class is responsible for choosing the desired number of targets
+ * for placing block replicas in an environment with a node-group layer.
+ * The replica placement strategy is adjusted to:
+ * If the writer is on a datanode, the 1st replica is placed on the local
+ *     node (or local node group), otherwise on a random datanode.
+ * The 2nd replica is placed on a datanode that is on a different rack
+ *     from the node of the 1st replica.
+ * The 3rd replica is placed on a datanode which is in a different node group
+ *     but on the same rack as the node of the 2nd replica.
+ */
+public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefault {
+
+  BlockPlacementPolicyWithNodeGroup(Configuration conf,  FSClusterStats stats,
+      NetworkTopology clusterMap) {
+    initialize(conf, stats, clusterMap);
+  }
+
+  BlockPlacementPolicyWithNodeGroup() {
+  }
+
+  public void initialize(Configuration conf,  FSClusterStats stats,
+          NetworkTopology clusterMap) {
+    super.initialize(conf, stats, clusterMap);
+  }
+
+  /** Choose the local node of localMachine as the target.
+   * If localMachine is not available, choose a node in the same node group
+   * or rack instead.
+   * @return the chosen node
+   */
+  @Override
+  protected DatanodeDescriptor chooseLocalNode(
+      DatanodeDescriptor localMachine,
+      HashMap<Node, Node> excludedNodes,
+      long blocksize,
+      int maxNodesPerRack,
+      List<DatanodeDescriptor> results)
+        throws NotEnoughReplicasException {
+    // if no local machine, randomly choose one node
+    if (localMachine == null)
+      return chooseRandom(NodeBase.ROOT, excludedNodes, 
+          blocksize, maxNodesPerRack, results);
+
+    // otherwise try local machine first
+    Node oldNode = excludedNodes.put(localMachine, localMachine);
+    if (oldNode == null) { // was not in the excluded list
+      if (isGoodTarget(localMachine, blocksize,
+          maxNodesPerRack, false, results)) {
+        results.add(localMachine);
+        // Nodes in the same node group should be excluded.
+        addNodeGroupToExcludedNodes(excludedNodes,
+            localMachine.getNetworkLocation());
+        return localMachine;
+      }
+    } 
+
+    // try a node on local node group
+    DatanodeDescriptor chosenNode = chooseLocalNodeGroup(
+        (NetworkTopologyWithNodeGroup)clusterMap, localMachine, excludedNodes, 
+        blocksize, maxNodesPerRack, results);
+    if (chosenNode != null) {
+      return chosenNode;
+    }
+    // try a node on local rack
+    return chooseLocalRack(localMachine, excludedNodes, 
+        blocksize, maxNodesPerRack, results);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  protected void adjustExcludedNodes(HashMap<Node, Node> excludedNodes,
+      Node chosenNode) {
+    // As a node-group-aware implementation, it should make sure that no two
+    // replicas are placed in the same node group.
+    addNodeGroupToExcludedNodes(excludedNodes, chosenNode.getNetworkLocation());
+  }
+  
+  // Add all nodes in the specified node group to excludedNodes.
+  private void addNodeGroupToExcludedNodes(HashMap<Node, Node> excludedNodes,
+      String nodeGroup) {
+    List<Node> leafNodes = clusterMap.getLeaves(nodeGroup);
+    for (Node node : leafNodes) {
+      excludedNodes.put(node, node);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  protected DatanodeDescriptor chooseLocalRack(
+                                             DatanodeDescriptor localMachine,
+                                             HashMap<Node, Node> excludedNodes,
+                                             long blocksize,
+                                             int maxNodesPerRack,
+                                             List<DatanodeDescriptor> results)
+    throws NotEnoughReplicasException {
+    // no local machine, so choose a random machine
+    if (localMachine == null) {
+      return chooseRandom(NodeBase.ROOT, excludedNodes, 
+                          blocksize, maxNodesPerRack, results);
+    }
+
+    // choose one from the local rack, but off-nodegroup
+    try {
+      return chooseRandom(NetworkTopology.getFirstHalf(
+                              localMachine.getNetworkLocation()),
+                          excludedNodes, blocksize, 
+                          maxNodesPerRack, results);
+    } catch (NotEnoughReplicasException e1) {
+      // find the second replica
+      DatanodeDescriptor newLocal=null;
+      for(Iterator<DatanodeDescriptor> iter=results.iterator();
+          iter.hasNext();) {
+        DatanodeDescriptor nextNode = iter.next();
+        if (nextNode != localMachine) {
+          newLocal = nextNode;
+          break;
+        }
+      }
+      if (newLocal != null) {
+        try {
+          return chooseRandom(clusterMap.getRack(newLocal.getNetworkLocation()),
+                              excludedNodes, blocksize, maxNodesPerRack, results);
+        } catch(NotEnoughReplicasException e2) {
+          //otherwise randomly choose one from the network
+          return chooseRandom(NodeBase.ROOT, excludedNodes,
+                              blocksize, maxNodesPerRack, results);
+        }
+      } else {
+        //otherwise randomly choose one from the network
+        return chooseRandom(NodeBase.ROOT, excludedNodes,
+                            blocksize, maxNodesPerRack, results);
+      }
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  protected void chooseRemoteRack(int numOfReplicas,
+          DatanodeDescriptor localMachine,
+          HashMap<Node, Node> excludedNodes,
+          long blocksize,
+          int maxReplicasPerRack,
+          List<DatanodeDescriptor> results)
+          throws NotEnoughReplicasException {
+    int oldNumOfReplicas = results.size();
+    // randomly choose one node from remote racks
+    try {
+      chooseRandom(numOfReplicas, "~"+NetworkTopology.getFirstHalf(
+          localMachine.getNetworkLocation()),
+      excludedNodes, blocksize, maxReplicasPerRack, results);
+    } catch (NotEnoughReplicasException e) {
+      chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
+      localMachine.getNetworkLocation(), excludedNodes, blocksize, 
+      maxReplicasPerRack, results);
+    }
+  }
+
+  /* Choose one node from the node group that <i>localMachine</i> is on.
+   * If no such node is available, choose one node from the node group where
+   * a second replica resides.
+   * If still no such node is available, choose a random node in the cluster.
+   * @return the chosen node
+   */
+  private DatanodeDescriptor chooseLocalNodeGroup(NetworkTopologyWithNodeGroup clusterMap,
+      DatanodeDescriptor localMachine, HashMap<Node, Node> excludedNodes, long blocksize, 
+      int maxNodesPerRack, List<DatanodeDescriptor> results) throws NotEnoughReplicasException {
+    // no local machine, so choose a random machine
+    if (localMachine == null) {
+      return chooseRandom(NodeBase.ROOT, excludedNodes, 
+      blocksize, maxNodesPerRack, results);
+    }
+
+    // choose one from the local node group
+    try {
+      return chooseRandom(clusterMap.getNodeGroup(localMachine.getNetworkLocation()),
+      excludedNodes, blocksize, maxNodesPerRack, results);
+    } catch (NotEnoughReplicasException e1) {
+      // find the second replica
+      DatanodeDescriptor newLocal=null;
+      for(Iterator<DatanodeDescriptor> iter=results.iterator();
+        iter.hasNext();) {
+        DatanodeDescriptor nextNode = iter.next();
+        if (nextNode != localMachine) {
+          newLocal = nextNode;
+          break;
+        }
+      }
+      if (newLocal != null) {
+        try {
+          return chooseRandom(clusterMap.getNodeGroup(newLocal.getNetworkLocation()),
+            excludedNodes, blocksize, maxNodesPerRack, results);
+        } catch(NotEnoughReplicasException e2) {
+          //otherwise randomly choose one from the network
+          return chooseRandom(NodeBase.ROOT, excludedNodes,
+              blocksize, maxNodesPerRack, results);
+        }
+      } else {
+        //otherwise randomly choose one from the network
+        return chooseRandom(NodeBase.ROOT, excludedNodes,
+            blocksize, maxNodesPerRack, results);
+      }
+    }
+  }
+
+  @Override
+  protected String getRack(final DatanodeInfo cur) {
+    String nodeGroupString = cur.getNetworkLocation();
+    return NetworkTopology.getFirstHalf(nodeGroupString);
+  }
+
+  /**
+   * Pick the replica node set from which a replica is deleted when a block
+   * is over-replicated. The first set contains replica nodes on racks with
+   * more than one replica; the second set contains the remaining nodes.
+   * If the first set is not empty, divide it into two subsets:
+   *   moreThanOne contains nodes whose node group has more than one replica;
+   *   exactlyOne contains the remaining nodes of the first set;
+   * then pick moreThanOne if it is not empty, otherwise exactlyOne.
+   * If the first set is empty, then pick the second set.
+   */
+  @Override
+  public Iterator<DatanodeDescriptor> pickupReplicaSet(
+      Collection<DatanodeDescriptor> first,
+      Collection<DatanodeDescriptor> second) {
+    // If no two replicas are on the same rack, return the second set directly.
+    if (first.isEmpty()) {
+      return second.iterator();
+    }
+    // Split the datanodes in the first set into two sets:
+    // moreThanOne contains nodes whose node group has more than one replica;
+    // exactlyOne contains the remaining nodes.
+    Map<String, List<DatanodeDescriptor>> nodeGroupMap = 
+        new HashMap<String, List<DatanodeDescriptor>>();
+    
+    for(DatanodeDescriptor node : first) {
+      final String nodeGroupName = 
+          NetworkTopology.getLastHalf(node.getNetworkLocation());
+      List<DatanodeDescriptor> datanodeList = 
+          nodeGroupMap.get(nodeGroupName);
+      if (datanodeList == null) {
+        datanodeList = new ArrayList<DatanodeDescriptor>();
+        nodeGroupMap.put(nodeGroupName, datanodeList);
+      }
+      datanodeList.add(node);
+    }
+    
+    final List<DatanodeDescriptor> moreThanOne = new ArrayList<DatanodeDescriptor>();
+    final List<DatanodeDescriptor> exactlyOne = new ArrayList<DatanodeDescriptor>();
+    // split nodes into two sets
+    for(List<DatanodeDescriptor> datanodeList : nodeGroupMap.values()) {
+      if (datanodeList.size() == 1 ) {
+        // exactlyOne contains nodes on nodegroup with exactly one replica
+        exactlyOne.add(datanodeList.get(0));
+      } else {
+        // moreThanOne contains nodes on nodegroup with more than one replica
+        moreThanOne.addAll(datanodeList);
+      }
+    }
+    
+    Iterator<DatanodeDescriptor> iter =
+        moreThanOne.isEmpty() ? exactlyOne.iterator() : moreThanOne.iterator();
+    return iter;
+  }
+  
+}
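
[Editor's note] On the 4-layer paths used above: with
NetworkTopologyWithNodeGroup, a datanode's network location has the form
/<data center>/<rack>/<node group> (the host itself is the 4th layer), e.g.
/d1/r1/n1 in the test below. The policy separates the rack from the node
group via NetworkTopology.getFirstHalf()/getLastHalf(); the sketch below
assumes the split happens at the last path separator, which is consistent
with how getRack() above derives the rack from a node-group location. It is
an illustrative helper only, not the Hadoop implementation:

    public class NodeGroupPathSketch {
      // "/d1/r1/n1" -> "/d1/r1" (the rack, as in getRack() above)
      static String rackOf(String networkLocation) {
        return networkLocation.substring(0, networkLocation.lastIndexOf('/'));
      }
      // "/d1/r1/n1" -> "/n1" (the node-group suffix used as a map key
      // in pickupReplicaSet() above)
      static String nodeGroupOf(String networkLocation) {
        return networkLocation.substring(networkLocation.lastIndexOf('/'));
      }
      public static void main(String[] args) {
        System.out.println(rackOf("/d1/r1/n1"));      // /d1/r1
        System.out.println(nodeGroupOf("/d1/r1/n1")); // /n1
      }
    }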

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java?rev=1357442&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java Thu Jul  5 01:31:57 2012
@@ -0,0 +1,490 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
+import org.junit.Test;
+
+public class TestReplicationPolicyWithNodeGroup extends TestCase {
+  private static final int BLOCK_SIZE = 1024;
+  private static final int NUM_OF_DATANODES = 8;
+  private static final Configuration CONF = new HdfsConfiguration();
+  private static final NetworkTopology cluster;
+  private static final NameNode namenode;
+  private static final BlockPlacementPolicy replicator;
+  private static final String filename = "/dummyfile.txt";
+
+  private final static DatanodeDescriptor dataNodes[] = new DatanodeDescriptor[] {
+      DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/d1/r1/n1"),
+      DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/d1/r1/n1"),
+      DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/d1/r1/n2"),
+      DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/d1/r2/n3"),
+      DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/d1/r2/n3"),
+      DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/d1/r2/n4"),
+      DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/d2/r3/n5"),
+      DFSTestUtil.getDatanodeDescriptor("8.8.8.8", "/d2/r3/n6")
+  };
+
+  private final static DatanodeDescriptor NODE = 
+      new DatanodeDescriptor(DFSTestUtil.getDatanodeDescriptor("9.9.9.9", "/d2/r4/n7"));
+
+  static {
+    try {
+      FileSystem.setDefaultUri(CONF, "hdfs://localhost:0");
+      CONF.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
+      // Set properties to make HDFS aware of NodeGroup.
+      CONF.set("dfs.block.replicator.classname", 
+          "org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithNodeGroup");
+      CONF.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY, 
+          "org.apache.hadoop.net.NetworkTopologyWithNodeGroup");
+      DFSTestUtil.formatNameNode(CONF);
+      namenode = new NameNode(CONF);
+    } catch (IOException e) {
+      e.printStackTrace();
+      throw (RuntimeException)new RuntimeException().initCause(e);
+    }
+    final BlockManager bm = namenode.getNamesystem().getBlockManager();
+    replicator = bm.getBlockPlacementPolicy();
+    cluster = bm.getDatanodeManager().getNetworkTopology();
+    // construct network topology
+    for(int i=0; i<NUM_OF_DATANODES; i++) {
+      cluster.add(dataNodes[i]);
+    }
+    setupDataNodeCapacity();
+  }
+
+  private static void setupDataNodeCapacity() {
+    for(int i=0; i<NUM_OF_DATANODES; i++) {
+      dataNodes[i].updateHeartbeat(
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+    }
+  }
+
+  /**
+   * In this testcase, the client is dataNodes[0]. So the 1st replica should
+   * be placed on dataNodes[0], the 2nd replica should be placed on a
+   * different rack, and the 3rd should be placed on a different node (and
+   * node group) of the rack chosen for the 2nd replica.
+   * The only exception is when <i>numOfReplicas</i> is 2: then the 1st is
+   * on dataNodes[0] and the 2nd is on a different rack.
+   * @throws Exception
+   */
+  public void testChooseTarget1() throws Exception {
+    dataNodes[0].updateHeartbeat(
+        2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 
+        HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 4, 0); // overloaded
+
+    DatanodeDescriptor[] targets;
+    targets = replicator.chooseTarget(filename,
+                                      0, dataNodes[0], BLOCK_SIZE);
+    assertEquals(targets.length, 0);
+
+    targets = replicator.chooseTarget(filename,
+                                      1, dataNodes[0], BLOCK_SIZE);
+    assertEquals(targets.length, 1);
+    assertEquals(targets[0], dataNodes[0]);
+
+    targets = replicator.chooseTarget(filename,
+                                      2, dataNodes[0], BLOCK_SIZE);
+    assertEquals(targets.length, 2);
+    assertEquals(targets[0], dataNodes[0]);
+    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+
+    targets = replicator.chooseTarget(filename,
+                                      3, dataNodes[0], BLOCK_SIZE);
+    assertEquals(targets.length, 3);
+    assertEquals(targets[0], dataNodes[0]);
+    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+    assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
+    assertFalse(cluster.isOnSameNodeGroup(targets[1], targets[2]));
+
+    targets = replicator.chooseTarget(filename,
+                                      4, dataNodes[0], BLOCK_SIZE);
+    assertEquals(targets.length, 4);
+    assertEquals(targets[0], dataNodes[0]);
+    assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
+               cluster.isOnSameRack(targets[2], targets[3]));
+    assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
+    // Make sure that no two replicas are in the same node group.
+    verifyNoTwoTargetsOnSameNodeGroup(targets);
+
+    dataNodes[0].updateHeartbeat(
+        2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+        HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0); 
+  }
+
+  private void verifyNoTwoTargetsOnSameNodeGroup(DatanodeDescriptor[] targets) {
+    Set<String> nodeGroupSet = new HashSet<String>();
+    for (DatanodeDescriptor target: targets) {
+      nodeGroupSet.add(target.getNetworkLocation());
+    }
+    assertEquals(nodeGroupSet.size(), targets.length);
+  }
+
+  /**
+   * In this testcase, the client is dataNodes[0], but dataNodes[1] is
+   * not allowed to be chosen. So the 1st replica should be
+   * placed on dataNodes[0], the 2nd replica should be placed on a different
+   * rack, the 3rd should be on the same rack as the 2nd replica but in a
+   * different node group, and the rest should be placed on a third rack.
+   * @throws Exception
+   */
+  public void testChooseTarget2() throws Exception { 
+    HashMap<Node, Node> excludedNodes;
+    DatanodeDescriptor[] targets;
+    BlockPlacementPolicyDefault repl = (BlockPlacementPolicyDefault)replicator;
+    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
+
+    excludedNodes = new HashMap<Node, Node>();
+    excludedNodes.put(dataNodes[1], dataNodes[1]); 
+    targets = repl.chooseTarget(4, dataNodes[0], chosenNodes, false, 
+        excludedNodes, BLOCK_SIZE);
+    assertEquals(targets.length, 4);
+    assertEquals(targets[0], dataNodes[0]);
+    assertTrue(cluster.isNodeGroupAware());
+    // Make sure that no target shares a node group with targets[0].
+    for (int i=1;i<4;i++) {
+      assertFalse(cluster.isOnSameNodeGroup(targets[0], targets[i]));
+    }
+    assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
+               cluster.isOnSameRack(targets[2], targets[3]));
+    assertFalse(cluster.isOnSameRack(targets[1], targets[3]));
+
+    excludedNodes.clear();
+    chosenNodes.clear();
+    excludedNodes.put(dataNodes[1], dataNodes[1]); 
+    chosenNodes.add(dataNodes[2]);
+    targets = repl.chooseTarget(1, dataNodes[0], chosenNodes, true,
+        excludedNodes, BLOCK_SIZE);
+    System.out.println("targets=" + Arrays.asList(targets));
+    assertEquals(2, targets.length);
+    // make sure that the chosen node is among the returned targets.
+    int i = 0;
+    for(; i < targets.length && !dataNodes[2].equals(targets[i]); i++);
+    assertTrue(i < targets.length);
+  }
+
+  /**
+   * In this testcase, the client is dataNodes[0], but dataNodes[0] is not
+   * qualified to be chosen. So the 1st replica should be placed on
+   * dataNodes[1], the 2nd replica should be placed on a different rack, the
+   * 3rd replica should be placed on the same rack as the 2nd but in a
+   * different node group, and the rest should be placed on a third rack.
+   * @throws Exception
+   */
+  public void testChooseTarget3() throws Exception {
+    // make datanode 0 unqualified to be chosen
+    dataNodes[0].updateHeartbeat(
+        2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+        (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0); // no space
+
+    DatanodeDescriptor[] targets;
+    targets = replicator.chooseTarget(filename,
+                                      0, dataNodes[0], BLOCK_SIZE);
+    assertEquals(targets.length, 0);
+
+    targets = replicator.chooseTarget(filename,
+                                      1, dataNodes[0], BLOCK_SIZE);
+    assertEquals(targets.length, 1);
+    assertEquals(targets[0], dataNodes[1]);
+
+    targets = replicator.chooseTarget(filename,
+                                      2, dataNodes[0], BLOCK_SIZE);
+    assertEquals(targets.length, 2);
+    assertEquals(targets[0], dataNodes[1]);
+    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+
+    targets = replicator.chooseTarget(filename,
+                                      3, dataNodes[0], BLOCK_SIZE);
+    assertEquals(targets.length, 3);
+    assertEquals(targets[0], dataNodes[1]);
+    assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
+    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+
+    targets = replicator.chooseTarget(filename,
+                                      4, dataNodes[0], BLOCK_SIZE);
+    assertEquals(targets.length, 4);
+    assertEquals(targets[0], dataNodes[1]);
+    assertTrue(cluster.isNodeGroupAware());
+    verifyNoTwoTargetsOnSameNodeGroup(targets);
+    assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
+               cluster.isOnSameRack(targets[2], targets[3]));
+
+    dataNodes[0].updateHeartbeat(
+        2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+        HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0); 
+  }
+
+  /**
+   * In this testcase, the client is dataNodes[0], but none of the nodes on
+   * rack 1 is qualified to be chosen. So the 1st replica should be placed
+   * on either rack 2 or rack 3,
+   * the 2nd replica should be placed on a different rack,
+   * and the 3rd replica should be placed on the same rack as the 1st
+   * replica but in a different node group.
+   * @throws Exception
+   */
+  public void testChooseTarget4() throws Exception {
+    // make datanodes 0-2 unqualified to be chosen: not enough disk space
+    for(int i=0; i<3; i++) {
+      dataNodes[i].updateHeartbeat(
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+          (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0);
+    }
+
+    DatanodeDescriptor[] targets;
+    targets = replicator.chooseTarget(filename,
+                                      0, dataNodes[0], BLOCK_SIZE);
+    assertEquals(targets.length, 0);
+
+    targets = replicator.chooseTarget(filename,
+                                      1, dataNodes[0], BLOCK_SIZE);
+    assertEquals(targets.length, 1);
+    assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
+
+    targets = replicator.chooseTarget(filename,
+                                      2, dataNodes[0], BLOCK_SIZE);
+    assertEquals(targets.length, 2);
+    assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
+    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+
+    targets = replicator.chooseTarget(filename,
+                                      3, dataNodes[0], BLOCK_SIZE);
+    assertEquals(targets.length, 3);
+    for(int i=0; i<3; i++) {
+      assertFalse(cluster.isOnSameRack(targets[i], dataNodes[0]));
+    }
+    verifyNoTwoTargetsOnSameNodeGroup(targets);
+    assertTrue(cluster.isOnSameRack(targets[0], targets[1]) ||
+               cluster.isOnSameRack(targets[1], targets[2]));
+    assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
+  }
+
+  /**
+   * In this testcase, the client is a node outside of the file system.
+   * So the 1st replica can be placed on any node,
+   * the 2nd replica should be placed on a different rack,
+   * and the 3rd replica should be placed on the same rack as the 2nd replica.
+   * @throws Exception
+   */
+  public void testChooseTarget5() throws Exception {
+    setupDataNodeCapacity();
+    DatanodeDescriptor[] targets;
+    targets = replicator.chooseTarget(filename,
+                                      0, NODE, BLOCK_SIZE);
+    assertEquals(targets.length, 0);
+    
+    targets = replicator.chooseTarget(filename,
+                                      1, NODE, BLOCK_SIZE);
+    assertEquals(targets.length, 1);
+    
+    targets = replicator.chooseTarget(filename,
+                                      2, NODE, BLOCK_SIZE);
+    assertEquals(targets.length, 2);
+    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+    
+    targets = replicator.chooseTarget(filename,
+                                      3, NODE, BLOCK_SIZE);
+    assertEquals(targets.length, 3);
+    assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
+    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+    verifyNoTwoTargetsOnSameNodeGroup(targets);
+  }
+
+  /**
+   * This testcase tests re-replication when dataNodes[0] is already chosen.
+   * So the 1st replica can be placed on a random rack,
+   * the 2nd replica should be placed on a different node and node group but
+   * the same rack as the 1st, and the 3rd replica can be placed randomly.
+   * @throws Exception
+   */
+  public void testRereplicate1() throws Exception {
+    setupDataNodeCapacity();
+    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
+    chosenNodes.add(dataNodes[0]);
+    DatanodeDescriptor[] targets;
+    
+    targets = replicator.chooseTarget(filename,
+                                      0, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    assertEquals(targets.length, 0);
+    
+    targets = replicator.chooseTarget(filename,
+                                      1, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    assertEquals(targets.length, 1);
+    assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
+    
+    targets = replicator.chooseTarget(filename,
+                                      2, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    assertEquals(targets.length, 2);
+    assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
+    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+    
+    targets = replicator.chooseTarget(filename,
+                                      3, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    assertEquals(targets.length, 3);
+    assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
+    assertFalse(cluster.isOnSameNodeGroup(dataNodes[0], targets[0]));
+    assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
+  }
+
+  /**
+   * This testcase tests re-replication
+   * when dataNodes[0] and dataNodes[1] are already chosen.
+   * So the 1st replica should be placed on a rack other than rack 1,
+   * and the rest of the replicas can be placed randomly.
+   * @throws Exception
+   */
+  public void testRereplicate2() throws Exception {
+    setupDataNodeCapacity();
+    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
+    chosenNodes.add(dataNodes[0]);
+    chosenNodes.add(dataNodes[1]);
+
+    DatanodeDescriptor[] targets;
+    targets = replicator.chooseTarget(filename,
+                                      0, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    assertEquals(targets.length, 0);
+
+    targets = replicator.chooseTarget(filename,
+                                      1, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    assertEquals(targets.length, 1);
+    assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
+
+    targets = replicator.chooseTarget(filename,
+                                      2, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    assertEquals(targets.length, 2);
+    assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]) && 
+        cluster.isOnSameRack(dataNodes[0], targets[1]));
+  }
+
+  /**
+   * This testcase tests re-replication
+   * when dataNodes[0] and dataNodes[3] are already chosen.
+   * So the 1st replica should be placed on the rack where the writer resides,
+   * and the rest of the replicas can be placed randomly.
+   * @throws Exception
+   */
+  public void testRereplicate3() throws Exception {
+    setupDataNodeCapacity();
+    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
+    chosenNodes.add(dataNodes[0]);
+    chosenNodes.add(dataNodes[3]);
+
+    DatanodeDescriptor[] targets;
+    targets = replicator.chooseTarget(filename,
+                                      0, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    assertEquals(targets.length, 0);
+
+    targets = replicator.chooseTarget(filename,
+                                      1, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    assertEquals(targets.length, 1);
+    assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
+    assertFalse(cluster.isOnSameRack(dataNodes[3], targets[0]));
+
+    targets = replicator.chooseTarget(filename,
+                               1, dataNodes[3], chosenNodes, BLOCK_SIZE);
+    assertEquals(targets.length, 1);
+    assertTrue(cluster.isOnSameRack(dataNodes[3], targets[0]));
+    assertFalse(cluster.isOnSameNodeGroup(dataNodes[3], targets[0]));
+    assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
+
+    targets = replicator.chooseTarget(filename,
+                                      2, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    assertEquals(targets.length, 2);
+    assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
+    assertFalse(cluster.isOnSameNodeGroup(dataNodes[0], targets[0]));
+    targets = replicator.chooseTarget(filename,
+                               2, dataNodes[3], chosenNodes, BLOCK_SIZE);
+    assertEquals(targets.length, 2);
+    assertTrue(cluster.isOnSameRack(dataNodes[3], targets[0]));
+  }
+  
+  /**
+   * Test that chooseReplicaToDelete chooses replicas based on
+   * block locality and free space.
+   */
+  @Test
+  public void testChooseReplicaToDelete() throws Exception {
+    List<DatanodeDescriptor> replicaNodeList = 
+        new ArrayList<DatanodeDescriptor>();
+    final Map<String, List<DatanodeDescriptor>> rackMap = 
+        new HashMap<String, List<DatanodeDescriptor>>();
+    dataNodes[0].setRemaining(4*1024*1024);
+    replicaNodeList.add(dataNodes[0]);
+
+    dataNodes[1].setRemaining(3*1024*1024);
+    replicaNodeList.add(dataNodes[1]);
+
+    dataNodes[2].setRemaining(2*1024*1024);
+    replicaNodeList.add(dataNodes[2]);
+
+    dataNodes[5].setRemaining(1*1024*1024);
+    replicaNodeList.add(dataNodes[5]);
+
+    List<DatanodeDescriptor> first = new ArrayList<DatanodeDescriptor>();
+    List<DatanodeDescriptor> second = new ArrayList<DatanodeDescriptor>();
+    replicator.splitNodesWithRack(
+        replicaNodeList, rackMap, first, second);
+    assertEquals(3, first.size());
+    assertEquals(1, second.size());
+    DatanodeDescriptor chosenNode = replicator.chooseReplicaToDelete(
+        null, null, (short)3, first, second);
+    // Within the first set {dataNodes[0], dataNodes[1], dataNodes[2]},
+    // dataNodes[0] and dataNodes[1] are in the same node group,
+    // and dataNodes[1] is chosen because it has less free space.
+    assertEquals(chosenNode, dataNodes[1]);
+
+    replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosenNode);
+    assertEquals(2, first.size());
+    assertEquals(1, second.size());
+    // Within the first set {dataNodes[0], dataNodes[2]}, dataNodes[2] is
+    // chosen because it has less free space.
+    chosenNode = replicator.chooseReplicaToDelete(
+        null, null, (short)2, first, second);
+    assertEquals(chosenNode, dataNodes[2]);
+
+    replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosenNode);
+    assertEquals(0, first.size());
+    assertEquals(2, second.size());
+    // Within the second set, dataNodes[5] is chosen because it has less free space.
+    chosenNode = replicator.chooseReplicaToDelete(
+        null, null, (short)1, first, second);
+    assertEquals(chosenNode, dataNodes[5]);
+  }
+
+}