You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by dh...@apache.org on 2009/09/15 07:38:57 UTC
svn commit: r815001 - in /hadoop/hdfs/trunk: ./
src/java/org/apache/hadoop/hdfs/server/balancer/
src/java/org/apache/hadoop/hdfs/server/namenode/
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/
Author: dhruba
Date: Tue Sep 15 05:38:57 2009
New Revision: 815001
URL: http://svn.apache.org/viewvc?rev=815001&view=rev
Log:
HDFS-385. Add support for an API that allows a module external
to HDFS to specify how HDFS blocks should be placed. (dhruba)
Added:
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSInodeInfo.java
Modified:
hadoop/hdfs/trunk/CHANGES.txt
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/ReplicationTargetChooser.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java
Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=815001&r1=815000&r2=815001&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Sep 15 05:38:57 2009
@@ -37,6 +37,9 @@
HDFS-235. Add support for byte ranges in HftpFileSystem to serve
range of bytes from a file. (Bill Zeller via suresh)
+ HDFS-385. Add support for an experimental API that allows a module external
+ to HDFS to specify how HDFS blocks should be placed. (dhruba)
+
IMPROVEMENTS
HDFS-381. Remove blocks from DataNode maps when corresponding file
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=815001&r1=815000&r2=815001&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Tue Sep 15 05:38:57 2009
@@ -25,6 +25,7 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import java.lang.Class;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
@@ -64,6 +65,9 @@
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicy;
+import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyDefault;
+import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.io.IOUtils;
@@ -772,18 +776,31 @@
}
}
}
+
+ /* Check that this Balancer is compatible with the Block Placement Policy
+ * used by the Namenode.
+ */
+ private void checkReplicationPolicyCompatibility(Configuration conf) throws UnsupportedActionException {
+ if (BlockPlacementPolicy.getInstance(conf, null, null).getClass() !=
+ BlockPlacementPolicyDefault.class) {
+ throw new UnsupportedActionException("Balancer without BlockPlacementPolicyDefault");
+ }
+ }
/** Default constructor */
- Balancer() {
+ Balancer() throws UnsupportedActionException {
+ checkReplicationPolicyCompatibility(getConf());
}
/** Construct a balancer from the given configuration */
- Balancer(Configuration conf) {
+ Balancer(Configuration conf) throws UnsupportedActionException {
+ checkReplicationPolicyCompatibility(conf);
setConf(conf);
}
/** Construct a balancer from the given configuration and threshold */
- Balancer(Configuration conf, double threshold) {
+ Balancer(Configuration conf, double threshold) throws UnsupportedActionException {
+ checkReplicationPolicyCompatibility(conf);
setConf(conf);
this.threshold = threshold;
}
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java?rev=815001&r1=815000&r2=815001&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java Tue Sep 15 05:38:57 2009
@@ -117,7 +117,7 @@
Random r = new Random();
// for block replicas placement
- ReplicationTargetChooser replicator;
+ BlockPlacementPolicy replicator;
BlockManager(FSNamesystem fsn, Configuration conf) throws IOException {
this(fsn, conf, DEFAULT_INITIAL_MAP_CAPACITY);
@@ -134,8 +134,8 @@
}
void setConfigurationParameters(Configuration conf) throws IOException {
- this.replicator = new ReplicationTargetChooser(
- conf.getBoolean("dfs.replication.considerLoad", true),
+ this.replicator = BlockPlacementPolicy.getInstance(
+ conf,
namesystem,
namesystem.clusterMap);
@@ -639,12 +639,13 @@
int requiredReplication, numEffectiveReplicas;
List<DatanodeDescriptor> containingNodes;
DatanodeDescriptor srcNode;
+ INodeFile fileINode = null;
int additionalReplRequired;
synchronized (namesystem) {
synchronized (neededReplications) {
// block should belong to a file
- INodeFile fileINode = blocksMap.getINode(block);
+ fileINode = blocksMap.getINode(block);
// abandoned block or block reopened for append
if(fileINode == null || fileINode.isUnderConstruction()) {
neededReplications.remove(block, priority); // remove from neededReplications
@@ -691,9 +692,11 @@
}
// choose replication targets: NOT HOLDING THE GLOBAL LOCK
+ // It is costly to extract the filename for which chooseTargets is called,
+ // so for now we pass in the Inode itself.
DatanodeDescriptor targets[] =
- replicator.chooseTarget(additionalReplRequired,
- srcNode, containingNodes, null, block.getNumBytes());
+ replicator.chooseTarget(fileINode, additionalReplRequired,
+ srcNode, containingNodes, block.getNumBytes());
if(targets.length == 0)
return false;
@@ -701,7 +704,7 @@
synchronized (neededReplications) {
// Recheck since global lock was released
// block should belong to a file
- INodeFile fileINode = blocksMap.getINode(block);
+ fileINode = blocksMap.getINode(block);
// abandoned block or block reopened for append
if(fileINode == null || fileINode.isUnderConstruction()) {
neededReplications.remove(block, priority); // remove from neededReplications
@@ -1162,7 +1165,7 @@
}
}
namesystem.chooseExcessReplicates(nonExcess, block, replication,
- addedNode, delNodeHint);
+ addedNode, delNodeHint, replicator);
}
void addToExcessReplicate(DatanodeInfo dn, Block block) {
Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java?rev=815001&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java Tue Sep 15 05:38:57 2009
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.util.ReflectionUtils;
+import java.util.*;
+
+/**
+ * This abstract class is used for choosing the desired number of targets
+ * for placing block replicas.
+ */
+public abstract class BlockPlacementPolicy {
+
+ public static class NotEnoughReplicasException extends Exception {
+ private static final long serialVersionUID = 1L;
+ NotEnoughReplicasException(String msg) {
+ super(msg);
+ }
+ }
+
+ /**
+ * choose <i>numOfReplicas</i> data nodes for <i>writer</i>
+ * to re-replicate a block with size <i>blocksize</i>
+ * If that many nodes are not available, return as many as we can.
+ *
+ * @param srcPath the file for which chooseTarget is being invoked.
+ * @param numOfReplicas additional number of replicas wanted.
+ * @param writer the writer's machine, null if not in the cluster.
+ * @param chosenNodes datanodes that have been chosen as targets.
+ * @param blocksize size of the data to be written.
+ * @return array of DatanodeDescriptor instances chosen as target
+ * and sorted as a pipeline.
+ */
+ abstract DatanodeDescriptor[] chooseTarget(String srcPath,
+ int numOfReplicas,
+ DatanodeDescriptor writer,
+ List<DatanodeDescriptor> chosenNodes,
+ long blocksize);
+
+ /**
+ * choose <i>numOfReplicas</i> data nodes for <i>writer</i>
+ * to re-replicate a block with size <i>blocksize</i>
+ * If that many nodes are not available, return as many as we can.
+ * The base implementation extracts the pathname of the file from the
+ * specified srcInode, but this could be a costly operation depending on the
+ * file system implementation. Concrete implementations of this class should
+ * override this method to avoid this overhead.
+ *
+ * @param srcInode The inode of the file for which chooseTarget is being invoked.
+ * @param numOfReplicas additional number of replicas wanted.
+ * @param writer the writer's machine, null if not in the cluster.
+ * @param chosenNodes datanodes that have been chosen as targets.
+ * @param blocksize size of the data to be written.
+ * @return array of DatanodeDescriptor instances chosen as target
+ * and sorted as a pipeline.
+ */
+ DatanodeDescriptor[] chooseTarget(FSInodeInfo srcInode,
+ int numOfReplicas,
+ DatanodeDescriptor writer,
+ List<DatanodeDescriptor> chosenNodes,
+ long blocksize) {
+ return chooseTarget(srcInode.getFullPathName(), numOfReplicas, writer,
+ chosenNodes, blocksize);
+ }
+
+ /**
+ * Verify that the block is replicated on at least minRacks different racks
+ * if there are more than minRacks racks in the system.
+ *
+ * @param srcPath the full pathname of the file to be verified
+ * @param lBlk block with locations
+ * @param minRacks number of racks the block should be replicated to
+ * @return the difference between the required and the actual number of racks
+ * the block is replicated to.
+ */
+ abstract public int verifyBlockPlacement(String srcPath,
+ LocatedBlock lBlk,
+ int minRacks);
+ /**
+ * Decide whether deleting the specified replica of the block still makes
+ * the block conform to the configured block placement policy.
+ *
+ * @param srcInode The inode of the file to which the block-to-be-deleted belongs
+ * @param block The block to be deleted
+ * @param replicationFactor The required number of replicas for this block
+ * @param existingReplicas The replica locations of this block that are present
+ on at least two unique racks.
+ * @param moreExistingReplicas Replica locations of this block that are not
+ listed in the previous parameter.
+ * @return the replica that is the best candidate for deletion
+ */
+ abstract public DatanodeDescriptor chooseReplicaToDelete(FSInodeInfo srcInode,
+ Block block,
+ short replicationFactor,
+ Collection<DatanodeDescriptor> existingReplicas,
+ Collection<DatanodeDescriptor> moreExistingReplicas);
+
+ /**
+ * Used to setup a BlockPlacementPolicy object. This should be defined by
+ * all implementations of a BlockPlacementPolicy.
+ *
+ * @param conf the configuration object
+ * @param stats retrieve cluster status from here
+ * @param clusterMap cluster topology
+ */
+ abstract protected void initialize(Configuration conf, FSClusterStats stats,
+ NetworkTopology clusterMap);
+
+ /**
+ * Get an instance of the configured Block Placement Policy based on the
+ * value of the configuration parameter dfs.block.replicator.classname.
+ *
+ * @param conf the configuration to be used
+ * @param stats an object that is used to retrieve the load on the cluster
+ * @param clusterMap the network topology of the cluster
+ * @return an instance of BlockPlacementPolicy
+ */
+ public static BlockPlacementPolicy getInstance(Configuration conf,
+ FSClusterStats stats,
+ NetworkTopology clusterMap) {
+ Class<? extends BlockPlacementPolicy> replicatorClass =
+ conf.getClass("dfs.block.replicator.classname",
+ BlockPlacementPolicyDefault.class,
+ BlockPlacementPolicy.class);
+ BlockPlacementPolicy replicator = (BlockPlacementPolicy) ReflectionUtils.newInstance(
+ replicatorClass, conf);
+ replicator.initialize(conf, stats, clusterMap);
+ return replicator;
+ }
+
+ /**
+ * choose <i>numOfReplicas</i> nodes for <i>writer</i> to replicate
+ * a block with size <i>blocksize</i>
+ * If that many nodes are not available, return as many as we can.
+ *
+ * @param srcPath a string representation of the file for which chooseTarget is invoked
+ * @param numOfReplicas number of replicas wanted.
+ * @param writer the writer's machine, null if not in the cluster.
+ * @param blocksize size of the data to be written.
+ * @return array of DatanodeDescriptor instances chosen as targets
+ * and sorted as a pipeline.
+ */
+ DatanodeDescriptor[] chooseTarget(String srcPath,
+ int numOfReplicas,
+ DatanodeDescriptor writer,
+ long blocksize) {
+ return chooseTarget(srcPath, numOfReplicas, writer,
+ new ArrayList<DatanodeDescriptor>(),
+ blocksize);
+ }
+}
Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java?rev=815001&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java Tue Sep 15 05:38:57 2009
@@ -0,0 +1,504 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.commons.logging.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
+import java.util.*;
+
+/** The class is responsible for choosing the desired number of targets
+ * for placing block replicas.
+ * The replica placement strategy is that if the writer is on a datanode,
+ * the 1st replica is placed on the local machine,
+ * otherwise a random datanode. The 2nd replica is placed on a datanode
+ * that is on a different rack. The 3rd replica is placed on a datanode
+ * which is on a different node of the same rack as the second replica.
+ */
+public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
+ private boolean considerLoad;
+ private NetworkTopology clusterMap;
+ private FSClusterStats stats;
+
+ BlockPlacementPolicyDefault(Configuration conf, FSClusterStats stats,
+ NetworkTopology clusterMap) {
+ initialize(conf, stats, clusterMap);
+ }
+
+ BlockPlacementPolicyDefault() {
+ }
+
+ /** {@inheritDoc} */
+ public void initialize(Configuration conf, FSClusterStats stats,
+ NetworkTopology clusterMap) {
+ this.considerLoad = conf.getBoolean("dfs.replication.considerLoad", true);
+ this.stats = stats;
+ this.clusterMap = clusterMap;
+ }
+
+ /** {@inheritDoc} */
+ public DatanodeDescriptor[] chooseTarget(String srcPath,
+ int numOfReplicas,
+ DatanodeDescriptor writer,
+ List<DatanodeDescriptor> chosenNodes,
+ long blocksize) {
+ return chooseTarget(numOfReplicas, writer, chosenNodes, null, blocksize);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public DatanodeDescriptor[] chooseTarget(FSInodeInfo srcInode,
+ int numOfReplicas,
+ DatanodeDescriptor writer,
+ List<DatanodeDescriptor> chosenNodes,
+ long blocksize) {
+ return chooseTarget(numOfReplicas, writer, chosenNodes, null, blocksize);
+ }
+
+ /**
+ * This is not part of the public API but is used by the unit tests.
+ */
+ DatanodeDescriptor[] chooseTarget(int numOfReplicas,
+ DatanodeDescriptor writer,
+ List<DatanodeDescriptor> chosenNodes,
+ HashMap<Node, Node> excludedNodes,
+ long blocksize) {
+ if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
+ return new DatanodeDescriptor[0];
+ }
+
+ if (excludedNodes == null) {
+ excludedNodes = new HashMap<Node, Node>();
+ }
+
+ int clusterSize = clusterMap.getNumOfLeaves();
+ int totalNumOfReplicas = chosenNodes.size()+numOfReplicas;
+ if (totalNumOfReplicas > clusterSize) {
+ numOfReplicas -= (totalNumOfReplicas-clusterSize);
+ totalNumOfReplicas = clusterSize;
+ }
+
+ int maxNodesPerRack =
+ (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2;
+
+ List<DatanodeDescriptor> results =
+ new ArrayList<DatanodeDescriptor>(chosenNodes);
+ for (Node node:chosenNodes) {
+ excludedNodes.put(node, node);
+ }
+
+ if (!clusterMap.contains(writer)) {
+ writer=null;
+ }
+
+ DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer,
+ excludedNodes, blocksize, maxNodesPerRack, results);
+
+ results.removeAll(chosenNodes);
+
+ // sorting nodes to form a pipeline
+ return getPipeline((writer==null)?localNode:writer,
+ results.toArray(new DatanodeDescriptor[results.size()]));
+ }
+
+ /* choose <i>numOfReplicas</i> from all data nodes */
+ private DatanodeDescriptor chooseTarget(int numOfReplicas,
+ DatanodeDescriptor writer,
+ HashMap<Node, Node> excludedNodes,
+ long blocksize,
+ int maxNodesPerRack,
+ List<DatanodeDescriptor> results) {
+
+ if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
+ return writer;
+ }
+
+ int numOfResults = results.size();
+ boolean newBlock = (numOfResults==0);
+ if (writer == null && !newBlock) {
+ writer = results.get(0);
+ }
+
+ try {
+ if (numOfResults == 0) {
+ writer = chooseLocalNode(writer, excludedNodes,
+ blocksize, maxNodesPerRack, results);
+ if (--numOfReplicas == 0) {
+ return writer;
+ }
+ }
+ if (numOfResults <= 1) {
+ chooseRemoteRack(1, results.get(0), excludedNodes,
+ blocksize, maxNodesPerRack, results);
+ if (--numOfReplicas == 0) {
+ return writer;
+ }
+ }
+ if (numOfResults <= 2) {
+ if (clusterMap.isOnSameRack(results.get(0), results.get(1))) {
+ chooseRemoteRack(1, results.get(0), excludedNodes,
+ blocksize, maxNodesPerRack, results);
+ } else if (newBlock){
+ chooseLocalRack(results.get(1), excludedNodes, blocksize,
+ maxNodesPerRack, results);
+ } else {
+ chooseLocalRack(writer, excludedNodes, blocksize,
+ maxNodesPerRack, results);
+ }
+ if (--numOfReplicas == 0) {
+ return writer;
+ }
+ }
+ chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes,
+ blocksize, maxNodesPerRack, results);
+ } catch (NotEnoughReplicasException e) {
+ FSNamesystem.LOG.warn("Not able to place enough replicas, still in need of "
+ + numOfReplicas);
+ }
+ return writer;
+ }
+
+ /* choose <i>localMachine</i> as the target.
+ * if <i>localMachine</i> is not available,
+ * choose a node on the same rack
+ * @return the chosen node
+ */
+ private DatanodeDescriptor chooseLocalNode(
+ DatanodeDescriptor localMachine,
+ HashMap<Node, Node> excludedNodes,
+ long blocksize,
+ int maxNodesPerRack,
+ List<DatanodeDescriptor> results)
+ throws NotEnoughReplicasException {
+ // if no local machine, randomly choose one node
+ if (localMachine == null)
+ return chooseRandom(NodeBase.ROOT, excludedNodes,
+ blocksize, maxNodesPerRack, results);
+
+ // otherwise try local machine first
+ Node oldNode = excludedNodes.put(localMachine, localMachine);
+ if (oldNode == null) { // was not in the excluded list
+ if (isGoodTarget(localMachine, blocksize,
+ maxNodesPerRack, false, results)) {
+ results.add(localMachine);
+ return localMachine;
+ }
+ }
+
+ // try a node on local rack
+ return chooseLocalRack(localMachine, excludedNodes,
+ blocksize, maxNodesPerRack, results);
+ }
+
+ /* choose one node from the rack that <i>localMachine</i> is on.
+ * if no such node is available, choose one node from the rack where
+ * a second replica is on.
+ * if still no such node is available, choose a random node
+ * in the cluster.
+ * @return the chosen node
+ */
+ private DatanodeDescriptor chooseLocalRack(
+ DatanodeDescriptor localMachine,
+ HashMap<Node, Node> excludedNodes,
+ long blocksize,
+ int maxNodesPerRack,
+ List<DatanodeDescriptor> results)
+ throws NotEnoughReplicasException {
+ // no local machine, so choose a random machine
+ if (localMachine == null) {
+ return chooseRandom(NodeBase.ROOT, excludedNodes,
+ blocksize, maxNodesPerRack, results);
+ }
+
+ // choose one from the local rack
+ try {
+ return chooseRandom(
+ localMachine.getNetworkLocation(),
+ excludedNodes, blocksize, maxNodesPerRack, results);
+ } catch (NotEnoughReplicasException e1) {
+ // find the second replica
+ DatanodeDescriptor newLocal=null;
+ for(Iterator<DatanodeDescriptor> iter=results.iterator();
+ iter.hasNext();) {
+ DatanodeDescriptor nextNode = iter.next();
+ if (nextNode != localMachine) {
+ newLocal = nextNode;
+ break;
+ }
+ }
+ if (newLocal != null) {
+ try {
+ return chooseRandom(
+ newLocal.getNetworkLocation(),
+ excludedNodes, blocksize, maxNodesPerRack, results);
+ } catch(NotEnoughReplicasException e2) {
+ //otherwise randomly choose one from the network
+ return chooseRandom(NodeBase.ROOT, excludedNodes,
+ blocksize, maxNodesPerRack, results);
+ }
+ } else {
+ //otherwise randomly choose one from the network
+ return chooseRandom(NodeBase.ROOT, excludedNodes,
+ blocksize, maxNodesPerRack, results);
+ }
+ }
+ }
+
+ /* choose <i>numOfReplicas</i> nodes from the racks
+ * that <i>localMachine</i> is NOT on.
+ * if not enough nodes are available, choose the remaining ones
+ * from the local rack
+ */
+
+ private void chooseRemoteRack(int numOfReplicas,
+ DatanodeDescriptor localMachine,
+ HashMap<Node, Node> excludedNodes,
+ long blocksize,
+ int maxReplicasPerRack,
+ List<DatanodeDescriptor> results)
+ throws NotEnoughReplicasException {
+ int oldNumOfReplicas = results.size();
+ // randomly choose one node from remote racks
+ try {
+ chooseRandom(numOfReplicas, "~"+localMachine.getNetworkLocation(),
+ excludedNodes, blocksize, maxReplicasPerRack, results);
+ } catch (NotEnoughReplicasException e) {
+ chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
+ localMachine.getNetworkLocation(), excludedNodes, blocksize,
+ maxReplicasPerRack, results);
+ }
+ }
+
+ /* Randomly choose one target from <i>nodes</i>.
+ * @return the chosen node
+ */
+ private DatanodeDescriptor chooseRandom(
+ String nodes,
+ HashMap<Node, Node> excludedNodes,
+ long blocksize,
+ int maxNodesPerRack,
+ List<DatanodeDescriptor> results)
+ throws NotEnoughReplicasException {
+ int numOfAvailableNodes =
+ clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
+ while(numOfAvailableNodes > 0) {
+ DatanodeDescriptor chosenNode =
+ (DatanodeDescriptor)(clusterMap.chooseRandom(nodes));
+
+ Node oldNode = excludedNodes.put(chosenNode, chosenNode);
+ if (oldNode == null) { // choosendNode was not in the excluded list
+ numOfAvailableNodes--;
+ if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
+ results.add(chosenNode);
+ return chosenNode;
+ }
+ }
+ }
+
+ throw new NotEnoughReplicasException(
+ "Not able to place enough replicas");
+ }
+
+ /* Randomly choose <i>numOfReplicas</i> targets from <i>nodes</i>.
+ */
+ private void chooseRandom(int numOfReplicas,
+ String nodes,
+ HashMap<Node, Node> excludedNodes,
+ long blocksize,
+ int maxNodesPerRack,
+ List<DatanodeDescriptor> results)
+ throws NotEnoughReplicasException {
+
+ int numOfAvailableNodes =
+ clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
+ while(numOfReplicas > 0 && numOfAvailableNodes > 0) {
+ DatanodeDescriptor chosenNode =
+ (DatanodeDescriptor)(clusterMap.chooseRandom(nodes));
+ Node oldNode = excludedNodes.put(chosenNode, chosenNode);
+ if (oldNode == null) {
+ numOfAvailableNodes--;
+
+ if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
+ numOfReplicas--;
+ results.add(chosenNode);
+ }
+ }
+ }
+
+ if (numOfReplicas>0) {
+ throw new NotEnoughReplicasException(
+ "Not able to place enough replicas");
+ }
+ }
+
+ /* judge if a node is a good target.
+ * return true if <i>node</i> has enough space,
+ * does not have too much load, and the rack does not have too many nodes
+ */
+ private boolean isGoodTarget(DatanodeDescriptor node,
+ long blockSize, int maxTargetPerLoc,
+ List<DatanodeDescriptor> results) {
+ return isGoodTarget(node, blockSize, maxTargetPerLoc,
+ this.considerLoad, results);
+ }
+
+ private boolean isGoodTarget(DatanodeDescriptor node,
+ long blockSize, int maxTargetPerLoc,
+ boolean considerLoad,
+ List<DatanodeDescriptor> results) {
+ Log logr = FSNamesystem.LOG;
+ // check if the node is (being) decommissioned
+ if (node.isDecommissionInProgress() || node.isDecommissioned()) {
+ logr.debug("Node "+NodeBase.getPath(node)+
+ " is not chosen because the node is (being) decommissioned");
+ return false;
+ }
+
+ long remaining = node.getRemaining() -
+ (node.getBlocksScheduled() * blockSize);
+ // check the remaining capacity of the target machine
+ if (blockSize* FSConstants.MIN_BLOCKS_FOR_WRITE>remaining) {
+ logr.debug("Node "+NodeBase.getPath(node)+
+ " is not chosen because the node does not have enough space");
+ return false;
+ }
+
+ // check the communication traffic of the target machine
+ if (considerLoad) {
+ double avgLoad = 0;
+ int size = clusterMap.getNumOfLeaves();
+ if (size != 0 && stats != null) {
+ avgLoad = (double)stats.getTotalLoad()/size;
+ }
+ if (node.getXceiverCount() > (2.0 * avgLoad)) {
+ logr.debug("Node "+NodeBase.getPath(node)+
+ " is not chosen because the node is too busy");
+ return false;
+ }
+ }
+
+ // check if the target rack has chosen too many nodes
+ String rackname = node.getNetworkLocation();
+ int counter=1;
+ for(Iterator<DatanodeDescriptor> iter = results.iterator();
+ iter.hasNext();) {
+ Node result = iter.next();
+ if (rackname.equals(result.getNetworkLocation())) {
+ counter++;
+ }
+ }
+ if (counter>maxTargetPerLoc) {
+ logr.debug("Node "+NodeBase.getPath(node)+
+ " is not chosen because the rack has too many chosen nodes");
+ return false;
+ }
+ return true;
+ }
+
+ /* Return a pipeline of nodes.
+ * The pipeline is formed finding a shortest path that
+ * starts from the writer and traverses all <i>nodes</i>
+ * This is basically a traveling salesman problem.
+ */
+ private DatanodeDescriptor[] getPipeline(
+ DatanodeDescriptor writer,
+ DatanodeDescriptor[] nodes) {
+ if (nodes.length==0) return nodes;
+
+ synchronized(clusterMap) {
+ int index=0;
+ if (writer == null || !clusterMap.contains(writer)) {
+ writer = nodes[0];
+ }
+ for(;index<nodes.length; index++) {
+ DatanodeDescriptor shortestNode = nodes[index];
+ int shortestDistance = clusterMap.getDistance(writer, shortestNode);
+ int shortestIndex = index;
+ for(int i=index+1; i<nodes.length; i++) {
+ DatanodeDescriptor currentNode = nodes[i];
+ int currentDistance = clusterMap.getDistance(writer, currentNode);
+ if (shortestDistance>currentDistance) {
+ shortestDistance = currentDistance;
+ shortestNode = currentNode;
+ shortestIndex = i;
+ }
+ }
+ //switch position index & shortestIndex
+ if (index != shortestIndex) {
+ nodes[shortestIndex] = nodes[index];
+ nodes[index] = shortestNode;
+ }
+ writer = shortestNode;
+ }
+ }
+ return nodes;
+ }
+
+ /** {@inheritDoc} */
+ public int verifyBlockPlacement(String srcPath,
+ LocatedBlock lBlk,
+ int minRacks) {
+ DatanodeInfo[] locs = lBlk.getLocations();
+ if (locs == null)
+ locs = new DatanodeInfo[0];
+ int numRacks = clusterMap.getNumOfRacks();
+ if(numRacks <= 1) // only one rack
+ return 0;
+ minRacks = Math.min(minRacks, numRacks);
+ // 1. Check that all locations are different.
+ // 2. Count locations on different racks.
+ Set<String> racks = new TreeSet<String>();
+ for (DatanodeInfo dn : locs)
+ racks.add(dn.getNetworkLocation());
+ return minRacks - racks.size();
+ }
+
+ /** {@inheritDoc} */
+ public DatanodeDescriptor chooseReplicaToDelete(FSInodeInfo inode,
+ Block block,
+ short replicationFactor,
+ Collection<DatanodeDescriptor> first,
+ Collection<DatanodeDescriptor> second) {
+ long minSpace = Long.MAX_VALUE;
+ DatanodeDescriptor cur = null;
+
+ // pick replica from the first Set. If first is empty, then pick replicas
+ // from second set.
+ Iterator<DatanodeDescriptor> iter =
+ first.isEmpty() ? second.iterator() : first.iterator();
+
+ // pick node with least free space
+ while (iter.hasNext() ) {
+ DatanodeDescriptor node = iter.next();
+ long free = node.getRemaining();
+ if (minSpace > free) {
+ minSpace = free;
+ cur = node;
+ }
+ }
+ return cur;
+ }
+}
+
Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java?rev=815001&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java Tue Sep 15 05:38:57 2009
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+/**
+ * This interface is used for retrieving the load related statistics of
+ * the cluster.
+ */
+public interface FSClusterStats {
+
+ /**
+ * an indication of the total load of the cluster.
+ *
+ * @return a count of the total number of block transfers and block
+ * writes that are currently occurring on the cluster.
+ */
+
+ public int getTotalLoad() ;
+}
+
+
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=815001&r1=815000&r2=815001&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Tue Sep 15 05:38:57 2009
@@ -932,6 +932,23 @@
}
return fullPathName.toString();
}
+
+ /** Return the full path name of the specified inode */
+ static String getFullPathName(INode inode) {
+ // calculate the depth of this inode from root
+ int depth = 0;
+ for (INode i = inode; i != null; i = i.parent) {
+ depth++;
+ }
+ INode[] inodes = new INode[depth];
+
+ // fill up the inodes in the path from this inode to root
+ for (int i = 0; i < depth; i++) {
+ inodes[depth-i-1] = inode;
+ inode = inode.parent;
+ }
+ return getFullPathName(inodes, depth-1);
+ }
/**
* Create a directory
Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSInodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSInodeInfo.java?rev=815001&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSInodeInfo.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSInodeInfo.java Tue Sep 15 05:38:57 2009
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+/**
+ * This interface is used by the pluggable block placement policy
+ * to expose a few characteristics of an Inode.
+ */
+public interface FSInodeInfo {
+
+ /**
+ * a string representation of an inode
+ *
+ * @return the full pathname (from root) that this inode represents
+ */
+
+ public String getFullPathName() ;
+}
+
+
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=815001&r1=815000&r2=815001&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Sep 15 05:38:57 2009
@@ -90,7 +90,7 @@
* 4) machine --> blocklist (inverted #2)
* 5) LRU cache of updated-heartbeat machines
***************************************************/
-public class FSNamesystem implements FSConstants, FSNamesystemMBean {
+public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterStats {
public static final Log LOG = LogFactory.getLog(FSNamesystem.class);
public static final String AUDIT_FORMAT =
"ugi=%s\t" + // ugi
@@ -1123,7 +1123,7 @@
// choose targets for the new block to be allocated.
DatanodeDescriptor targets[] = blockManager.replicator.chooseTarget(
- replication, clientNode, null, blockSize);
+ src, replication, clientNode, blockSize);
if (targets.length < blockManager.minReplication) {
throw new IOException("File " + src + " could only be replicated to " +
targets.length + " nodes, instead of " +
@@ -2344,8 +2344,10 @@
void chooseExcessReplicates(Collection<DatanodeDescriptor> nonExcess,
Block b, short replication,
DatanodeDescriptor addedNode,
- DatanodeDescriptor delNodeHint) {
+ DatanodeDescriptor delNodeHint,
+ BlockPlacementPolicy replicator) {
// first form a rack to datanodes map and
+ INodeFile inode = blockManager.getINode(b);
HashMap<String, ArrayList<DatanodeDescriptor>> rackMap =
new HashMap<String, ArrayList<DatanodeDescriptor>>();
for (Iterator<DatanodeDescriptor> iter = nonExcess.iterator();
@@ -2389,17 +2391,7 @@
(priSet.contains(delNodeHint) || (addedNode != null && !priSet.contains(addedNode))) ) {
cur = delNodeHint;
} else { // regular excessive replica removal
- Iterator<DatanodeDescriptor> iter =
- priSet.isEmpty() ? remains.iterator() : priSet.iterator();
- while( iter.hasNext() ) {
- DatanodeDescriptor node = iter.next();
- long free = node.getRemaining();
-
- if (minSpace > free) {
- minSpace = free;
- cur = node;
- }
- }
+ cur = replicator.chooseReplicaToDelete(inode, b, replication, priSet, remains);
}
firstOne = false;
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=815001&r1=815000&r2=815001&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java Tue Sep 15 05:38:57 2009
@@ -33,7 +33,7 @@
* This is a base INode class containing common fields for file and
* directory inodes.
*/
-abstract class INode implements Comparable<byte[]> {
+abstract class INode implements Comparable<byte[]>, FSInodeInfo {
protected byte[] name;
protected INodeDirectory parent;
protected long modificationTime;
@@ -247,6 +247,12 @@
}
/** {@inheritDoc} */
+ public String getFullPathName() {
+ // Get the full path name of this inode.
+ return FSDirectory.getFullPathName(this);
+ }
+
+ /** {@inheritDoc} */
public String toString() {
return "\"" + getLocalName() + "\":" + getPermissionStatus();
}
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=815001&r1=815000&r2=815001&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Tue Sep 15 05:38:57 2009
@@ -253,8 +253,8 @@
locs.length + " replica(s).");
}
// verify block placement policy
- int missingRacks = ReplicationTargetChooser.verifyBlockPlacement(
- lBlk, targetFileReplication, networktopology);
+ int missingRacks = BlockPlacementPolicy.getInstance(conf, null, networktopology).
+ verifyBlockPlacement(path, lBlk, Math.min(2,targetFileReplication));
if (missingRacks > 0) {
res.numMisReplicatedBlocks++;
misReplicatedPerFile++;
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/ReplicationTargetChooser.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/ReplicationTargetChooser.java?rev=815001&r1=815000&r2=815001&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/ReplicationTargetChooser.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/ReplicationTargetChooser.java Tue Sep 15 05:38:57 2009
@@ -1,514 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.namenode;
-
-import org.apache.commons.logging.*;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.FSConstants;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.net.Node;
-import org.apache.hadoop.net.NodeBase;
-import java.util.*;
-
-/** The class is responsible for choosing the desired number of targets
- * for placing block replicas.
- * The replica placement strategy is that if the writer is on a datanode,
- * the 1st replica is placed on the local machine,
- * otherwise a random datanode. The 2nd replica is placed on a datanode
- * that is on a different rack. The 3rd replica is placed on a datanode
- * which is on a different node of the rack as the second replica.
- */
-class ReplicationTargetChooser {
- private final boolean considerLoad;
- private NetworkTopology clusterMap;
- private FSNamesystem fs;
-
- ReplicationTargetChooser(boolean considerLoad, FSNamesystem fs,
- NetworkTopology clusterMap) {
- this.considerLoad = considerLoad;
- this.fs = fs;
- this.clusterMap = clusterMap;
- }
-
- private static class NotEnoughReplicasException extends Exception {
- private static final long serialVersionUID = 1L;
-
- NotEnoughReplicasException(String msg) {
- super(msg);
- }
- }
-
- /**
- * choose <i>numOfReplicas</i> data nodes for <i>writer</i> to replicate
- * a block with size <i>blocksize</i>
- * If not, return as many as we can.
- *
- * @param numOfReplicas: number of replicas wanted.
- * @param writer: the writer's machine, null if not in the cluster.
- * @param excludedNodes: datanodes that should not be considered targets.
- * @param blocksize: size of the data to be written.
- * @return array of DatanodeDescriptor instances chosen as targets
- * and sorted as a pipeline.
- */
- DatanodeDescriptor[] chooseTarget(int numOfReplicas,
- DatanodeDescriptor writer,
- HashMap<Node, Node> excludedNodes,
- long blocksize) {
- if (excludedNodes == null) {
- excludedNodes = new HashMap<Node, Node>();
- }
-
- return chooseTarget(numOfReplicas, writer,
- new ArrayList<DatanodeDescriptor>(), excludedNodes, blocksize);
- }
-
- /**
- * choose <i>numOfReplicas</i> data nodes for <i>writer</i>
- * to re-replicate a block with size <i>blocksize</i>
- * If not, return as many as we can.
- *
- * @param numOfReplicas: additional number of replicas wanted.
- * @param writer: the writer's machine, null if not in the cluster.
- * @param choosenNodes: datanodes that have been chosen as targets.
- * @param excludedNodes: datanodes that should not be considered targets.
- * @param blocksize: size of the data to be written.
- * @return array of DatanodeDescriptor instances chosen as target
- * and sorted as a pipeline.
- */
- DatanodeDescriptor[] chooseTarget(int numOfReplicas,
- DatanodeDescriptor writer,
- List<DatanodeDescriptor> choosenNodes,
- HashMap<Node, Node> excludedNodes,
- long blocksize) {
- if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
- return new DatanodeDescriptor[0];
- }
-
- if (excludedNodes == null) {
- excludedNodes = new HashMap<Node, Node>();
- }
-
- int clusterSize = clusterMap.getNumOfLeaves();
- int totalNumOfReplicas = choosenNodes.size()+numOfReplicas;
- if (totalNumOfReplicas > clusterSize) {
- numOfReplicas -= (totalNumOfReplicas-clusterSize);
- totalNumOfReplicas = clusterSize;
- }
-
- int maxNodesPerRack =
- (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2;
-
- List<DatanodeDescriptor> results =
- new ArrayList<DatanodeDescriptor>(choosenNodes);
- for (Node node:choosenNodes) {
- excludedNodes.put(node, node);
- }
-
- if (!clusterMap.contains(writer)) {
- writer=null;
- }
-
- DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer,
- excludedNodes, blocksize, maxNodesPerRack, results);
-
- results.removeAll(choosenNodes);
-
- // sorting nodes to form a pipeline
- return getPipeline((writer==null)?localNode:writer,
- results.toArray(new DatanodeDescriptor[results.size()]));
- }
-
- /* choose <i>numOfReplicas</i> from all data nodes */
- private DatanodeDescriptor chooseTarget(int numOfReplicas,
- DatanodeDescriptor writer,
- HashMap<Node, Node> excludedNodes,
- long blocksize,
- int maxNodesPerRack,
- List<DatanodeDescriptor> results) {
-
- if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
- return writer;
- }
-
- int numOfResults = results.size();
- boolean newBlock = (numOfResults==0);
- if (writer == null && !newBlock) {
- writer = results.get(0);
- }
-
- try {
- if (numOfResults == 0) {
- writer = chooseLocalNode(writer, excludedNodes,
- blocksize, maxNodesPerRack, results);
- if (--numOfReplicas == 0) {
- return writer;
- }
- }
- if (numOfResults <= 1) {
- chooseRemoteRack(1, results.get(0), excludedNodes,
- blocksize, maxNodesPerRack, results);
- if (--numOfReplicas == 0) {
- return writer;
- }
- }
- if (numOfResults <= 2) {
- if (clusterMap.isOnSameRack(results.get(0), results.get(1))) {
- chooseRemoteRack(1, results.get(0), excludedNodes,
- blocksize, maxNodesPerRack, results);
- } else if (newBlock){
- chooseLocalRack(results.get(1), excludedNodes, blocksize,
- maxNodesPerRack, results);
- } else {
- chooseLocalRack(writer, excludedNodes, blocksize,
- maxNodesPerRack, results);
- }
- if (--numOfReplicas == 0) {
- return writer;
- }
- }
- chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
- } catch (NotEnoughReplicasException e) {
- FSNamesystem.LOG.warn("Not able to place enough replicas, still in need of "
- + numOfReplicas);
- }
- return writer;
- }
-
- /* choose <i>localMachine</i> as the target.
- * if <i>localMachine</i> is not available,
- * choose a node on the same rack
- * @return the chosen node
- */
- private DatanodeDescriptor chooseLocalNode(
- DatanodeDescriptor localMachine,
- HashMap<Node, Node> excludedNodes,
- long blocksize,
- int maxNodesPerRack,
- List<DatanodeDescriptor> results)
- throws NotEnoughReplicasException {
- // if no local machine, randomly choose one node
- if (localMachine == null)
- return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
-
- // otherwise try local machine first
- Node oldNode = excludedNodes.put(localMachine, localMachine);
- if (oldNode == null) { // was not in the excluded list
- if (isGoodTarget(localMachine, blocksize,
- maxNodesPerRack, false, results)) {
- results.add(localMachine);
- return localMachine;
- }
- }
-
- // try a node on local rack
- return chooseLocalRack(localMachine, excludedNodes,
- blocksize, maxNodesPerRack, results);
- }
-
- /* choose one node from the rack that <i>localMachine</i> is on.
- * if no such node is available, choose one node from the rack where
- * a second replica is on.
- * if still no such node is available, choose a random node
- * in the cluster.
- * @return the chosen node
- */
- private DatanodeDescriptor chooseLocalRack(
- DatanodeDescriptor localMachine,
- HashMap<Node, Node> excludedNodes,
- long blocksize,
- int maxNodesPerRack,
- List<DatanodeDescriptor> results)
- throws NotEnoughReplicasException {
- // no local machine, so choose a random machine
- if (localMachine == null) {
- return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
- }
-
- // choose one from the local rack
- try {
- return chooseRandom(
- localMachine.getNetworkLocation(),
- excludedNodes, blocksize, maxNodesPerRack, results);
- } catch (NotEnoughReplicasException e1) {
- // find the second replica
- DatanodeDescriptor newLocal=null;
- for(Iterator<DatanodeDescriptor> iter=results.iterator();
- iter.hasNext();) {
- DatanodeDescriptor nextNode = iter.next();
- if (nextNode != localMachine) {
- newLocal = nextNode;
- break;
- }
- }
- if (newLocal != null) {
- try {
- return chooseRandom(
- newLocal.getNetworkLocation(),
- excludedNodes, blocksize, maxNodesPerRack, results);
- } catch(NotEnoughReplicasException e2) {
- //otherwise randomly choose one from the network
- return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
- }
- } else {
- //otherwise randomly choose one from the network
- return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
- }
- }
- }
-
- /* choose <i>numOfReplicas</i> nodes from the racks
- * that <i>localMachine</i> is NOT on.
- * if not enough nodes are available, choose the remaining ones
- * from the local rack
- */
-
- private void chooseRemoteRack(int numOfReplicas,
- DatanodeDescriptor localMachine,
- HashMap<Node, Node> excludedNodes,
- long blocksize,
- int maxReplicasPerRack,
- List<DatanodeDescriptor> results)
- throws NotEnoughReplicasException {
- int oldNumOfReplicas = results.size();
- // randomly choose one node from remote racks
- try {
- chooseRandom(numOfReplicas, "~"+localMachine.getNetworkLocation(),
- excludedNodes, blocksize, maxReplicasPerRack, results);
- } catch (NotEnoughReplicasException e) {
- chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
- localMachine.getNetworkLocation(), excludedNodes, blocksize,
- maxReplicasPerRack, results);
- }
- }
-
- /* Randomly choose one target from <i>nodes</i>.
- * @return the chosen node
- */
- private DatanodeDescriptor chooseRandom(
- String nodes,
- HashMap<Node, Node> excludedNodes,
- long blocksize,
- int maxNodesPerRack,
- List<DatanodeDescriptor> results)
- throws NotEnoughReplicasException {
- int numOfAvailableNodes =
- clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
- while(numOfAvailableNodes > 0) {
- DatanodeDescriptor choosenNode =
- (DatanodeDescriptor)(clusterMap.chooseRandom(nodes));
-
- Node oldNode = excludedNodes.put(choosenNode, choosenNode);
- if (oldNode == null) { // choosendNode was not in the excluded list
- numOfAvailableNodes--;
- if (isGoodTarget(choosenNode, blocksize, maxNodesPerRack, results)) {
- results.add(choosenNode);
- return choosenNode;
- }
- }
- }
-
- throw new NotEnoughReplicasException(
- "Not able to place enough replicas");
- }
-
- /* Randomly choose <i>numOfReplicas</i> targets from <i>nodes</i>.
- */
- private void chooseRandom(int numOfReplicas,
- String nodes,
- HashMap<Node, Node> excludedNodes,
- long blocksize,
- int maxNodesPerRack,
- List<DatanodeDescriptor> results)
- throws NotEnoughReplicasException {
-
- int numOfAvailableNodes =
- clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
- while(numOfReplicas > 0 && numOfAvailableNodes > 0) {
- DatanodeDescriptor choosenNode =
- (DatanodeDescriptor)(clusterMap.chooseRandom(nodes));
- Node oldNode = excludedNodes.put(choosenNode, choosenNode);
- if (oldNode == null) {
- numOfAvailableNodes--;
-
- if (isGoodTarget(choosenNode, blocksize, maxNodesPerRack, results)) {
- numOfReplicas--;
- results.add(choosenNode);
- }
- }
- }
-
- if (numOfReplicas>0) {
- throw new NotEnoughReplicasException(
- "Not able to place enough replicas");
- }
- }
-
- /* judge if a node is a good target.
- * return true if <i>node</i> has enough space,
- * does not have too much load, and the rack does not have too many nodes
- */
- private boolean isGoodTarget(DatanodeDescriptor node,
- long blockSize, int maxTargetPerLoc,
- List<DatanodeDescriptor> results) {
- return isGoodTarget(node, blockSize, maxTargetPerLoc,
- this.considerLoad, results);
- }
-
- private boolean isGoodTarget(DatanodeDescriptor node,
- long blockSize, int maxTargetPerLoc,
- boolean considerLoad,
- List<DatanodeDescriptor> results) {
- Log logr = FSNamesystem.LOG;
- // check if the node is (being) decommissed
- if (node.isDecommissionInProgress() || node.isDecommissioned()) {
- logr.debug("Node "+NodeBase.getPath(node)+
- " is not chosen because the node is (being) decommissioned");
- return false;
- }
-
- long remaining = node.getRemaining() -
- (node.getBlocksScheduled() * blockSize);
- // check the remaining capacity of the target machine
- if (blockSize* FSConstants.MIN_BLOCKS_FOR_WRITE>remaining) {
- logr.debug("Node "+NodeBase.getPath(node)+
- " is not chosen because the node does not have enough space");
- return false;
- }
-
- // check the communication traffic of the target machine
- if (considerLoad) {
- double avgLoad = 0;
- int size = clusterMap.getNumOfLeaves();
- if (size != 0) {
- avgLoad = (double)fs.getTotalLoad()/size;
- }
- if (node.getXceiverCount() > (2.0 * avgLoad)) {
- logr.debug("Node "+NodeBase.getPath(node)+
- " is not chosen because the node is too busy");
- return false;
- }
- }
-
- // check if the target rack has chosen too many nodes
- String rackname = node.getNetworkLocation();
- int counter=1;
- for(Iterator<DatanodeDescriptor> iter = results.iterator();
- iter.hasNext();) {
- Node result = iter.next();
- if (rackname.equals(result.getNetworkLocation())) {
- counter++;
- }
- }
- if (counter>maxTargetPerLoc) {
- logr.debug("Node "+NodeBase.getPath(node)+
- " is not chosen because the rack has too many chosen nodes");
- return false;
- }
- return true;
- }
-
- /* Return a pipeline of nodes.
- * The pipeline is formed finding a shortest path that
- * starts from the writer and traverses all <i>nodes</i>
- * This is basically a traveling salesman problem.
- */
- private DatanodeDescriptor[] getPipeline(
- DatanodeDescriptor writer,
- DatanodeDescriptor[] nodes) {
- if (nodes.length==0) return nodes;
-
- synchronized(clusterMap) {
- int index=0;
- if (writer == null || !clusterMap.contains(writer)) {
- writer = nodes[0];
- }
- for(;index<nodes.length; index++) {
- DatanodeDescriptor shortestNode = nodes[index];
- int shortestDistance = clusterMap.getDistance(writer, shortestNode);
- int shortestIndex = index;
- for(int i=index+1; i<nodes.length; i++) {
- DatanodeDescriptor currentNode = nodes[i];
- int currentDistance = clusterMap.getDistance(writer, currentNode);
- if (shortestDistance>currentDistance) {
- shortestDistance = currentDistance;
- shortestNode = currentNode;
- shortestIndex = i;
- }
- }
- //switch position index & shortestIndex
- if (index != shortestIndex) {
- nodes[shortestIndex] = nodes[index];
- nodes[index] = shortestNode;
- }
- writer = shortestNode;
- }
- }
- return nodes;
- }
-
- /**
- * Verify that the block is replicated on at least 2 different racks
- * if there is more than one rack in the system.
- *
- * @param lBlk block with locations
- * @param cluster
- * @return 1 if the block must be replicated on additional rack,
- * or 0 if the number of racks is sufficient.
- */
- public static int verifyBlockPlacement(LocatedBlock lBlk,
- short replication,
- NetworkTopology cluster) {
- int numRacks = verifyBlockPlacement(lBlk, Math.min(2,replication), cluster);
- return numRacks < 0 ? 0 : numRacks;
- }
-
- /**
- * Verify that the block is replicated on at least minRacks different racks
- * if there is more than minRacks rack in the system.
- *
- * @param lBlk block with locations
- * @param minRacks number of racks the block should be replicated to
- * @param cluster
- * @return the difference between the required and the actual number of racks
- * the block is replicated to.
- */
- public static int verifyBlockPlacement(LocatedBlock lBlk,
- int minRacks,
- NetworkTopology cluster) {
- DatanodeInfo[] locs = lBlk.getLocations();
- if (locs == null)
- locs = new DatanodeInfo[0];
- int numRacks = cluster.getNumOfRacks();
- if(numRacks <= 1) // only one rack
- return 0;
- minRacks = Math.min(minRacks, numRacks);
- // 1. Check that all locations are different.
- // 2. Count locations on different racks.
- Set<String> racks = new TreeSet<String>();
- for (DatanodeInfo dn : locs)
- racks.add(dn.getNetworkLocation());
- return minRacks - racks.size();
- }
-} //end of Replicator
-
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java?rev=815001&r1=815000&r2=815001&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java Tue Sep 15 05:38:57 2009
@@ -38,7 +38,8 @@
private static final Configuration CONF = new Configuration();
private static final NetworkTopology cluster;
private static final NameNode namenode;
- private static final ReplicationTargetChooser replicator;
+ private static final BlockPlacementPolicy replicator;
+ private static final String filename = "/dummyfile.txt";
private static final DatanodeDescriptor dataNodes[] =
new DatanodeDescriptor[] {
new DatanodeDescriptor(new DatanodeID("h1:5020"), "/d1/r1"),
@@ -91,30 +92,30 @@
FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 4); // overloaded
DatanodeDescriptor[] targets;
- targets = replicator.chooseTarget(
- 0, dataNodes[0], null, BLOCK_SIZE);
+ targets = replicator.chooseTarget(filename,
+ 0, dataNodes[0], BLOCK_SIZE);
assertEquals(targets.length, 0);
- targets = replicator.chooseTarget(
- 1, dataNodes[0], null, BLOCK_SIZE);
+ targets = replicator.chooseTarget(filename,
+ 1, dataNodes[0], BLOCK_SIZE);
assertEquals(targets.length, 1);
assertEquals(targets[0], dataNodes[0]);
- targets = replicator.chooseTarget(
- 2, dataNodes[0], null, BLOCK_SIZE);
+ targets = replicator.chooseTarget(filename,
+ 2, dataNodes[0], BLOCK_SIZE);
assertEquals(targets.length, 2);
assertEquals(targets[0], dataNodes[0]);
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
- targets = replicator.chooseTarget(
- 3, dataNodes[0], null, BLOCK_SIZE);
+ targets = replicator.chooseTarget(filename,
+ 3, dataNodes[0], BLOCK_SIZE);
assertEquals(targets.length, 3);
assertEquals(targets[0], dataNodes[0]);
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
- targets = replicator.chooseTarget(
- 4, dataNodes[0], null, BLOCK_SIZE);
+ targets = replicator.chooseTarget(filename,
+ 4, dataNodes[0], BLOCK_SIZE);
assertEquals(targets.length, 4);
assertEquals(targets[0], dataNodes[0]);
assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
@@ -137,41 +138,47 @@
public void testChooseTarget2() throws Exception {
HashMap<Node, Node> excludedNodes;
DatanodeDescriptor[] targets;
+ BlockPlacementPolicyDefault repl = (BlockPlacementPolicyDefault)replicator;
+ List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
excludedNodes = new HashMap<Node, Node>();
excludedNodes.put(dataNodes[1], dataNodes[1]);
- targets = replicator.chooseTarget(
- 0, dataNodes[0], excludedNodes, BLOCK_SIZE);
+ targets = repl.chooseTarget(
+ 0, dataNodes[0], chosenNodes, excludedNodes, BLOCK_SIZE);
assertEquals(targets.length, 0);
excludedNodes.clear();
+ chosenNodes.clear();
excludedNodes.put(dataNodes[1], dataNodes[1]);
- targets = replicator.chooseTarget(
- 1, dataNodes[0], excludedNodes, BLOCK_SIZE);
+ targets = repl.chooseTarget(
+ 1, dataNodes[0], chosenNodes, excludedNodes, BLOCK_SIZE);
assertEquals(targets.length, 1);
assertEquals(targets[0], dataNodes[0]);
excludedNodes.clear();
+ chosenNodes.clear();
excludedNodes.put(dataNodes[1], dataNodes[1]);
- targets = replicator.chooseTarget(
- 2, dataNodes[0], excludedNodes, BLOCK_SIZE);
+ targets = repl.chooseTarget(
+ 2, dataNodes[0], chosenNodes, excludedNodes, BLOCK_SIZE);
assertEquals(targets.length, 2);
assertEquals(targets[0], dataNodes[0]);
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
excludedNodes.clear();
+ chosenNodes.clear();
excludedNodes.put(dataNodes[1], dataNodes[1]);
- targets = replicator.chooseTarget(
- 3, dataNodes[0], excludedNodes, BLOCK_SIZE);
+ targets = repl.chooseTarget(
+ 3, dataNodes[0], chosenNodes, excludedNodes, BLOCK_SIZE);
assertEquals(targets.length, 3);
assertEquals(targets[0], dataNodes[0]);
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
excludedNodes.clear();
+ chosenNodes.clear();
excludedNodes.put(dataNodes[1], dataNodes[1]);
- targets = replicator.chooseTarget(
- 4, dataNodes[0], excludedNodes, BLOCK_SIZE);
+ targets = repl.chooseTarget(
+ 4, dataNodes[0], chosenNodes, excludedNodes, BLOCK_SIZE);
assertEquals(targets.length, 4);
assertEquals(targets[0], dataNodes[0]);
for(int i=1; i<4; i++) {
@@ -197,30 +204,30 @@
(FSConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0); // no space
DatanodeDescriptor[] targets;
- targets = replicator.chooseTarget(
- 0, dataNodes[0], null, BLOCK_SIZE);
+ targets = replicator.chooseTarget(filename,
+ 0, dataNodes[0], BLOCK_SIZE);
assertEquals(targets.length, 0);
- targets = replicator.chooseTarget(
- 1, dataNodes[0], null, BLOCK_SIZE);
+ targets = replicator.chooseTarget(filename,
+ 1, dataNodes[0], BLOCK_SIZE);
assertEquals(targets.length, 1);
assertEquals(targets[0], dataNodes[1]);
- targets = replicator.chooseTarget(
- 2, dataNodes[0], null, BLOCK_SIZE);
+ targets = replicator.chooseTarget(filename,
+ 2, dataNodes[0], BLOCK_SIZE);
assertEquals(targets.length, 2);
assertEquals(targets[0], dataNodes[1]);
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
- targets = replicator.chooseTarget(
- 3, dataNodes[0], null, BLOCK_SIZE);
+ targets = replicator.chooseTarget(filename,
+ 3, dataNodes[0], BLOCK_SIZE);
assertEquals(targets.length, 3);
assertEquals(targets[0], dataNodes[1]);
assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
- targets = replicator.chooseTarget(
- 4, dataNodes[0], null, BLOCK_SIZE);
+ targets = replicator.chooseTarget(filename,
+ 4, dataNodes[0], BLOCK_SIZE);
assertEquals(targets.length, 4);
assertEquals(targets[0], dataNodes[1]);
for(int i=1; i<4; i++) {
@@ -252,23 +259,23 @@
}
DatanodeDescriptor[] targets;
- targets = replicator.chooseTarget(
- 0, dataNodes[0], null, BLOCK_SIZE);
+ targets = replicator.chooseTarget(filename,
+ 0, dataNodes[0], BLOCK_SIZE);
assertEquals(targets.length, 0);
- targets = replicator.chooseTarget(
- 1, dataNodes[0], null, BLOCK_SIZE);
+ targets = replicator.chooseTarget(filename,
+ 1, dataNodes[0], BLOCK_SIZE);
assertEquals(targets.length, 1);
assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
- targets = replicator.chooseTarget(
- 2, dataNodes[0], null, BLOCK_SIZE);
+ targets = replicator.chooseTarget(filename,
+ 2, dataNodes[0], BLOCK_SIZE);
assertEquals(targets.length, 2);
assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
- targets = replicator.chooseTarget(
- 3, dataNodes[0], null, BLOCK_SIZE);
+ targets = replicator.chooseTarget(filename,
+ 3, dataNodes[0], BLOCK_SIZE);
assertEquals(targets.length, 3);
for(int i=0; i<3; i++) {
assertFalse(cluster.isOnSameRack(targets[i], dataNodes[0]));
@@ -292,21 +299,21 @@
*/
public void testChooseTarget5() throws Exception {
DatanodeDescriptor[] targets;
- targets = replicator.chooseTarget(
- 0, NODE, null, BLOCK_SIZE);
+ targets = replicator.chooseTarget(filename,
+ 0, NODE, BLOCK_SIZE);
assertEquals(targets.length, 0);
- targets = replicator.chooseTarget(
- 1, NODE, null, BLOCK_SIZE);
+ targets = replicator.chooseTarget(filename,
+ 1, NODE, BLOCK_SIZE);
assertEquals(targets.length, 1);
- targets = replicator.chooseTarget(
- 2, NODE, null, BLOCK_SIZE);
+ targets = replicator.chooseTarget(filename,
+ 2, NODE, BLOCK_SIZE);
assertEquals(targets.length, 2);
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
- targets = replicator.chooseTarget(
- 3, NODE, null, BLOCK_SIZE);
+ targets = replicator.chooseTarget(filename,
+ 3, NODE, BLOCK_SIZE);
assertEquals(targets.length, 3);
assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
@@ -324,23 +331,23 @@
chosenNodes.add(dataNodes[0]);
DatanodeDescriptor[] targets;
- targets = replicator.chooseTarget(
- 0, dataNodes[0], chosenNodes, null, BLOCK_SIZE);
+ targets = replicator.chooseTarget(filename,
+ 0, dataNodes[0], chosenNodes, BLOCK_SIZE);
assertEquals(targets.length, 0);
- targets = replicator.chooseTarget(
- 1, dataNodes[0], chosenNodes, null, BLOCK_SIZE);
+ targets = replicator.chooseTarget(filename,
+ 1, dataNodes[0], chosenNodes, BLOCK_SIZE);
assertEquals(targets.length, 1);
assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
- targets = replicator.chooseTarget(
- 2, dataNodes[0], chosenNodes, null, BLOCK_SIZE);
+ targets = replicator.chooseTarget(filename,
+ 2, dataNodes[0], chosenNodes, BLOCK_SIZE);
assertEquals(targets.length, 2);
assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
- targets = replicator.chooseTarget(
- 3, dataNodes[0], chosenNodes, null, BLOCK_SIZE);
+ targets = replicator.chooseTarget(filename,
+ 3, dataNodes[0], chosenNodes, BLOCK_SIZE);
assertEquals(targets.length, 3);
assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
@@ -359,17 +366,17 @@
chosenNodes.add(dataNodes[1]);
DatanodeDescriptor[] targets;
- targets = replicator.chooseTarget(
- 0, dataNodes[0], chosenNodes, null, BLOCK_SIZE);
+ targets = replicator.chooseTarget(filename,
+ 0, dataNodes[0], chosenNodes, BLOCK_SIZE);
assertEquals(targets.length, 0);
- targets = replicator.chooseTarget(
- 1, dataNodes[0], chosenNodes, null, BLOCK_SIZE);
+ targets = replicator.chooseTarget(filename,
+ 1, dataNodes[0], chosenNodes, BLOCK_SIZE);
assertEquals(targets.length, 1);
assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
- targets = replicator.chooseTarget(
- 2, dataNodes[0], chosenNodes, null, BLOCK_SIZE);
+ targets = replicator.chooseTarget(filename,
+ 2, dataNodes[0], chosenNodes, BLOCK_SIZE);
assertEquals(targets.length, 2);
assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
assertFalse(cluster.isOnSameRack(dataNodes[0], targets[1]));
@@ -388,29 +395,29 @@
chosenNodes.add(dataNodes[2]);
DatanodeDescriptor[] targets;
- targets = replicator.chooseTarget(
- 0, dataNodes[0], chosenNodes, null, BLOCK_SIZE);
+ targets = replicator.chooseTarget(filename,
+ 0, dataNodes[0], chosenNodes, BLOCK_SIZE);
assertEquals(targets.length, 0);
- targets = replicator.chooseTarget(
- 1, dataNodes[0], chosenNodes, null, BLOCK_SIZE);
+ targets = replicator.chooseTarget(filename,
+ 1, dataNodes[0], chosenNodes, BLOCK_SIZE);
assertEquals(targets.length, 1);
assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
assertFalse(cluster.isOnSameRack(dataNodes[2], targets[0]));
- targets = replicator.chooseTarget(
- 1, dataNodes[2], chosenNodes, null, BLOCK_SIZE);
+ targets = replicator.chooseTarget(filename,
+ 1, dataNodes[2], chosenNodes, BLOCK_SIZE);
assertEquals(targets.length, 1);
assertTrue(cluster.isOnSameRack(dataNodes[2], targets[0]));
assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
- targets = replicator.chooseTarget(
- 2, dataNodes[0], chosenNodes, null, BLOCK_SIZE);
+ targets = replicator.chooseTarget(filename,
+ 2, dataNodes[0], chosenNodes, BLOCK_SIZE);
assertEquals(targets.length, 2);
assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
- targets = replicator.chooseTarget(
- 2, dataNodes[2], chosenNodes, null, BLOCK_SIZE);
+ targets = replicator.chooseTarget(filename,
+ 2, dataNodes[2], chosenNodes, BLOCK_SIZE);
assertEquals(targets.length, 2);
assertTrue(cluster.isOnSameRack(dataNodes[2], targets[0]));
}