You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/04/19 23:34:53 UTC
svn commit: r530556 [5/12] - in /lucene/hadoop/trunk: ./
src/contrib/abacus/src/java/org/apache/hadoop/abacus/
src/contrib/hbase/src/java/org/apache/hadoop/hbase/
src/contrib/hbase/src/test/org/apache/hadoop/hbase/
src/contrib/streaming/src/java/org/ap...
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=530556&r1=530555&r2=530556
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Thu Apr 19 14:34:41 2007
@@ -221,21 +221,21 @@
this.defaultReplication = conf.getInt("dfs.replication", 3);
this.maxReplication = conf.getInt("dfs.replication.max", 512);
this.minReplication = conf.getInt("dfs.replication.min", 1);
- if( minReplication <= 0 )
+ if (minReplication <= 0)
throw new IOException(
"Unexpected configuration parameters: dfs.replication.min = "
+ minReplication
- + " must be greater than 0" );
- if( maxReplication >= (int)Short.MAX_VALUE )
+ + " must be greater than 0");
+ if (maxReplication >= (int)Short.MAX_VALUE)
throw new IOException(
"Unexpected configuration parameters: dfs.replication.max = "
- + maxReplication + " must be less than " + (Short.MAX_VALUE) );
- if( maxReplication < minReplication )
+ + maxReplication + " must be less than " + (Short.MAX_VALUE));
+ if (maxReplication < minReplication)
throw new IOException(
"Unexpected configuration parameters: dfs.replication.min = "
+ minReplication
+ " must be less than dfs.replication.max = "
- + maxReplication );
+ + maxReplication);
this.maxReplicationStreams = conf.getInt("dfs.max-repl-streams", 2);
long heartbeatInterval = conf.getLong("dfs.heartbeat.interval", 3) * 1000;
this.heartbeatRecheckInterval = 5 * 60 * 1000; // 5 minutes
@@ -245,11 +245,11 @@
this.localMachine = hostname;
this.port = port;
- this.dir = new FSDirectory( this );
- StartupOption startOpt = (StartupOption)conf.get(
- "dfs.namenode.startup", StartupOption.REGULAR );
- this.dir.loadFSImage( getNamespaceDirs(conf), startOpt );
- this.safeMode = new SafeModeInfo( conf );
+ this.dir = new FSDirectory(this);
+ StartupOption startOpt = (StartupOption)conf.get(
+ "dfs.namenode.startup", StartupOption.REGULAR);
+ this.dir.loadFSImage(getNamespaceDirs(conf), startOpt);
+ this.safeMode = new SafeModeInfo(conf);
setBlockTotal();
pendingReplications = new PendingReplicationBlocks(LOG);
this.hbthread = new Daemon(new HeartbeatMonitor());
@@ -268,7 +268,7 @@
this.infoPort = conf.getInt("dfs.info.port", 50070);
this.infoBindAddress = conf.get("dfs.info.bindAddress", "0.0.0.0");
- this.infoServer = new StatusHttpServer("dfs",infoBindAddress, infoPort, false);
+ this.infoServer = new StatusHttpServer("dfs", infoBindAddress, infoPort, false);
this.infoServer.setAttribute("name.system", this);
this.infoServer.setAttribute("name.node", nn);
this.infoServer.setAttribute("name.conf", conf);
@@ -286,9 +286,9 @@
String[] dirNames = conf.getStrings("dfs.name.dir");
if (dirNames == null)
dirNames = new String[] {"/tmp/hadoop/dfs/name"};
- Collection<File> dirs = new ArrayList<File>( dirNames.length );
- for( int idx = 0; idx < dirNames.length; idx++ ) {
- dirs.add( new File(dirNames[idx] ));
+ Collection<File> dirs = new ArrayList<File>(dirNames.length);
+ for(int idx = 0; idx < dirNames.length; idx++) {
+ dirs.add(new File(dirNames[idx]));
}
return dirs;
}
@@ -310,8 +310,8 @@
}
NamespaceInfo getNamespaceInfo() {
- return new NamespaceInfo( dir.fsImage.getNamespaceID(),
- dir.fsImage.getCTime() );
+ return new NamespaceInfo(dir.fsImage.getNamespaceID(),
+ dir.fsImage.getCTime());
}
/** Close down this filesystem manager.
@@ -368,9 +368,9 @@
Block block = it.next();
out.print(block);
for (Iterator<DatanodeDescriptor> jt = blocksMap.nodeIterator(block);
- jt.hasNext(); ) {
+ jt.hasNext();) {
DatanodeDescriptor node = jt.next();
- out.print(" " + node + " : " );
+ out.print(" " + node + " : ");
}
out.println("");
}
@@ -397,9 +397,9 @@
}
/* get replication factor of a block */
- private int getReplication( Block block ) {
- FSDirectory.INode fileINode = blocksMap.getINode( block );
- if( fileINode == null ) { // block does not belong to any file
+ private int getReplication(Block block) {
+ FSDirectory.INode fileINode = blocksMap.getINode(block);
+ if (fileINode == null) { // block does not belong to any file
return 0;
} else {
return fileINode.getReplication();
@@ -424,7 +424,7 @@
/* Return the total number of under replication blocks */
synchronized int size() {
int size = 0;
- for( int i=0; i<LEVEL; i++ ) {
+ for(int i=0; i<LEVEL; i++) {
size += priorityQueues.get(i).size();
}
return size;
@@ -433,7 +433,7 @@
/* Check if a block is in the neededReplication queue */
synchronized boolean contains(Block block) {
for(TreeSet<Block> set:priorityQueues) {
- if(set.contains(block)) return true;
+ if (set.contains(block)) return true;
}
return false;
}
@@ -447,9 +447,9 @@
int curReplicas, int expectedReplicas) {
if (curReplicas<=0 || curReplicas>=expectedReplicas) {
return LEVEL; // no need to replicate
- } else if(curReplicas==1) {
+ } else if (curReplicas==1) {
return 0; // highest priority
- } else if(curReplicas*3<expectedReplicas) {
+ } else if (curReplicas*3<expectedReplicas) {
return 1;
} else {
return 2;
@@ -463,18 +463,18 @@
*/
synchronized boolean add(
Block block, int curReplicas, int expectedReplicas) {
- if(curReplicas<=0 || expectedReplicas <= curReplicas) {
+ if (curReplicas<=0 || expectedReplicas <= curReplicas) {
return false;
}
int priLevel = getPriority(block, curReplicas, expectedReplicas);
- if( priorityQueues.get(priLevel).add(block) ) {
+ if (priorityQueues.get(priLevel).add(block)) {
NameNode.stateChangeLog.debug(
"BLOCK* NameSystem.UnderReplicationBlock.add:"
+ block.getBlockName()
+ " has only "+curReplicas
+ " replicas and need " + expectedReplicas
+ " replicas so is added to neededReplications"
- + " at priority level " + priLevel );
+ + " at priority level " + priLevel);
return true;
}
return false;
@@ -484,7 +484,7 @@
synchronized boolean add(Block block) {
int expectedReplicas = getReplication(block);
return add(block,
- countContainingNodes( block ),
+ countContainingNodes(block),
expectedReplicas);
}
@@ -496,21 +496,21 @@
}
/* remove a block from a under replication queue given a priority*/
- private boolean remove(Block block, int priLevel ) {
- if( priLevel >= 0 && priLevel < LEVEL
- && priorityQueues.get(priLevel).remove(block) ) {
+ private boolean remove(Block block, int priLevel) {
+ if (priLevel >= 0 && priLevel < LEVEL
+ && priorityQueues.get(priLevel).remove(block)) {
NameNode.stateChangeLog.debug(
"BLOCK* NameSystem.UnderReplicationBlock.remove: "
+ "Removing block " + block.getBlockName()
- + " from priority queue "+ priLevel );
+ + " from priority queue "+ priLevel);
return true;
} else {
for(int i=0; i<LEVEL; i++) {
- if( i!=priLevel && priorityQueues.get(i).remove(block) ) {
+ if (i!=priLevel && priorityQueues.get(i).remove(block)) {
NameNode.stateChangeLog.debug(
"BLOCK* NameSystem.UnderReplicationBlock.remove: "
+ "Removing block " + block.getBlockName()
- + " from priority queue "+ i );
+ + " from priority queue "+ i);
return true;
}
}
@@ -520,7 +520,7 @@
/* remove a block from a under replication queue */
synchronized boolean remove(Block block) {
- int curReplicas = countContainingNodes( block );
+ int curReplicas = countContainingNodes(block);
int expectedReplicas = getReplication(block);
return remove(block, curReplicas, expectedReplicas);
}
@@ -528,7 +528,7 @@
/* update the priority level of a block */
synchronized void update(Block block,
int curReplicasDelta, int expectedReplicasDelta) {
- int curReplicas = countContainingNodes( block );
+ int curReplicas = countContainingNodes(block);
int curExpectedReplicas = getReplication(block);
int oldReplicas = curReplicas-curReplicasDelta;
int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
@@ -542,10 +542,10 @@
" oldExpectedReplicas " + oldExpectedReplicas +
" curPri " + curPri +
" oldPri " + oldPri);
- if( oldPri != LEVEL && oldPri != curPri ) {
+ if (oldPri != LEVEL && oldPri != curPri) {
remove(block, oldPri);
}
- if( curPri != LEVEL && oldPri != curPri
+ if (curPri != LEVEL && oldPri != curPri
&& priorityQueues.get(curPri).add(block)) {
NameNode.stateChangeLog.debug(
"BLOCK* NameSystem.UnderReplicationBlock.update:"
@@ -553,7 +553,7 @@
+ " has only "+curReplicas
+ " replicas and need " + curExpectedReplicas
+ " replicas so is added to neededReplications"
- + " at priority level " + curPri );
+ + " at priority level " + curPri);
}
}
@@ -571,7 +571,7 @@
}
private void update() {
- while( level< LEVEL-1 && !iterators.get(level).hasNext() ) {
+ while(level< LEVEL-1 && !iterators.get(level).hasNext() ) {
level++;
}
}
@@ -616,17 +616,17 @@
host2DataNodeMap.getDatanodeByHost(clientMachine);
for (int i = 0; i < blocks.length; i++) {
- int numNodes = blocksMap.numNodes( blocks[i] );
- if ( numNodes <= 0 ) {
+ int numNodes = blocksMap.numNodes(blocks[i]);
+ if (numNodes <= 0) {
machineSets[i] = new DatanodeDescriptor[0];
} else {
machineSets[i] = new DatanodeDescriptor[ numNodes ];
numNodes = 0;
- for( Iterator<DatanodeDescriptor> it =
- blocksMap.nodeIterator( blocks[i] ); it.hasNext(); ) {
+ for(Iterator<DatanodeDescriptor> it =
+ blocksMap.nodeIterator(blocks[i]); it.hasNext();) {
machineSets[i][ numNodes++ ] = it.next();
}
- clusterMap.sortByDistance( client, machineSets[i] );
+ clusterMap.sortByDistance(client, machineSets[i]);
}
}
@@ -653,31 +653,31 @@
public synchronized boolean setReplication(String src,
short replication
) throws IOException {
- if( isInSafeMode() )
- throw new SafeModeException( "Cannot set replication for " + src, safeMode );
- verifyReplication(src, replication, null );
+ if (isInSafeMode())
+ throw new SafeModeException("Cannot set replication for " + src, safeMode);
+ verifyReplication(src, replication, null);
Vector<Integer> oldReplication = new Vector<Integer>();
Block[] fileBlocks;
- fileBlocks = dir.setReplication( src, replication, oldReplication );
- if( fileBlocks == null ) // file not found or is a directory
+ fileBlocks = dir.setReplication(src, replication, oldReplication);
+ if (fileBlocks == null) // file not found or is a directory
return false;
int oldRepl = oldReplication.elementAt(0).intValue();
- if( oldRepl == replication ) // the same replication
+ if (oldRepl == replication) // the same replication
return true;
// update needReplication priority queues
LOG.info("Increasing replication for file " + src
- + ". New replication is " + replication );
- for( int idx = 0; idx < fileBlocks.length; idx++ )
- neededReplications.update( fileBlocks[idx], 0, replication-oldRepl );
+ + ". New replication is " + replication);
+ for(int idx = 0; idx < fileBlocks.length; idx++)
+ neededReplications.update(fileBlocks[idx], 0, replication-oldRepl);
- if( oldRepl > replication ) {
+ if (oldRepl > replication) {
// old replication > the new one; need to remove copies
LOG.info("Reducing replication for file " + src
- + ". New replication is " + replication );
- for( int idx = 0; idx < fileBlocks.length; idx++ )
- proccessOverReplicatedBlock( fileBlocks[idx], replication );
+ + ". New replication is " + replication);
+ for(int idx = 0; idx < fileBlocks.length; idx++)
+ proccessOverReplicatedBlock(fileBlocks[idx], replication);
}
return true;
}
@@ -690,21 +690,21 @@
* Check whether the replication parameter is within the range
* determined by system configuration.
*/
- private void verifyReplication( String src,
- short replication,
- UTF8 clientName
- ) throws IOException {
+ private void verifyReplication(String src,
+ short replication,
+ UTF8 clientName
+ ) throws IOException {
String text = "file " + src
+ ((clientName != null) ? " on client " + clientName : "")
+ ".\n"
+ "Requested replication " + replication;
- if( replication > maxReplication )
- throw new IOException( text + " exceeds maximum " + maxReplication );
+ if (replication > maxReplication)
+ throw new IOException(text + " exceeds maximum " + maxReplication);
- if( replication < minReplication )
- throw new IOException(
- text + " is less than the required minimum " + minReplication );
+ if (replication < minReplication)
+ throw new IOException(
+ text + " is less than the required minimum " + minReplication);
}
/**
@@ -718,17 +718,17 @@
* @throws IOException if the filename is invalid
* {@link FSDirectory#isValidToCreate(UTF8)}.
*/
- public synchronized Object[] startFile( UTF8 src,
- UTF8 holder,
- UTF8 clientMachine,
- boolean overwrite,
- short replication,
- long blockSize
- ) throws IOException {
+ public synchronized Object[] startFile(UTF8 src,
+ UTF8 holder,
+ UTF8 clientMachine,
+ boolean overwrite,
+ short replication,
+ long blockSize
+ ) throws IOException {
NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: file "
+src+" for "+holder+" at "+clientMachine);
- if( isInSafeMode() )
- throw new SafeModeException( "Cannot create file" + src, safeMode );
+ if (isInSafeMode())
+ throw new SafeModeException("Cannot create file" + src, safeMode);
if (!isValidName(src.toString())) {
throw new IOException("Invalid file name: " + src);
}
@@ -785,9 +785,9 @@
}
try {
- verifyReplication(src.toString(), replication, clientMachine );
- } catch( IOException e) {
- throw new IOException( "failed to create "+e.getMessage());
+ verifyReplication(src.toString(), replication, clientMachine);
+ } catch(IOException e) {
+ throw new IOException("failed to create "+e.getMessage());
}
if (!dir.isValidToCreate(src)) {
if (overwrite) {
@@ -827,8 +827,8 @@
holder,
clientMachine,
clientNode));
- NameNode.stateChangeLog.debug( "DIR* NameSystem.startFile: "
- +"add "+src+" to pendingCreates for "+holder );
+ NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
+ +"add "+src+" to pendingCreates for "+holder);
synchronized (leases) {
Lease lease = leases.get(holder);
if (lease == null) {
@@ -871,8 +871,8 @@
) throws IOException {
NameNode.stateChangeLog.debug("BLOCK* NameSystem.getAdditionalBlock: file "
+src+" for "+clientName);
- if( isInSafeMode() )
- throw new SafeModeException( "Cannot add block to " + src, safeMode );
+ if (isInSafeMode())
+ throw new SafeModeException("Cannot add block to " + src, safeMode);
FileUnderConstruction pendingFile = pendingCreates.get(src);
// make sure that we still have the lease on this file
if (pendingFile == null) {
@@ -916,11 +916,11 @@
// Remove the block from the pending creates list
//
NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
- +b.getBlockName()+"of file "+src );
+ +b.getBlockName()+"of file "+src);
FileUnderConstruction pendingFile = pendingCreates.get(src);
if (pendingFile != null) {
Collection<Block> pendingVector = pendingFile.getBlocks();
- for (Iterator<Block> it = pendingVector.iterator(); it.hasNext(); ) {
+ for (Iterator<Block> it = pendingVector.iterator(); it.hasNext();) {
Block cur = it.next();
if (cur.compareTo(b) == 0) {
pendingCreateBlocks.remove(cur);
@@ -942,7 +942,7 @@
public synchronized void abandonFileInProgress(UTF8 src,
UTF8 holder
) throws IOException {
- NameNode.stateChangeLog.debug("DIR* NameSystem.abandonFileInProgress:" + src );
+ NameNode.stateChangeLog.debug("DIR* NameSystem.abandonFileInProgress:" + src);
synchronized (leases) {
// find the lease
Lease lease = leases.get(holder);
@@ -969,20 +969,20 @@
* Before we return, we make sure that all the file's blocks have
* been reported by datanodes and are replicated correctly.
*/
- public synchronized int completeFile( UTF8 src,
- UTF8 holder) throws IOException {
- NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src + " for " + holder );
- if( isInSafeMode() )
- throw new SafeModeException( "Cannot complete file " + src, safeMode );
+ public synchronized int completeFile(UTF8 src,
+ UTF8 holder) throws IOException {
+ NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src + " for " + holder);
+ if (isInSafeMode())
+ throw new SafeModeException("Cannot complete file " + src, safeMode);
FileUnderConstruction pendingFile = pendingCreates.get(src);
if (dir.getFile(src) != null || pendingFile == null) {
- NameNode.stateChangeLog.warn( "DIR* NameSystem.completeFile: "
- + "failed to complete " + src
- + " because dir.getFile()==" + dir.getFile(src)
- + " and " + pendingFile);
+ NameNode.stateChangeLog.warn("DIR* NameSystem.completeFile: "
+ + "failed to complete " + src
+ + " because dir.getFile()==" + dir.getFile(src)
+ + " and " + pendingFile);
return OPERATION_FAILED;
- } else if (! checkFileProgress(pendingFile, true)) {
+ } else if (!checkFileProgress(pendingFile, true)) {
return STILL_WAITING;
}
@@ -998,8 +998,8 @@
//
for (int i = 0; i < nrBlocks; i++) {
Block b = pendingBlocks[i];
- Block storedBlock = blocksMap.getStoredBlock( b );
- if ( storedBlock != null ) {
+ Block storedBlock = blocksMap.getStoredBlock(b);
+ if (storedBlock != null) {
pendingBlocks[i] = storedBlock;
}
}
@@ -1007,7 +1007,7 @@
//
// Now we can add the (name,blocks) tuple to the filesystem
//
- if ( ! dir.addFile(src, pendingBlocks, pendingFile.getReplication())) {
+ if (!dir.addFile(src, pendingBlocks, pendingFile.getReplication())) {
return OPERATION_FAILED;
}
@@ -1024,7 +1024,7 @@
Lease lease = leases.get(holder);
if (lease != null) {
lease.completedCreate(src);
- if (! lease.hasLocks()) {
+ if (!lease.hasLocks()) {
leases.remove(holder);
sortedLeases.remove(lease);
}
@@ -1043,7 +1043,7 @@
int numExpectedReplicas = pendingFile.getReplication();
for (int i = 0; i < nrBlocks; i++) {
// filter out containingNodes that are marked for decommission.
- int numCurrentReplica = countContainingNodes( pendingBlocks[i] );
+ int numCurrentReplica = countContainingNodes(pendingBlocks[i]);
if (numCurrentReplica < numExpectedReplicas) {
neededReplications.add(
pendingBlocks[i], numCurrentReplica, numExpectedReplicas);
@@ -1061,13 +1061,13 @@
Block b = null;
do {
b = new Block(FSNamesystem.randBlockId.nextLong(), 0);
- } while ( isValidBlock(b) );
+ } while (isValidBlock(b));
FileUnderConstruction v = pendingCreates.get(src);
v.getBlocks().add(b);
pendingCreateBlocks.add(b);
NameNode.stateChangeLog.debug("BLOCK* NameSystem.allocateBlock: "
+src+ ". "+b.getBlockName()+
- " is created and added to pendingCreates and pendingCreateBlocks" );
+ " is created and added to pendingCreates and pendingCreateBlocks");
return b;
}
@@ -1081,8 +1081,8 @@
//
// check all blocks of the file.
//
- for (Iterator<Block> it = v.getBlocks().iterator(); it.hasNext(); ) {
- if ( blocksMap.numNodes(it.next()) < this.minReplication ) {
+ for (Iterator<Block> it = v.getBlocks().iterator(); it.hasNext();) {
+ if (blocksMap.numNodes(it.next()) < this.minReplication) {
return false;
}
}
@@ -1156,9 +1156,9 @@
// Check how many copies we have of the block. If we have at least one
// copy on a live node, then we can delete it.
- int count = countContainingNodes( blk );
- if ( (count > 1) || ( (count == 1) && ( dn.isDecommissionInProgress() ||
- dn.isDecommissioned() ))) {
+ int count = countContainingNodes(blk);
+ if ((count > 1) || ((count == 1) && (dn.isDecommissionInProgress() ||
+ dn.isDecommissioned()))) {
addToInvalidates(blk, dn);
removeStoredBlock(blk, getDatanode(dn));
NameNode.stateChangeLog.info("BLOCK* NameSystem.invalidateBlocks: "
@@ -1186,9 +1186,9 @@
* Change the indicated filename.
*/
public synchronized boolean renameTo(UTF8 src, UTF8 dst) throws IOException {
- NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src + " to " + dst );
- if( isInSafeMode() )
- throw new SafeModeException( "Cannot rename " + src, safeMode );
+ NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src + " to " + dst);
+ if (isInSafeMode())
+ throw new SafeModeException("Cannot rename " + src, safeMode);
if (!isValidName(dst.toString())) {
throw new IOException("Invalid name: " + dst);
}
@@ -1200,21 +1200,21 @@
* invalidate some blocks that make up the file.
*/
public synchronized boolean delete(UTF8 src) throws IOException {
- NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src );
- if( isInSafeMode() )
- throw new SafeModeException( "Cannot delete " + src, safeMode );
+ NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src);
+ if (isInSafeMode())
+ throw new SafeModeException("Cannot delete " + src, safeMode);
Block deletedBlocks[] = dir.delete(src);
if (deletedBlocks != null) {
for (int i = 0; i < deletedBlocks.length; i++) {
Block b = deletedBlocks[i];
- for ( Iterator<DatanodeDescriptor> it =
- blocksMap.nodeIterator( b ); it.hasNext(); ) {
+ for (Iterator<DatanodeDescriptor> it =
+ blocksMap.nodeIterator(b); it.hasNext();) {
DatanodeDescriptor node = it.next();
addToInvalidates(b, node);
NameNode.stateChangeLog.debug("BLOCK* NameSystem.delete: "
+ b.getBlockName() + " is added to invalidSet of "
- + node.getName() );
+ + node.getName());
}
}
}
@@ -1253,7 +1253,7 @@
// Check for ".." "." ":" "/"
StringTokenizer tokens = new StringTokenizer(src, Path.SEPARATOR);
- while( tokens.hasMoreTokens()) {
+ while(tokens.hasMoreTokens()) {
String element = tokens.nextToken();
if (element.equals("..") ||
element.equals(".") ||
@@ -1268,11 +1268,11 @@
/**
* Create all the necessary directories
*/
- public synchronized boolean mkdirs( String src ) throws IOException {
+ public synchronized boolean mkdirs(String src) throws IOException {
boolean success;
- NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src );
- if( isInSafeMode() )
- throw new SafeModeException( "Cannot create directory " + src, safeMode );
+ NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
+ if (isInSafeMode())
+ throw new SafeModeException("Cannot create directory " + src, safeMode);
if (!isValidName(src)) {
throw new IOException("Invalid directory name: " + src);
}
@@ -1294,7 +1294,7 @@
int startBlock = -1;
int endBlock = -1;
- Block blocks[] = dir.getFile( new UTF8( src ));
+ Block blocks[] = dir.getFile(new UTF8(src));
if (blocks == null) { // no blocks
return new String[0][];
@@ -1332,9 +1332,9 @@
String hosts[][] = new String[(endBlock - startBlock) + 1][];
for (int i = startBlock; i <= endBlock; i++) {
Collection<String> v = new ArrayList<String>();
- for ( Iterator<DatanodeDescriptor> it =
- blocksMap.nodeIterator( blocks[i] ); it.hasNext(); ) {
- v.add( it.next().getHostName() );
+ for (Iterator<DatanodeDescriptor> it =
+ blocksMap.nodeIterator(blocks[i]); it.hasNext();) {
+ v.add(it.next().getHostName());
}
hosts[i-startBlock] = v.toArray(new String[v.size()]);
}
@@ -1396,10 +1396,10 @@
return (locks.size() + creates.size()) > 0;
}
public void releaseLocks() {
- for (Iterator<UTF8> it = locks.iterator(); it.hasNext(); )
+ for (Iterator<UTF8> it = locks.iterator(); it.hasNext();)
internalReleaseLock(it.next(), holder);
locks.clear();
- for (Iterator<UTF8> it = creates.iterator(); it.hasNext(); )
+ for (Iterator<UTF8> it = creates.iterator(); it.hasNext();)
internalReleaseCreate(it.next(), holder);
creates.clear();
}
@@ -1463,51 +1463,53 @@
/**
* Get a lock (perhaps exclusive) on the given file
*/
- /** @deprecated */ @Deprecated
- public synchronized int obtainLock( UTF8 src,
- UTF8 holder,
- boolean exclusive) throws IOException {
- if( isInSafeMode() )
- throw new SafeModeException( "Cannot lock file " + src, safeMode );
- int result = dir.obtainLock(src, holder, exclusive);
- if (result == COMPLETE_SUCCESS) {
- synchronized (leases) {
- Lease lease = leases.get(holder);
- if (lease == null) {
- lease = new Lease(holder);
- leases.put(holder, lease);
- sortedLeases.add(lease);
- } else {
- sortedLeases.remove(lease);
- lease.renew();
- sortedLeases.add(lease);
- }
- lease.obtained(src);
+ /** @deprecated */
+ @Deprecated
+ public synchronized int obtainLock(UTF8 src,
+ UTF8 holder,
+ boolean exclusive) throws IOException {
+ if (isInSafeMode())
+ throw new SafeModeException("Cannot lock file " + src, safeMode);
+ int result = dir.obtainLock(src, holder, exclusive);
+ if (result == COMPLETE_SUCCESS) {
+ synchronized (leases) {
+ Lease lease = leases.get(holder);
+ if (lease == null) {
+ lease = new Lease(holder);
+ leases.put(holder, lease);
+ sortedLeases.add(lease);
+ } else {
+ sortedLeases.remove(lease);
+ lease.renew();
+ sortedLeases.add(lease);
}
+ lease.obtained(src);
}
- return result;
}
+ return result;
+ }
/**
* Release the lock on the given file
*/
- /** @deprecated */ @Deprecated
- public synchronized int releaseLock(UTF8 src, UTF8 holder) {
- int result = internalReleaseLock(src, holder);
- if (result == COMPLETE_SUCCESS) {
- synchronized (leases) {
- Lease lease = leases.get(holder);
- if (lease != null) {
- lease.released(src);
- if (! lease.hasLocks()) {
- leases.remove(holder);
- sortedLeases.remove(lease);
- }
+ /** @deprecated */
+ @Deprecated
+ public synchronized int releaseLock(UTF8 src, UTF8 holder) {
+ int result = internalReleaseLock(src, holder);
+ if (result == COMPLETE_SUCCESS) {
+ synchronized (leases) {
+ Lease lease = leases.get(holder);
+ if (lease != null) {
+ lease.released(src);
+ if (!lease.hasLocks()) {
+ leases.remove(holder);
+ sortedLeases.remove(lease);
}
}
}
- return result;
}
+ return result;
+ }
private int internalReleaseLock(UTF8 src, UTF8 holder) {
return dir.releaseLock(src, holder);
}
@@ -1524,7 +1526,7 @@
"DIR* NameSystem.internalReleaseCreate: " + src
+ " is removed from pendingCreates for "
+ holder + " (failure)");
- for (Iterator<Block> it2 = v.getBlocks().iterator(); it2.hasNext(); ) {
+ for (Iterator<Block> it2 = v.getBlocks().iterator(); it2.hasNext();) {
Block b = it2.next();
pendingCreateBlocks.remove(b);
}
@@ -1540,8 +1542,8 @@
*/
public void renewLease(UTF8 holder) throws IOException {
synchronized (leases) {
- if( isInSafeMode() )
- throw new SafeModeException( "Cannot renew lease for " + holder, safeMode );
+ if (isInSafeMode())
+ throw new SafeModeException("Cannot renew lease for " + holder, safeMode);
Lease lease = leases.get(holder);
if (lease != null) {
sortedLeases.remove(lease);
@@ -1588,57 +1590,57 @@
* @see DataNode#register()
* @author Konstantin Shvachko
*/
- public synchronized void registerDatanode( DatanodeRegistration nodeReg,
- String networkLocation
- ) throws IOException {
+ public synchronized void registerDatanode(DatanodeRegistration nodeReg,
+ String networkLocation
+ ) throws IOException {
if (!verifyNodeRegistration(nodeReg)) {
- throw new DisallowedDatanodeException( nodeReg );
+ throw new DisallowedDatanodeException(nodeReg);
}
String dnAddress = Server.getRemoteAddress();
- if ( dnAddress == null ) {
+ if (dnAddress == null) {
//Mostly not called inside an RPC.
- throw new IOException( "Could not find remote address for " +
- "registration from " + nodeReg.getName() );
+ throw new IOException("Could not find remote address for " +
+ "registration from " + nodeReg.getName());
}
String hostName = nodeReg.getHost();
// update the datanode's name with ip:port
- DatanodeID dnReg = new DatanodeID( dnAddress + ":" + nodeReg.getPort(),
- nodeReg.getStorageID(),
- nodeReg.getInfoPort() );
- nodeReg.updateRegInfo( dnReg );
+ DatanodeID dnReg = new DatanodeID(dnAddress + ":" + nodeReg.getPort(),
+ nodeReg.getStorageID(),
+ nodeReg.getInfoPort());
+ nodeReg.updateRegInfo(dnReg);
NameNode.stateChangeLog.info(
"BLOCK* NameSystem.registerDatanode: "
+ "node registration from " + nodeReg.getName()
- + " storage " + nodeReg.getStorageID() );
+ + " storage " + nodeReg.getStorageID());
DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
- DatanodeDescriptor nodeN = host2DataNodeMap.getDatanodeByName( nodeReg.getName() );
+ DatanodeDescriptor nodeN = host2DataNodeMap.getDatanodeByName(nodeReg.getName());
- if( nodeN != null && nodeN != nodeS ) {
- NameNode.LOG.info( "BLOCK* NameSystem.registerDatanode: "
- + "node from name: " + nodeN.getName() );
+ if (nodeN != null && nodeN != nodeS) {
+ NameNode.LOG.info("BLOCK* NameSystem.registerDatanode: "
+ + "node from name: " + nodeN.getName());
// nodeN previously served a different data storage,
// which is not served by anybody anymore.
- removeDatanode( nodeN );
+ removeDatanode(nodeN);
// physically remove node from datanodeMap
- wipeDatanode( nodeN );
+ wipeDatanode(nodeN);
// and log removal
- getEditLog().logRemoveDatanode( nodeN );
+ getEditLog().logRemoveDatanode(nodeN);
nodeN = null;
}
- if ( nodeS != null ) {
- if( nodeN == nodeS ) {
+ if (nodeS != null) {
+ if (nodeN == nodeS) {
// The same datanode has been just restarted to serve the same data
// storage. We do not need to remove old data blocks, the delta will
// be calculated on the next block report from the datanode
NameNode.stateChangeLog.debug("BLOCK* NameSystem.registerDatanode: "
- + "node restarted." );
+ + "node restarted.");
} else {
// nodeS is found
// The registering datanode is a replacement node for the existing
@@ -1646,46 +1648,46 @@
NameNode.stateChangeLog.debug(
"BLOCK* NameSystem.registerDatanode: "
+ "node " + nodeS.getName()
- + " is replaced by " + nodeReg.getName() + "." );
+ + " is replaced by " + nodeReg.getName() + ".");
}
- getEditLog().logRemoveDatanode( nodeS );
+ getEditLog().logRemoveDatanode(nodeS);
// update cluster map
- clusterMap.remove( nodeS );
- nodeS.updateRegInfo( nodeReg );
- nodeS.setNetworkLocation( networkLocation );
- clusterMap.add( nodeS );
- nodeS.setHostName( hostName );
- getEditLog().logAddDatanode( nodeS );
+ clusterMap.remove(nodeS);
+ nodeS.updateRegInfo(nodeReg);
+ nodeS.setNetworkLocation(networkLocation);
+ clusterMap.add(nodeS);
+ nodeS.setHostName(hostName);
+ getEditLog().logAddDatanode(nodeS);
// also treat the registration message as a heartbeat
- synchronized( heartbeats ) {
- heartbeats.add( nodeS );
+ synchronized(heartbeats) {
+ heartbeats.add(nodeS);
//update its timestamp
- nodeS.updateHeartbeat( 0L, 0L, 0);
+ nodeS.updateHeartbeat(0L, 0L, 0);
nodeS.isAlive = true;
}
return;
}
// this is a new datanode serving a new data storage
- if( nodeReg.getStorageID().equals("") ) {
+ if (nodeReg.getStorageID().equals("")) {
// this data storage has never been registered
// it is either empty or was created by pre-storageID version of DFS
nodeReg.storageID = newStorageID();
NameNode.stateChangeLog.debug(
"BLOCK* NameSystem.registerDatanode: "
- + "new storageID " + nodeReg.getStorageID() + " assigned." );
+ + "new storageID " + nodeReg.getStorageID() + " assigned.");
}
// register new datanode
DatanodeDescriptor nodeDescr
- = new DatanodeDescriptor( nodeReg, networkLocation, hostName );
- unprotectedAddDatanode( nodeDescr );
+ = new DatanodeDescriptor(nodeReg, networkLocation, hostName);
+ unprotectedAddDatanode(nodeDescr);
clusterMap.add(nodeDescr);
- getEditLog().logAddDatanode( nodeDescr );
+ getEditLog().logAddDatanode(nodeDescr);
// also treat the registration message as a heartbeat
- synchronized( heartbeats ) {
- heartbeats.add( nodeDescr );
+ synchronized(heartbeats) {
+ heartbeats.add(nodeDescr);
nodeDescr.isAlive = true;
// no need to update its timestamp
// because its is done when the descriptor is created
@@ -1701,7 +1703,7 @@
* @return registration ID
*/
public String getRegistrationID() {
- return Storage.getRegistrationID( dir.fsImage );
+ return Storage.getRegistrationID(dir.fsImage);
}
/**
@@ -1714,9 +1716,9 @@
*/
private String newStorageID() {
String newID = null;
- while( newID == null ) {
- newID = "DS" + Integer.toString( r.nextInt() );
- if( datanodeMap.get( newID ) != null )
+ while(newID == null) {
+ newID = "DS" + Integer.toString(r.nextInt());
+ if (datanodeMap.get(newID) != null)
newID = null;
}
return newID;
@@ -1743,20 +1745,20 @@
* @return true if block report is required or false otherwise.
* @throws IOException
*/
- public boolean gotHeartbeat( DatanodeID nodeID,
- long capacity,
- long remaining,
- int xceiverCount,
- int xmitsInProgress,
- Object[] xferResults,
- Object deleteList[]
- ) throws IOException {
+ public boolean gotHeartbeat(DatanodeID nodeID,
+ long capacity,
+ long remaining,
+ int xceiverCount,
+ int xmitsInProgress,
+ Object[] xferResults,
+ Object deleteList[]
+ ) throws IOException {
synchronized (heartbeats) {
synchronized (datanodeMap) {
DatanodeDescriptor nodeinfo;
try {
- nodeinfo = getDatanode( nodeID );
- if (nodeinfo == null ) {
+ nodeinfo = getDatanode(nodeID);
+ if (nodeinfo == null) {
return true;
}
} catch(UnregisteredDatanodeException e) {
@@ -1769,7 +1771,7 @@
throw new DisallowedDatanodeException(nodeinfo);
}
- if( !nodeinfo.isAlive ) {
+ if (!nodeinfo.isAlive) {
return true;
} else {
updateStats(nodeinfo, false);
@@ -1968,11 +1970,11 @@
* @param nodeID datanode ID
* @author hairong
*/
- synchronized public void removeDatanode( DatanodeID nodeID )
+ synchronized public void removeDatanode(DatanodeID nodeID)
throws IOException {
- DatanodeDescriptor nodeInfo = getDatanode( nodeID );
+ DatanodeDescriptor nodeInfo = getDatanode(nodeID);
if (nodeInfo != null) {
- removeDatanode( nodeInfo );
+ removeDatanode(nodeInfo);
} else {
NameNode.stateChangeLog.warn("BLOCK* NameSystem.removeDatanode: "
+ nodeInfo.getName() + " does not exist");
@@ -1984,21 +1986,21 @@
* @param nodeInfo datanode descriptor
* @author hairong
*/
- private void removeDatanode( DatanodeDescriptor nodeInfo ) {
+ private void removeDatanode(DatanodeDescriptor nodeInfo) {
// A node still flagged alive is removed from heartbeats and marked dead.
// updateStats(nodeInfo, false) runs first -- presumably to subtract the
// node's figures from the cluster totals; confirm against updateStats.
if (nodeInfo.isAlive) {
updateStats(nodeInfo, false);
heartbeats.remove(nodeInfo);
nodeInfo.isAlive = false;
}
// Pass every block returned by the node's block iterator to removeStoredBlock.
- for (Iterator<Block> it = nodeInfo.getBlockIterator(); it.hasNext(); ) {
+ for (Iterator<Block> it = nodeInfo.getBlockIterator(); it.hasNext();) {
removeStoredBlock(it.next(), nodeInfo);
}
// Finally retire the descriptor itself and detach it from clusterMap.
unprotectedRemoveDatanode(nodeInfo);
clusterMap.remove(nodeInfo);
}
- void unprotectedRemoveDatanode( DatanodeDescriptor nodeDescr ) {
+ void unprotectedRemoveDatanode(DatanodeDescriptor nodeDescr) {
// datanodeMap.remove(nodeDescr.getStorageID());
// deaddatanodeMap.put(nodeDescr.getName(), nodeDescr);
nodeDescr.resetBlocks();
@@ -2007,7 +2009,7 @@
+ nodeDescr.getName() + " is out of service now.");
}
- void unprotectedAddDatanode( DatanodeDescriptor nodeDescr ) {
+ void unprotectedAddDatanode(DatanodeDescriptor nodeDescr) {
/* To keep host2DataNodeMap consistent with datanodeMap,
remove from host2DataNodeMap the datanodeDescriptor removed
from datanodeMap before adding nodeDescr to host2DataNodeMap.
@@ -2018,7 +2020,7 @@
NameNode.stateChangeLog.debug(
"BLOCK* NameSystem.unprotectedAddDatanode: "
- + "node " + nodeDescr.getName() + " is added to datanodeMap." );
+ + "node " + nodeDescr.getName() + " is added to datanodeMap.");
}
@@ -2027,7 +2029,7 @@
*
* @param nodeID node
*/
- void wipeDatanode( DatanodeID nodeID ) throws IOException {
+ void wipeDatanode(DatanodeID nodeID) throws IOException {
String key = nodeID.getStorageID();
host2DataNodeMap.remove(datanodeMap.remove(key));
NameNode.stateChangeLog.debug(
@@ -2086,7 +2088,7 @@
}
}
}
- allAlive = ! foundDead;
+ allAlive = !foundDead;
}
}
@@ -2099,9 +2101,9 @@
) throws IOException {
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("BLOCK* NameSystem.processReport: "
- +"from "+nodeID.getName()+" "+newReport.length+" blocks" );
+ +"from "+nodeID.getName()+" "+newReport.length+" blocks");
}
- DatanodeDescriptor node = getDatanode( nodeID );
+ DatanodeDescriptor node = getDatanode(nodeID);
if (node == null) {
throw new IOException("ProcessReport from unregisterted node: "
+ nodeID.getName());
@@ -2152,14 +2154,14 @@
}
}
- for ( Iterator<Block> i = toRemove.iterator(); i.hasNext(); ) {
+ for (Iterator<Block> i = toRemove.iterator(); i.hasNext();) {
Block b = i.next();
- removeStoredBlock( b, node );
- node.removeBlock( b );
+ removeStoredBlock(b, node);
+ node.removeBlock(b);
}
- for ( Iterator<Block> i = toAdd.iterator(); i.hasNext(); ) {
+ for (Iterator<Block> i = toAdd.iterator(); i.hasNext();) {
Block b = i.next();
- node.addBlock( addStoredBlock(b, node) );
+ node.addBlock(addStoredBlock(b, node));
}
//
@@ -2175,7 +2177,7 @@
// should only be invoked infrequently.
//
Collection<Block> obsolete = new ArrayList<Block>();
- for (Iterator<Block> it = node.getBlockIterator(); it.hasNext(); ) {
+ for (Iterator<Block> it = node.getBlockIterator(); it.hasNext();) {
Block b = it.next();
//
@@ -2184,14 +2186,14 @@
// they are added to recentInvalidateSets and will be sent out
// through succeeding heartbeat responses.
//
- if (! isValidBlock(b) && ! pendingCreateBlocks.contains(b)) {
+ if (!isValidBlock(b) && !pendingCreateBlocks.contains(b)) {
if (obsolete.size() > FSConstants.BLOCK_INVALIDATE_CHUNK) {
addToInvalidates(b, node);
} else {
obsolete.add(b);
}
NameNode.stateChangeLog.debug("BLOCK* NameSystem.processReport: "
- +"ask "+nodeID.getName()+" to delete "+b.getBlockName() );
+ +"ask "+nodeID.getName()+" to delete "+b.getBlockName());
}
}
return (Block[]) obsolete.toArray(new Block[obsolete.size()]);
@@ -2204,22 +2206,22 @@
*/
synchronized Block addStoredBlock(Block block, DatanodeDescriptor node) {
- FSDirectory.INode fileINode = blocksMap.getINode( block );
+ FSDirectory.INode fileINode = blocksMap.getINode(block);
int replication = (fileINode != null) ? fileINode.getReplication() :
defaultReplication;
- boolean added = blocksMap.addNode( block, node, replication );
+ boolean added = blocksMap.addNode(block, node, replication);
- Block storedBlock = blocksMap.getStoredBlock( block ); //extra look up!
- if ( storedBlock != null && block != storedBlock ) {
- if ( block.getNumBytes() > 0 ) {
- storedBlock.setNumBytes( block.getNumBytes() );
+ Block storedBlock = blocksMap.getStoredBlock(block); //extra look up!
+ if (storedBlock != null && block != storedBlock) {
+ if (block.getNumBytes() > 0) {
+ storedBlock.setNumBytes(block.getNumBytes());
}
block = storedBlock;
}
int curReplicaDelta = 0;
- if ( added ) {
+ if (added) {
curReplicaDelta = 1;
//
// Hairong: I would prefer to set the level of next logrecord
@@ -2228,9 +2230,9 @@
// they simply take up all the space in the log file
// So I set the level to be trace
//
- if ( NameNode.stateChangeLog.isTraceEnabled() ) {
+ if (NameNode.stateChangeLog.isTraceEnabled()) {
NameNode.stateChangeLog.trace("BLOCK* NameSystem.addStoredBlock: "
- +"blockMap updated: "+node.getName()+" is added to "+block.getBlockName() );
+ +"blockMap updated: "+node.getName()+" is added to "+block.getBlockName());
}
} else {
NameNode.stateChangeLog.warn("BLOCK* NameSystem.addStoredBlock: "
@@ -2238,16 +2240,16 @@
+ block.getBlockName() + " on " + node.getName());
}
- if( fileINode == null ) // block does not belong to any file
+ if (fileINode == null) // block does not belong to any file
return block;
// filter out containingNodes that are marked for decommission.
- int numCurrentReplica = countContainingNodes( block )
+ int numCurrentReplica = countContainingNodes(block)
+ pendingReplications.getNumReplicas(block);
// check whether safe replication is reached for the block
// only if it is a part of a file
- incrementSafeBlockCount( numCurrentReplica );
+ incrementSafeBlockCount(numCurrentReplica);
// handle underReplication/overReplication
short fileReplication = fileINode.getReplication();
@@ -2256,8 +2258,8 @@
} else {
neededReplications.update(block, curReplicaDelta, 0);
}
- if ( numCurrentReplica > fileReplication ) {
- proccessOverReplicatedBlock( block, fileReplication );
+ if (numCurrentReplica > fileReplication) {
+ proccessOverReplicatedBlock(block, fileReplication);
}
return block;
}
@@ -2267,13 +2269,13 @@
* If there are any extras, call chooseExcessReplicates() to
* mark them in the excessReplicateMap.
*/
- private void proccessOverReplicatedBlock( Block block, short replication ) {
+ private void proccessOverReplicatedBlock(Block block, short replication) {
Collection<DatanodeDescriptor> nonExcess = new ArrayList<DatanodeDescriptor>();
- for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator( block );
- it.hasNext(); ) {
+ for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
+ it.hasNext();) {
DatanodeDescriptor cur = it.next();
Collection<Block> excessBlocks = excessReplicateMap.get(cur.getStorageID());
- if (excessBlocks == null || ! excessBlocks.contains(block)) {
+ if (excessBlocks == null || !excessBlocks.contains(block)) {
if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
nonExcess.add(cur);
}
@@ -2302,7 +2304,7 @@
DatanodeInfo node = iter.next();
long free = node.getRemaining();
- if(minSpace > free) {
+ if (minSpace > free) {
minSpace = free;
cur = node;
}
@@ -2317,7 +2319,7 @@
}
excessBlocks.add(b);
NameNode.stateChangeLog.debug("BLOCK* NameSystem.chooseExcessReplicates: "
- +"("+cur.getName()+", "+b.getBlockName()+") is added to excessReplicateMap" );
+ +"("+cur.getName()+", "+b.getBlockName()+") is added to excessReplicateMap");
//
// The 'excessblocks' tracks blocks until we get confirmation
@@ -2335,7 +2337,7 @@
}
invalidateSet.add(b);
NameNode.stateChangeLog.debug("BLOCK* NameSystem.chooseExcessReplicates: "
- +"("+cur.getName()+", "+b.getBlockName()+") is added to recentInvalidateSets" );
+ +"("+cur.getName()+", "+b.getBlockName()+") is added to recentInvalidateSets");
}
}
@@ -2345,22 +2347,22 @@
*/
synchronized void removeStoredBlock(Block block, DatanodeDescriptor node) {
NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
- +block.getBlockName() + " from "+node.getName() );
- if ( !blocksMap.removeNode( block, node ) ) {
+ +block.getBlockName() + " from "+node.getName());
+ if (!blocksMap.removeNode(block, node)) {
NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
- +block.getBlockName()+" has already been removed from node "+node );
+ +block.getBlockName()+" has already been removed from node "+node);
return;
}
- decrementSafeBlockCount( block );
+ decrementSafeBlockCount(block);
//
// It's possible that the block was removed because of a datanode
// failure. If the block is still valid, check if replication is
// necessary. In that case, put block on a possibly-will-
// be-replicated list.
//
- FSDirectory.INode fileINode = blocksMap.getINode( block );
- if( fileINode != null ) {
+ FSDirectory.INode fileINode = blocksMap.getINode(block);
+ if (fileINode != null) {
neededReplications.update(block, -1, 0);
}
@@ -2372,7 +2374,7 @@
if (excessBlocks != null) {
excessBlocks.remove(block);
NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
- +block.getBlockName()+" is removed from excessBlocks" );
+ +block.getBlockName()+" is removed from excessBlocks");
if (excessBlocks.size() == 0) {
excessReplicateMap.remove(node.getStorageID());
}
@@ -2382,22 +2384,22 @@
/**
* The given node is reporting that it received a certain block.
*/
- public synchronized void blockReceived( DatanodeID nodeID,
- Block block
- ) throws IOException {
- DatanodeDescriptor node = getDatanode( nodeID );
+ public synchronized void blockReceived(DatanodeID nodeID,
+ Block block
+ ) throws IOException {
+ DatanodeDescriptor node = getDatanode(nodeID);
if (node == null) {
NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: "
+ block.getBlockName() + " is received from an unrecorded node "
- + nodeID.getName() );
+ + nodeID.getName());
throw new IllegalArgumentException(
"Unexpected exception. Got blockReceived message from node "
+ block.getBlockName() + ", but there is no info for it");
}
- if ( NameNode.stateChangeLog.isDebugEnabled() ) {
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockReceived: "
- +block.getBlockName()+" is received from " + nodeID.getName() );
+ +block.getBlockName()+" is received from " + nodeID.getName());
}
// Check if this datanode should actually be shutdown instead.
@@ -2409,7 +2411,7 @@
//
// Modify the blocks->datanode map and node's map.
//
- node.addBlock( addStoredBlock(block, node) );
+ node.addBlock(addStoredBlock(block, node));
pendingReplications.remove(block);
}
@@ -2446,23 +2448,23 @@
synchronized (datanodeMap) {
results = new DatanodeInfo[datanodeMap.size()];
int i = 0;
- for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator(); it.hasNext(); )
- results[i++] = new DatanodeInfo( it.next() );
+ for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator(); it.hasNext();)
+ results[i++] = new DatanodeInfo(it.next());
}
return results;
}
/**
* Sorts every descriptor currently in datanodeMap into one of two
* caller-supplied lists, using isDatanodeDead(node) as the test.
* Nodes are appended to the lists; neither list is cleared first.
* The datanodeMap monitor is held while the map is iterated.
*
* @param live receives each node for which isDatanodeDead returns false
* @param dead receives each node for which isDatanodeDead returns true
*/
- public synchronized void DFSNodesStatus( ArrayList<DatanodeDescriptor> live,
- ArrayList<DatanodeDescriptor> dead ) {
+ public synchronized void DFSNodesStatus(ArrayList<DatanodeDescriptor> live,
+ ArrayList<DatanodeDescriptor> dead) {
synchronized (datanodeMap) {
- for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator(); it.hasNext(); ) {
+ for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator(); it.hasNext();) {
DatanodeDescriptor node = it.next();
- if( isDatanodeDead(node))
- dead.add( node );
+ if (isDatanodeDead(node))
+ dead.add(node);
else
- live.add( node );
+ live.add(node);
}
}
}
@@ -2473,7 +2475,7 @@
private synchronized void datanodeDump(PrintWriter out) {
synchronized (datanodeMap) {
out.println("Metasave: Number of datanodes: " + datanodeMap.size());
- for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator(); it.hasNext(); ) {
+ for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator(); it.hasNext();) {
DatanodeDescriptor node = it.next();
out.println(node.dumpDatanode());
}
@@ -2521,7 +2523,7 @@
for (int i = 0; i < nodes.length; i++) {
boolean found = false;
for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
- it.hasNext(); ) {
+ it.hasNext();) {
DatanodeDescriptor node = it.next();
//
@@ -2583,14 +2585,14 @@
/**
* Check if there are any recently-deleted blocks a datanode should remove.
*/
- public synchronized Block[] blocksToInvalidate( DatanodeID nodeID ) {
+ public synchronized Block[] blocksToInvalidate(DatanodeID nodeID) {
// Ask datanodes to perform block delete
// only if safe mode is off.
- if( isInSafeMode() )
+ if (isInSafeMode())
return null;
- Collection<Block> invalidateSet = recentInvalidateSets.remove(
- nodeID.getStorageID() );
+ Collection<Block> invalidateSet = recentInvalidateSets.remove(
+ nodeID.getStorageID());
if (invalidateSet == null) {
return null;
@@ -2633,7 +2635,7 @@
blockList.append(block.getBlockName());
}
NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockToInvalidate: "
- +"ask "+nodeID.getName()+" to delete " + blockList );
+ +"ask "+nodeID.getName()+" to delete " + blockList);
}
return sendBlock.toArray(new Block[sendBlock.size()]);
}
@@ -2644,7 +2646,7 @@
*/
private int countContainingNodes(Iterator<DatanodeDescriptor> nodeIter) {
int count = 0;
- while ( nodeIter.hasNext() ) {
+ while (nodeIter.hasNext()) {
DatanodeDescriptor node = nodeIter.next();
if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
count++;
@@ -2653,17 +2655,17 @@
return count;
}
// Overload taking a Block: obtains the block's node iterator from blocksMap
// and hands it to countContainingNodes(Iterator), returning that count.
- /** wrapper for countContainingNodes( Iterator ). */
- private int countContainingNodes( Block b ) {
- return countContainingNodes( blocksMap.nodeIterator( b ) );
+ /** wrapper for countContainingNodes(Iterator). */
+ private int countContainingNodes(Block b) {
+ return countContainingNodes(blocksMap.nodeIterator(b));
}
/** Returns a newly allocated list excluding nodes that are decommissioned or being decommissioned. */
- ArrayList<DatanodeDescriptor> containingNodeList( Block b ) {
+ ArrayList<DatanodeDescriptor> containingNodeList(Block b) {
ArrayList<DatanodeDescriptor> nonCommissionedNodeList =
new ArrayList<DatanodeDescriptor>();
- for( Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator( b );
- it.hasNext(); ) {
+ for(Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(b);
+ it.hasNext();) {
DatanodeDescriptor node = it.next();
if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
nonCommissionedNodeList.add(node);
@@ -2679,9 +2681,9 @@
Block decommissionBlocks[] = srcNode.getBlocks();
for (int i = 0; i < decommissionBlocks.length; i++) {
Block block = decommissionBlocks[i];
- FSDirectory.INode fileINode = blocksMap.getINode( block );
- if ( fileINode != null &&
- fileINode.getReplication() > countContainingNodes(block) ) {
+ FSDirectory.INode fileINode = blocksMap.getINode(block);
+ if (fileINode != null &&
+ fileINode.getReplication() > countContainingNodes(block)) {
return true;
}
}
@@ -2735,7 +2737,7 @@
int needed) {
// Ask datanodes to perform block replication
// only if safe mode is off.
- if( isInSafeMode() )
+ if (isInSafeMode())
return null;
synchronized (neededReplications) {
@@ -2757,27 +2759,27 @@
}
Block block = it.next();
long blockSize = block.getNumBytes();
- FSDirectory.INode fileINode = blocksMap.getINode( block );
+ FSDirectory.INode fileINode = blocksMap.getINode(block);
if (fileINode == null) { // block does not belong to any file
it.remove();
} else {
List<DatanodeDescriptor> containingNodes =
containingNodeList(block);
- Collection<Block> excessBlocks = excessReplicateMap.get(
- srcNode.getStorageID() );
+ Collection<Block> excessBlocks = excessReplicateMap.get(
+ srcNode.getStorageID());
// srcNode must contain the block, and the block must
// not be scheduled for removal on that node
if (containingNodes.contains(srcNode)
- && (excessBlocks == null || ! excessBlocks.contains(block))) {
+ && (excessBlocks == null || !excessBlocks.contains(block))) {
int numCurrentReplica = containingNodes.size() +
pendingReplications.getNumReplicas(block);
if (numCurrentReplica >= fileINode.getReplication()) {
it.remove();
} else {
DatanodeDescriptor targets[] = replicator.chooseTarget(
- Math.min( fileINode.getReplication() - numCurrentReplica,
- needed),
+ Math.min(fileINode.getReplication() - numCurrentReplica,
+ needed),
datanodeMap.get(srcNode.getStorageID()),
containingNodes, null, blockSize);
if (targets.length > 0) {
@@ -2828,7 +2830,7 @@
+ block.getBlockName() + " to " + targetList);
NameNode.stateChangeLog.debug(
"BLOCK* neededReplications = " + neededReplications.size()
- + " pendingReplications = " + pendingReplications.size() );
+ + " pendingReplications = " + pendingReplications.size());
}
}
@@ -2863,13 +2865,13 @@
class ReplicationTargetChooser {
final boolean considerLoad;
// Records the considerLoad setting in the final field of the same name.
- ReplicationTargetChooser( boolean considerLoad ) {
+ ReplicationTargetChooser(boolean considerLoad) {
this.considerLoad = considerLoad;
}
// Checked exception (extends Exception) that carries only a message.
// It appears in the throws clause of chooseRemoteRack, among others.
private class NotEnoughReplicasException extends Exception {
- NotEnoughReplicasException( String msg ) {
- super( msg );
+ NotEnoughReplicasException(String msg) {
+ super(msg);
}
}
@@ -2888,8 +2890,8 @@
DatanodeDescriptor[] chooseTarget(int numOfReplicas,
DatanodeDescriptor writer,
List<DatanodeDescriptor> excludedNodes,
- long blocksize ) {
- if( excludedNodes == null) {
+ long blocksize) {
+ if (excludedNodes == null) {
excludedNodes = new ArrayList<DatanodeDescriptor>();
}
@@ -2914,18 +2916,18 @@
DatanodeDescriptor writer,
List<DatanodeDescriptor> choosenNodes,
List<DatanodeDescriptor> excludedNodes,
- long blocksize ) {
- if( numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0 ) {
+ long blocksize) {
+ if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
return new DatanodeDescriptor[0];
}
- if( excludedNodes == null) {
+ if (excludedNodes == null) {
excludedNodes = new ArrayList<DatanodeDescriptor>();
}
int clusterSize = clusterMap.getNumOfLeaves();
int totalNumOfReplicas = choosenNodes.size()+numOfReplicas;
- if( totalNumOfReplicas > clusterSize) {
+ if (totalNumOfReplicas > clusterSize) {
numOfReplicas -= (totalNumOfReplicas-clusterSize);
totalNumOfReplicas = clusterSize;
}
@@ -2937,11 +2939,11 @@
new ArrayList<DatanodeDescriptor>(choosenNodes);
excludedNodes.addAll(choosenNodes);
- if(!clusterMap.contains(writer))
+ if (!clusterMap.contains(writer))
writer=null;
DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer,
- excludedNodes, blocksize, maxNodesPerRack, results );
+ excludedNodes, blocksize, maxNodesPerRack, results);
results.removeAll(choosenNodes);
@@ -2958,41 +2960,41 @@
int maxNodesPerRack,
List<DatanodeDescriptor> results) {
- if( numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0 ) {
+ if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
return writer;
}
int numOfResults = results.size();
- if(writer == null && (numOfResults==1 || numOfResults==2) ) {
+ if (writer == null && (numOfResults==1 || numOfResults==2)) {
writer = results.get(0);
}
try {
- switch( numOfResults ) {
+ switch(numOfResults) {
case 0:
writer = chooseLocalNode(writer, excludedNodes,
blocksize, maxNodesPerRack, results);
- if(--numOfReplicas == 0) break;
+ if (--numOfReplicas == 0) break;
case 1:
chooseRemoteRack(1, writer, excludedNodes,
blocksize, maxNodesPerRack, results);
- if(--numOfReplicas == 0) break;
+ if (--numOfReplicas == 0) break;
case 2:
- if(clusterMap.isOnSameRack(results.get(0), results.get(1))) {
+ if (clusterMap.isOnSameRack(results.get(0), results.get(1))) {
chooseRemoteRack(1, writer, excludedNodes,
blocksize, maxNodesPerRack, results);
} else {
chooseLocalRack(writer, excludedNodes,
blocksize, maxNodesPerRack, results);
}
- if(--numOfReplicas == 0) break;
+ if (--numOfReplicas == 0) break;
default:
chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes,
blocksize, maxNodesPerRack, results);
}
} catch (NotEnoughReplicasException e) {
LOG.warn("Not able to place enough replicas, still in need of "
- + numOfReplicas );
+ + numOfReplicas);
}
return writer;
}
@@ -3010,14 +3012,14 @@
List<DatanodeDescriptor> results)
throws NotEnoughReplicasException {
// if no local machine, randomly choose one node
- if(localMachine == null)
+ if (localMachine == null)
return chooseRandom(NodeBase.ROOT, excludedNodes,
blocksize, maxNodesPerRack, results);
// otherwise try local machine first
- if(!excludedNodes.contains(localMachine)) {
+ if (!excludedNodes.contains(localMachine)) {
excludedNodes.add(localMachine);
- if( isGoodTarget(localMachine, blocksize,
+ if (isGoodTarget(localMachine, blocksize,
maxNodesPerRack, false, results)) {
results.add(localMachine);
return localMachine;
@@ -3044,9 +3046,9 @@
List<DatanodeDescriptor> results)
throws NotEnoughReplicasException {
// no local machine, so choose a random machine
- if( localMachine == null ) {
+ if (localMachine == null) {
return chooseRandom(NodeBase.ROOT, excludedNodes,
- blocksize, maxNodesPerRack, results );
+ blocksize, maxNodesPerRack, results);
}
// choose one from the local rack
@@ -3060,17 +3062,17 @@
for(Iterator<DatanodeDescriptor> iter=results.iterator();
iter.hasNext();) {
DatanodeDescriptor nextNode = iter.next();
- if(nextNode != localMachine) {
+ if (nextNode != localMachine) {
newLocal = nextNode;
break;
}
}
- if( newLocal != null ) {
+ if (newLocal != null) {
try {
return chooseRandom(
newLocal.getNetworkLocation(),
excludedNodes, blocksize, maxNodesPerRack, results);
- } catch( NotEnoughReplicasException e2 ) {
+ } catch(NotEnoughReplicasException e2) {
//otherwise randomly choose one from the network
return chooseRandom(NodeBase.ROOT, excludedNodes,
blocksize, maxNodesPerRack, results);
@@ -3089,22 +3091,22 @@
* from the local rack
*/
- private void chooseRemoteRack( int numOfReplicas,
- DatanodeDescriptor localMachine,
- List<DatanodeDescriptor> excludedNodes,
- long blocksize,
- int maxReplicasPerRack,
- List<DatanodeDescriptor> results)
+ private void chooseRemoteRack(int numOfReplicas,
+ DatanodeDescriptor localMachine,
+ List<DatanodeDescriptor> excludedNodes,
+ long blocksize,
+ int maxReplicasPerRack,
+ List<DatanodeDescriptor> results)
throws NotEnoughReplicasException {
// Remember how many targets were already in results so the fallback
// below can work out how many this call has added.
int oldNumOfReplicas = results.size();
// randomly choose one node from remote racks
// NOTE(review): the "~" prefix presumably tells chooseRandom to exclude
// localMachine's network location -- confirm against chooseRandom.
try {
- chooseRandom( numOfReplicas, "~"+localMachine.getNetworkLocation(),
- excludedNodes, blocksize, maxReplicasPerRack, results );
+ chooseRandom(numOfReplicas, "~"+localMachine.getNetworkLocation(),
+ excludedNodes, blocksize, maxReplicasPerRack, results);
} catch (NotEnoughReplicasException e) {
// First attempt fell short: request the remainder (requested count minus
// the targets added so far) from localMachine's own network location.
// If this second attempt also throws, the exception reaches the caller.
- chooseRandom( numOfReplicas-(results.size()-oldNumOfReplicas),
- localMachine.getNetworkLocation(), excludedNodes, blocksize,
- maxReplicasPerRack, results);
+ chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
+ localMachine.getNetworkLocation(), excludedNodes, blocksize,
+ maxReplicasPerRack, results);
}
}
@@ -3122,12 +3124,12 @@
do {
DatanodeDescriptor[] selectedNodes =
chooseRandom(1, nodes, excludedNodes);
- if(selectedNodes.length == 0 ) {
- throw new NotEnoughReplicasException(
- "Not able to place enough replicas" );
+ if (selectedNodes.length == 0) {
+ throw new NotEnoughReplicasException(
+ "Not able to place enough replicas");
}
result = (DatanodeDescriptor)(selectedNodes[0]);
- } while( !isGoodTarget( result, blocksize, maxNodesPerRack, results));
+ } while(!isGoodTarget(result, blocksize, maxNodesPerRack, results));
results.add(result);
return result;
}
@@ -3145,20 +3147,20 @@
do {
DatanodeDescriptor[] selectedNodes =
chooseRandom(numOfReplicas, nodes, excludedNodes);
- if(selectedNodes.length < numOfReplicas) {
+ if (selectedNodes.length < numOfReplicas) {
toContinue = false;
}
for(int i=0; i<selectedNodes.length; i++) {
DatanodeDescriptor result = (DatanodeDescriptor)(selectedNodes[i]);
- if( isGoodTarget( result, blocksize, maxNodesPerRack, results)) {
+ if (isGoodTarget(result, blocksize, maxNodesPerRack, results)) {
numOfReplicas--;
results.add(result);
}
} // end of for
- } while (numOfReplicas>0 && toContinue );
+ } while (numOfReplicas>0 && toContinue);
- if(numOfReplicas>0) {
- throw new NotEnoughReplicasException(
+ if (numOfReplicas>0) {
+ throw new NotEnoughReplicasException(
"Not able to place enough replicas");
}
}
@@ -3175,10 +3177,10 @@
clusterMap.countNumOfAvailableNodes(nodes, excludedNodes);
numOfReplicas = (numOfAvailableNodes<numOfReplicas)?
numOfAvailableNodes:numOfReplicas;
- while( numOfReplicas > 0 ) {
+ while(numOfReplicas > 0) {
DatanodeDescriptor choosenNode = clusterMap.chooseRandom(nodes);
- if(!excludedNodes.contains(choosenNode)) {
- results.add( choosenNode );
+ if (!excludedNodes.contains(choosenNode)) {
+ results.add(choosenNode);
excludedNodes.add(choosenNode);
numOfReplicas--;
}
@@ -3191,40 +3193,40 @@
* return true if <i>node</i> has enough space,
* does not have too much load, and the rack does not have too many nodes
*/
- private boolean isGoodTarget( DatanodeDescriptor node,
- long blockSize, int maxTargetPerLoc,
- List<DatanodeDescriptor> results) {
+ private boolean isGoodTarget(DatanodeDescriptor node,
+ long blockSize, int maxTargetPerLoc,
+ List<DatanodeDescriptor> results) {
// Forward to the five-argument overload, supplying this instance's
// considerLoad field as the load-checking flag.
return isGoodTarget(node, blockSize, maxTargetPerLoc,
this.considerLoad, results);
}
- private boolean isGoodTarget( DatanodeDescriptor node,
- long blockSize, int maxTargetPerLoc,
- boolean considerLoad,
- List<DatanodeDescriptor> results) {
+ private boolean isGoodTarget(DatanodeDescriptor node,
+ long blockSize, int maxTargetPerLoc,
+ boolean considerLoad,
+ List<DatanodeDescriptor> results) {
// check if the node is (being) decommissioned
- if(node.isDecommissionInProgress() || node.isDecommissioned()) {
+ if (node.isDecommissionInProgress() || node.isDecommissioned()) {
LOG.debug("Node "+node.getPath()+
" is not chosen because the node is (being) decommissioned");
return false;
}
// check the remaining capacity of the target machine
- if(blockSize* FSConstants.MIN_BLOCKS_FOR_WRITE>node.getRemaining() ) {
+ if (blockSize* FSConstants.MIN_BLOCKS_FOR_WRITE>node.getRemaining()) {
LOG.debug("Node "+node.getPath()+
" is not chosen because the node does not have enough space");
return false;
}
// check the communication traffic of the target machine
- if(considerLoad) {
+ if (considerLoad) {
double avgLoad = 0;
int size = clusterMap.getNumOfLeaves();
- if( size != 0 ) {
+ if (size != 0) {
avgLoad = (double)totalLoad()/size;
}
- if(node.getXceiverCount() > (2.0 * avgLoad)) {
+ if (node.getXceiverCount() > (2.0 * avgLoad)) {
LOG.debug("Node "+node.getPath()+
" is not chosen because the node is too busy");
return false;
@@ -3234,14 +3236,14 @@
// check if the target rack has chosen too many nodes
String rackname = node.getNetworkLocation();
int counter=1;
- for( Iterator<DatanodeDescriptor> iter = results.iterator();
- iter.hasNext(); ) {
+ for(Iterator<DatanodeDescriptor> iter = results.iterator();
+ iter.hasNext();) {
DatanodeDescriptor result = iter.next();
- if(rackname.equals(result.getNetworkLocation())) {
+ if (rackname.equals(result.getNetworkLocation())) {
counter++;
}
}
- if(counter>maxTargetPerLoc) {
+ if (counter>maxTargetPerLoc) {
LOG.debug("Node "+node.getPath()+
" is not chosen because the rack has too many chosen nodes");
return false;
@@ -3256,29 +3258,29 @@
*/
private DatanodeDescriptor[] getPipeline(
DatanodeDescriptor writer,
- DatanodeDescriptor[] nodes ) {
- if( nodes.length==0 ) return nodes;
+ DatanodeDescriptor[] nodes) {
+ if (nodes.length==0) return nodes;
- synchronized( clusterMap ) {
+ synchronized(clusterMap) {
int index=0;
- if(writer == null || !clusterMap.contains(writer)) {
+ if (writer == null || !clusterMap.contains(writer)) {
writer = nodes[0];
}
- for( ;index<nodes.length; index++ ) {
+ for(;index<nodes.length; index++) {
DatanodeDescriptor shortestNode = null;
int shortestDistance = Integer.MAX_VALUE;
int shortestIndex = index;
- for( int i=index; i<nodes.length; i++ ) {
+ for(int i=index; i<nodes.length; i++) {
DatanodeDescriptor currentNode = nodes[i];
- int currentDistance = clusterMap.getDistance( writer, currentNode );
- if(shortestDistance>currentDistance ) {
+ int currentDistance = clusterMap.getDistance(writer, currentNode);
+ if (shortestDistance>currentDistance) {
shortestDistance = currentDistance;
shortestNode = currentNode;
shortestIndex = i;
}
}
//switch position index & shortestIndex
- if( index != shortestIndex ) {
+ if (index != shortestIndex) {
nodes[shortestIndex] = nodes[index];
nodes[index] = shortestNode;
}
@@ -3321,7 +3323,7 @@
hostsReader.refresh();
synchronized (this) {
for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
- it.hasNext(); ) {
+ it.hasNext();) {
DatanodeDescriptor node = it.next();
// Check if not include.
if (!inHostsList(node)) {
@@ -3384,7 +3386,7 @@
*/
public synchronized void decommissionedDatanodeCheck() {
for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
- it.hasNext(); ) {
+ it.hasNext();) {
DatanodeDescriptor node = it.next();
checkDecommissionStateInternal(node);
}
@@ -3483,15 +3485,15 @@
* @return DatanodeDescriptor or null if the node is not found.
* @throws IOException
*/
- public DatanodeDescriptor getDatanode( DatanodeID nodeID ) throws IOException {
+ public DatanodeDescriptor getDatanode(DatanodeID nodeID) throws IOException {
UnregisteredDatanodeException e = null;
DatanodeDescriptor node = datanodeMap.get(nodeID.getStorageID());
if (node == null)
return null;
if (!node.getName().equals(nodeID.getName())) {
- e = new UnregisteredDatanodeException( nodeID, node );
+ e = new UnregisteredDatanodeException(nodeID, node);
NameNode.stateChangeLog.fatal("BLOCK* NameSystem.getDatanode: "
- + e.getLocalizedMessage() );
+ + e.getLocalizedMessage());
throw e;
}
return node;
@@ -3504,13 +3506,13 @@
/** Check if node is already in the map */
synchronized boolean contains(DatanodeDescriptor node) {
- if( node==null ) return false;
+ if (node==null) return false;
String host = node.getHost();
DatanodeDescriptor[] nodes = map.get(host);
- if( nodes != null ) {
+ if (nodes != null) {
for(DatanodeDescriptor containedNode:nodes) {
- if(node==containedNode)
+ if (node==containedNode)
return true;
}
}
@@ -3521,12 +3523,12 @@
* return true if the node is added; false otherwise
*/
synchronized boolean add(DatanodeDescriptor node) {
- if(node==null || contains(node)) return false;
+ if (node==null || contains(node)) return false;
String host = node.getHost();
DatanodeDescriptor[] nodes = map.get(host);
DatanodeDescriptor[] newNodes;
- if(nodes==null) {
+ if (nodes==null) {
newNodes = new DatanodeDescriptor[1];
newNodes[0]=node;
} else { // rare case: more than one datanode on the host
@@ -3542,15 +3544,15 @@
* return true if the node is removed; false otherwise
*/
synchronized boolean remove(DatanodeDescriptor node) {
- if(node==null) return false;
+ if (node==null) return false;
String host = node.getHost();
DatanodeDescriptor[] nodes = map.get(host);
- if(nodes==null) {
+ if (nodes==null) {
return false;
}
- if( nodes.length==1 ) {
- if( nodes[0]==node ) {
+ if (nodes.length==1) {
+ if (nodes[0]==node) {
map.remove(host);
return true;
} else {
@@ -3560,11 +3562,11 @@
//rare case
int i=0;
for(; i<nodes.length; i++) {
- if(nodes[i]==node) {
+ if (nodes[i]==node) {
break;
}
}
- if( i==nodes.length ) {
+ if (i==nodes.length) {
return false;
} else {
DatanodeDescriptor[] newNodes;
@@ -3579,12 +3581,12 @@
/** get a data node by its host
* @return DatanodeDescriptor if found; otherwise null
*/
- synchronized DatanodeDescriptor getDatanodeByHost( String host ) {
- if(host==null) return null;
+ synchronized DatanodeDescriptor getDatanodeByHost(String host) {
+ if (host==null) return null;
DatanodeDescriptor[] nodes = map.get(host);
// no entry
- if( nodes== null ) {
+ if (nodes== null) {
return null;
}
// one node
@@ -3600,8 +3602,8 @@
*
* @return DatanodeDescriptor if found or null otherwise
*/
- public DatanodeDescriptor getDatanodeByName( String name ) {
- if(name==null) return null;
+ public DatanodeDescriptor getDatanodeByName(String name) {
+ if (name==null) return null;
int colon = name.indexOf(":");
String host;
@@ -3613,11 +3615,11 @@
DatanodeDescriptor[] nodes = map.get(host);
// no entry
- if( nodes== null ) {
+ if (nodes== null) {
return null;
}
for(DatanodeDescriptor containedNode:nodes) {
- if(name.equals(containedNode.getName())) {
+ if (name.equals(containedNode.getName())) {
return containedNode;
}
}
@@ -3628,7 +3630,7 @@
private Host2NodesMap host2DataNodeMap = new Host2NodesMap();
/** Stop at and return the datanode at index (used for content browsing)*/
- private DatanodeDescriptor getDatanodeByIndex( int index ) {
+ private DatanodeDescriptor getDatanodeByIndex(int index) {
int i = 0;
for (DatanodeDescriptor node : datanodeMap.values()) {
if (i == index) {
@@ -3710,10 +3712,10 @@
*
* @param conf configuration
*/
- SafeModeInfo( Configuration conf ) {
- this.threshold = conf.getFloat( "dfs.safemode.threshold.pct", 0.95f );
- this.extension = conf.getInt( "dfs.safemode.extension", 0 );
- this.safeReplication = conf.getInt( "dfs.replication.min", 1 );
+ SafeModeInfo(Configuration conf) {
+ this.threshold = conf.getFloat("dfs.safemode.threshold.pct", 0.95f);
+ this.extension = conf.getInt("dfs.safemode.extension", 0);
+ this.safeReplication = conf.getInt("dfs.replication.min", 1);
this.blockTotal = 0;
this.blockSafe = 0;
}
@@ -3745,8 +3747,8 @@
assert isConsistent() : " SafeMode: Inconsistent filesystem state: "
+ "Total num of blocks, active blocks, or "
+ "total safe blocks don't match.";
- } catch( IOException e ) {
- System.err.print( StringUtils.stringifyException( e ));
+ } catch(IOException e) {
+ System.err.print(StringUtils.stringifyException(e));
}
return this.reached >= 0;
}
@@ -3755,10 +3757,10 @@
* Enter safe mode.
*/
void enter() {
- if( reached != 0 )
+ if (reached != 0)
NameNode.stateChangeLog.info(
"STATE* SafeModeInfo.enter: " + "Safe mode is ON.\n"
- + getTurnOffTip() );
+ + getTurnOffTip());
this.reached = 0;
}
@@ -3766,16 +3768,16 @@
* Leave safe mode.
*/
synchronized void leave() {
- if( reached >= 0 )
+ if (reached >= 0)
NameNode.stateChangeLog.info(
- "STATE* SafeModeInfo.leave: " + "Safe mode is OFF." );
+ "STATE* SafeModeInfo.leave: " + "Safe mode is OFF.");
reached = -1;
safeMode = null;
NameNode.stateChangeLog.info("STATE* Network topology has "
+clusterMap.getNumOfRacks()+" racks and "
+clusterMap.getNumOfLeaves()+ " datanodes");
NameNode.stateChangeLog.info("STATE* UnderReplicatedBlocks has "
- +neededReplications.size()+" blocks" );
+ +neededReplications.size()+" blocks");
}
/**
@@ -3785,11 +3787,11 @@
* @return true if can leave or false otherwise.
*/
synchronized boolean canLeave() {
- if( reached == 0 )
+ if (reached == 0)
return false;
- if( now() - reached < extension )
+ if (now() - reached < extension)
return false;
- return ! needEnter();
+ return !needEnter();
}
/**
@@ -3805,24 +3807,24 @@
* to be compared with the threshold.
*/
private float getSafeBlockRatio() {
- return ( blockTotal == 0 ? 1 : (float)blockSafe/blockTotal );
+ return (blockTotal == 0 ? 1 : (float)blockSafe/blockTotal);
}
/**
* Check and trigger safe mode if needed.
*/
private void checkMode() {
- if( needEnter() ) {
+ if (needEnter()) {
enter();
return;
}
// the threshold is reached
- if( ! isOn() || // safe mode is off
- extension <= 0 || threshold <= 0 ) { // don't need to wait
+ if (!isOn() || // safe mode is off
+ extension <= 0 || threshold <= 0) { // don't need to wait
this.leave(); // just leave safe mode
return;
}
- if( reached > 0 ) // threshold has already been reached before
+ if (reached > 0) // threshold has already been reached before
return;
// start monitor
reached = now();
@@ -3833,7 +3835,7 @@
/**
* Set total number of blocks.
*/
- synchronized void setBlockTotal( int total) {
+ synchronized void setBlockTotal(int total) {
this.blockTotal = total;
checkMode();
}
@@ -3843,8 +3845,8 @@
* reached minimal replication.
* @param replication current replication
*/
- synchronized void incrementSafeBlockCount( short replication ) {
[... 351 lines stripped ...]