Posted to hdfs-commits@hadoop.apache.org by wa...@apache.org on 2014/08/05 04:31:00 UTC
svn commit: r1615844 [3/5] - in
/hadoop/common/branches/fs-encryption/hadoop-hdfs-project:
hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/
hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs/
hadoop-hdfs/src/main/j...
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Tue Aug 5 02:30:54 2014
@@ -1082,6 +1082,7 @@ public class BlockManager {
* Mark the block belonging to the datanode as corrupt
* @param blk Block to be marked as corrupt
* @param dn Datanode which holds the corrupt replica
+ * @param storageID ID of the storage that holds the replica, if known; null otherwise.
* @param reason a textual reason why the block should be marked corrupt,
* for logging purposes
*/
@@ -1098,19 +1099,29 @@ public class BlockManager {
+ blk + " not found");
return;
}
- markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock,
- blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED),
- dn, storageID);
- }
- private void markBlockAsCorrupt(BlockToMarkCorrupt b,
- DatanodeInfo dn, String storageID) throws IOException {
DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
if (node == null) {
- throw new IOException("Cannot mark " + b
+ throw new IOException("Cannot mark " + blk
+ " as corrupt because datanode " + dn + " (" + dn.getDatanodeUuid()
+ ") does not exist");
}
+
+ markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock,
+ blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED),
+ storageID == null ? null : node.getStorageInfo(storageID),
+ node);
+ }
+
+ /**
+ * Mark a block as corrupt on the given storage and datanode.
+ * @param b the block to mark as corrupt
+ * @param storageInfo storage that contains the block, if known; null otherwise.
+ * @param node the datanode that holds the corrupt replica
+ * @throws IOException
+ */
+ private void markBlockAsCorrupt(BlockToMarkCorrupt b,
+ DatanodeStorageInfo storageInfo,
+ DatanodeDescriptor node) throws IOException {
BlockCollection bc = b.corrupted.getBlockCollection();
if (bc == null) {
@@ -1121,7 +1132,9 @@ public class BlockManager {
}
// Add replica to the data-node if it is not already there
- node.addBlock(storageID, b.stored);
+ if (storageInfo != null) {
+ storageInfo.addBlock(b.stored);
+ }
// Add this replica to corruptReplicas Map
corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason,
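The hunks above replace the (DatanodeDescriptor, storageID) pair with a single DatanodeStorageInfo parameter that may be null when the reporter did not identify a storage: the caller resolves the optional ID once, and the callee guards the storage-specific work. A minimal standalone sketch of that pattern, using illustrative stand-in types rather than the real HDFS classes:

import java.util.HashMap;
import java.util.Map;

class NullableStorageDemo {
  // Stand-in for DatanodeStorageInfo: one storage volume on a datanode.
  static class Storage {
    final String id;
    Storage(String id) { this.id = id; }
  }

  // Stand-in for DatanodeDescriptor: a datanode and its storage map.
  static class Node {
    final Map<String, Storage> storages = new HashMap<>();
    Storage getStorageInfo(String id) { return storages.get(id); } // may be null
  }

  // The callee tolerates a null storage instead of resolving a raw ID itself.
  static void markCorrupt(Storage storage, Node node) {
    if (storage != null) {
      System.out.println("add replica to storage " + storage.id);
    }
    System.out.println("record corrupt replica on node");
  }

  public static void main(String[] args) {
    Node node = new Node();
    node.storages.put("DS-1", new Storage("DS-1"));
    String reportedId = null; // storage unknown in this corruption report
    markCorrupt(reportedId == null ? null : node.getStorageInfo(reportedId), node);
  }
}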
@@ -1460,7 +1473,7 @@ public class BlockManager {
* @throws IOException
* if the number of targets < minimum replication.
* @see BlockPlacementPolicy#chooseTarget(String, int, Node,
- * List, boolean, Set, long)
+ * List, boolean, Set, long, StorageType)
*/
public DatanodeStorageInfo[] chooseTarget(final String src,
final int numOfReplicas, final DatanodeDescriptor client,
@@ -1697,7 +1710,7 @@ public class BlockManager {
* @throws IOException
*/
public boolean processReport(final DatanodeID nodeID,
- final DatanodeStorage storage, final String poolId,
+ final DatanodeStorage storage,
final BlockListAsLongs newReport) throws IOException {
namesystem.writeLock();
final long startTime = Time.now(); //after acquiring write lock
@@ -1729,9 +1742,9 @@ public class BlockManager {
if (storageInfo.numBlocks() == 0) {
// The first block report can be processed a lot more efficiently than
// ordinary block reports. This shortens restart times.
- processFirstBlockReport(node, storage.getStorageID(), newReport);
+ processFirstBlockReport(storageInfo, newReport);
} else {
- processReport(node, storage, newReport);
+ processReport(storageInfo, newReport);
}
// Now that we have an up-to-date block report, we know that any
@@ -1793,9 +1806,8 @@ public class BlockManager {
}
}
- private void processReport(final DatanodeDescriptor node,
- final DatanodeStorage storage,
- final BlockListAsLongs report) throws IOException {
+ private void processReport(final DatanodeStorageInfo storageInfo,
+ final BlockListAsLongs report) throws IOException {
// Normal case:
// Modify the (block-->datanode) map, according to the difference
// between the old and new block report.
@@ -1805,19 +1817,20 @@ public class BlockManager {
Collection<Block> toInvalidate = new LinkedList<Block>();
Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
- reportDiff(node, storage, report,
+ reportDiff(storageInfo, report,
toAdd, toRemove, toInvalidate, toCorrupt, toUC);
-
+
+ DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
// Process the blocks on each queue
for (StatefulBlockInfo b : toUC) {
- addStoredBlockUnderConstruction(b, node, storage.getStorageID());
+ addStoredBlockUnderConstruction(b, storageInfo);
}
for (Block b : toRemove) {
removeStoredBlock(b, node);
}
int numBlocksLogged = 0;
for (BlockInfo b : toAdd) {
- addStoredBlock(b, node, storage.getStorageID(), null, numBlocksLogged < maxNumBlocksToLog);
+ addStoredBlock(b, storageInfo, null, numBlocksLogged < maxNumBlocksToLog);
numBlocksLogged++;
}
if (numBlocksLogged > maxNumBlocksToLog) {
@@ -1831,7 +1844,7 @@ public class BlockManager {
addToInvalidates(b, node);
}
for (BlockToMarkCorrupt b : toCorrupt) {
- markBlockAsCorrupt(b, node, storage.getStorageID());
+ markBlockAsCorrupt(b, storageInfo, node);
}
}
@@ -1842,16 +1855,16 @@ public class BlockManager {
* a toRemove list (since there won't be any). It also silently discards
* any invalid blocks, thereby deferring their processing until
* the next block report.
- * @param node - DatanodeDescriptor of the node that sent the report
+ * @param storageInfo - DatanodeStorageInfo that sent the report
* @param report - the initial block report, to be processed
* @throws IOException
*/
- private void processFirstBlockReport(final DatanodeDescriptor node,
- final String storageID,
+ private void processFirstBlockReport(
+ final DatanodeStorageInfo storageInfo,
final BlockListAsLongs report) throws IOException {
if (report == null) return;
assert (namesystem.hasWriteLock());
- assert (node.getStorageInfo(storageID).numBlocks() == 0);
+ assert (storageInfo.numBlocks() == 0);
BlockReportIterator itBR = report.getBlockReportIterator();
while(itBR.hasNext()) {
@@ -1860,7 +1873,7 @@ public class BlockManager {
if (shouldPostponeBlocksFromFuture &&
namesystem.isGenStampInFuture(iblk)) {
- queueReportedBlock(node, storageID, iblk, reportedState,
+ queueReportedBlock(storageInfo, iblk, reportedState,
QUEUE_REASON_FUTURE_GENSTAMP);
continue;
}
@@ -1872,15 +1885,16 @@ public class BlockManager {
// If block is corrupt, mark it and continue to next block.
BlockUCState ucState = storedBlock.getBlockUCState();
BlockToMarkCorrupt c = checkReplicaCorrupt(
- iblk, reportedState, storedBlock, ucState, node);
+ iblk, reportedState, storedBlock, ucState,
+ storageInfo.getDatanodeDescriptor());
if (c != null) {
if (shouldPostponeBlocksFromFuture) {
// In the Standby, we may receive a block report for a file that we
// just have an out-of-date gen-stamp or state for, for example.
- queueReportedBlock(node, storageID, iblk, reportedState,
+ queueReportedBlock(storageInfo, iblk, reportedState,
QUEUE_REASON_CORRUPT_STATE);
} else {
- markBlockAsCorrupt(c, node, storageID);
+ markBlockAsCorrupt(c, storageInfo, storageInfo.getDatanodeDescriptor());
}
continue;
}
@@ -1888,7 +1902,7 @@ public class BlockManager {
// If block is under construction, add this replica to its list
if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
- node.getStorageInfo(storageID), iblk, reportedState);
+ storageInfo, iblk, reportedState);
// OpenFileBlocks only inside snapshots also will be added to safemode
// threshold. So we need to update such blocks to safemode
// refer HDFS-5283
@@ -1901,12 +1915,12 @@ public class BlockManager {
}
//add replica if appropriate
if (reportedState == ReplicaState.FINALIZED) {
- addStoredBlockImmediate(storedBlock, node, storageID);
+ addStoredBlockImmediate(storedBlock, storageInfo);
}
}
}
- private void reportDiff(DatanodeDescriptor dn, DatanodeStorage storage,
+ private void reportDiff(DatanodeStorageInfo storageInfo,
BlockListAsLongs newReport,
Collection<BlockInfo> toAdd, // add to DatanodeDescriptor
Collection<Block> toRemove, // remove from DatanodeDescriptor
@@ -1914,8 +1928,6 @@ public class BlockManager {
Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
Collection<StatefulBlockInfo> toUC) { // add to under-construction list
- final DatanodeStorageInfo storageInfo = dn.getStorageInfo(storage.getStorageID());
-
// place a delimiter in the list which separates blocks
// that have been reported from those that have not
BlockInfo delimiter = new BlockInfo(new Block(), 1);
@@ -1932,7 +1944,7 @@ public class BlockManager {
while(itBR.hasNext()) {
Block iblk = itBR.next();
ReplicaState iState = itBR.getCurrentReplicaState();
- BlockInfo storedBlock = processReportedBlock(dn, storage.getStorageID(),
+ BlockInfo storedBlock = processReportedBlock(storageInfo,
iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);
// move block to the head of the list
@@ -1969,7 +1981,7 @@ public class BlockManager {
* BlockInfoUnderConstruction's list of replicas.</li>
* </ol>
*
- * @param dn descriptor for the datanode that made the report
+ * @param storageInfo DatanodeStorageInfo that sent the report.
* @param block reported block replica
* @param reportedState reported replica state
* @param toAdd add to DatanodeDescriptor
@@ -1981,14 +1993,16 @@ public class BlockManager {
* @return the up-to-date stored block, if it should be kept.
* Otherwise, null.
*/
- private BlockInfo processReportedBlock(final DatanodeDescriptor dn,
- final String storageID,
+ private BlockInfo processReportedBlock(
+ final DatanodeStorageInfo storageInfo,
final Block block, final ReplicaState reportedState,
final Collection<BlockInfo> toAdd,
final Collection<Block> toInvalidate,
final Collection<BlockToMarkCorrupt> toCorrupt,
final Collection<StatefulBlockInfo> toUC) {
+ DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor();
+
if(LOG.isDebugEnabled()) {
LOG.debug("Reported block " + block
+ " on " + dn + " size " + block.getNumBytes()
@@ -1997,7 +2011,7 @@ public class BlockManager {
if (shouldPostponeBlocksFromFuture &&
namesystem.isGenStampInFuture(block)) {
- queueReportedBlock(dn, storageID, block, reportedState,
+ queueReportedBlock(storageInfo, block, reportedState,
QUEUE_REASON_FUTURE_GENSTAMP);
return null;
}
@@ -2037,7 +2051,7 @@ public class BlockManager {
// TODO: Pretty confident this should be s/storedBlock/block below,
// since we should be postponing the info of the reported block, not
// the stored block. See HDFS-6289 for more context.
- queueReportedBlock(dn, storageID, storedBlock, reportedState,
+ queueReportedBlock(storageInfo, storedBlock, reportedState,
QUEUE_REASON_CORRUPT_STATE);
} else {
toCorrupt.add(c);
@@ -2066,17 +2080,17 @@ public class BlockManager {
* standby node. @see PendingDataNodeMessages.
* @param reason a textual reason to report in the debug logs
*/
- private void queueReportedBlock(DatanodeDescriptor dn, String storageID, Block block,
+ private void queueReportedBlock(DatanodeStorageInfo storageInfo, Block block,
ReplicaState reportedState, String reason) {
assert shouldPostponeBlocksFromFuture;
if (LOG.isDebugEnabled()) {
LOG.debug("Queueing reported block " + block +
" in state " + reportedState +
- " from datanode " + dn + " for later processing " +
- "because " + reason + ".");
+ " from datanode " + storageInfo.getDatanodeDescriptor() +
+ " for later processing because " + reason + ".");
}
- pendingDNMessages.enqueueReportedBlock(dn, storageID, block, reportedState);
+ pendingDNMessages.enqueueReportedBlock(storageInfo, block, reportedState);
}
/**
@@ -2099,7 +2113,7 @@ public class BlockManager {
if (LOG.isDebugEnabled()) {
LOG.debug("Processing previouly queued message " + rbi);
}
- processAndHandleReportedBlock(rbi.getNode(), rbi.getStorageID(),
+ processAndHandleReportedBlock(rbi.getStorageInfo(),
rbi.getBlock(), rbi.getReportedState(), null);
}
}
@@ -2156,6 +2170,16 @@ public class BlockManager {
} else {
return null; // not corrupt
}
+ case UNDER_CONSTRUCTION:
+ if (storedBlock.getGenerationStamp() > reported.getGenerationStamp()) {
+ final long reportedGS = reported.getGenerationStamp();
+ return new BlockToMarkCorrupt(storedBlock, reportedGS, "block is "
+ + ucState + " and reported state " + reportedState
+ + ", But reported genstamp " + reportedGS
+ + " does not match genstamp in block map "
+ + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
+ }
+ return null;
default:
return null;
}
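The new UNDER_CONSTRUCTION case above flags a replica corrupt when its reported generation stamp lags the one in the block map. A tiny sketch of just that comparison (an illustrative method, not the real checkReplicaCorrupt):

class GenStampCheckDemo {
  // Returns a reason string when the replica should be marked corrupt,
  // or null when the reported genstamp is not older than the stored one.
  static String checkUnderConstruction(long storedGS, long reportedGS) {
    if (storedGS > reportedGS) {
      return "reported genstamp " + reportedGS
          + " does not match genstamp in block map " + storedGS;
    }
    return null; // not corrupt
  }

  public static void main(String[] args) {
    System.out.println(checkUnderConstruction(1005L, 1002L)); // stale -> corrupt
    System.out.println(checkUnderConstruction(1005L, 1005L)); // match -> null
  }
}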
@@ -2219,19 +2243,20 @@ public class BlockManager {
}
void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock,
- DatanodeDescriptor node, String storageID) throws IOException {
+ DatanodeStorageInfo storageInfo) throws IOException {
BlockInfoUnderConstruction block = ucBlock.storedBlock;
- block.addReplicaIfNotPresent(node.getStorageInfo(storageID),
- ucBlock.reportedBlock, ucBlock.reportedState);
+ block.addReplicaIfNotPresent(
+ storageInfo, ucBlock.reportedBlock, ucBlock.reportedState);
- if (ucBlock.reportedState == ReplicaState.FINALIZED && block.findDatanode(node) < 0) {
- addStoredBlock(block, node, storageID, null, true);
+ if (ucBlock.reportedState == ReplicaState.FINALIZED &&
+ block.findDatanode(storageInfo.getDatanodeDescriptor()) < 0) {
+ addStoredBlock(block, storageInfo, null, true);
}
}
/**
* Faster version of
- * {@link #addStoredBlock(BlockInfo, DatanodeDescriptor, String, DatanodeDescriptor, boolean)}
+ * {@link #addStoredBlock(BlockInfo, DatanodeStorageInfo, DatanodeDescriptor, boolean)}
* , intended for use with initial block report at startup. If not in startup
* safe mode, will call standard addStoredBlock(). Assumes this method is
* called "immediately" so there is no need to refresh the storedBlock from
@@ -2242,17 +2267,17 @@ public class BlockManager {
* @throws IOException
*/
private void addStoredBlockImmediate(BlockInfo storedBlock,
- DatanodeDescriptor node, String storageID)
+ DatanodeStorageInfo storageInfo)
throws IOException {
assert (storedBlock != null && namesystem.hasWriteLock());
if (!namesystem.isInStartupSafeMode()
|| namesystem.isPopulatingReplQueues()) {
- addStoredBlock(storedBlock, node, storageID, null, false);
+ addStoredBlock(storedBlock, storageInfo, null, false);
return;
}
// just add it
- node.addBlock(storageID, storedBlock);
+ storageInfo.addBlock(storedBlock);
// Now check for completion of blocks and safe block count
int numCurrentReplica = countLiveNodes(storedBlock);
@@ -2274,13 +2299,13 @@ public class BlockManager {
* @return the block that is stored in blockMap.
*/
private Block addStoredBlock(final BlockInfo block,
- DatanodeDescriptor node,
- String storageID,
+ DatanodeStorageInfo storageInfo,
DatanodeDescriptor delNodeHint,
boolean logEveryBlock)
throws IOException {
assert block != null && namesystem.hasWriteLock();
BlockInfo storedBlock;
+ DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
if (block instanceof BlockInfoUnderConstruction) {
//refresh our copy in case the block got completed in another thread
storedBlock = blocksMap.getStoredBlock(block);
@@ -2300,7 +2325,7 @@ public class BlockManager {
assert bc != null : "Block must belong to a file";
// add block to the datanode
- boolean added = node.addBlock(storageID, storedBlock);
+ boolean added = storageInfo.addBlock(storedBlock);
int curReplicaDelta;
if (added) {
@@ -2829,12 +2854,15 @@ public class BlockManager {
} else {
final String[] datanodeUuids = new String[locations.size()];
final String[] storageIDs = new String[datanodeUuids.length];
+ final StorageType[] storageTypes = new StorageType[datanodeUuids.length];
for(int i = 0; i < locations.size(); i++) {
final DatanodeStorageInfo s = locations.get(i);
datanodeUuids[i] = s.getDatanodeDescriptor().getDatanodeUuid();
storageIDs[i] = s.getStorageID();
+ storageTypes[i] = s.getStorageType();
}
- results.add(new BlockWithLocations(block, datanodeUuids, storageIDs));
+ results.add(new BlockWithLocations(block, datanodeUuids, storageIDs,
+ storageTypes));
return block.getNumBytes();
}
}
@@ -2843,8 +2871,9 @@ public class BlockManager {
* The given node is reporting that it received a certain block.
*/
@VisibleForTesting
- void addBlock(DatanodeDescriptor node, String storageID, Block block, String delHint)
+ void addBlock(DatanodeStorageInfo storageInfo, Block block, String delHint)
throws IOException {
+ DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
// Decrement number of blocks scheduled to this datanode.
// for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with
// RECEIVED_BLOCK), we currently also decrease the approximate number.
@@ -2864,12 +2893,12 @@ public class BlockManager {
// Modify the blocks->datanode map and node's map.
//
pendingReplications.decrement(block, node);
- processAndHandleReportedBlock(node, storageID, block, ReplicaState.FINALIZED,
+ processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED,
delHintNode);
}
- private void processAndHandleReportedBlock(DatanodeDescriptor node,
- String storageID, Block block,
+ private void processAndHandleReportedBlock(
+ DatanodeStorageInfo storageInfo, Block block,
ReplicaState reportedState, DatanodeDescriptor delHintNode)
throws IOException {
// blockReceived reports a finalized block
@@ -2877,7 +2906,9 @@ public class BlockManager {
Collection<Block> toInvalidate = new LinkedList<Block>();
Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
- processReportedBlock(node, storageID, block, reportedState,
+ final DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
+
+ processReportedBlock(storageInfo, block, reportedState,
toAdd, toInvalidate, toCorrupt, toUC);
// the block is only in one of the to-do lists
// if it is in none then data-node already has it
@@ -2885,11 +2916,11 @@ public class BlockManager {
: "The block should be only in one of the lists.";
for (StatefulBlockInfo b : toUC) {
- addStoredBlockUnderConstruction(b, node, storageID);
+ addStoredBlockUnderConstruction(b, storageInfo);
}
long numBlocksLogged = 0;
for (BlockInfo b : toAdd) {
- addStoredBlock(b, node, storageID, delHintNode, numBlocksLogged < maxNumBlocksToLog);
+ addStoredBlock(b, storageInfo, delHintNode, numBlocksLogged < maxNumBlocksToLog);
numBlocksLogged++;
}
if (numBlocksLogged > maxNumBlocksToLog) {
@@ -2903,7 +2934,7 @@ public class BlockManager {
addToInvalidates(b, node);
}
for (BlockToMarkCorrupt b : toCorrupt) {
- markBlockAsCorrupt(b, node, storageID);
+ markBlockAsCorrupt(b, storageInfo, node);
}
}
@@ -2930,13 +2961,15 @@ public class BlockManager {
"Got incremental block report from unregistered or dead node");
}
- if (node.getStorageInfo(srdb.getStorage().getStorageID()) == null) {
+ DatanodeStorageInfo storageInfo =
+ node.getStorageInfo(srdb.getStorage().getStorageID());
+ if (storageInfo == null) {
// The DataNode is reporting an unknown storage. Usually the NN learns
// about new storages from heartbeats but during NN restart we may
// receive a block report or incremental report before the heartbeat.
// We must handle this for protocol compatibility. This issue was
// uncovered by HDFS-6094.
- node.updateStorage(srdb.getStorage());
+ storageInfo = node.updateStorage(srdb.getStorage());
}
for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) {
@@ -2946,14 +2979,13 @@ public class BlockManager {
deleted++;
break;
case RECEIVED_BLOCK:
- addBlock(node, srdb.getStorage().getStorageID(),
- rdbi.getBlock(), rdbi.getDelHints());
+ addBlock(storageInfo, rdbi.getBlock(), rdbi.getDelHints());
received++;
break;
case RECEIVING_BLOCK:
receiving++;
- processAndHandleReportedBlock(node, srdb.getStorage().getStorageID(),
- rdbi.getBlock(), ReplicaState.RBW, null);
+ processAndHandleReportedBlock(storageInfo, rdbi.getBlock(),
+ ReplicaState.RBW, null);
break;
default:
String msg =
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java Tue Aug 5 02:30:54 2014
@@ -260,8 +260,8 @@ public class DatanodeDescriptor extends
}
public StorageReport[] getStorageReports() {
- final StorageReport[] reports = new StorageReport[storageMap.size()];
final DatanodeStorageInfo[] infos = getStorageInfos();
+ final StorageReport[] reports = new StorageReport[infos.length];
for(int i = 0; i < infos.length; i++) {
reports[i] = infos[i].toStorageReport();
}
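The getStorageReports() fix above sizes the result array from the snapshot returned by getStorageInfos() instead of from the live storageMap, so a storage added or removed between the two reads can no longer leave a null slot or overflow the array. A minimal sketch of the snapshot-then-size pattern, with illustrative types:

import java.util.ArrayList;
import java.util.List;

class SnapshotSizeDemo {
  private final List<String> storageMap = new ArrayList<>();

  // Take one consistent snapshot of the live collection.
  synchronized String[] getStorageInfos() {
    return storageMap.toArray(new String[0]);
  }

  String[] getStorageReports() {
    final String[] infos = getStorageInfos();
    final String[] reports = new String[infos.length]; // sized from the snapshot
    for (int i = 0; i < infos.length; i++) {
      reports[i] = "report for " + infos[i];
    }
    return reports;
  }

  public static void main(String[] args) {
    SnapshotSizeDemo d = new SnapshotSizeDemo();
    d.storageMap.add("DS-1");
    for (String r : d.getStorageReports()) System.out.println(r);
  }
}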
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java Tue Aug 5 02:30:54 2014
@@ -207,7 +207,7 @@ public class DatanodeStorageInfo {
return blockPoolUsed;
}
- boolean addBlock(BlockInfo b) {
+ public boolean addBlock(BlockInfo b) {
if(!b.addStorage(this))
return false;
// add to the head of the data-node list
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java Tue Aug 5 02:30:54 2014
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.protocol.B
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
/**
* In the Standby Node, we can receive messages about blocks
@@ -41,14 +42,12 @@ class PendingDataNodeMessages {
static class ReportedBlockInfo {
private final Block block;
- private final DatanodeDescriptor dn;
- private final String storageID;
+ private final DatanodeStorageInfo storageInfo;
private final ReplicaState reportedState;
- ReportedBlockInfo(DatanodeDescriptor dn, String storageID, Block block,
+ ReportedBlockInfo(DatanodeStorageInfo storageInfo, Block block,
ReplicaState reportedState) {
- this.dn = dn;
- this.storageID = storageID;
+ this.storageInfo = storageInfo;
this.block = block;
this.reportedState = reportedState;
}
@@ -57,21 +56,18 @@ class PendingDataNodeMessages {
return block;
}
- DatanodeDescriptor getNode() {
- return dn;
- }
-
- String getStorageID() {
- return storageID;
- }
-
ReplicaState getReportedState() {
return reportedState;
}
+
+ DatanodeStorageInfo getStorageInfo() {
+ return storageInfo;
+ }
@Override
public String toString() {
- return "ReportedBlockInfo [block=" + block + ", dn=" + dn
+ return "ReportedBlockInfo [block=" + block + ", dn="
+ + storageInfo.getDatanodeDescriptor()
+ ", reportedState=" + reportedState + "]";
}
}
@@ -87,7 +83,7 @@ class PendingDataNodeMessages {
Queue<ReportedBlockInfo> oldQueue = entry.getValue();
while (!oldQueue.isEmpty()) {
ReportedBlockInfo rbi = oldQueue.remove();
- if (!rbi.getNode().equals(dn)) {
+ if (!rbi.getStorageInfo().getDatanodeDescriptor().equals(dn)) {
newQueue.add(rbi);
} else {
count--;
@@ -97,11 +93,11 @@ class PendingDataNodeMessages {
}
}
- void enqueueReportedBlock(DatanodeDescriptor dn, String storageID, Block block,
+ void enqueueReportedBlock(DatanodeStorageInfo storageInfo, Block block,
ReplicaState reportedState) {
block = new Block(block);
getBlockQueue(block).add(
- new ReportedBlockInfo(dn, storageID, block, reportedState));
+ new ReportedBlockInfo(storageInfo, block, reportedState));
count++;
}
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Tue Aug 5 02:30:54 2014
@@ -21,6 +21,7 @@ import com.google.common.annotations.Vis
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+
import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
@@ -38,6 +39,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* One instance per block-pool/namespace on the DN, which handles the
@@ -91,6 +94,28 @@ class BPOfferService {
*/
private long lastActiveClaimTxId = -1;
+ private final ReentrantReadWriteLock mReadWriteLock =
+ new ReentrantReadWriteLock();
+ private final Lock mReadLock = mReadWriteLock.readLock();
+ private final Lock mWriteLock = mReadWriteLock.writeLock();
+
+ // utility methods to acquire and release read lock and write lock
+ void readLock() {
+ mReadLock.lock();
+ }
+
+ void readUnlock() {
+ mReadLock.unlock();
+ }
+
+ void writeLock() {
+ mWriteLock.lock();
+ }
+
+ void writeUnlock() {
+ mWriteLock.unlock();
+ }
+
BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
Preconditions.checkArgument(!nnAddrs.isEmpty(),
"Must pass at least one NN.");
@@ -135,14 +160,19 @@ class BPOfferService {
}
return false;
}
-
- synchronized String getBlockPoolId() {
- if (bpNSInfo != null) {
- return bpNSInfo.getBlockPoolID();
- } else {
- LOG.warn("Block pool ID needed, but service not yet registered with NN",
- new Exception("trace"));
- return null;
+
+ String getBlockPoolId() {
+ readLock();
+ try {
+ if (bpNSInfo != null) {
+ return bpNSInfo.getBlockPoolID();
+ } else {
+ LOG.warn("Block pool ID needed, but service not yet registered with NN",
+ new Exception("trace"));
+ return null;
+ }
+ } finally {
+ readUnlock();
}
}
@@ -150,27 +180,37 @@ class BPOfferService {
return getNamespaceInfo() != null;
}
- synchronized NamespaceInfo getNamespaceInfo() {
- return bpNSInfo;
+ NamespaceInfo getNamespaceInfo() {
+ readLock();
+ try {
+ return bpNSInfo;
+ } finally {
+ readUnlock();
+ }
}
@Override
- public synchronized String toString() {
- if (bpNSInfo == null) {
- // If we haven't yet connected to our NN, we don't yet know our
- // own block pool ID.
- // If _none_ of the block pools have connected yet, we don't even
- // know the DatanodeID ID of this DN.
- String datanodeUuid = dn.getDatanodeUuid();
+ public String toString() {
+ readLock();
+ try {
+ if (bpNSInfo == null) {
+ // If we haven't yet connected to our NN, we don't yet know our
+ // own block pool ID.
+ // If _none_ of the block pools have connected yet, we don't even
+ // know the DatanodeID ID of this DN.
+ String datanodeUuid = dn.getDatanodeUuid();
- if (datanodeUuid == null || datanodeUuid.isEmpty()) {
- datanodeUuid = "unassigned";
+ if (datanodeUuid == null || datanodeUuid.isEmpty()) {
+ datanodeUuid = "unassigned";
+ }
+ return "Block pool <registering> (Datanode Uuid " + datanodeUuid + ")";
+ } else {
+ return "Block pool " + getBlockPoolId() +
+ " (Datanode Uuid " + dn.getDatanodeUuid() +
+ ")";
}
- return "Block pool <registering> (Datanode Uuid " + datanodeUuid + ")";
- } else {
- return "Block pool " + getBlockPoolId() +
- " (Datanode Uuid " + dn.getDatanodeUuid() +
- ")";
+ } finally {
+ readUnlock();
}
}
@@ -266,32 +306,37 @@ class BPOfferService {
* verifies that this namespace matches (e.g. to prevent a misconfiguration
* where a StandbyNode from a different cluster is specified)
*/
- synchronized void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
- if (this.bpNSInfo == null) {
- this.bpNSInfo = nsInfo;
- boolean success = false;
-
- // Now that we know the namespace ID, etc, we can pass this to the DN.
- // The DN can now initialize its local storage if we are the
- // first BP to handshake, etc.
- try {
- dn.initBlockPool(this);
- success = true;
- } finally {
- if (!success) {
- // The datanode failed to initialize the BP. We need to reset
- // the namespace info so that other BPService actors still have
- // a chance to set it, and re-initialize the datanode.
- this.bpNSInfo = null;
+ void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
+ writeLock();
+ try {
+ if (this.bpNSInfo == null) {
+ this.bpNSInfo = nsInfo;
+ boolean success = false;
+
+ // Now that we know the namespace ID, etc, we can pass this to the DN.
+ // The DN can now initialize its local storage if we are the
+ // first BP to handshake, etc.
+ try {
+ dn.initBlockPool(this);
+ success = true;
+ } finally {
+ if (!success) {
+ // The datanode failed to initialize the BP. We need to reset
+ // the namespace info so that other BPService actors still have
+ // a chance to set it, and re-initialize the datanode.
+ this.bpNSInfo = null;
+ }
}
+ } else {
+ checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(),
+ "Blockpool ID");
+ checkNSEquality(bpNSInfo.getNamespaceID(), nsInfo.getNamespaceID(),
+ "Namespace ID");
+ checkNSEquality(bpNSInfo.getClusterID(), nsInfo.getClusterID(),
+ "Cluster ID");
}
- } else {
- checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(),
- "Blockpool ID");
- checkNSEquality(bpNSInfo.getNamespaceID(), nsInfo.getNamespaceID(),
- "Namespace ID");
- checkNSEquality(bpNSInfo.getClusterID(), nsInfo.getClusterID(),
- "Cluster ID");
+ } finally {
+ writeUnlock();
}
}
@@ -300,22 +345,27 @@ class BPOfferService {
* NN, it calls this function to verify that the NN it connected to
* is consistent with other NNs serving the block-pool.
*/
- synchronized void registrationSucceeded(BPServiceActor bpServiceActor,
+ void registrationSucceeded(BPServiceActor bpServiceActor,
DatanodeRegistration reg) throws IOException {
- if (bpRegistration != null) {
- checkNSEquality(bpRegistration.getStorageInfo().getNamespaceID(),
- reg.getStorageInfo().getNamespaceID(), "namespace ID");
- checkNSEquality(bpRegistration.getStorageInfo().getClusterID(),
- reg.getStorageInfo().getClusterID(), "cluster ID");
- } else {
- bpRegistration = reg;
- }
-
- dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
- // Add the initial block token secret keys to the DN's secret manager.
- if (dn.isBlockTokenEnabled) {
- dn.blockPoolTokenSecretManager.addKeys(getBlockPoolId(),
- reg.getExportedKeys());
+ writeLock();
+ try {
+ if (bpRegistration != null) {
+ checkNSEquality(bpRegistration.getStorageInfo().getNamespaceID(),
+ reg.getStorageInfo().getNamespaceID(), "namespace ID");
+ checkNSEquality(bpRegistration.getStorageInfo().getClusterID(),
+ reg.getStorageInfo().getClusterID(), "cluster ID");
+ } else {
+ bpRegistration = reg;
+ }
+
+ dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
+ // Add the initial block token secret keys to the DN's secret manager.
+ if (dn.isBlockTokenEnabled) {
+ dn.blockPoolTokenSecretManager.addKeys(getBlockPoolId(),
+ reg.getExportedKeys());
+ }
+ } finally {
+ writeUnlock();
}
}
@@ -333,25 +383,35 @@ class BPOfferService {
}
}
- synchronized DatanodeRegistration createRegistration() {
- Preconditions.checkState(bpNSInfo != null,
- "getRegistration() can only be called after initial handshake");
- return dn.createBPRegistration(bpNSInfo);
+ DatanodeRegistration createRegistration() {
+ writeLock();
+ try {
+ Preconditions.checkState(bpNSInfo != null,
+ "getRegistration() can only be called after initial handshake");
+ return dn.createBPRegistration(bpNSInfo);
+ } finally {
+ writeUnlock();
+ }
}
/**
* Called when an actor shuts down. If this is the last actor
* to shut down, shuts down the whole blockpool in the DN.
*/
- synchronized void shutdownActor(BPServiceActor actor) {
- if (bpServiceToActive == actor) {
- bpServiceToActive = null;
- }
+ void shutdownActor(BPServiceActor actor) {
+ writeLock();
+ try {
+ if (bpServiceToActive == actor) {
+ bpServiceToActive = null;
+ }
- bpServices.remove(actor);
+ bpServices.remove(actor);
- if (bpServices.isEmpty()) {
- dn.shutdownBlockPool(this);
+ if (bpServices.isEmpty()) {
+ dn.shutdownBlockPool(this);
+ }
+ } finally {
+ writeUnlock();
}
}
@@ -392,11 +452,16 @@ class BPOfferService {
* @return a proxy to the active NN, or null if the BPOS has not
* acknowledged any NN as active yet.
*/
- synchronized DatanodeProtocolClientSideTranslatorPB getActiveNN() {
- if (bpServiceToActive != null) {
- return bpServiceToActive.bpNamenode;
- } else {
- return null;
+ DatanodeProtocolClientSideTranslatorPB getActiveNN() {
+ readLock();
+ try {
+ if (bpServiceToActive != null) {
+ return bpServiceToActive.bpNamenode;
+ } else {
+ return null;
+ }
+ } finally {
+ readUnlock();
}
}
@@ -424,45 +489,50 @@ class BPOfferService {
* @param actor the actor which received the heartbeat
* @param nnHaState the HA-related heartbeat contents
*/
- synchronized void updateActorStatesFromHeartbeat(
+ void updateActorStatesFromHeartbeat(
BPServiceActor actor,
NNHAStatusHeartbeat nnHaState) {
- final long txid = nnHaState.getTxId();
-
- final boolean nnClaimsActive =
- nnHaState.getState() == HAServiceState.ACTIVE;
- final boolean bposThinksActive = bpServiceToActive == actor;
- final boolean isMoreRecentClaim = txid > lastActiveClaimTxId;
-
- if (nnClaimsActive && !bposThinksActive) {
- LOG.info("Namenode " + actor + " trying to claim ACTIVE state with " +
- "txid=" + txid);
- if (!isMoreRecentClaim) {
- // Split-brain scenario - an NN is trying to claim active
- // state when a different NN has already claimed it with a higher
- // txid.
- LOG.warn("NN " + actor + " tried to claim ACTIVE state at txid=" +
- txid + " but there was already a more recent claim at txid=" +
- lastActiveClaimTxId);
- return;
- } else {
- if (bpServiceToActive == null) {
- LOG.info("Acknowledging ACTIVE Namenode " + actor);
+ writeLock();
+ try {
+ final long txid = nnHaState.getTxId();
+
+ final boolean nnClaimsActive =
+ nnHaState.getState() == HAServiceState.ACTIVE;
+ final boolean bposThinksActive = bpServiceToActive == actor;
+ final boolean isMoreRecentClaim = txid > lastActiveClaimTxId;
+
+ if (nnClaimsActive && !bposThinksActive) {
+ LOG.info("Namenode " + actor + " trying to claim ACTIVE state with " +
+ "txid=" + txid);
+ if (!isMoreRecentClaim) {
+ // Split-brain scenario - an NN is trying to claim active
+ // state when a different NN has already claimed it with a higher
+ // txid.
+ LOG.warn("NN " + actor + " tried to claim ACTIVE state at txid=" +
+ txid + " but there was already a more recent claim at txid=" +
+ lastActiveClaimTxId);
+ return;
} else {
- LOG.info("Namenode " + actor + " taking over ACTIVE state from " +
- bpServiceToActive + " at higher txid=" + txid);
+ if (bpServiceToActive == null) {
+ LOG.info("Acknowledging ACTIVE Namenode " + actor);
+ } else {
+ LOG.info("Namenode " + actor + " taking over ACTIVE state from " +
+ bpServiceToActive + " at higher txid=" + txid);
+ }
+ bpServiceToActive = actor;
}
- bpServiceToActive = actor;
+ } else if (!nnClaimsActive && bposThinksActive) {
+ LOG.info("Namenode " + actor + " relinquishing ACTIVE state with " +
+ "txid=" + nnHaState.getTxId());
+ bpServiceToActive = null;
}
- } else if (!nnClaimsActive && bposThinksActive) {
- LOG.info("Namenode " + actor + " relinquishing ACTIVE state with " +
- "txid=" + nnHaState.getTxId());
- bpServiceToActive = null;
- }
-
- if (bpServiceToActive == actor) {
- assert txid >= lastActiveClaimTxId;
- lastActiveClaimTxId = txid;
+
+ if (bpServiceToActive == actor) {
+ assert txid >= lastActiveClaimTxId;
+ lastActiveClaimTxId = txid;
+ }
+ } finally {
+ writeUnlock();
}
}
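The heartbeat handler above keeps its original arbitration rule: a namenode's claim to the ACTIVE state is honored only when its transaction ID is newer than the last accepted claim, which fences out a stale NN in the split-brain scenario the comments describe. A condensed sketch of that rule, with a String standing in for BPServiceActor:

class ActiveClaimDemo {
  private String bpServiceToActive;       // NN currently believed active
  private long lastActiveClaimTxId = -1;

  synchronized void updateFromHeartbeat(String actor, boolean claimsActive,
      long txid) {
    boolean bposThinksActive = actor.equals(bpServiceToActive);
    if (claimsActive && !bposThinksActive) {
      if (txid > lastActiveClaimTxId) {
        bpServiceToActive = actor;        // more recent claim wins
      }                                   // else: stale claim, ignore it
    } else if (!claimsActive && bposThinksActive) {
      bpServiceToActive = null;           // the active NN stepped down
    }
    if (actor.equals(bpServiceToActive)) {
      lastActiveClaimTxId = txid;
    }
  }

  public static void main(String[] args) {
    ActiveClaimDemo d = new ActiveClaimDemo();
    d.updateFromHeartbeat("nn1", true, 10);  // accepted: first claim
    d.updateFromHeartbeat("nn2", true, 5);   // ignored: stale txid
    System.out.println(d.bpServiceToActive + " @ " + d.lastActiveClaimTxId);
  }
}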
@@ -533,12 +603,15 @@ class BPOfferService {
actor.reRegister();
return true;
}
- synchronized (this) {
+ writeLock();
+ try {
if (actor == bpServiceToActive) {
return processCommandFromActive(cmd, actor);
} else {
return processCommandFromStandby(cmd, actor);
}
+ } finally {
+ writeUnlock();
}
}
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java Tue Aug 5 02:30:54 2014
@@ -152,7 +152,7 @@ public class BlockPoolSliceStorage exten
// During startup some of them can upgrade or roll back
// while others could be up-to-date for the regular startup.
for (int idx = 0; idx < getNumStorageDirs(); idx++) {
- doTransition(getStorageDir(idx), nsInfo, startOpt);
+ doTransition(datanode, getStorageDir(idx), nsInfo, startOpt);
assert getCTime() == nsInfo.getCTime()
: "Data-node and name-node CTimes must be the same.";
}
@@ -242,7 +242,7 @@ public class BlockPoolSliceStorage exten
* @param startOpt startup option
* @throws IOException
*/
- private void doTransition(StorageDirectory sd,
+ private void doTransition(DataNode datanode, StorageDirectory sd,
NamespaceInfo nsInfo, StartupOption startOpt) throws IOException {
if (startOpt == StartupOption.ROLLBACK) {
doRollback(sd, nsInfo); // rollback if applicable
@@ -275,7 +275,7 @@ public class BlockPoolSliceStorage exten
}
if (this.layoutVersion > HdfsConstants.DATANODE_LAYOUT_VERSION
|| this.cTime < nsInfo.getCTime()) {
- doUpgrade(sd, nsInfo); // upgrade
+ doUpgrade(datanode, sd, nsInfo); // upgrade
return;
}
// layoutVersion == LAYOUT_VERSION && this.cTime > nsInfo.cTime
@@ -304,7 +304,8 @@ public class BlockPoolSliceStorage exten
* @param nsInfo Namespace Info from the namenode
* @throws IOException on error
*/
- void doUpgrade(StorageDirectory bpSd, NamespaceInfo nsInfo) throws IOException {
+ void doUpgrade(DataNode datanode, StorageDirectory bpSd, NamespaceInfo nsInfo)
+ throws IOException {
// Upgrading is applicable only to releases with federation or later
if (!DataNodeLayoutVersion.supports(
LayoutVersion.Feature.FEDERATION, layoutVersion)) {
@@ -312,7 +313,7 @@ public class BlockPoolSliceStorage exten
}
LOG.info("Upgrading block pool storage directory " + bpSd.getRoot()
+ ".\n old LV = " + this.getLayoutVersion() + "; old CTime = "
- + this.getCTime() + ".\n new LV = " + nsInfo.getLayoutVersion()
+ + this.getCTime() + ".\n new LV = " + HdfsConstants.DATANODE_LAYOUT_VERSION
+ "; new CTime = " + nsInfo.getCTime());
// get <SD>/previous directory
String dnRoot = getDataNodeStorageRoot(bpSd.getRoot().getCanonicalPath());
@@ -340,7 +341,7 @@ public class BlockPoolSliceStorage exten
rename(bpCurDir, bpTmpDir);
// 3. Create new <SD>/current with block files hardlinks and VERSION
- linkAllBlocks(bpTmpDir, bpCurDir);
+ linkAllBlocks(datanode, bpTmpDir, bpCurDir);
this.layoutVersion = HdfsConstants.DATANODE_LAYOUT_VERSION;
assert this.namespaceID == nsInfo.getNamespaceID()
: "Data-node and name-node layout versions must be the same.";
@@ -517,14 +518,15 @@ public class BlockPoolSliceStorage exten
* @param toDir the current data directory
* @throws IOException if error occurs during hardlink
*/
- private void linkAllBlocks(File fromDir, File toDir) throws IOException {
+ private void linkAllBlocks(DataNode datanode, File fromDir, File toDir)
+ throws IOException {
// do the link
int diskLayoutVersion = this.getLayoutVersion();
// hardlink finalized blocks in tmpDir
HardLink hardLink = new HardLink();
- DataStorage.linkBlocks(new File(fromDir, DataStorage.STORAGE_DIR_FINALIZED),
+ DataStorage.linkBlocks(datanode, new File(fromDir, DataStorage.STORAGE_DIR_FINALIZED),
new File(toDir,DataStorage.STORAGE_DIR_FINALIZED), diskLayoutVersion, hardLink);
- DataStorage.linkBlocks(new File(fromDir, DataStorage.STORAGE_DIR_RBW),
+ DataStorage.linkBlocks(datanode, new File(fromDir, DataStorage.STORAGE_DIR_RBW),
new File(toDir, DataStorage.STORAGE_DIR_RBW), diskLayoutVersion, hardLink);
LOG.info( hardLink.linkStats.report() );
}
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Aug 5 02:30:54 2014
@@ -253,7 +253,7 @@ class BlockReceiver implements Closeable
if (cause != null) { // possible disk error
ioe = cause;
- datanode.checkDiskError();
+ datanode.checkDiskErrorAsync();
}
throw ioe;
@@ -329,7 +329,7 @@ class BlockReceiver implements Closeable
}
// disk check
if(ioe != null) {
- datanode.checkDiskError();
+ datanode.checkDiskErrorAsync();
throw ioe;
}
}
@@ -639,7 +639,7 @@ class BlockReceiver implements Closeable
manageWriterOsCache(offsetInBlock);
}
} catch (IOException iex) {
- datanode.checkDiskError();
+ datanode.checkDiskErrorAsync();
throw iex;
}
}
@@ -1208,7 +1208,7 @@ class BlockReceiver implements Closeable
} catch (IOException e) {
LOG.warn("IOException in BlockReceiver.run(): ", e);
if (running) {
- datanode.checkDiskError();
+ datanode.checkDiskErrorAsync();
LOG.info(myString, e);
running = false;
if (!Thread.interrupted()) { // failure not caused by interruption
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Aug 5 02:30:54 2014
@@ -1075,6 +1075,11 @@ public class DataNode extends Configured
// In the case that this is the first block pool to connect, initialize
// the dataset, block scanners, etc.
initStorage(nsInfo);
+
+ // Exclude failed disks before initializing the block pools to avoid startup
+ // failures.
+ checkDiskError();
+
initPeriodicScanners(conf);
data.addBlockPool(nsInfo.getBlockPoolID(), conf);
@@ -1510,9 +1515,9 @@ public class DataNode extends Configured
/**
- * Check if there is a disk failure and if so, handle the error
+ * Check asynchronously whether there is a disk failure and, if so, handle the error
*/
- public void checkDiskError() {
+ public void checkDiskErrorAsync() {
synchronized(checkDiskErrorMutex) {
checkDiskErrorFlag = true;
if(checkDiskErrorThread == null) {
@@ -1821,7 +1826,7 @@ public class DataNode extends Configured
LOG.warn(bpReg + ":Failed to transfer " + b + " to " +
targets[0] + " got ", ie);
// check if there are any disk problem
- checkDiskError();
+ checkDiskErrorAsync();
} finally {
xmitsInProgress.getAndDecrement();
IOUtils.closeStream(blockSender);
@@ -2759,7 +2764,18 @@ public class DataNode extends Configured
public ShortCircuitRegistry getShortCircuitRegistry() {
return shortCircuitRegistry;
}
-
+
+ /**
+ * Check the data directories for disk errors and handle any failure.
+ */
+ private void checkDiskError() {
+ try {
+ data.checkDataDir();
+ } catch (DiskErrorException de) {
+ handleDiskError(de.getMessage());
+ }
+ }
+
/**
* Starts a new thread which will check for disk-error check requests
* every 5 seconds
@@ -2776,9 +2792,7 @@ public class DataNode extends Configured
}
if(tempFlag) {
try {
- data.checkDataDir();
- } catch (DiskErrorException de) {
- handleDiskError(de.getMessage());
+ checkDiskError();
} catch (Exception e) {
LOG.warn("Unexpected exception occurred while checking disk error " + e);
checkDiskErrorThread = null;
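The checkDiskError()/checkDiskErrorAsync() split above separates the synchronous scan (now also run once during block-pool initialization to exclude failed disks up front) from the flag-based asynchronous path used on hot I/O paths. A minimal sketch of the flag-plus-checker-thread pattern, assuming an illustrative scanDataDirs() in place of data.checkDataDir():

class AsyncDiskCheckDemo {
  private final Object checkDiskErrorMutex = new Object();
  private boolean checkDiskErrorFlag = false;
  private Thread checkDiskErrorThread;

  // Cheap enough for I/O error paths: set a flag, ensure the thread runs.
  void checkDiskErrorAsync() {
    synchronized (checkDiskErrorMutex) {
      checkDiskErrorFlag = true;
      if (checkDiskErrorThread == null) {
        checkDiskErrorThread = new Thread(this::run, "disk-checker");
        checkDiskErrorThread.setDaemon(true);
        checkDiskErrorThread.start();
      }
    }
  }

  private void run() {
    while (true) {
      boolean tempFlag;
      synchronized (checkDiskErrorMutex) {
        tempFlag = checkDiskErrorFlag;
        checkDiskErrorFlag = false;
      }
      if (tempFlag) {
        scanDataDirs();              // the slow, synchronous check
      }
      try {
        Thread.sleep(5000);          // poll for requests every 5 seconds
      } catch (InterruptedException e) {
        return;
      }
    }
  }

  private void scanDataDirs() { /* walk volumes; drop ones that fail */ }

  public static void main(String[] args) throws InterruptedException {
    AsyncDiskCheckDemo d = new AsyncDiskCheckDemo();
    d.checkDiskErrorAsync();   // returns immediately; scan happens later
    Thread.sleep(100);
  }
}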
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeLayoutVersion.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeLayoutVersion.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeLayoutVersion.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeLayoutVersion.java Tue Aug 5 02:30:54 2014
@@ -62,7 +62,10 @@ public class DataNodeLayoutVersion {
* </ul>
*/
public static enum Feature implements LayoutFeature {
- FIRST_LAYOUT(-55, -53, "First datanode layout", false);
+ FIRST_LAYOUT(-55, -53, "First datanode layout", false),
+ BLOCKID_BASED_LAYOUT(-56,
+ "The block ID of a finalized block uniquely determines its position " +
+ "in the directory structure");
private final FeatureInfo info;
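Datanode layout versions are negative integers that grow more negative as features land: FIRST_LAYOUT is -55 and the new BLOCKID_BASED_LAYOUT is -56. "The on-disk layout predates feature F" is therefore written as oldLV > F's layout version, which is exactly the check the DataStorage.linkBlocks hunk below performs. A tiny sketch:

class LayoutVersionDemo {
  static final int FIRST_LAYOUT = -55;
  static final int BLOCKID_BASED_LAYOUT = -56; // newer == more negative

  static boolean predates(int onDiskLV, int featureLV) {
    return onDiskLV > featureLV;
  }

  public static void main(String[] args) {
    System.out.println(predates(FIRST_LAYOUT, BLOCKID_BASED_LAYOUT));          // true
    System.out.println(predates(BLOCKID_BASED_LAYOUT, BLOCKID_BASED_LAYOUT)); // false
  }
}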
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java Tue Aug 5 02:30:54 2014
@@ -18,13 +18,19 @@
package org.apache.hadoop.hdfs.server.datanode;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.HardLink;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
@@ -35,13 +41,30 @@ import org.apache.hadoop.hdfs.server.com
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DiskChecker;
-import java.io.*;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
import java.nio.channels.FileLock;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
/**
* Data storage information file.
@@ -261,6 +284,7 @@ public class DataStorage extends Storage
STORAGE_DIR_CURRENT));
bpDataDirs.add(bpRoot);
}
+
// mkdir for the list of BlockPoolStorage
makeBlockPoolDataDir(bpDataDirs, null);
BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage(
@@ -488,7 +512,7 @@ public class DataStorage extends Storage
// do upgrade
if (this.layoutVersion > HdfsConstants.DATANODE_LAYOUT_VERSION) {
- doUpgrade(sd, nsInfo); // upgrade
+ doUpgrade(datanode, sd, nsInfo); // upgrade
return;
}
@@ -523,7 +547,8 @@ public class DataStorage extends Storage
* @param sd storage directory
* @throws IOException on error
*/
- void doUpgrade(StorageDirectory sd, NamespaceInfo nsInfo) throws IOException {
+ void doUpgrade(DataNode datanode, StorageDirectory sd, NamespaceInfo nsInfo)
+ throws IOException {
// If the existing on-disk layout version supports federation, simply
// update its layout version.
if (DataNodeLayoutVersion.supports(
@@ -568,7 +593,8 @@ public class DataStorage extends Storage
BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage(nsInfo.getNamespaceID(),
nsInfo.getBlockPoolID(), nsInfo.getCTime(), nsInfo.getClusterID());
bpStorage.format(curDir, nsInfo);
- linkAllBlocks(tmpDir, bbwDir, new File(curBpDir, STORAGE_DIR_CURRENT));
+ linkAllBlocks(datanode, tmpDir, bbwDir, new File(curBpDir,
+ STORAGE_DIR_CURRENT));
// 4. Write version file under <SD>/current
layoutVersion = HdfsConstants.DATANODE_LAYOUT_VERSION;
@@ -746,22 +772,22 @@ public class DataStorage extends Storage
*
* @throws IOException If error occurs during hardlink
*/
- private void linkAllBlocks(File fromDir, File fromBbwDir, File toDir)
- throws IOException {
+ private void linkAllBlocks(DataNode datanode, File fromDir, File fromBbwDir,
+ File toDir) throws IOException {
HardLink hardLink = new HardLink();
// do the link
int diskLayoutVersion = this.getLayoutVersion();
if (DataNodeLayoutVersion.supports(
LayoutVersion.Feature.APPEND_RBW_DIR, diskLayoutVersion)) {
// hardlink finalized blocks in tmpDir/finalized
- linkBlocks(new File(fromDir, STORAGE_DIR_FINALIZED),
+ linkBlocks(datanode, new File(fromDir, STORAGE_DIR_FINALIZED),
new File(toDir, STORAGE_DIR_FINALIZED), diskLayoutVersion, hardLink);
// hardlink rbw blocks in tmpDir/rbw
- linkBlocks(new File(fromDir, STORAGE_DIR_RBW),
+ linkBlocks(datanode, new File(fromDir, STORAGE_DIR_RBW),
new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion, hardLink);
} else { // pre-RBW version
// hardlink finalized blocks in tmpDir
- linkBlocks(fromDir, new File(toDir, STORAGE_DIR_FINALIZED),
+ linkBlocks(datanode, fromDir, new File(toDir, STORAGE_DIR_FINALIZED),
diskLayoutVersion, hardLink);
if (fromBbwDir.exists()) {
/*
@@ -770,15 +796,67 @@ public class DataStorage extends Storage
* NOT underneath the 'current' directory in those releases. See
* HDFS-3731 for details.
*/
- linkBlocks(fromBbwDir,
+ linkBlocks(datanode, fromBbwDir,
new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion, hardLink);
}
}
LOG.info( hardLink.linkStats.report() );
}
+
+ private static class LinkArgs {
+ public File src;
+ public File dst;
+
+ public LinkArgs(File src, File dst) {
+ this.src = src;
+ this.dst = dst;
+ }
+ }
+
+ static void linkBlocks(DataNode datanode, File from, File to, int oldLV,
+ HardLink hl) throws IOException {
+ boolean upgradeToIdBasedLayout = false;
+ // If we are upgrading from a version older than the one where we introduced
+ // block ID-based layout AND we're working with the finalized directory,
+ // we'll need to upgrade from the old flat layout to the block ID-based one
+ if (oldLV > DataNodeLayoutVersion.Feature.BLOCKID_BASED_LAYOUT.getInfo().
+ getLayoutVersion() && to.getName().equals(STORAGE_DIR_FINALIZED)) {
+ upgradeToIdBasedLayout = true;
+ }
+
+ final List<LinkArgs> idBasedLayoutSingleLinks = Lists.newArrayList();
+ linkBlocksHelper(from, to, oldLV, hl, upgradeToIdBasedLayout, to,
+ idBasedLayoutSingleLinks);
+ int numLinkWorkers = datanode.getConf().getInt(
+ DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS_KEY,
+ DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS);
+ ExecutorService linkWorkers = Executors.newFixedThreadPool(numLinkWorkers);
+ final int step = idBasedLayoutSingleLinks.size() / numLinkWorkers + 1;
+ List<Future<Void>> futures = Lists.newArrayList();
+ for (int i = 0; i < idBasedLayoutSingleLinks.size(); i += step) {
+ final int iCopy = i;
+ futures.add(linkWorkers.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws IOException {
+ int upperBound = Math.min(iCopy + step,
+ idBasedLayoutSingleLinks.size());
+ for (int j = iCopy; j < upperBound; j++) {
+ LinkArgs cur = idBasedLayoutSingleLinks.get(j);
+ NativeIO.link(cur.src, cur.dst);
+ }
+ return null;
+ }
+ }));
+ }
+ linkWorkers.shutdown();
+ for (Future<Void> f : futures) {
+ Futures.get(f, IOException.class);
+ }
+ }
- static void linkBlocks(File from, File to, int oldLV, HardLink hl)
- throws IOException {
+ static void linkBlocksHelper(File from, File to, int oldLV, HardLink hl,
+ boolean upgradeToIdBasedLayout, File blockRoot,
+ List<LinkArgs> idBasedLayoutSingleLinks) throws IOException {
if (!from.exists()) {
return;
}
@@ -805,9 +883,6 @@ public class DataStorage extends Storage
// from is a directory
hl.linkStats.countDirs++;
- if (!to.mkdirs())
- throw new IOException("Cannot create directory " + to);
-
String[] blockNames = from.list(new java.io.FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
@@ -815,12 +890,36 @@ public class DataStorage extends Storage
}
});
+ // If we are upgrading to block ID-based layout, we don't want to recreate
+ // any subdirs from the source that contain blocks, since we have a new
+ // directory structure
+ if (!upgradeToIdBasedLayout || !to.getName().startsWith(
+ BLOCK_SUBDIR_PREFIX)) {
+ if (!to.mkdirs())
+ throw new IOException("Cannot create directory " + to);
+ }
+
// Block files just need hard links with the same file names
// but a different directory
if (blockNames.length > 0) {
- HardLink.createHardLinkMult(from, blockNames, to);
- hl.linkStats.countMultLinks++;
- hl.linkStats.countFilesMultLinks += blockNames.length;
+ if (upgradeToIdBasedLayout) {
+ for (String blockName : blockNames) {
+ long blockId = Block.getBlockId(blockName);
+ File blockLocation = DatanodeUtil.idToBlockDir(blockRoot, blockId);
+ if (!blockLocation.exists()) {
+ if (!blockLocation.mkdirs()) {
+ throw new IOException("Failed to mkdirs " + blockLocation);
+ }
+ }
+ idBasedLayoutSingleLinks.add(new LinkArgs(new File(from, blockName),
+ new File(blockLocation, blockName)));
+ hl.linkStats.countSingleLinks++;
+ }
+ } else {
+ HardLink.createHardLinkMult(from, blockNames, to);
+ hl.linkStats.countMultLinks++;
+ hl.linkStats.countFilesMultLinks += blockNames.length;
+ }
} else {
hl.linkStats.countEmptyDirs++;
}
@@ -834,8 +933,9 @@ public class DataStorage extends Storage
}
});
for(int i = 0; i < otherNames.length; i++)
- linkBlocks(new File(from, otherNames[i]),
- new File(to, otherNames[i]), oldLV, hl);
+ linkBlocksHelper(new File(from, otherNames[i]),
+ new File(to, otherNames[i]), oldLV, hl, upgradeToIdBasedLayout,
+ blockRoot, idBasedLayoutSingleLinks);
}
/**
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java Tue Aug 5 02:30:54 2014
@@ -30,6 +30,8 @@ public class DatanodeUtil {
public static final String DISK_ERROR = "Possible disk error: ";
+ private static final String SEP = System.getProperty("file.separator");
+
/** Get the cause of an I/O exception if caused by a possible disk error
* @param ioe an I/O exception
* @return cause if the I/O exception is caused by a possible disk error;
@@ -78,4 +80,38 @@ public class DatanodeUtil {
public static File getUnlinkTmpFile(File f) {
return new File(f.getParentFile(), f.getName()+UNLINK_BLOCK_SUFFIX);
}
+
+ /**
+ * Checks whether there are any files anywhere in the directory tree rooted
+ * at dir (directories don't count as files). dir must exist.
+ * @return true if there are no files
+ * @throws IOException if unable to list subdirectories
+ */
+ public static boolean dirNoFilesRecursive(File dir) throws IOException {
+ File[] contents = dir.listFiles();
+ if (contents == null) {
+ throw new IOException("Cannot list contents of " + dir);
+ }
+ for (File f : contents) {
+ if (!f.isDirectory() || !dirNoFilesRecursive(f)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Get the directory where a finalized block with this ID should be stored.
+ * Do not attempt to create the directory.
+ * @param root the root directory where finalized blocks are stored
+ * @param blockId the ID of the block
+ * @return the two-level subdirectory under root derived from the block ID
+ */
+ public static File idToBlockDir(File root, long blockId) {
+ int d1 = (int)((blockId >> 16) & 0xff);
+ int d2 = (int)((blockId >> 8) & 0xff);
+ String path = DataStorage.BLOCK_SUBDIR_PREFIX + d1 + SEP +
+ DataStorage.BLOCK_SUBDIR_PREFIX + d2;
+ return new File(root, path);
+ }
}
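
The layout implemented by idToBlockDir above is worth spelling out: bits
16-23 of the block ID select the first-level directory and bits 8-15 the
second level, so finalized blocks spread across at most 256 x 256 leaf
directories regardless of how many blocks a volume holds. A small
self-contained sketch (the "subdir" prefix mirrors
DataStorage.BLOCK_SUBDIR_PREFIX; class name and paths are illustrative):

import java.io.File;

public class IdToBlockDirDemo {
  static File idToBlockDir(File root, long blockId) {
    int d1 = (int) ((blockId >> 16) & 0xff); // bits 16-23
    int d2 = (int) ((blockId >> 8) & 0xff);  // bits 8-15
    return new File(root, "subdir" + d1 + File.separator + "subdir" + d2);
  }

  public static void main(String[] args) {
    // 0x12345678: d1 = 0x34 = 52, d2 = 0x56 = 86
    System.out.println(idToBlockDir(new File("/data/finalized"), 0x12345678L));
    // prints /data/finalized/subdir52/subdir86
  }
}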
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java Tue Aug 5 02:30:54 2014
@@ -54,10 +54,10 @@ abstract public class ReplicaInfo extend
private File baseDir;
/**
- * Ints representing the sub directory path from base dir to the directory
- * containing this replica.
+ * Whether or not this replica's parent directory includes subdirs, in which
+ * case we can generate them based on the replica's block ID
*/
- private int[] subDirs;
+ private boolean hasSubdirs;
private static final Map<String, File> internedBaseDirs = new HashMap<String, File>();
@@ -151,18 +151,8 @@ abstract public class ReplicaInfo extend
* @return the parent directory path where this replica is located
*/
File getDir() {
- if (subDirs == null) {
- return null;
- }
-
- StringBuilder sb = new StringBuilder();
- for (int i : subDirs) {
- sb.append(DataStorage.BLOCK_SUBDIR_PREFIX);
- sb.append(i);
- sb.append("/");
- }
- File ret = new File(baseDir, sb.toString());
- return ret;
+ return hasSubdirs ? DatanodeUtil.idToBlockDir(baseDir,
+ getBlockId()) : baseDir;
}
/**
@@ -175,54 +165,46 @@ abstract public class ReplicaInfo extend
private void setDirInternal(File dir) {
if (dir == null) {
- subDirs = null;
baseDir = null;
return;
}
- ReplicaDirInfo replicaDirInfo = parseSubDirs(dir);
- this.subDirs = replicaDirInfo.subDirs;
+ ReplicaDirInfo dirInfo = parseBaseDir(dir);
+ this.hasSubdirs = dirInfo.hasSubdirs;
synchronized (internedBaseDirs) {
- if (!internedBaseDirs.containsKey(replicaDirInfo.baseDirPath)) {
+ if (!internedBaseDirs.containsKey(dirInfo.baseDirPath)) {
// Create a new String path of this file and make a brand new File object
// to guarantee we drop the reference to the underlying char[] storage.
- File baseDir = new File(replicaDirInfo.baseDirPath);
- internedBaseDirs.put(replicaDirInfo.baseDirPath, baseDir);
+ File baseDir = new File(dirInfo.baseDirPath);
+ internedBaseDirs.put(dirInfo.baseDirPath, baseDir);
}
- this.baseDir = internedBaseDirs.get(replicaDirInfo.baseDirPath);
+ this.baseDir = internedBaseDirs.get(dirInfo.baseDirPath);
}
}
-
+
@VisibleForTesting
public static class ReplicaDirInfo {
- @VisibleForTesting
public String baseDirPath;
-
- @VisibleForTesting
- public int[] subDirs;
+ public boolean hasSubdirs;
+
+ public ReplicaDirInfo(String baseDirPath, boolean hasSubdirs) {
+ this.baseDirPath = baseDirPath;
+ this.hasSubdirs = hasSubdirs;
+ }
}
@VisibleForTesting
- public static ReplicaDirInfo parseSubDirs(File dir) {
- ReplicaDirInfo ret = new ReplicaDirInfo();
+ public static ReplicaDirInfo parseBaseDir(File dir) {
File currentDir = dir;
- List<Integer> subDirList = new ArrayList<Integer>();
+ boolean hasSubdirs = false;
while (currentDir.getName().startsWith(DataStorage.BLOCK_SUBDIR_PREFIX)) {
- // Prepend the integer into the list.
- subDirList.add(0, Integer.parseInt(currentDir.getName().replaceFirst(
- DataStorage.BLOCK_SUBDIR_PREFIX, "")));
+ hasSubdirs = true;
currentDir = currentDir.getParentFile();
}
- ret.subDirs = new int[subDirList.size()];
- for (int i = 0; i < subDirList.size(); i++) {
- ret.subDirs[i] = subDirList.get(i);
- }
-
- ret.baseDirPath = currentDir.getAbsolutePath();
- return ret;
+ return new ReplicaDirInfo(currentDir.getAbsolutePath(), hasSubdirs);
}
/**
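
The effect of the ReplicaInfo change is that a replica's directory is no
longer stored as parsed subdir indices but recomputed on demand from the
block ID, so parseBaseDir only has to find the base dir and note whether
any "subdir" levels were present. A minimal sketch of that walk, assuming
the "subdir" value of DataStorage.BLOCK_SUBDIR_PREFIX:

import java.io.File;

public class ParseBaseDirDemo {
  static final String PREFIX = "subdir";

  // Walk upward past any subdir* components; what remains is the base dir.
  static String baseDirOf(File dir) {
    File cur = dir;
    while (cur.getName().startsWith(PREFIX)) {
      cur = cur.getParentFile();
    }
    return cur.getAbsolutePath();
  }

  public static void main(String[] args) {
    // hasSubdirs would be true here; getDir() can rebuild the full path
    // later via idToBlockDir(baseDir, blockId)
    System.out.println(baseDirOf(new File("/data/finalized/subdir52/subdir86")));
  }
}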
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java Tue Aug 5 02:30:54 2014
@@ -59,7 +59,8 @@ class BlockPoolSlice {
private final String bpid;
private final FsVolumeImpl volume; // volume to which this BlockPool belongs to
private final File currentDir; // StorageDirectory/current/bpid/current
- private final LDir finalizedDir; // directory store Finalized replica
+ // directory where finalized replicas are stored
+ private final File finalizedDir;
private final File rbwDir; // directory store RBW replica
private final File tmpDir; // directory store Temporary replica
private static final String DU_CACHE_FILE = "dfsUsed";
@@ -82,8 +83,13 @@ class BlockPoolSlice {
this.bpid = bpid;
this.volume = volume;
this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
- final File finalizedDir = new File(
+ this.finalizedDir = new File(
currentDir, DataStorage.STORAGE_DIR_FINALIZED);
+ if (!this.finalizedDir.exists()) {
+ if (!this.finalizedDir.mkdirs()) {
+ throw new IOException("Failed to mkdirs " + this.finalizedDir);
+ }
+ }
// Files that were being written when the datanode was last shutdown
// are now moved back to the data directory. It is possible that
@@ -95,10 +101,6 @@ class BlockPoolSlice {
FileUtil.fullyDelete(tmpDir);
}
this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW);
- final int maxBlocksPerDir = conf.getInt(
- DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_KEY,
- DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_DEFAULT);
- this.finalizedDir = new LDir(finalizedDir, maxBlocksPerDir);
if (!rbwDir.mkdirs()) { // create rbw directory if not exist
if (!rbwDir.isDirectory()) {
throw new IOException("Mkdirs failed to create " + rbwDir.toString());
@@ -131,7 +133,7 @@ class BlockPoolSlice {
}
File getFinalizedDir() {
- return finalizedDir.dir;
+ return finalizedDir;
}
File getRbwDir() {
@@ -239,26 +241,57 @@ class BlockPoolSlice {
}
File addBlock(Block b, File f) throws IOException {
- File blockFile = finalizedDir.addBlock(b, f);
+ File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
+ if (!blockDir.exists()) {
+ if (!blockDir.mkdirs()) {
+ throw new IOException("Failed to mkdirs " + blockDir);
+ }
+ }
+ File blockFile = FsDatasetImpl.moveBlockFiles(b, f, blockDir);
File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp());
dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
return blockFile;
}
void checkDirs() throws DiskErrorException {
- finalizedDir.checkDirTree();
+ DiskChecker.checkDirs(finalizedDir);
DiskChecker.checkDir(tmpDir);
DiskChecker.checkDir(rbwDir);
}
void getVolumeMap(ReplicaMap volumeMap) throws IOException {
// add finalized replicas
- finalizedDir.getVolumeMap(bpid, volumeMap, volume);
+ addToReplicasMap(volumeMap, finalizedDir, true);
// add rbw replicas
addToReplicasMap(volumeMap, rbwDir, false);
}
/**
+ * Recover an unlinked tmp file on datanode restart. If the original block
+ * does not exist, the tmp file is renamed to the original file name and
+ * the original file is returned; otherwise the tmp file is deleted and
+ * null is returned.
+ */
+ File recoverTempUnlinkedBlock(File unlinkedTmp) throws IOException {
+ File blockFile = FsDatasetUtil.getOrigFile(unlinkedTmp);
+ if (blockFile.exists()) {
+ // If the original block file still exists, then no recovery is needed.
+ if (!unlinkedTmp.delete()) {
+ throw new IOException("Unable to cleanup unlinked tmp file " +
+ unlinkedTmp);
+ }
+ return null;
+ } else {
+ if (!unlinkedTmp.renameTo(blockFile)) {
+ throw new IOException("Unable to rename unlinked tmp file " +
+ unlinkedTmp);
+ }
+ return blockFile;
+ }
+ }
+
+ /**
* Add replicas under the given directory to the volume map
* @param volumeMap the replicas map
* @param dir an input directory
@@ -267,23 +300,34 @@ class BlockPoolSlice {
*/
void addToReplicasMap(ReplicaMap volumeMap, File dir, boolean isFinalized
) throws IOException {
- File blockFiles[] = FileUtil.listFiles(dir);
- for (File blockFile : blockFiles) {
- if (!Block.isBlockFilename(blockFile))
+ File files[] = FileUtil.listFiles(dir);
+ for (File file : files) {
+ if (file.isDirectory()) {
+ addToReplicasMap(volumeMap, file, isFinalized);
+ }
+
+ if (isFinalized && FsDatasetUtil.isUnlinkTmpFile(file)) {
+ file = recoverTempUnlinkedBlock(file);
+ if (file == null) { // the original block still exists, so we cover it
+ // in another iteration and can continue here
+ continue;
+ }
+ }
+ if (!Block.isBlockFilename(file))
continue;
long genStamp = FsDatasetUtil.getGenerationStampFromFile(
- blockFiles, blockFile);
- long blockId = Block.filename2id(blockFile.getName());
+ files, file);
+ long blockId = Block.filename2id(file.getName());
ReplicaInfo newReplica = null;
if (isFinalized) {
newReplica = new FinalizedReplica(blockId,
- blockFile.length(), genStamp, volume, blockFile.getParentFile());
+ file.length(), genStamp, volume, file.getParentFile());
} else {
boolean loadRwr = true;
- File restartMeta = new File(blockFile.getParent() +
- File.pathSeparator + "." + blockFile.getName() + ".restart");
+ File restartMeta = new File(file.getParent() +
+ File.pathSeparator + "." + file.getName() + ".restart");
Scanner sc = null;
try {
sc = new Scanner(restartMeta);
@@ -291,8 +335,8 @@ class BlockPoolSlice {
if (sc.hasNextLong() && (sc.nextLong() > Time.now())) {
// It didn't expire. Load the replica as a RBW.
newReplica = new ReplicaBeingWritten(blockId,
- validateIntegrityAndSetLength(blockFile, genStamp),
- genStamp, volume, blockFile.getParentFile(), null);
+ validateIntegrityAndSetLength(file, genStamp),
+ genStamp, volume, file.getParentFile(), null);
loadRwr = false;
}
sc.close();
@@ -301,7 +345,7 @@ class BlockPoolSlice {
restartMeta.getPath());
}
} catch (FileNotFoundException fnfe) {
// nothing to do here
} finally {
if (sc != null) {
sc.close();
@@ -310,15 +354,15 @@ class BlockPoolSlice {
// Restart meta doesn't exist or expired.
if (loadRwr) {
newReplica = new ReplicaWaitingToBeRecovered(blockId,
- validateIntegrityAndSetLength(blockFile, genStamp),
- genStamp, volume, blockFile.getParentFile());
+ validateIntegrityAndSetLength(file, genStamp),
+ genStamp, volume, file.getParentFile());
}
}
ReplicaInfo oldReplica = volumeMap.add(bpid, newReplica);
if (oldReplica != null) {
FsDatasetImpl.LOG.warn("Two block files with the same block id exist " +
- "on disk: " + oldReplica.getBlockFile() + " and " + blockFile );
+ "on disk: " + oldReplica.getBlockFile() + " and " + file );
}
}
}
@@ -405,10 +449,6 @@ class BlockPoolSlice {
}
}
- void clearPath(File f) {
- finalizedDir.clearPath(f);
- }
-
@Override
public String toString() {
return currentDir.getAbsolutePath();
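
With blocks now two subdir levels below the finalized dir, addToReplicasMap
must walk the tree instead of listing one flat directory. Stripped of the
replica-state handling, the traversal reduces to the sketch below; the
"blk_" / ".meta" test is only a stand-in for Block.isBlockFilename, and the
class name is illustrative:

import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class ScanDemo {
  static void collectBlockFiles(File dir, List<File> out) {
    File[] files = dir.listFiles();
    if (files == null) {
      return; // unreadable dir; the real code surfaces an IOException
    }
    for (File f : files) {
      if (f.isDirectory()) {
        collectBlockFiles(f, out); // descend into subdir levels
      } else if (f.getName().startsWith("blk_")
          && !f.getName().endsWith(".meta")) {
        out.add(f); // a block file: the patch builds a ReplicaInfo here
      }
    }
  }

  public static void main(String[] args) {
    List<File> blocks = new ArrayList<File>();
    collectBlockFiles(new File("/data/finalized"), blocks);
    System.out.println(blocks.size() + " block files found");
  }
}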
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Tue Aug 5 02:30:54 2014
@@ -1151,7 +1151,7 @@ class FsDatasetImpl implements FsDataset
return f;
// if file is not null, but doesn't exist - possibly disk failed
- datanode.checkDiskError();
+ datanode.checkDiskErrorAsync();
}
if (LOG.isDebugEnabled()) {
@@ -1224,13 +1224,6 @@ class FsDatasetImpl implements FsDataset
+ ". Parent not found for file " + f);
continue;
}
- ReplicaState replicaState = info.getState();
- if (replicaState == ReplicaState.FINALIZED ||
- (replicaState == ReplicaState.RUR &&
- ((ReplicaUnderRecovery)info).getOriginalReplica().getState() ==
- ReplicaState.FINALIZED)) {
- v.clearPath(bpid, parent);
- }
volumeMap.remove(bpid, invalidBlks[i]);
}
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java Tue Aug 5 02:30:54 2014
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.DFSConfigK
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -235,10 +236,6 @@ class FsVolumeImpl implements FsVolumeSp
// dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
bp.addToReplicasMap(volumeMap, dir, isFinalized);
}
-
- void clearPath(String bpid, File f) throws IOException {
- getBlockPoolSlice(bpid).clearPath(f);
- }
@Override
public String toString() {
@@ -274,7 +271,8 @@ class FsVolumeImpl implements FsVolumeSp
File finalizedDir = new File(bpCurrentDir,
DataStorage.STORAGE_DIR_FINALIZED);
File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
- if (finalizedDir.exists() && FileUtil.list(finalizedDir).length != 0) {
+ if (finalizedDir.exists() && !DatanodeUtil.dirNoFilesRecursive(
+ finalizedDir)) {
return false;
}
if (rbwDir.exists() && FileUtil.list(rbwDir).length != 0) {
@@ -301,7 +299,8 @@ class FsVolumeImpl implements FsVolumeSp
if (!rbwDir.delete()) {
throw new IOException("Failed to delete " + rbwDir);
}
- if (!finalizedDir.delete()) {
+ if (!DatanodeUtil.dirNoFilesRecursive(finalizedDir) ||
+ !FileUtil.fullyDelete(finalizedDir)) {
throw new IOException("Failed to delete " + finalizedDir);
}
FileUtil.fullyDelete(tmpDir);
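
The switch from FileUtil.list(finalizedDir).length != 0 to
dirNoFilesRecursive matters because an ID-based finalized dir can legally
contain nothing but empty subdir trees, which must still count as "empty"
when deciding whether a block pool directory can be removed. A
self-contained illustration of that semantic (tmp path is illustrative):

import java.io.File;
import java.io.IOException;

public class EmptyTreeDemo {
  // Same contract as DatanodeUtil.dirNoFilesRecursive: true iff no regular
  // files exist anywhere under dir; empty directories do not count.
  static boolean noFilesRecursive(File dir) throws IOException {
    File[] contents = dir.listFiles();
    if (contents == null) {
      throw new IOException("Cannot list contents of " + dir);
    }
    for (File f : contents) {
      if (!f.isDirectory() || !noFilesRecursive(f)) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) throws IOException {
    File root = new File(System.getProperty("java.io.tmpdir"), "empty-demo");
    new File(root, "subdir0" + File.separator + "subdir1").mkdirs();
    // a plain list() would see "subdir0" and report non-empty; the
    // recursive check correctly reports true (no files anywhere)
    System.out.println(noFilesRecursive(root));
  }
}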
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Tue Aug 5 02:30:54 2014
@@ -1103,9 +1103,6 @@ public class FSDirectory implements Clos
count++;
}
- // update inodeMap
- removeFromInodeMap(Arrays.asList(allSrcInodes));
-
trgInode.setModificationTime(timestamp, trgLatestSnapshot);
trgParent.updateModificationTime(timestamp, trgLatestSnapshot);
// update quota on the parent directory ('count' files removed, 0 space)
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1615844&r1=1615843&r2=1615844&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Aug 5 02:30:54 2014
@@ -4585,8 +4585,11 @@ public class FSNamesystem implements Nam
// Otherwise fsck will report these blocks as MISSING, especially if the
// blocksReceived from Datanodes take a long time to arrive.
for (int i = 0; i < trimmedTargets.size(); i++) {
- trimmedTargets.get(i).addBlock(
- trimmedStorages.get(i), storedBlock);
+ DatanodeStorageInfo storageInfo =
+ trimmedTargets.get(i).getStorageInfo(trimmedStorages.get(i));
+ if (storageInfo != null) {
+ storageInfo.addBlock(storedBlock);
+ }
}
}
@@ -6066,7 +6069,7 @@ public class FSNamesystem implements Nam
}
public void processIncrementalBlockReport(final DatanodeID nodeID,
- final String poolId, final StorageReceivedDeletedBlocks srdb)
+ final StorageReceivedDeletedBlocks srdb)
throws IOException {
writeLock();
try {
@@ -8824,6 +8827,29 @@ public class FSNamesystem implements Nam
}
}
+ void checkAccess(String src, FsAction mode) throws AccessControlException,
+ FileNotFoundException, UnresolvedLinkException, IOException {
+ checkOperation(OperationCategory.READ);
+ byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+ readLock();
+ try {
+ checkOperation(OperationCategory.READ);
+ src = FSDirectory.resolvePath(src, pathComponents, dir);
+ if (dir.getINode(src) == null) {
+ throw new FileNotFoundException("Path not found");
+ }
+ if (isPermissionEnabled) {
+ FSPermissionChecker pc = getPermissionChecker();
+ checkPathAccess(pc, src, mode);
+ }
+ } catch (AccessControlException e) {
+ logAuditEvent(false, "checkAccess", src);
+ throw e;
+ } finally {
+ readUnlock();
+ }
+ }
+
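
For context, the new FSNamesystem#checkAccess is the server side of the
FileSystem#access API introduced by HDFS-6570: a client can ask whether it
holds a permission on a path without opening or listing it, and denials are
audit-logged as "checkAccess". A hedged usage sketch (path and action are
illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.security.AccessControlException;

public class AccessCheckDemo {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    Path p = new Path("/user/alice/data.txt");
    try {
      fs.access(p, FsAction.READ); // FileNotFoundException if p is missing
      System.out.println("read access OK");
    } catch (AccessControlException e) {
      System.out.println("no read access: " + e.getMessage());
    }
  }
}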
/**
* Default AuditLogger implementation; used when no access logger is
* defined in the config file. It can also be explicitly listed in the