Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:50:25 UTC
svn commit: r1619012 [11/35] - in
/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project: hadoop-hdfs-httpfs/
hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/
hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/ hadoop...
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Tue Aug 19 23:49:39 2014
@@ -261,13 +261,19 @@ public class BlockManager {
this.namesystem = namesystem;
datanodeManager = new DatanodeManager(this, namesystem, conf);
heartbeatManager = datanodeManager.getHeartbeatManager();
- invalidateBlocks = new InvalidateBlocks(datanodeManager);
+
+ final long pendingPeriod = conf.getLong(
+ DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY,
+ DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_DEFAULT) * 1000L;
+ invalidateBlocks = new InvalidateBlocks(
+ datanodeManager.blockInvalidateLimit, pendingPeriod);
// Compute the map capacity by allocating 2% of total memory
blocksMap = new BlocksMap(
LightWeightGSet.computeCapacity(2.0, "BlocksMap"));
blockplacement = BlockPlacementPolicy.getInstance(
- conf, stats, datanodeManager.getNetworkTopology());
+ conf, stats, datanodeManager.getNetworkTopology(),
+ datanodeManager.getHost2DatanodeMap());
pendingReplications = new PendingReplicationBlocks(conf.getInt(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L);
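The new pendingPeriod is read from a seconds-valued key and converted to milliseconds before it is handed to InvalidateBlocks, following the same pattern as the pending-replication timeout above. A minimal standalone sketch of that conversion, assuming a Configuration loaded from hdfs-site.xml (the literal key string matches DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY; the 0L default is an assumption here):

    import org.apache.hadoop.conf.Configuration;

    public class StartupDelayExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Seconds in the config, milliseconds everywhere downstream.
        final long pendingPeriodMs = conf.getLong(
            "dfs.namenode.startup.delay.block.deletion.sec", 0L) * 1000L;
        System.out.println("startup deletion delay = " + pendingPeriodMs + " ms");
      }
    }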
@@ -423,10 +429,8 @@ public class BlockManager {
public void close() {
try {
- if (replicationThread != null) {
- replicationThread.interrupt();
- replicationThread.join(3000);
- }
+ replicationThread.interrupt();
+ replicationThread.join(3000);
} catch (InterruptedException ie) {
}
datanodeManager.close();
@@ -549,7 +553,6 @@ public class BlockManager {
}
/**
- * @param block
* @return true if the block has minimum replicas
*/
public boolean checkMinReplication(Block block) {
@@ -699,7 +702,7 @@ public class BlockManager {
// remove this block from the list of pending blocks to be deleted.
for (DatanodeStorageInfo storage : targets) {
- invalidateBlocks.remove(storage.getStorageID(), oldBlock);
+ invalidateBlocks.remove(storage.getDatanodeDescriptor(), oldBlock);
}
// Adjust safe-mode totals, since under-construction blocks don't
@@ -722,9 +725,8 @@ public class BlockManager {
final List<DatanodeStorageInfo> locations
= new ArrayList<DatanodeStorageInfo>(blocksMap.numNodes(block));
for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
- final String storageID = storage.getStorageID();
// filter invalidate replicas
- if(!invalidateBlocks.contains(storageID, block)) {
+ if(!invalidateBlocks.contains(storage.getDatanodeDescriptor(), block)) {
locations.add(storage);
}
}
@@ -819,7 +821,7 @@ public class BlockManager {
for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) {
final DatanodeDescriptor d = storage.getDatanodeDescriptor();
final boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blk, d);
- if (isCorrupt || (!isCorrupt && !replicaCorrupt))
+ if (isCorrupt || (!replicaCorrupt))
machines[j++] = storage;
}
}
@@ -943,6 +945,16 @@ public class BlockManager {
}
/**
+ * Check if a block is replicated to at least the minimum replication.
+ */
+ public boolean isSufficientlyReplicated(BlockInfo b) {
+ // Compare against the lesser of the minReplication and number of live DNs.
+ final int replication =
+ Math.min(minReplication, getDatanodeManager().getNumLiveDataNodes());
+ return countNodes(b).liveReplicas() >= replication;
+ }
+
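The new check caps the required replica count at the number of live datanodes, so a freshly started or very small cluster is not held to an unreachable minimum. A self-contained sketch of the same rule (class and parameter names here are illustrative):

    class ReplicationCheck {
      static boolean isSufficientlyReplicated(int liveReplicas,
          int minReplication, int numLiveDataNodes) {
        // Require the lesser of minReplication and the live-DN count.
        final int required = Math.min(minReplication, numLiveDataNodes);
        return liveReplicas >= required;
      }
    }

For example, with minReplication = 2 on a one-datanode cluster, a single live replica already satisfies the check.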
+ /**
* return a list of blocks & their locations on <code>datanode</code> whose
* total size is <code>size</code>
*
@@ -1010,9 +1022,11 @@ public class BlockManager {
while(it.hasNext()) {
removeStoredBlock(it.next(), node);
}
+ // Remove all pending DN messages referencing this DN.
+ pendingDNMessages.removeAllMessagesForDatanode(node);
node.resetBlocks();
- invalidateBlocks.remove(node.getDatanodeUuid());
+ invalidateBlocks.remove(node);
// If the DN hasn't block-reported since the most recent
// failover, then we may have been holding up on processing
@@ -1035,6 +1049,9 @@ public class BlockManager {
* datanode and log the operation
*/
void addToInvalidates(final Block block, final DatanodeInfo datanode) {
+ if (!namesystem.isPopulatingReplQueues()) {
+ return;
+ }
invalidateBlocks.add(block, datanode, true);
}
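Both addToInvalidates overloads now drop the request while the namesystem is not yet populating its replication queues (i.e. the active NN is still in startup safe mode). A schematic of the guard, with a hypothetical interface standing in for FSNamesystem:

    class InvalidationGuard {
      interface ReplQueueState {
        boolean isPopulatingReplQueues();
      }

      static void addToInvalidates(ReplQueueState namesystem, Runnable enqueue) {
        // Deletions requested during startup safe mode would be stale by the
        // time the replication queues are initialized, so skip them.
        if (!namesystem.isPopulatingReplQueues()) {
          return;
        }
        enqueue.run();
      }
    }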
@@ -1043,6 +1060,9 @@ public class BlockManager {
* datanodes.
*/
private void addToInvalidates(Block b) {
+ if (!namesystem.isPopulatingReplQueues()) {
+ return;
+ }
StringBuilder datanodes = new StringBuilder();
for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
@@ -1059,6 +1079,7 @@ public class BlockManager {
* Mark the block belonging to datanode as corrupt
* @param blk Block to be marked as corrupt
* @param dn Datanode which holds the corrupt replica
+ * @param storageID ID of the storage that holds the replica, if known; null otherwise.
* @param reason a textual reason why the block should be marked corrupt,
* for logging purposes
*/
@@ -1075,17 +1096,29 @@ public class BlockManager {
+ blk + " not found");
return;
}
- markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, reason,
- Reason.CORRUPTION_REPORTED), dn, storageID);
- }
- private void markBlockAsCorrupt(BlockToMarkCorrupt b,
- DatanodeInfo dn, String storageID) throws IOException {
DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
if (node == null) {
- throw new IOException("Cannot mark " + b
- + " as corrupt because datanode " + dn + " does not exist");
+ throw new IOException("Cannot mark " + blk
+ + " as corrupt because datanode " + dn + " (" + dn.getDatanodeUuid()
+ + ") does not exist");
}
+
+ markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock,
+ blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED),
+ storageID == null ? null : node.getStorageInfo(storageID),
+ node);
+ }
+
+ /**
+ * Mark a stored block replica as corrupt.
+ * @param b the block to mark as corrupt
+ * @param storageInfo storage that contains the block, if known; null otherwise
+ * @throws IOException
+ */
+ private void markBlockAsCorrupt(BlockToMarkCorrupt b,
+ DatanodeStorageInfo storageInfo,
+ DatanodeDescriptor node) throws IOException {
BlockCollection bc = b.corrupted.getBlockCollection();
if (bc == null) {
@@ -1096,12 +1129,32 @@ public class BlockManager {
}
// Add replica to the data-node if it is not already there
- node.addBlock(storageID, b.stored);
+ if (storageInfo != null) {
+ storageInfo.addBlock(b.stored);
+ }
// Add this replica to corruptReplicas Map
corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason,
b.reasonCode);
- if (countNodes(b.stored).liveReplicas() >= bc.getBlockReplication()) {
+
+ NumberReplicas numberOfReplicas = countNodes(b.stored);
+ boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >= bc
+ .getBlockReplication();
+ boolean minReplicationSatisfied =
+ numberOfReplicas.liveReplicas() >= minReplication;
+ boolean hasMoreCorruptReplicas = minReplicationSatisfied &&
+ (numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas()) >
+ bc.getBlockReplication();
+ boolean corruptedDuringWrite = minReplicationSatisfied &&
+ (b.stored.getGenerationStamp() > b.corrupted.getGenerationStamp());
+ // case 1: the block has enough live replicas
+ // case 2: corrupt replicas + live replicas > replication factor
+ // case 3: the block was marked corrupt by a failure while writing;
+ // its genstamp differs from that of the valid block.
+ // In all these cases we can delete the replica. In case 3, the RBW
+ // replica is deleted and the valid block can be re-replicated.
+ if (hasEnoughLiveReplicas || hasMoreCorruptReplicas
+ || corruptedDuringWrite) {
// the block is over-replicated so invalidate the replicas immediately
invalidateBlock(b, node);
} else if (namesystem.isPopulatingReplQueues()) {
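The three deletion cases can be condensed into one predicate. A sketch with plain ints and longs standing in for NumberReplicas and Block, where replicationFactor corresponds to bc.getBlockReplication():

    class CorruptReplicaPolicy {
      static boolean canInvalidateImmediately(int live, int corrupt,
          int replicationFactor, int minReplication,
          long storedGenStamp, long corruptedGenStamp) {
        final boolean hasEnoughLiveReplicas = live >= replicationFactor; // case 1
        final boolean minSatisfied = live >= minReplication;
        final boolean hasMoreCorruptReplicas =
            minSatisfied && live + corrupt > replicationFactor;          // case 2
        final boolean corruptedDuringWrite =
            minSatisfied && storedGenStamp > corruptedGenStamp;          // case 3
        return hasEnoughLiveReplicas || hasMoreCorruptReplicas
            || corruptedDuringWrite;
      }
    }

For instance, live = 3, corrupt = 1, replicationFactor = 3 hits case 1 (and case 2), so the corrupt replica is invalidated at once.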
@@ -1179,14 +1232,20 @@ public class BlockManager {
* @return total number of block for deletion
*/
int computeInvalidateWork(int nodesToProcess) {
- final List<String> nodes = invalidateBlocks.getStorageIDs();
+ final List<DatanodeInfo> nodes = invalidateBlocks.getDatanodes();
Collections.shuffle(nodes);
nodesToProcess = Math.min(nodes.size(), nodesToProcess);
int blockCnt = 0;
- for(int nodeCnt = 0; nodeCnt < nodesToProcess; nodeCnt++ ) {
- blockCnt += invalidateWorkForOneNode(nodes.get(nodeCnt));
+ for (DatanodeInfo dnInfo : nodes) {
+ int blocks = invalidateWorkForOneNode(dnInfo);
+ if (blocks > 0) {
+ blockCnt += blocks;
+ if (--nodesToProcess == 0) {
+ break;
+ }
+ }
}
return blockCnt;
}
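With the loop change, a node that currently has nothing to invalidate no longer consumes one of the nodesToProcess slots. A condensed sketch, with strings standing in for DatanodeInfo and a callback standing in for invalidateWorkForOneNode:

    import java.util.Collections;
    import java.util.List;
    import java.util.function.ToIntFunction;

    class InvalidateWork {
      static int computeInvalidateWork(List<String> nodes, int nodesToProcess,
          ToIntFunction<String> invalidateWorkForOneNode) {
        Collections.shuffle(nodes);
        nodesToProcess = Math.min(nodes.size(), nodesToProcess);
        int blockCnt = 0;
        for (String dn : nodes) {
          final int blocks = invalidateWorkForOneNode.applyAsInt(dn);
          if (blocks > 0) {               // only productive nodes use a slot
            blockCnt += blocks;
            if (--nodesToProcess == 0) {
              break;
            }
          }
        }
        return blockCnt;
      }
    }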
@@ -1411,7 +1470,7 @@ public class BlockManager {
* @throws IOException
* if the number of targets < minimum replication.
* @see BlockPlacementPolicy#chooseTarget(String, int, Node,
- * List, boolean, Set, long)
+ * List, boolean, Set, long, StorageType)
*/
public DatanodeStorageInfo[] chooseTarget(final String src,
final int numOfReplicas, final DatanodeDescriptor client,
@@ -1648,7 +1707,7 @@ public class BlockManager {
* @throws IOException
*/
public boolean processReport(final DatanodeID nodeID,
- final DatanodeStorage storage, final String poolId,
+ final DatanodeStorage storage,
final BlockListAsLongs newReport) throws IOException {
namesystem.writeLock();
final long startTime = Time.now(); //after acquiring write lock
@@ -1680,9 +1739,9 @@ public class BlockManager {
if (storageInfo.numBlocks() == 0) {
// The first block report can be processed a lot more efficiently than
// ordinary block reports. This shortens restart times.
- processFirstBlockReport(node, storage.getStorageID(), newReport);
+ processFirstBlockReport(storageInfo, newReport);
} else {
- processReport(node, storage, newReport);
+ processReport(storageInfo, newReport);
}
// Now that we have an up-to-date block report, we know that any
@@ -1708,6 +1767,7 @@ public class BlockManager {
}
blockLog.info("BLOCK* processReport: from storage " + storage.getStorageID()
+ " node " + nodeID + ", blocks: " + newReport.getNumberOfBlocks()
+ + ", hasStaleStorages: " + node.hasStaleStorages()
+ ", processing time: " + (endTime - startTime) + " msecs");
return !node.hasStaleStorages();
}
@@ -1743,9 +1803,8 @@ public class BlockManager {
}
}
- private void processReport(final DatanodeDescriptor node,
- final DatanodeStorage storage,
- final BlockListAsLongs report) throws IOException {
+ private void processReport(final DatanodeStorageInfo storageInfo,
+ final BlockListAsLongs report) throws IOException {
// Normal case:
// Modify the (block-->datanode) map, according to the difference
// between the old and new block report.
@@ -1755,19 +1814,20 @@ public class BlockManager {
Collection<Block> toInvalidate = new LinkedList<Block>();
Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
- reportDiff(node, storage, report,
+ reportDiff(storageInfo, report,
toAdd, toRemove, toInvalidate, toCorrupt, toUC);
-
+
+ DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
// Process the blocks on each queue
for (StatefulBlockInfo b : toUC) {
- addStoredBlockUnderConstruction(b, node, storage.getStorageID());
+ addStoredBlockUnderConstruction(b, storageInfo);
}
for (Block b : toRemove) {
removeStoredBlock(b, node);
}
int numBlocksLogged = 0;
for (BlockInfo b : toAdd) {
- addStoredBlock(b, node, storage.getStorageID(), null, numBlocksLogged < maxNumBlocksToLog);
+ addStoredBlock(b, storageInfo, null, numBlocksLogged < maxNumBlocksToLog);
numBlocksLogged++;
}
if (numBlocksLogged > maxNumBlocksToLog) {
@@ -1781,7 +1841,7 @@ public class BlockManager {
addToInvalidates(b, node);
}
for (BlockToMarkCorrupt b : toCorrupt) {
- markBlockAsCorrupt(b, node, storage.getStorageID());
+ markBlockAsCorrupt(b, storageInfo, node);
}
}
@@ -1792,16 +1852,16 @@ public class BlockManager {
* a toRemove list (since there won't be any). It also silently discards
* any invalid blocks, thereby deferring their processing until
* the next block report.
- * @param node - DatanodeDescriptor of the node that sent the report
+ * @param storageInfo - DatanodeStorageInfo that sent the report
* @param report - the initial block report, to be processed
* @throws IOException
*/
- private void processFirstBlockReport(final DatanodeDescriptor node,
- final String storageID,
+ private void processFirstBlockReport(
+ final DatanodeStorageInfo storageInfo,
final BlockListAsLongs report) throws IOException {
if (report == null) return;
assert (namesystem.hasWriteLock());
- assert (node.getStorageInfo(storageID).numBlocks() == 0);
+ assert (storageInfo.numBlocks() == 0);
BlockReportIterator itBR = report.getBlockReportIterator();
while(itBR.hasNext()) {
@@ -1810,7 +1870,7 @@ public class BlockManager {
if (shouldPostponeBlocksFromFuture &&
namesystem.isGenStampInFuture(iblk)) {
- queueReportedBlock(node, storageID, iblk, reportedState,
+ queueReportedBlock(storageInfo, iblk, reportedState,
QUEUE_REASON_FUTURE_GENSTAMP);
continue;
}
@@ -1822,15 +1882,16 @@ public class BlockManager {
// If block is corrupt, mark it and continue to next block.
BlockUCState ucState = storedBlock.getBlockUCState();
BlockToMarkCorrupt c = checkReplicaCorrupt(
- iblk, reportedState, storedBlock, ucState, node);
+ iblk, reportedState, storedBlock, ucState,
+ storageInfo.getDatanodeDescriptor());
if (c != null) {
if (shouldPostponeBlocksFromFuture) {
// In the Standby, we may receive a block report for a file that we
// just have an out-of-date gen-stamp or state for, for example.
- queueReportedBlock(node, storageID, iblk, reportedState,
+ queueReportedBlock(storageInfo, iblk, reportedState,
QUEUE_REASON_CORRUPT_STATE);
} else {
- markBlockAsCorrupt(c, node, storageID);
+ markBlockAsCorrupt(c, storageInfo, storageInfo.getDatanodeDescriptor());
}
continue;
}
@@ -1838,7 +1899,7 @@ public class BlockManager {
// If block is under construction, add this replica to its list
if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
- node.getStorageInfo(storageID), iblk, reportedState);
+ storageInfo, iblk, reportedState);
// OpenFileBlocks only inside snapshots also will be added to safemode
// threshold. So we need to update such blocks to safemode
// refer HDFS-5283
@@ -1851,12 +1912,12 @@ public class BlockManager {
}
//add replica if appropriate
if (reportedState == ReplicaState.FINALIZED) {
- addStoredBlockImmediate(storedBlock, node, storageID);
+ addStoredBlockImmediate(storedBlock, storageInfo);
}
}
}
- private void reportDiff(DatanodeDescriptor dn, DatanodeStorage storage,
+ private void reportDiff(DatanodeStorageInfo storageInfo,
BlockListAsLongs newReport,
Collection<BlockInfo> toAdd, // add to DatanodeDescriptor
Collection<Block> toRemove, // remove from DatanodeDescriptor
@@ -1864,8 +1925,6 @@ public class BlockManager {
Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
Collection<StatefulBlockInfo> toUC) { // add to under-construction list
- final DatanodeStorageInfo storageInfo = dn.getStorageInfo(storage.getStorageID());
-
// place a delimiter in the list which separates blocks
// that have been reported from those that have not
BlockInfo delimiter = new BlockInfo(new Block(), 1);
@@ -1882,7 +1941,7 @@ public class BlockManager {
while(itBR.hasNext()) {
Block iblk = itBR.next();
ReplicaState iState = itBR.getCurrentReplicaState();
- BlockInfo storedBlock = processReportedBlock(dn, storage.getStorageID(),
+ BlockInfo storedBlock = processReportedBlock(storageInfo,
iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);
// move block to the head of the list
@@ -1919,7 +1978,7 @@ public class BlockManager {
* BlockInfoUnderConstruction's list of replicas.</li>
* </ol>
*
- * @param dn descriptor for the datanode that made the report
+ * @param storageInfo DatanodeStorageInfo that sent the report.
* @param block reported block replica
* @param reportedState reported replica state
* @param toAdd add to DatanodeDescriptor
@@ -1931,14 +1990,16 @@ public class BlockManager {
* @return the up-to-date stored block, if it should be kept.
* Otherwise, null.
*/
- private BlockInfo processReportedBlock(final DatanodeDescriptor dn,
- final String storageID,
+ private BlockInfo processReportedBlock(
+ final DatanodeStorageInfo storageInfo,
final Block block, final ReplicaState reportedState,
final Collection<BlockInfo> toAdd,
final Collection<Block> toInvalidate,
final Collection<BlockToMarkCorrupt> toCorrupt,
final Collection<StatefulBlockInfo> toUC) {
+ DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor();
+
if(LOG.isDebugEnabled()) {
LOG.debug("Reported block " + block
+ " on " + dn + " size " + block.getNumBytes()
@@ -1947,7 +2008,7 @@ public class BlockManager {
if (shouldPostponeBlocksFromFuture &&
namesystem.isGenStampInFuture(block)) {
- queueReportedBlock(dn, storageID, block, reportedState,
+ queueReportedBlock(storageInfo, block, reportedState,
QUEUE_REASON_FUTURE_GENSTAMP);
return null;
}
@@ -1968,7 +2029,7 @@ public class BlockManager {
}
// Ignore replicas already scheduled to be removed from the DN
- if(invalidateBlocks.contains(dn.getDatanodeUuid(), block)) {
+ if(invalidateBlocks.contains(dn, block)) {
/*
* TODO: following assertion is incorrect, see HDFS-2668 assert
* storedBlock.findDatanode(dn) < 0 : "Block " + block +
@@ -1984,7 +2045,10 @@ public class BlockManager {
// If the block is an out-of-date generation stamp or state,
// but we're the standby, we shouldn't treat it as corrupt,
// but instead just queue it for later processing.
- queueReportedBlock(dn, storageID, storedBlock, reportedState,
+ // TODO: Pretty confident this should be s/storedBlock/block below,
+ // since we should be postponing the info of the reported block, not
+ // the stored block. See HDFS-6289 for more context.
+ queueReportedBlock(storageInfo, storedBlock, reportedState,
QUEUE_REASON_CORRUPT_STATE);
} else {
toCorrupt.add(c);
@@ -2001,7 +2065,7 @@ public class BlockManager {
// Add replica if appropriate. If the replica was previously corrupt
// but now okay, it might need to be updated.
if (reportedState == ReplicaState.FINALIZED
- && (storedBlock.findDatanode(dn) < 0
+ && (!storedBlock.findDatanode(dn)
|| corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
toAdd.add(storedBlock);
}
@@ -2013,17 +2077,17 @@ public class BlockManager {
* standby node. @see PendingDataNodeMessages.
* @param reason a textual reason to report in the debug logs
*/
- private void queueReportedBlock(DatanodeDescriptor dn, String storageID, Block block,
+ private void queueReportedBlock(DatanodeStorageInfo storageInfo, Block block,
ReplicaState reportedState, String reason) {
assert shouldPostponeBlocksFromFuture;
if (LOG.isDebugEnabled()) {
LOG.debug("Queueing reported block " + block +
" in state " + reportedState +
- " from datanode " + dn + " for later processing " +
- "because " + reason + ".");
+ " from datanode " + storageInfo.getDatanodeDescriptor() +
+ " for later processing because " + reason + ".");
}
- pendingDNMessages.enqueueReportedBlock(dn, storageID, block, reportedState);
+ pendingDNMessages.enqueueReportedBlock(storageInfo, block, reportedState);
}
/**
@@ -2046,7 +2110,7 @@ public class BlockManager {
if (LOG.isDebugEnabled()) {
LOG.debug("Processing previouly queued message " + rbi);
}
- processAndHandleReportedBlock(rbi.getNode(), rbi.getStorageID(),
+ processAndHandleReportedBlock(rbi.getStorageInfo(),
rbi.getBlock(), rbi.getReportedState(), null);
}
}
@@ -2103,6 +2167,16 @@ public class BlockManager {
} else {
return null; // not corrupt
}
+ case UNDER_CONSTRUCTION:
+ if (storedBlock.getGenerationStamp() > reported.getGenerationStamp()) {
+ final long reportedGS = reported.getGenerationStamp();
+ return new BlockToMarkCorrupt(storedBlock, reportedGS, "block is "
+ + ucState + " and reported state " + reportedState
+ + ", But reported genstamp " + reportedGS
+ + " does not match genstamp in block map "
+ + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
+ }
+ return null;
default:
return null;
}
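The added UNDER_CONSTRUCTION arm treats a replica as corrupt only when the datanode reports an older generation stamp than the one in the block map; equal or newer stamps pass through. In miniature:

    class UcGenStampCheck {
      // true => mark the under-construction replica corrupt (GENSTAMP_MISMATCH)
      static boolean ucGenStampMismatch(long storedGenStamp,
          long reportedGenStamp) {
        return storedGenStamp > reportedGenStamp;
      }
    }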
@@ -2166,19 +2240,20 @@ public class BlockManager {
}
void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock,
- DatanodeDescriptor node, String storageID) throws IOException {
+ DatanodeStorageInfo storageInfo) throws IOException {
BlockInfoUnderConstruction block = ucBlock.storedBlock;
- block.addReplicaIfNotPresent(node.getStorageInfo(storageID),
- ucBlock.reportedBlock, ucBlock.reportedState);
+ block.addReplicaIfNotPresent(
+ storageInfo, ucBlock.reportedBlock, ucBlock.reportedState);
- if (ucBlock.reportedState == ReplicaState.FINALIZED && block.findDatanode(node) < 0) {
- addStoredBlock(block, node, storageID, null, true);
+ if (ucBlock.reportedState == ReplicaState.FINALIZED &&
+ !block.findDatanode(storageInfo.getDatanodeDescriptor())) {
+ addStoredBlock(block, storageInfo, null, true);
}
}
/**
* Faster version of
- * {@link #addStoredBlock(BlockInfo, DatanodeDescriptor, String, DatanodeDescriptor, boolean)}
+ * {@link #addStoredBlock(BlockInfo, DatanodeStorageInfo, DatanodeDescriptor, boolean)}
* , intended for use with initial block report at startup. If not in startup
* safe mode, will call standard addStoredBlock(). Assumes this method is
* called "immediately" so there is no need to refresh the storedBlock from
@@ -2189,17 +2264,17 @@ public class BlockManager {
* @throws IOException
*/
private void addStoredBlockImmediate(BlockInfo storedBlock,
- DatanodeDescriptor node, String storageID)
+ DatanodeStorageInfo storageInfo)
throws IOException {
assert (storedBlock != null && namesystem.hasWriteLock());
if (!namesystem.isInStartupSafeMode()
|| namesystem.isPopulatingReplQueues()) {
- addStoredBlock(storedBlock, node, storageID, null, false);
+ addStoredBlock(storedBlock, storageInfo, null, false);
return;
}
// just add it
- node.addBlock(storageID, storedBlock);
+ storageInfo.addBlock(storedBlock);
// Now check for completion of blocks and safe block count
int numCurrentReplica = countLiveNodes(storedBlock);
@@ -2221,13 +2296,13 @@ public class BlockManager {
* @return the block that is stored in blockMap.
*/
private Block addStoredBlock(final BlockInfo block,
- DatanodeDescriptor node,
- String storageID,
+ DatanodeStorageInfo storageInfo,
DatanodeDescriptor delNodeHint,
boolean logEveryBlock)
throws IOException {
assert block != null && namesystem.hasWriteLock();
BlockInfo storedBlock;
+ DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
if (block instanceof BlockInfoUnderConstruction) {
//refresh our copy in case the block got completed in another thread
storedBlock = blocksMap.getStoredBlock(block);
@@ -2243,12 +2318,11 @@ public class BlockManager {
// it will happen in next block report otherwise.
return block;
}
- assert storedBlock != null : "Block must be stored by now";
BlockCollection bc = storedBlock.getBlockCollection();
assert bc != null : "Block must belong to a file";
// add block to the datanode
- boolean added = node.addBlock(storageID, storedBlock);
+ boolean added = storageInfo.addBlock(storedBlock);
int curReplicaDelta;
if (added) {
@@ -2587,7 +2661,7 @@ public class BlockManager {
if (addedNode == delNodeHint) {
delNodeHint = null;
}
- Collection<DatanodeDescriptor> nonExcess = new ArrayList<DatanodeDescriptor>();
+ Collection<DatanodeStorageInfo> nonExcess = new ArrayList<DatanodeStorageInfo>();
Collection<DatanodeDescriptor> corruptNodes = corruptReplicas
.getNodes(block);
for(DatanodeStorageInfo storage : blocksMap.getStorages(block, State.NORMAL)) {
@@ -2607,7 +2681,7 @@ public class BlockManager {
if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
// exclude corrupt replicas
if (corruptNodes == null || !corruptNodes.contains(cur)) {
- nonExcess.add(cur);
+ nonExcess.add(storage);
}
}
}
@@ -2631,7 +2705,7 @@ public class BlockManager {
* If no such a node is available,
* then pick a node with least free space
*/
- private void chooseExcessReplicates(Collection<DatanodeDescriptor> nonExcess,
+ private void chooseExcessReplicates(final Collection<DatanodeStorageInfo> nonExcess,
Block b, short replication,
DatanodeDescriptor addedNode,
DatanodeDescriptor delNodeHint,
@@ -2639,28 +2713,33 @@ public class BlockManager {
assert namesystem.hasWriteLock();
// first form a rack to datanodes map and
BlockCollection bc = getBlockCollection(b);
- final Map<String, List<DatanodeDescriptor>> rackMap
- = new HashMap<String, List<DatanodeDescriptor>>();
- final List<DatanodeDescriptor> moreThanOne = new ArrayList<DatanodeDescriptor>();
- final List<DatanodeDescriptor> exactlyOne = new ArrayList<DatanodeDescriptor>();
+
+ final Map<String, List<DatanodeStorageInfo>> rackMap
+ = new HashMap<String, List<DatanodeStorageInfo>>();
+ final List<DatanodeStorageInfo> moreThanOne = new ArrayList<DatanodeStorageInfo>();
+ final List<DatanodeStorageInfo> exactlyOne = new ArrayList<DatanodeStorageInfo>();
// split nodes into two sets
// moreThanOne contains nodes on rack with more than one replica
// exactlyOne contains the remaining nodes
- replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne,
- exactlyOne);
+ replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne, exactlyOne);
// pick one node to delete that favors the delete hint
// otherwise pick one with least space from priSet if it is not empty
// otherwise one node with least space from remains
boolean firstOne = true;
+ final DatanodeStorageInfo delNodeHintStorage
+ = DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, delNodeHint);
+ final DatanodeStorageInfo addedNodeStorage
+ = DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, addedNode);
while (nonExcess.size() - replication > 0) {
// check if we can delete delNodeHint
- final DatanodeInfo cur;
- if (firstOne && delNodeHint !=null && nonExcess.contains(delNodeHint)
- && (moreThanOne.contains(delNodeHint)
- || (addedNode != null && !moreThanOne.contains(addedNode))) ) {
- cur = delNodeHint;
+ final DatanodeStorageInfo cur;
+ if (firstOne && delNodeHintStorage != null
+ && (moreThanOne.contains(delNodeHintStorage)
+ || (addedNodeStorage != null
+ && !moreThanOne.contains(addedNodeStorage)))) {
+ cur = delNodeHintStorage;
} else { // regular excessive replica removal
cur = replicator.chooseReplicaToDelete(bc, b, replication,
moreThanOne, exactlyOne);
@@ -2672,7 +2751,7 @@ public class BlockManager {
exactlyOne, cur);
nonExcess.remove(cur);
- addToExcessReplicate(cur, b);
+ addToExcessReplicate(cur.getDatanodeDescriptor(), b);
//
// The 'excessblocks' tracks blocks until we get confirmation
@@ -2683,7 +2762,7 @@ public class BlockManager {
// should be deleted. Items are removed from the invalidate list
// upon giving instructions to the namenode.
//
- addToInvalidates(b, cur);
+ addToInvalidates(b, cur.getDatanodeDescriptor());
blockLog.info("BLOCK* chooseExcessReplicates: "
+"("+cur+", "+b+") is added to invalidated blocks set");
}
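With storages instead of descriptors, the delete hint is first resolved to a DatanodeStorageInfo, and the first iteration honors it only under the same conditions as before. The hint test in isolation (the generic T stands in for DatanodeStorageInfo):

    import java.util.List;

    class DeleteHintCheck {
      static <T> boolean shouldHonorDeleteHint(T delNodeHintStorage,
          T addedNodeStorage, List<T> moreThanOne) {
        // Mirrors the first-iteration branch above: prefer the hint when it
        // sits on a rack with more than one replica, or when the just-added
        // replica does not.
        return delNodeHintStorage != null
            && (moreThanOne.contains(delNodeHintStorage)
                || (addedNodeStorage != null
                    && !moreThanOne.contains(addedNodeStorage)));
      }
    }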
@@ -2772,12 +2851,15 @@ public class BlockManager {
} else {
final String[] datanodeUuids = new String[locations.size()];
final String[] storageIDs = new String[datanodeUuids.length];
+ final StorageType[] storageTypes = new StorageType[datanodeUuids.length];
for(int i = 0; i < locations.size(); i++) {
final DatanodeStorageInfo s = locations.get(i);
datanodeUuids[i] = s.getDatanodeDescriptor().getDatanodeUuid();
storageIDs[i] = s.getStorageID();
+ storageTypes[i] = s.getStorageType();
}
- results.add(new BlockWithLocations(block, datanodeUuids, storageIDs));
+ results.add(new BlockWithLocations(block, datanodeUuids, storageIDs,
+ storageTypes));
return block.getNumBytes();
}
}
@@ -2786,8 +2868,9 @@ public class BlockManager {
* The given node is reporting that it received a certain block.
*/
@VisibleForTesting
- void addBlock(DatanodeDescriptor node, String storageID, Block block, String delHint)
+ void addBlock(DatanodeStorageInfo storageInfo, Block block, String delHint)
throws IOException {
+ DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
// Decrement number of blocks scheduled to this datanode.
// for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with
// RECEIVED_BLOCK), we currently also decrease the approximate number.
@@ -2807,12 +2890,12 @@ public class BlockManager {
// Modify the blocks->datanode map and node's map.
//
pendingReplications.decrement(block, node);
- processAndHandleReportedBlock(node, storageID, block, ReplicaState.FINALIZED,
+ processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED,
delHintNode);
}
- private void processAndHandleReportedBlock(DatanodeDescriptor node,
- String storageID, Block block,
+ private void processAndHandleReportedBlock(
+ DatanodeStorageInfo storageInfo, Block block,
ReplicaState reportedState, DatanodeDescriptor delHintNode)
throws IOException {
// blockReceived reports a finalized block
@@ -2820,7 +2903,9 @@ public class BlockManager {
Collection<Block> toInvalidate = new LinkedList<Block>();
Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
- processReportedBlock(node, storageID, block, reportedState,
+ final DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
+
+ processReportedBlock(storageInfo, block, reportedState,
toAdd, toInvalidate, toCorrupt, toUC);
// the block is only in one of the to-do lists
// if it is in none then data-node already has it
@@ -2828,11 +2913,11 @@ public class BlockManager {
: "The block should be only in one of the lists.";
for (StatefulBlockInfo b : toUC) {
- addStoredBlockUnderConstruction(b, node, storageID);
+ addStoredBlockUnderConstruction(b, storageInfo);
}
long numBlocksLogged = 0;
for (BlockInfo b : toAdd) {
- addStoredBlock(b, node, storageID, delHintNode, numBlocksLogged < maxNumBlocksToLog);
+ addStoredBlock(b, storageInfo, delHintNode, numBlocksLogged < maxNumBlocksToLog);
numBlocksLogged++;
}
if (numBlocksLogged > maxNumBlocksToLog) {
@@ -2846,7 +2931,7 @@ public class BlockManager {
addToInvalidates(b, node);
}
for (BlockToMarkCorrupt b : toCorrupt) {
- markBlockAsCorrupt(b, node, storageID);
+ markBlockAsCorrupt(b, storageInfo, node);
}
}
@@ -2873,13 +2958,15 @@ public class BlockManager {
"Got incremental block report from unregistered or dead node");
}
- if (node.getStorageInfo(srdb.getStorage().getStorageID()) == null) {
+ DatanodeStorageInfo storageInfo =
+ node.getStorageInfo(srdb.getStorage().getStorageID());
+ if (storageInfo == null) {
// The DataNode is reporting an unknown storage. Usually the NN learns
// about new storages from heartbeats but during NN restart we may
// receive a block report or incremental report before the heartbeat.
// We must handle this for protocol compatibility. This issue was
// uncovered by HDFS-6094.
- node.updateStorage(srdb.getStorage());
+ storageInfo = node.updateStorage(srdb.getStorage());
}
for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) {
@@ -2889,14 +2976,13 @@ public class BlockManager {
deleted++;
break;
case RECEIVED_BLOCK:
- addBlock(node, srdb.getStorage().getStorageID(),
- rdbi.getBlock(), rdbi.getDelHints());
+ addBlock(storageInfo, rdbi.getBlock(), rdbi.getDelHints());
received++;
break;
case RECEIVING_BLOCK:
receiving++;
- processAndHandleReportedBlock(node, srdb.getStorage().getStorageID(),
- rdbi.getBlock(), ReplicaState.RBW, null);
+ processAndHandleReportedBlock(storageInfo, rdbi.getBlock(),
+ ReplicaState.RBW, null);
break;
default:
String msg =
@@ -2999,10 +3085,14 @@ public class BlockManager {
/**
* On stopping decommission, check if the node has excess replicas.
- * If there are any excess replicas, call processOverReplicatedBlock()
+ * If there are any excess replicas, call processOverReplicatedBlock().
+ * Process over replicated blocks only when active NN is out of safe mode.
*/
void processOverReplicatedBlocksOnReCommission(
final DatanodeDescriptor srcNode) {
+ if (!namesystem.isPopulatingReplQueues()) {
+ return;
+ }
final Iterator<? extends Block> it = srcNode.getBlockIterator();
int numOverReplicated = 0;
while(it.hasNext()) {
@@ -3068,11 +3158,13 @@ public class BlockManager {
}
}
if (!neededReplications.contains(block) &&
- pendingReplications.getNumReplicas(block) == 0) {
+ pendingReplications.getNumReplicas(block) == 0 &&
+ namesystem.isPopulatingReplQueues()) {
//
// These blocks have been reported from the datanode
// after the startDecommission method has been executed. These
// blocks were in flight when the decommissioning was started.
+ // Process these blocks only when active NN is out of safe mode.
//
neededReplications.add(block,
curReplicas,
@@ -3082,6 +3174,15 @@ public class BlockManager {
}
}
}
+
+ if (!status && !srcNode.isAlive) {
+ LOG.warn("srcNode " + srcNode + " is dead " +
+ "when decommission is in progress. Continue to mark " +
+ "it as decommission in progress. In that way, when it rejoins the " +
+ "cluster it can continue the decommission process.");
+ status = true;
+ }
+
srcNode.decommissioningStatus.set(underReplicatedBlocks,
decommissionOnlyReplicas,
underReplicatedInOpenFiles);
@@ -3186,9 +3287,8 @@ public class BlockManager {
*
* @return number of blocks scheduled for removal during this iteration.
*/
- private int invalidateWorkForOneNode(String nodeId) {
+ private int invalidateWorkForOneNode(DatanodeInfo dn) {
final List<Block> toInvalidate;
- final DatanodeDescriptor dn;
namesystem.writeLock();
try {
@@ -3197,15 +3297,13 @@ public class BlockManager {
LOG.debug("In safemode, not computing replication work");
return 0;
}
- // get blocks to invalidate for the nodeId
- assert nodeId != null;
- dn = datanodeManager.getDatanode(nodeId);
- if (dn == null) {
- invalidateBlocks.remove(nodeId);
- return 0;
- }
- toInvalidate = invalidateBlocks.invalidateWork(nodeId, dn);
- if (toInvalidate == null) {
+ try {
+ toInvalidate = invalidateBlocks.invalidateWork(datanodeManager.getDatanode(dn));
+
+ if (toInvalidate == null) {
+ return 0;
+ }
+ } catch(UnregisteredNodeException une) {
return 0;
}
} finally {
@@ -3287,12 +3385,7 @@ public class BlockManager {
}
public int getCapacity() {
- namesystem.readLock();
- try {
- return blocksMap.getCapacity();
- } finally {
- namesystem.readUnlock();
- }
+ return blocksMap.getCapacity();
}
/**
@@ -3344,8 +3437,11 @@ public class BlockManager {
public void run() {
while (namesystem.isRunning()) {
try {
- computeDatanodeWork();
- processPendingReplications();
+ // Process replication work only when active NN is out of safe mode.
+ if (namesystem.isPopulatingReplQueues()) {
+ computeDatanodeWork();
+ processPendingReplications();
+ }
Thread.sleep(replicationRecheckInterval);
} catch (Throwable t) {
if (!namesystem.isRunning()) {
@@ -3373,7 +3469,6 @@ public class BlockManager {
* heartbeat.
*
* @return number of blocks scheduled for replication or removal.
- * @throws IOException
*/
int computeDatanodeWork() {
// Blocks should not be replicated or removed if in safe mode.
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java Tue Aug 19 23:49:39 2014
@@ -61,7 +61,7 @@ public abstract class BlockPlacementPoli
* @param srcPath the file to which this chooseTargets is being invoked.
* @param numOfReplicas additional number of replicas wanted.
* @param writer the writer's machine, null if not in the cluster.
- * @param chosenNodes datanodes that have been chosen as targets.
+ * @param chosen datanodes that have been chosen as targets.
* @param returnChosenNodes decide if the chosenNodes are returned.
* @param excludedNodes datanodes that should not be considered as targets.
* @param blocksize size of the data to be written.
@@ -78,8 +78,8 @@ public abstract class BlockPlacementPoli
StorageType storageType);
/**
- * Same as {@link #chooseTarget(String, int, Node, List, boolean,
- * Set, long)} with added parameter {@code favoredDatanodes}
+ * Same as {@link #chooseTarget(String, int, Node, Set, long, List, StorageType)}
+ * with added parameter {@code favoredDatanodes}
* @param favoredNodes datanodes that should be favored as targets. This
* is only a hint and due to cluster state, namenode may not be
* able to place the blocks on these datanodes.
@@ -124,11 +124,12 @@ public abstract class BlockPlacementPoli
listed in the previous parameter.
* @return the replica that is the best candidate for deletion
*/
- abstract public DatanodeDescriptor chooseReplicaToDelete(BlockCollection srcBC,
- Block block,
- short replicationFactor,
- Collection<DatanodeDescriptor> existingReplicas,
- Collection<DatanodeDescriptor> moreExistingReplicas);
+ abstract public DatanodeStorageInfo chooseReplicaToDelete(
+ BlockCollection srcBC,
+ Block block,
+ short replicationFactor,
+ Collection<DatanodeStorageInfo> existingReplicas,
+ Collection<DatanodeStorageInfo> moreExistingReplicas);
/**
* Used to setup a BlockPlacementPolicy object. This should be defined by
@@ -139,11 +140,13 @@ public abstract class BlockPlacementPoli
* @param clusterMap cluster topology
*/
abstract protected void initialize(Configuration conf, FSClusterStats stats,
- NetworkTopology clusterMap);
+ NetworkTopology clusterMap,
+ Host2NodesMap host2datanodeMap);
/**
* Get an instance of the configured Block Placement Policy based on the
- * the configuration property {@link DFS_BLOCK_REPLICATOR_CLASSNAME_KEY}.
+ * configuration property
+ * {@link DFSConfigKeys#DFS_BLOCK_REPLICATOR_CLASSNAME_KEY}.
*
* @param conf the configuration to be used
* @param stats an object that is used to retrieve the load on the cluster
@@ -152,14 +155,15 @@ public abstract class BlockPlacementPoli
*/
public static BlockPlacementPolicy getInstance(Configuration conf,
FSClusterStats stats,
- NetworkTopology clusterMap) {
+ NetworkTopology clusterMap,
+ Host2NodesMap host2datanodeMap) {
final Class<? extends BlockPlacementPolicy> replicatorClass = conf.getClass(
DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT,
BlockPlacementPolicy.class);
final BlockPlacementPolicy replicator = ReflectionUtils.newInstance(
replicatorClass, conf);
- replicator.initialize(conf, stats, clusterMap);
+ replicator.initialize(conf, stats, clusterMap, host2datanodeMap);
return replicator;
}
@@ -172,21 +176,23 @@ public abstract class BlockPlacementPoli
* @param exactlyOne The List of replica nodes on rack with only one replica
* @param cur current replica to remove
*/
- public void adjustSetsWithChosenReplica(final Map<String,
- List<DatanodeDescriptor>> rackMap,
- final List<DatanodeDescriptor> moreThanOne,
- final List<DatanodeDescriptor> exactlyOne, final DatanodeInfo cur) {
+ public void adjustSetsWithChosenReplica(
+ final Map<String, List<DatanodeStorageInfo>> rackMap,
+ final List<DatanodeStorageInfo> moreThanOne,
+ final List<DatanodeStorageInfo> exactlyOne,
+ final DatanodeStorageInfo cur) {
- String rack = getRack(cur);
- final List<DatanodeDescriptor> datanodes = rackMap.get(rack);
- datanodes.remove(cur);
- if (datanodes.isEmpty()) {
+ final String rack = getRack(cur.getDatanodeDescriptor());
+ final List<DatanodeStorageInfo> storages = rackMap.get(rack);
+ storages.remove(cur);
+ if (storages.isEmpty()) {
rackMap.remove(rack);
}
if (moreThanOne.remove(cur)) {
- if (datanodes.size() == 1) {
- moreThanOne.remove(datanodes.get(0));
- exactlyOne.add(datanodes.get(0));
+ if (storages.size() == 1) {
+ final DatanodeStorageInfo remaining = storages.get(0);
+ moreThanOne.remove(remaining);
+ exactlyOne.add(remaining);
}
} else {
exactlyOne.remove(cur);
@@ -195,7 +201,6 @@ public abstract class BlockPlacementPoli
/**
* Get rack string from a data node
- * @param datanode
* @return rack of data node
*/
protected String getRack(final DatanodeInfo datanode) {
@@ -206,34 +211,34 @@ public abstract class BlockPlacementPoli
* Split data nodes into two sets, one set includes nodes on rack with
* more than one replica, the other set contains the remaining nodes.
*
- * @param dataNodes
+ * @param storages storages to be split into two sets
* @param rackMap a map from rack to datanodes
* @param moreThanOne contains nodes on rack with more than one replica
* @param exactlyOne contains the remaining nodes
*/
public void splitNodesWithRack(
- Collection<DatanodeDescriptor> dataNodes,
- final Map<String, List<DatanodeDescriptor>> rackMap,
- final List<DatanodeDescriptor> moreThanOne,
- final List<DatanodeDescriptor> exactlyOne) {
- for(DatanodeDescriptor node : dataNodes) {
- final String rackName = getRack(node);
- List<DatanodeDescriptor> datanodeList = rackMap.get(rackName);
- if (datanodeList == null) {
- datanodeList = new ArrayList<DatanodeDescriptor>();
- rackMap.put(rackName, datanodeList);
+ final Iterable<DatanodeStorageInfo> storages,
+ final Map<String, List<DatanodeStorageInfo>> rackMap,
+ final List<DatanodeStorageInfo> moreThanOne,
+ final List<DatanodeStorageInfo> exactlyOne) {
+ for(DatanodeStorageInfo s: storages) {
+ final String rackName = getRack(s.getDatanodeDescriptor());
+ List<DatanodeStorageInfo> storageList = rackMap.get(rackName);
+ if (storageList == null) {
+ storageList = new ArrayList<DatanodeStorageInfo>();
+ rackMap.put(rackName, storageList);
}
- datanodeList.add(node);
+ storageList.add(s);
}
// split nodes into two sets
- for(List<DatanodeDescriptor> datanodeList : rackMap.values()) {
- if (datanodeList.size() == 1) {
+ for(List<DatanodeStorageInfo> storageList : rackMap.values()) {
+ if (storageList.size() == 1) {
// exactlyOne contains nodes on rack with only one replica
- exactlyOne.add(datanodeList.get(0));
+ exactlyOne.add(storageList.get(0));
} else {
// moreThanOne contains nodes on rack with more than one replica
- moreThanOne.addAll(datanodeList);
+ moreThanOne.addAll(storageList);
}
}
}
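splitNodesWithRack now groups DatanodeStorageInfo objects rather than descriptors, but the shape of the algorithm is unchanged. A sketch with strings standing in for storages and a map standing in for the rack lookup (all names hypothetical):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    class RackSplit {
      static void splitByRack(Iterable<String> storages,
          Map<String, String> storage2rack,
          Map<String, List<String>> rackMap,
          List<String> moreThanOne, List<String> exactlyOne) {
        for (String s : storages) {
          rackMap.computeIfAbsent(storage2rack.get(s),
              r -> new ArrayList<>()).add(s);
        }
        for (List<String> perRack : rackMap.values()) {
          if (perRack.size() == 1) {
            exactlyOne.add(perRack.get(0));   // sole replica on its rack
          } else {
            moreThanOne.addAll(perRack);      // rack holds several replicas
          }
        }
      }
    }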
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Tue Aug 19 23:49:39 2014
@@ -70,6 +70,7 @@ public class BlockPlacementPolicyDefault
protected boolean considerLoad;
private boolean preferLocalNode = true;
protected NetworkTopology clusterMap;
+ protected Host2NodesMap host2datanodeMap;
private FSClusterStats stats;
protected long heartbeatInterval; // interval for DataNode heartbeats
private long staleInterval; // interval used to identify stale DataNodes
@@ -80,8 +81,9 @@ public class BlockPlacementPolicyDefault
protected int tolerateHeartbeatMultiplier;
protected BlockPlacementPolicyDefault(Configuration conf, FSClusterStats stats,
- NetworkTopology clusterMap) {
- initialize(conf, stats, clusterMap);
+ NetworkTopology clusterMap,
+ Host2NodesMap host2datanodeMap) {
+ initialize(conf, stats, clusterMap, host2datanodeMap);
}
protected BlockPlacementPolicyDefault() {
@@ -89,11 +91,13 @@ public class BlockPlacementPolicyDefault
@Override
public void initialize(Configuration conf, FSClusterStats stats,
- NetworkTopology clusterMap) {
+ NetworkTopology clusterMap,
+ Host2NodesMap host2datanodeMap) {
this.considerLoad = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
this.stats = stats;
this.clusterMap = clusterMap;
+ this.host2datanodeMap = host2datanodeMap;
this.heartbeatInterval = conf.getLong(
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000;
@@ -141,14 +145,14 @@ public class BlockPlacementPolicyDefault
List<DatanodeStorageInfo> results = new ArrayList<DatanodeStorageInfo>();
boolean avoidStaleNodes = stats != null
&& stats.isAvoidingStaleDataNodesForWrite();
- for (int i = 0; i < Math.min(favoredNodes.size(), numOfReplicas); i++) {
+ for (int i = 0; i < favoredNodes.size() && results.size() < numOfReplicas; i++) {
DatanodeDescriptor favoredNode = favoredNodes.get(i);
// Choose a single node which is local to favoredNode.
// 'results' is updated within chooseLocalNode
final DatanodeStorageInfo target = chooseLocalStorage(favoredNode,
favoriteAndExcludedNodes, blocksize,
getMaxNodesPerRack(results.size(), numOfReplicas)[1],
- results, avoidStaleNodes, storageType);
+ results, avoidStaleNodes, storageType, false);
if (target == null) {
LOG.warn("Could not find a target for file " + src
+ " with favored node " + favoredNode);
@@ -267,7 +271,7 @@ public class BlockPlacementPolicyDefault
try {
if (numOfResults == 0) {
writer = chooseLocalStorage(writer, excludedNodes, blocksize,
- maxNodesPerRack, results, avoidStaleNodes, storageType)
+ maxNodesPerRack, results, avoidStaleNodes, storageType, true)
.getDatanodeDescriptor();
if (--numOfReplicas == 0) {
return writer;
@@ -341,12 +345,14 @@ public class BlockPlacementPolicyDefault
int maxNodesPerRack,
List<DatanodeStorageInfo> results,
boolean avoidStaleNodes,
- StorageType storageType)
+ StorageType storageType,
+ boolean fallbackToLocalRack)
throws NotEnoughReplicasException {
// if no local machine, randomly choose one node
- if (localMachine == null)
+ if (localMachine == null) {
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes, storageType);
+ }
if (preferLocalNode && localMachine instanceof DatanodeDescriptor) {
DatanodeDescriptor localDatanode = (DatanodeDescriptor) localMachine;
// otherwise try local machine first
@@ -359,7 +365,11 @@ public class BlockPlacementPolicyDefault
}
}
}
- }
+ }
+
+ if (!fallbackToLocalRack) {
+ return null;
+ }
// try a node on local rack
return chooseLocalRack(localMachine, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes, storageType);
@@ -632,15 +642,11 @@ public class BlockPlacementPolicyDefault
// check the communication traffic of the target machine
if (considerLoad) {
- double avgLoad = 0;
- if (stats != null) {
- int size = stats.getNumDatanodesInService();
- if (size != 0) {
- avgLoad = (double)stats.getTotalLoad()/size;
- }
- }
- if (node.getXceiverCount() > (2.0 * avgLoad)) {
- logNodeIsNotChosen(storage, "the node is too busy ");
+ final double maxLoad = 2.0 * stats.getInServiceXceiverAverage();
+ final int nodeLoad = node.getXceiverCount();
+ if (nodeLoad > maxLoad) {
+ logNodeIsNotChosen(storage,
+ "the node is too busy (load:"+nodeLoad+" > "+maxLoad+") ");
return false;
}
}
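The busy-node test now uses the average xceiver count over in-service datanodes, so dead and decommissioned nodes no longer drag the average down, and the rejection log includes the actual numbers. The arithmetic is simply:

    class LoadCheck {
      // With an in-service average of 10 active xceivers, maxLoad is 20,
      // so a node running 25 xceivers is rejected as too busy.
      static boolean isTooBusy(int nodeXceiverCount, double inServiceXceiverAvg) {
        final double maxLoad = 2.0 * inServiceXceiverAvg;
        return nodeXceiverCount > maxLoad;
      }
    }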
@@ -723,31 +729,34 @@ public class BlockPlacementPolicyDefault
}
@Override
- public DatanodeDescriptor chooseReplicaToDelete(BlockCollection bc,
+ public DatanodeStorageInfo chooseReplicaToDelete(BlockCollection bc,
Block block, short replicationFactor,
- Collection<DatanodeDescriptor> first,
- Collection<DatanodeDescriptor> second) {
+ Collection<DatanodeStorageInfo> first,
+ Collection<DatanodeStorageInfo> second) {
long oldestHeartbeat =
now() - heartbeatInterval * tolerateHeartbeatMultiplier;
- DatanodeDescriptor oldestHeartbeatNode = null;
+ DatanodeStorageInfo oldestHeartbeatStorage = null;
long minSpace = Long.MAX_VALUE;
- DatanodeDescriptor minSpaceNode = null;
+ DatanodeStorageInfo minSpaceStorage = null;
// Pick the node with the oldest heartbeat or with the least free space,
// if all hearbeats are within the tolerable heartbeat interval
- for(DatanodeDescriptor node : pickupReplicaSet(first, second)) {
+ for(DatanodeStorageInfo storage : pickupReplicaSet(first, second)) {
+ final DatanodeDescriptor node = storage.getDatanodeDescriptor();
long free = node.getRemaining();
long lastHeartbeat = node.getLastUpdate();
if(lastHeartbeat < oldestHeartbeat) {
oldestHeartbeat = lastHeartbeat;
- oldestHeartbeatNode = node;
+ oldestHeartbeatStorage = storage;
}
if (minSpace > free) {
minSpace = free;
- minSpaceNode = node;
+ minSpaceStorage = storage;
}
}
- return oldestHeartbeatNode != null ? oldestHeartbeatNode : minSpaceNode;
+
+ return oldestHeartbeatStorage != null? oldestHeartbeatStorage
+ : minSpaceStorage;
}
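The selection rule is unchanged apart from the storage-level return type: a replica whose node's heartbeat is older than the tolerated interval wins, otherwise the one with the least remaining space. An array-based sketch of the same scan (indices stand in for storages):

    class ReplicaToDelete {
      static int chooseReplicaToDelete(long[] remaining, long[] lastUpdate,
          long now, long toleratedHeartbeatMs) {
        long oldestHeartbeat = now - toleratedHeartbeatMs;
        int oldestHeartbeatIdx = -1;
        long minSpace = Long.MAX_VALUE;
        int minSpaceIdx = -1;
        for (int i = 0; i < remaining.length; i++) {
          if (lastUpdate[i] < oldestHeartbeat) {   // stale heartbeat wins
            oldestHeartbeat = lastUpdate[i];
            oldestHeartbeatIdx = i;
          }
          if (remaining[i] < minSpace) {           // least free space second
            minSpace = remaining[i];
            minSpaceIdx = i;
          }
        }
        return oldestHeartbeatIdx != -1 ? oldestHeartbeatIdx : minSpaceIdx;
      }
    }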
/**
@@ -756,9 +765,9 @@ public class BlockPlacementPolicyDefault
* replica while second set contains remaining replica nodes.
* So pick up first set if not empty. If first is empty, then pick second.
*/
- protected Collection<DatanodeDescriptor> pickupReplicaSet(
- Collection<DatanodeDescriptor> first,
- Collection<DatanodeDescriptor> second) {
+ protected Collection<DatanodeStorageInfo> pickupReplicaSet(
+ Collection<DatanodeStorageInfo> first,
+ Collection<DatanodeStorageInfo> second) {
return first.isEmpty() ? second : first;
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java Tue Aug 19 23:49:39 2014
@@ -47,8 +47,8 @@ import org.apache.hadoop.net.NodeBase;
public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefault {
protected BlockPlacementPolicyWithNodeGroup(Configuration conf, FSClusterStats stats,
- NetworkTopology clusterMap) {
- initialize(conf, stats, clusterMap);
+ NetworkTopology clusterMap, DatanodeManager datanodeManager) {
+ initialize(conf, stats, clusterMap, host2datanodeMap);
}
protected BlockPlacementPolicyWithNodeGroup() {
@@ -56,8 +56,9 @@ public class BlockPlacementPolicyWithNod
@Override
public void initialize(Configuration conf, FSClusterStats stats,
- NetworkTopology clusterMap) {
- super.initialize(conf, stats, clusterMap);
+ NetworkTopology clusterMap,
+ Host2NodesMap host2datanodeMap) {
+ super.initialize(conf, stats, clusterMap, host2datanodeMap);
}
/** choose local node of localMachine as the target.
@@ -69,7 +70,8 @@ public class BlockPlacementPolicyWithNod
protected DatanodeStorageInfo chooseLocalStorage(Node localMachine,
Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
- StorageType storageType) throws NotEnoughReplicasException {
+ StorageType storageType, boolean fallbackToLocalRack
+ ) throws NotEnoughReplicasException {
// if no local machine, randomly choose one node
if (localMachine == null)
return chooseRandom(NodeBase.ROOT, excludedNodes,
@@ -96,6 +98,10 @@ public class BlockPlacementPolicyWithNod
if (chosenStorage != null) {
return chosenStorage;
}
+
+ if (!fallbackToLocalRack) {
+ return null;
+ }
// try a node on local rack
return chooseLocalRack(localMachine, excludedNodes,
blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
@@ -241,6 +247,36 @@ public class BlockPlacementPolicyWithNod
countOfExcludedNodes++;
}
}
+
+ countOfExcludedNodes += addDependentNodesToExcludedNodes(
+ chosenNode, excludedNodes);
+ return countOfExcludedNodes;
+ }
+
+ /**
+ * Add all nodes from a dependent nodes list to excludedNodes.
+ * @return number of new excluded nodes
+ */
+ private int addDependentNodesToExcludedNodes(DatanodeDescriptor chosenNode,
+ Set<Node> excludedNodes) {
+ if (this.host2datanodeMap == null) {
+ return 0;
+ }
+ int countOfExcludedNodes = 0;
+ for(String hostname : chosenNode.getDependentHostNames()) {
+ DatanodeDescriptor node =
+ this.host2datanodeMap.getDataNodeByHostName(hostname);
+ if(node!=null) {
+ if (excludedNodes.add(node)) {
+ countOfExcludedNodes++;
+ }
+ } else {
+ LOG.warn("Not able to find datanode " + hostname
+ + " which has dependency with datanode "
+ + chosenNode.getHostName());
+ }
+ }
+
return countOfExcludedNodes;
}
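Dependent-node exclusion keeps the placement policy from picking two datanodes that share a failure domain. A schematic with hostnames standing in for DatanodeDescriptors (the unresolved-hostname warning is omitted):

    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    class DependentNodeExclusion {
      static int addDependentNodesToExcluded(List<String> dependentHostNames,
          Map<String, String> host2datanodeMap, Set<String> excludedNodes) {
        int countOfExcludedNodes = 0;
        for (String hostname : dependentHostNames) {
          final String node = host2datanodeMap.get(hostname);
          if (node != null && excludedNodes.add(node)) {
            countOfExcludedNodes++;   // newly excluded dependent node
          }
        }
        return countOfExcludedNodes;
      }
    }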
@@ -255,9 +291,9 @@ public class BlockPlacementPolicyWithNod
* If first is empty, then pick second.
*/
@Override
- public Collection<DatanodeDescriptor> pickupReplicaSet(
- Collection<DatanodeDescriptor> first,
- Collection<DatanodeDescriptor> second) {
+ public Collection<DatanodeStorageInfo> pickupReplicaSet(
+ Collection<DatanodeStorageInfo> first,
+ Collection<DatanodeStorageInfo> second) {
// If no replica within same rack, return directly.
if (first.isEmpty()) {
return second;
@@ -265,25 +301,24 @@ public class BlockPlacementPolicyWithNod
// Split data nodes in the first set into two sets,
// moreThanOne contains nodes on nodegroup with more than one replica
// exactlyOne contains the remaining nodes
- Map<String, List<DatanodeDescriptor>> nodeGroupMap =
- new HashMap<String, List<DatanodeDescriptor>>();
+ Map<String, List<DatanodeStorageInfo>> nodeGroupMap =
+ new HashMap<String, List<DatanodeStorageInfo>>();
- for(DatanodeDescriptor node : first) {
- final String nodeGroupName =
- NetworkTopology.getLastHalf(node.getNetworkLocation());
- List<DatanodeDescriptor> datanodeList =
- nodeGroupMap.get(nodeGroupName);
- if (datanodeList == null) {
- datanodeList = new ArrayList<DatanodeDescriptor>();
- nodeGroupMap.put(nodeGroupName, datanodeList);
+ for(DatanodeStorageInfo storage : first) {
+ final String nodeGroupName = NetworkTopology.getLastHalf(
+ storage.getDatanodeDescriptor().getNetworkLocation());
+ List<DatanodeStorageInfo> storageList = nodeGroupMap.get(nodeGroupName);
+ if (storageList == null) {
+ storageList = new ArrayList<DatanodeStorageInfo>();
+ nodeGroupMap.put(nodeGroupName, storageList);
}
- datanodeList.add(node);
+ storageList.add(storage);
}
- final List<DatanodeDescriptor> moreThanOne = new ArrayList<DatanodeDescriptor>();
- final List<DatanodeDescriptor> exactlyOne = new ArrayList<DatanodeDescriptor>();
+ final List<DatanodeStorageInfo> moreThanOne = new ArrayList<DatanodeStorageInfo>();
+ final List<DatanodeStorageInfo> exactlyOne = new ArrayList<DatanodeStorageInfo>();
// split nodes into two sets
- for(List<DatanodeDescriptor> datanodeList : nodeGroupMap.values()) {
+ for(List<DatanodeStorageInfo> datanodeList : nodeGroupMap.values()) {
if (datanodeList.size() == 1 ) {
// exactlyOne contains nodes on nodegroup with exactly one replica
exactlyOne.add(datanodeList.get(0));
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java Tue Aug 19 23:49:39 2014
@@ -23,8 +23,8 @@ import org.apache.hadoop.hdfs.protocol.B
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.util.GSet;
import org.apache.hadoop.util.LightWeightGSet;
-import org.apache.hadoop.util.LightWeightGSet.SetIterator;
+import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
@@ -217,9 +217,14 @@ class BlocksMap {
BlockInfo currentBlock = blocks.get(newBlock);
assert currentBlock != null : "the block is not in blocksMap";
// replace block in data-node lists
- for(int idx = currentBlock.numNodes()-1; idx >= 0; idx--) {
- DatanodeDescriptor dn = currentBlock.getDatanode(idx);
- dn.replaceBlock(currentBlock, newBlock);
+ for (int i = currentBlock.numNodes() - 1; i >= 0; i--) {
+ final DatanodeDescriptor dn = currentBlock.getDatanode(i);
+ final DatanodeStorageInfo storage = currentBlock.findStorageInfo(dn);
+ final boolean removed = storage.removeBlock(currentBlock);
+ Preconditions.checkState(removed, "currentBlock not found.");
+
+ final boolean added = storage.addBlock(newBlock);
+ Preconditions.checkState(added, "newBlock already exists.");
}
// replace block in the map itself
blocks.put(newBlock);
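
replaceBlock previously delegated to dn.replaceBlock; the new loop swaps the block on each storage explicitly and asserts both halves of the swap. A minimal sketch of that remove-then-add contract, with a Set of block IDs standing in for a storage and plain IllegalStateException in place of Guava's Preconditions.checkState:

    import java.util.*;

    public class ReplaceBlockSketch {
      // One "storage" is modeled as the set of block IDs it holds.
      static void replaceOn(Set<Long> storage, long currentBlock,
          long newBlock) {
        if (!storage.remove(currentBlock)) {
          throw new IllegalStateException("currentBlock not found.");
        }
        if (!storage.add(newBlock)) {
          throw new IllegalStateException("newBlock already exists.");
        }
      }

      public static void main(String[] args) {
        Set<Long> storage = new HashSet<>(Arrays.asList(1L, 2L));
        replaceOn(storage, 1L, 3L);
        System.out.println(storage);  // [2, 3] (order may vary)
      }
    }
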
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java Tue Aug 19 23:49:39 2014
@@ -33,8 +33,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.protocol.Block;
@@ -53,8 +51,10 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.util.ReadOnlyList;
import org.apache.hadoop.util.GSet;
import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
/**
* Scans the namesystem, scheduling blocks to be cached as appropriate.
@@ -65,8 +66,8 @@ import com.google.common.base.Preconditi
@InterfaceAudience.LimitedPrivate({"HDFS"})
public class CacheReplicationMonitor extends Thread implements Closeable {
- private static final Log LOG =
- LogFactory.getLog(CacheReplicationMonitor.class);
+ private static final Logger LOG =
+ LoggerFactory.getLogger(CacheReplicationMonitor.class);
private final FSNamesystem namesystem;
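
The commons-logging Log is swapped for an SLF4J Logger here, which is what lets the later hunks in this file drop their isDebugEnabled/isTraceEnabled guards in favor of {} placeholders; it is also why LOG.fatal becomes LOG.error below, since SLF4J's Logger interface has no fatal level. A small sketch of the pattern, assuming slf4j-api (and a binding) on the classpath; the directive id and path values are illustrative:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class Slf4jStyleSketch {
      private static final Logger LOG =
          LoggerFactory.getLogger(Slf4jStyleSketch.class);

      public static void main(String[] args) {
        long directiveId = 42;
        String path = "/pool/file";
        // commons-logging style: concatenation runs even when debug is
        // off, so a guard was needed:
        //   if (LOG.isDebugEnabled()) {
        //     LOG.debug("Directive " + directiveId + ": no inode at " + path);
        //   }
        // SLF4J defers formatting until the level is enabled:
        LOG.debug("Directive {}: No inode found at {}", directiveId, path);
      }
    }
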
@@ -103,21 +104,21 @@ public class CacheReplicationMonitor ext
private final Condition scanFinished;
/**
- * Whether there are pending CacheManager operations that necessitate a
- * CacheReplicationMonitor rescan. Protected by the CRM lock.
+ * The number of rescans completed. Used to wait for scans to finish.
+ * Protected by the CacheReplicationMonitor lock.
*/
- private boolean needsRescan = true;
+ private long completedScanCount = 0;
/**
- * Whether we are currently doing a rescan. Protected by the CRM lock.
+ * The scan we're currently performing, or -1 if no scan is in progress.
+ * Protected by the CacheReplicationMonitor lock.
*/
- private boolean isScanning = false;
+ private long curScanCount = -1;
/**
- * The number of rescans completed. Used to wait for scans to finish.
- * Protected by the CacheReplicationMonitor lock.
+ * The number of rescans we need to complete. Protected by the CRM lock.
*/
- private long scanCount = 0;
+ private long neededScanCount = 0;
/**
* True if this monitor should terminate. Protected by the CRM lock.
@@ -168,7 +169,7 @@ public class CacheReplicationMonitor ext
LOG.info("Shutting down CacheReplicationMonitor");
return;
}
- if (needsRescan) {
+ if (completedScanCount < neededScanCount) {
LOG.info("Rescanning because of pending operations");
break;
}
@@ -181,8 +182,6 @@ public class CacheReplicationMonitor ext
doRescan.await(delta, TimeUnit.MILLISECONDS);
curTimeMs = Time.monotonicNow();
}
- isScanning = true;
- needsRescan = false;
} finally {
lock.unlock();
}
@@ -193,8 +192,8 @@ public class CacheReplicationMonitor ext
// Update synchronization-related variables.
lock.lock();
try {
- isScanning = false;
- scanCount++;
+ completedScanCount = curScanCount;
+ curScanCount = -1;
scanFinished.signalAll();
} finally {
lock.unlock();
@@ -207,7 +206,7 @@ public class CacheReplicationMonitor ext
LOG.info("Shutting down CacheReplicationMonitor.");
return;
} catch (Throwable t) {
- LOG.fatal("Thread exiting", t);
+ LOG.error("Thread exiting", t);
terminate(1, t);
}
}
@@ -225,16 +224,15 @@ public class CacheReplicationMonitor ext
"Must not hold the FSN write lock when waiting for a rescan.");
Preconditions.checkArgument(lock.isHeldByCurrentThread(),
"Must hold the CRM lock when waiting for a rescan.");
- if (!needsRescan) {
+ if (neededScanCount <= completedScanCount) {
return;
}
// If no scan is already ongoing, mark the CRM as dirty and kick
- if (!isScanning) {
+ if (curScanCount < 0) {
doRescan.signal();
}
// Wait until the scan finishes and the count advances
- final long startCount = scanCount;
- while ((!shutdown) && (startCount >= scanCount)) {
+ while ((!shutdown) && (completedScanCount < neededScanCount)) {
try {
scanFinished.await();
} catch (InterruptedException e) {
@@ -252,7 +250,14 @@ public class CacheReplicationMonitor ext
public void setNeedsRescan() {
Preconditions.checkArgument(lock.isHeldByCurrentThread(),
"Must hold the CRM lock when setting the needsRescan bit.");
- this.needsRescan = true;
+ if (curScanCount >= 0) {
+ // If there is a scan in progress, we need to wait for the scan after
+ // that.
+ neededScanCount = curScanCount + 1;
+ } else {
+ // If there is no scan in progress, we need to wait for the next scan.
+ neededScanCount = completedScanCount + 1;
+ }
}
/**
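
Taken together, the three counters above replace the needsRescan/isScanning flags: setNeedsRescan records which scan number will satisfy the caller, and waiters block until completedScanCount reaches it. A compressed sketch of just that counter protocol (field and method names follow the patch; the CRM lock and condition variables are elided):

    public class ScanCounterSketch {
      long completedScanCount = 0;
      long curScanCount = -1;       // -1 means no scan is in progress
      long neededScanCount = 0;

      // setNeedsRescan: a scan only counts if it starts after this call.
      void setNeedsRescan() {
        neededScanCount = (curScanCount >= 0)
            ? curScanCount + 1      // wait for the scan after the running one
            : completedScanCount + 1;
      }

      boolean rescanNeeded() {      // the run() loop's wake-up test
        return completedScanCount < neededScanCount;
      }

      void beginScan()  { curScanCount = completedScanCount + 1; }

      void finishScan() {
        completedScanCount = curScanCount;
        curScanCount = -1;
      }

      public static void main(String[] args) {
        ScanCounterSketch s = new ScanCounterSketch();
        s.setNeedsRescan();         // needs scan #1
        s.beginScan();
        s.setNeedsRescan();         // mid-scan request -> needs scan #2
        s.finishScan();
        System.out.println(s.rescanNeeded());  // true: scan #2 still owed
      }
    }
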
@@ -281,12 +286,19 @@ public class CacheReplicationMonitor ext
private void rescan() throws InterruptedException {
scannedDirectives = 0;
scannedBlocks = 0;
- namesystem.writeLock();
try {
- if (shutdown) {
- throw new InterruptedException("CacheReplicationMonitor was " +
- "shut down.");
+ namesystem.writeLock();
+ try {
+ lock.lock();
+ if (shutdown) {
+ throw new InterruptedException("CacheReplicationMonitor was " +
+ "shut down.");
+ }
+ curScanCount = completedScanCount + 1;
+ } finally {
+ lock.unlock();
}
+
resetStatistics();
rescanCacheDirectives();
rescanCachedBlockMap();
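
Note the lock ordering the reworked rescan() establishes: the namesystem write lock is taken first, and the CRM lock is then held only long enough to check for shutdown and publish the new scan number. A skeletal sketch of that ordering with standard java.util.concurrent locks (statistics reset and the scanning work are elided):

    import java.util.concurrent.locks.ReentrantLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class RescanLockOrderSketch {
      final ReentrantReadWriteLock fsnLock = new ReentrantReadWriteLock();
      final ReentrantLock crmLock = new ReentrantLock();
      volatile boolean shutdown = false;
      long completedScanCount = 0;
      long curScanCount = -1;

      void rescan() throws InterruptedException {
        fsnLock.writeLock().lock();       // namesystem lock first
        try {
          crmLock.lock();                 // then the CRM lock, briefly
          try {
            if (shutdown) {
              throw new InterruptedException("shut down");
            }
            curScanCount = completedScanCount + 1;  // publish scan number
          } finally {
            crmLock.unlock();
          }
          // ... directive and block rescans run here ...
        } finally {
          fsnLock.writeLock().unlock();
        }
      }

      public static void main(String[] args) throws InterruptedException {
        new RescanLockOrderSketch().rescan();
      }
    }
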
@@ -316,11 +328,8 @@ public class CacheReplicationMonitor ext
scannedDirectives++;
// Skip processing this entry if it has expired
if (directive.getExpiryTime() > 0 && directive.getExpiryTime() <= now) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Directive " + directive.getId() + ": the directive " +
- "expired at " + directive.getExpiryTime() + " (now = " +
- now + ")");
- }
+ LOG.debug("Directive {}: the directive expired at {} (now = {})",
+ directive.getId(), directive.getExpiryTime(), now);
continue;
}
String path = directive.getPath();
@@ -329,17 +338,14 @@ public class CacheReplicationMonitor ext
node = fsDir.getINode(path);
} catch (UnresolvedLinkException e) {
// We don't cache through symlinks
- if (LOG.isDebugEnabled()) {
- LOG.debug("Directive " + directive.getId() +
- ": got UnresolvedLinkException while resolving path " + path);
- }
+ LOG.debug("Directive {}: got UnresolvedLinkException while resolving "
+ + "path {}", directive.getId(), path
+ );
continue;
}
if (node == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Directive " + directive.getId() +
- ": No inode found at " + path);
- }
+ LOG.debug("Directive {}: No inode found at {}", directive.getId(),
+ path);
} else if (node.isDirectory()) {
INodeDirectory dir = node.asDirectory();
ReadOnlyList<INode> children = dir
@@ -352,10 +358,8 @@ public class CacheReplicationMonitor ext
} else if (node.isFile()) {
rescanFile(directive, node.asFile());
} else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Directive " + directive.getId() +
- ": ignoring non-directive, non-file inode " + node);
- }
+ LOG.debug("Directive {}: ignoring non-directive, non-file inode {} ",
+ directive.getId(), node);
}
}
}
@@ -381,15 +385,13 @@ public class CacheReplicationMonitor ext
// do not cache this file.
CachePool pool = directive.getPool();
if (pool.getBytesNeeded() > pool.getLimit()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("Directive %d: not scanning file %s because " +
- "bytesNeeded for pool %s is %d, but the pool's limit is %d",
- directive.getId(),
- file.getFullPathName(),
- pool.getPoolName(),
- pool.getBytesNeeded(),
- pool.getLimit()));
- }
+ LOG.debug("Directive {}: not scanning file {} because " +
+ "bytesNeeded for pool {} is {}, but the pool's limit is {}",
+ directive.getId(),
+ file.getFullPathName(),
+ pool.getPoolName(),
+ pool.getBytesNeeded(),
+ pool.getLimit());
return;
}
@@ -397,11 +399,10 @@ public class CacheReplicationMonitor ext
for (BlockInfo blockInfo : blockInfos) {
if (!blockInfo.getBlockUCState().equals(BlockUCState.COMPLETE)) {
// We don't try to cache blocks that are under construction.
- if (LOG.isTraceEnabled()) {
- LOG.trace("Directive " + directive.getId() + ": can't cache " +
- "block " + blockInfo + " because it is in state " +
- blockInfo.getBlockUCState() + ", not COMPLETE.");
- }
+ LOG.trace("Directive {}: can't cache block {} because it is in state "
+ + "{}, not COMPLETE.", directive.getId(), blockInfo,
+ blockInfo.getBlockUCState()
+ );
continue;
}
Block block = new Block(blockInfo.getBlockId());
@@ -415,7 +416,7 @@ public class CacheReplicationMonitor ext
// Update bytesUsed using the current replication levels.
// Assumptions: we assume that all the blocks are the same length
// on each datanode. We can assume this because we're only caching
- // blocks in state COMMITTED.
+ // blocks in state COMPLETE.
// Note that if two directives are caching the same block(s), they will
// both get them added to their bytesCached.
List<DatanodeDescriptor> cachedOn =
@@ -441,21 +442,16 @@ public class CacheReplicationMonitor ext
ocblock.setReplicationAndMark(directive.getReplication(), mark);
}
}
- if (LOG.isTraceEnabled()) {
- LOG.trace("Directive " + directive.getId() + ": setting replication " +
- "for block " + blockInfo + " to " + ocblock.getReplication());
- }
+ LOG.trace("Directive {}: setting replication for block {} to {}",
+ directive.getId(), blockInfo, ocblock.getReplication());
}
// Increment the "cached" statistics
directive.addBytesCached(cachedTotal);
if (cachedTotal == neededTotal) {
directive.addFilesCached(1);
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("Directive " + directive.getId() + ": caching " +
- file.getFullPathName() + ": " + cachedTotal + "/" + neededTotal +
- " bytes");
- }
+ LOG.debug("Directive {}: caching {}: {}/{} bytes", directive.getId(),
+ file.getFullPathName(), cachedTotal, neededTotal);
}
private String findReasonForNotCaching(CachedBlock cblock,
@@ -512,11 +508,9 @@ public class CacheReplicationMonitor ext
iter.hasNext(); ) {
DatanodeDescriptor datanode = iter.next();
if (!cblock.isInList(datanode.getCached())) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Block " + cblock.getBlockId() + ": removing from " +
- "PENDING_UNCACHED for node " + datanode.getDatanodeUuid() +
- "because the DataNode uncached it.");
- }
+ LOG.trace("Block {}: removing from PENDING_UNCACHED for node {} "
+ + "because the DataNode uncached it.", cblock.getBlockId(),
+ datanode.getDatanodeUuid());
datanode.getPendingUncached().remove(cblock);
iter.remove();
}
@@ -526,10 +520,8 @@ public class CacheReplicationMonitor ext
String reason = findReasonForNotCaching(cblock, blockInfo);
int neededCached = 0;
if (reason != null) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Block " + cblock.getBlockId() + ": can't cache " +
- "block because it is " + reason);
- }
+ LOG.trace("Block {}: can't cache block because it is {}",
+ cblock.getBlockId(), reason);
} else {
neededCached = cblock.getReplication();
}
@@ -541,12 +533,12 @@ public class CacheReplicationMonitor ext
DatanodeDescriptor datanode = iter.next();
datanode.getPendingCached().remove(cblock);
iter.remove();
- if (LOG.isTraceEnabled()) {
- LOG.trace("Block " + cblock.getBlockId() + ": removing from " +
- "PENDING_CACHED for node " + datanode.getDatanodeUuid() +
- "because we already have " + numCached + " cached " +
- "replicas and we only need " + neededCached);
- }
+ LOG.trace("Block {}: removing from PENDING_CACHED for node {}"
+ + "because we already have {} cached replicas and we only" +
+ " need {}",
+ cblock.getBlockId(), datanode.getDatanodeUuid(), numCached,
+ neededCached
+ );
}
}
if (numCached < neededCached) {
@@ -556,12 +548,11 @@ public class CacheReplicationMonitor ext
DatanodeDescriptor datanode = iter.next();
datanode.getPendingUncached().remove(cblock);
iter.remove();
- if (LOG.isTraceEnabled()) {
- LOG.trace("Block " + cblock.getBlockId() + ": removing from " +
- "PENDING_UNCACHED for node " + datanode.getDatanodeUuid() +
- "because we only have " + numCached + " cached replicas " +
- "and we need " + neededCached);
- }
+ LOG.trace("Block {}: removing from PENDING_UNCACHED for node {} "
+ + "because we only have {} cached replicas and we need " +
+ "{}", cblock.getBlockId(), datanode.getDatanodeUuid(),
+ numCached, neededCached
+ );
}
}
int neededUncached = numCached -
@@ -581,11 +572,10 @@ public class CacheReplicationMonitor ext
pendingUncached.isEmpty() &&
pendingCached.isEmpty()) {
// we have nothing more to do with this block.
- if (LOG.isTraceEnabled()) {
- LOG.trace("Block " + cblock.getBlockId() + ": removing from " +
- "cachedBlocks, since neededCached == 0, and " +
- "pendingUncached and pendingCached are empty.");
- }
+ LOG.trace("Block {}: removing from cachedBlocks, since neededCached "
+ + "== 0, and pendingUncached and pendingCached are empty.",
+ cblock.getBlockId()
+ );
cbIter.remove();
}
}
@@ -643,18 +633,14 @@ public class CacheReplicationMonitor ext
BlockInfo blockInfo = blockManager.
getStoredBlock(new Block(cachedBlock.getBlockId()));
if (blockInfo == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Block " + cachedBlock.getBlockId() + ": can't add new " +
- "cached replicas, because there is no record of this block " +
- "on the NameNode.");
- }
+ LOG.debug("Block {}: can't add new cached replicas," +
+ " because there is no record of this block " +
+ "on the NameNode.", cachedBlock.getBlockId());
return;
}
if (!blockInfo.isComplete()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Block " + cachedBlock.getBlockId() + ": can't cache this " +
- "block, because it is not yet complete.");
- }
+ LOG.debug("Block {}: can't cache this block, because it is not yet"
+ + " complete.", cachedBlock.getBlockId());
return;
}
// Filter the list of replicas to only the valid targets
@@ -678,7 +664,7 @@ public class CacheReplicationMonitor ext
if (pendingCached.contains(datanode) || cached.contains(datanode)) {
continue;
}
- long pendingCapacity = datanode.getCacheRemaining();
+ long pendingBytes = 0;
// Subtract pending cached blocks from effective capacity
Iterator<CachedBlock> it = datanode.getPendingCached().iterator();
while (it.hasNext()) {
@@ -686,7 +672,7 @@ public class CacheReplicationMonitor ext
BlockInfo info =
blockManager.getStoredBlock(new Block(cBlock.getBlockId()));
if (info != null) {
- pendingCapacity -= info.getNumBytes();
+ pendingBytes -= info.getNumBytes();
}
}
it = datanode.getPendingUncached().iterator();
@@ -696,17 +682,17 @@ public class CacheReplicationMonitor ext
BlockInfo info =
blockManager.getStoredBlock(new Block(cBlock.getBlockId()));
if (info != null) {
- pendingCapacity += info.getNumBytes();
+ pendingBytes += info.getNumBytes();
}
}
+ long pendingCapacity = pendingBytes + datanode.getCacheRemaining();
if (pendingCapacity < blockInfo.getNumBytes()) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Block " + blockInfo.getBlockId() + ": DataNode " +
- datanode.getDatanodeUuid() + " is not a valid possibility " +
- "because the block has size " + blockInfo.getNumBytes() + ", but " +
- "the DataNode only has " + datanode.getCacheRemaining() + " " +
- "bytes of cache remaining.");
- }
+ LOG.trace("Block {}: DataNode {} is not a valid possibility " +
+ "because the block has size {}, but the DataNode only has {}" +
+ "bytes of cache remaining ({} pending bytes, {} already cached.",
+ blockInfo.getBlockId(), datanode.getDatanodeUuid(),
+ blockInfo.getNumBytes(), pendingCapacity, pendingBytes,
+ datanode.getCacheRemaining());
outOfCapacity++;
continue;
}
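
The refactor above separates the pending delta from the datanode's current headroom: pending cached blocks subtract from it, pending uncached blocks add to it, and only then is getCacheRemaining() folded in, which also lets the trace message report both figures. A small worked example with illustrative sizes:

    import java.util.Arrays;
    import java.util.List;

    public class PendingCapacitySketch {
      public static void main(String[] args) {
        long cacheRemaining = 512;     // bytes currently free on the DN
        List<Long> pendingCached = Arrays.asList(300L);   // will consume
        List<Long> pendingUncached = Arrays.asList(100L); // will free

        long pendingBytes = 0;
        for (long size : pendingCached) {
          pendingBytes -= size;        // soon-to-be-used cache
        }
        for (long size : pendingUncached) {
          pendingBytes += size;        // soon-to-be-freed cache
        }

        long pendingCapacity = pendingBytes + cacheRemaining;  // -200 + 512
        long blockSize = 400;
        // 312 < 400, so this DataNode would be skipped as out of capacity.
        System.out.println(pendingCapacity < blockSize);       // true
      }
    }
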
@@ -715,22 +701,20 @@ public class CacheReplicationMonitor ext
List<DatanodeDescriptor> chosen = chooseDatanodesForCaching(possibilities,
neededCached, blockManager.getDatanodeManager().getStaleInterval());
for (DatanodeDescriptor datanode : chosen) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Block " + blockInfo.getBlockId() + ": added to " +
- "PENDING_CACHED on DataNode " + datanode.getDatanodeUuid());
- }
+ LOG.trace("Block {}: added to PENDING_CACHED on DataNode {}",
+ blockInfo.getBlockId(), datanode.getDatanodeUuid());
pendingCached.add(datanode);
boolean added = datanode.getPendingCached().add(cachedBlock);
assert added;
}
// We were unable to satisfy the requested replication factor
if (neededCached > chosen.size()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Block " + blockInfo.getBlockId() + ": we only have " +
- (cachedBlock.getReplication() - neededCached + chosen.size()) +
- " of " + cachedBlock.getReplication() + " cached replicas. " +
- outOfCapacity + " DataNodes have insufficient cache capacity.");
- }
+ LOG.debug("Block {}: we only have {} of {} cached replicas."
+ + " {} DataNodes have insufficient cache capacity.",
+ blockInfo.getBlockId(),
+ (cachedBlock.getReplication() - neededCached + chosen.size()),
+ cachedBlock.getReplication(), outOfCapacity
+ );
}
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java Tue Aug 19 23:49:39 2014
@@ -48,18 +48,6 @@ public class CorruptReplicasMap{
private final SortedMap<Block, Map<DatanodeDescriptor, Reason>> corruptReplicasMap =
new TreeMap<Block, Map<DatanodeDescriptor, Reason>>();
-
- /**
- * Mark the block belonging to datanode as corrupt.
- *
- * @param blk Block to be added to CorruptReplicasMap
- * @param dn DatanodeDescriptor which holds the corrupt replica
- * @param reason a textual reason (for logging purposes)
- */
- public void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
- String reason) {
- addToCorruptReplicasMap(blk, dn, reason, Reason.NONE);
- }
/**
* Mark the block belonging to datanode as corrupt.
@@ -69,7 +57,7 @@ public class CorruptReplicasMap{
* @param reason a textual reason (for logging purposes)
* @param reasonCode the enum representation of the reason
*/
- public void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
+ void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
String reason, Reason reasonCode) {
Map <DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk);
if (nodes == null) {
@@ -127,7 +115,6 @@ public class CorruptReplicasMap{
boolean removeFromCorruptReplicasMap(Block blk, DatanodeDescriptor datanode,
Reason reason) {
Map <DatanodeDescriptor, Reason> datanodes = corruptReplicasMap.get(blk);
- boolean removed = false;
if (datanodes==null)
return false;
@@ -174,12 +161,12 @@ public class CorruptReplicasMap{
return ((nodes != null) && (nodes.contains(node)));
}
- public int numCorruptReplicas(Block blk) {
+ int numCorruptReplicas(Block blk) {
Collection<DatanodeDescriptor> nodes = getNodes(blk);
return (nodes == null) ? 0 : nodes.size();
}
- public int size() {
+ int size() {
return corruptReplicasMap.size();
}