You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/06/14 23:53:07 UTC
svn commit: r547419 - in /lucene/hadoop/trunk: ./
src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/net/
Author: cutting
Date: Thu Jun 14 14:53:06 2007
New Revision: 547419
URL: http://svn.apache.org/viewvc?view=rev&rev=547419
Log:
HADOOP-1269. Finer grained locking in HDFS namenode. Contributed by Dhruba.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Host2NodesMap.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetworkTopology.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=547419&r1=547418&r2=547419
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Jun 14 14:53:06 2007
@@ -120,6 +120,9 @@
38. HADOOP-1139. Log HDFS block transitions at INFO level, to better
enable diagnosis of problems. (Dhruba Borthakur via cutting)
+ 39. HADOOP-1269. Finer grained locking in HDFS namenode.
+ (Dhruba Borthakur via cutting)
+
Release 0.13.0 - 2007-06-08
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?view=diff&rev=547419&r1=547418&r2=547419
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Thu Jun 14 14:53:06 2007
@@ -1273,9 +1273,10 @@
" seconds");
}
try {
- LOG.debug("NotReplicatedYetException sleeping " + src +
+ LOG.warn("NotReplicatedYetException sleeping " + src +
" retries left " + retries);
Thread.sleep(sleeptime);
+ sleeptime *= 2;
} catch (InterruptedException ie) {
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?view=diff&rev=547419&r1=547418&r2=547419
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Thu Jun 14 14:53:06 2007
@@ -289,22 +289,6 @@
}
}
String errorMsg = null;
- // verify build version
- if (!nsInfo.getBuildVersion().equals(Storage.getBuildVersion())) {
- errorMsg = "Incompatible build versions: namenode BV = "
- + nsInfo.getBuildVersion() + "; datanode BV = "
- + Storage.getBuildVersion();
- LOG.fatal(errorMsg);
- try {
- namenode.errorReport(dnRegistration,
- DatanodeProtocol.NOTIFY, errorMsg);
- } catch(SocketTimeoutException e) { // namenode is busy
- LOG.info("Problem connecting to server: " + getNameNodeAddr());
- }
- throw new IOException(errorMsg);
- }
- assert FSConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion() :
- "Data-node and name-node layout versions must be the same.";
return nsInfo;
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=547419&r1=547418&r2=547419
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Thu Jun 14 14:53:06 2007
@@ -429,28 +429,39 @@
* @see ClientProtocol#open(String, long, long)
* @see ClientProtocol#getBlockLocations(String, long, long)
*/
- synchronized LocatedBlocks getBlockLocations(String clientMachine,
- String src,
- long offset,
- long length
- ) throws IOException {
+ LocatedBlocks getBlockLocations(String clientMachine,
+ String src,
+ long offset,
+ long length
+ ) throws IOException {
if (offset < 0) {
throw new IOException("Negative offset is not supported. File: " + src );
}
if (length < 0) {
throw new IOException("Negative length is not supported. File: " + src );
}
- return getBlockLocations(clientMachine,
- dir.getFileINode(src),
- offset, length, Integer.MAX_VALUE);
+
+ DatanodeDescriptor client = null;
+ LocatedBlocks blocks = getBlockLocations(dir.getFileINode(src),
+ offset, length,
+ Integer.MAX_VALUE);
+ if (blocks == null) {
+ return null;
+ }
+ client = host2DataNodeMap.getDatanodeByHost(clientMachine);
+ for (Iterator<LocatedBlock> it = blocks.getLocatedBlocks().iterator();
+ it.hasNext();) {
+ LocatedBlock block = (LocatedBlock) it.next();
+ clusterMap.sortByDistance(client,
+ (DatanodeDescriptor[])(block.getLocations()));
+ }
+ return blocks;
}
- private LocatedBlocks getBlockLocations(String clientMachine,
- FSDirectory.INode inode,
- long offset,
- long length,
- int nrBlocksToReturn
- ) throws IOException {
+ private synchronized LocatedBlocks getBlockLocations(FSDirectory.INode inode,
+ long offset,
+ long length,
+ int nrBlocksToReturn) {
if(inode == null || inode.isDir()) {
return null;
}
@@ -479,8 +490,6 @@
long endOff = offset + length;
- DatanodeDescriptor client;
- client = host2DataNodeMap.getDatanodeByHost(clientMachine);
do {
// get block locations
int numNodes = blocksMap.numNodes(blocks[curBlk]);
@@ -491,7 +500,6 @@
blocksMap.nodeIterator(blocks[curBlk]); it.hasNext();) {
machineSet[numNodes++] = it.next();
}
- clusterMap.sortByDistance(client, machineSet);
}
results.add(new LocatedBlock(blocks[curBlk], machineSet, curPos));
curPos += blocks[curBlk].getNumBytes();
@@ -585,7 +593,54 @@
* @throws IOException if the filename is invalid
* {@link FSDirectory#isValidToCreate(UTF8)}.
*/
- public synchronized LocatedBlock startFile(UTF8 src,
+ public LocatedBlock startFile(UTF8 src,
+ UTF8 holder,
+ UTF8 clientMachine,
+ boolean overwrite,
+ short replication,
+ long blockSize
+ ) throws IOException {
+
+ //
+ // Create file into pendingCreates and get the first blockId
+ //
+ Block newBlock = startFileInternal(src, holder, clientMachine,
+ overwrite, replication,
+ blockSize);
+
+ //
+ // Get the array of replication targets
+ //
+ try {
+ DatanodeDescriptor clientNode =
+ host2DataNodeMap.getDatanodeByHost(clientMachine.toString());
+ DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
+ clientNode, null, blockSize);
+ if (targets.length < this.minReplication) {
+ if (clusterMap.getNumOfLeaves() == 0) {
+ throw new IOException("Failed to create file " + src
+ + " on client " + clientMachine
+ + " because this cluster has no datanodes.");
+ }
+ throw new IOException("Failed to create file " + src
+ + " on client " + clientMachine
+ + " because there were not enough datanodes available. "
+ + "Found " + targets.length
+ + " datanodes but MIN_REPLICATION for the cluster is "
+ + "configured to be "
+ + this.minReplication
+ + ".");
+ }
+ return new LocatedBlock(newBlock, targets, 0L);
+
+ } catch (IOException ie) {
+ NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: "
+ + ie.getMessage());
+ throw ie;
+ }
+ }
+
+ public synchronized Block startFileInternal(UTF8 src,
UTF8 holder,
UTF8 clientMachine,
boolean overwrite,
@@ -666,26 +721,8 @@
}
}
- // Get the array of replication targets
DatanodeDescriptor clientNode =
host2DataNodeMap.getDatanodeByHost(clientMachine.toString());
- DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
- clientNode, null, blockSize);
- if (targets.length < this.minReplication) {
- if (clusterMap.getNumOfLeaves() == 0) {
- throw new IOException("Failed to create file "+src
- + " on client " + clientMachine
- + " because this cluster has no datanodes.");
- }
- throw new IOException("Failed to create file "+src
- + " on client " + clientMachine
- + " because there were not enough datanodes available. "
- + "Found " + targets.length
- + " datanodes but MIN_REPLICATION for the cluster is "
- + "configured to be "
- + this.minReplication
- + ".");
- }
// Reserve space for this pending file
pendingCreates.put(src,
@@ -709,9 +746,9 @@
}
lease.startedCreate(src);
}
-
+
// Create first block
- return new LocatedBlock(allocateBlock(src), targets, 0L);
+ return allocateBlock(src);
} catch (IOException ie) {
NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: "
+ie.getMessage());
@@ -730,38 +767,52 @@
* are replicated. Will return an empty 2-elt array if we want the
* client to "try again later".
*/
- public synchronized LocatedBlock getAdditionalBlock(UTF8 src,
- UTF8 clientName
- ) throws IOException {
+ public LocatedBlock getAdditionalBlock(UTF8 src,
+ UTF8 clientName
+ ) throws IOException {
+ long fileLength, blockSize;
+ int replication;
+ DatanodeDescriptor clientNode = null;
+ Block newBlock = null;
+
NameNode.stateChangeLog.debug("BLOCK* NameSystem.getAdditionalBlock: file "
+src+" for "+clientName);
- if (isInSafeMode())
- throw new SafeModeException("Cannot add block to " + src, safeMode);
- FileUnderConstruction pendingFile = pendingCreates.get(src);
- // make sure that we still have the lease on this file
- if (pendingFile == null) {
- throw new LeaseExpiredException("No lease on " + src);
- }
- if (!pendingFile.getClientName().equals(clientName)) {
- throw new LeaseExpiredException("Lease mismatch on " + src +
- " owned by " + pendingFile.getClientName() +
- " and appended by " + clientName);
- }
- //
- // If we fail this, bad things happen!
- //
- if (!checkFileProgress(pendingFile, false)) {
- throw new NotReplicatedYetException("Not replicated yet:" + src);
+ synchronized (this) {
+ if (isInSafeMode()) {
+ throw new SafeModeException("Cannot add block to " + src, safeMode);
+ }
+
+ //
+ // make sure that we still have the lease on this file
+ //
+ FileUnderConstruction pendingFile = pendingCreates.get(src);
+ if (pendingFile == null) {
+ throw new LeaseExpiredException("No lease on " + src);
+ }
+ if (!pendingFile.getClientName().equals(clientName)) {
+ throw new LeaseExpiredException("Lease mismatch on " + src +
+ " owned by " + pendingFile.getClientName() +
+ " and appended by " + clientName);
+ }
+
+ //
+ // If we fail this, bad things happen!
+ //
+ if (!checkFileProgress(pendingFile, false)) {
+ throw new NotReplicatedYetException("Not replicated yet:" + src);
+ }
+ fileLength = pendingFile.computeFileLength();
+ blockSize = pendingFile.getBlockSize();
+ clientNode = pendingFile.getClientNode();
+ replication = (int)pendingFile.getReplication();
+ newBlock = allocateBlock(src);
}
- // Get the array of replication targets
- DatanodeDescriptor clientNode = pendingFile.getClientNode();
- DatanodeDescriptor targets[] = replicator.chooseTarget(
- (int)(pendingFile.getReplication()),
+ DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
clientNode,
null,
- pendingFile.getBlockSize());
+ blockSize);
if (targets.length < this.minReplication) {
throw new IOException("File " + src + " could only be replicated to " +
targets.length + " nodes, instead of " +
@@ -769,9 +820,7 @@
}
// Create next block
- return new LocatedBlock(allocateBlock(src),
- targets,
- pendingFile.computeFileLength());
+ return new LocatedBlock(newBlock, targets, fileLength);
}
/**
@@ -930,7 +979,7 @@
/**
* Allocate a block at the given pending filename
*/
- synchronized Block allocateBlock(UTF8 src) {
+ private Block allocateBlock(UTF8 src) {
Block b = null;
do {
b = new Block(FSNamesystem.randBlockId.nextLong(), 0);
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Host2NodesMap.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Host2NodesMap.java?view=diff&rev=547419&r1=547418&r2=547419
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Host2NodesMap.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Host2NodesMap.java Thu Jun 14 14:53:06 2007
@@ -18,26 +18,34 @@
package org.apache.hadoop.dfs;
import java.util.*;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
class Host2NodesMap {
private HashMap<String, DatanodeDescriptor[]> map
= new HashMap<String, DatanodeDescriptor[]>();
private Random r = new Random();
+ private ReadWriteLock hostmapLock = new ReentrantReadWriteLock();
/** Check if node is already in the map. */
- synchronized boolean contains(DatanodeDescriptor node) {
+ boolean contains(DatanodeDescriptor node) {
if (node==null) {
return false;
}
String host = node.getHost();
- DatanodeDescriptor[] nodes = map.get(host);
- if (nodes != null) {
- for(DatanodeDescriptor containedNode:nodes) {
- if (node==containedNode) {
- return true;
+ hostmapLock.readLock().lock();
+ try {
+ DatanodeDescriptor[] nodes = map.get(host);
+ if (nodes != null) {
+ for(DatanodeDescriptor containedNode:nodes) {
+ if (node==containedNode) {
+ return true;
+ }
}
}
+ } finally {
+ hostmapLock.readLock().unlock();
}
return false;
}
@@ -45,85 +53,101 @@
/** add node to the map
* return true if the node is added; false otherwise.
*/
- synchronized boolean add(DatanodeDescriptor node) {
- if (node==null || contains(node)) {
- return false;
- }
+ boolean add(DatanodeDescriptor node) {
+ hostmapLock.writeLock().lock();
+ try {
+ if (node==null || contains(node)) {
+ return false;
+ }
- String host = node.getHost();
- DatanodeDescriptor[] nodes = map.get(host);
- DatanodeDescriptor[] newNodes;
- if (nodes==null) {
- newNodes = new DatanodeDescriptor[1];
- newNodes[0]=node;
- } else { // rare case: more than one datanode on the host
- newNodes = new DatanodeDescriptor[nodes.length+1];
- System.arraycopy(nodes, 0, newNodes, 0, nodes.length);
- newNodes[nodes.length] = node;
+ String host = node.getHost();
+ DatanodeDescriptor[] nodes = map.get(host);
+ DatanodeDescriptor[] newNodes;
+ if (nodes==null) {
+ newNodes = new DatanodeDescriptor[1];
+ newNodes[0]=node;
+ } else { // rare case: more than one datanode on the host
+ newNodes = new DatanodeDescriptor[nodes.length+1];
+ System.arraycopy(nodes, 0, newNodes, 0, nodes.length);
+ newNodes[nodes.length] = node;
+ }
+ map.put(host, newNodes);
+ return true;
+ } finally {
+ hostmapLock.writeLock().unlock();
}
- map.put(host, newNodes);
- return true;
}
/** remove node from the map
* return true if the node is removed; false otherwise.
*/
- synchronized boolean remove(DatanodeDescriptor node) {
+ boolean remove(DatanodeDescriptor node) {
if (node==null) {
return false;
}
String host = node.getHost();
- DatanodeDescriptor[] nodes = map.get(host);
- if (nodes==null) {
- return false;
- }
- if (nodes.length==1) {
- if (nodes[0]==node) {
- map.remove(host);
- return true;
- } else {
+ hostmapLock.writeLock().lock();
+ try {
+
+ DatanodeDescriptor[] nodes = map.get(host);
+ if (nodes==null) {
return false;
}
- }
- //rare case
- int i=0;
- for(; i<nodes.length; i++) {
- if (nodes[i]==node) {
- break;
+ if (nodes.length==1) {
+ if (nodes[0]==node) {
+ map.remove(host);
+ return true;
+ } else {
+ return false;
+ }
}
- }
- if (i==nodes.length) {
- return false;
- } else {
- DatanodeDescriptor[] newNodes;
- newNodes = new DatanodeDescriptor[nodes.length-1];
- System.arraycopy(nodes, 0, newNodes, 0, i);
- System.arraycopy(nodes, i+1, newNodes, i, nodes.length-i-1);
- map.put(host, newNodes);
- return true;
+ //rare case
+ int i=0;
+ for(; i<nodes.length; i++) {
+ if (nodes[i]==node) {
+ break;
+ }
+ }
+ if (i==nodes.length) {
+ return false;
+ } else {
+ DatanodeDescriptor[] newNodes;
+ newNodes = new DatanodeDescriptor[nodes.length-1];
+ System.arraycopy(nodes, 0, newNodes, 0, i);
+ System.arraycopy(nodes, i+1, newNodes, i, nodes.length-i-1);
+ map.put(host, newNodes);
+ return true;
+ }
+ } finally {
+ hostmapLock.writeLock().unlock();
}
}
/** get a data node by its host.
* @return DatanodeDescriptor if found; otherwise null.
*/
- synchronized DatanodeDescriptor getDatanodeByHost(String host) {
+ DatanodeDescriptor getDatanodeByHost(String host) {
if (host==null) {
return null;
}
- DatanodeDescriptor[] nodes = map.get(host);
- // no entry
- if (nodes== null) {
- return null;
- }
- // one node
- if (nodes.length == 1) {
- return nodes[0];
+ hostmapLock.readLock().lock();
+ try {
+ DatanodeDescriptor[] nodes = map.get(host);
+ // no entry
+ if (nodes== null) {
+ return null;
+ }
+ // one node
+ if (nodes.length == 1) {
+ return nodes[0];
+ }
+ // more than one node
+ return nodes[r.nextInt(nodes.length)];
+ } finally {
+ hostmapLock.readLock().unlock();
}
- // more than one node
- return nodes[r.nextInt(nodes.length)];
}
/**
@@ -144,16 +168,21 @@
host = name.substring(0, colon);
}
- DatanodeDescriptor[] nodes = map.get(host);
- // no entry
- if (nodes== null) {
- return null;
- }
- for(DatanodeDescriptor containedNode:nodes) {
- if (name.equals(containedNode.getName())) {
- return containedNode;
+ hostmapLock.readLock().lock();
+ try {
+ DatanodeDescriptor[] nodes = map.get(host);
+ // no entry
+ if (nodes== null) {
+ return null;
}
+ for(DatanodeDescriptor containedNode:nodes) {
+ if (name.equals(containedNode.getName())) {
+ return containedNode;
+ }
+ }
+ return null;
+ } finally {
+ hostmapLock.readLock().unlock();
}
- return null;
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetworkTopology.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetworkTopology.java?view=diff&rev=547419&r1=547418&r2=547419
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetworkTopology.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetworkTopology.java Thu Jun 14 14:53:06 2007
@@ -23,6 +23,8 @@
import java.util.List;
import java.util.Random;
import java.util.Arrays;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -224,7 +226,7 @@
} // end of remove
/** Given a node's string representation, return a reference to the node */
- Node getLoc(String loc) {
+ private Node getLoc(String loc) {
if (loc == null || loc.length() == 0) return this;
String[] path = loc.split(PATH_SEPARATOR_STR, 2);
@@ -300,8 +302,10 @@
InnerNode clusterMap = new InnerNode(InnerNode.ROOT); // the root
private int numOfRacks = 0; // rack counter
+ private ReadWriteLock netlock;
public NetworkTopology() {
+ netlock = new ReentrantReadWriteLock();
}
/** Add a data node
@@ -310,21 +314,26 @@
* data node to be added
* @exception IllegalArgumentException if add a data node to a leave
*/
- public synchronized void add(DatanodeDescriptor node) {
+ public void add(DatanodeDescriptor node) {
if (node==null) return;
+ netlock.writeLock().lock();
LOG.info("Adding a new node: "+node.getPath());
- Node rack = getNode(node.getNetworkLocation());
- if (rack != null && !(rack instanceof InnerNode)) {
- throw new IllegalArgumentException("Unexpected data node "
- + node.toString()
- + " at an illegal network location");
- }
- if (clusterMap.add(node)) {
- if (rack == null) {
- numOfRacks++;
+ try {
+ Node rack = getNode(node.getNetworkLocation());
+ if (rack != null && !(rack instanceof InnerNode)) {
+ throw new IllegalArgumentException("Unexpected data node "
+ + node.toString()
+ + " at an illegal network location");
+ }
+ if (clusterMap.add(node)) {
+ if (rack == null) {
+ numOfRacks++;
+ }
}
+ LOG.debug("NetworkTopology became:\n" + this.toString());
+ } finally {
+ netlock.writeLock().unlock();
}
- LOG.debug("NetworkTopology became:\n" + this.toString());
}
/** Remove a data node
@@ -332,16 +341,21 @@
* @param node
* data node to be removed
*/
- public synchronized void remove(DatanodeDescriptor node) {
+ public void remove(DatanodeDescriptor node) {
if (node==null) return;
+ netlock.writeLock().lock();
LOG.info("Removing a node: "+node.getPath());
- if (clusterMap.remove(node)) {
- InnerNode rack = (InnerNode)getNode(node.getNetworkLocation());
- if (rack == null) {
- numOfRacks--;
+ try {
+ if (clusterMap.remove(node)) {
+ InnerNode rack = (InnerNode)getNode(node.getNetworkLocation());
+ if (rack == null) {
+ numOfRacks--;
+ }
}
+ LOG.debug("NetworkTopology became:\n" + this.toString());
+ } finally {
+ netlock.writeLock().unlock();
}
- LOG.debug("NetworkTopology became:\n" + this.toString());
}
/** Check if the tree contains data node <i>node</i>
@@ -350,13 +364,18 @@
* a data node
* @return true if <i>node</i> is already in the tree; false otherwise
*/
- public synchronized boolean contains(DatanodeDescriptor node) {
+ public boolean contains(DatanodeDescriptor node) {
if (node == null) return false;
- Node parent = node.getParent();
- for(int level=node.getLevel(); parent!=null&&level>0;
- parent=parent.getParent(), level--) {
- if (parent == clusterMap)
- return true;
+ netlock.readLock().lock();
+ try {
+ Node parent = node.getParent();
+ for(int level=node.getLevel(); parent!=null&&level>0;
+ parent=parent.getParent(), level--) {
+ if (parent == clusterMap)
+ return true;
+ }
+ } finally {
+ netlock.readLock().unlock();
}
return false;
}
@@ -367,7 +386,7 @@
* a path-like string representation of a node
* @return a reference to the node; null if the node is not in the tree
*/
- public synchronized Node getNode(String loc) {
+ private Node getNode(String loc) {
loc = NodeBase.normalize(loc);
if (!NodeBase.ROOT.equals(loc))
loc = loc.substring(1);
@@ -375,13 +394,23 @@
}
/** Return the total number of racks */
- public synchronized int getNumOfRacks() {
- return numOfRacks;
+ public int getNumOfRacks() {
+ netlock.readLock().lock();
+ try {
+ return numOfRacks;
+ } finally {
+ netlock.readLock().unlock();
+ }
}
/** Return the total number of data nodes */
- public synchronized int getNumOfLeaves() {
- return clusterMap.getNumOfLeaves();
+ public int getNumOfLeaves() {
+ netlock.readLock().lock();
+ try {
+ return clusterMap.getNumOfLeaves();
+ } finally {
+ netlock.readLock().unlock();
+ }
}
/** Return the distance between two data nodes
@@ -397,24 +426,28 @@
if (node1 == node2) {
return 0;
}
- int i;
Node n1=node1, n2=node2;
- int level1=node1.getLevel(), level2=node2.getLevel();
int dis = 0;
- while(n1!=null && level1>level2) {
- n1 = n1.getParent();
- level1--;
- dis++;
- }
- while(n2!=null && level2>level1) {
- n2 = n2.getParent();
- level2--;
- dis++;
- }
- while(n1!=null && n2!=null && n1.getParent()!=n2.getParent()) {
- n1=n1.getParent();
- n2=n2.getParent();
- dis+=2;
+ netlock.readLock().lock();
+ try {
+ int level1=node1.getLevel(), level2=node2.getLevel();
+ while(n1!=null && level1>level2) {
+ n1 = n1.getParent();
+ level1--;
+ dis++;
+ }
+ while(n2!=null && level2>level1) {
+ n2 = n2.getParent();
+ level2--;
+ dis++;
+ }
+ while(n1!=null && n2!=null && n1.getParent()!=n2.getParent()) {
+ n1=n1.getParent();
+ n2=n2.getParent();
+ dis+=2;
+ }
+ } finally {
+ netlock.readLock().unlock();
}
if (n1==null) {
LOG.warn("The cluster does not contain data node: "+node1.getPath());
@@ -440,11 +473,16 @@
return false;
}
- if (node1 == node2 || node1.equals(node2)) {
- return true;
- }
+ netlock.readLock().lock();
+ try {
+ if (node1 == node2 || node1.equals(node2)) {
+ return true;
+ }
- return node1.getParent()==node2.getParent();
+ return node1.getParent()==node2.getParent();
+ } finally {
+ netlock.readLock().unlock();
+ }
}
final private static Random r = new Random();
@@ -455,10 +493,15 @@
* @return the choosen data node
*/
public DatanodeDescriptor chooseRandom(String scope) {
- if (scope.startsWith("~")) {
- return chooseRandom(NodeBase.ROOT, scope.substring(1));
- } else {
- return chooseRandom(scope, null);
+ netlock.readLock().lock();
+ try {
+ if (scope.startsWith("~")) {
+ return chooseRandom(NodeBase.ROOT, scope.substring(1));
+ } else {
+ return chooseRandom(scope, null);
+ }
+ } finally {
+ netlock.readLock().unlock();
}
}
@@ -507,22 +550,27 @@
}
scope = NodeBase.normalize(scope);
int count=0; // the number of nodes in both scope & excludedNodes
- for(DatanodeDescriptor node:excludedNodes) {
- if ((node.getPath()+NodeBase.PATH_SEPARATOR_STR).
- startsWith(scope+NodeBase.PATH_SEPARATOR_STR)) {
- count++;
+ netlock.readLock().lock();
+ try {
+ for(DatanodeDescriptor node:excludedNodes) {
+ if ((node.getPath()+NodeBase.PATH_SEPARATOR_STR).
+ startsWith(scope+NodeBase.PATH_SEPARATOR_STR)) {
+ count++;
+ }
+ }
+ Node n=getNode(scope);
+ int scopeNodeCount=1;
+ if (n instanceof InnerNode) {
+ scopeNodeCount=((InnerNode)n).getNumOfLeaves();
+ }
+ if (isExcluded) {
+ return clusterMap.getNumOfLeaves()-
+ scopeNodeCount-excludedNodes.size()+count;
+ } else {
+ return scopeNodeCount-count;
}
- }
- Node n=getNode(scope);
- int scopeNodeCount=1;
- if (n instanceof InnerNode) {
- scopeNodeCount=((InnerNode)n).getNumOfLeaves();
- }
- if (isExcluded) {
- return clusterMap.getNumOfLeaves()-
- scopeNodeCount-excludedNodes.size()+count;
- } else {
- return scopeNodeCount-count;
+ } finally {
+ netlock.readLock().unlock();
}
}
@@ -549,21 +597,22 @@
/* Set and used only inside sortByDistance.
* This saves an allocation each time we sort.
*/
- private DatanodeDescriptor distFrom = null;
+ private static ThreadLocal<DatanodeDescriptor> distFrom =
+ new ThreadLocal<DatanodeDescriptor>();
private final Comparator<DatanodeDescriptor> nodeDistanceComparator =
new Comparator<DatanodeDescriptor>() {
public int compare(DatanodeDescriptor n1, DatanodeDescriptor n2) {
- return getDistance(distFrom, n1) - getDistance(distFrom, n2);
+ return getDistance(distFrom.get(), n1) - getDistance(distFrom.get(), n2);
}
};
/** Sorts nodes array by their distances to <i>reader</i>. */
- public synchronized void sortByDistance(final DatanodeDescriptor reader,
- DatanodeDescriptor[] nodes) {
+ public void sortByDistance(final DatanodeDescriptor reader,
+ DatanodeDescriptor[] nodes) {
if (reader != null && contains(reader)) {
- distFrom = reader;
+ distFrom.set(reader);
Arrays.sort(nodes, nodeDistanceComparator);
- distFrom = null;
+ distFrom.set(null);
}
}
}