You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2012/08/06 14:08:18 UTC
svn commit: r1369804 [1/2] - in /hadoop/common/branches/branch-1-win: ./
src/core/org/apache/hadoop/net/
src/hdfs/org/apache/hadoop/hdfs/server/balancer/
src/hdfs/org/apache/hadoop/hdfs/server/namenode/
src/test/org/apache/hadoop/hdfs/server/namenode/
Author: szetszwo
Date: Mon Aug 6 12:08:17 2012
New Revision: 1369804
URL: http://svn.apache.org/viewvc?rev=1369804&view=rev
Log:
HDFS-385. Backport: Add support for an experimental API that allows a module external to HDFS to specify how HDFS blocks should be placed. Contributed by Sumadhur Reddy Bolli
Added:
hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java
hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java
hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSInodeInfo.java
hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/UnsupportedActionException.java
Modified:
hadoop/common/branches/branch-1-win/CHANGES.txt
hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/net/NetworkTopology.java
hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java
hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INode.java
hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/ReplicationTargetChooser.java
hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java
Modified: hadoop/common/branches/branch-1-win/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/CHANGES.txt?rev=1369804&r1=1369803&r2=1369804&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1-win/CHANGES.txt Mon Aug 6 12:08:17 2012
@@ -1,5 +1,13 @@
Hadoop Change Log
+Release 1.2.0 - unreleased
+
+ NEW FEATURES
+
+ HDFS-385. Backport: Add support for an experimental API that allows a
+ module external to HDFS to specify how HDFS blocks should be placed.
+ (Sumadhur Reddy Bolli via szetszwo)
+
Release 1.1.0 - unreleased
NEW FEATURES
Modified: hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/net/NetworkTopology.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/net/NetworkTopology.java?rev=1369804&r1=1369803&r2=1369804&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/net/NetworkTopology.java (original)
+++ hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/net/NetworkTopology.java Mon Aug 6 12:08:17 2012
@@ -546,7 +546,7 @@ public class NetworkTopology {
* @return number of available nodes
*/
public int countNumOfAvailableNodes(String scope,
- List<Node> excludedNodes) {
+ Collection<Node> excludedNodes) {
boolean isExcluded=false;
if (scope.startsWith("~")) {
isExcluded=true;
Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1369804&r1=1369803&r2=1369804&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java Mon Aug 6 12:08:17 2012
@@ -25,6 +25,7 @@ import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import java.lang.Class;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
@@ -61,6 +62,9 @@ import org.apache.hadoop.hdfs.security.t
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicy;
+import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyDefault;
+import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.fs.FileSystem;
@@ -786,18 +790,34 @@ public class Balancer implements Tool {
}
}
}
-
+
+  /**
+   * Throws if the configured block placement policy is not the default one:
+   * this Balancer only supports BlockPlacementPolicyDefault.
+   */
+  private void checkReplicationPolicyCompatibility(Configuration conf)
+      throws UnsupportedActionException {
+    if (BlockPlacementPolicy.getInstance(conf, null, null).getClass()
+        != BlockPlacementPolicyDefault.class) {
+      throw new UnsupportedActionException("Balancer without BlockPlacementPolicyDefault");
+    }
+  }
+
/** Default constructor */
- Balancer() {
+  Balancer() throws UnsupportedActionException {
+    checkReplicationPolicyCompatibility(new Configuration()); // setConf() not called yet
}
-
+
/** Construct a balancer from the given configuration */
- Balancer(Configuration conf) {
+  Balancer(Configuration conf) throws UnsupportedActionException {
+    checkReplicationPolicyCompatibility(conf);
setConf(conf);
- }
+  }
/** Construct a balancer from the given configuration and threshold */
- Balancer(Configuration conf, double threshold) {
+  Balancer(Configuration conf, double threshold)
+      throws UnsupportedActionException {
+    checkReplicationPolicyCompatibility(conf);
setConf(conf);
this.threshold = threshold;
}
Added: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java?rev=1369804&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java (added)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java Mon Aug 6 12:08:17 2012
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.util.ReflectionUtils;
+import java.util.*;
+
+/**
+ * This abstract class is used for choosing the desired number of targets
+ * for placing block replicas.
+ */
+@InterfaceAudience.Private
+public abstract class BlockPlacementPolicy {
+
+  @InterfaceAudience.Private
+  public static class NotEnoughReplicasException extends Exception {
+    private static final long serialVersionUID = 1L;
+    NotEnoughReplicasException(String msg) {
+      super(msg);
+    }
+  }
+
+  /**
+   * Choose <i>numOfReplicas</i> data nodes for <i>writer</i>
+   * to re-replicate a block with size <i>blocksize</i>.
+   * If that many cannot be chosen, return as many as possible.
+   *
+   * @param srcPath the file for which chooseTarget is being invoked.
+   * @param numOfReplicas additional number of replicas wanted.
+   * @param writer the writer's machine, null if not in the cluster.
+   * @param chosenNodes datanodes that have been chosen as targets.
+   * @param blocksize size of the data to be written.
+   * @return array of DatanodeDescriptor instances chosen as target
+   *         and sorted as a pipeline.
+   */
+  abstract DatanodeDescriptor[] chooseTarget(String srcPath,
+                                             int numOfReplicas,
+                                             DatanodeDescriptor writer,
+                                             List<DatanodeDescriptor> chosenNodes,
+                                             long blocksize);
+
+  /**
+   * Choose <i>numOfReplicas</i> data nodes for <i>writer</i>
+   * to re-replicate a block with size <i>blocksize</i>.
+   * If that many cannot be chosen, return as many as possible.
+   *
+   * @param srcPath the file for which chooseTarget is being invoked.
+   * @param numOfReplicas additional number of replicas wanted.
+   * @param writer the writer's machine, null if not in the cluster.
+   * @param chosenNodes datanodes that have been chosen as targets.
+   * @param excludedNodes datanodes that should not be considered as targets.
+   * @param blocksize size of the data to be written.
+   * @return array of DatanodeDescriptor instances chosen as target
+   *         and sorted as a pipeline.
+   */
+  public abstract DatanodeDescriptor[] chooseTarget(String srcPath,
+                                             int numOfReplicas,
+                                             DatanodeDescriptor writer,
+                                             List<DatanodeDescriptor> chosenNodes,
+                                             HashMap<Node, Node> excludedNodes,
+                                             long blocksize);
+
+  /**
+   * Choose <i>numOfReplicas</i> data nodes for <i>writer</i>.
+   * If that many cannot be chosen, return as many as possible.
+   * The base implementation extracts the pathname of the file from the
+   * specified srcInode, but this could be a costly operation depending on the
+   * file system implementation. Concrete implementations of this class should
+   * override this method to avoid this overhead.
+   *
+   * @param srcInode The inode of the file for which chooseTarget is being invoked.
+   * @param numOfReplicas additional number of replicas wanted.
+   * @param writer the writer's machine, null if not in the cluster.
+   * @param chosenNodes datanodes that have been chosen as targets.
+   * @param blocksize size of the data to be written.
+   * @return array of DatanodeDescriptor instances chosen as target
+   *         and sorted as a pipeline.
+   */
+  DatanodeDescriptor[] chooseTarget(FSInodeInfo srcInode,
+                                    int numOfReplicas,
+                                    DatanodeDescriptor writer,
+                                    List<DatanodeDescriptor> chosenNodes,
+                                    long blocksize) {
+    return chooseTarget(srcInode.getFullPathName(), numOfReplicas, writer,
+                        chosenNodes, blocksize);
+  }
+
+  /**
+   * Verify that the block is replicated on at least minRacks different racks
+   * if there are more than minRacks racks in the system.
+   *
+   * @param srcPath the full pathname of the file to be verified
+   * @param lBlk block with locations
+   * @param minRacks number of racks the block should be replicated to
+   * @return the difference between the required and the actual number of racks
+   *         the block is replicated to.
+   */
+  abstract public int verifyBlockPlacement(String srcPath,
+                                           LocatedBlock lBlk,
+                                           int minRacks);
+  /**
+   * Decide whether deleting the specified replica of the block still makes
+   * the block conform to the configured block placement policy.
+   *
+   * @param srcInode The inode of the file to which the block-to-be-deleted belongs
+   * @param block The block to be deleted
+   * @param replicationFactor The required number of replicas for this block
+   * @param existingReplicas The replica locations of this block that are present
+   *        on at least two unique racks.
+   * @param moreExistingReplicas Replica locations of this block that are not
+   *        listed in the previous parameter.
+   * @return the replica that is the best candidate for deletion
+   */
+  abstract public DatanodeDescriptor chooseReplicaToDelete(FSInodeInfo srcInode,
+                                      Block block,
+                                      short replicationFactor,
+                                      Collection<DatanodeDescriptor> existingReplicas,
+                                      Collection<DatanodeDescriptor> moreExistingReplicas);
+
+  /**
+   * Used to setup a BlockPlacementPolicy object. This should be defined by
+   * all implementations of a BlockPlacementPolicy.
+   *
+   * @param conf the configuration object
+   * @param stats retrieve cluster status from here
+   * @param clusterMap cluster topology
+   */
+  abstract protected void initialize(Configuration conf, FSClusterStats stats,
+                                     NetworkTopology clusterMap);
+
+  /**
+   * Get an instance of the Block Placement Policy class configured by
+   * dfs.block.replicator.classname (default BlockPlacementPolicyDefault).
+   *
+   * @param conf the configuration to be used
+   * @param stats an object that is used to retrieve the load on the cluster
+   * @param clusterMap the network topology of the cluster
+   * @return an initialized instance of BlockPlacementPolicy
+   */
+  public static BlockPlacementPolicy getInstance(Configuration conf,
+                                                 FSClusterStats stats,
+                                                 NetworkTopology clusterMap) {
+    Class<? extends BlockPlacementPolicy> replicatorClass =
+                      conf.getClass("dfs.block.replicator.classname",
+                                    BlockPlacementPolicyDefault.class,
+                                    BlockPlacementPolicy.class);
+    BlockPlacementPolicy replicator = ReflectionUtils.newInstance(
+                                                             replicatorClass, conf);
+    replicator.initialize(conf, stats, clusterMap);
+    return replicator;
+  }
+
+  /**
+   * Choose <i>numOfReplicas</i> nodes for <i>writer</i> to replicate
+   * a block with size <i>blocksize</i>.
+   * If that many cannot be chosen, return as many as possible.
+   *
+   * @param srcPath a string representation of the file for which chooseTarget is invoked
+   * @param numOfReplicas number of replicas wanted.
+   * @param writer the writer's machine, null if not in the cluster.
+   * @param blocksize size of the data to be written.
+   * @return array of DatanodeDescriptor instances chosen as targets
+   *         and sorted as a pipeline.
+   */
+  DatanodeDescriptor[] chooseTarget(String srcPath,
+                                    int numOfReplicas,
+                                    DatanodeDescriptor writer,
+                                    long blocksize) {
+    return chooseTarget(srcPath, numOfReplicas, writer,
+                        new ArrayList<DatanodeDescriptor>(),
+                        blocksize);
+  }
+
+  /**
+   * Choose <i>numOfReplicas</i> nodes for <i>writer</i> to replicate
+   * a block with size <i>blocksize</i>, never choosing an excluded node.
+   * If that many cannot be chosen, return as many as possible.
+   *
+   * @param srcPath a string representation of the file for which chooseTarget is invoked
+   * @param numOfReplicas number of replicas wanted.
+   * @param writer the writer's machine, null if not in the cluster.
+   * @param blocksize size of the data to be written.
+   * @param excludedNodes datanodes that should not be considered as targets.
+   * @return array of DatanodeDescriptor instances chosen as targets
+   *         and sorted as a pipeline.
+   */
+  public DatanodeDescriptor[] chooseTarget(String srcPath,
+                                           int numOfReplicas,
+                                           DatanodeDescriptor writer,
+                                           HashMap<Node, Node> excludedNodes,
+                                           long blocksize) {
+    return chooseTarget(srcPath, numOfReplicas, writer,
+                        new ArrayList<DatanodeDescriptor>(),
+                        excludedNodes,
+                        blocksize);
+  }
+
+}
\ No newline at end of file
Added: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java?rev=1369804&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java (added)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java Mon Aug 6 12:08:17 2012
@@ -0,0 +1,516 @@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.commons.logging.*;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
+import java.util.*;
+
+/** The class is responsible for choosing the desired number of targets
+ * for placing block replicas.
+ * The replica placement strategy is that if the writer is on a datanode,
+ * the 1st replica is placed on the local machine,
+ * otherwise a random datanode. The 2nd replica is placed on a datanode
+ * that is on a different rack. The 3rd replica is placed on a different
+ * datanode on the same rack as the second replica.
+ */
+@InterfaceAudience.Private
+public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
+  private boolean considerLoad;
+  private NetworkTopology clusterMap;
+  private FSClusterStats stats;
+
+  BlockPlacementPolicyDefault(Configuration conf, FSClusterStats stats,
+                              NetworkTopology clusterMap) {
+    initialize(conf, stats, clusterMap);
+  }
+
+  BlockPlacementPolicyDefault() {
+  }
+
+  /** {@inheritDoc} */
+  public void initialize(Configuration conf, FSClusterStats stats,
+                         NetworkTopology clusterMap) {
+    this.considerLoad = conf.getBoolean("dfs.replication.considerLoad", true);
+    this.stats = stats;
+    this.clusterMap = clusterMap;
+  }
+
+  /** {@inheritDoc} */
+  public DatanodeDescriptor[] chooseTarget(String srcPath,
+                                           int numOfReplicas,
+                                           DatanodeDescriptor writer,
+                                           List<DatanodeDescriptor> chosenNodes,
+                                           long blocksize) {
+    return chooseTarget(numOfReplicas, writer, chosenNodes, null, blocksize);
+  }
+
+  /** {@inheritDoc} */
+  public DatanodeDescriptor[] chooseTarget(String srcPath,
+                                           int numOfReplicas,
+                                           DatanodeDescriptor writer,
+                                           List<DatanodeDescriptor> chosenNodes,
+                                           HashMap<Node, Node> excludedNodes,
+                                           long blocksize) {
+    return chooseTarget(numOfReplicas, writer, chosenNodes, excludedNodes, blocksize);
+  }
+
+
+  /** {@inheritDoc} */
+  @Override
+  public DatanodeDescriptor[] chooseTarget(FSInodeInfo srcInode,
+                                           int numOfReplicas,
+                                           DatanodeDescriptor writer,
+                                           List<DatanodeDescriptor> chosenNodes,
+                                           long blocksize) {
+    return chooseTarget(numOfReplicas, writer, chosenNodes, null, blocksize);
+  }
+
+  /** Not part of the public API, but used by the unit tests.
+   * Returns an empty array if numOfReplicas <= 0 or the cluster is empty.
+   */
+  DatanodeDescriptor[] chooseTarget(int numOfReplicas,
+                                    DatanodeDescriptor writer,
+                                    List<DatanodeDescriptor> chosenNodes,
+                                    HashMap<Node, Node> excludedNodes,
+                                    long blocksize) {
+    if (numOfReplicas <= 0 || clusterMap.getNumOfLeaves()==0) {
+      return new DatanodeDescriptor[0];
+    }
+
+    if (excludedNodes == null) {
+      excludedNodes = new HashMap<Node, Node>();
+    }
+
+    int clusterSize = clusterMap.getNumOfLeaves();
+    int totalNumOfReplicas = chosenNodes.size()+numOfReplicas;
+    if (totalNumOfReplicas > clusterSize) {
+      numOfReplicas -= (totalNumOfReplicas-clusterSize);
+      totalNumOfReplicas = clusterSize;
+    }
+
+    int maxNodesPerRack =
+      (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2;
+
+    List<DatanodeDescriptor> results =
+      new ArrayList<DatanodeDescriptor>(chosenNodes);
+    for (Node node:chosenNodes) {
+      excludedNodes.put(node, node);
+    }
+
+    if (!clusterMap.contains(writer)) {
+      writer=null;
+    }
+
+    DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer,
+        excludedNodes, blocksize, maxNodesPerRack, results);
+
+    results.removeAll(chosenNodes);
+
+    // sorting nodes to form a pipeline
+    return getPipeline((writer==null)?localNode:writer,
+                       results.toArray(new DatanodeDescriptor[results.size()]));
+  }
+
+  /* choose <i>numOfReplicas</i> from all data nodes (none if it is <= 0) */
+  private DatanodeDescriptor chooseTarget(int numOfReplicas,
+                                          DatanodeDescriptor writer,
+                                          HashMap<Node, Node> excludedNodes,
+                                          long blocksize,
+                                          int maxNodesPerRack,
+                                          List<DatanodeDescriptor> results) {
+
+    if (numOfReplicas <= 0 || clusterMap.getNumOfLeaves()==0) {
+      return writer;
+    }
+
+    int numOfResults = results.size();
+    boolean newBlock = (numOfResults==0);
+    if (writer == null && !newBlock) {
+      writer = results.get(0);
+    }
+
+    try {
+      if (numOfResults == 0) {
+        writer = chooseLocalNode(writer, excludedNodes,
+            blocksize, maxNodesPerRack, results);
+        if (--numOfReplicas == 0) {
+          return writer;
+        }
+      }
+      if (numOfResults <= 1) {
+        chooseRemoteRack(1, results.get(0), excludedNodes,
+            blocksize, maxNodesPerRack, results);
+        if (--numOfReplicas == 0) {
+          return writer;
+        }
+      }
+      if (numOfResults <= 2) {
+        if (clusterMap.isOnSameRack(results.get(0), results.get(1))) {
+          chooseRemoteRack(1, results.get(0), excludedNodes,
+              blocksize, maxNodesPerRack, results);
+        } else if (newBlock){
+          chooseLocalRack(results.get(1), excludedNodes, blocksize,
+              maxNodesPerRack, results);
+        } else {
+          chooseLocalRack(writer, excludedNodes, blocksize,
+              maxNodesPerRack, results);
+        }
+        if (--numOfReplicas == 0) {
+          return writer;
+        }
+      }
+      chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes,
+          blocksize, maxNodesPerRack, results);
+    } catch (NotEnoughReplicasException e) {
+      FSNamesystem.LOG.warn("Not able to place enough replicas, still in need of "
+               + numOfReplicas);
+    }
+    return writer;
+  }
+
+  /* choose <i>localMachine</i> as the target.
+   * if <i>localMachine</i> is not available,
+   * choose a node on the same rack
+   * @return the chosen node
+   */
+  private DatanodeDescriptor chooseLocalNode(
+                                             DatanodeDescriptor localMachine,
+                                             HashMap<Node, Node> excludedNodes,
+                                             long blocksize,
+                                             int maxNodesPerRack,
+                                             List<DatanodeDescriptor> results)
+    throws NotEnoughReplicasException {
+    // if no local machine, randomly choose one node
+    if (localMachine == null)
+      return chooseRandom(NodeBase.ROOT, excludedNodes,
+                          blocksize, maxNodesPerRack, results);
+
+    // otherwise try local machine first
+    Node oldNode = excludedNodes.put(localMachine, localMachine);
+    if (oldNode == null) { // was not in the excluded list
+      if (isGoodTarget(localMachine, blocksize,
+                       maxNodesPerRack, false, results)) {
+        results.add(localMachine);
+        return localMachine;
+      }
+    }
+
+    // try a node on local rack
+    return chooseLocalRack(localMachine, excludedNodes,
+                           blocksize, maxNodesPerRack, results);
+  }
+
+  /* choose one node from the rack that <i>localMachine</i> is on.
+   * if no such node is available, choose one node from the rack where
+   * a second replica is on.
+   * if still no such node is available, choose a random node
+   * in the cluster.
+   * @return the chosen node
+   */
+  private DatanodeDescriptor chooseLocalRack(
+                                             DatanodeDescriptor localMachine,
+                                             HashMap<Node, Node> excludedNodes,
+                                             long blocksize,
+                                             int maxNodesPerRack,
+                                             List<DatanodeDescriptor> results)
+    throws NotEnoughReplicasException {
+    // no local machine, so choose a random machine
+    if (localMachine == null) {
+      return chooseRandom(NodeBase.ROOT, excludedNodes,
+                          blocksize, maxNodesPerRack, results);
+    }
+
+    // choose one from the local rack
+    try {
+      return chooseRandom(
+                          localMachine.getNetworkLocation(),
+                          excludedNodes, blocksize, maxNodesPerRack, results);
+    } catch (NotEnoughReplicasException e1) {
+      // find the second replica
+      DatanodeDescriptor newLocal=null;
+      for(Iterator<DatanodeDescriptor> iter=results.iterator();
+          iter.hasNext();) {
+        DatanodeDescriptor nextNode = iter.next();
+        if (nextNode != localMachine) {
+          newLocal = nextNode;
+          break;
+        }
+      }
+      if (newLocal != null) {
+        try {
+          return chooseRandom(
+                              newLocal.getNetworkLocation(),
+                              excludedNodes, blocksize, maxNodesPerRack, results);
+        } catch(NotEnoughReplicasException e2) {
+          //otherwise randomly choose one from the network
+          return chooseRandom(NodeBase.ROOT, excludedNodes,
+                              blocksize, maxNodesPerRack, results);
+        }
+      } else {
+        //otherwise randomly choose one from the network
+        return chooseRandom(NodeBase.ROOT, excludedNodes,
+                            blocksize, maxNodesPerRack, results);
+      }
+    }
+  }
+
+  /* choose <i>numOfReplicas</i> nodes from the racks
+   * that <i>localMachine</i> is NOT on.
+   * if not enough nodes are available, choose the remaining ones
+   * from the local rack
+   */
+
+  private void chooseRemoteRack(int numOfReplicas,
+                                DatanodeDescriptor localMachine,
+                                HashMap<Node, Node> excludedNodes,
+                                long blocksize,
+                                int maxNodesPerRack,
+                                List<DatanodeDescriptor> results)
+    throws NotEnoughReplicasException {
+    int oldNumOfReplicas = results.size();
+    // randomly choose one node from remote racks
+    try {
+      chooseRandom(numOfReplicas, "~"+localMachine.getNetworkLocation(),
+                   excludedNodes, blocksize, maxNodesPerRack, results);
+    } catch (NotEnoughReplicasException e) {
+      chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
+                   localMachine.getNetworkLocation(), excludedNodes, blocksize,
+                   maxNodesPerRack, results);
+    }
+  }
+
+  /* Randomly choose one target from <i>nodes</i>.
+   * @return the chosen node
+   */
+  private DatanodeDescriptor chooseRandom(
+                                          String nodes,
+                                          HashMap<Node, Node> excludedNodes,
+                                          long blocksize,
+                                          int maxNodesPerRack,
+                                          List<DatanodeDescriptor> results)
+    throws NotEnoughReplicasException {
+    int numOfAvailableNodes =
+      clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
+    while(numOfAvailableNodes > 0) {
+      DatanodeDescriptor chosenNode =
+        (DatanodeDescriptor)(clusterMap.chooseRandom(nodes));
+
+      Node oldNode = excludedNodes.put(chosenNode, chosenNode);
+      if (oldNode == null) { // chosenNode was not in the excluded list
+        numOfAvailableNodes--;
+        if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
+          results.add(chosenNode);
+          return chosenNode;
+        }
+      }
+    }
+
+    throw new NotEnoughReplicasException(
+        "Not able to place enough replicas");
+  }
+
+  /* Randomly choose <i>numOfReplicas</i> targets from <i>nodes</i>.
+   */
+  private void chooseRandom(int numOfReplicas,
+                            String nodes,
+                            HashMap<Node, Node> excludedNodes,
+                            long blocksize,
+                            int maxNodesPerRack,
+                            List<DatanodeDescriptor> results)
+    throws NotEnoughReplicasException {
+
+    int numOfAvailableNodes =
+      clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
+    while(numOfReplicas > 0 && numOfAvailableNodes > 0) {
+      DatanodeDescriptor chosenNode =
+        (DatanodeDescriptor)(clusterMap.chooseRandom(nodes));
+      Node oldNode = excludedNodes.put(chosenNode, chosenNode);
+      if (oldNode == null) {
+        numOfAvailableNodes--;
+
+        if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
+          numOfReplicas--;
+          results.add(chosenNode);
+        }
+      }
+    }
+
+    if (numOfReplicas>0) {
+      throw new NotEnoughReplicasException(
+                                           "Not able to place enough replicas");
+    }
+  }
+
+  /* judge if a node is a good target.
+   * return true if <i>node</i> has enough space,
+   * does not have too much load, and the rack does not have too many nodes
+   */
+  private boolean isGoodTarget(DatanodeDescriptor node,
+                               long blockSize, int maxTargetPerLoc,
+                               List<DatanodeDescriptor> results) {
+    return isGoodTarget(node, blockSize, maxTargetPerLoc,
+                        this.considerLoad, results);
+  }
+
+  private boolean isGoodTarget(DatanodeDescriptor node,
+                               long blockSize, int maxTargetPerLoc,
+                               boolean considerLoad,
+                               List<DatanodeDescriptor> results) {
+    Log logr = FSNamesystem.LOG;
+    // check if the node is (being) decommissioned
+    if (node.isDecommissionInProgress() || node.isDecommissioned()) {
+      logr.debug("Node "+NodeBase.getPath(node)+
+                " is not chosen because the node is (being) decommissioned");
+      return false;
+    }
+
+    long remaining = node.getRemaining() -
+                     (node.getBlocksScheduled() * blockSize);
+    // check the remaining capacity of the target machine
+    if (blockSize* FSConstants.MIN_BLOCKS_FOR_WRITE>remaining) {
+      logr.debug("Node "+NodeBase.getPath(node)+
+                " is not chosen because the node does not have enough space");
+      return false;
+    }
+
+    // check the communication traffic of the target machine
+    if (considerLoad) {
+      double avgLoad = 0;
+      int size = clusterMap.getNumOfLeaves();
+      if (size != 0 && stats != null) {
+        avgLoad = (double)stats.getTotalLoad()/size;
+      }
+      if (node.getXceiverCount() > (2.0 * avgLoad)) {
+        logr.debug("Node "+NodeBase.getPath(node)+
+                  " is not chosen because the node is too busy");
+        return false;
+      }
+    }
+
+    // check if the target rack has chosen too many nodes
+    String rackname = node.getNetworkLocation();
+    int counter=1;
+    for(Iterator<DatanodeDescriptor> iter = results.iterator();
+        iter.hasNext();) {
+      Node result = iter.next();
+      if (rackname.equals(result.getNetworkLocation())) {
+        counter++;
+      }
+    }
+    if (counter>maxTargetPerLoc) {
+      logr.debug("Node "+NodeBase.getPath(node)+
+                " is not chosen because the rack has too many chosen nodes");
+      return false;
+    }
+    return true;
+  }
+
+  /* Return a pipeline of nodes.
+   * The pipeline is formed finding a shortest path that
+   * starts from the writer and traverses all <i>nodes</i>
+   * This is basically a traveling salesman problem.
+   */
+  private DatanodeDescriptor[] getPipeline(
+                                           DatanodeDescriptor writer,
+                                           DatanodeDescriptor[] nodes) {
+    if (nodes.length==0) return nodes;
+
+    synchronized(clusterMap) {
+      int index=0;
+      if (writer == null || !clusterMap.contains(writer)) {
+        writer = nodes[0];
+      }
+      for(;index<nodes.length; index++) {
+        DatanodeDescriptor shortestNode = nodes[index];
+        int shortestDistance = clusterMap.getDistance(writer, shortestNode);
+        int shortestIndex = index;
+        for(int i=index+1; i<nodes.length; i++) {
+          DatanodeDescriptor currentNode = nodes[i];
+          int currentDistance = clusterMap.getDistance(writer, currentNode);
+          if (shortestDistance>currentDistance) {
+            shortestDistance = currentDistance;
+            shortestNode = currentNode;
+            shortestIndex = i;
+          }
+        }
+        //switch position index & shortestIndex
+        if (index != shortestIndex) {
+          nodes[shortestIndex] = nodes[index];
+          nodes[index] = shortestNode;
+        }
+        writer = shortestNode;
+      }
+    }
+    return nodes;
+  }
+
+  /** {@inheritDoc} */
+  public int verifyBlockPlacement(String srcPath,
+                                  LocatedBlock lBlk,
+                                  int minRacks) {
+    DatanodeInfo[] locs = lBlk.getLocations();
+    if (locs == null)
+      locs = new DatanodeInfo[0];
+    int numRacks = clusterMap.getNumOfRacks();
+    if(numRacks <= 1) // only one rack
+      return 0;
+    minRacks = Math.min(minRacks, numRacks);
+    // Count the distinct racks holding a replica (locations on the same
+    // rack count once).
+    Set<String> racks = new TreeSet<String>();
+    for (DatanodeInfo dn : locs)
+      racks.add(dn.getNetworkLocation());
+    return minRacks - racks.size();
+  }
+
+  /** {@inheritDoc} */
+  public DatanodeDescriptor chooseReplicaToDelete(FSInodeInfo inode,
+                                                 Block block,
+                                                 short replicationFactor,
+                                                 Collection<DatanodeDescriptor> first,
+                                                 Collection<DatanodeDescriptor> second) {
+    long minSpace = Long.MAX_VALUE;
+    DatanodeDescriptor cur = null;
+
+    // pick replica from the first Set. If first is empty, then pick replicas
+    // from second set.
+    Iterator<DatanodeDescriptor> iter =
+          first.isEmpty() ? second.iterator() : first.iterator();
+
+    // pick node with least free space
+    while (iter.hasNext() ) {
+      DatanodeDescriptor node = iter.next();
+      long free = node.getRemaining();
+      if (minSpace > free) {
+        minSpace = free;
+        cur = node;
+      }
+    }
+    return cur;
+  }
+}
+
Added: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java?rev=1369804&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java (added)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java Mon Aug 6 12:08:17 2012
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+/**
+ * This interface is used for retrieving the load related statistics of
+ * the cluster, such as the number of block transfers and block writes
+ * currently in progress.
+ */
+public interface FSClusterStats {
+
+  /**
+   * Returns an indication of the total load of the cluster.
+   *
+   * @return a count of the total number of block transfers and block
+   *         writes that are currently occurring on the cluster.
+   */
+  int getTotalLoad();
+}
+
\ No newline at end of file
Added: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSInodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSInodeInfo.java?rev=1369804&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSInodeInfo.java (added)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSInodeInfo.java Mon Aug 6 12:08:17 2012
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+/**
+ * This interface is used by the pluggable block placement policy
+ * to expose a few characteristics of an inode.
+ */
+public interface FSInodeInfo {
+
+  /**
+   * Returns a string representation of an inode, namely its full path
+   * name.
+   *
+   * @return the full pathname (from root) that this inode represents
+   */
+  String getFullPathName();
+}
+
Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1369804&r1=1369803&r2=1369804&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Mon Aug 6 12:08:17 2012
@@ -113,6 +113,7 @@ import org.apache.hadoop.net.CachedDNSTo
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.net.ScriptBasedMapping;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
@@ -140,7 +141,7 @@ import org.mortbay.util.ajax.JSON;
* 4) machine --> blocklist (inverted #2)
* 5) LRU cache of updated-heartbeat machines
***************************************************/
-public class FSNamesystem implements FSConstants, FSNamesystemMBean,
+public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterStats,
NameNodeMXBean, MetricsSource {
public static final Log LOG = LogFactory.getLog(FSNamesystem.class);
public static final String AUDIT_FORMAT =
@@ -335,7 +336,7 @@ public class FSNamesystem implements FSC
private DNSToSwitchMapping dnsToSwitchMapping;
// for block replicas placement
- ReplicationTargetChooser replicator;
+ BlockPlacementPolicy replicator;
private HostsFileReader hostsReader;
private Daemon dnthread = null;
@@ -482,12 +483,9 @@ public class FSNamesystem implements FSC
short filePermission = (short)conf.getInt("dfs.upgrade.permission", 0777);
this.defaultPermission = PermissionStatus.createImmutable(
fsOwner.getShortUserName(), supergroup, new FsPermission(filePermission));
-
-
- this.replicator = new ReplicationTargetChooser(
- conf.getBoolean("dfs.replication.considerLoad", true),
- this,
- clusterMap);
+
+ this.replicator = BlockPlacementPolicy.getInstance(conf, this, clusterMap);
+
this.defaultReplication = conf.getInt("dfs.replication", 3);
this.maxReplication = conf.getInt("dfs.replication.max", 512);
this.minReplication = conf.getInt("dfs.replication.min", 1);
@@ -1521,7 +1519,7 @@ public class FSNamesystem implements FSC
*/
public LocatedBlock getAdditionalBlock(String src,
String clientName,
- List<Node> excludedNodes
+ HashMap<Node, Node> excludedNodes
) throws IOException {
long fileLength, blockSize;
int replication;
@@ -1550,7 +1548,8 @@ public class FSNamesystem implements FSC
}
// choose targets for the new block tobe allocated.
- DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
+ DatanodeDescriptor targets[] = replicator.chooseTarget(src,
+ replication,
clientNode,
excludedNodes,
blockSize);
@@ -2970,10 +2969,11 @@ public class FSNamesystem implements FSC
List<DatanodeDescriptor> containingNodes;
DatanodeDescriptor srcNode;
+ INodeFile fileINode = null;
synchronized (this) {
synchronized (neededReplications) {
// block should belong to a file
- INodeFile fileINode = blocksMap.getINode(block);
+ fileINode = blocksMap.getINode(block);
// abandoned block or block reopened for append
if(fileINode == null || fileINode.isUnderConstruction()) {
neededReplications.remove(block, priority); // remove from neededReplications
@@ -3008,9 +3008,11 @@ public class FSNamesystem implements FSC
}
// choose replication targets: NOT HOLDING THE GLOBAL LOCK
- DatanodeDescriptor targets[] = replicator.chooseTarget(
+ // It is costly to extract the filename for which chooseTargets is called,
+ // so for now we pass in the Inode itself.
+ DatanodeDescriptor targets[] = replicator.chooseTarget(fileINode,
requiredReplication - numEffectiveReplicas,
- srcNode, containingNodes, null, block.getNumBytes());
+ srcNode, containingNodes, block.getNumBytes());
if(targets.length == 0)
return false;
@@ -3018,7 +3020,7 @@ public class FSNamesystem implements FSC
synchronized (neededReplications) {
// Recheck since global lock was released
// block should belong to a file
- INodeFile fileINode = blocksMap.getINode(block);
+ fileINode = blocksMap.getINode(block);
// abandoned block or block reopened for append
if(fileINode == null || fileINode.isUnderConstruction()) {
neededReplications.remove(block, priority); // remove from neededReplications
@@ -3857,6 +3859,8 @@ public class FSNamesystem implements FSC
Block b, short replication,
DatanodeDescriptor addedNode,
DatanodeDescriptor delNodeHint) {
+ INodeFile inode = blocksMap.getINode(b);
+
// first form a rack to datanodes map and
HashMap<String, ArrayList<DatanodeDescriptor>> rackMap =
new HashMap<String, ArrayList<DatanodeDescriptor>>();
@@ -3894,24 +3898,13 @@ public class FSNamesystem implements FSC
boolean firstOne = true;
while (nonExcess.size() - replication > 0) {
DatanodeInfo cur = null;
- long minSpace = Long.MAX_VALUE;
// check if we can del delNodeHint
if (firstOne && delNodeHint !=null && nonExcess.contains(delNodeHint) &&
(priSet.contains(delNodeHint) || (addedNode != null && !priSet.contains(addedNode))) ) {
cur = delNodeHint;
} else { // regular excessive replica removal
- Iterator<DatanodeDescriptor> iter =
- priSet.isEmpty() ? remains.iterator() : priSet.iterator();
- while( iter.hasNext() ) {
- DatanodeDescriptor node = iter.next();
- long free = node.getRemaining();
-
- if (minSpace > free) {
- minSpace = free;
- cur = node;
- }
- }
+ cur = replicator.chooseReplicaToDelete(inode, b, replication, priSet, remains);
}
firstOne = false;
@@ -4644,7 +4637,7 @@ public class FSNamesystem implements FSC
}
public DatanodeDescriptor getRandomDatanode() {
- return replicator.chooseTarget(1, null, null, 0)[0];
+ return (DatanodeDescriptor)clusterMap.chooseRandom(NodeBase.ROOT);
}
/**
Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=1369804&r1=1369803&r2=1369804&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INode.java Mon Aug 6 12:08:17 2012
@@ -33,7 +33,7 @@ import org.apache.hadoop.hdfs.protocol.L
* This is a base INode class containing common fields for file and
* directory inodes.
*/
-abstract class INode implements Comparable<byte[]> {
+abstract class INode implements Comparable<byte[]>, FSInodeInfo {
protected byte[] name;
protected INodeDirectory parent;
protected long modificationTime;
@@ -247,6 +247,12 @@ abstract class INode implements Comparab
}
/** {@inheritDoc} */
+  public String getFullPathName() {
+    final String path = FSDirectory.getFullPathName(this); // path is built by FSDirectory
+    return path;
+  }
+
+ /** {@inheritDoc} */
public String toString() {
return "\"" + getLocalName() + "\":" + getPermissionStatus();
}
Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1369804&r1=1369803&r2=1369804&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java Mon Aug 6 12:08:17 2012
@@ -682,19 +682,18 @@ public class NameNode implements ClientP
String clientName,
DatanodeInfo[] excludedNodes)
throws IOException {
-
- List<Node> excludedNodeList = null;
+ HashMap<Node, Node> excludedNodesSet = null;
if (excludedNodes != null) {
- // We must copy here, since this list gets modified later on
- // in ReplicationTargetChooser
- excludedNodeList = new ArrayList<Node>(
- Arrays.<Node>asList(excludedNodes));
+ excludedNodesSet = new HashMap<Node, Node>(excludedNodes.length);
+ for (Node node:excludedNodes) {
+ excludedNodesSet.put(node, node);
+ }
}
stateChangeLog.debug("*BLOCK* NameNode.addBlock: file "
+src+" for "+clientName);
LocatedBlock locatedBlock = namesystem.getAdditionalBlock(
- src, clientName, excludedNodeList);
+ src, clientName, excludedNodesSet);
if (locatedBlock != null)
myMetrics.incrNumAddBlockOps();
return locatedBlock;
Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1369804&r1=1369803&r2=1369804&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Mon Aug 6 12:08:17 2012
@@ -283,8 +283,8 @@ public class NamenodeFsck {
locs.length + " replica(s).");
}
// verify block placement policy
- int missingRacks = ReplicationTargetChooser.verifyBlockPlacement(
- lBlk, targetFileReplication, networktopology);
+ int missingRacks = BlockPlacementPolicy.getInstance(conf, null, networktopology).
+ verifyBlockPlacement(path, lBlk, Math.min(2,targetFileReplication));
if (missingRacks > 0) {
res.numMisReplicatedBlocks++;
misReplicatedPerFile++;
Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/ReplicationTargetChooser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/ReplicationTargetChooser.java?rev=1369804&r1=1369803&r2=1369804&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/ReplicationTargetChooser.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/ReplicationTargetChooser.java Mon Aug 6 12:08:17 2012
@@ -1,528 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.namenode;
-
-import org.apache.commons.logging.*;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.FSConstants;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.net.Node;
-import org.apache.hadoop.net.NodeBase;
-import java.util.*;
-
-/** The class is responsible for choosing the desired number of targets
- * for placing block replicas.
- * The replica placement strategy is that if the writer is on a datanode,
- * the 1st replica is placed on the local machine,
- * otherwise a random datanode. The 2nd replica is placed on a datanode
- * that is on a different rack. The 3rd replica is placed on a datanode
- * which is on the same rack as the second replica.
- */
-class ReplicationTargetChooser {
- private final boolean considerLoad;
- private NetworkTopology clusterMap;
- private FSNamesystem fs;
-
- ReplicationTargetChooser(boolean considerLoad, FSNamesystem fs,
- NetworkTopology clusterMap) {
- this.considerLoad = considerLoad;
- this.fs = fs;
- this.clusterMap = clusterMap;
- }
-
- private static class NotEnoughReplicasException extends Exception {
- NotEnoughReplicasException(String msg) {
- super(msg);
- }
- }
-
- /**
- * choose <i>numOfReplicas</i> data nodes for <i>writer</i> to replicate
- * a block with size <i>blocksize</i>
- * If not, return as many as we can.
- *
- * @param numOfReplicas: number of replicas wanted.
- * @param writer: the writer's machine, null if not in the cluster.
- * @param excludedNodes: datanodesthat should not be considered targets.
- * @param blocksize: size of the data to be written.
- * @return array of DatanodeDescriptor instances chosen as targets
- * and sorted as a pipeline.
- */
- DatanodeDescriptor[] chooseTarget(int numOfReplicas,
- DatanodeDescriptor writer,
- List<Node> excludedNodes,
- long blocksize) {
- if (excludedNodes == null) {
- excludedNodes = new ArrayList<Node>();
- }
-
- return chooseTarget(numOfReplicas, writer,
- new ArrayList<DatanodeDescriptor>(), excludedNodes, blocksize);
- }
-
- /**
- * choose <i>numOfReplicas</i> data nodes for <i>writer</i>
- * to re-replicate a block with size <i>blocksize</i>
- * If not, return as many as we can.
- *
- * @param numOfReplicas: additional number of replicas wanted.
- * @param writer: the writer's machine, null if not in the cluster.
- * @param choosenNodes: datanodes that have been choosen as targets.
- * @param excludedNodes: datanodesthat should not be considered targets.
- * @param blocksize: size of the data to be written.
- * @return array of DatanodeDescriptor instances chosen as target
- * and sorted as a pipeline.
- */
- DatanodeDescriptor[] chooseTarget(int numOfReplicas,
- DatanodeDescriptor writer,
- List<DatanodeDescriptor> choosenNodes,
- List<Node> excludedNodes,
- long blocksize) {
- if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
- return new DatanodeDescriptor[0];
- }
-
- if (excludedNodes == null) {
- excludedNodes = new ArrayList<Node>();
- }
-
- int clusterSize = clusterMap.getNumOfLeaves();
- int totalNumOfReplicas = choosenNodes.size()+numOfReplicas;
- if (totalNumOfReplicas > clusterSize) {
- numOfReplicas -= (totalNumOfReplicas-clusterSize);
- totalNumOfReplicas = clusterSize;
- }
-
- int maxNodesPerRack =
- (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2;
-
- List<DatanodeDescriptor> results =
- new ArrayList<DatanodeDescriptor>(choosenNodes);
- excludedNodes.addAll(choosenNodes);
-
- if (!clusterMap.contains(writer)) {
- writer=null;
- }
-
- DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer,
- excludedNodes, blocksize, maxNodesPerRack, results);
-
- results.removeAll(choosenNodes);
-
- // sorting nodes to form a pipeline
- return getPipeline((writer==null)?localNode:writer,
- results.toArray(new DatanodeDescriptor[results.size()]));
- }
-
- /* choose <i>numOfReplicas</i> from all data nodes */
- private DatanodeDescriptor chooseTarget(int numOfReplicas,
- DatanodeDescriptor writer,
- List<Node> excludedNodes,
- long blocksize,
- int maxNodesPerRack,
- List<DatanodeDescriptor> results) {
-
- if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
- return writer;
- }
-
- int numOfResults = results.size();
- boolean newBlock = (numOfResults==0);
- if (writer == null && !newBlock) {
- writer = (DatanodeDescriptor)results.get(0);
- }
-
- try {
- switch(numOfResults) {
- case 0:
- writer = chooseLocalNode(writer, excludedNodes,
- blocksize, maxNodesPerRack, results);
- if (--numOfReplicas == 0) {
- break;
- }
- case 1:
- chooseRemoteRack(1, results.get(0), excludedNodes,
- blocksize, maxNodesPerRack, results);
- if (--numOfReplicas == 0) {
- break;
- }
- case 2:
- if (clusterMap.isOnSameRack(results.get(0), results.get(1))) {
- chooseRemoteRack(1, results.get(0), excludedNodes,
- blocksize, maxNodesPerRack, results);
- } else if (newBlock){
- chooseLocalRack(results.get(1), excludedNodes, blocksize,
- maxNodesPerRack, results);
- } else {
- chooseLocalRack(writer, excludedNodes, blocksize,
- maxNodesPerRack, results);
- }
- if (--numOfReplicas == 0) {
- break;
- }
- default:
- chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
- }
- } catch (NotEnoughReplicasException e) {
- FSNamesystem.LOG.warn("Not able to place enough replicas, still in need of "
- + numOfReplicas);
- }
- return writer;
- }
-
- /* choose <i>localMachine</i> as the target.
- * if <i>localMachine</i> is not availabe,
- * choose a node on the same rack
- * @return the choosen node
- */
- private DatanodeDescriptor chooseLocalNode(
- DatanodeDescriptor localMachine,
- List<Node> excludedNodes,
- long blocksize,
- int maxNodesPerRack,
- List<DatanodeDescriptor> results)
- throws NotEnoughReplicasException {
- // if no local machine, randomly choose one node
- if (localMachine == null)
- return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
-
- // otherwise try local machine first
- if (!excludedNodes.contains(localMachine)) {
- excludedNodes.add(localMachine);
- if (isGoodTarget(localMachine, blocksize,
- maxNodesPerRack, false, results)) {
- results.add(localMachine);
- return localMachine;
- }
- }
-
- // try a node on local rack
- return chooseLocalRack(localMachine, excludedNodes,
- blocksize, maxNodesPerRack, results);
- }
-
- /* choose one node from the rack that <i>localMachine</i> is on.
- * if no such node is availabe, choose one node from the rack where
- * a second replica is on.
- * if still no such node is available, choose a random node
- * in the cluster.
- * @return the choosen node
- */
- private DatanodeDescriptor chooseLocalRack(
- DatanodeDescriptor localMachine,
- List<Node> excludedNodes,
- long blocksize,
- int maxNodesPerRack,
- List<DatanodeDescriptor> results)
- throws NotEnoughReplicasException {
- // no local machine, so choose a random machine
- if (localMachine == null) {
- return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
- }
-
- // choose one from the local rack
- try {
- return chooseRandom(
- localMachine.getNetworkLocation(),
- excludedNodes, blocksize, maxNodesPerRack, results);
- } catch (NotEnoughReplicasException e1) {
- // find the second replica
- DatanodeDescriptor newLocal=null;
- for(Iterator<DatanodeDescriptor> iter=results.iterator();
- iter.hasNext();) {
- DatanodeDescriptor nextNode = iter.next();
- if (nextNode != localMachine) {
- newLocal = nextNode;
- break;
- }
- }
- if (newLocal != null) {
- try {
- return chooseRandom(
- newLocal.getNetworkLocation(),
- excludedNodes, blocksize, maxNodesPerRack, results);
- } catch(NotEnoughReplicasException e2) {
- //otherwise randomly choose one from the network
- return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
- }
- } else {
- //otherwise randomly choose one from the network
- return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results);
- }
- }
- }
-
- /* choose <i>numOfReplicas</i> nodes from the racks
- * that <i>localMachine</i> is NOT on.
- * if not enough nodes are availabe, choose the remaining ones
- * from the local rack
- */
-
- private void chooseRemoteRack(int numOfReplicas,
- DatanodeDescriptor localMachine,
- List<Node> excludedNodes,
- long blocksize,
- int maxReplicasPerRack,
- List<DatanodeDescriptor> results)
- throws NotEnoughReplicasException {
- int oldNumOfReplicas = results.size();
- // randomly choose one node from remote racks
- try {
- chooseRandom(numOfReplicas, "~"+localMachine.getNetworkLocation(),
- excludedNodes, blocksize, maxReplicasPerRack, results);
- } catch (NotEnoughReplicasException e) {
- chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
- localMachine.getNetworkLocation(), excludedNodes, blocksize,
- maxReplicasPerRack, results);
- }
- }
-
- /* Randomly choose one target from <i>nodes</i>.
- * @return the choosen node
- */
- private DatanodeDescriptor chooseRandom(
- String nodes,
- List<Node> excludedNodes,
- long blocksize,
- int maxNodesPerRack,
- List<DatanodeDescriptor> results)
- throws NotEnoughReplicasException {
- DatanodeDescriptor result;
- do {
- DatanodeDescriptor[] selectedNodes =
- chooseRandom(1, nodes, excludedNodes);
- if (selectedNodes.length == 0) {
- throw new NotEnoughReplicasException(
- "Not able to place enough replicas");
- }
- result = (DatanodeDescriptor)(selectedNodes[0]);
- } while(!isGoodTarget(result, blocksize, maxNodesPerRack, results));
- results.add(result);
- return result;
- }
-
- /* Randomly choose <i>numOfReplicas</i> targets from <i>nodes</i>.
- */
- private void chooseRandom(int numOfReplicas,
- String nodes,
- List<Node> excludedNodes,
- long blocksize,
- int maxNodesPerRack,
- List<DatanodeDescriptor> results)
- throws NotEnoughReplicasException {
- boolean toContinue = true;
- do {
- DatanodeDescriptor[] selectedNodes =
- chooseRandom(numOfReplicas, nodes, excludedNodes);
- if (selectedNodes.length < numOfReplicas) {
- toContinue = false;
- }
- for(int i=0; i<selectedNodes.length; i++) {
- DatanodeDescriptor result = selectedNodes[i];
- if (isGoodTarget(result, blocksize, maxNodesPerRack, results)) {
- numOfReplicas--;
- results.add(result);
- }
- } // end of for
- } while (numOfReplicas>0 && toContinue);
-
- if (numOfReplicas>0) {
- throw new NotEnoughReplicasException(
- "Not able to place enough replicas");
- }
- }
-
- /* Randomly choose <i>numOfNodes</i> nodes from <i>scope</i>.
- * @return the choosen nodes
- */
- private DatanodeDescriptor[] chooseRandom(int numOfReplicas,
- String nodes,
- List<Node> excludedNodes) {
- List<DatanodeDescriptor> results =
- new ArrayList<DatanodeDescriptor>();
- int numOfAvailableNodes =
- clusterMap.countNumOfAvailableNodes(nodes, excludedNodes);
- numOfReplicas = (numOfAvailableNodes<numOfReplicas)?
- numOfAvailableNodes:numOfReplicas;
- while(numOfReplicas > 0) {
- DatanodeDescriptor choosenNode =
- (DatanodeDescriptor)(clusterMap.chooseRandom(nodes));
- if (!excludedNodes.contains(choosenNode)) {
- results.add(choosenNode);
- excludedNodes.add(choosenNode);
- numOfReplicas--;
- }
- }
- return (DatanodeDescriptor[])results.toArray(
- new DatanodeDescriptor[results.size()]);
- }
-
- /* judge if a node is a good target.
- * return true if <i>node</i> has enough space,
- * does not have too much load, and the rack does not have too many nodes
- */
- private boolean isGoodTarget(DatanodeDescriptor node,
- long blockSize, int maxTargetPerLoc,
- List<DatanodeDescriptor> results) {
- return isGoodTarget(node, blockSize, maxTargetPerLoc,
- this.considerLoad, results);
- }
-
- private boolean isGoodTarget(DatanodeDescriptor node,
- long blockSize, int maxTargetPerLoc,
- boolean considerLoad,
- List<DatanodeDescriptor> results) {
- Log logr = FSNamesystem.LOG;
- // check if the node is (being) decommissed
- if (node.isDecommissionInProgress() || node.isDecommissioned()) {
- logr.debug("Node "+NodeBase.getPath(node)+
- " is not chosen because the node is (being) decommissioned");
- return false;
- }
-
- long remaining = node.getRemaining() -
- (node.getBlocksScheduled() * blockSize);
- // check the remaining capacity of the target machine
- if (blockSize* FSConstants.MIN_BLOCKS_FOR_WRITE>remaining) {
- logr.debug("Node "+NodeBase.getPath(node)+
- " is not chosen because the node does not have enough space");
- return false;
- }
-
- // check the communication traffic of the target machine
- if (considerLoad) {
- double avgLoad = 0;
- int size = clusterMap.getNumOfLeaves();
- if (size != 0) {
- avgLoad = (double)fs.getTotalLoad()/size;
- }
- if (node.getXceiverCount() > (2.0 * avgLoad)) {
- logr.debug("Node "+NodeBase.getPath(node)+
- " is not chosen because the node is too busy");
- return false;
- }
- }
-
- // check if the target rack has chosen too many nodes
- String rackname = node.getNetworkLocation();
- int counter=1;
- for(Iterator<DatanodeDescriptor> iter = results.iterator();
- iter.hasNext();) {
- Node result = iter.next();
- if (rackname.equals(result.getNetworkLocation())) {
- counter++;
- }
- }
- if (counter>maxTargetPerLoc) {
- logr.debug("Node "+NodeBase.getPath(node)+
- " is not chosen because the rack has too many chosen nodes");
- return false;
- }
- return true;
- }
-
- /* Return a pipeline of nodes.
- * The pipeline is formed finding a shortest path that
- * starts from the writer and tranverses all <i>nodes</i>
- * This is basically a traveling salesman problem.
- */
- private DatanodeDescriptor[] getPipeline(
- DatanodeDescriptor writer,
- DatanodeDescriptor[] nodes) {
- if (nodes.length==0) return nodes;
-
- synchronized(clusterMap) {
- int index=0;
- if (writer == null || !clusterMap.contains(writer)) {
- writer = nodes[0];
- }
- for(;index<nodes.length; index++) {
- DatanodeDescriptor shortestNode = nodes[index];
- int shortestDistance = clusterMap.getDistance(writer, shortestNode);
- int shortestIndex = index;
- for(int i=index+1; i<nodes.length; i++) {
- DatanodeDescriptor currentNode = nodes[i];
- int currentDistance = clusterMap.getDistance(writer, currentNode);
- if (shortestDistance>currentDistance) {
- shortestDistance = currentDistance;
- shortestNode = currentNode;
- shortestIndex = i;
- }
- }
- //switch position index & shortestIndex
- if (index != shortestIndex) {
- nodes[shortestIndex] = nodes[index];
- nodes[index] = shortestNode;
- }
- writer = shortestNode;
- }
- }
- return nodes;
- }
-
- /**
- * Verify that the block is replicated on at least 2 different racks
- * if there is more than one rack in the system.
- *
- * @param lBlk block with locations
- * @param cluster
- * @return 1 if the block must be relicated on additional rack,
- * or 0 if the number of racks is sufficient.
- */
- public static int verifyBlockPlacement(LocatedBlock lBlk,
- short replication,
- NetworkTopology cluster) {
- int numRacks = verifyBlockPlacement(lBlk, Math.min(2,replication), cluster);
- return numRacks < 0 ? 0 : numRacks;
- }
-
- /**
- * Verify that the block is replicated on at least minRacks different racks
- * if there is more than minRacks rack in the system.
- *
- * @param lBlk block with locations
- * @param minRacks number of racks the block should be replicated to
- * @param cluster
- * @return the difference between the required and the actual number of racks
- * the block is replicated to.
- */
- public static int verifyBlockPlacement(LocatedBlock lBlk,
- int minRacks,
- NetworkTopology cluster) {
- DatanodeInfo[] locs = lBlk.getLocations();
- if (locs == null)
- locs = new DatanodeInfo[0];
- int numRacks = cluster.getNumOfRacks();
- if(numRacks <= 1) // only one rack
- return 0;
- minRacks = Math.min(minRacks, numRacks);
- // 1. Check that all locations are different.
- // 2. Count locations on different racks.
- Set<String> racks = new TreeSet<String>();
- for (DatanodeInfo dn : locs)
- racks.add(dn.getNetworkLocation());
- return minRacks - racks.size();
- }
-} //end of Replicator
-
Added: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/UnsupportedActionException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/UnsupportedActionException.java?rev=1369804&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/UnsupportedActionException.java (added)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/UnsupportedActionException.java Mon Aug 6 12:08:17 2012
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/** Thrown when a requested operation is not supported (an IOException subtype). */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class UnsupportedActionException extends IOException {
+  /** Serialization version for java.io.Serializable. */
+  private static final long serialVersionUID = 1L;
+  /**
+   * @param msg detail message, later returned by {@link #getMessage()}
+   */
+  public UnsupportedActionException(final String msg) {
+    super(msg);
+  }
+}