Posted to hdfs-commits@hadoop.apache.org by dh...@apache.org on 2009/09/15 07:38:57 UTC

svn commit: r815001 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/server/balancer/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/

Author: dhruba
Date: Tue Sep 15 05:38:57 2009
New Revision: 815001

URL: http://svn.apache.org/viewvc?rev=815001&view=rev
Log:
HDFS-385. Add support for an API that allows a module external
to HDFS to specify how HDFS blocks should be placed. (dhruba)

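The policy implementation is selected through the configuration key
dfs.block.replicator.classname, read by BlockPlacementPolicy.getInstance
below. As a hedged sketch (MyBlockPlacementPolicy is a hypothetical subclass
named only for illustration, not part of this commit), an external module
could register its policy like this:

    // Point the NameNode at a custom placement policy. Configuration.setClass
    // is an existing Hadoop API; the policy class itself is an assumption.
    Configuration conf = new Configuration();
    conf.setClass("dfs.block.replicator.classname",
                  MyBlockPlacementPolicy.class,   // hypothetical subclass
                  BlockPlacementPolicy.class);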

Added:
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSInodeInfo.java
Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/ReplicationTargetChooser.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=815001&r1=815000&r2=815001&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Sep 15 05:38:57 2009
@@ -37,6 +37,9 @@
     HDFS-235. Add support for byte ranges in HftpFileSystem to serve
     range of bytes from a file. (Bill Zeller via suresh)
 
+    HDFS-385. Add support for an experimental API that allows a module external
+    to HDFS to specify how HDFS blocks should be placed. (dhruba)
+
   IMPROVEMENTS
 
     HDFS-381. Remove blocks from DataNode maps when corresponding file

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=815001&r1=815000&r2=815001&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Tue Sep 15 05:38:57 2009
@@ -25,6 +25,7 @@
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.lang.Class;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -64,6 +65,9 @@
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicy;
+import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyDefault;
+import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.io.IOUtils;
@@ -772,18 +776,31 @@
       }
     }
   }
+
+  /* Check that this Balancer is compatible with the Block Placement Policy
+   * used by the Namenode.
+   */
+  private void checkReplicationPolicyCompatibility(Configuration conf) throws UnsupportedActionException {
+    if (BlockPlacementPolicy.getInstance(conf, null, null).getClass() != 
+        BlockPlacementPolicyDefault.class) {
+      throw new UnsupportedActionException("Balancer without BlockPlacementPolicyDefault");
+    }
+  }
   
   /** Default constructor */
-  Balancer() {
+  Balancer() throws UnsupportedActionException {
+    checkReplicationPolicyCompatibility(getConf());
   }
   
   /** Construct a balancer from the given configuration */
-  Balancer(Configuration conf) {
+  Balancer(Configuration conf) throws UnsupportedActionException {
+    checkReplicationPolicyCompatibility(conf);
     setConf(conf);
   } 
 
   /** Construct a balancer from the given configuration and threshold */
-  Balancer(Configuration conf, double threshold) {
+  Balancer(Configuration conf, double threshold) throws UnsupportedActionException {
+    checkReplicationPolicyCompatibility(conf);
     setConf(conf);
     this.threshold = threshold;
   }

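The practical effect of checkReplicationPolicyCompatibility is that the
Balancer now fails fast under any non-default policy, since moving replicas
around could violate the invariants of a custom policy. A hedged caller-side
sketch, not code from this commit (it assumes code in the balancer package,
as the constructors are package-private):

    // Constructing a Balancer throws if dfs.block.replicator.classname
    // names anything other than BlockPlacementPolicyDefault.
    try {
      Balancer balancer = new Balancer(conf);
    } catch (UnsupportedActionException e) {
      System.err.println("Balancer is incompatible with the configured "
          + "block placement policy: " + e.getMessage());
    }
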
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java?rev=815001&r1=815000&r2=815001&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java Tue Sep 15 05:38:57 2009
@@ -117,7 +117,7 @@
   Random r = new Random();
 
   // for block replicas placement
-  ReplicationTargetChooser replicator;
+  BlockPlacementPolicy replicator;
 
   BlockManager(FSNamesystem fsn, Configuration conf) throws IOException {
     this(fsn, conf, DEFAULT_INITIAL_MAP_CAPACITY);
@@ -134,8 +134,8 @@
   }
 
   void setConfigurationParameters(Configuration conf) throws IOException {
-    this.replicator = new ReplicationTargetChooser(
-                         conf.getBoolean("dfs.replication.considerLoad", true),
+    this.replicator = BlockPlacementPolicy.getInstance(
+                         conf,
                          namesystem,
                          namesystem.clusterMap);
 
@@ -639,12 +639,13 @@
     int requiredReplication, numEffectiveReplicas;
     List<DatanodeDescriptor> containingNodes;
     DatanodeDescriptor srcNode;
+    INodeFile fileINode = null;
     int additionalReplRequired;
 
     synchronized (namesystem) {
       synchronized (neededReplications) {
         // block should belong to a file
-        INodeFile fileINode = blocksMap.getINode(block);
+        fileINode = blocksMap.getINode(block);
         // abandoned block or block reopened for append
         if(fileINode == null || fileINode.isUnderConstruction()) {
           neededReplications.remove(block, priority); // remove from neededReplications
@@ -691,9 +692,11 @@
     }
 
     // choose replication targets: NOT HOLDING THE GLOBAL LOCK
+    // It is costly to extract the filename for which chooseTarget is called,
+    // so for now we pass in the Inode itself.
     DatanodeDescriptor targets[] = 
-                       replicator.chooseTarget(additionalReplRequired,
-                       srcNode, containingNodes, null, block.getNumBytes());
+                       replicator.chooseTarget(fileINode, additionalReplRequired,
+                       srcNode, containingNodes, block.getNumBytes());
     if(targets.length == 0)
       return false;
 
@@ -701,7 +704,7 @@
       synchronized (neededReplications) {
         // Recheck since global lock was released
         // block should belong to a file
-        INodeFile fileINode = blocksMap.getINode(block);
+        fileINode = blocksMap.getINode(block);
         // abandoned block or block reopened for append
         if(fileINode == null || fileINode.isUnderConstruction()) {
           neededReplications.remove(block, priority); // remove from neededReplications
@@ -1162,7 +1165,7 @@
       }
     }
     namesystem.chooseExcessReplicates(nonExcess, block, replication, 
-        addedNode, delNodeHint);
+        addedNode, delNodeHint, replicator);
   }
 
   void addToExcessReplicate(DatanodeInfo dn, Block block) {

Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java?rev=815001&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java Tue Sep 15 05:38:57 2009
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node; 
+import org.apache.hadoop.util.ReflectionUtils;
+import java.util.*;
+
+/** 
+ * This abstract class is used for choosing the desired number of targets
+ * for placing block replicas.
+ */
+public abstract class BlockPlacementPolicy {
+    
+  public static class NotEnoughReplicasException extends Exception {
+    private static final long serialVersionUID = 1L;
+    NotEnoughReplicasException(String msg) {
+      super(msg);
+    }
+  }
+    
+  /**
+   * Choose <i>numOfReplicas</i> data nodes for <i>writer</i>
+   * to re-replicate a block with size <i>blocksize</i>.
+   * If not enough targets are available, return as many as possible.
+   * 
+   * @param srcPath the file for which chooseTarget is being invoked.
+   * @param numOfReplicas additional number of replicas wanted.
+   * @param writer the writer's machine, null if not in the cluster.
+   * @param chosenNodes datanodes that have been chosen as targets.
+   * @param blocksize size of the data to be written.
+   * @return array of DatanodeDescriptor instances chosen as target 
+   * and sorted as a pipeline.
+   */
+  abstract DatanodeDescriptor[] chooseTarget(String srcPath,
+                                             int numOfReplicas,
+                                             DatanodeDescriptor writer,
+                                             List<DatanodeDescriptor> chosenNodes,
+                                             long blocksize);
+
+  /**
+   * Choose <i>numOfReplicas</i> data nodes for <i>writer</i>
+   * to re-replicate a block with size <i>blocksize</i>.
+   * If not enough targets are available, return as many as possible.
+   * The base implementation extracts the pathname of the file from the
+   * specified srcInode, but this could be a costly operation depending on the
+   * file system implementation. Concrete implementations of this class should
+   * override this method to avoid this overhead.
+   * 
+   * @param srcInode The inode of the file for which chooseTarget is being invoked.
+   * @param numOfReplicas additional number of replicas wanted.
+   * @param writer the writer's machine, null if not in the cluster.
+   * @param chosenNodes datanodes that have been chosen as targets.
+   * @param blocksize size of the data to be written.
+   * @return array of DatanodeDescriptor instances chosen as target 
+   * and sorted as a pipeline.
+   */
+  DatanodeDescriptor[] chooseTarget(FSInodeInfo srcInode,
+                                    int numOfReplicas,
+                                    DatanodeDescriptor writer,
+                                    List<DatanodeDescriptor> chosenNodes,
+                                    long blocksize) {
+    return chooseTarget(srcInode.getFullPathName(), numOfReplicas, writer,
+                        chosenNodes, blocksize);
+  }
+
+  /**
+   * Verify that the block is replicated on at least minRacks different racks
+   * if there are more than minRacks racks in the system.
+   * 
+   * @param srcPath the full pathname of the file to be verified
+   * @param lBlk block with locations
+   * @param minRacks number of racks the block should be replicated to
+   * @return the difference between the required and the actual number of racks
+   * the block is replicated to.
+   */
+  abstract public int verifyBlockPlacement(String srcPath,
+                                           LocatedBlock lBlk,
+                                           int minRacks);
+  /**
+   * Decide whether deleting the specified replica of the block still makes 
+   * the block conform to the configured block placement policy.
+   * 
+   * @param srcInode The inode of the file to which the block-to-be-deleted belongs
+   * @param block The block to be deleted
+   * @param replicationFactor The required number of replicas for this block
+   * @param existingReplicas The replica locations of this block that are present
+   *              on at least two unique racks.
+   * @param moreExistingReplicas Replica locations of this block that are not
+   *              listed in the previous parameter.
+   * @return the replica that is the best candidate for deletion
+   */
+  abstract public DatanodeDescriptor chooseReplicaToDelete(FSInodeInfo srcInode,
+                                      Block block, 
+                                      short replicationFactor,
+                                      Collection<DatanodeDescriptor> existingReplicas,
+                                      Collection<DatanodeDescriptor> moreExistingReplicas);
+
+  /**
+   * Used to set up a BlockPlacementPolicy object. This should be defined by 
+   * all implementations of a BlockPlacementPolicy.
+   * 
+   * @param conf the configuration object
+   * @param stats retrieve cluster status from here
+   * @param clusterMap cluster topology
+   */
+  abstract protected void initialize(Configuration conf,  FSClusterStats stats, 
+                                     NetworkTopology clusterMap);
+    
+  /**
+   * Get an instance of the configured Block Placement Policy based on the
+   * value of the configuration parameter dfs.block.replicator.classname.
+   * 
+   * @param conf the configuration to be used
+   * @param stats an object that is used to retrieve the load on the cluster
+   * @param clusterMap the network topology of the cluster
+   * @return an instance of BlockPlacementPolicy
+   */
+  public static BlockPlacementPolicy getInstance(Configuration conf, 
+                                                 FSClusterStats stats,
+                                                 NetworkTopology clusterMap) {
+    Class<? extends BlockPlacementPolicy> replicatorClass =
+                      conf.getClass("dfs.block.replicator.classname",
+                                    BlockPlacementPolicyDefault.class,
+                                    BlockPlacementPolicy.class);
+    BlockPlacementPolicy replicator = (BlockPlacementPolicy) ReflectionUtils.newInstance(
+                                                             replicatorClass, conf);
+    replicator.initialize(conf, stats, clusterMap);
+    return replicator;
+  }
+
+  /**
+   * Choose <i>numOfReplicas</i> nodes for <i>writer</i> to replicate
+   * a block with size <i>blocksize</i>.
+   * If not enough targets are available, return as many as possible.
+   * 
+   * @param srcPath a string representation of the file for which chooseTarget is invoked
+   * @param numOfReplicas number of replicas wanted.
+   * @param writer the writer's machine, null if not in the cluster.
+   * @param blocksize size of the data to be written.
+   * @return array of DatanodeDescriptor instances chosen as targets
+   * and sorted as a pipeline.
+   */
+  DatanodeDescriptor[] chooseTarget(String srcPath,
+                                    int numOfReplicas,
+                                    DatanodeDescriptor writer,
+                                    long blocksize) {
+    return chooseTarget(srcPath, numOfReplicas, writer,
+                        new ArrayList<DatanodeDescriptor>(),
+                        blocksize);
+  }
+}

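Taken together, the abstract methods above are the whole contract a pluggable
policy must satisfy. The following is a minimal compilable sketch, not part of
this commit: it assumes placement in the same namenode package (the
chooseTarget overloads are package-private) and uses placeholder decisions
rather than a real strategy.

    package org.apache.hadoop.hdfs.server.namenode;

    import java.util.*;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;
    import org.apache.hadoop.net.NetworkTopology;

    // Hypothetical skeleton of a pluggable policy; every decision below is a
    // placeholder. Loaded via dfs.block.replicator.classname and getInstance().
    public class ExamplePlacementPolicy extends BlockPlacementPolicy {
      private NetworkTopology clusterMap;

      protected void initialize(Configuration conf, FSClusterStats stats,
                                NetworkTopology clusterMap) {
        this.clusterMap = clusterMap;   // stash the topology for chooseTarget
      }

      DatanodeDescriptor[] chooseTarget(String srcPath, int numOfReplicas,
                                        DatanodeDescriptor writer,
                                        List<DatanodeDescriptor> chosenNodes,
                                        long blocksize) {
        // A real policy would pick nodes from clusterMap here.
        return new DatanodeDescriptor[0];
      }

      public int verifyBlockPlacement(String srcPath, LocatedBlock lBlk,
                                      int minRacks) {
        return 0;   // placeholder: report no missing racks
      }

      public DatanodeDescriptor chooseReplicaToDelete(FSInodeInfo srcInode,
          Block block, short replicationFactor,
          Collection<DatanodeDescriptor> existingReplicas,
          Collection<DatanodeDescriptor> moreExistingReplicas) {
        Iterator<DatanodeDescriptor> it = existingReplicas.isEmpty()
            ? moreExistingReplicas.iterator() : existingReplicas.iterator();
        return it.hasNext() ? it.next() : null;   // placeholder choice
      }
    }
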
Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java?rev=815001&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java Tue Sep 15 05:38:57 2009
@@ -0,0 +1,504 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.commons.logging.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
+import java.util.*;
+
+/** The class is responsible for choosing the desired number of targets
+ * for placing block replicas.
+ * The replica placement strategy is that if the writer is on a datanode,
+ * the 1st replica is placed on the local machine, 
+ * otherwise a random datanode. The 2nd replica is placed on a datanode
+ * that is on a different rack. The 3rd replica is placed on a datanode
+ * on the same rack as the second replica, but on a different node.
+ */
+public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
+  private boolean considerLoad; 
+  private NetworkTopology clusterMap;
+  private FSClusterStats stats;
+
+  BlockPlacementPolicyDefault(Configuration conf,  FSClusterStats stats,
+                           NetworkTopology clusterMap) {
+    initialize(conf, stats, clusterMap);
+  }
+
+  BlockPlacementPolicyDefault() {
+  }
+    
+  /** {@inheritDoc} */
+  public void initialize(Configuration conf,  FSClusterStats stats,
+                         NetworkTopology clusterMap) {
+    this.considerLoad = conf.getBoolean("dfs.replication.considerLoad", true);
+    this.stats = stats;
+    this.clusterMap = clusterMap;
+  }
+
+  /** {@inheritDoc} */
+  public DatanodeDescriptor[] chooseTarget(String srcPath,
+                                    int numOfReplicas,
+                                    DatanodeDescriptor writer,
+                                    List<DatanodeDescriptor> chosenNodes,
+                                    long blocksize) {
+    return chooseTarget(numOfReplicas, writer, chosenNodes, null, blocksize);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public DatanodeDescriptor[] chooseTarget(FSInodeInfo srcInode,
+                                    int numOfReplicas,
+                                    DatanodeDescriptor writer,
+                                    List<DatanodeDescriptor> chosenNodes,
+                                    long blocksize) {
+    return chooseTarget(numOfReplicas, writer, chosenNodes, null, blocksize);
+  }
+    
+  /**
+   * This is not part of the public API but is used by the unit tests.
+   */
+  DatanodeDescriptor[] chooseTarget(int numOfReplicas,
+                                    DatanodeDescriptor writer,
+                                    List<DatanodeDescriptor> chosenNodes,
+                                    HashMap<Node, Node> excludedNodes,
+                                    long blocksize) {
+    if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
+      return new DatanodeDescriptor[0];
+    }
+      
+    if (excludedNodes == null) {
+      excludedNodes = new HashMap<Node, Node>();
+    }
+     
+    int clusterSize = clusterMap.getNumOfLeaves();
+    int totalNumOfReplicas = chosenNodes.size()+numOfReplicas;
+    if (totalNumOfReplicas > clusterSize) {
+      numOfReplicas -= (totalNumOfReplicas-clusterSize);
+      totalNumOfReplicas = clusterSize;
+    }
+      
+    int maxNodesPerRack = 
+      (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2;
+      
+    List<DatanodeDescriptor> results = 
+      new ArrayList<DatanodeDescriptor>(chosenNodes);
+    for (Node node:chosenNodes) {
+      excludedNodes.put(node, node);
+    }
+      
+    if (!clusterMap.contains(writer)) {
+      writer=null;
+    }
+      
+    DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer, 
+                                                excludedNodes, blocksize, maxNodesPerRack, results);
+      
+    results.removeAll(chosenNodes);
+      
+    // sorting nodes to form a pipeline
+    return getPipeline((writer==null)?localNode:writer,
+                       results.toArray(new DatanodeDescriptor[results.size()]));
+  }
+    
+  /* choose <i>numOfReplicas</i> from all data nodes */
+  private DatanodeDescriptor chooseTarget(int numOfReplicas,
+                                          DatanodeDescriptor writer,
+                                          HashMap<Node, Node> excludedNodes,
+                                          long blocksize,
+                                          int maxNodesPerRack,
+                                          List<DatanodeDescriptor> results) {
+      
+    if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
+      return writer;
+    }
+      
+    int numOfResults = results.size();
+    boolean newBlock = (numOfResults==0);
+    if (writer == null && !newBlock) {
+      writer = results.get(0);
+    }
+      
+    try {
+      if (numOfResults == 0) {
+        writer = chooseLocalNode(writer, excludedNodes, 
+                                 blocksize, maxNodesPerRack, results);
+        if (--numOfReplicas == 0) {
+          return writer;
+        }
+      }
+      if (numOfResults <= 1) {
+        chooseRemoteRack(1, results.get(0), excludedNodes, 
+                         blocksize, maxNodesPerRack, results);
+        if (--numOfReplicas == 0) {
+          return writer;
+        }
+      }
+      if (numOfResults <= 2) {
+        if (clusterMap.isOnSameRack(results.get(0), results.get(1))) {
+          chooseRemoteRack(1, results.get(0), excludedNodes,
+                           blocksize, maxNodesPerRack, results);
+        } else if (newBlock){
+          chooseLocalRack(results.get(1), excludedNodes, blocksize, 
+                          maxNodesPerRack, results);
+        } else {
+          chooseLocalRack(writer, excludedNodes, blocksize,
+                          maxNodesPerRack, results);
+        }
+        if (--numOfReplicas == 0) {
+          return writer;
+        }
+      }
+      chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, 
+                   blocksize, maxNodesPerRack, results);
+    } catch (NotEnoughReplicasException e) {
+      FSNamesystem.LOG.warn("Not able to place enough replicas, still in need of "
+               + numOfReplicas);
+    }
+    return writer;
+  }
+    
+  /* choose <i>localMachine</i> as the target.
+   * if <i>localMachine</i> is not available, 
+   * choose a node on the same rack
+   * @return the chosen node
+   */
+  private DatanodeDescriptor chooseLocalNode(
+                                             DatanodeDescriptor localMachine,
+                                             HashMap<Node, Node> excludedNodes,
+                                             long blocksize,
+                                             int maxNodesPerRack,
+                                             List<DatanodeDescriptor> results)
+    throws NotEnoughReplicasException {
+    // if no local machine, randomly choose one node
+    if (localMachine == null)
+      return chooseRandom(NodeBase.ROOT, excludedNodes, 
+                          blocksize, maxNodesPerRack, results);
+      
+    // otherwise try local machine first
+    Node oldNode = excludedNodes.put(localMachine, localMachine);
+    if (oldNode == null) { // was not in the excluded list
+      if (isGoodTarget(localMachine, blocksize,
+                       maxNodesPerRack, false, results)) {
+        results.add(localMachine);
+        return localMachine;
+      }
+    } 
+      
+    // try a node on local rack
+    return chooseLocalRack(localMachine, excludedNodes, 
+                           blocksize, maxNodesPerRack, results);
+  }
+    
+  /* choose one node from the rack that <i>localMachine</i> is on.
+   * if no such node is available, choose one node from the rack where
+   * a second replica is on.
+   * if still no such node is available, choose a random node 
+   * in the cluster.
+   * @return the chosen node
+   */
+  private DatanodeDescriptor chooseLocalRack(
+                                             DatanodeDescriptor localMachine,
+                                             HashMap<Node, Node> excludedNodes,
+                                             long blocksize,
+                                             int maxNodesPerRack,
+                                             List<DatanodeDescriptor> results)
+    throws NotEnoughReplicasException {
+    // no local machine, so choose a random machine
+    if (localMachine == null) {
+      return chooseRandom(NodeBase.ROOT, excludedNodes, 
+                          blocksize, maxNodesPerRack, results);
+    }
+      
+    // choose one from the local rack
+    try {
+      return chooseRandom(
+                          localMachine.getNetworkLocation(),
+                          excludedNodes, blocksize, maxNodesPerRack, results);
+    } catch (NotEnoughReplicasException e1) {
+      // find the second replica
+      DatanodeDescriptor newLocal=null;
+      for(Iterator<DatanodeDescriptor> iter=results.iterator();
+          iter.hasNext();) {
+        DatanodeDescriptor nextNode = iter.next();
+        if (nextNode != localMachine) {
+          newLocal = nextNode;
+          break;
+        }
+      }
+      if (newLocal != null) {
+        try {
+          return chooseRandom(
+                              newLocal.getNetworkLocation(),
+                              excludedNodes, blocksize, maxNodesPerRack, results);
+        } catch(NotEnoughReplicasException e2) {
+          //otherwise randomly choose one from the network
+          return chooseRandom(NodeBase.ROOT, excludedNodes,
+                              blocksize, maxNodesPerRack, results);
+        }
+      } else {
+        //otherwise randomly choose one from the network
+        return chooseRandom(NodeBase.ROOT, excludedNodes,
+                            blocksize, maxNodesPerRack, results);
+      }
+    }
+  }
+    
+  /* choose <i>numOfReplicas</i> nodes from the racks 
+   * that <i>localMachine</i> is NOT on.
+   * if not enough nodes are available, choose the remaining ones 
+   * from the local rack
+   */
+    
+  private void chooseRemoteRack(int numOfReplicas,
+                                DatanodeDescriptor localMachine,
+                                HashMap<Node, Node> excludedNodes,
+                                long blocksize,
+                                int maxReplicasPerRack,
+                                List<DatanodeDescriptor> results)
+    throws NotEnoughReplicasException {
+    int oldNumOfReplicas = results.size();
+    // randomly choose one node from remote racks
+    try {
+      chooseRandom(numOfReplicas, "~"+localMachine.getNetworkLocation(),
+                   excludedNodes, blocksize, maxReplicasPerRack, results);
+    } catch (NotEnoughReplicasException e) {
+      chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
+                   localMachine.getNetworkLocation(), excludedNodes, blocksize, 
+                   maxReplicasPerRack, results);
+    }
+  }
+
+  /* Randomly choose one target from <i>nodes</i>.
+   * @return the chosen node
+   */
+  private DatanodeDescriptor chooseRandom(
+                                          String nodes,
+                                          HashMap<Node, Node> excludedNodes,
+                                          long blocksize,
+                                          int maxNodesPerRack,
+                                          List<DatanodeDescriptor> results) 
+    throws NotEnoughReplicasException {
+    int numOfAvailableNodes =
+      clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
+    while(numOfAvailableNodes > 0) {
+      DatanodeDescriptor chosenNode = 
+        (DatanodeDescriptor)(clusterMap.chooseRandom(nodes));
+
+      Node oldNode = excludedNodes.put(chosenNode, chosenNode);
+      if (oldNode == null) { // chosenNode was not in the excluded list
+        numOfAvailableNodes--;
+        if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
+          results.add(chosenNode);
+          return chosenNode;
+        }
+      }
+    }
+
+    throw new NotEnoughReplicasException(
+        "Not able to place enough replicas");
+  }
+    
+  /* Randomly choose <i>numOfReplicas</i> targets from <i>nodes</i>.
+   */
+  private void chooseRandom(int numOfReplicas,
+                            String nodes,
+                            HashMap<Node, Node> excludedNodes,
+                            long blocksize,
+                            int maxNodesPerRack,
+                            List<DatanodeDescriptor> results)
+    throws NotEnoughReplicasException {
+      
+    int numOfAvailableNodes =
+      clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
+    while(numOfReplicas > 0 && numOfAvailableNodes > 0) {
+      DatanodeDescriptor chosenNode = 
+        (DatanodeDescriptor)(clusterMap.chooseRandom(nodes));
+      Node oldNode = excludedNodes.put(chosenNode, chosenNode);
+      if (oldNode == null) {
+        numOfAvailableNodes--;
+
+        if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
+          numOfReplicas--;
+          results.add(chosenNode);
+        }
+      }
+    }
+      
+    if (numOfReplicas>0) {
+      throw new NotEnoughReplicasException(
+                                           "Not able to place enough replicas");
+    }
+  }
+    
+  /* judge if a node is a good target.
+   * return true if <i>node</i> has enough space, 
+   * does not have too much load, and the rack does not have too many nodes
+   */
+  private boolean isGoodTarget(DatanodeDescriptor node,
+                               long blockSize, int maxTargetPerLoc,
+                               List<DatanodeDescriptor> results) {
+    return isGoodTarget(node, blockSize, maxTargetPerLoc,
+                        this.considerLoad, results);
+  }
+    
+  private boolean isGoodTarget(DatanodeDescriptor node,
+                               long blockSize, int maxTargetPerLoc,
+                               boolean considerLoad,
+                               List<DatanodeDescriptor> results) {
+    Log logr = FSNamesystem.LOG;
+    // check if the node is (being) decommissioned
+    if (node.isDecommissionInProgress() || node.isDecommissioned()) {
+      logr.debug("Node "+NodeBase.getPath(node)+
+                " is not chosen because the node is (being) decommissioned");
+      return false;
+    }
+
+    long remaining = node.getRemaining() - 
+                     (node.getBlocksScheduled() * blockSize); 
+    // check the remaining capacity of the target machine
+    if (blockSize* FSConstants.MIN_BLOCKS_FOR_WRITE>remaining) {
+      logr.debug("Node "+NodeBase.getPath(node)+
+                " is not chosen because the node does not have enough space");
+      return false;
+    }
+      
+    // check the communication traffic of the target machine
+    if (considerLoad) {
+      double avgLoad = 0;
+      int size = clusterMap.getNumOfLeaves();
+      if (size != 0 && stats != null) {
+        avgLoad = (double)stats.getTotalLoad()/size;
+      }
+      if (node.getXceiverCount() > (2.0 * avgLoad)) {
+        logr.debug("Node "+NodeBase.getPath(node)+
+                  " is not chosen because the node is too busy");
+        return false;
+      }
+    }
+      
+    // check if the target rack has chosen too many nodes
+    String rackname = node.getNetworkLocation();
+    int counter=1;
+    for(Iterator<DatanodeDescriptor> iter = results.iterator();
+        iter.hasNext();) {
+      Node result = iter.next();
+      if (rackname.equals(result.getNetworkLocation())) {
+        counter++;
+      }
+    }
+    if (counter>maxTargetPerLoc) {
+      logr.debug("Node "+NodeBase.getPath(node)+
+                " is not chosen because the rack has too many chosen nodes");
+      return false;
+    }
+    return true;
+  }
+    
+  /* Return a pipeline of nodes.
+   * The pipeline is formed by finding a shortest path that
+   * starts from the writer and traverses all <i>nodes</i>.
+   * This is basically a traveling salesman problem.
+   */
+  private DatanodeDescriptor[] getPipeline(
+                                           DatanodeDescriptor writer,
+                                           DatanodeDescriptor[] nodes) {
+    if (nodes.length==0) return nodes;
+      
+    synchronized(clusterMap) {
+      int index=0;
+      if (writer == null || !clusterMap.contains(writer)) {
+        writer = nodes[0];
+      }
+      for(;index<nodes.length; index++) {
+        DatanodeDescriptor shortestNode = nodes[index];
+        int shortestDistance = clusterMap.getDistance(writer, shortestNode);
+        int shortestIndex = index;
+        for(int i=index+1; i<nodes.length; i++) {
+          DatanodeDescriptor currentNode = nodes[i];
+          int currentDistance = clusterMap.getDistance(writer, currentNode);
+          if (shortestDistance>currentDistance) {
+            shortestDistance = currentDistance;
+            shortestNode = currentNode;
+            shortestIndex = i;
+          }
+        }
+        //switch position index & shortestIndex
+        if (index != shortestIndex) {
+          nodes[shortestIndex] = nodes[index];
+          nodes[index] = shortestNode;
+        }
+        writer = shortestNode;
+      }
+    }
+    return nodes;
+  }
+
+  /** {@inheritDoc} */
+  public int verifyBlockPlacement(String srcPath,
+                                  LocatedBlock lBlk,
+                                  int minRacks) {
+    DatanodeInfo[] locs = lBlk.getLocations();
+    if (locs == null)
+      locs = new DatanodeInfo[0];
+    int numRacks = clusterMap.getNumOfRacks();
+    if(numRacks <= 1) // only one rack
+      return 0;
+    minRacks = Math.min(minRacks, numRacks);
+    // Count the distinct racks that hold a replica; duplicate
+    // locations on the same rack collapse in the set.
+    Set<String> racks = new TreeSet<String>();
+    for (DatanodeInfo dn : locs)
+      racks.add(dn.getNetworkLocation());
+    return minRacks - racks.size();
+  }
+
+  /** {@inheritDoc} */
+  public DatanodeDescriptor chooseReplicaToDelete(FSInodeInfo inode,
+                                                 Block block,
+                                                 short replicationFactor,
+                                                 Collection<DatanodeDescriptor> first, 
+                                                 Collection<DatanodeDescriptor> second) {
+    long minSpace = Long.MAX_VALUE;
+    DatanodeDescriptor cur = null;
+
+    // pick replica from the first Set. If first is empty, then pick replicas
+    // from second set.
+    Iterator<DatanodeDescriptor> iter =
+          first.isEmpty() ? second.iterator() : first.iterator();
+
+    // pick node with least free space
+    while (iter.hasNext() ) {
+      DatanodeDescriptor node = iter.next();
+      long free = node.getRemaining();
+      if (minSpace > free) {
+        minSpace = free;
+        cur = node;
+      }
+    }
+    return cur;
+  }
+}
+

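As a worked instance of the integer arithmetic in maxNodesPerRack above: a
block with 3 total replicas on a 2-rack cluster gets (3-1)/2 + 2 = 3 allowed
replicas per rack, while the same block on a 10-rack cluster gets
(3-1)/10 + 2 = 2, which is exactly what limits the default layout to two
replicas on the remote rack.
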
Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java?rev=815001&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java Tue Sep 15 05:38:57 2009
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+/** 
+ * This interface is used for retrieving the load-related statistics of 
+ * the cluster.
+ */
+public interface FSClusterStats {
+
+  /**
+   * An indication of the total load of the cluster.
+   * 
+   * @return a count of the total number of block transfers and block
+   *         writes that are currently occurring on the cluster.
+   */
+
+  public int getTotalLoad();
+}
+    
+    

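FSClusterStats is what feeds the considerLoad branch of isGoodTarget above. A
minimal sketch of the consuming side, assuming stats is non-null and the
topology has at least one leaf:

    // Average transfer load per datanode, mirroring isGoodTarget above;
    // a node busier than twice this average is rejected as a target.
    double avgLoad = (double) stats.getTotalLoad() / clusterMap.getNumOfLeaves();
    boolean tooBusy = node.getXceiverCount() > 2.0 * avgLoad;
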
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=815001&r1=815000&r2=815001&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Tue Sep 15 05:38:57 2009
@@ -932,6 +932,23 @@
     }
     return fullPathName.toString();
   }
+
+  /** Return the full path name of the specified inode */
+  static String getFullPathName(INode inode) {
+    // calculate the depth of this inode from root
+    int depth = 0;
+    for (INode i = inode; i != null; i = i.parent) {
+      depth++;
+    }
+    INode[] inodes = new INode[depth];
+
+    // fill up the inodes in the path from this inode to root
+    for (int i = 0; i < depth; i++) {
+      inodes[depth-i-1] = inode;
+      inode = inode.parent;
+    }
+    return getFullPathName(inodes, depth-1);
+  }
   
   /**
    * Create a directory 

Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSInodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSInodeInfo.java?rev=815001&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSInodeInfo.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSInodeInfo.java Tue Sep 15 05:38:57 2009
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+/** 
+ * This interface is used by the pluggable block placement policy
+ * to expose a few characteristics of an Inode.
+ */
+public interface FSInodeInfo {
+
+  /**
+   * a string representation of an inode
+   * 
+   * @return the full pathname (from root) that this inode represents
+   */
+
+  public String getFullPathName();
+}
+    
+    

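Since the base chooseTarget(FSInodeInfo, ...) eagerly resolves the full path,
a custom policy that never inspects the path can override it, as the javadoc
above recommends. A hedged sketch, inside a BlockPlacementPolicy subclass
whose String-based overload ignores srcPath (as the default policy's does):

    // Skip the getFullPathName() tree walk when the path is not needed.
    @Override
    DatanodeDescriptor[] chooseTarget(FSInodeInfo srcInode, int numOfReplicas,
                                      DatanodeDescriptor writer,
                                      List<DatanodeDescriptor> chosenNodes,
                                      long blocksize) {
      return chooseTarget((String) null, numOfReplicas, writer,
                          chosenNodes, blocksize);   // path intentionally unused
    }
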
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=815001&r1=815000&r2=815001&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Sep 15 05:38:57 2009
@@ -90,7 +90,7 @@
  * 4)  machine --> blocklist (inverted #2)
  * 5)  LRU cache of updated-heartbeat machines
  ***************************************************/
-public class FSNamesystem implements FSConstants, FSNamesystemMBean {
+public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterStats {
   public static final Log LOG = LogFactory.getLog(FSNamesystem.class);
   public static final String AUDIT_FORMAT =
     "ugi=%s\t" +  // ugi
@@ -1123,7 +1123,7 @@
 
     // choose targets for the new block to be allocated.
     DatanodeDescriptor targets[] = blockManager.replicator.chooseTarget(
-        replication, clientNode, null, blockSize);
+        src, replication, clientNode, blockSize);
     if (targets.length < blockManager.minReplication) {
       throw new IOException("File " + src + " could only be replicated to " +
                             targets.length + " nodes, instead of " +
@@ -2344,8 +2344,10 @@
   void chooseExcessReplicates(Collection<DatanodeDescriptor> nonExcess, 
                               Block b, short replication,
                               DatanodeDescriptor addedNode,
-                              DatanodeDescriptor delNodeHint) {
+                              DatanodeDescriptor delNodeHint,
+                              BlockPlacementPolicy replicator) {
     // first form a rack to datanodes map and
+    INodeFile inode = blockManager.getINode(b);
     HashMap<String, ArrayList<DatanodeDescriptor>> rackMap =
       new HashMap<String, ArrayList<DatanodeDescriptor>>();
     for (Iterator<DatanodeDescriptor> iter = nonExcess.iterator();
@@ -2389,17 +2391,7 @@
             (priSet.contains(delNodeHint) || (addedNode != null && !priSet.contains(addedNode))) ) {
           cur = delNodeHint;
       } else { // regular excessive replica removal
-        Iterator<DatanodeDescriptor> iter = 
-          priSet.isEmpty() ? remains.iterator() : priSet.iterator();
-          while( iter.hasNext() ) {
-            DatanodeDescriptor node = iter.next();
-            long free = node.getRemaining();
-
-            if (minSpace > free) {
-              minSpace = free;
-              cur = node;
-            }
-          }
+        cur = replicator.chooseReplicaToDelete(inode, b, replication, priSet, remains);
       }
 
       firstOne = false;

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=815001&r1=815000&r2=815001&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java Tue Sep 15 05:38:57 2009
@@ -33,7 +33,7 @@
  * This is a base INode class containing common fields for file and 
  * directory inodes.
  */
-abstract class INode implements Comparable<byte[]> {
+abstract class INode implements Comparable<byte[]>, FSInodeInfo {
   protected byte[] name;
   protected INodeDirectory parent;
   protected long modificationTime;
@@ -247,6 +247,12 @@
   }
 
   /** {@inheritDoc} */
+  public String getFullPathName() {
+    // Get the full path name of this inode.
+    return FSDirectory.getFullPathName(this);
+  }
+
+  /** {@inheritDoc} */
   public String toString() {
     return "\"" + getLocalName() + "\":" + getPermissionStatus();
   }

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=815001&r1=815000&r2=815001&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Tue Sep 15 05:38:57 2009
@@ -253,8 +253,8 @@
                     locs.length + " replica(s).");
       }
       // verify block placement policy
-      int missingRacks = ReplicationTargetChooser.verifyBlockPlacement(
-                    lBlk, targetFileReplication, networktopology);
+      int missingRacks = BlockPlacementPolicy.getInstance(conf, null, networktopology).
+                           verifyBlockPlacement(path, lBlk, Math.min(2,targetFileReplication));
       if (missingRacks > 0) {
         res.numMisReplicatedBlocks++;
         misReplicatedPerFile++;

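Worked through with the default policy's verifyBlockPlacement above: for a
file with replication 3, fsck now passes minRacks = min(2, 3) = 2; if every
replica of a block lands on one rack, the rack set has size 1 and the call
returns 2 - 1 = 1, so the block is counted as mis-replicated.
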
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/ReplicationTargetChooser.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/ReplicationTargetChooser.java?rev=815001&r1=815000&r2=815001&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/ReplicationTargetChooser.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/ReplicationTargetChooser.java Tue Sep 15 05:38:57 2009
@@ -1,514 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.namenode;
-
-import org.apache.commons.logging.*;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.FSConstants;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.net.Node;
-import org.apache.hadoop.net.NodeBase;
-import java.util.*;
-
-/** The class is responsible for choosing the desired number of targets
- * for placing block replicas.
- * The replica placement strategy is that if the writer is on a datanode,
- * the 1st replica is placed on the local machine, 
- * otherwise a random datanode. The 2nd replica is placed on a datanode
- * that is on a different rack. The 3rd replica is placed on a datanode
- * which is on a different node of the rack as the second replica.
- */
-class ReplicationTargetChooser {
-  private final boolean considerLoad; 
-  private NetworkTopology clusterMap;
-  private FSNamesystem fs;
-    
-  ReplicationTargetChooser(boolean considerLoad,  FSNamesystem fs,
-                           NetworkTopology clusterMap) {
-    this.considerLoad = considerLoad;
-    this.fs = fs;
-    this.clusterMap = clusterMap;
-  }
-    
-  private static class NotEnoughReplicasException extends Exception {
-    private static final long serialVersionUID = 1L;
-
-    NotEnoughReplicasException(String msg) {
-      super(msg);
-    }
-  }
-    
-  /**
-   * choose <i>numOfReplicas</i> data nodes for <i>writer</i> to replicate
-   * a block with size <i>blocksize</i> 
-   * If not, return as many as we can.
-   * 
-   * @param numOfReplicas: number of replicas wanted.
-   * @param writer: the writer's machine, null if not in the cluster.
-   * @param excludedNodes: datanodes that should not be considered targets.
-   * @param blocksize: size of the data to be written.
-   * @return array of DatanodeDescriptor instances chosen as targets
-   * and sorted as a pipeline.
-   */
-  DatanodeDescriptor[] chooseTarget(int numOfReplicas,
-                                    DatanodeDescriptor writer,
-                                    HashMap<Node, Node> excludedNodes,
-                                    long blocksize) {
-    if (excludedNodes == null) {
-      excludedNodes = new HashMap<Node, Node>();
-    }
-      
-    return chooseTarget(numOfReplicas, writer, 
-                        new ArrayList<DatanodeDescriptor>(), excludedNodes, blocksize);
-  }
-    
-  /**
-   * choose <i>numOfReplicas</i> data nodes for <i>writer</i> 
-   * to re-replicate a block with size <i>blocksize</i> 
-   * If not, return as many as we can.
-   * 
-   * @param numOfReplicas: additional number of replicas wanted.
-   * @param writer: the writer's machine, null if not in the cluster.
-   * @param choosenNodes: datanodes that have been chosen as targets.
-   * @param excludedNodes: datanodes that should not be considered targets.
-   * @param blocksize: size of the data to be written.
-   * @return array of DatanodeDescriptor instances chosen as target 
-   * and sorted as a pipeline.
-   */
-  DatanodeDescriptor[] chooseTarget(int numOfReplicas,
-                                    DatanodeDescriptor writer,
-                                    List<DatanodeDescriptor> choosenNodes,
-                                    HashMap<Node, Node> excludedNodes,
-                                    long blocksize) {
-    if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
-      return new DatanodeDescriptor[0];
-    }
-      
-    if (excludedNodes == null) {
-      excludedNodes = new HashMap<Node, Node>();
-    }
-      
-    int clusterSize = clusterMap.getNumOfLeaves();
-    int totalNumOfReplicas = choosenNodes.size()+numOfReplicas;
-    if (totalNumOfReplicas > clusterSize) {
-      numOfReplicas -= (totalNumOfReplicas-clusterSize);
-      totalNumOfReplicas = clusterSize;
-    }
-      
-    int maxNodesPerRack = 
-      (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2;
-      
-    List<DatanodeDescriptor> results = 
-      new ArrayList<DatanodeDescriptor>(choosenNodes);
-    for (Node node:choosenNodes) {
-      excludedNodes.put(node, node);
-    }
-      
-    if (!clusterMap.contains(writer)) {
-      writer=null;
-    }
-      
-    DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer, 
-                                                excludedNodes, blocksize, maxNodesPerRack, results);
-      
-    results.removeAll(choosenNodes);
-      
-    // sorting nodes to form a pipeline
-    return getPipeline((writer==null)?localNode:writer,
-                       results.toArray(new DatanodeDescriptor[results.size()]));
-  }
-    
-  /* choose <i>numOfReplicas</i> from all data nodes */
-  private DatanodeDescriptor chooseTarget(int numOfReplicas,
-                                          DatanodeDescriptor writer,
-                                          HashMap<Node, Node> excludedNodes,
-                                          long blocksize,
-                                          int maxNodesPerRack,
-                                          List<DatanodeDescriptor> results) {
-      
-    if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
-      return writer;
-    }
-      
-    int numOfResults = results.size();
-    boolean newBlock = (numOfResults==0);
-    if (writer == null && !newBlock) {
-      writer = results.get(0);
-    }
-      
-    try {
-      if (numOfResults == 0) {
-        writer = chooseLocalNode(writer, excludedNodes, 
-                                 blocksize, maxNodesPerRack, results);
-        if (--numOfReplicas == 0) {
-          return writer;
-        }
-      }
-      if (numOfResults <= 1) {
-        chooseRemoteRack(1, results.get(0), excludedNodes, 
-                         blocksize, maxNodesPerRack, results);
-        if (--numOfReplicas == 0) {
-          return writer;
-        }
-      }
-      if (numOfResults <= 2) {
-        if (clusterMap.isOnSameRack(results.get(0), results.get(1))) {
-          chooseRemoteRack(1, results.get(0), excludedNodes,
-                           blocksize, maxNodesPerRack, results);
-        } else if (newBlock){
-          chooseLocalRack(results.get(1), excludedNodes, blocksize, 
-                          maxNodesPerRack, results);
-        } else {
-          chooseLocalRack(writer, excludedNodes, blocksize,
-                          maxNodesPerRack, results);
-        }
-        if (--numOfReplicas == 0) {
-          return writer;
-        }
-      }
-      chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, 
-                   blocksize, maxNodesPerRack, results);
-    } catch (NotEnoughReplicasException e) {
-      FSNamesystem.LOG.warn("Not able to place enough replicas, still in need of "
-               + numOfReplicas);
-    }
-    return writer;
-  }
-    
-  /* choose <i>localMachine</i> as the target.
-   * if <i>localMachine</i> is not available, 
-   * choose a node on the same rack
-   * @return the chosen node
-   */
-  private DatanodeDescriptor chooseLocalNode(
-                                             DatanodeDescriptor localMachine,
-                                             HashMap<Node, Node> excludedNodes,
-                                             long blocksize,
-                                             int maxNodesPerRack,
-                                             List<DatanodeDescriptor> results)
-    throws NotEnoughReplicasException {
-    // if no local machine, randomly choose one node
-    if (localMachine == null)
-      return chooseRandom(NodeBase.ROOT, excludedNodes, 
-                          blocksize, maxNodesPerRack, results);
-      
-    // otherwise try local machine first
-    Node oldNode = excludedNodes.put(localMachine, localMachine);
-    if (oldNode == null) { // was not in the excluded list
-      if (isGoodTarget(localMachine, blocksize,
-                       maxNodesPerRack, false, results)) {
-        results.add(localMachine);
-        return localMachine;
-      }
-    } 
-      
-    // try a node on local rack
-    return chooseLocalRack(localMachine, excludedNodes, 
-                           blocksize, maxNodesPerRack, results);
-  }
-    
-  /* choose one node from the rack that <i>localMachine</i> is on.
-   * if no such node is available, choose one node from the rack
-   * on which a second replica resides.
-   * if still no such node is available, choose a random node 
-   * in the cluster.
-   * @return the chosen node
-   */
-  private DatanodeDescriptor chooseLocalRack(
-                                             DatanodeDescriptor localMachine,
-                                             HashMap<Node, Node> excludedNodes,
-                                             long blocksize,
-                                             int maxNodesPerRack,
-                                             List<DatanodeDescriptor> results)
-    throws NotEnoughReplicasException {
-    // no local machine, so choose a random machine
-    if (localMachine == null) {
-      return chooseRandom(NodeBase.ROOT, excludedNodes, 
-                          blocksize, maxNodesPerRack, results);
-    }
-      
-    // choose one from the local rack
-    try {
-      return chooseRandom(
-                          localMachine.getNetworkLocation(),
-                          excludedNodes, blocksize, maxNodesPerRack, results);
-    } catch (NotEnoughReplicasException e1) {
-      // find the second replica
-      DatanodeDescriptor newLocal=null;
-      for(Iterator<DatanodeDescriptor> iter=results.iterator();
-          iter.hasNext();) {
-        DatanodeDescriptor nextNode = iter.next();
-        if (nextNode != localMachine) {
-          newLocal = nextNode;
-          break;
-        }
-      }
-      if (newLocal != null) {
-        try {
-          return chooseRandom(
-                              newLocal.getNetworkLocation(),
-                              excludedNodes, blocksize, maxNodesPerRack, results);
-        } catch(NotEnoughReplicasException e2) {
-          //otherwise randomly choose one from the network
-          return chooseRandom(NodeBase.ROOT, excludedNodes,
-                              blocksize, maxNodesPerRack, results);
-        }
-      } else {
-        //otherwise randomly choose one from the network
-        return chooseRandom(NodeBase.ROOT, excludedNodes,
-                            blocksize, maxNodesPerRack, results);
-      }
-    }
-  }
-    
-  /* choose <i>numOfReplicas</i> nodes from the racks 
-   * that <i>localMachine</i> is NOT on.
-   * if not enough nodes are available, choose the remaining ones 
-   * from the local rack
-   */
-    
-  private void chooseRemoteRack(int numOfReplicas,
-                                DatanodeDescriptor localMachine,
-                                HashMap<Node, Node> excludedNodes,
-                                long blocksize,
-                                int maxReplicasPerRack,
-                                List<DatanodeDescriptor> results)
-    throws NotEnoughReplicasException {
-    int oldNumOfReplicas = results.size();
-    // randomly choose one node from remote racks
-    try {
-      chooseRandom(numOfReplicas, "~"+localMachine.getNetworkLocation(),
-                   excludedNodes, blocksize, maxReplicasPerRack, results);
-    } catch (NotEnoughReplicasException e) {
-      chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
-                   localMachine.getNetworkLocation(), excludedNodes, blocksize, 
-                   maxReplicasPerRack, results);
-    }
-  }
-
-  /* Randomly choose one target from <i>nodes</i>.
-   * @return the chosen node
-   */
-  private DatanodeDescriptor chooseRandom(
-                                          String nodes,
-                                          HashMap<Node, Node> excludedNodes,
-                                          long blocksize,
-                                          int maxNodesPerRack,
-                                          List<DatanodeDescriptor> results) 
-    throws NotEnoughReplicasException {
-    int numOfAvailableNodes =
-      clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
-    while(numOfAvailableNodes > 0) {
-      DatanodeDescriptor choosenNode = 
-        (DatanodeDescriptor)(clusterMap.chooseRandom(nodes));
-
-      Node oldNode = excludedNodes.put(choosenNode, choosenNode);
-      if (oldNode == null) { // choosenNode was not in the excluded list
-        numOfAvailableNodes--;
-        if (isGoodTarget(choosenNode, blocksize, maxNodesPerRack, results)) {
-          results.add(choosenNode);
-          return choosenNode;
-        }
-      }
-    }
-
-    throw new NotEnoughReplicasException(
-        "Not able to place enough replicas");
-  }
-    
-  /* Randomly choose <i>numOfReplicas</i> targets from <i>nodes</i>.
-   */
-  private void chooseRandom(int numOfReplicas,
-                            String nodes,
-                            HashMap<Node, Node> excludedNodes,
-                            long blocksize,
-                            int maxNodesPerRack,
-                            List<DatanodeDescriptor> results)
-    throws NotEnoughReplicasException {
-      
-    int numOfAvailableNodes =
-      clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
-    while(numOfReplicas > 0 && numOfAvailableNodes > 0) {
-      DatanodeDescriptor choosenNode = 
-        (DatanodeDescriptor)(clusterMap.chooseRandom(nodes));
-      Node oldNode = excludedNodes.put(choosenNode, choosenNode);
-      if (oldNode == null) {
-        numOfAvailableNodes--;
-
-        if (isGoodTarget(choosenNode, blocksize, maxNodesPerRack, results)) {
-          numOfReplicas--;
-          results.add(choosenNode);
-        }
-      }
-    }
-      
-    if (numOfReplicas>0) {
-      throw new NotEnoughReplicasException(
-                                           "Not able to place enough replicas");
-    }
-  }
-    
-  /* judge if a node is a good target.
-   * return true if <i>node</i> has enough space, 
-   * does not have too much load, and the rack does not have too many nodes
-   */
-  private boolean isGoodTarget(DatanodeDescriptor node,
-                               long blockSize, int maxTargetPerLoc,
-                               List<DatanodeDescriptor> results) {
-    return isGoodTarget(node, blockSize, maxTargetPerLoc,
-                        this.considerLoad, results);
-  }
-    
-  private boolean isGoodTarget(DatanodeDescriptor node,
-                               long blockSize, int maxTargetPerLoc,
-                               boolean considerLoad,
-                               List<DatanodeDescriptor> results) {
-    Log logr = FSNamesystem.LOG;
-    // check if the node is (being) decommissioned
-    if (node.isDecommissionInProgress() || node.isDecommissioned()) {
-      logr.debug("Node "+NodeBase.getPath(node)+
-                " is not chosen because the node is (being) decommissioned");
-      return false;
-    }
-
-    long remaining = node.getRemaining() - 
-                     (node.getBlocksScheduled() * blockSize); 
-    // check the remaining capacity of the target machine
-    if (blockSize* FSConstants.MIN_BLOCKS_FOR_WRITE>remaining) {
-      logr.debug("Node "+NodeBase.getPath(node)+
-                " is not chosen because the node does not have enough space");
-      return false;
-    }
-      
-    // check the communication traffic of the target machine
-    if (considerLoad) {
-      double avgLoad = 0;
-      int size = clusterMap.getNumOfLeaves();
-      if (size != 0) {
-        avgLoad = (double)fs.getTotalLoad()/size;
-      }
-      if (node.getXceiverCount() > (2.0 * avgLoad)) {
-        logr.debug("Node "+NodeBase.getPath(node)+
-                  " is not chosen because the node is too busy");
-        return false;
-      }
-    }
-      
-    // check if the target rack has chosen too many nodes
-    String rackname = node.getNetworkLocation();
-    int counter=1;
-    for(Iterator<DatanodeDescriptor> iter = results.iterator();
-        iter.hasNext();) {
-      Node result = iter.next();
-      if (rackname.equals(result.getNetworkLocation())) {
-        counter++;
-      }
-    }
-    if (counter>maxTargetPerLoc) {
-      logr.debug("Node "+NodeBase.getPath(node)+
-                " is not chosen because the rack has too many chosen nodes");
-      return false;
-    }
-    return true;
-  }
-    
-  /* Return a pipeline of nodes.
-   * The pipeline is formed by finding a shortest path that
-   * starts from the writer and traverses all <i>nodes</i>.
-   * This is basically a traveling salesman problem.
-   */
-  private DatanodeDescriptor[] getPipeline(
-                                           DatanodeDescriptor writer,
-                                           DatanodeDescriptor[] nodes) {
-    if (nodes.length==0) return nodes;
-      
-    synchronized(clusterMap) {
-      int index=0;
-      if (writer == null || !clusterMap.contains(writer)) {
-        writer = nodes[0];
-      }
-      for(;index<nodes.length; index++) {
-        DatanodeDescriptor shortestNode = nodes[index];
-        int shortestDistance = clusterMap.getDistance(writer, shortestNode);
-        int shortestIndex = index;
-        for(int i=index+1; i<nodes.length; i++) {
-          DatanodeDescriptor currentNode = nodes[i];
-          int currentDistance = clusterMap.getDistance(writer, currentNode);
-          if (shortestDistance>currentDistance) {
-            shortestDistance = currentDistance;
-            shortestNode = currentNode;
-            shortestIndex = i;
-          }
-        }
-        // swap the nodes at positions index and shortestIndex
-        if (index != shortestIndex) {
-          nodes[shortestIndex] = nodes[index];
-          nodes[index] = shortestNode;
-        }
-        writer = shortestNode;
-      }
-    }
-    return nodes;
-  }
-
-  /**
-   * Verify that the block is replicated on at least 2 different racks
-   * if there is more than one rack in the system.
-   * 
-   * @param lBlk block with locations
-   * @param cluster 
-   * @return 1 if the block must be replicated on an additional rack,
-   * or 0 if the number of racks is sufficient.
-   */
-  public static int verifyBlockPlacement(LocatedBlock lBlk,
-                                         short replication,
-                                         NetworkTopology cluster) {
-    int numRacks = verifyBlockPlacement(lBlk, Math.min(2,replication), cluster);
-    return numRacks < 0 ? 0 : numRacks;
-  }
-
-  /**
-   * Verify that the block is replicated on at least minRacks different racks
-   * if there are more than minRacks racks in the system.
-   * 
-   * @param lBlk block with locations
-   * @param minRacks number of racks the block should be replicated to
-   * @param cluster 
-   * @return the difference between the required and the actual number of racks
-   * the block is replicated to.
-   */
-  public static int verifyBlockPlacement(LocatedBlock lBlk,
-                                         int minRacks,
-                                         NetworkTopology cluster) {
-    DatanodeInfo[] locs = lBlk.getLocations();
-    if (locs == null)
-      locs = new DatanodeInfo[0];
-    int numRacks = cluster.getNumOfRacks();
-    if(numRacks <= 1) // only one rack
-      return 0;
-    minRacks = Math.min(minRacks, numRacks);
-    // 1. Check that all locations are different.
-    // 2. Count locations on different racks.
-    Set<String> racks = new TreeSet<String>();
-    for (DatanodeInfo dn : locs)
-      racks.add(dn.getNetworkLocation());
-    return minRacks - racks.size();
-  }
-} //end of Replicator
-
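
The block deleted above is the default placement cascade, which this
change carries over into the new BlockPlacementPolicyDefault class: the
first replica goes on the writer's own node when possible, the second on
a different rack, the third on the same rack as the second, and any
remaining replicas on random nodes. Below is a condensed, self-contained
Java sketch of that cascade only. Node and chooseRandom() are simplified
stand-ins for DatanodeDescriptor and NetworkTopology, only the
fresh-block path is modeled, and the space, load and per-rack checks
done by isGoodTarget() are omitted.

    import java.util.*;

    /** Condensed sketch of the replica-placement cascade removed above.
     *  Node and chooseRandom() stand in for DatanodeDescriptor and
     *  NetworkTopology; isGoodTarget()'s checks are omitted. */
    class PlacementCascadeSketch {

      record Node(String name, String rack) {}

      /** Pick a random unexcluded node.  scope is "*" (anywhere), a
       *  rack name, or "~rack" (anywhere but that rack), echoing the
       *  NodeBase.ROOT and "~" + rack scopes in the removed code. */
      static Node chooseRandom(List<Node> cluster, String scope,
                               Set<Node> excluded, Random rnd) {
        List<Node> candidates = new ArrayList<>();
        for (Node n : cluster) {
          boolean inScope = scope.equals("*")
              || (scope.startsWith("~")
                  ? !n.rack().equals(scope.substring(1))
                  : n.rack().equals(scope));
          if (inScope && !excluded.contains(n)) candidates.add(n);
        }
        if (candidates.isEmpty()) return null; // NotEnoughReplicas analogue
        Node chosen = candidates.get(rnd.nextInt(candidates.size()));
        excluded.add(chosen);
        return chosen;
      }

      /** 1st replica local, 2nd off-rack, 3rd beside the 2nd, rest
       *  random. */
      static List<Node> chooseTarget(int numOfReplicas, Node writer,
                                     List<Node> cluster, Random rnd) {
        List<Node> results = new ArrayList<>();
        Set<Node> excluded = new HashSet<>();
        while (results.size() < numOfReplicas) {
          Node next;
          switch (results.size()) {
            case 0:  // local node, falling back to a random one
              next = (writer != null && cluster.contains(writer)
                      && excluded.add(writer))
                     ? writer : chooseRandom(cluster, "*", excluded, rnd);
              break;
            case 1:  // a rack other than the first replica's
              next = chooseRandom(cluster, "~" + results.get(0).rack(),
                                  excluded, rnd);
              break;
            case 2:  // the same rack as the second replica
              next = chooseRandom(cluster, results.get(1).rack(),
                                  excluded, rnd);
              break;
            default: // any remaining replicas go anywhere
              next = chooseRandom(cluster, "*", excluded, rnd);
          }
          if (next == null) break; // could not place enough replicas
          results.add(next);
        }
        return results;
      }

      public static void main(String[] args) {
        List<Node> cluster = List.of(
            new Node("h1", "/d1/r1"), new Node("h2", "/d1/r1"),
            new Node("h3", "/d1/r2"), new Node("h4", "/d1/r2"));
        // Yields h1, one /d1/r2 node, and that node's rack-mate.
        System.out.println(chooseTarget(3, cluster.get(0), cluster,
                                        new Random(0)));
      }
    }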

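The removed getPipeline() deserves a note as well: despite the
traveling-salesman comment, it only performs a greedy nearest-neighbour
pass, repeatedly swapping the node closest to the current end of the
pipeline into the next position. A minimal sketch under that reading,
with a caller-supplied distance function standing in for
NetworkTopology.getDistance():

    import java.util.Arrays;
    import java.util.function.ToIntBiFunction;

    /** Minimal sketch of the greedy pipeline ordering in the removed
     *  getPipeline(): a nearest-neighbour pass, not an exact TSP solve. */
    class PipelineOrderSketch {

      /** Reorders nodes in place so each hop is the closest remaining
       *  node; dist stands in for NetworkTopology.getDistance(). */
      static <T> void orderPipeline(T writer, T[] nodes,
                                    ToIntBiFunction<T, T> dist) {
        T current = writer;
        for (int index = 0; index < nodes.length; index++) {
          int shortest = index;
          for (int i = index + 1; i < nodes.length; i++) {
            if (dist.applyAsInt(current, nodes[i])
                < dist.applyAsInt(current, nodes[shortest])) {
              shortest = i;
            }
          }
          T tmp = nodes[index];    // swap the closest node forward,
          nodes[index] = nodes[shortest];
          nodes[shortest] = tmp;   // as the index/shortestIndex swap does
          current = nodes[index];  // the next hop starts from here
        }
      }

      public static void main(String[] args) {
        Integer[] nodes = {7, 2, 9}; // toy "topology": |a - b| as distance
        orderPipeline(0, nodes, (a, b) -> Math.abs(a - b));
        System.out.println(Arrays.toString(nodes)); // [2, 7, 9]
      }
    }
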
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java?rev=815001&r1=815000&r2=815001&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java Tue Sep 15 05:38:57 2009
@@ -38,7 +38,8 @@
   private static final Configuration CONF = new Configuration();
   private static final NetworkTopology cluster;
   private static final NameNode namenode;
-  private static final ReplicationTargetChooser replicator;
+  private static final BlockPlacementPolicy replicator;
+  private static final String filename = "/dummyfile.txt";
   private static final DatanodeDescriptor dataNodes[] = 
     new DatanodeDescriptor[] {
       new DatanodeDescriptor(new DatanodeID("h1:5020"), "/d1/r1"),
@@ -91,30 +92,30 @@
         FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 4); // overloaded
 
     DatanodeDescriptor[] targets;
-    targets = replicator.chooseTarget(
-                                      0, dataNodes[0], null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      0, dataNodes[0], BLOCK_SIZE);
     assertEquals(targets.length, 0);
     
-    targets = replicator.chooseTarget(
-                                      1, dataNodes[0], null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      1, dataNodes[0], BLOCK_SIZE);
     assertEquals(targets.length, 1);
     assertEquals(targets[0], dataNodes[0]);
     
-    targets = replicator.chooseTarget(
-                                      2, dataNodes[0], null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      2, dataNodes[0], BLOCK_SIZE);
     assertEquals(targets.length, 2);
     assertEquals(targets[0], dataNodes[0]);
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
     
-    targets = replicator.chooseTarget(
-                                      3, dataNodes[0], null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      3, dataNodes[0], BLOCK_SIZE);
     assertEquals(targets.length, 3);
     assertEquals(targets[0], dataNodes[0]);
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
     assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
 
-    targets = replicator.chooseTarget(
-                                     4, dataNodes[0], null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                     4, dataNodes[0], BLOCK_SIZE);
     assertEquals(targets.length, 4);
     assertEquals(targets[0], dataNodes[0]);
     assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
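
The hunk above shows the shape of the new public API: the policy now
receives the file's path as its first argument, so an external policy
can make per-file placement decisions, and the excluded-nodes map drops
out of the public signature. A migration sketch based only on the calls
visible in this diff (filename, dataNodes and BLOCK_SIZE are the test's
own identifiers; the authoritative abstract signatures live in the newly
added BlockPlacementPolicy.java, which this mail does not quote):

    // Old ReplicationTargetChooser call, excluded nodes passed inline:
    //   targets = replicator.chooseTarget(3, dataNodes[0], null, BLOCK_SIZE);

    // New BlockPlacementPolicy call: the source path leads, so a custom
    // policy can vary placement per file.
    DatanodeDescriptor[] targets =
        replicator.chooseTarget(filename, 3, dataNodes[0], BLOCK_SIZE);

In the released code the concrete policy class is expected to be loaded
reflectively from configuration (the key is dfs.block.replicator.classname,
with BlockPlacementPolicyDefault as the default), though that wiring is
not part of the hunks quoted here.
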
@@ -137,41 +138,47 @@
   public void testChooseTarget2() throws Exception { 
     HashMap<Node, Node> excludedNodes;
     DatanodeDescriptor[] targets;
+    BlockPlacementPolicyDefault repl = (BlockPlacementPolicyDefault)replicator;
+    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
     
     excludedNodes = new HashMap<Node, Node>();
     excludedNodes.put(dataNodes[1], dataNodes[1]); 
-    targets = replicator.chooseTarget(
-                                      0, dataNodes[0], excludedNodes, BLOCK_SIZE);
+    targets = repl.chooseTarget(
+                                0, dataNodes[0], chosenNodes, excludedNodes, BLOCK_SIZE);
     assertEquals(targets.length, 0);
     
     excludedNodes.clear();
+    chosenNodes.clear();
     excludedNodes.put(dataNodes[1], dataNodes[1]); 
-    targets = replicator.chooseTarget(
-                                      1, dataNodes[0], excludedNodes, BLOCK_SIZE);
+    targets = repl.chooseTarget(
+                                1, dataNodes[0], chosenNodes, excludedNodes, BLOCK_SIZE);
     assertEquals(targets.length, 1);
     assertEquals(targets[0], dataNodes[0]);
     
     excludedNodes.clear();
+    chosenNodes.clear();
     excludedNodes.put(dataNodes[1], dataNodes[1]); 
-    targets = replicator.chooseTarget(
-                                      2, dataNodes[0], excludedNodes, BLOCK_SIZE);
+    targets = repl.chooseTarget(
+                                2, dataNodes[0], chosenNodes, excludedNodes, BLOCK_SIZE);
     assertEquals(targets.length, 2);
     assertEquals(targets[0], dataNodes[0]);
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
     
     excludedNodes.clear();
+    chosenNodes.clear();
     excludedNodes.put(dataNodes[1], dataNodes[1]); 
-    targets = replicator.chooseTarget(
-                                      3, dataNodes[0], excludedNodes, BLOCK_SIZE);
+    targets = repl.chooseTarget(
+                                3, dataNodes[0], chosenNodes, excludedNodes, BLOCK_SIZE);
     assertEquals(targets.length, 3);
     assertEquals(targets[0], dataNodes[0]);
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
     assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
     
     excludedNodes.clear();
+    chosenNodes.clear();
     excludedNodes.put(dataNodes[1], dataNodes[1]); 
-    targets = replicator.chooseTarget(
-                                      4, dataNodes[0], excludedNodes, BLOCK_SIZE);
+    targets = repl.chooseTarget(
+                                4, dataNodes[0], chosenNodes, excludedNodes, BLOCK_SIZE);
     assertEquals(targets.length, 4);
     assertEquals(targets[0], dataNodes[0]);
     for(int i=1; i<4; i++) {
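
Note how testChooseTarget2 copes with excluded nodes after the change:
the test downcasts the shared replicator to BlockPlacementPolicyDefault,
which suggests the overload taking both the already-chosen nodes and an
excluded-nodes map now lives only on the default implementation rather
than on the public BlockPlacementPolicy surface. Restated call shape
from the hunk above, using the test's own identifiers:

    // The excluded-nodes overload is a default-policy detail, hence
    // the cast from the shared BlockPlacementPolicy instance.
    BlockPlacementPolicyDefault repl = (BlockPlacementPolicyDefault) replicator;
    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
    HashMap<Node, Node> excludedNodes = new HashMap<Node, Node>();
    excludedNodes.put(dataNodes[1], dataNodes[1]); // never pick this node
    DatanodeDescriptor[] targets =
        repl.chooseTarget(2, dataNodes[0], chosenNodes,
                          excludedNodes, BLOCK_SIZE);
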
@@ -197,30 +204,30 @@
         (FSConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0); // no space
         
     DatanodeDescriptor[] targets;
-    targets = replicator.chooseTarget(
-                                      0, dataNodes[0], null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      0, dataNodes[0], BLOCK_SIZE);
     assertEquals(targets.length, 0);
     
-    targets = replicator.chooseTarget(
-                                      1, dataNodes[0], null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      1, dataNodes[0], BLOCK_SIZE);
     assertEquals(targets.length, 1);
     assertEquals(targets[0], dataNodes[1]);
     
-    targets = replicator.chooseTarget(
-                                      2, dataNodes[0], null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      2, dataNodes[0], BLOCK_SIZE);
     assertEquals(targets.length, 2);
     assertEquals(targets[0], dataNodes[1]);
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
     
-    targets = replicator.chooseTarget(
-                                      3, dataNodes[0], null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      3, dataNodes[0], BLOCK_SIZE);
     assertEquals(targets.length, 3);
     assertEquals(targets[0], dataNodes[1]);
     assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
     
-    targets = replicator.chooseTarget(
-                                      4, dataNodes[0], null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      4, dataNodes[0], BLOCK_SIZE);
     assertEquals(targets.length, 4);
     assertEquals(targets[0], dataNodes[1]);
     for(int i=1; i<4; i++) {
@@ -252,23 +259,23 @@
     }
       
     DatanodeDescriptor[] targets;
-    targets = replicator.chooseTarget(
-                                      0, dataNodes[0], null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      0, dataNodes[0], BLOCK_SIZE);
     assertEquals(targets.length, 0);
     
-    targets = replicator.chooseTarget(
-                                      1, dataNodes[0], null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      1, dataNodes[0], BLOCK_SIZE);
     assertEquals(targets.length, 1);
     assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
     
-    targets = replicator.chooseTarget(
-                                      2, dataNodes[0], null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      2, dataNodes[0], BLOCK_SIZE);
     assertEquals(targets.length, 2);
     assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
     
-    targets = replicator.chooseTarget(
-                                      3, dataNodes[0], null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      3, dataNodes[0], BLOCK_SIZE);
     assertEquals(targets.length, 3);
     for(int i=0; i<3; i++) {
       assertFalse(cluster.isOnSameRack(targets[i], dataNodes[0]));
@@ -292,21 +299,21 @@
    */
   public void testChooseTarget5() throws Exception {
     DatanodeDescriptor[] targets;
-    targets = replicator.chooseTarget(
-                                      0, NODE, null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      0, NODE, BLOCK_SIZE);
     assertEquals(targets.length, 0);
     
-    targets = replicator.chooseTarget(
-                                      1, NODE, null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      1, NODE, BLOCK_SIZE);
     assertEquals(targets.length, 1);
     
-    targets = replicator.chooseTarget(
-                                      2, NODE, null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      2, NODE, BLOCK_SIZE);
     assertEquals(targets.length, 2);
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
     
-    targets = replicator.chooseTarget(
-                                      3, NODE, null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      3, NODE, BLOCK_SIZE);
     assertEquals(targets.length, 3);
     assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));    
@@ -324,23 +331,23 @@
     chosenNodes.add(dataNodes[0]);    
     DatanodeDescriptor[] targets;
     
-    targets = replicator.chooseTarget(
-                                      0, dataNodes[0], chosenNodes, null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      0, dataNodes[0], chosenNodes, BLOCK_SIZE);
     assertEquals(targets.length, 0);
     
-    targets = replicator.chooseTarget(
-                                      1, dataNodes[0], chosenNodes, null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      1, dataNodes[0], chosenNodes, BLOCK_SIZE);
     assertEquals(targets.length, 1);
     assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
     
-    targets = replicator.chooseTarget(
-                                      2, dataNodes[0], chosenNodes, null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      2, dataNodes[0], chosenNodes, BLOCK_SIZE);
     assertEquals(targets.length, 2);
     assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
     
-    targets = replicator.chooseTarget(
-                                      3, dataNodes[0], chosenNodes, null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      3, dataNodes[0], chosenNodes, BLOCK_SIZE);
     assertEquals(targets.length, 3);
     assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
     assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
@@ -359,17 +366,17 @@
     chosenNodes.add(dataNodes[1]);
 
     DatanodeDescriptor[] targets;
-    targets = replicator.chooseTarget(
-                                      0, dataNodes[0], chosenNodes, null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      0, dataNodes[0], chosenNodes, BLOCK_SIZE);
     assertEquals(targets.length, 0);
     
-    targets = replicator.chooseTarget(
-                                      1, dataNodes[0], chosenNodes, null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      1, dataNodes[0], chosenNodes, BLOCK_SIZE);
     assertEquals(targets.length, 1);
     assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
     
-    targets = replicator.chooseTarget(
-                                      2, dataNodes[0], chosenNodes, null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      2, dataNodes[0], chosenNodes, BLOCK_SIZE);
     assertEquals(targets.length, 2);
     assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
     assertFalse(cluster.isOnSameRack(dataNodes[0], targets[1]));
@@ -388,29 +395,29 @@
     chosenNodes.add(dataNodes[2]);
     
     DatanodeDescriptor[] targets;
-    targets = replicator.chooseTarget(
-                                      0, dataNodes[0], chosenNodes, null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      0, dataNodes[0], chosenNodes, BLOCK_SIZE);
     assertEquals(targets.length, 0);
     
-    targets = replicator.chooseTarget(
-                                      1, dataNodes[0], chosenNodes, null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      1, dataNodes[0], chosenNodes, BLOCK_SIZE);
     assertEquals(targets.length, 1);
     assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
     assertFalse(cluster.isOnSameRack(dataNodes[2], targets[0]));
     
-    targets = replicator.chooseTarget(
-                               1, dataNodes[2], chosenNodes, null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                               1, dataNodes[2], chosenNodes, BLOCK_SIZE);
     assertEquals(targets.length, 1);
     assertTrue(cluster.isOnSameRack(dataNodes[2], targets[0]));
     assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
 
-    targets = replicator.chooseTarget(
-                                      2, dataNodes[0], chosenNodes, null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      2, dataNodes[0], chosenNodes, BLOCK_SIZE);
     assertEquals(targets.length, 2);
     assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
     
-    targets = replicator.chooseTarget(
-                               2, dataNodes[2], chosenNodes, null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                               2, dataNodes[2], chosenNodes, BLOCK_SIZE);
     assertEquals(targets.length, 2);
     assertTrue(cluster.isOnSameRack(dataNodes[2], targets[0]));
   }