You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ar...@apache.org on 2014/08/01 18:58:07 UTC
svn commit: r1615169 - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/
src/main/java/org/apache/hadoop/hdfs/server/namenode/
src/test/java/org/apache/hadoop/hdfs/server/blockmanag...
Author: arp
Date: Fri Aug 1 16:58:06 2014
New Revision: 1615169
URL: http://svn.apache.org/r1615169
Log:
HDFS-6794. Update BlockManager methods to use DatanodeStorageInfo where possible. (Arpit Agarwal)
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1615169&r1=1615168&r2=1615169&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Aug 1 16:58:06 2014
@@ -343,6 +343,9 @@ Release 2.6.0 - UNRELEASED
HDFS-6796. Improve the argument check during balancer command line parsing.
(Benoy Antony via szetszwo)
+ HDFS-6794. Update BlockManager methods to use DatanodeStorageInfo
+ where possible (Arpit Agarwal)
+
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1615169&r1=1615168&r2=1615169&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Fri Aug 1 16:58:06 2014
@@ -1079,6 +1079,7 @@ public class BlockManager {
* Mark the block belonging to datanode as corrupt
* @param blk Block to be marked as corrupt
* @param dn Datanode which holds the corrupt replica
+ * @param storageID if known, null otherwise.
* @param reason a textual reason why the block should be marked corrupt,
* for logging purposes
*/
@@ -1095,19 +1096,29 @@ public class BlockManager {
+ blk + " not found");
return;
}
- markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock,
- blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED),
- dn, storageID);
- }
- private void markBlockAsCorrupt(BlockToMarkCorrupt b,
- DatanodeInfo dn, String storageID) throws IOException {
DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
if (node == null) {
- throw new IOException("Cannot mark " + b
+ throw new IOException("Cannot mark " + blk
+ " as corrupt because datanode " + dn + " (" + dn.getDatanodeUuid()
+ ") does not exist");
}
+
+ markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock,
+ blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED),
+ storageID == null ? null : node.getStorageInfo(storageID),
+ node);
+ }
+
+ /**
+ *
+ * @param b
+ * @param storageInfo storage that contains the block, if known. null otherwise.
+ * @throws IOException
+ */
+ private void markBlockAsCorrupt(BlockToMarkCorrupt b,
+ DatanodeStorageInfo storageInfo,
+ DatanodeDescriptor node) throws IOException {
BlockCollection bc = b.corrupted.getBlockCollection();
if (bc == null) {
@@ -1118,7 +1129,9 @@ public class BlockManager {
}
// Add replica to the data-node if it is not already there
- node.addBlock(storageID, b.stored);
+ if (storageInfo != null) {
+ storageInfo.addBlock(b.stored);
+ }
// Add this replica to corruptReplicas Map
corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason,
@@ -1457,7 +1470,7 @@ public class BlockManager {
* @throws IOException
* if the number of targets < minimum replication.
* @see BlockPlacementPolicy#chooseTarget(String, int, Node,
- * List, boolean, Set, long)
+ * List, boolean, Set, long, StorageType)
*/
public DatanodeStorageInfo[] chooseTarget(final String src,
final int numOfReplicas, final DatanodeDescriptor client,
@@ -1694,7 +1707,7 @@ public class BlockManager {
* @throws IOException
*/
public boolean processReport(final DatanodeID nodeID,
- final DatanodeStorage storage, final String poolId,
+ final DatanodeStorage storage,
final BlockListAsLongs newReport) throws IOException {
namesystem.writeLock();
final long startTime = Time.now(); //after acquiring write lock
@@ -1726,9 +1739,9 @@ public class BlockManager {
if (storageInfo.numBlocks() == 0) {
// The first block report can be processed a lot more efficiently than
// ordinary block reports. This shortens restart times.
- processFirstBlockReport(node, storage.getStorageID(), newReport);
+ processFirstBlockReport(storageInfo, newReport);
} else {
- processReport(node, storage, newReport);
+ processReport(storageInfo, newReport);
}
// Now that we have an up-to-date block report, we know that any
@@ -1790,9 +1803,8 @@ public class BlockManager {
}
}
- private void processReport(final DatanodeDescriptor node,
- final DatanodeStorage storage,
- final BlockListAsLongs report) throws IOException {
+ private void processReport(final DatanodeStorageInfo storageInfo,
+ final BlockListAsLongs report) throws IOException {
// Normal case:
// Modify the (block-->datanode) map, according to the difference
// between the old and new block report.
@@ -1802,19 +1814,20 @@ public class BlockManager {
Collection<Block> toInvalidate = new LinkedList<Block>();
Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
- reportDiff(node, storage, report,
+ reportDiff(storageInfo, report,
toAdd, toRemove, toInvalidate, toCorrupt, toUC);
-
+
+ DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
// Process the blocks on each queue
for (StatefulBlockInfo b : toUC) {
- addStoredBlockUnderConstruction(b, node, storage.getStorageID());
+ addStoredBlockUnderConstruction(b, storageInfo);
}
for (Block b : toRemove) {
removeStoredBlock(b, node);
}
int numBlocksLogged = 0;
for (BlockInfo b : toAdd) {
- addStoredBlock(b, node, storage.getStorageID(), null, numBlocksLogged < maxNumBlocksToLog);
+ addStoredBlock(b, storageInfo, null, numBlocksLogged < maxNumBlocksToLog);
numBlocksLogged++;
}
if (numBlocksLogged > maxNumBlocksToLog) {
@@ -1828,7 +1841,7 @@ public class BlockManager {
addToInvalidates(b, node);
}
for (BlockToMarkCorrupt b : toCorrupt) {
- markBlockAsCorrupt(b, node, storage.getStorageID());
+ markBlockAsCorrupt(b, storageInfo, node);
}
}
@@ -1839,16 +1852,16 @@ public class BlockManager {
* a toRemove list (since there won't be any). It also silently discards
* any invalid blocks, thereby deferring their processing until
* the next block report.
- * @param node - DatanodeDescriptor of the node that sent the report
+ * @param storageInfo - DatanodeStorageInfo that sent the report
* @param report - the initial block report, to be processed
* @throws IOException
*/
- private void processFirstBlockReport(final DatanodeDescriptor node,
- final String storageID,
+ private void processFirstBlockReport(
+ final DatanodeStorageInfo storageInfo,
final BlockListAsLongs report) throws IOException {
if (report == null) return;
assert (namesystem.hasWriteLock());
- assert (node.getStorageInfo(storageID).numBlocks() == 0);
+ assert (storageInfo.numBlocks() == 0);
BlockReportIterator itBR = report.getBlockReportIterator();
while(itBR.hasNext()) {
@@ -1857,7 +1870,7 @@ public class BlockManager {
if (shouldPostponeBlocksFromFuture &&
namesystem.isGenStampInFuture(iblk)) {
- queueReportedBlock(node, storageID, iblk, reportedState,
+ queueReportedBlock(storageInfo, iblk, reportedState,
QUEUE_REASON_FUTURE_GENSTAMP);
continue;
}
@@ -1869,15 +1882,16 @@ public class BlockManager {
// If block is corrupt, mark it and continue to next block.
BlockUCState ucState = storedBlock.getBlockUCState();
BlockToMarkCorrupt c = checkReplicaCorrupt(
- iblk, reportedState, storedBlock, ucState, node);
+ iblk, reportedState, storedBlock, ucState,
+ storageInfo.getDatanodeDescriptor());
if (c != null) {
if (shouldPostponeBlocksFromFuture) {
// In the Standby, we may receive a block report for a file that we
// just have an out-of-date gen-stamp or state for, for example.
- queueReportedBlock(node, storageID, iblk, reportedState,
+ queueReportedBlock(storageInfo, iblk, reportedState,
QUEUE_REASON_CORRUPT_STATE);
} else {
- markBlockAsCorrupt(c, node, storageID);
+ markBlockAsCorrupt(c, storageInfo, storageInfo.getDatanodeDescriptor());
}
continue;
}
@@ -1885,7 +1899,7 @@ public class BlockManager {
// If block is under construction, add this replica to its list
if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
- node.getStorageInfo(storageID), iblk, reportedState);
+ storageInfo, iblk, reportedState);
// OpenFileBlocks only inside snapshots also will be added to safemode
// threshold. So we need to update such blocks to safemode
// refer HDFS-5283
@@ -1898,12 +1912,12 @@ public class BlockManager {
}
//add replica if appropriate
if (reportedState == ReplicaState.FINALIZED) {
- addStoredBlockImmediate(storedBlock, node, storageID);
+ addStoredBlockImmediate(storedBlock, storageInfo);
}
}
}
- private void reportDiff(DatanodeDescriptor dn, DatanodeStorage storage,
+ private void reportDiff(DatanodeStorageInfo storageInfo,
BlockListAsLongs newReport,
Collection<BlockInfo> toAdd, // add to DatanodeDescriptor
Collection<Block> toRemove, // remove from DatanodeDescriptor
@@ -1911,8 +1925,6 @@ public class BlockManager {
Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
Collection<StatefulBlockInfo> toUC) { // add to under-construction list
- final DatanodeStorageInfo storageInfo = dn.getStorageInfo(storage.getStorageID());
-
// place a delimiter in the list which separates blocks
// that have been reported from those that have not
BlockInfo delimiter = new BlockInfo(new Block(), 1);
@@ -1929,7 +1941,7 @@ public class BlockManager {
while(itBR.hasNext()) {
Block iblk = itBR.next();
ReplicaState iState = itBR.getCurrentReplicaState();
- BlockInfo storedBlock = processReportedBlock(dn, storage.getStorageID(),
+ BlockInfo storedBlock = processReportedBlock(storageInfo,
iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);
// move block to the head of the list
@@ -1966,7 +1978,7 @@ public class BlockManager {
* BlockInfoUnderConstruction's list of replicas.</li>
* </ol>
*
- * @param dn descriptor for the datanode that made the report
+ * @param storageInfo DatanodeStorageInfo that sent the report.
* @param block reported block replica
* @param reportedState reported replica state
* @param toAdd add to DatanodeDescriptor
@@ -1978,14 +1990,16 @@ public class BlockManager {
* @return the up-to-date stored block, if it should be kept.
* Otherwise, null.
*/
- private BlockInfo processReportedBlock(final DatanodeDescriptor dn,
- final String storageID,
+ private BlockInfo processReportedBlock(
+ final DatanodeStorageInfo storageInfo,
final Block block, final ReplicaState reportedState,
final Collection<BlockInfo> toAdd,
final Collection<Block> toInvalidate,
final Collection<BlockToMarkCorrupt> toCorrupt,
final Collection<StatefulBlockInfo> toUC) {
+ DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor();
+
if(LOG.isDebugEnabled()) {
LOG.debug("Reported block " + block
+ " on " + dn + " size " + block.getNumBytes()
@@ -1994,7 +2008,7 @@ public class BlockManager {
if (shouldPostponeBlocksFromFuture &&
namesystem.isGenStampInFuture(block)) {
- queueReportedBlock(dn, storageID, block, reportedState,
+ queueReportedBlock(storageInfo, block, reportedState,
QUEUE_REASON_FUTURE_GENSTAMP);
return null;
}
@@ -2034,7 +2048,7 @@ public class BlockManager {
// TODO: Pretty confident this should be s/storedBlock/block below,
// since we should be postponing the info of the reported block, not
// the stored block. See HDFS-6289 for more context.
- queueReportedBlock(dn, storageID, storedBlock, reportedState,
+ queueReportedBlock(storageInfo, storedBlock, reportedState,
QUEUE_REASON_CORRUPT_STATE);
} else {
toCorrupt.add(c);
@@ -2063,17 +2077,17 @@ public class BlockManager {
* standby node. @see PendingDataNodeMessages.
* @param reason a textual reason to report in the debug logs
*/
- private void queueReportedBlock(DatanodeDescriptor dn, String storageID, Block block,
+ private void queueReportedBlock(DatanodeStorageInfo storageInfo, Block block,
ReplicaState reportedState, String reason) {
assert shouldPostponeBlocksFromFuture;
if (LOG.isDebugEnabled()) {
LOG.debug("Queueing reported block " + block +
" in state " + reportedState +
- " from datanode " + dn + " for later processing " +
- "because " + reason + ".");
+ " from datanode " + storageInfo.getDatanodeDescriptor() +
+ " for later processing because " + reason + ".");
}
- pendingDNMessages.enqueueReportedBlock(dn, storageID, block, reportedState);
+ pendingDNMessages.enqueueReportedBlock(storageInfo, block, reportedState);
}
/**
@@ -2096,7 +2110,7 @@ public class BlockManager {
if (LOG.isDebugEnabled()) {
LOG.debug("Processing previouly queued message " + rbi);
}
- processAndHandleReportedBlock(rbi.getNode(), rbi.getStorageID(),
+ processAndHandleReportedBlock(rbi.getStorageInfo(),
rbi.getBlock(), rbi.getReportedState(), null);
}
}
@@ -2216,19 +2230,20 @@ public class BlockManager {
}
void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock,
- DatanodeDescriptor node, String storageID) throws IOException {
+ DatanodeStorageInfo storageInfo) throws IOException {
BlockInfoUnderConstruction block = ucBlock.storedBlock;
- block.addReplicaIfNotPresent(node.getStorageInfo(storageID),
- ucBlock.reportedBlock, ucBlock.reportedState);
+ block.addReplicaIfNotPresent(
+ storageInfo, ucBlock.reportedBlock, ucBlock.reportedState);
- if (ucBlock.reportedState == ReplicaState.FINALIZED && block.findDatanode(node) < 0) {
- addStoredBlock(block, node, storageID, null, true);
+ if (ucBlock.reportedState == ReplicaState.FINALIZED &&
+ block.findDatanode(storageInfo.getDatanodeDescriptor()) < 0) {
+ addStoredBlock(block, storageInfo, null, true);
}
}
/**
* Faster version of
- * {@link #addStoredBlock(BlockInfo, DatanodeDescriptor, String, DatanodeDescriptor, boolean)}
+ * {@link #addStoredBlock(BlockInfo, DatanodeStorageInfo, DatanodeDescriptor, boolean)}
* , intended for use with initial block report at startup. If not in startup
* safe mode, will call standard addStoredBlock(). Assumes this method is
* called "immediately" so there is no need to refresh the storedBlock from
@@ -2239,17 +2254,17 @@ public class BlockManager {
* @throws IOException
*/
private void addStoredBlockImmediate(BlockInfo storedBlock,
- DatanodeDescriptor node, String storageID)
+ DatanodeStorageInfo storageInfo)
throws IOException {
assert (storedBlock != null && namesystem.hasWriteLock());
if (!namesystem.isInStartupSafeMode()
|| namesystem.isPopulatingReplQueues()) {
- addStoredBlock(storedBlock, node, storageID, null, false);
+ addStoredBlock(storedBlock, storageInfo, null, false);
return;
}
// just add it
- node.addBlock(storageID, storedBlock);
+ storageInfo.addBlock(storedBlock);
// Now check for completion of blocks and safe block count
int numCurrentReplica = countLiveNodes(storedBlock);
@@ -2271,13 +2286,13 @@ public class BlockManager {
* @return the block that is stored in blockMap.
*/
private Block addStoredBlock(final BlockInfo block,
- DatanodeDescriptor node,
- String storageID,
+ DatanodeStorageInfo storageInfo,
DatanodeDescriptor delNodeHint,
boolean logEveryBlock)
throws IOException {
assert block != null && namesystem.hasWriteLock();
BlockInfo storedBlock;
+ DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
if (block instanceof BlockInfoUnderConstruction) {
//refresh our copy in case the block got completed in another thread
storedBlock = blocksMap.getStoredBlock(block);
@@ -2297,7 +2312,7 @@ public class BlockManager {
assert bc != null : "Block must belong to a file";
// add block to the datanode
- boolean added = node.addBlock(storageID, storedBlock);
+ boolean added = storageInfo.addBlock(storedBlock);
int curReplicaDelta;
if (added) {
@@ -2843,8 +2858,9 @@ public class BlockManager {
* The given node is reporting that it received a certain block.
*/
@VisibleForTesting
- void addBlock(DatanodeDescriptor node, String storageID, Block block, String delHint)
+ void addBlock(DatanodeStorageInfo storageInfo, Block block, String delHint)
throws IOException {
+ DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
// Decrement number of blocks scheduled to this datanode.
// for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with
// RECEIVED_BLOCK), we currently also decrease the approximate number.
@@ -2864,12 +2880,12 @@ public class BlockManager {
// Modify the blocks->datanode map and node's map.
//
pendingReplications.decrement(block, node);
- processAndHandleReportedBlock(node, storageID, block, ReplicaState.FINALIZED,
+ processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED,
delHintNode);
}
- private void processAndHandleReportedBlock(DatanodeDescriptor node,
- String storageID, Block block,
+ private void processAndHandleReportedBlock(
+ DatanodeStorageInfo storageInfo, Block block,
ReplicaState reportedState, DatanodeDescriptor delHintNode)
throws IOException {
// blockReceived reports a finalized block
@@ -2877,7 +2893,9 @@ public class BlockManager {
Collection<Block> toInvalidate = new LinkedList<Block>();
Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
- processReportedBlock(node, storageID, block, reportedState,
+ final DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
+
+ processReportedBlock(storageInfo, block, reportedState,
toAdd, toInvalidate, toCorrupt, toUC);
// the block is only in one of the to-do lists
// if it is in none then data-node already has it
@@ -2885,11 +2903,11 @@ public class BlockManager {
: "The block should be only in one of the lists.";
for (StatefulBlockInfo b : toUC) {
- addStoredBlockUnderConstruction(b, node, storageID);
+ addStoredBlockUnderConstruction(b, storageInfo);
}
long numBlocksLogged = 0;
for (BlockInfo b : toAdd) {
- addStoredBlock(b, node, storageID, delHintNode, numBlocksLogged < maxNumBlocksToLog);
+ addStoredBlock(b, storageInfo, delHintNode, numBlocksLogged < maxNumBlocksToLog);
numBlocksLogged++;
}
if (numBlocksLogged > maxNumBlocksToLog) {
@@ -2903,7 +2921,7 @@ public class BlockManager {
addToInvalidates(b, node);
}
for (BlockToMarkCorrupt b : toCorrupt) {
- markBlockAsCorrupt(b, node, storageID);
+ markBlockAsCorrupt(b, storageInfo, node);
}
}
@@ -2930,13 +2948,15 @@ public class BlockManager {
"Got incremental block report from unregistered or dead node");
}
- if (node.getStorageInfo(srdb.getStorage().getStorageID()) == null) {
+ DatanodeStorageInfo storageInfo =
+ node.getStorageInfo(srdb.getStorage().getStorageID());
+ if (storageInfo == null) {
// The DataNode is reporting an unknown storage. Usually the NN learns
// about new storages from heartbeats but during NN restart we may
// receive a block report or incremental report before the heartbeat.
// We must handle this for protocol compatibility. This issue was
// uncovered by HDFS-6094.
- node.updateStorage(srdb.getStorage());
+ storageInfo = node.updateStorage(srdb.getStorage());
}
for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) {
@@ -2946,14 +2966,13 @@ public class BlockManager {
deleted++;
break;
case RECEIVED_BLOCK:
- addBlock(node, srdb.getStorage().getStorageID(),
- rdbi.getBlock(), rdbi.getDelHints());
+ addBlock(storageInfo, rdbi.getBlock(), rdbi.getDelHints());
received++;
break;
case RECEIVING_BLOCK:
receiving++;
- processAndHandleReportedBlock(node, srdb.getStorage().getStorageID(),
- rdbi.getBlock(), ReplicaState.RBW, null);
+ processAndHandleReportedBlock(storageInfo, rdbi.getBlock(),
+ ReplicaState.RBW, null);
break;
default:
String msg =
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java?rev=1615169&r1=1615168&r2=1615169&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java Fri Aug 1 16:58:06 2014
@@ -207,7 +207,7 @@ public class DatanodeStorageInfo {
return blockPoolUsed;
}
- boolean addBlock(BlockInfo b) {
+ public boolean addBlock(BlockInfo b) {
if(!b.addStorage(this))
return false;
// add to the head of the data-node list
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java?rev=1615169&r1=1615168&r2=1615169&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java Fri Aug 1 16:58:06 2014
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.protocol.B
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
/**
* In the Standby Node, we can receive messages about blocks
@@ -41,14 +42,12 @@ class PendingDataNodeMessages {
static class ReportedBlockInfo {
private final Block block;
- private final DatanodeDescriptor dn;
- private final String storageID;
+ private final DatanodeStorageInfo storageInfo;
private final ReplicaState reportedState;
- ReportedBlockInfo(DatanodeDescriptor dn, String storageID, Block block,
+ ReportedBlockInfo(DatanodeStorageInfo storageInfo, Block block,
ReplicaState reportedState) {
- this.dn = dn;
- this.storageID = storageID;
+ this.storageInfo = storageInfo;
this.block = block;
this.reportedState = reportedState;
}
@@ -57,21 +56,18 @@ class PendingDataNodeMessages {
return block;
}
- DatanodeDescriptor getNode() {
- return dn;
- }
-
- String getStorageID() {
- return storageID;
- }
-
ReplicaState getReportedState() {
return reportedState;
}
+
+ DatanodeStorageInfo getStorageInfo() {
+ return storageInfo;
+ }
@Override
public String toString() {
- return "ReportedBlockInfo [block=" + block + ", dn=" + dn
+ return "ReportedBlockInfo [block=" + block + ", dn="
+ + storageInfo.getDatanodeDescriptor()
+ ", reportedState=" + reportedState + "]";
}
}
@@ -87,7 +83,7 @@ class PendingDataNodeMessages {
Queue<ReportedBlockInfo> oldQueue = entry.getValue();
while (!oldQueue.isEmpty()) {
ReportedBlockInfo rbi = oldQueue.remove();
- if (!rbi.getNode().equals(dn)) {
+ if (!rbi.getStorageInfo().getDatanodeDescriptor().equals(dn)) {
newQueue.add(rbi);
} else {
count--;
@@ -97,11 +93,11 @@ class PendingDataNodeMessages {
}
}
- void enqueueReportedBlock(DatanodeDescriptor dn, String storageID, Block block,
+ void enqueueReportedBlock(DatanodeStorageInfo storageInfo, Block block,
ReplicaState reportedState) {
block = new Block(block);
getBlockQueue(block).add(
- new ReportedBlockInfo(dn, storageID, block, reportedState));
+ new ReportedBlockInfo(storageInfo, block, reportedState));
count++;
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1615169&r1=1615168&r2=1615169&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Aug 1 16:58:06 2014
@@ -4355,8 +4355,11 @@ public class FSNamesystem implements Nam
// Otherwise fsck will report these blocks as MISSING, especially if the
// blocksReceived from Datanodes take a long time to arrive.
for (int i = 0; i < trimmedTargets.size(); i++) {
- trimmedTargets.get(i).addBlock(
- trimmedStorages.get(i), storedBlock);
+ DatanodeStorageInfo storageInfo =
+ trimmedTargets.get(i).getStorageInfo(trimmedStorages.get(i));
+ if (storageInfo != null) {
+ storageInfo.addBlock(storedBlock);
+ }
}
}
@@ -5835,7 +5838,7 @@ public class FSNamesystem implements Nam
}
public void processIncrementalBlockReport(final DatanodeID nodeID,
- final String poolId, final StorageReceivedDeletedBlocks srdb)
+ final StorageReceivedDeletedBlocks srdb)
throws IOException {
writeLock();
try {
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1615169&r1=1615168&r2=1615169&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Fri Aug 1 16:58:06 2014
@@ -1065,7 +1065,7 @@ class NameNodeRpcServer implements Namen
// for the same node and storage, so the value returned by the last
// call of this loop is the final updated value for noStaleStorage.
//
- noStaleStorages = bm.processReport(nodeReg, r.getStorage(), poolId, blocks);
+ noStaleStorages = bm.processReport(nodeReg, r.getStorage(), blocks);
metrics.incrStorageBlockReportOps();
}
@@ -1101,7 +1101,7 @@ class NameNodeRpcServer implements Namen
+" blocks.");
}
for(StorageReceivedDeletedBlocks r : receivedAndDeletedBlocks) {
- namesystem.processIncrementalBlockReport(nodeReg, poolId, r);
+ namesystem.processIncrementalBlockReport(nodeReg, r);
}
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java?rev=1615169&r1=1615168&r2=1615169&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java Fri Aug 1 16:58:06 2014
@@ -368,7 +368,7 @@ public class TestBlockManager {
DatanodeStorageInfo[] pipeline) throws IOException {
for (int i = 1; i < pipeline.length; i++) {
DatanodeStorageInfo storage = pipeline[i];
- bm.addBlock(storage.getDatanodeDescriptor(), storage.getStorageID(), blockInfo, null);
+ bm.addBlock(storage, blockInfo, null);
blockInfo.addStorage(storage);
}
}
@@ -549,12 +549,12 @@ public class TestBlockManager {
// send block report, should be processed
reset(node);
- bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool",
+ bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
new BlockListAsLongs(null, null));
assertEquals(1, ds.getBlockReportCount());
// send block report again, should NOT be processed
reset(node);
- bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool",
+ bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
new BlockListAsLongs(null, null));
assertEquals(1, ds.getBlockReportCount());
@@ -566,7 +566,7 @@ public class TestBlockManager {
assertEquals(0, ds.getBlockReportCount()); // ready for report again
// send block report, should be processed after restart
reset(node);
- bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool",
+ bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
new BlockListAsLongs(null, null));
assertEquals(1, ds.getBlockReportCount());
}
@@ -595,7 +595,7 @@ public class TestBlockManager {
// send block report while pretending to already have blocks
reset(node);
doReturn(1).when(node).numBlocks();
- bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool",
+ bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
new BlockListAsLongs(null, null));
assertEquals(1, ds.getBlockReportCount());
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java?rev=1615169&r1=1615168&r2=1615169&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java Fri Aug 1 16:58:06 2014
@@ -63,16 +63,16 @@ public class TestDatanodeDescriptor {
assertTrue(storages.length > 0);
final String storageID = storages[0].getStorageID();
// add first block
- assertTrue(dd.addBlock(storageID, blk));
+ assertTrue(storages[0].addBlock(blk));
assertEquals(1, dd.numBlocks());
// remove a non-existent block
assertFalse(dd.removeBlock(blk1));
assertEquals(1, dd.numBlocks());
// add an existent block
- assertFalse(dd.addBlock(storageID, blk));
+ assertFalse(storages[0].addBlock(blk));
assertEquals(1, dd.numBlocks());
// add second block
- assertTrue(dd.addBlock(storageID, blk1));
+ assertTrue(storages[0].addBlock(blk1));
assertEquals(2, dd.numBlocks());
// remove first block
assertTrue(dd.removeBlock(blk));
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java?rev=1615169&r1=1615168&r2=1615169&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java Fri Aug 1 16:58:06 2014
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.DFSTestUti
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.junit.Test;
import com.google.common.base.Joiner;
@@ -43,8 +44,10 @@ public class TestPendingDataNodeMessages
@Test
public void testQueues() {
DatanodeDescriptor fakeDN = DFSTestUtil.getLocalDatanodeDescriptor();
- msgs.enqueueReportedBlock(fakeDN, "STORAGE_ID", block1Gs1, ReplicaState.FINALIZED);
- msgs.enqueueReportedBlock(fakeDN, "STORAGE_ID", block1Gs2, ReplicaState.FINALIZED);
+ DatanodeStorage storage = new DatanodeStorage("STORAGE_ID");
+ DatanodeStorageInfo storageInfo = new DatanodeStorageInfo(fakeDN, storage);
+ msgs.enqueueReportedBlock(storageInfo, block1Gs1, ReplicaState.FINALIZED);
+ msgs.enqueueReportedBlock(storageInfo, block1Gs2, ReplicaState.FINALIZED);
assertEquals(2, msgs.count());
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java?rev=1615169&r1=1615168&r2=1615169&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java Fri Aug 1 16:58:06 2014
@@ -82,7 +82,7 @@ public class TestReplicationPolicy {
private static NameNode namenode;
private static BlockPlacementPolicy replicator;
private static final String filename = "/dummyfile.txt";
- private static DatanodeDescriptor dataNodes[];
+ private static DatanodeDescriptor[] dataNodes;
private static DatanodeStorageInfo[] storages;
// The interval for marking a datanode as stale,
private static final long staleInterval =
@@ -1118,8 +1118,7 @@ public class TestReplicationPolicy {
// Adding this block will increase its current replication, and that will
// remove it from the queue.
bm.addStoredBlockUnderConstruction(new StatefulBlockInfo(info, info,
- ReplicaState.FINALIZED), TestReplicationPolicy.dataNodes[0],
- "STORAGE");
+ ReplicaState.FINALIZED), TestReplicationPolicy.storages[0]);
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
// from QUEUE_VERY_UNDER_REPLICATED.