You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2009/10/01 00:57:34 UTC
svn commit: r820487 [4/6] - in /hadoop/hdfs/branches/branch-0.21: ./
.eclipse.templates/.launches/ lib/ src/contrib/block_forensics/
src/contrib/block_forensics/client/ src/contrib/block_forensics/ivy/
src/contrib/block_forensics/src/ src/contrib/block...
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java Wed Sep 30 22:57:30 2009
@@ -22,7 +22,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
@@ -37,9 +36,11 @@
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas;
import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
-import org.apache.hadoop.security.AccessTokenHandler;
/**
* Keeps information related to the blocks stored in the Hadoop cluster.
@@ -237,6 +238,80 @@
}
/**
+ * Commit the last block of the file.
+ *
+ * @param fileINode file inode
+ * @param commitBlock - contains client reported block length and generation
+ * @throws IOException if the block does not have at least a minimal number
+ * of replicas reported from data-nodes.
+ */
+ void commitLastBlock(INodeFileUnderConstruction fileINode,
+ Block commitBlock) throws IOException {
+ if(commitBlock == null)
+ return; // not committing, this is a block allocation retry
+ BlockInfo lastBlock = fileINode.getLastBlock();
+ if(lastBlock == null)
+ return; // no blocks in file yet
+ if(lastBlock.isComplete())
+ return; // already completed (e.g. by syncBlock)
+ assert lastBlock.getNumBytes() <= commitBlock.getNumBytes() :
+ "commitBlock length is less than the stored one "
+ + commitBlock.getNumBytes() + " vs. " + lastBlock.getNumBytes();
+ ((BlockInfoUnderConstruction)lastBlock).commitBlock(commitBlock);
+ }
+
+ /**
+ * Convert a specified block of the file to a complete block.
+ * @param fileINode file
+ * @param blkIndex block index in the file
+ * @throws IOException if the block does not have at least a minimal number
+ * of replicas reported from data-nodes.
+ */
+ BlockInfo completeBlock(INodeFile fileINode, int blkIndex)
+ throws IOException {
+ if(blkIndex < 0)
+ return null;
+ BlockInfo curBlock = fileINode.getBlocks()[blkIndex];
+ if(curBlock.isComplete())
+ return curBlock;
+ BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)curBlock;
+ if(ucBlock.numNodes() < minReplication)
+ throw new IOException("Cannot complete block: " +
+ "block does not satisfy minimal replication requirement.");
+ BlockInfo completeBlock = ucBlock.convertToCompleteBlock();
+ // replace penultimate block in file
+ fileINode.setBlock(blkIndex, completeBlock);
+ // replace block in the blocksMap
+ return blocksMap.replaceBlock(completeBlock);
+ }
+
+ BlockInfo completeBlock(INodeFile fileINode, BlockInfo block)
+ throws IOException {
+ BlockInfo[] fileBlocks = fileINode.getBlocks();
+ for(int idx = 0; idx < fileBlocks.length; idx++)
+ if(fileBlocks[idx] == block) {
+ return completeBlock(fileINode, idx);
+ }
+ return block;
+ }
+
+ /**
+ * Convert the last block of the file to an under construction block.
+ * @param fileINode file
+ * @param targets data-nodes that will form the pipeline for this block
+ */
+ void convertLastBlockToUnderConstruction(
+ INodeFileUnderConstruction fileINode,
+ DatanodeDescriptor[] targets) throws IOException {
+ BlockInfo oldBlock = fileINode.getLastBlock();
+ if(oldBlock == null)
+ return;
+ BlockInfoUnderConstruction ucBlock =
+ fileINode.setLastBlock(oldBlock, targets);
+ blocksMap.replaceBlock(ucBlock);
+ }
+
+ /**
* Get all valid locations of the block
*/
ArrayList<String> getValidLocations(Block block) {
@@ -254,7 +329,7 @@
return machineSet;
}
- List<LocatedBlock> getBlockLocations(Block[] blocks, long offset,
+ List<LocatedBlock> getBlockLocations(BlockInfo[] blocks, long offset,
long length, int nrBlocksToReturn) throws IOException {
int curBlk = 0;
long curPos = 0, blkSize = 0;
@@ -269,43 +344,12 @@
}
if (nrBlocks > 0 && curBlk == nrBlocks) // offset >= end of file
- return null;
+ return Collections.<LocatedBlock>emptyList();
long endOff = offset + length;
List<LocatedBlock> results = new ArrayList<LocatedBlock>(blocks.length);
do {
- // get block locations
- int numNodes = blocksMap.numNodes(blocks[curBlk]);
- int numCorruptNodes = countNodes(blocks[curBlk]).corruptReplicas();
- int numCorruptReplicas = corruptReplicas
- .numCorruptReplicas(blocks[curBlk]);
- if (numCorruptNodes != numCorruptReplicas) {
- FSNamesystem.LOG.warn("Inconsistent number of corrupt replicas for "
- + blocks[curBlk] + "blockMap has " + numCorruptNodes
- + " but corrupt replicas map has " + numCorruptReplicas);
- }
- boolean blockCorrupt = (numCorruptNodes == numNodes);
- int numMachineSet = blockCorrupt ? numNodes :
- (numNodes - numCorruptNodes);
- DatanodeDescriptor[] machineSet = new DatanodeDescriptor[numMachineSet];
- if (numMachineSet > 0) {
- numNodes = 0;
- for (Iterator<DatanodeDescriptor> it =
- blocksMap.nodeIterator(blocks[curBlk]); it.hasNext();) {
- DatanodeDescriptor dn = it.next();
- boolean replicaCorrupt =
- corruptReplicas.isReplicaCorrupt(blocks[curBlk], dn);
- if (blockCorrupt || (!blockCorrupt && !replicaCorrupt))
- machineSet[numNodes++] = dn;
- }
- }
- LocatedBlock b = new LocatedBlock(blocks[curBlk], machineSet, curPos,
- blockCorrupt);
- if (namesystem.isAccessTokenEnabled) {
- b.setAccessToken(namesystem.accessTokenHandler.generateToken(b.getBlock()
- .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.READ)));
- }
- results.add(b);
+ results.add(getBlockLocation(blocks[curBlk], curPos));
curPos += blocks[curBlk].getNumBytes();
curBlk++;
} while (curPos < endOff
@@ -314,6 +358,41 @@
return results;
}
+ /** @return a LocatedBlock for the given block */
+ LocatedBlock getBlockLocation(final BlockInfo blk, final long pos
+ ) throws IOException {
+ if (!blk.isComplete()) {
+ final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)blk;
+ final DatanodeDescriptor[] locations = uc.getExpectedLocations();
+ return namesystem.createLocatedBlock(uc, locations, pos, false);
+ }
+
+ // get block locations
+ final int numCorruptNodes = countNodes(blk).corruptReplicas();
+ final int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blk);
+ if (numCorruptNodes != numCorruptReplicas) {
+ FSNamesystem.LOG.warn("Inconsistent number of corrupt replicas for "
+ + blk + " blockMap has " + numCorruptNodes
+ + " but corrupt replicas map has " + numCorruptReplicas);
+ }
+
+ final int numNodes = blocksMap.numNodes(blk);
+ final boolean isCorrupt = numCorruptNodes == numNodes;
+ final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptNodes;
+ final DatanodeDescriptor[] machines = new DatanodeDescriptor[numMachines];
+ if (numMachines > 0) {
+ int j = 0;
+ for(Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(blk);
+ it.hasNext();) {
+ final DatanodeDescriptor d = it.next();
+ final boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blk, d);
+ if (isCorrupt || (!isCorrupt && !replicaCorrupt))
+ machines[j++] = d;
+ }
+ }
+ return namesystem.createLocatedBlock(blk, machines, pos, isCorrupt);
+ }
+
/**
* Check whether the replication parameter is within the range
* determined by system configuration.
@@ -369,7 +448,7 @@
pendingDeletionBlocksCount++;
if (log) {
NameNode.stateChangeLog.info("BLOCK* NameSystem.addToInvalidates: "
- + b.getBlockName() + " to " + dn.getName());
+ + b + " to " + dn.getName());
}
}
}
@@ -399,7 +478,7 @@
}
if (datanodes.length() != 0) {
NameNode.stateChangeLog.info("BLOCK* NameSystem.addToInvalidates: "
- + b.getBlockName() + " to " + datanodes.toString());
+ + b + " to " + datanodes.toString());
}
}
@@ -457,6 +536,10 @@
addToInvalidates(storedBlock, node);
return;
}
+
+ // Add replica to the data-node if it is not already there
+ node.addBlock(storedBlock);
+
// Add this replica to corruptReplicas Map
corruptReplicas.addToCorruptReplicasMap(storedBlock, node);
if (countNodes(storedBlock).liveReplicas() > inode.getReplication()) {
@@ -885,7 +968,8 @@
Collection<Block> toAdd = new LinkedList<Block>();
Collection<Block> toRemove = new LinkedList<Block>();
Collection<Block> toInvalidate = new LinkedList<Block>();
- node.reportDiff(blocksMap, report, toAdd, toRemove, toInvalidate);
+ Collection<BlockInfo> toCorrupt = new LinkedList<BlockInfo>();
+ node.reportDiff(this, report, toAdd, toRemove, toInvalidate, toCorrupt);
for (Block b : toRemove) {
removeStoredBlock(b, node);
@@ -899,6 +983,9 @@
+ " does not belong to any file.");
addToInvalidates(b, node);
}
+ for (BlockInfo b : toCorrupt) {
+ markBlockAsCorrupt(b, node);
+ }
}
/**
@@ -908,7 +995,8 @@
*/
private Block addStoredBlock(final Block block,
DatanodeDescriptor node,
- DatanodeDescriptor delNodeHint) {
+ DatanodeDescriptor delNodeHint)
+ throws IOException {
BlockInfo storedBlock = blocksMap.getStoredBlock(block);
if (storedBlock == null || storedBlock.getINode() == null) {
// If this block does not belong to anyfile, then we are done.
@@ -1020,13 +1108,17 @@
int numCurrentReplica = numLiveReplicas
+ pendingReplications.getNumReplicas(storedBlock);
+ if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
+ numLiveReplicas >= minReplication)
+ storedBlock = completeBlock(fileINode, storedBlock);
+
// check whether safe replication is reached for the block
- namesystem.incrementSafeBlockCount(numCurrentReplica);
+ // only complete blocks are counted towards that
+ if(storedBlock.isComplete())
+ namesystem.incrementSafeBlockCount(numCurrentReplica);
- //
- // if file is being actively written to, then do not check
- // replication-factor here. It will be checked when the file is closed.
- //
+ // if file is under construction, then check whether the block
+ // can be completed
if (fileINode.isUnderConstruction()) {
return storedBlock;
}
@@ -1253,7 +1345,30 @@
// Modify the blocks->datanode map and node's map.
//
pendingReplications.remove(block);
- addStoredBlock(block, node, delHintNode);
+
+ // blockReceived reports a finalized block
+ Collection<Block> toAdd = new LinkedList<Block>();
+ Collection<Block> toInvalidate = new LinkedList<Block>();
+ Collection<BlockInfo> toCorrupt = new LinkedList<BlockInfo>();
+ node.processReportedBlock(this, block, ReplicaState.FINALIZED,
+ toAdd, toInvalidate, toCorrupt);
+ // the block is only in one of the lists
+ // if it is in none then data-node already has it
+ assert toAdd.size() + toInvalidate.size() <= 1 :
+ "The block should be only in one of the lists.";
+
+ for (Block b : toAdd) {
+ addStoredBlock(b, node, delHintNode);
+ }
+ for (Block b : toInvalidate) {
+ NameNode.stateChangeLog.info("BLOCK* NameSystem.addBlock: block "
+ + b + " on " + node.getName() + " size " + b.getNumBytes()
+ + " does not belong to any file.");
+ addToInvalidates(b, node);
+ }
+ for (BlockInfo b : toCorrupt) {
+ markBlockAsCorrupt(b, node);
+ }
}
/**
@@ -1351,6 +1466,14 @@
return blocksMap.getStoredBlock(block);
}
+ /**
+ * Find the block by block ID.
+ */
+ BlockInfo findStoredBlock(long blockId) {
+ Block wildcardBlock = new Block(blockId, 0, GenerationStamp.WILDCARD_STAMP);
+ return blocksMap.getStoredBlock(wildcardBlock);
+ }
+
/* updates a block in under replication queue */
void updateNeededReplications(Block block, int curReplicasDelta,
int expectedReplicasDelta) {
@@ -1522,7 +1645,7 @@
return Math.max(missingBlocksInPrevIter, missingBlocksInCurIter);
}
- BlockInfo addINode(Block block, INodeFile iNode) {
+ BlockInfo addINode(BlockInfo block, INodeFile iNode) {
return blocksMap.addINode(block, iNode);
}
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java Wed Sep 30 22:57:30 2009
@@ -75,11 +75,10 @@
/**
* Add block b belonging to the specified file inode to the map.
*/
- BlockInfo addINode(Block b, INodeFile iNode) {
- int replication = iNode.getReplication();
+ BlockInfo addINode(BlockInfo b, INodeFile iNode) {
BlockInfo info = map.get(b);
- if (info == null) {
- info = new BlockInfo(b, replication);
+ if (info != b) {
+ info = b;
map.put(info, info);
}
info.setINode(iNode);
@@ -191,4 +190,23 @@
float getLoadFactor() {
return loadFactor;
}
+
+ /**
+ * Replace a block in the block map by a new block.
+ * The new block and the old one have the same key.
+ * @param newBlock - block for replacement
+ * @return new block
+ */
+ BlockInfo replaceBlock(BlockInfo newBlock) {
+ BlockInfo currentBlock = map.get(newBlock);
+ assert currentBlock != null : "the block is not in blocksMap";
+ // replace block in data-node lists
+ for(int idx = currentBlock.numNodes()-1; idx >= 0; idx--) {
+ DatanodeDescriptor dn = currentBlock.getDatanode(idx);
+ dn.replaceBlock(currentBlock, newBlock);
+ }
+ // replace block in the map itself
+ map.put(newBlock, newBlock);
+ return newBlock;
+ }
}
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java Wed Sep 30 22:57:30 2009
@@ -25,7 +25,11 @@
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hdfs.DeprecatedUTF8;
@@ -55,29 +59,36 @@
}
/** A BlockTargetPair queue. */
- private static class BlockQueue {
- private final Queue<BlockTargetPair> blockq = new LinkedList<BlockTargetPair>();
+ private static class BlockQueue<E> {
+ private final Queue<E> blockq = new LinkedList<E>();
/** Size of the queue */
synchronized int size() {return blockq.size();}
/** Enqueue */
- synchronized boolean offer(Block block, DatanodeDescriptor[] targets) {
- return blockq.offer(new BlockTargetPair(block, targets));
+ synchronized boolean offer(E e) {
+ return blockq.offer(e);
}
/** Dequeue */
- synchronized List<BlockTargetPair> poll(int numBlocks) {
+ synchronized List<E> poll(int numBlocks) {
if (numBlocks <= 0 || blockq.isEmpty()) {
return null;
}
- List<BlockTargetPair> results = new ArrayList<BlockTargetPair>();
+ List<E> results = new ArrayList<E>();
for(; !blockq.isEmpty() && numBlocks > 0; numBlocks--) {
results.add(blockq.poll());
}
return results;
}
+
+ /**
+ * Returns <tt>true</tt> if the queue contains the specified element.
+ */
+ boolean contains(E e) {
+ return blockq.contains(e);
+ }
}
private volatile BlockInfo blockList = null;
@@ -87,9 +98,10 @@
protected boolean needKeyUpdate = false;
/** A queue of blocks to be replicated by this datanode */
- private BlockQueue replicateBlocks = new BlockQueue();
+ private BlockQueue<BlockTargetPair> replicateBlocks = new BlockQueue<BlockTargetPair>();
/** A queue of blocks to be recovered by this datanode */
- private BlockQueue recoverBlocks = new BlockQueue();
+ private BlockQueue<BlockInfoUnderConstruction> recoverBlocks =
+ new BlockQueue<BlockInfoUnderConstruction>();
/** A set of blocks to be invalidated by this datanode */
private Set<Block> invalidateBlocks = new TreeSet<Block>();
@@ -201,6 +213,21 @@
blockList = b.listInsert(blockList, this);
}
+ /**
+ * Replace specified old block with a new one in the DataNodeDescriptor.
+ *
+ * @param oldBlock - block to be replaced
+ * @param newBlock - a replacement block
+ * @return the new block
+ */
+ BlockInfo replaceBlock(BlockInfo oldBlock, BlockInfo newBlock) {
+ boolean done = removeBlock(oldBlock);
+ assert done : "Old block should belong to the data-node when replacing";
+ done = addBlock(newBlock);
+ assert done : "New block should not belong to the data-node when replacing";
+ return newBlock;
+ }
+
void resetBlocks() {
this.capacity = 0;
this.remaining = 0;
@@ -262,15 +289,20 @@
*/
void addBlockToBeReplicated(Block block, DatanodeDescriptor[] targets) {
assert(block != null && targets != null && targets.length > 0);
- replicateBlocks.offer(block, targets);
+ replicateBlocks.offer(new BlockTargetPair(block, targets));
}
/**
* Store block recovery work.
*/
- void addBlockToBeRecovered(Block block, DatanodeDescriptor[] targets) {
- assert(block != null && targets != null && targets.length > 0);
- recoverBlocks.offer(block, targets);
+ void addBlockToBeRecovered(BlockInfoUnderConstruction block) {
+ if(recoverBlocks.contains(block)) {
+ // this prevents adding the same block twice to the recovery queue
+ FSNamesystem.LOG.info("Block " + block +
+ " is already in the recovery queue.");
+ return;
+ }
+ recoverBlocks.offer(block);
}
/**
@@ -308,10 +340,16 @@
new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blocktargetlist);
}
- BlockCommand getLeaseRecoveryCommand(int maxTransfers) {
- List<BlockTargetPair> blocktargetlist = recoverBlocks.poll(maxTransfers);
- return blocktargetlist == null? null:
- new BlockCommand(DatanodeProtocol.DNA_RECOVERBLOCK, blocktargetlist);
+ BlockRecoveryCommand getLeaseRecoveryCommand(int maxTransfers) {
+ List<BlockInfoUnderConstruction> blocks = recoverBlocks.poll(maxTransfers);
+ if(blocks == null)
+ return null;
+ BlockRecoveryCommand brCommand = new BlockRecoveryCommand(blocks.size());
+ for(BlockInfoUnderConstruction b : blocks) {
+ brCommand.add(new RecoveringBlock(
+ b, b.getExpectedLocations(), b.getBlockRecoveryId()));
+ }
+ return brCommand;
}
/**
@@ -361,11 +399,12 @@
return blockarray;
}
- void reportDiff(BlocksMap blocksMap,
+ void reportDiff(BlockManager blockManager,
BlockListAsLongs newReport,
Collection<Block> toAdd, // add to DatanodeDescriptor
Collection<Block> toRemove, // remove from DatanodeDescriptor
- Collection<Block> toInvalidate) { // should be removed from DN
+ Collection<Block> toInvalidate, // should be removed from DN
+ Collection<BlockInfo> toCorrupt) {// add to corrupt replicas
// place a delimiter in the list which separates blocks
// that have been reported from those that have not
BlockInfo delimiter = new BlockInfo(new Block(), 1);
@@ -373,29 +412,16 @@
assert added : "Delimiting block cannot be present in the node";
if(newReport == null)
newReport = new BlockListAsLongs();
- // scan the report and collect newly reported blocks
- // Note we are taking special precaution to limit tmp blocks allocated
- // as part this block report - which why block list is stored as longs
- for (Block iblk : newReport) {
- BlockInfo storedBlock = blocksMap.getStoredBlock(iblk);
- if(storedBlock == null) {
- // If block is not in blocksMap it does not belong to any file
- toInvalidate.add(new Block(iblk));
- continue;
- }
- if(storedBlock.findDatanode(this) < 0) {// Known block, but not on the DN
- // if the size differs from what is in the blockmap, then return
- // the new block. addStoredBlock will then pick up the right size of this
- // block and will update the block object in the BlocksMap
- if (storedBlock.getNumBytes() != iblk.getNumBytes()) {
- toAdd.add(new Block(iblk));
- } else {
- toAdd.add(storedBlock);
- }
- continue;
- }
+ // scan the report and process newly reported blocks
+ BlockReportIterator itBR = newReport.getBlockReportIterator();
+ while(itBR.hasNext()) {
+ Block iblk = itBR.next();
+ ReplicaState iState = itBR.getCurrentReplicaState();
+ BlockInfo storedBlock = processReportedBlock(blockManager, iblk, iState,
+ toAdd, toInvalidate, toCorrupt);
// move block to the head of the list
- this.moveBlockToHead(storedBlock);
+ if(storedBlock != null && storedBlock.findDatanode(this) >= 0)
+ this.moveBlockToHead(storedBlock);
}
// collect blocks that have not been reported
// all of them are next to the delimiter
@@ -405,6 +431,105 @@
this.removeBlock(delimiter);
}
+ /**
+ * Process a block replica reported by the data-node.
+ *
+ * <ol>
+ * <li>If the block is not known to the system (not in blocksMap) then the
+ * data-node should be notified to invalidate this block.</li>
+ * <li>If the reported replica is valid that is has the same generation stamp
+ * and length as recorded on the name-node, then the replica location is
+ * added to the name-node.</li>
+ * <li>If the reported replica is not valid, then it is marked as corrupt,
+ * which triggers replication of the existing valid replicas.
+ * Corrupt replicas are removed from the system when the block
+ * is fully replicated.</li>
+ * </ol>
+ *
+ * @param blockManager
+ * @param block reported block replica
+ * @param rState reported replica state
+ * @param toAdd add to DatanodeDescriptor
+ * @param toInvalidate missing blocks (not in the blocks map)
+ * should be removed from the data-node
+ * @param toCorrupt replicas with unexpected length or generation stamp;
+ * add to corrupt replicas
+ * @return the stored block record for the reported replica,
+ * or null if the block is not found in the blocksMap
+ */
+ BlockInfo processReportedBlock(
+ BlockManager blockManager,
+ Block block, // reported block replica
+ ReplicaState rState, // reported replica state
+ Collection<Block> toAdd, // add to DatanodeDescriptor
+ Collection<Block> toInvalidate, // should be removed from DN
+ Collection<BlockInfo> toCorrupt) {// add to corrupt replicas
+ FSNamesystem.LOG.debug("Reported block " + block
+ + " on " + getName() + " size " + block.getNumBytes()
+ + " replicaState = " + rState);
+
+ // find block by blockId
+ BlockInfo storedBlock = blockManager.findStoredBlock(block.getBlockId());
+ if(storedBlock == null) {
+ // If blocksMap does not contain reported block id,
+ // the replica should be removed from the data-node.
+ toInvalidate.add(new Block(block));
+ return null;
+ }
+
+ FSNamesystem.LOG.debug("In memory blockUCState = " + storedBlock.getBlockUCState());
+
+ // Block is on the DN
+ boolean isCorrupt = false;
+ switch(rState) {
+ case FINALIZED:
+ switch(storedBlock.getBlockUCState()) {
+ case COMPLETE:
+ case COMMITTED:
+ // This is a temporary hack until Block.equals() and compareTo()
+ // are changed not to take into account the generation stamp for searching
+ // in blocksMap
+ if(storedBlock.getGenerationStamp() != block.getGenerationStamp()) {
+ toInvalidate.add(new Block(block));
+ return storedBlock;
+ }
+
+ if(storedBlock.getGenerationStamp() != block.getGenerationStamp()
+ || storedBlock.getNumBytes() != block.getNumBytes())
+ isCorrupt = true;
+ break;
+ case UNDER_CONSTRUCTION:
+ case UNDER_RECOVERY:
+ ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
+ this, block, rState);
+ }
+ if(!isCorrupt && storedBlock.findDatanode(this) < 0)
+ if (storedBlock.getNumBytes() != block.getNumBytes()) {
+ toAdd.add(new Block(block));
+ } else {
+ toAdd.add(storedBlock);
+ }
+ break;
+ case RBW:
+ case RWR:
+ if(!storedBlock.isComplete())
+ ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
+ this, block, rState);
+ else
+ isCorrupt = true;
+ break;
+ case RUR: // should not be reported
+ case TEMPORARY: // should not be reported
+ default:
+ FSNamesystem.LOG.warn("Unexpected replica state " + rState
+ + " for block: " + storedBlock +
+ " on " + getName() + " size " + storedBlock.getNumBytes());
+ break;
+ }
+ if(isCorrupt)
+ toCorrupt.add(storedBlock);
+ return storedBlock;
+ }
+
/** Serialization for FSEditLog */
void readFieldsFromFSEditLog(DataInput in) throws IOException {
this.name = DeprecatedUTF8.readString(in);
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Wed Sep 30 22:57:30 2009
@@ -36,6 +36,7 @@
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsRecord;
@@ -191,7 +192,7 @@
*/
INode unprotectedAddFile( String path,
PermissionStatus permissions,
- Block[] blocks,
+ BlockInfo[] blocks,
short replication,
long modificationTime,
long atime,
@@ -261,7 +262,8 @@
// Add file->block mapping
INodeFile newF = (INodeFile)newNode;
for (int i = 0; i < nrBlocks; i++) {
- newF.setBlock(i, getBlockManager().addINode(blocks[i], newF));
+ BlockInfo blockInfo = new BlockInfo(blocks[i], newF.getReplication());
+ newF.setBlock(i, getBlockManager().addINode(blockInfo, newF));
}
}
}
@@ -271,27 +273,39 @@
/**
* Add a block to the file. Returns a reference to the added block.
*/
- Block addBlock(String path, INode[] inodes, Block block
- ) throws QuotaExceededException {
+ BlockInfo addBlock(String path,
+ INode[] inodes,
+ Block block,
+ DatanodeDescriptor targets[]
+ ) throws QuotaExceededException, IOException {
waitForReady();
synchronized (rootDir) {
- INodeFile fileNode = (INodeFile) inodes[inodes.length-1];
+ assert inodes[inodes.length-1].isUnderConstruction() :
+ "INode should correspond to a file under construction";
+ INodeFileUnderConstruction fileINode =
+ (INodeFileUnderConstruction)inodes[inodes.length-1];
// check quota limits and updated space consumed
updateCount(inodes, inodes.length-1, 0,
- fileNode.getPreferredBlockSize()*fileNode.getReplication());
-
- // associate the new list of blocks with this file
- BlockInfo blockInfo = getBlockManager().addINode(block, fileNode);
- fileNode.addBlock(blockInfo);
+ fileINode.getPreferredBlockSize()*fileINode.getReplication());
+
+ // associate new last block for the file
+ BlockInfoUnderConstruction blockInfo =
+ new BlockInfoUnderConstruction(
+ block,
+ fileINode.getReplication(),
+ BlockUCState.UNDER_CONSTRUCTION,
+ targets);
+ getBlockManager().addINode(blockInfo, fileINode);
+ fileINode.addBlock(blockInfo);
NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
+ path + " with " + block
+ " block is added to the in-memory "
+ "file system");
+ return blockInfo;
}
- return block;
}
/**
@@ -335,7 +349,7 @@
synchronized (rootDir) {
// modify file-> block and blocksMap
- fileNode.removeBlock(block);
+ fileNode.removeLastBlock(block);
getBlockManager().removeBlockFromMap(block);
// If block is removed from blocksMap remove it from corruptReplicasMap
getBlockManager().removeFromCorruptReplicasMap(block);
@@ -732,7 +746,7 @@
}
int index = 0;
- for (Block b : newnode.getBlocks()) {
+ for (BlockInfo b : newnode.getBlocks()) {
BlockInfo info = getBlockManager().addINode(b, newnode);
newnode.setBlock(index, info); // inode refers to the block in BlocksMap
index++;
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Wed Sep 30 22:57:30 2009
@@ -461,19 +461,9 @@
blockSize = readLong(in);
}
// get blocks
- Block blocks[] = null;
- if (logVersion <= -14) {
- blocks = readBlocks(in);
- } else {
- BlockTwo oldblk = new BlockTwo();
- int num = in.readInt();
- blocks = new Block[num];
- for (int i = 0; i < num; i++) {
- oldblk.readFields(in);
- blocks[i] = new Block(oldblk.blkid, oldblk.len,
- Block.GRANDFATHER_GENERATION_STAMP);
- }
- }
+ boolean isFileUnderConstruction = (opcode == OP_ADD);
+ BlockInfo blocks[] =
+ readBlocks(in, logVersion, isFileUnderConstruction, replication);
// Older versions of HDFS does not store the block size in inode.
// If the file has more than one block, use the size of the
@@ -521,7 +511,7 @@
path, permissions,
blocks, replication,
mtime, atime, blockSize);
- if (opcode == OP_ADD) {
+ if (isFileUnderConstruction) {
numOpAdd++;
//
// Replace current node with a INodeUnderConstruction.
@@ -538,7 +528,7 @@
clientMachine,
null);
fsDir.replaceNode(path, node, cons);
- fsNamesys.leaseManager.addLease(cons.clientName, path);
+ fsNamesys.leaseManager.addLease(cons.getClientName(), path);
}
break;
}
@@ -1247,12 +1237,26 @@
return Long.parseLong(FSImage.readString(in));
}
- static private Block[] readBlocks(DataInputStream in) throws IOException {
+ static private BlockInfo[] readBlocks(
+ DataInputStream in,
+ int logVersion,
+ boolean isFileUnderConstruction,
+ short replication) throws IOException {
int numBlocks = in.readInt();
- Block[] blocks = new Block[numBlocks];
+ BlockInfo[] blocks = new BlockInfo[numBlocks];
+ Block blk = new Block();
+ BlockTwo oldblk = new BlockTwo();
for (int i = 0; i < numBlocks; i++) {
- blocks[i] = new Block();
- blocks[i].readFields(in);
+ if (logVersion <= -14) {
+ blk.readFields(in);
+ } else {
+ oldblk.readFields(in);
+ blk.set(oldblk.blkid, oldblk.len, Block.GRANDFATHER_GENERATION_STAMP);
+ }
+ if(isFileUnderConstruction && i == numBlocks-1)
+ blocks[i] = new BlockInfoUnderConstruction(blk, replication);
+ else
+ blocks[i] = new BlockInfo(blk, replication);
}
return blocks;
}
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Wed Sep 30 22:57:30 2009
@@ -55,6 +55,7 @@
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.common.UpgradeManager;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
@@ -1403,7 +1404,7 @@
}
INodeFile oldnode = (INodeFile) old;
fsDir.replaceNode(path, oldnode, cons);
- fs.leaseManager.addLease(cons.clientName, path);
+ fs.leaseManager.addLease(cons.getClientName(), path);
}
}
@@ -1419,10 +1420,17 @@
int numBlocks = in.readInt();
BlockInfo[] blocks = new BlockInfo[numBlocks];
Block blk = new Block();
- for (int i = 0; i < numBlocks; i++) {
+ int i = 0;
+ for (; i < numBlocks-1; i++) {
blk.readFields(in);
blocks[i] = new BlockInfo(blk, blockReplication);
}
+ // last block is UNDER_CONSTRUCTION
+ if(numBlocks > 0) {
+ blk.readFields(in);
+ blocks[i] = new BlockInfoUnderConstruction(
+ blk, blockReplication, BlockUCState.UNDER_CONSTRUCTION, null);
+ }
PermissionStatus perm = PermissionStatus.read(in);
String clientName = readString(in);
String clientMachine = readString(in);
@@ -1430,7 +1438,7 @@
// These locations are not used at all
int numLocs = in.readInt();
DatanodeDescriptor[] locations = new DatanodeDescriptor[numLocs];
- for (int i = 0; i < numLocs; i++) {
+ for (i = 0; i < numLocs; i++) {
locations[i] = new DatanodeDescriptor();
locations[i].readFields(in);
}
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Sep 30 22:57:30 2009
@@ -23,6 +23,7 @@
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
@@ -563,13 +564,18 @@
}
List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
long totalSize = 0;
+ BlockInfo curBlock;
while(totalSize<size && iter.hasNext()) {
- totalSize += addBlock(iter.next(), results);
+ curBlock = iter.next();
+ if(!curBlock.isComplete()) continue;
+ totalSize += addBlock(curBlock, results);
}
if(totalSize<size) {
iter = node.getBlockIterator(); // start from the beginning
for(int i=0; i<startBlock&&totalSize<size; i++) {
- totalSize += addBlock(iter.next(), results);
+ curBlock = iter.next();
+ if(!curBlock.isComplete()) continue;
+ totalSize += addBlock(curBlock, results);
}
}
@@ -707,14 +713,46 @@
if (doAccessTime && isAccessTimeSupported()) {
dir.setTimes(src, inode, -1, now(), false);
}
- final Block[] blocks = inode.getBlocks();
+ final BlockInfo[] blocks = inode.getBlocks();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
+ }
if (blocks == null) {
return null;
}
- final List<LocatedBlock> results = blocks.length == 0?
- new ArrayList<LocatedBlock>(0):
- blockManager.getBlockLocations(blocks, offset, length, Integer.MAX_VALUE);
- return inode.createLocatedBlocks(results);
+
+ if (blocks.length == 0) {
+ return new LocatedBlocks(0, inode.isUnderConstruction(),
+ Collections.<LocatedBlock>emptyList(), null, false);
+ } else {
+ final long n = inode.computeFileSize(false);
+ final List<LocatedBlock> locatedblocks = blockManager.getBlockLocations(
+ blocks, offset, length, Integer.MAX_VALUE);
+ final BlockInfo last = inode.getLastBlock();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("last = " + last);
+ }
+
+ if (!last.isComplete()) {
+ return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
+ blockManager.getBlockLocation(last, n), false);
+ }
+ else {
+ return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
+ blockManager.getBlockLocation(last, n-last.getNumBytes()), true);
+ }
+ }
+ }
+
+ /** Create a LocatedBlock. */
+ LocatedBlock createLocatedBlock(final Block b, final DatanodeInfo[] locations,
+ final long offset, final boolean corrupt) throws IOException {
+ final LocatedBlock lb = new LocatedBlock(b, locations, offset, corrupt);
+ if (isAccessTokenEnabled) {
+ lb.setAccessToken(accessTokenHandler.generateToken(b.getBlockId(),
+ EnumSet.of(AccessTokenHandler.AccessMode.READ)));
+ }
+ return lb;
}
/**
@@ -912,40 +950,45 @@
// If the file is under construction , then it must be in our
// leases. Find the appropriate lease record.
//
- Lease lease = leaseManager.getLease(holder);
- //
- // We found the lease for this file. And surprisingly the original
- // holder is trying to recreate this file. This should never occur.
- //
- if (lease != null) {
+ Lease lease = leaseManager.getLeaseByPath(src);
+ if (lease == null) {
throw new AlreadyBeingCreatedException(
- "failed to create file " + src + " for " + holder +
- " on client " + clientMachine +
- " because current leaseholder is trying to recreate file.");
+ "failed to create file " + src + " for " + holder +
+ " on client " + clientMachine +
+ " because pendingCreates is non-null but no leases found.");
}
//
- // Find the original holder.
+ // We found the lease for this file. And surprisingly the original
+ // holder is trying to recreate this file. This should never occur.
//
- lease = leaseManager.getLease(pendingFile.clientName);
- if (lease == null) {
+ if (lease.getHolder().equals(holder)) {
throw new AlreadyBeingCreatedException(
- "failed to create file " + src + " for " + holder +
- " on client " + clientMachine +
- " because pendingCreates is non-null but no leases found.");
- }
+ "failed to create file " + src + " for " + holder +
+ " on client " + clientMachine +
+ " because current leaseholder is trying to recreate file.");
+ }
+ assert lease.getHolder().equals(pendingFile.getClientName()) :
+ "Current lease holder " + lease.getHolder() +
+ " does not match file creator " + pendingFile.getClientName();
//
+ // Current lease holder is different from the requester.
// If the original holder has not renewed in the last SOFTLIMIT
- // period, then start lease recovery.
+ // period, then start lease recovery, otherwise fail.
//
if (lease.expiredSoftLimit()) {
LOG.info("startFile: recover lease " + lease + ", src=" + src);
- internalReleaseLease(lease, src);
- }
- throw new AlreadyBeingCreatedException("failed to create file " + src + " for " + holder +
- " on client " + clientMachine +
- ", because this file is already being created by " +
- pendingFile.getClientName() +
- " on " + pendingFile.getClientMachine());
+ boolean isClosed = internalReleaseLease(lease, src, null);
+ if(!isClosed)
+ throw new RecoveryInProgressException(
+ "Failed to close file " + src +
+ ". Lease recovery is in progress. Try again later.");
+
+ } else
+ throw new AlreadyBeingCreatedException("failed to create file " +
+ src + " for " + holder + " on client " + clientMachine +
+ ", because this file is already being created by " +
+ pendingFile.getClientName() +
+ " on " + pendingFile.getClientMachine());
}
try {
@@ -998,7 +1041,7 @@
clientMachine,
clientNode);
dir.replaceNode(src, node, cons);
- leaseManager.addLease(cons.clientName, src);
+ leaseManager.addLease(cons.getClientName(), src);
} else {
// Now we can add the name to the filesystem. This file has no
@@ -1014,7 +1057,7 @@
throw new IOException("DIR* NameSystem.startFile: " +
"Unable to add file to namespace.");
}
- leaseManager.addLease(newNode.clientName, src);
+ leaseManager.addLease(newNode.getClientName(), src);
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
+"add "+src+" to namespace for "+holder);
@@ -1048,40 +1091,36 @@
LocatedBlock lb = null;
synchronized (this) {
INodeFileUnderConstruction file = (INodeFileUnderConstruction)dir.getFileINode(src);
-
- BlockInfo[] blocks = file.getBlocks();
- if (blocks != null && blocks.length > 0) {
- BlockInfo last = blocks[blocks.length-1];
- // this is a redundant search in blocksMap
- // should be resolved by the new implementation of append
- BlockInfo storedBlock = blockManager.getStoredBlock(last);
- assert last == storedBlock : "last block should be in the blocksMap";
- if (file.getPreferredBlockSize() > storedBlock.getNumBytes()) {
+ BlockInfo lastBlock = file.getLastBlock();
+ if (lastBlock != null) {
+ assert lastBlock == blockManager.getStoredBlock(lastBlock) :
+ "last block of the file is not in blocksMap";
+ if (file.getPreferredBlockSize() > lastBlock.getNumBytes()) {
long fileLength = file.computeContentSummary().getLength();
- DatanodeDescriptor[] targets = blockManager.getNodes(storedBlock);
+ DatanodeDescriptor[] targets = blockManager.getNodes(lastBlock);
// remove the replica locations of this block from the node
for (int i = 0; i < targets.length; i++) {
- targets[i].removeBlock(storedBlock);
+ targets[i].removeBlock(lastBlock);
}
- // set the locations of the last block in the lease record
- file.setLastBlock(storedBlock, targets);
+ // convert last block to under-construction and set its locations
+ blockManager.convertLastBlockToUnderConstruction(file, targets);
- lb = new LocatedBlock(last, targets,
- fileLength-storedBlock.getNumBytes());
+ lb = new LocatedBlock(lastBlock, targets,
+ fileLength-lastBlock.getNumBytes());
if (isAccessTokenEnabled) {
lb.setAccessToken(accessTokenHandler.generateToken(lb.getBlock()
.getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
}
// Remove block from replication queue.
- blockManager.updateNeededReplications(last, 0, 0);
+ blockManager.updateNeededReplications(lastBlock, 0, 0);
// remove this block from the list of pending blocks to be deleted.
// This reduces the possibility of triggering HADOOP-1349.
//
for (DatanodeDescriptor dd : targets) {
String datanodeId = dd.getStorageID();
- blockManager.removeFromInvalidates(datanodeId, last);
+ blockManager.removeFromInvalidates(datanodeId, lastBlock);
}
}
}
@@ -1115,7 +1154,8 @@
* client to "try again later".
*/
public LocatedBlock getAdditionalBlock(String src,
- String clientName
+ String clientName,
+ Block previous
) throws IOException {
long fileLength, blockSize;
int replication;
@@ -1135,6 +1175,9 @@
INodeFileUnderConstruction pendingFile = checkLease(src, clientName);
+ // commit the last block
+ blockManager.commitLastBlock(pendingFile, previous);
+
//
// If we fail this, bad things happen!
//
@@ -1168,9 +1211,11 @@
throw new NotReplicatedYetException("Not replicated yet:" + src);
}
+ // complete the penultimate block
+ blockManager.completeBlock(pendingFile, pendingFile.numBlocks()-2);
+
// allocate new block record block locations in INode.
- newBlock = allocateBlock(src, pathINodes);
- pendingFile.setTargets(targets);
+ newBlock = allocateBlock(src, pathINodes, targets);
for (DatanodeDescriptor dn : targets) {
dn.incBlocksScheduled();
@@ -1250,15 +1295,18 @@
COMPLETE_SUCCESS
}
- public CompleteFileStatus completeFile(String src, String holder) throws IOException {
- CompleteFileStatus status = completeFileInternal(src, holder);
+ public CompleteFileStatus completeFile(String src,
+ String holder,
+ Block last) throws IOException {
+ CompleteFileStatus status = completeFileInternal(src, holder, last);
getEditLog().logSync();
return status;
}
-
- private synchronized CompleteFileStatus completeFileInternal(String src,
- String holder) throws IOException {
+ private synchronized CompleteFileStatus completeFileInternal(
+ String src,
+ String holder,
+ Block last) throws IOException {
NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src + " for " + holder);
if (isInSafeMode())
throw new SafeModeException("Cannot complete file " + src, safeMode);
@@ -1279,7 +1327,12 @@
("from " + pendingFile.getClientMachine()))
);
return CompleteFileStatus.OPERATION_FAILED;
- } else if (!checkFileProgress(pendingFile, true)) {
+ }
+
+ // commit the last block
+ blockManager.commitLastBlock(pendingFile, last);
+
+ if (!checkFileProgress(pendingFile, true)) {
return CompleteFileStatus.STILL_WAITING;
}
@@ -1312,13 +1365,15 @@
* @param inodes INode representing each of the components of src.
* <code>inodes[inodes.length-1]</code> is the INode for the file.
*/
- private Block allocateBlock(String src, INode[] inodes) throws IOException {
+ private Block allocateBlock(String src,
+ INode[] inodes,
+ DatanodeDescriptor targets[]) throws IOException {
Block b = new Block(FSNamesystem.randBlockId.nextLong(), 0, 0);
while(isValidBlock(b)) {
b.setBlockId(FSNamesystem.randBlockId.nextLong());
}
b.setGenerationStamp(getGenerationStamp());
- b = dir.addBlock(src, inodes, b);
+ b = dir.addBlock(src, inodes, b, targets);
NameNode.stateChangeLog.info("BLOCK* NameSystem.allocateBlock: "
+src+ ". "+b);
return b;
@@ -1329,12 +1384,12 @@
* replicated. If not, return false. If checkall is true, then check
* all blocks, otherwise check only penultimate block.
*/
- synchronized boolean checkFileProgress(INodeFile v, boolean checkall) {
+ synchronized boolean checkFileProgress(INodeFile v, boolean checkall) throws IOException {
if (checkall) {
//
// check all blocks of the file.
//
- for (Block block: v.getBlocks()) {
+ for (BlockInfo block: v.getBlocks()) {
if (!blockManager.checkMinReplication(block)) {
return false;
}
@@ -1343,7 +1398,7 @@
//
// check the penultimate block of this file
//
- Block b = v.getPenultimateBlock();
+ BlockInfo b = v.getPenultimateBlock();
if (b != null && !blockManager.checkMinReplication(b)) {
return false;
}
@@ -1614,20 +1669,31 @@
* Move a file that is being written to be immutable.
* @param src The filename
* @param lease The lease for the client creating the file
- */
- void internalReleaseLease(Lease lease, String src) throws IOException {
+ * @param recoveryLeaseHolder reassign lease to this holder if the last block
+ * needs recovery; keep current holder if null.
+ * @throws AlreadyBeingCreatedException if file is waiting to achieve minimal
+ * replication;<br>
+ * RecoveryInProgressException if lease recovery is in progress.<br>
+ * IOException in case of an error.
+ * @return true if file has been successfully finalized and closed or
+ * false if block recovery has been initiated
+ */
+ boolean internalReleaseLease(
+ Lease lease, String src, String recoveryLeaseHolder)
+ throws AlreadyBeingCreatedException,
+ IOException {
LOG.info("Recovering lease=" + lease + ", src=" + src);
INodeFile iFile = dir.getFileINode(src);
if (iFile == null) {
- final String message = "DIR* NameSystem.internalReleaseCreate: "
+ final String message = "DIR* NameSystem.internalReleaseLease: "
+ "attempt to release a create lock on "
+ src + " file does not exist.";
NameNode.stateChangeLog.warn(message);
throw new IOException(message);
}
if (!iFile.isUnderConstruction()) {
- final String message = "DIR* NameSystem.internalReleaseCreate: "
+ final String message = "DIR* NameSystem.internalReleaseLease: "
+ "attempt to release a create lock on "
+ src + " but file is already closed.";
NameNode.stateChangeLog.warn(message);
@@ -1635,39 +1701,123 @@
}
INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) iFile;
+ int nrBlocks = pendingFile.numBlocks();
+ BlockInfo[] blocks = pendingFile.getBlocks();
- // Initialize lease recovery for pendingFile. If there are no blocks
- // associated with this file, then reap lease immediately. Otherwise
- // renew the lease and trigger lease recovery.
- if (pendingFile.getTargets() == null ||
- pendingFile.getTargets().length == 0) {
- if (pendingFile.getBlocks().length == 0) {
+ int nrCompleteBlocks;
+ BlockInfo curBlock = null;
+ for(nrCompleteBlocks = 0; nrCompleteBlocks < nrBlocks; nrCompleteBlocks++) {
+ curBlock = blocks[nrCompleteBlocks];
+ if(!curBlock.isComplete())
+ break;
+ assert blockManager.checkMinReplication(curBlock) :
+ "A COMPLETE block is not minimally replicated in " + src;
+ }
+
+ // If there are no incomplete blocks associated with this file,
+ // then reap lease immediately and close the file.
+ if(nrCompleteBlocks == nrBlocks) {
+ finalizeINodeFileUnderConstruction(src, pendingFile);
+ NameNode.stateChangeLog.warn("BLOCK*"
+ + " internalReleaseLease: All existing blocks are COMPLETE,"
+ + " lease removed, file closed.");
+ return true; // closed!
+ }
+
+ // Only the last and the penultimate blocks may be in non COMPLETE state.
+ // If the penultimate block is not COMPLETE, then it must be COMMITTED.
+ if(nrCompleteBlocks < nrBlocks - 2 ||
+ nrCompleteBlocks == nrBlocks - 2 &&
+ curBlock.getBlockUCState() != BlockUCState.COMMITTED) {
+ final String message = "DIR* NameSystem.internalReleaseLease: "
+ + "attempt to release a create lock on "
+ + src + " but file is already closed.";
+ NameNode.stateChangeLog.warn(message);
+ throw new IOException(message);
+ }
+
+ // now we know that the last block is not COMPLETE, and
+ // that the penultimate block if exists is either COMPLETE or COMMITTED
+ BlockInfoUnderConstruction lastBlock = pendingFile.getLastBlock();
+ BlockUCState lastBlockState = lastBlock.getBlockUCState();
+ BlockInfo penultimateBlock = pendingFile.getPenultimateBlock();
+ BlockUCState penultimateBlockState = (penultimateBlock == null ?
+ BlockUCState.COMPLETE : penultimateBlock.getBlockUCState());
+ assert penultimateBlockState == BlockUCState.COMPLETE ||
+ penultimateBlockState == BlockUCState.COMMITTED :
+ "Unexpected state of penultimate block in " + src;
+
+ switch(lastBlockState) {
+ case COMPLETE:
+ assert false : "Already checked that the last block is incomplete";
+ break;
+ case COMMITTED:
+ // Close file if committed blocks are minimally replicated
+ if(blockManager.checkMinReplication(penultimateBlock) &&
+ blockManager.checkMinReplication(lastBlock)) {
finalizeINodeFileUnderConstruction(src, pendingFile);
NameNode.stateChangeLog.warn("BLOCK*"
- + " internalReleaseLease: No blocks found, lease removed.");
- return;
- }
- // setup the Inode.targets for the last block from the blockManager
- //
- BlockInfo[] blocks = pendingFile.getBlocks();
- BlockInfo last = blocks[blocks.length-1];
- DatanodeDescriptor[] targets = blockManager.getNodes(last);
- pendingFile.setTargets(targets);
+ + " internalReleaseLease: Committed blocks are minimally replicated,"
+ + " lease removed, file closed.");
+ return true; // closed!
+ }
+ // Cannot close file right now, since some blocks
+ // are not yet minimally replicated.
+ // This may potentially cause infinite loop in lease recovery
+ // if there are no valid replicas on data-nodes.
+ String message = "DIR* NameSystem.internalReleaseLease: " +
+ "Failed to release lease for file " + src +
+ ". Committed blocks are waiting to be minimally replicated." +
+ " Try again later.";
+ NameNode.stateChangeLog.warn(message);
+ throw new AlreadyBeingCreatedException(message);
+ case UNDER_CONSTRUCTION:
+ case UNDER_RECOVERY:
+ // setup the last block locations from the blockManager if not known
+ if(lastBlock.getNumExpectedLocations() == 0)
+ lastBlock.setExpectedLocations(blockManager.getNodes(lastBlock));
+ // start recovery of the last block for this file
+ long blockRecoveryId = nextGenerationStamp();
+ lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
+ lastBlock.initializeBlockRecovery(blockRecoveryId);
+ leaseManager.renewLease(lease);
+ // Cannot close file right now, since the last block requires recovery.
+ // This may potentially cause infinite loop in lease recovery
+ // if there are no valid replicas on data-nodes.
+ NameNode.stateChangeLog.warn(
+ "DIR* NameSystem.internalReleaseLease: " +
+ "File " + src + " has not been closed." +
+ " Lease recovery is in progress. " +
+ "RecoveryId = " + blockRecoveryId + " for block " + lastBlock);
+ break;
}
- // start lease recovery of the last block for this file.
- pendingFile.assignPrimaryDatanode();
- leaseManager.renewLease(lease);
+ return false;
+ }
+
+ Lease reassignLease(Lease lease, String src, String newHolder,
+ INodeFileUnderConstruction pendingFile) {
+ if(newHolder == null)
+ return lease;
+ pendingFile.setClientName(newHolder);
+ return leaseManager.reassignLease(lease, src, newHolder);
}
- private void finalizeINodeFileUnderConstruction(String src,
+
+ private void finalizeINodeFileUnderConstruction(
+ String src,
INodeFileUnderConstruction pendingFile) throws IOException {
- leaseManager.removeLease(pendingFile.clientName, src);
+ leaseManager.removeLease(pendingFile.getClientName(), src);
+
+ // complete the penultimate block
+ blockManager.completeBlock(pendingFile, pendingFile.numBlocks()-2);
// The file is no longer pending.
- // Create permanent INode, update blockmap
+ // Create permanent INode, update blocks
INodeFile newFile = pendingFile.convertToInodeFile();
dir.replaceNode(src, pendingFile, newFile);
+ // complete last block of the file
+ blockManager.completeBlock(newFile, newFile.numBlocks()-1);
// close file and persist block allocations for this file
dir.closeFile(src, newFile);
@@ -1690,11 +1840,20 @@
throw new IOException("Block (=" + lastblock + ") not found");
}
INodeFile iFile = oldblockinfo.getINode();
- if (!iFile.isUnderConstruction()) {
+ if (!iFile.isUnderConstruction() || oldblockinfo.isComplete()) {
throw new IOException("Unexpected block (=" + lastblock
+ ") since the file (=" + iFile.getLocalName()
+ ") is not under construction");
}
+
+ long recoveryId =
+ ((BlockInfoUnderConstruction)oldblockinfo).getBlockRecoveryId();
+ if(recoveryId != newgenerationstamp) {
+ throw new IOException("The recovery id " + newgenerationstamp
+ + " does not match current recovery id "
+ + recoveryId + " for block " + lastblock);
+ }
+
INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
@@ -1703,12 +1862,15 @@
blockManager.removeBlockFromMap(oldblockinfo);
if (deleteblock) {
- pendingFile.removeBlock(lastblock);
+ pendingFile.removeLastBlock(lastblock);
}
else {
// update last block, construct newblockinfo and add it to the blocks map
lastblock.set(lastblock.getBlockId(), newlength, newgenerationstamp);
- final BlockInfo newblockinfo = blockManager.addINode(lastblock, pendingFile);
+ BlockInfoUnderConstruction newblockinfo =
+ new BlockInfoUnderConstruction(
+ lastblock, pendingFile.getReplication());
+ blockManager.addINode(newblockinfo, pendingFile);
// find the DatanodeDescriptor objects
// There should be no locations in the blockManager till now because the
@@ -1727,11 +1889,9 @@
for (int i = 0; i < descriptors.length; i++) {
descriptors[i].addBlock(newblockinfo);
}
- pendingFile.setLastBlock(newblockinfo, null);
- } else {
- // add locations into the INodeUnderConstruction
- pendingFile.setLastBlock(newblockinfo, descriptors);
}
+ // add locations into the INodeUnderConstruction
+ pendingFile.setLastBlock(newblockinfo, descriptors);
}
// If this commit does not want to close the file, persist
@@ -1745,7 +1905,10 @@
LOG.info("commitBlockSynchronization(" + lastblock + ") successful");
return;
}
-
+
+ // commit the last block
+ blockManager.commitLastBlock(pendingFile, lastblock);
+
//remove lease, close file
finalizeINodeFileUnderConstruction(src, pendingFile);
getEditLog().logSync();
@@ -3342,7 +3505,7 @@
void setBlockTotal() {
if (safeMode == null)
return;
- safeMode.setBlockTotal((int)getBlocksTotal());
+ safeMode.setBlockTotal((int)getCompleteBlocksTotal());
}
/**
@@ -3353,6 +3516,33 @@
}
/**
+ * Get the total number of COMPLETE blocks in the system.
+ * For safe mode only complete blocks are counted.
+ */
+ long getCompleteBlocksTotal() {
+ // Calculate number of blocks under construction
+ long numUCBlocks = 0;
+ for (Lease lease : leaseManager.getSortedLeases()) {
+ for(String path : lease.getPaths()) {
+ INode node = dir.getFileINode(path);
+ assert node != null : "Found a lease for nonexisting file.";
+ assert node.isUnderConstruction() :
+ "Found a lease for file that is not under construction.";
+ INodeFileUnderConstruction cons = (INodeFileUnderConstruction) node;
+ BlockInfo[] blocks = cons.getBlocks();
+ if(blocks == null)
+ continue;
+ for(BlockInfo b : blocks) {
+ if(!b.isComplete())
+ numUCBlocks++;
+ }
+ }
+ }
+ LOG.info("Number of blocks under construction: " + numUCBlocks);
+ return getBlocksTotal() - numUCBlocks;
+ }
+
+ /**
* Enter safe mode manually.
* @throws IOException
*/
@@ -3671,29 +3861,129 @@
return gs;
}
+ private INodeFileUnderConstruction checkUCBlock(Block block, String clientName)
+ throws IOException {
+ // check safe mode
+ if (isInSafeMode())
+ throw new SafeModeException("Cannot get a new generation stamp and an " +
+ "access token for block " + block, safeMode);
+
+ // check stored block state
+ BlockInfo storedBlock = blockManager.getStoredBlock(block);
+ if (storedBlock == null ||
+ storedBlock.getBlockUCState() != BlockUCState.UNDER_CONSTRUCTION) {
+ throw new IOException(block +
+ " does not exist or is not under Construction" + storedBlock);
+ }
+
+ // check file inode
+ INodeFile file = storedBlock.getINode();
+ if (file==null || !file.isUnderConstruction()) {
+ throw new IOException("The file " + storedBlock +
+ " is belonged to does not exist or it is not under construction.");
+ }
+
+ // check lease
+ INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)file;
+ if (clientName == null || !clientName.equals(pendingFile.getClientName())) {
+ throw new LeaseExpiredException("Lease mismatch: " + block +
+ " is accessed by a non lease holder " + clientName);
+ }
+
+ return pendingFile;
+ }
+
/**
- * Verifies that the block is associated with a file that has a lease.
- * Increments, logs and then returns the stamp
+ * Get a new generation stamp together with an access token for
+ * a block under construction
+ *
+ * This method is called for recovering a failed pipeline or setting up
+ * a pipeline to append to a block.
+ *
+ * @param block a block
+ * @param clientName the name of a client
+ * @return a located block with a new generation stamp and an access token
+ * @throws IOException if any error occurs
+ */
+ synchronized LocatedBlock updateBlockForPipeline(Block block,
+ String clientName) throws IOException {
+ // check validity of parameters
+ checkUCBlock(block, clientName);
+
+ // get a new generation stamp and an access token
+ block.setGenerationStamp(nextGenerationStamp());
+ LocatedBlock locatedBlock = new LocatedBlock(block, new DatanodeInfo[0]);
+ if (isAccessTokenEnabled) {
+ locatedBlock.setAccessToken(accessTokenHandler.generateToken(
+ block.getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+ }
+ return locatedBlock;
+ }
+
+
+ /**
+ * Update a pipeline for a block under construction
+ *
+ * @param clientName the name of the client
+ * @param oldBlock an old block
+ * @param newBlock a new block with a new generation stamp and length
+ * @param newNodes datanodes in the pipeline
+ * @throws IOException if any error occurs
*/
- synchronized long nextGenerationStampForBlock(Block block) throws IOException {
- BlockInfo storedBlock = blockManager.getStoredBlock(block);
- if (storedBlock == null) {
- String msg = block + " is already commited, storedBlock == null.";
- LOG.info(msg);
+ synchronized void updatePipeline(String clientName, Block oldBlock,
+ Block newBlock, DatanodeID[] newNodes)
+ throws IOException {
+ LOG.info("updatePipeline(block=" + oldBlock
+ + ", newGenerationStamp=" + newBlock.getGenerationStamp()
+ + ", newLength=" + newBlock.getNumBytes()
+ + ", newNodes=" + Arrays.asList(newNodes)
+ + ", clientName=" + clientName
+ + ")");
+
+ // check the validity of the block and lease holder name
+ final INodeFileUnderConstruction pendingFile =
+ checkUCBlock(oldBlock, clientName);
+ final BlockInfo oldblockinfo = pendingFile.getLastBlock();
+
+ // check new GS & length: this is not expected
+ if (newBlock.getGenerationStamp() <= oldblockinfo.getGenerationStamp() ||
+ newBlock.getNumBytes() < oldblockinfo.getNumBytes()) {
+ String msg = "Update " + oldBlock + " (len = " +
+ oldblockinfo.getNumBytes() + ") to an older state: " + newBlock +
+ " (len = " + newBlock.getNumBytes() +")";
+ LOG.warn(msg);
throw new IOException(msg);
}
- INodeFile fileINode = storedBlock.getINode();
- if (!fileINode.isUnderConstruction()) {
- String msg = block + " is already commited, !fileINode.isUnderConstruction().";
- LOG.info(msg);
- throw new IOException(msg);
+
+ // Remove old block from blocks map. This always has to be done
+ // because the generation stamp of this block is changing.
+ blockManager.removeBlockFromMap(oldblockinfo);
+
+ // update last block, construct newblockinfo and add it to the blocks map
+ BlockInfoUnderConstruction newblockinfo =
+ new BlockInfoUnderConstruction(
+ newBlock, pendingFile.getReplication());
+ blockManager.addINode(newblockinfo, pendingFile);
+
+ // find the DatanodeDescriptor objects
+ DatanodeDescriptor[] descriptors = null;
+ if (newNodes.length > 0) {
+ descriptors = new DatanodeDescriptor[newNodes.length];
+ for(int i = 0; i < newNodes.length; i++) {
+ descriptors[i] = getDatanode(newNodes[i]);
+ }
}
- if (!((INodeFileUnderConstruction)fileINode).setLastRecoveryTime(now())) {
- String msg = block + " is beening recovered, ignoring this request.";
- LOG.info(msg);
- throw new IOException(msg);
+ // add locations into the INodeUnderConstruction
+ pendingFile.setLastBlock(newblockinfo, descriptors);
+
+ // persist blocks only if append is supported
+ String src = leaseManager.findPath(pendingFile);
+ if (supportAppends) {
+ dir.persistBlocks(src, pendingFile);
+ getEditLog().logSync();
}
- return nextGenerationStamp();
+ LOG.info("updatePipeline(" + oldBlock + ") successfully to " + newBlock);
+ return;
}
// rename was successful. If any part of the renamed subtree had
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java Wed Sep 30 22:57:30 2009
@@ -25,8 +25,6 @@
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.permission.*;
import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
/**
* We keep an in-memory representation of the file/block hierarchy.
@@ -423,10 +421,4 @@
}
return null;
}
-
-
- LocatedBlocks createLocatedBlocks(List<LocatedBlock> blocks) {
- return new LocatedBlocks(computeContentSummary().getLength(), blocks,
- isUnderConstruction());
- }
}
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Wed Sep 30 22:57:30 2009
@@ -122,16 +122,29 @@
/** {@inheritDoc} */
long[] computeContentSummary(long[] summary) {
- long bytes = 0;
- for(Block blk : blocks) {
- bytes += blk.getNumBytes();
- }
- summary[0] += bytes;
+ summary[0] += computeFileSize(true);
summary[1]++;
summary[3] += diskspaceConsumed();
return summary;
}
+ /** Compute file size.
+ * May or may not include BlockInfoUnderConstruction.
+ */
+ long computeFileSize(boolean includesBlockInfoUnderConstruction) {
+ if (blocks == null || blocks.length == 0) {
+ return 0;
+ }
+ final int last = blocks.length - 1;
+ //check if the last block is BlockInfoUnderConstruction
+ long bytes = blocks[last] instanceof BlockInfoUnderConstruction
+ && !includesBlockInfoUnderConstruction?
+ 0: blocks[last].getNumBytes();
+ for(int i = 0; i < last; i++) {
+ bytes += blocks[i].getNumBytes();
+ }
+ return bytes;
+ }
@Override
@@ -173,13 +186,14 @@
/**
* Return the penultimate allocated block for this file.
*/
- Block getPenultimateBlock() {
+ BlockInfo getPenultimateBlock() {
if (blocks == null || blocks.length <= 1) {
return null;
}
return blocks[blocks.length - 2];
}
+ // SHV !!! this is not used anywhere - remove
INodeFileUnderConstruction toINodeFileUnderConstruction(
String clientName, String clientMachine, DatanodeDescriptor clientNode
) throws IOException {
@@ -191,4 +205,27 @@
blocks, getPermissionStatus(),
clientName, clientMachine, clientNode);
}
+
+ /**
+ * Get the last block of the file.
+ * Make sure it has the right type.
+ */
+ <T extends BlockInfo> T getLastBlock() throws IOException {
+ if (blocks == null || blocks.length == 0)
+ return null;
+ T returnBlock = null;
+ try {
+ @SuppressWarnings("unchecked") // ClassCastException is caught below
+ T tBlock = (T)blocks[blocks.length - 1];
+ returnBlock = tBlock;
+ } catch(ClassCastException cce) {
+ throw new IOException("Unexpected last block type: "
+ + blocks[blocks.length - 1].getClass().getSimpleName());
+ }
+ return returnBlock;
+ }
+
+ int numBlocks() {
+ return blocks == null ? 0 : blocks.length;
+ }
}
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java Wed Sep 30 22:57:30 2009
@@ -21,16 +21,13 @@
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
class INodeFileUnderConstruction extends INodeFile {
- final String clientName; // lease holder
+ private String clientName; // lease holder
private final String clientMachine;
private final DatanodeDescriptor clientNode; // if client is a cluster node too.
-
- private int primaryNodeIndex = -1; //the node working on lease recovery
- private DatanodeDescriptor[] targets = null; //locations for last block
- private long lastRecoveryTime = 0;
INodeFileUnderConstruction(PermissionStatus permissions,
short replication,
@@ -67,6 +64,10 @@
return clientName;
}
+ void setClientName(String clientName) {
+ this.clientName = clientName;
+ }
+
String getClientMachine() {
return clientMachine;
}
@@ -83,15 +84,6 @@
return true;
}
- DatanodeDescriptor[] getTargets() {
- return targets;
- }
-
- void setTargets(DatanodeDescriptor[] targets) {
- this.targets = targets;
- this.primaryNodeIndex = -1;
- }
-
//
// converts a INodeFileUnderConstruction into a INodeFile
// use the modification time as the access time
@@ -108,10 +100,10 @@
}
/**
- * remove a block from the block list. This block should be
+ * Remove a block from the block list. This block should be
* the last one on the list.
*/
- void removeBlock(Block oldblock) throws IOException {
+ void removeLastBlock(Block oldblock) throws IOException {
if (blocks == null) {
throw new IOException("Trying to delete non-existant block " + oldblock);
}
@@ -124,57 +116,24 @@
BlockInfo[] newlist = new BlockInfo[size_1];
System.arraycopy(blocks, 0, newlist, 0, size_1);
blocks = newlist;
-
- // Remove the block locations for the last block.
- targets = null;
- }
-
- synchronized void setLastBlock(BlockInfo newblock, DatanodeDescriptor[] newtargets
- ) throws IOException {
- if (blocks == null) {
- throw new IOException("Trying to update non-existant block (newblock="
- + newblock + ")");
- }
- blocks[blocks.length - 1] = newblock;
- setTargets(newtargets);
- lastRecoveryTime = 0;
}
/**
- * Initialize lease recovery for this object
+ * Convert the last block of the file to an under-construction block.
+ * Set its locations.
*/
- void assignPrimaryDatanode() {
- //assign the first alive datanode as the primary datanode
-
- if (targets.length == 0) {
- NameNode.stateChangeLog.warn("BLOCK*"
- + " INodeFileUnderConstruction.initLeaseRecovery:"
- + " No blocks found, lease removed.");
- }
-
- int previous = primaryNodeIndex;
- //find an alive datanode beginning from previous
- for(int i = 1; i <= targets.length; i++) {
- int j = (previous + i)%targets.length;
- if (targets[j].isAlive) {
- DatanodeDescriptor primary = targets[primaryNodeIndex = j];
- primary.addBlockToBeRecovered(blocks[blocks.length - 1], targets);
- NameNode.stateChangeLog.info("BLOCK* " + blocks[blocks.length - 1]
- + " recovery started, primary=" + primary);
- return;
- }
- }
- }
-
- /**
- * Update lastRecoveryTime if expired.
- * @return true if lastRecoveryTimeis updated.
- */
- synchronized boolean setLastRecoveryTime(long now) {
- boolean expired = now - lastRecoveryTime > NameNode.LEASE_RECOVER_PERIOD;
- if (expired) {
- lastRecoveryTime = now;
- }
- return expired;
+ BlockInfoUnderConstruction setLastBlock(BlockInfo lastBlock,
+ DatanodeDescriptor[] targets)
+ throws IOException {
+ if (blocks == null || blocks.length == 0) {
+ throw new IOException("Trying to update non-existant block. " +
+ "File is empty.");
+ }
+ BlockInfoUnderConstruction ucBlock =
+ lastBlock.convertToBlockUnderConstruction(
+ BlockUCState.UNDER_CONSTRUCTION, targets);
+ ucBlock.setINode(this);
+ setBlock(numBlocks()-1, ucBlock);
+ return ucBlock;
}
}
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java Wed Sep 30 22:57:30 2009
@@ -102,7 +102,7 @@
/**
* Adds (or re-adds) the lease for the specified file.
*/
- synchronized void addLease(String holder, String src) {
+ synchronized Lease addLease(String holder, String src) {
Lease lease = getLease(holder);
if (lease == null) {
lease = new Lease(holder);
@@ -113,6 +113,7 @@
}
sortedLeasesByPath.put(src, lease);
lease.paths.add(src);
+ return lease;
}
/**
@@ -143,11 +144,22 @@
}
/**
+ * Reassign lease for file src to the new holder.
+ */
+ synchronized Lease reassignLease(Lease lease, String src, String newHolder) {
+ assert newHolder != null : "new lease holder is null";
+ if (lease != null) {
+ removeLease(lease, src);
+ }
+ return addLease(newHolder, src);
+ }
+
+ /**
* Finds the pathname for the specified pendingFile
*/
synchronized String findPath(INodeFileUnderConstruction pendingFile
) throws IOException {
- Lease lease = getLease(pendingFile.clientName);
+ Lease lease = getLease(pendingFile.getClientName());
if (lease != null) {
String src = lease.findPath(pendingFile);
if (src != null) {
@@ -265,7 +277,11 @@
Collection<String> getPaths() {
return paths;
}
-
+
+ String getHolder() {
+ return holder;
+ }
+
void replacePath(String oldpath, String newpath) {
paths.remove(oldpath);
paths.add(newpath);
@@ -376,7 +392,13 @@
oldest.getPaths().toArray(leasePaths);
for(String p : leasePaths) {
try {
- fsnamesystem.internalReleaseLease(oldest, p);
+ if(fsnamesystem.internalReleaseLease(oldest, p, "HDFS_NameNode")) {
+ LOG.info("Lease recovery for file " + p +
+ " is complete. File closed.");
+ removing.add(p);
+ } else
+ LOG.info("Started block recovery for file " + p +
+ " lease " + oldest);
} catch (IOException e) {
LOG.error("Cannot release the path "+p+" in the lease "+oldest, e);
removing.add(p);
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Wed Sep 30 22:57:30 2009
@@ -610,11 +610,12 @@
/**
*/
- public LocatedBlock addBlock(String src,
- String clientName) throws IOException {
+ public LocatedBlock addBlock(String src, String clientName,
+ Block previous) throws IOException {
stateChangeLog.debug("*BLOCK* NameNode.addBlock: file "
+src+" for "+clientName);
- LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, clientName);
+ LocatedBlock locatedBlock =
+ namesystem.getAdditionalBlock(src, clientName, previous);
if (locatedBlock != null)
myMetrics.numAddBlockOps.inc();
return locatedBlock;
@@ -633,9 +634,11 @@
}
/** {@inheritDoc} */
- public boolean complete(String src, String clientName) throws IOException {
+ public boolean complete(String src, String clientName,
+ Block last) throws IOException {
stateChangeLog.debug("*DIR* NameNode.complete: " + src + " for " + clientName);
- CompleteFileStatus returnCode = namesystem.completeFile(src, clientName);
+ CompleteFileStatus returnCode =
+ namesystem.completeFile(src, clientName, last);
if (returnCode == CompleteFileStatus.STILL_WAITING) {
return false;
} else if (returnCode == CompleteFileStatus.COMPLETE_SUCCESS) {
@@ -664,10 +667,20 @@
}
/** {@inheritDoc} */
- public long nextGenerationStamp(Block block) throws IOException{
- return namesystem.nextGenerationStampForBlock(block);
+ @Override
+ public LocatedBlock updateBlockForPipeline(Block block, String clientName)
+ throws IOException {
+ return namesystem.updateBlockForPipeline(block, clientName);
}
+
+ @Override
+ public void updatePipeline(String clientName, Block oldBlock,
+ Block newBlock, DatanodeID[] newNodes)
+ throws IOException {
+ namesystem.updatePipeline(clientName, oldBlock, newBlock, newNodes);
+ }
+
/** {@inheritDoc} */
public void commitBlockSynchronization(Block block,
long newgenerationstamp, long newlength,
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java Wed Sep 30 22:57:30 2009
@@ -35,10 +35,9 @@
**********************************************************************/
public interface DatanodeProtocol extends VersionedProtocol {
/**
- * 20: SendHeartbeat may return KeyUpdateCommand
- * Register returns access keys inside DatanodeRegistration object
+ * 23: nextGenerationStamp() removed.
*/
- public static final long versionID = 20L;
+ public static final long versionID = 23L;
// error code
final static int NOTIFY = 0;
@@ -143,12 +142,6 @@
public void reportBadBlocks(LocatedBlock[] blocks) throws IOException;
/**
- * @return the next GenerationStamp to be associated with the specified
- * block.
- */
- public long nextGenerationStamp(Block block) throws IOException;
-
- /**
* Commit block synchronization in lease recovery
*/
public void commitBlockSynchronization(Block block,
Modified: hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java?rev=820487&r1=820486&r2=820487&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java (original)
+++ hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java Wed Sep 30 22:57:30 2009
@@ -23,6 +23,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.ipc.VersionedProtocol;
/** An inter-datanode protocol for updating generation stamp
@@ -31,17 +32,36 @@
public static final Log LOG = LogFactory.getLog(InterDatanodeProtocol.class);
/**
- * 3: added a finalize parameter to updateBlock
+ * 4: initReplicaRecovery(), updateReplicaUnderRecovery() added.
*/
- public static final long versionID = 3L;
+ public static final long versionID = 4L;
/** @return the BlockMetaDataInfo of a block;
* null if the block is not found
*/
+ @Deprecated
BlockMetaDataInfo getBlockMetaDataInfo(Block block) throws IOException;
/**
* Update the block to the new generation stamp and length.
*/
- void updateBlock(Block oldblock, Block newblock, boolean finalize) throws IOException;
+ @Deprecated
+ void updateBlock(Block oldblock, Block newblock, boolean finalize)
+ throws IOException;
+
+ /**
+ * Initialize a replica recovery.
+ *
+ * @return actual state of the replica on this data-node or
+ * null if data-node does not have the replica.
+ */
+ ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
+ throws IOException;
+
+ /**
+ * Update replica with the new generation stamp and length.
+ */
+ Block updateReplicaUnderRecovery(Block oldBlock,
+ long recoveryId,
+ long newLength) throws IOException;
}