Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2009/10/01 01:39:33 UTC
svn commit: r820497 [5/7] - in /hadoop/hdfs/trunk: ./
.eclipse.templates/.launches/ src/contrib/hdfsproxy/
src/docs/src/documentation/content/xdocs/ src/java/
src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/
src/java/org/apach...
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Sep 30 23:39:30 2009
@@ -23,6 +23,7 @@
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
@@ -563,13 +564,18 @@
}
List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
long totalSize = 0;
+ BlockInfo curBlock;
while(totalSize<size && iter.hasNext()) {
- totalSize += addBlock(iter.next(), results);
+ curBlock = iter.next();
+ if(!curBlock.isComplete()) continue;
+ totalSize += addBlock(curBlock, results);
}
if(totalSize<size) {
iter = node.getBlockIterator(); // start from the beginning
for(int i=0; i<startBlock&&totalSize<size; i++) {
- totalSize += addBlock(iter.next(), results);
+ curBlock = iter.next();
+ if(!curBlock.isComplete()) continue;
+ totalSize += addBlock(curBlock, results);
}
}
@@ -707,14 +713,46 @@
if (doAccessTime && isAccessTimeSupported()) {
dir.setTimes(src, inode, -1, now(), false);
}
- final Block[] blocks = inode.getBlocks();
+ final BlockInfo[] blocks = inode.getBlocks();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
+ }
if (blocks == null) {
return null;
}
- final List<LocatedBlock> results = blocks.length == 0?
- new ArrayList<LocatedBlock>(0):
- blockManager.getBlockLocations(blocks, offset, length, Integer.MAX_VALUE);
- return inode.createLocatedBlocks(results);
+
+ if (blocks.length == 0) {
+ return new LocatedBlocks(0, inode.isUnderConstruction(),
+ Collections.<LocatedBlock>emptyList(), null, false);
+ } else {
+ final long n = inode.computeFileSize(false);
+ final List<LocatedBlock> locatedblocks = blockManager.getBlockLocations(
+ blocks, offset, length, Integer.MAX_VALUE);
+ final BlockInfo last = inode.getLastBlock();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("last = " + last);
+ }
+
+ if (!last.isComplete()) {
+ return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
+ blockManager.getBlockLocation(last, n), false);
+ }
+ else {
+ return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
+ blockManager.getBlockLocation(last, n-last.getNumBytes()), true);
+ }
+ }
+ }
+
+ /** Create a LocatedBlock. */
+ LocatedBlock createLocatedBlock(final Block b, final DatanodeInfo[] locations,
+ final long offset, final boolean corrupt) throws IOException {
+ final LocatedBlock lb = new LocatedBlock(b, locations, offset, corrupt);
+ if (isAccessTokenEnabled) {
+ lb.setAccessToken(accessTokenHandler.generateToken(b.getBlockId(),
+ EnumSet.of(AccessTokenHandler.AccessMode.READ)));
+ }
+ return lb;
}
/**
@@ -912,40 +950,45 @@
// If the file is under construction , then it must be in our
// leases. Find the appropriate lease record.
//
- Lease lease = leaseManager.getLease(holder);
- //
- // We found the lease for this file. And surprisingly the original
- // holder is trying to recreate this file. This should never occur.
- //
- if (lease != null) {
+ Lease lease = leaseManager.getLeaseByPath(src);
+ if (lease == null) {
throw new AlreadyBeingCreatedException(
- "failed to create file " + src + " for " + holder +
- " on client " + clientMachine +
- " because current leaseholder is trying to recreate file.");
+ "failed to create file " + src + " for " + holder +
+ " on client " + clientMachine +
+ " because pendingCreates is non-null but no leases found.");
}
//
- // Find the original holder.
+ // We found the lease for this file. And surprisingly the original
+ // holder is trying to recreate this file. This should never occur.
//
- lease = leaseManager.getLease(pendingFile.clientName);
- if (lease == null) {
+ if (lease.getHolder().equals(holder)) {
throw new AlreadyBeingCreatedException(
- "failed to create file " + src + " for " + holder +
- " on client " + clientMachine +
- " because pendingCreates is non-null but no leases found.");
- }
+ "failed to create file " + src + " for " + holder +
+ " on client " + clientMachine +
+ " because current leaseholder is trying to recreate file.");
+ }
+ assert lease.getHolder().equals(pendingFile.getClientName()) :
+ "Current lease holder " + lease.getHolder() +
+ " does not match file creator " + pendingFile.getClientName();
//
+ // Current lease holder is different from the requester.
// If the original holder has not renewed in the last SOFTLIMIT
- // period, then start lease recovery.
+ // period, then start lease recovery, otherwise fail.
//
if (lease.expiredSoftLimit()) {
LOG.info("startFile: recover lease " + lease + ", src=" + src);
- internalReleaseLease(lease, src);
- }
- throw new AlreadyBeingCreatedException("failed to create file " + src + " for " + holder +
- " on client " + clientMachine +
- ", because this file is already being created by " +
- pendingFile.getClientName() +
- " on " + pendingFile.getClientMachine());
+ boolean isClosed = internalReleaseLease(lease, src, null);
+ if(!isClosed)
+ throw new RecoveryInProgressException(
+ "Failed to close file " + src +
+ ". Lease recovery is in progress. Try again later.");
+
+ } else
+ throw new AlreadyBeingCreatedException("failed to create file " +
+ src + " for " + holder + " on client " + clientMachine +
+ ", because this file is already being created by " +
+ pendingFile.getClientName() +
+ " on " + pendingFile.getClientMachine());
}
try {
@@ -998,7 +1041,7 @@
clientMachine,
clientNode);
dir.replaceNode(src, node, cons);
- leaseManager.addLease(cons.clientName, src);
+ leaseManager.addLease(cons.getClientName(), src);
} else {
// Now we can add the name to the filesystem. This file has no
@@ -1014,7 +1057,7 @@
throw new IOException("DIR* NameSystem.startFile: " +
"Unable to add file to namespace.");
}
- leaseManager.addLease(newNode.clientName, src);
+ leaseManager.addLease(newNode.getClientName(), src);
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
+"add "+src+" to namespace for "+holder);
@@ -1048,40 +1091,36 @@
LocatedBlock lb = null;
synchronized (this) {
INodeFileUnderConstruction file = (INodeFileUnderConstruction)dir.getFileINode(src);
-
- BlockInfo[] blocks = file.getBlocks();
- if (blocks != null && blocks.length > 0) {
- BlockInfo last = blocks[blocks.length-1];
- // this is a redundant search in blocksMap
- // should be resolved by the new implementation of append
- BlockInfo storedBlock = blockManager.getStoredBlock(last);
- assert last == storedBlock : "last block should be in the blocksMap";
- if (file.getPreferredBlockSize() > storedBlock.getNumBytes()) {
+ BlockInfo lastBlock = file.getLastBlock();
+ if (lastBlock != null) {
+ assert lastBlock == blockManager.getStoredBlock(lastBlock) :
+ "last block of the file is not in blocksMap";
+ if (file.getPreferredBlockSize() > lastBlock.getNumBytes()) {
long fileLength = file.computeContentSummary().getLength();
- DatanodeDescriptor[] targets = blockManager.getNodes(storedBlock);
+ DatanodeDescriptor[] targets = blockManager.getNodes(lastBlock);
// remove the replica locations of this block from the node
for (int i = 0; i < targets.length; i++) {
- targets[i].removeBlock(storedBlock);
+ targets[i].removeBlock(lastBlock);
}
- // set the locations of the last block in the lease record
- file.setLastBlock(storedBlock, targets);
+ // convert last block to under-construction and set its locations
+ blockManager.convertLastBlockToUnderConstruction(file, targets);
- lb = new LocatedBlock(last, targets,
- fileLength-storedBlock.getNumBytes());
+ lb = new LocatedBlock(lastBlock, targets,
+ fileLength-lastBlock.getNumBytes());
if (isAccessTokenEnabled) {
lb.setAccessToken(accessTokenHandler.generateToken(lb.getBlock()
.getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
}
// Remove block from replication queue.
- blockManager.updateNeededReplications(last, 0, 0);
+ blockManager.updateNeededReplications(lastBlock, 0, 0);
// remove this block from the list of pending blocks to be deleted.
// This reduces the possibility of triggering HADOOP-1349.
//
for (DatanodeDescriptor dd : targets) {
String datanodeId = dd.getStorageID();
- blockManager.removeFromInvalidates(datanodeId, last);
+ blockManager.removeFromInvalidates(datanodeId, lastBlock);
}
}
}
@@ -1115,7 +1154,8 @@
* client to "try again later".
*/
public LocatedBlock getAdditionalBlock(String src,
- String clientName
+ String clientName,
+ Block previous
) throws IOException {
long fileLength, blockSize;
int replication;
@@ -1135,6 +1175,9 @@
INodeFileUnderConstruction pendingFile = checkLease(src, clientName);
+ // commit the last block
+ blockManager.commitLastBlock(pendingFile, previous);
+
//
// If we fail this, bad things happen!
//
@@ -1168,9 +1211,11 @@
throw new NotReplicatedYetException("Not replicated yet:" + src);
}
+ // complete the penultimate block
+ blockManager.completeBlock(pendingFile, pendingFile.numBlocks()-2);
+
// allocate new block record block locations in INode.
- newBlock = allocateBlock(src, pathINodes);
- pendingFile.setTargets(targets);
+ newBlock = allocateBlock(src, pathINodes, targets);
for (DatanodeDescriptor dn : targets) {
dn.incBlocksScheduled();
@@ -1250,15 +1295,18 @@
COMPLETE_SUCCESS
}
- public CompleteFileStatus completeFile(String src, String holder) throws IOException {
- CompleteFileStatus status = completeFileInternal(src, holder);
+ public CompleteFileStatus completeFile(String src,
+ String holder,
+ Block last) throws IOException {
+ CompleteFileStatus status = completeFileInternal(src, holder, last);
getEditLog().logSync();
return status;
}
-
- private synchronized CompleteFileStatus completeFileInternal(String src,
- String holder) throws IOException {
+ private synchronized CompleteFileStatus completeFileInternal(
+ String src,
+ String holder,
+ Block last) throws IOException {
NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src + " for " + holder);
if (isInSafeMode())
throw new SafeModeException("Cannot complete file " + src, safeMode);
@@ -1279,7 +1327,12 @@
("from " + pendingFile.getClientMachine()))
);
return CompleteFileStatus.OPERATION_FAILED;
- } else if (!checkFileProgress(pendingFile, true)) {
+ }
+
+ // commit the last block
+ blockManager.commitLastBlock(pendingFile, last);
+
+ if (!checkFileProgress(pendingFile, true)) {
return CompleteFileStatus.STILL_WAITING;
}
@@ -1312,13 +1365,15 @@
* @param inodes INode representing each of the components of src.
* <code>inodes[inodes.length-1]</code> is the INode for the file.
*/
- private Block allocateBlock(String src, INode[] inodes) throws IOException {
+ private Block allocateBlock(String src,
+ INode[] inodes,
+ DatanodeDescriptor targets[]) throws IOException {
Block b = new Block(FSNamesystem.randBlockId.nextLong(), 0, 0);
while(isValidBlock(b)) {
b.setBlockId(FSNamesystem.randBlockId.nextLong());
}
b.setGenerationStamp(getGenerationStamp());
- b = dir.addBlock(src, inodes, b);
+ b = dir.addBlock(src, inodes, b, targets);
NameNode.stateChangeLog.info("BLOCK* NameSystem.allocateBlock: "
+src+ ". "+b);
return b;
@@ -1329,12 +1384,12 @@
* replicated. If not, return false. If checkall is true, then check
* all blocks, otherwise check only penultimate block.
*/
- synchronized boolean checkFileProgress(INodeFile v, boolean checkall) {
+ synchronized boolean checkFileProgress(INodeFile v, boolean checkall) throws IOException {
if (checkall) {
//
// check all blocks of the file.
//
- for (Block block: v.getBlocks()) {
+ for (BlockInfo block: v.getBlocks()) {
if (!blockManager.checkMinReplication(block)) {
return false;
}
@@ -1343,7 +1398,7 @@
//
// check the penultimate block of this file
//
- Block b = v.getPenultimateBlock();
+ BlockInfo b = v.getPenultimateBlock();
if (b != null && !blockManager.checkMinReplication(b)) {
return false;
}
@@ -1614,20 +1669,31 @@
* Move a file that is being written to be immutable.
* @param src The filename
* @param lease The lease for the client creating the file
- */
- void internalReleaseLease(Lease lease, String src) throws IOException {
+ * @param recoveryLeaseHolder reassign lease to this holder if the last block
+ * needs recovery; keep current holder if null.
+ * @throws AlreadyBeingCreatedException if file is waiting to achieve minimal
+ * replication;<br>
+ * RecoveryInProgressException if lease recovery is in progress.<br>
+ * IOException in case of an error.
+ * @return true if file has been successfully finalized and closed or
+ * false if block recovery has been initiated
+ */
+ boolean internalReleaseLease(
+ Lease lease, String src, String recoveryLeaseHolder)
+ throws AlreadyBeingCreatedException,
+ IOException {
LOG.info("Recovering lease=" + lease + ", src=" + src);
INodeFile iFile = dir.getFileINode(src);
if (iFile == null) {
- final String message = "DIR* NameSystem.internalReleaseCreate: "
+ final String message = "DIR* NameSystem.internalReleaseLease: "
+ "attempt to release a create lock on "
+ src + " file does not exist.";
NameNode.stateChangeLog.warn(message);
throw new IOException(message);
}
if (!iFile.isUnderConstruction()) {
- final String message = "DIR* NameSystem.internalReleaseCreate: "
+ final String message = "DIR* NameSystem.internalReleaseLease: "
+ "attempt to release a create lock on "
+ src + " but file is already closed.";
NameNode.stateChangeLog.warn(message);
@@ -1635,39 +1701,123 @@
}
INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) iFile;
+ int nrBlocks = pendingFile.numBlocks();
+ BlockInfo[] blocks = pendingFile.getBlocks();
- // Initialize lease recovery for pendingFile. If there are no blocks
- // associated with this file, then reap lease immediately. Otherwise
- // renew the lease and trigger lease recovery.
- if (pendingFile.getTargets() == null ||
- pendingFile.getTargets().length == 0) {
- if (pendingFile.getBlocks().length == 0) {
+ int nrCompleteBlocks;
+ BlockInfo curBlock = null;
+ for(nrCompleteBlocks = 0; nrCompleteBlocks < nrBlocks; nrCompleteBlocks++) {
+ curBlock = blocks[nrCompleteBlocks];
+ if(!curBlock.isComplete())
+ break;
+ assert blockManager.checkMinReplication(curBlock) :
+ "A COMPLETE block is not minimally replicated in " + src;
+ }
+
+ // If there are no incomplete blocks associated with this file,
+ // then reap lease immediately and close the file.
+ if(nrCompleteBlocks == nrBlocks) {
+ finalizeINodeFileUnderConstruction(src, pendingFile);
+ NameNode.stateChangeLog.warn("BLOCK*"
+ + " internalReleaseLease: All existing blocks are COMPLETE,"
+ + " lease removed, file closed.");
+ return true; // closed!
+ }
+
+ // Only the last and the penultimate blocks may be in a non-COMPLETE state.
+ // If the penultimate block is not COMPLETE, then it must be COMMITTED.
+ if(nrCompleteBlocks < nrBlocks - 2 ||
+ nrCompleteBlocks == nrBlocks - 2 &&
+ curBlock.getBlockUCState() != BlockUCState.COMMITTED) {
+ final String message = "DIR* NameSystem.internalReleaseLease: "
+ + "attempt to release a create lock on "
+ + src + " but file is already closed.";
+ NameNode.stateChangeLog.warn(message);
+ throw new IOException(message);
+ }
+
+ // Now we know that the last block is not COMPLETE, and
+ // that the penultimate block, if it exists, is either COMPLETE or COMMITTED
+ BlockInfoUnderConstruction lastBlock = pendingFile.getLastBlock();
+ BlockUCState lastBlockState = lastBlock.getBlockUCState();
+ BlockInfo penultimateBlock = pendingFile.getPenultimateBlock();
+ BlockUCState penultimateBlockState = (penultimateBlock == null ?
+ BlockUCState.COMPLETE : penultimateBlock.getBlockUCState());
+ assert penultimateBlockState == BlockUCState.COMPLETE ||
+ penultimateBlockState == BlockUCState.COMMITTED :
+ "Unexpected state of penultimate block in " + src;
+
+ switch(lastBlockState) {
+ case COMPLETE:
+ assert false : "Already checked that the last block is incomplete";
+ break;
+ case COMMITTED:
+ // Close file if committed blocks are minimally replicated
+ if(blockManager.checkMinReplication(penultimateBlock) &&
+ blockManager.checkMinReplication(lastBlock)) {
finalizeINodeFileUnderConstruction(src, pendingFile);
NameNode.stateChangeLog.warn("BLOCK*"
- + " internalReleaseLease: No blocks found, lease removed.");
- return;
- }
- // setup the Inode.targets for the last block from the blockManager
- //
- BlockInfo[] blocks = pendingFile.getBlocks();
- BlockInfo last = blocks[blocks.length-1];
- DatanodeDescriptor[] targets = blockManager.getNodes(last);
- pendingFile.setTargets(targets);
+ + " internalReleaseLease: Committed blocks are minimally replicated,"
+ + " lease removed, file closed.");
+ return true; // closed!
+ }
+ // Cannot close file right now, since some blocks
+ // are not yet minimally replicated.
+ // This may potentially cause an infinite loop in lease recovery
+ // if there are no valid replicas on data-nodes.
+ String message = "DIR* NameSystem.internalReleaseLease: " +
+ "Failed to release lease for file " + src +
+ ". Committed blocks are waiting to be minimally replicated." +
+ " Try again later.";
+ NameNode.stateChangeLog.warn(message);
+ throw new AlreadyBeingCreatedException(message);
+ case UNDER_CONSTRUCTION:
+ case UNDER_RECOVERY:
+ // setup the last block locations from the blockManager if not known
+ if(lastBlock.getNumExpectedLocations() == 0)
+ lastBlock.setExpectedLocations(blockManager.getNodes(lastBlock));
+ // start recovery of the last block for this file
+ long blockRecoveryId = nextGenerationStamp();
+ lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
+ lastBlock.initializeBlockRecovery(blockRecoveryId);
+ leaseManager.renewLease(lease);
+ // Cannot close file right now, since the last block requires recovery.
+ // This may potentially cause an infinite loop in lease recovery
+ // if there are no valid replicas on data-nodes.
+ NameNode.stateChangeLog.warn(
+ "DIR* NameSystem.internalReleaseLease: " +
+ "File " + src + " has not been closed." +
+ " Lease recovery is in progress. " +
+ "RecoveryId = " + blockRecoveryId + " for block " + lastBlock);
+ break;
}
- // start lease recovery of the last block for this file.
- pendingFile.assignPrimaryDatanode();
- leaseManager.renewLease(lease);
+ return false;
+ }
+
+ Lease reassignLease(Lease lease, String src, String newHolder,
+ INodeFileUnderConstruction pendingFile) {
+ if(newHolder == null)
+ return lease;
+ pendingFile.setClientName(newHolder);
+ return leaseManager.reassignLease(lease, src, newHolder);
}
- private void finalizeINodeFileUnderConstruction(String src,
+
+ private void finalizeINodeFileUnderConstruction(
+ String src,
INodeFileUnderConstruction pendingFile) throws IOException {
- leaseManager.removeLease(pendingFile.clientName, src);
+ leaseManager.removeLease(pendingFile.getClientName(), src);
+
+ // complete the penultimate block
+ blockManager.completeBlock(pendingFile, pendingFile.numBlocks()-2);
// The file is no longer pending.
- // Create permanent INode, update blockmap
+ // Create permanent INode, update blocks
INodeFile newFile = pendingFile.convertToInodeFile();
dir.replaceNode(src, pendingFile, newFile);
+ // complete last block of the file
+ blockManager.completeBlock(newFile, newFile.numBlocks()-1);
// close file and persist block allocations for this file
dir.closeFile(src, newFile);
@@ -1690,11 +1840,20 @@
throw new IOException("Block (=" + lastblock + ") not found");
}
INodeFile iFile = oldblockinfo.getINode();
- if (!iFile.isUnderConstruction()) {
+ if (!iFile.isUnderConstruction() || oldblockinfo.isComplete()) {
throw new IOException("Unexpected block (=" + lastblock
+ ") since the file (=" + iFile.getLocalName()
+ ") is not under construction");
}
+
+ long recoveryId =
+ ((BlockInfoUnderConstruction)oldblockinfo).getBlockRecoveryId();
+ if(recoveryId != newgenerationstamp) {
+ throw new IOException("The recovery id " + newgenerationstamp
+ + " does not match current recovery id "
+ + recoveryId + " for block " + lastblock);
+ }
+
INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
@@ -1703,12 +1862,15 @@
blockManager.removeBlockFromMap(oldblockinfo);
if (deleteblock) {
- pendingFile.removeBlock(lastblock);
+ pendingFile.removeLastBlock(lastblock);
}
else {
// update last block, construct newblockinfo and add it to the blocks map
lastblock.set(lastblock.getBlockId(), newlength, newgenerationstamp);
- final BlockInfo newblockinfo = blockManager.addINode(lastblock, pendingFile);
+ BlockInfoUnderConstruction newblockinfo =
+ new BlockInfoUnderConstruction(
+ lastblock, pendingFile.getReplication());
+ blockManager.addINode(newblockinfo, pendingFile);
// find the DatanodeDescriptor objects
// There should be no locations in the blockManager till now because the
@@ -1727,11 +1889,9 @@
for (int i = 0; i < descriptors.length; i++) {
descriptors[i].addBlock(newblockinfo);
}
- pendingFile.setLastBlock(newblockinfo, null);
- } else {
- // add locations into the INodeUnderConstruction
- pendingFile.setLastBlock(newblockinfo, descriptors);
}
+ // add locations into the INodeUnderConstruction
+ pendingFile.setLastBlock(newblockinfo, descriptors);
}
// If this commit does not want to close the file, persist
@@ -1745,7 +1905,10 @@
LOG.info("commitBlockSynchronization(" + lastblock + ") successful");
return;
}
-
+
+ // commit the last block
+ blockManager.commitLastBlock(pendingFile, lastblock);
+
//remove lease, close file
finalizeINodeFileUnderConstruction(src, pendingFile);
getEditLog().logSync();
@@ -3342,7 +3505,7 @@
void setBlockTotal() {
if (safeMode == null)
return;
- safeMode.setBlockTotal((int)getBlocksTotal());
+ safeMode.setBlockTotal((int)getCompleteBlocksTotal());
}
/**
@@ -3353,6 +3516,33 @@
}
/**
+ * Get the total number of COMPLETE blocks in the system.
+ * For safe mode only complete blocks are counted.
+ */
+ long getCompleteBlocksTotal() {
+ // Calculate number of blocks under construction
+ long numUCBlocks = 0;
+ for (Lease lease : leaseManager.getSortedLeases()) {
+ for(String path : lease.getPaths()) {
+ INode node = dir.getFileINode(path);
+ assert node != null : "Found a lease for nonexistent file.";
+ assert node.isUnderConstruction() :
+ "Found a lease for file that is not under construction.";
+ INodeFileUnderConstruction cons = (INodeFileUnderConstruction) node;
+ BlockInfo[] blocks = cons.getBlocks();
+ if(blocks == null)
+ continue;
+ for(BlockInfo b : blocks) {
+ if(!b.isComplete())
+ numUCBlocks++;
+ }
+ }
+ }
+ LOG.info("Number of blocks under construction: " + numUCBlocks);
+ return getBlocksTotal() - numUCBlocks;
+ }
+
+ /**
* Enter safe mode manually.
* @throws IOException
*/
@@ -3671,29 +3861,129 @@
return gs;
}
+ private INodeFileUnderConstruction checkUCBlock(Block block, String clientName)
+ throws IOException {
+ // check safe mode
+ if (isInSafeMode())
+ throw new SafeModeException("Cannot get a new generation stamp and an " +
+ "access token for block " + block, safeMode);
+
+ // check stored block state
+ BlockInfo storedBlock = blockManager.getStoredBlock(block);
+ if (storedBlock == null ||
+ storedBlock.getBlockUCState() != BlockUCState.UNDER_CONSTRUCTION) {
+ throw new IOException(block +
+ " does not exist or is not under Construction" + storedBlock);
+ }
+
+ // check file inode
+ INodeFile file = storedBlock.getINode();
+ if (file==null || !file.isUnderConstruction()) {
+ throw new IOException("The file " + storedBlock +
+ " is belonged to does not exist or it is not under construction.");
+ }
+
+ // check lease
+ INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)file;
+ if (clientName == null || !clientName.equals(pendingFile.getClientName())) {
+ throw new LeaseExpiredException("Lease mismatch: " + block +
+ " is accessed by a non lease holder " + clientName);
+ }
+
+ return pendingFile;
+ }
+
/**
- * Verifies that the block is associated with a file that has a lease.
- * Increments, logs and then returns the stamp
+ * Get a new generation stamp together with an access token for
+ * a block under construction
+ *
+ * This method is called for recovering a failed pipeline or setting up
+ * a pipeline to append to a block.
+ *
+ * @param block a block
+ * @param clientName the name of a client
+ * @return a located block with a new generation stamp and an access token
+ * @throws IOException if any error occurs
+ */
+ synchronized LocatedBlock updateBlockForPipeline(Block block,
+ String clientName) throws IOException {
+ // check validity of parameters
+ checkUCBlock(block, clientName);
+
+ // get a new generation stamp and an access token
+ block.setGenerationStamp(nextGenerationStamp());
+ LocatedBlock locatedBlock = new LocatedBlock(block, new DatanodeInfo[0]);
+ if (isAccessTokenEnabled) {
+ locatedBlock.setAccessToken(accessTokenHandler.generateToken(
+ block.getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+ }
+ return locatedBlock;
+ }
+
+
+ /**
+ * Update a pipeline for a block under construction
+ *
+ * @param clientName the name of the client
+ * @param oldBlock an old block
+ * @param newBlock a new block with a new generation stamp and length
+ * @param newNodes datanodes in the pipeline
+ * @throws IOException if any error occurs
*/
- synchronized long nextGenerationStampForBlock(Block block) throws IOException {
- BlockInfo storedBlock = blockManager.getStoredBlock(block);
- if (storedBlock == null) {
- String msg = block + " is already commited, storedBlock == null.";
- LOG.info(msg);
+ synchronized void updatePipeline(String clientName, Block oldBlock,
+ Block newBlock, DatanodeID[] newNodes)
+ throws IOException {
+ LOG.info("updatePipeline(block=" + oldBlock
+ + ", newGenerationStamp=" + newBlock.getGenerationStamp()
+ + ", newLength=" + newBlock.getNumBytes()
+ + ", newNodes=" + Arrays.asList(newNodes)
+ + ", clientName=" + clientName
+ + ")");
+
+ // check the validity of the block and lease holder name
+ final INodeFileUnderConstruction pendingFile =
+ checkUCBlock(oldBlock, clientName);
+ final BlockInfo oldblockinfo = pendingFile.getLastBlock();
+
+ // check new GS & length: this is not expected
+ if (newBlock.getGenerationStamp() <= oldblockinfo.getGenerationStamp() ||
+ newBlock.getNumBytes() < oldblockinfo.getNumBytes()) {
+ String msg = "Update " + oldBlock + " (len = " +
+ oldblockinfo.getNumBytes() + ") to an older state: " + newBlock +
+ " (len = " + newBlock.getNumBytes() +")";
+ LOG.warn(msg);
throw new IOException(msg);
}
- INodeFile fileINode = storedBlock.getINode();
- if (!fileINode.isUnderConstruction()) {
- String msg = block + " is already commited, !fileINode.isUnderConstruction().";
- LOG.info(msg);
- throw new IOException(msg);
+
+ // Remove old block from blocks map. This always has to be done
+ // because the generation stamp of this block is changing.
+ blockManager.removeBlockFromMap(oldblockinfo);
+
+ // update last block, construct newblockinfo and add it to the blocks map
+ BlockInfoUnderConstruction newblockinfo =
+ new BlockInfoUnderConstruction(
+ newBlock, pendingFile.getReplication());
+ blockManager.addINode(newblockinfo, pendingFile);
+
+ // find the DatanodeDescriptor objects
+ DatanodeDescriptor[] descriptors = null;
+ if (newNodes.length > 0) {
+ descriptors = new DatanodeDescriptor[newNodes.length];
+ for(int i = 0; i < newNodes.length; i++) {
+ descriptors[i] = getDatanode(newNodes[i]);
+ }
}
- if (!((INodeFileUnderConstruction)fileINode).setLastRecoveryTime(now())) {
- String msg = block + " is beening recovered, ignoring this request.";
- LOG.info(msg);
- throw new IOException(msg);
+ // add locations into the INodeUnderConstruction
+ pendingFile.setLastBlock(newblockinfo, descriptors);
+
+ // persist blocks only if append is supported
+ String src = leaseManager.findPath(pendingFile);
+ if (supportAppends) {
+ dir.persistBlocks(src, pendingFile);
+ getEditLog().logSync();
}
- return nextGenerationStamp();
+ LOG.info("updatePipeline(" + oldBlock + ") successfully to " + newBlock);
+ return;
}
// rename was successful. If any part of the renamed subtree had
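[Illustrative sketch only -- not part of this patch.] The two new entry points above, updateBlockForPipeline() and updatePipeline(), form the client-visible half of pipeline recovery. A rough sketch of how a writer might drive them is below, assuming they are exposed through ClientProtocol (as the @Override annotations in NameNode.java further down suggest); the class and variable names are hypothetical, and the real logic lives in DFSClient's DataStreamer.

import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;

/** Illustrative sketch only; not part of r820497. */
class PipelineRecoverySketch {
  /**
   * Assumed recovery sequence: get a new generation stamp for the failed
   * block, rebuild the datanode pipeline (omitted), then report the new
   * block state so the NameNode replaces the last block's
   * BlockInfoUnderConstruction and persists it if append is supported.
   */
  static void recoverPipeline(ClientProtocol namenode, String clientName,
                              Block failedBlock, DatanodeID[] survivingNodes)
      throws IOException {
    // 1. New generation stamp (and access token) for the block under construction.
    LocatedBlock updated = namenode.updateBlockForPipeline(failedBlock, clientName);
    long newGS = updated.getBlock().getGenerationStamp();

    // 2. Re-establish the write pipeline on survivingNodes with newGS (not shown).

    // 3. Tell the NameNode about the recovered pipeline.
    Block newBlock = new Block(failedBlock.getBlockId(),
                               failedBlock.getNumBytes(), newGS);
    namenode.updatePipeline(clientName, failedBlock, newBlock, survivingNodes);
  }
}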
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java Wed Sep 30 23:39:30 2009
@@ -25,8 +25,6 @@
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.permission.*;
import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
/**
* We keep an in-memory representation of the file/block hierarchy.
@@ -423,10 +421,4 @@
}
return null;
}
-
-
- LocatedBlocks createLocatedBlocks(List<LocatedBlock> blocks) {
- return new LocatedBlocks(computeContentSummary().getLength(), blocks,
- isUnderConstruction());
- }
}
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Wed Sep 30 23:39:30 2009
@@ -122,16 +122,29 @@
/** {@inheritDoc} */
long[] computeContentSummary(long[] summary) {
- long bytes = 0;
- for(Block blk : blocks) {
- bytes += blk.getNumBytes();
- }
- summary[0] += bytes;
+ summary[0] += computeFileSize(true);
summary[1]++;
summary[3] += diskspaceConsumed();
return summary;
}
+ /** Compute file size.
+ * The size of the last block is excluded if it is a BlockInfoUnderConstruction
+ * and includesBlockInfoUnderConstruction is false.
+ */
+ long computeFileSize(boolean includesBlockInfoUnderConstruction) {
+ if (blocks == null || blocks.length == 0) {
+ return 0;
+ }
+ final int last = blocks.length - 1;
+ //check if the last block is BlockInfoUnderConstruction
+ long bytes = blocks[last] instanceof BlockInfoUnderConstruction
+ && !includesBlockInfoUnderConstruction?
+ 0: blocks[last].getNumBytes();
+ for(int i = 0; i < last; i++) {
+ bytes += blocks[i].getNumBytes();
+ }
+ return bytes;
+ }
@Override
@@ -173,13 +186,14 @@
/**
* Return the penultimate allocated block for this file.
*/
- Block getPenultimateBlock() {
+ BlockInfo getPenultimateBlock() {
if (blocks == null || blocks.length <= 1) {
return null;
}
return blocks[blocks.length - 2];
}
+ // SHV !!! this is not used anywhere - remove
INodeFileUnderConstruction toINodeFileUnderConstruction(
String clientName, String clientMachine, DatanodeDescriptor clientNode
) throws IOException {
@@ -191,4 +205,27 @@
blocks, getPermissionStatus(),
clientName, clientMachine, clientNode);
}
+
+ /**
+ * Get the last block of the file.
+ * Make sure it has the right type.
+ */
+ <T extends BlockInfo> T getLastBlock() throws IOException {
+ if (blocks == null || blocks.length == 0)
+ return null;
+ T returnBlock = null;
+ try {
+ @SuppressWarnings("unchecked") // ClassCastException is caught below
+ T tBlock = (T)blocks[blocks.length - 1];
+ returnBlock = tBlock;
+ } catch(ClassCastException cce) {
+ throw new IOException("Unexpected last block type: "
+ + blocks[blocks.length - 1].getClass().getSimpleName());
+ }
+ return returnBlock;
+ }
+
+ int numBlocks() {
+ return blocks == null ? 0 : blocks.length;
+ }
}
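[Illustrative sketch only -- not part of this patch.] A hedged usage sketch of the new INodeFile accessors, assuming it sits in the same org.apache.hadoop.hdfs.server.namenode package since the classes are package-private; it shows that computeFileSize(false) skips a last block that is still under construction, which is the length getBlockLocations() above reports to readers.

package org.apache.hadoop.hdfs.server.namenode;

import java.io.IOException;

/** Illustrative sketch only; not part of r820497. */
class FileSizeSketch {
  /** Visible length of a file: excludes an under-construction last block. */
  static long visibleLength(INodeFile file) throws IOException {
    BlockInfo last = file.getLastBlock();       // typed via the new generic helper
    long visible = file.computeFileSize(false);
    long full = file.computeFileSize(true);
    // For a file whose last block is COMPLETE the two sizes agree.
    assert (last == null || last.isComplete()) ? visible == full : visible <= full;
    return visible;
  }
}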
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java Wed Sep 30 23:39:30 2009
@@ -21,16 +21,13 @@
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
class INodeFileUnderConstruction extends INodeFile {
- final String clientName; // lease holder
+ private String clientName; // lease holder
private final String clientMachine;
private final DatanodeDescriptor clientNode; // if client is a cluster node too.
-
- private int primaryNodeIndex = -1; //the node working on lease recovery
- private DatanodeDescriptor[] targets = null; //locations for last block
- private long lastRecoveryTime = 0;
INodeFileUnderConstruction(PermissionStatus permissions,
short replication,
@@ -67,6 +64,10 @@
return clientName;
}
+ void setClientName(String clientName) {
+ this.clientName = clientName;
+ }
+
String getClientMachine() {
return clientMachine;
}
@@ -83,15 +84,6 @@
return true;
}
- DatanodeDescriptor[] getTargets() {
- return targets;
- }
-
- void setTargets(DatanodeDescriptor[] targets) {
- this.targets = targets;
- this.primaryNodeIndex = -1;
- }
-
//
// converts a INodeFileUnderConstruction into a INodeFile
// use the modification time as the access time
@@ -108,10 +100,10 @@
}
/**
- * remove a block from the block list. This block should be
+ * Remove a block from the block list. This block should be
* the last one on the list.
*/
- void removeBlock(Block oldblock) throws IOException {
+ void removeLastBlock(Block oldblock) throws IOException {
if (blocks == null) {
throw new IOException("Trying to delete non-existant block " + oldblock);
}
@@ -124,57 +116,24 @@
BlockInfo[] newlist = new BlockInfo[size_1];
System.arraycopy(blocks, 0, newlist, 0, size_1);
blocks = newlist;
-
- // Remove the block locations for the last block.
- targets = null;
- }
-
- synchronized void setLastBlock(BlockInfo newblock, DatanodeDescriptor[] newtargets
- ) throws IOException {
- if (blocks == null) {
- throw new IOException("Trying to update non-existant block (newblock="
- + newblock + ")");
- }
- blocks[blocks.length - 1] = newblock;
- setTargets(newtargets);
- lastRecoveryTime = 0;
}
/**
- * Initialize lease recovery for this object
+ * Convert the last block of the file to an under-construction block.
+ * Set its locations.
*/
- void assignPrimaryDatanode() {
- //assign the first alive datanode as the primary datanode
-
- if (targets.length == 0) {
- NameNode.stateChangeLog.warn("BLOCK*"
- + " INodeFileUnderConstruction.initLeaseRecovery:"
- + " No blocks found, lease removed.");
- }
-
- int previous = primaryNodeIndex;
- //find an alive datanode beginning from previous
- for(int i = 1; i <= targets.length; i++) {
- int j = (previous + i)%targets.length;
- if (targets[j].isAlive) {
- DatanodeDescriptor primary = targets[primaryNodeIndex = j];
- primary.addBlockToBeRecovered(blocks[blocks.length - 1], targets);
- NameNode.stateChangeLog.info("BLOCK* " + blocks[blocks.length - 1]
- + " recovery started, primary=" + primary);
- return;
- }
- }
- }
-
- /**
- * Update lastRecoveryTime if expired.
- * @return true if lastRecoveryTimeis updated.
- */
- synchronized boolean setLastRecoveryTime(long now) {
- boolean expired = now - lastRecoveryTime > NameNode.LEASE_RECOVER_PERIOD;
- if (expired) {
- lastRecoveryTime = now;
- }
- return expired;
+ BlockInfoUnderConstruction setLastBlock(BlockInfo lastBlock,
+ DatanodeDescriptor[] targets)
+ throws IOException {
+ if (blocks == null || blocks.length == 0) {
+ throw new IOException("Trying to update non-existant block. " +
+ "File is empty.");
+ }
+ BlockInfoUnderConstruction ucBlock =
+ lastBlock.convertToBlockUnderConstruction(
+ BlockUCState.UNDER_CONSTRUCTION, targets);
+ ucBlock.setINode(this);
+ setBlock(numBlocks()-1, ucBlock);
+ return ucBlock;
}
}
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java Wed Sep 30 23:39:30 2009
@@ -102,7 +102,7 @@
/**
* Adds (or re-adds) the lease for the specified file.
*/
- synchronized void addLease(String holder, String src) {
+ synchronized Lease addLease(String holder, String src) {
Lease lease = getLease(holder);
if (lease == null) {
lease = new Lease(holder);
@@ -113,6 +113,7 @@
}
sortedLeasesByPath.put(src, lease);
lease.paths.add(src);
+ return lease;
}
/**
@@ -143,11 +144,22 @@
}
/**
+ * Reassign lease for file src to the new holder.
+ */
+ synchronized Lease reassignLease(Lease lease, String src, String newHolder) {
+ assert newHolder != null : "new lease holder is null";
+ if (lease != null) {
+ removeLease(lease, src);
+ }
+ return addLease(newHolder, src);
+ }
+
+ /**
* Finds the pathname for the specified pendingFile
*/
synchronized String findPath(INodeFileUnderConstruction pendingFile
) throws IOException {
- Lease lease = getLease(pendingFile.clientName);
+ Lease lease = getLease(pendingFile.getClientName());
if (lease != null) {
String src = lease.findPath(pendingFile);
if (src != null) {
@@ -265,7 +277,11 @@
Collection<String> getPaths() {
return paths;
}
-
+
+ String getHolder() {
+ return holder;
+ }
+
void replacePath(String oldpath, String newpath) {
paths.remove(oldpath);
paths.add(newpath);
@@ -376,7 +392,13 @@
oldest.getPaths().toArray(leasePaths);
for(String p : leasePaths) {
try {
- fsnamesystem.internalReleaseLease(oldest, p);
+ if(fsnamesystem.internalReleaseLease(oldest, p, "HDFS_NameNode")) {
+ LOG.info("Lease recovery for file " + p +
+ " is complete. File closed.");
+ removing.add(p);
+ } else
+ LOG.info("Started block recovery for file " + p +
+ " lease " + oldest);
} catch (IOException e) {
LOG.error("Cannot release the path "+p+" in the lease "+oldest, e);
removing.add(p);
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Wed Sep 30 23:39:30 2009
@@ -610,11 +610,12 @@
/**
*/
- public LocatedBlock addBlock(String src,
- String clientName) throws IOException {
+ public LocatedBlock addBlock(String src, String clientName,
+ Block previous) throws IOException {
stateChangeLog.debug("*BLOCK* NameNode.addBlock: file "
+src+" for "+clientName);
- LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, clientName);
+ LocatedBlock locatedBlock =
+ namesystem.getAdditionalBlock(src, clientName, previous);
if (locatedBlock != null)
myMetrics.numAddBlockOps.inc();
return locatedBlock;
@@ -633,9 +634,11 @@
}
/** {@inheritDoc} */
- public boolean complete(String src, String clientName) throws IOException {
+ public boolean complete(String src, String clientName,
+ Block last) throws IOException {
stateChangeLog.debug("*DIR* NameNode.complete: " + src + " for " + clientName);
- CompleteFileStatus returnCode = namesystem.completeFile(src, clientName);
+ CompleteFileStatus returnCode =
+ namesystem.completeFile(src, clientName, last);
if (returnCode == CompleteFileStatus.STILL_WAITING) {
return false;
} else if (returnCode == CompleteFileStatus.COMPLETE_SUCCESS) {
@@ -664,10 +667,20 @@
}
/** {@inheritDoc} */
- public long nextGenerationStamp(Block block) throws IOException{
- return namesystem.nextGenerationStampForBlock(block);
+ @Override
+ public LocatedBlock updateBlockForPipeline(Block block, String clientName)
+ throws IOException {
+ return namesystem.updateBlockForPipeline(block, clientName);
}
+
+ @Override
+ public void updatePipeline(String clientName, Block oldBlock,
+ Block newBlock, DatanodeID[] newNodes)
+ throws IOException {
+ namesystem.updatePipeline(clientName, oldBlock, newBlock, newNodes);
+ }
+
/** {@inheritDoc} */
public void commitBlockSynchronization(Block block,
long newgenerationstamp, long newlength,
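[Illustrative sketch only -- not part of this patch.] The changed write-path RPC sequence seen from a client, assuming addBlock() and complete() are the ClientProtocol methods NameNode implements above: each addBlock() now passes the previous block so the NameNode can commit it, and complete() passes the last block for the same reason. Names below are hypothetical and data transfer to the datanodes is omitted.

import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;

/** Illustrative sketch only; not part of r820497. */
class WriteFlowSketch {
  static void writeTwoBlocks(ClientProtocol namenode, String src,
                             String clientName) throws IOException {
    // First block of the file: no previous block to commit yet.
    LocatedBlock first = namenode.addBlock(src, clientName, null);
    // ... stream data for the first block to its pipeline ...

    // Second block: the NameNode commits `first` before allocating it.
    LocatedBlock second = namenode.addBlock(src, clientName, first.getBlock());
    // ... stream data for the second block ...

    // complete() commits the last block; retry while blocks are not yet
    // minimally replicated (backoff omitted).
    while (!namenode.complete(src, clientName, second.getBlock())) {
      // STILL_WAITING on the server side maps to false here
    }
  }
}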
Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java?rev=820497&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java Wed Sep 30 23:39:30 2009
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.ArrayList;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+/**
+ * BlockRecoveryCommand is an instruction to a data-node to recover
+ * the specified blocks.
+ *
+ * The data-node that receives this command treats itself as a primary
+ * data-node in the recovery process.
+ *
+ * Block recovery is identified by a recoveryId, which is also the new
+ * generation stamp that the block will have after the recovery succeeds.
+ */
+public class BlockRecoveryCommand extends DatanodeCommand {
+ Collection<RecoveringBlock> recoveringBlocks;
+
+ /**
+ * This is a block with locations from which it should be recovered
+ * and the new generation stamp, which the block will have after
+ * successful recovery.
+ *
+ * The new generation stamp of the block also plays the role of the recovery id.
+ */
+ public static class RecoveringBlock extends LocatedBlock {
+ private long newGenerationStamp;
+
+ /**
+ * Create empty RecoveringBlock.
+ */
+ public RecoveringBlock() {
+ super();
+ newGenerationStamp = -1L;
+ }
+
+ /**
+ * Create RecoveringBlock.
+ */
+ public RecoveringBlock(Block b, DatanodeInfo[] locs, long newGS) {
+ super(b, locs, -1, false); // startOffset is unknown
+ this.newGenerationStamp = newGS;
+ }
+
+ /**
+ * Return the new generation stamp of the block,
+ * which also plays the role of the recovery id.
+ */
+ public long getNewGenerationStamp() {
+ return newGenerationStamp;
+ }
+
+ ///////////////////////////////////////////
+ // Writable
+ ///////////////////////////////////////////
+ static { // register a ctor
+ WritableFactories.setFactory
+ (RecoveringBlock.class,
+ new WritableFactory() {
+ public Writable newInstance() { return new RecoveringBlock(); }
+ });
+ }
+
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ out.writeLong(newGenerationStamp);
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ newGenerationStamp = in.readLong();
+ }
+ }
+
+ /**
+ * Create empty BlockRecoveryCommand.
+ */
+ public BlockRecoveryCommand() {
+ this(0);
+ }
+
+ /**
+ * Create BlockRecoveryCommand with
+ * the specified capacity for recovering blocks.
+ */
+ public BlockRecoveryCommand(int capacity) {
+ super(DatanodeProtocol.DNA_RECOVERBLOCK);
+ recoveringBlocks = new ArrayList<RecoveringBlock>(capacity);
+ }
+
+ /**
+ * Return the list of recovering blocks.
+ */
+ public Collection<RecoveringBlock> getRecoveringBlocks() {
+ return recoveringBlocks;
+ }
+
+ /**
+ * Add recovering block to the command.
+ */
+ public void add(RecoveringBlock block) {
+ recoveringBlocks.add(block);
+ }
+
+ ///////////////////////////////////////////
+ // Writable
+ ///////////////////////////////////////////
+ static { // register a ctor
+ WritableFactories.setFactory
+ (BlockRecoveryCommand.class,
+ new WritableFactory() {
+ public Writable newInstance() { return new BlockRecoveryCommand(); }
+ });
+ }
+
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ out.writeInt(recoveringBlocks.size());
+ for(RecoveringBlock block : recoveringBlocks) {
+ block.write(out);
+ }
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ int numBlocks = in.readInt();
+ recoveringBlocks = new ArrayList<RecoveringBlock>(numBlocks);
+ for(int i = 0; i < numBlocks; i++) {
+ RecoveringBlock b = new RecoveringBlock();
+ b.readFields(in);
+ add(b);
+ }
+ }
+}
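[Illustrative sketch only -- not part of this patch.] A small Writable round-trip sketch for the new command, assuming the standard DataOutputBuffer/DataInputBuffer helpers from org.apache.hadoop.io; the block id, length, and generation stamps below are made-up values.

import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

/** Illustrative sketch only; not part of r820497. */
class BlockRecoveryCommandSketch {
  public static void main(String[] args) throws IOException {
    // A command asking the primary datanode to recover one block;
    // 1002 is the new generation stamp, i.e. the recovery id.
    Block b = new Block(1000L, 100L, 1001L);
    BlockRecoveryCommand cmd = new BlockRecoveryCommand(1);
    cmd.add(new RecoveringBlock(b, new DatanodeInfo[0], 1002L));

    // Serialize and deserialize, as it would travel in a heartbeat reply.
    DataOutputBuffer out = new DataOutputBuffer();
    cmd.write(out);
    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    BlockRecoveryCommand copy = new BlockRecoveryCommand();
    copy.readFields(in);
    assert copy.getRecoveringBlocks().size() == 1;
  }
}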
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java Wed Sep 30 23:39:30 2009
@@ -35,10 +35,9 @@
**********************************************************************/
public interface DatanodeProtocol extends VersionedProtocol {
/**
- * 20: SendHeartbeat may return KeyUpdateCommand
- * Register returns access keys inside DatanodeRegistration object
+ * 23: nextGenerationStamp() removed.
*/
- public static final long versionID = 20L;
+ public static final long versionID = 23L;
// error code
final static int NOTIFY = 0;
@@ -143,12 +142,6 @@
public void reportBadBlocks(LocatedBlock[] blocks) throws IOException;
/**
- * @return the next GenerationStamp to be associated with the specified
- * block.
- */
- public long nextGenerationStamp(Block block) throws IOException;
-
- /**
* Commit block synchronization in lease recovery
*/
public void commitBlockSynchronization(Block block,
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java Wed Sep 30 23:39:30 2009
@@ -23,6 +23,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.ipc.VersionedProtocol;
/** An inter-datanode protocol for updating generation stamp
@@ -31,17 +32,36 @@
public static final Log LOG = LogFactory.getLog(InterDatanodeProtocol.class);
/**
- * 3: added a finalize parameter to updateBlock
+ * 4: initReplicaRecovery(), updateReplicaUnderRecovery() added.
*/
- public static final long versionID = 3L;
+ public static final long versionID = 4L;
/** @return the BlockMetaDataInfo of a block;
* null if the block is not found
*/
+ @Deprecated
BlockMetaDataInfo getBlockMetaDataInfo(Block block) throws IOException;
/**
* Update the block to the new generation stamp and length.
*/
- void updateBlock(Block oldblock, Block newblock, boolean finalize) throws IOException;
+ @Deprecated
+ void updateBlock(Block oldblock, Block newblock, boolean finalize)
+ throws IOException;
+
+ /**
+ * Initialize a replica recovery.
+ *
+ * @return actual state of the replica on this data-node or
+ * null if the data-node does not have the replica.
+ */
+ ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
+ throws IOException;
+
+ /**
+ * Update replica with the new generation stamp and length.
+ */
+ Block updateReplicaUnderRecovery(Block oldBlock,
+ long recoveryId,
+ long newLength) throws IOException;
}
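[Illustrative sketch only -- not part of this patch.] One plausible, heavily simplified shape of how a primary data-node might use the two new calls: gather replica info from every peer, agree on a recovered length, then update each responding replica to the recovery id. The real algorithm also weighs each replica's original state from ReplicaRecoveryInfo and finishes by calling commitBlockSynchronization on the NameNode, none of which is shown here; class and variable names are hypothetical.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;

/** Illustrative sketch only; not part of r820497. */
class PrimaryRecoverySketch {
  static void recover(RecoveringBlock rBlock, InterDatanodeProtocol[] peers)
      throws IOException {
    long recoveryId = rBlock.getNewGenerationStamp();
    Block block = rBlock.getBlock();

    // Phase 1: ask every datanode for its replica state and length.
    List<InterDatanodeProtocol> responders = new ArrayList<InterDatanodeProtocol>();
    long newLength = Long.MAX_VALUE;
    for (InterDatanodeProtocol peer : peers) {
      ReplicaRecoveryInfo info = peer.initReplicaRecovery(rBlock);
      if (info == null) {
        continue;                        // no replica on this datanode
      }
      newLength = Math.min(newLength, info.getNumBytes());
      responders.add(peer);
    }
    if (responders.isEmpty()) {
      throw new IOException("No replicas found for " + block);
    }

    // Phase 2: bring every responding replica to the agreed length and
    // the new generation stamp (the recovery id).
    for (InterDatanodeProtocol peer : responders) {
      peer.updateReplicaUnderRecovery(block, recoveryId, newLength);
    }
  }
}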
Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/ReplicaRecoveryInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/ReplicaRecoveryInfo.java?rev=820497&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/ReplicaRecoveryInfo.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/ReplicaRecoveryInfo.java Wed Sep 30 23:39:30 2009
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.protocol;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+/**
+ * Replica recovery information.
+ */
+public class ReplicaRecoveryInfo extends Block {
+ private ReplicaState originalState;
+
+ public ReplicaRecoveryInfo() {
+ }
+
+ public ReplicaRecoveryInfo(Block r, ReplicaState rState) {
+ super(r);
+ originalState = rState;
+ }
+
+ public ReplicaState getOriginalReplicaState() {
+ return originalState;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return super.equals(o);
+ }
+
+ @Override
+ public int hashCode() {
+ return super.hashCode();
+ }
+
+ ///////////////////////////////////////////
+ // Writable
+ ///////////////////////////////////////////
+ static { // register a ctor
+ WritableFactories.setFactory
+ (ReplicaRecoveryInfo.class,
+ new WritableFactory() {
+ public Writable newInstance() { return new ReplicaRecoveryInfo(); }
+ });
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ originalState = ReplicaState.read(in);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ originalState.write(out);
+ }
+}
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java Wed Sep 30 23:39:30 2009
@@ -96,7 +96,7 @@
class ImageLoaderCurrent implements ImageLoader {
protected final DateFormat dateFormat =
new SimpleDateFormat("yyyy-MM-dd HH:mm");
- private static int [] versions = {-16, -17, -18, -19};
+ private static int [] versions = {-16, -17, -18, -19, -20};
private int imageVersion = 0;
/* (non-Javadoc)
Added: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiHFlushTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiHFlushTestUtil.java?rev=820497&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiHFlushTestUtil.java (added)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiHFlushTestUtil.java Wed Sep 30 23:39:30 2009
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fi;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fi.FiTestUtil.ActionContainer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+
+/** Helper methods and actions for hflush() fault injection tests */
+public class FiHFlushTestUtil extends DataTransferTestUtil {
+
+ /** {@inheritDoc} */
+ public static PipelineTest initTest() {
+ return thepipelinetest = new HFlushTest();
+ }
+
+ /** Disk error action for fault injection tests */
+ public static class DerrAction extends DataTransferTestUtil.DataNodeAction {
+ /**
+ * @param currentTest The name of the test
+ * @param index The index of the datanode
+ */
+ public DerrAction(String currentTest, int index) {
+ super(currentTest, index);
+ }
+
+ /** {@inheritDoc} */
+ public void run(DatanodeID id) throws IOException {
+ final Pipeline p = getPipelineTest().getPipeline(id);
+ if (p == null) {
+ FiTestUtil.LOG.info("FI: couldn't find a pipeline for " + id);
+ return;
+ }
+ if (p.contains(index, id)) {
+ final String s = super.toString(id);
+ FiTestUtil.LOG.info(s);
+ throw new DiskErrorException(s);
+ }
+ }
+ }
+
+ /** Class adds new type of action */
+ public static class HFlushTest extends DataTransferTest {
+ public final ActionContainer<DatanodeID> fiCallHFlush =
+ new ActionContainer<DatanodeID>();
+ }
+}
\ No newline at end of file
Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj Wed Sep 30 23:39:30 2009
@@ -22,6 +22,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fi.DataTransferTestUtil;
+import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
import org.apache.hadoop.hdfs.DFSClient.DFSOutputStream.DataStreamer;
import org.junit.Assert;
@@ -35,7 +36,7 @@
before(DataStreamer datastreamer) : callCreateBlockOutputStream(datastreamer) {
Assert.assertFalse(datastreamer.hasError);
- Assert.assertEquals(0, datastreamer.errorIndex);
+ Assert.assertEquals(-1, datastreamer.errorIndex);
}
pointcut pipelineInitNonAppend(DataStreamer datastreamer):
@@ -48,8 +49,9 @@
+ datastreamer.hasError + " errorIndex=" + datastreamer.errorIndex);
try {
if (datastreamer.hasError) {
- DataTransferTestUtil.getDataTransferTest().fiPipelineInitErrorNonAppend
- .run(datastreamer.errorIndex);
+ DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
+ if (dtTest != null)
+ dtTest.fiPipelineInitErrorNonAppend.run(datastreamer.errorIndex);
}
} catch (IOException e) {
throw new RuntimeException(e);
@@ -66,19 +68,18 @@
+ " errorIndex=" + datastreamer.errorIndex);
}
- pointcut pipelineErrorAfterInit(boolean onError, boolean isAppend,
- DataStreamer datastreamer):
- call(* processDatanodeError(boolean, boolean))
- && args(onError, isAppend)
- && target(datastreamer)
- && if(onError && !isAppend);
+ pointcut pipelineErrorAfterInit(DataStreamer datastreamer):
+ call(* processDatanodeError())
+ && within (DFSClient.DFSOutputStream.DataStreamer)
+ && target(datastreamer);
- before(DataStreamer datastreamer) : pipelineErrorAfterInit(boolean, boolean, datastreamer) {
+ before(DataStreamer datastreamer) : pipelineErrorAfterInit(datastreamer) {
LOG.info("FI: before pipelineErrorAfterInit: errorIndex="
+ datastreamer.errorIndex);
try {
- DataTransferTestUtil.getDataTransferTest().fiPipelineErrorAfterInit
- .run(datastreamer.errorIndex);
+ DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
+ if (dtTest != null)
+ dtTest.fiPipelineErrorAfterInit.run(datastreamer.errorIndex);
} catch (IOException e) {
throw new RuntimeException(e);
}
Added: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/HFlushAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/HFlushAspects.aj?rev=820497&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/HFlushAspects.aj (added)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/HFlushAspects.aj Wed Sep 30 23:39:30 2009
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fi.DataTransferTestUtil;
+import org.apache.hadoop.fi.FiHFlushTestUtil.HFlushTest;
+import org.apache.hadoop.hdfs.DFSClient.DFSOutputStream;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+
+public aspect HFlushAspects {
+ public static final Log LOG = LogFactory.getLog(HFlushAspects.class);
+
+ pointcut hflushCall (DFSOutputStream outstream) :
+ execution(void DFSOutputStream.hflush(..))
+ && target (outstream);
+
+ /** This advice is supposed to initiate a call to the action (fiCallHFlush),
+ * which will throw DiskErrorException if a pipeline has been created
+ * and the datanodes used belong to that very pipeline
+ */
+ after (DFSOutputStream streamer) throws IOException : hflushCall(streamer) {
+ LOG.info("FI: hflush for any datanode");
+ LOG.info("FI: hflush " + thisJoinPoint.getThis());
+ DatanodeInfo[] nodes = streamer.getPipeline();
+ if (nodes == null) {
+ LOG.info("No pipeline is built");
+ return;
+ }
+ if (DataTransferTestUtil.getPipelineTest() == null) {
+ LOG.info("No test has been initialized");
+ return;
+ }
+ for (int i=0; i<nodes.length; i++) {
+ ((HFlushTest)DataTransferTestUtil.getPipelineTest()).fiCallHFlush.run(nodes[i]);
+ }
+ }
+}
Added: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/TestFiHFlush.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/TestFiHFlush.java?rev=820497&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/TestFiHFlush.java (added)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/TestFiHFlush.java Wed Sep 30 23:39:30 2009
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fi.FiHFlushTestUtil;
+import org.apache.hadoop.fi.FiTestUtil;
+import org.apache.hadoop.fi.FiHFlushTestUtil.DerrAction;
+import org.apache.hadoop.fi.FiHFlushTestUtil.HFlushTest;
+
+import java.io.IOException;
+
+/** This class provides basic fault injection tests according to the test plan
+ * of HDFS-265.
+ */
+public class TestFiHFlush {
+
+ /** Method initializes a test and sets the required actions to be used later by
+ * an injected advice
+ * @param conf mini cluster configuration
+ * @param methodName String representation of the test method invoking this
+ * method
+ * @param block_size required size of the file's block
+ * @param a an action to be set for the test
+ * @throws IOException in case of any errors
+ */
+ private static void runDiskErrorTest (final Configuration conf,
+ final String methodName, final int block_size, DerrAction a)
+ throws IOException {
+ FiTestUtil.LOG.info("Running " + methodName + " ...");
+ final HFlushTest hft = (HFlushTest) FiHFlushTestUtil.initTest();
+ hft.fiCallHFlush.set(a);
+ TestHFlush.doTheJob(conf, methodName, block_size, (short)3);
+ }
+
+ /** The test calls
+ * {@link #runDiskErrorTest(Configuration, String, int, DerrAction)}
+ * to make a number of writes within a block's boundaries.
+ * Although hflush() is called, the test shouldn't expect an IOException
+ * in this case because the invocation happens after the write() call
+ * is complete, when the pipeline doesn't exist anymore.
+ * Thus, the injected fault won't be triggered for the 0th datanode
+ */
+ @Test
+ public void hFlushFi01_a() throws IOException {
+ final String methodName = FiTestUtil.getMethodName();
+ runDiskErrorTest(new Configuration(), methodName,
+ AppendTestUtil.BLOCK_SIZE, new DerrAction(methodName, 0));
+ }
+
+ /** The test calls
+ * {@link #runDiskErrorTest(Configuration, String, int, DerrAction)}
+ * to make a number of writes across block boundaries.
+ * hflush() is called after each write() during the pipeline's lifetime.
+ * Thus, the injected fault ought to be triggered for the 0th datanode
+ */
+ @Test(expected = IOException.class)
+ public void hFlushFi01_b() throws IOException {
+ final String methodName = FiTestUtil.getMethodName();
+ Configuration conf = new Configuration();
+ int customPerChecksumSize = 512;
+ int customBlockSize = customPerChecksumSize * 3;
+ conf.setInt("io.bytes.per.checksum", customPerChecksumSize);
+ conf.setLong("dfs.block.size", customBlockSize);
+ runDiskErrorTest(conf, methodName,
+ customBlockSize, new DerrAction(methodName, 0));
+ }
+
+ /** Similar to {@link #hFlushFi01_b()} but writing happens
+ * across block and checksum boundaries
+ */
+ @Test(expected = IOException.class)
+ public void hFlushFi01_c() throws IOException {
+ final String methodName = FiTestUtil.getMethodName();
+ Configuration conf = new Configuration();
+ int customPerChecksumSize = 400;
+ int customBlockSize = customPerChecksumSize * 3;
+ conf.setInt("io.bytes.per.checksum", customPerChecksumSize);
+ conf.setLong("dfs.block.size", customBlockSize);
+ runDiskErrorTest(conf, methodName,
+ customBlockSize, new DerrAction(methodName, 0));
+ }
+
+ /** Similar to {@link #hFlushFi01_a()} but for a pipeline's 1st datanode
+ */
+ @Test
+ public void hFlushFi02_a() throws IOException {
+ final String methodName = FiTestUtil.getMethodName();
+ runDiskErrorTest(new Configuration(), methodName,
+ AppendTestUtil.BLOCK_SIZE, new DerrAction(methodName, 1));
+ }
+
+ /** Similar to {@link #hFlushFi01_b()} but for a pipeline's 1st datanode
+ */
+ @Test(expected = IOException.class)
+ public void hFlushFi02_b() throws IOException {
+ final String methodName = FiTestUtil.getMethodName();
+ Configuration conf = new Configuration();
+ int customPerChecksumSize = 512;
+ int customBlockSize = customPerChecksumSize * 3;
+ conf.setInt("io.bytes.per.checksum", customPerChecksumSize);
+ conf.setLong("dfs.block.size", customBlockSize);
+ runDiskErrorTest(conf, methodName,
+ customBlockSize, new DerrAction(methodName, 1));
+ }
+
+ /** Similar to {@link #hFlushFi01_c()} but for a pipeline's 1st datanode
+ */
+ @Test(expected = IOException.class)
+ public void hFlushFi02_c() throws IOException {
+ final String methodName = FiTestUtil.getMethodName();
+ Configuration conf = new Configuration();
+ int customPerChecksumSize = 400;
+ int customBlockSize = customPerChecksumSize * 3;
+ conf.setInt("io.bytes.per.checksum", customPerChecksumSize);
+ conf.setLong("dfs.block.size", customBlockSize);
+ runDiskErrorTest(conf, methodName,
+ customBlockSize, new DerrAction(methodName, 1));
+ }
+
+ /** Similar to {@link #hFlushFi01_a()} but for a pipeline's 2nd datanode
+ */
+ @Test
+ public void hFlushFi03_a() throws IOException {
+ final String methodName = FiTestUtil.getMethodName();
+ runDiskErrorTest(new Configuration(), methodName,
+ AppendTestUtil.BLOCK_SIZE, new DerrAction(methodName, 2));
+ }
+
+ /** Similar to {@link #hFlushFi01_b()} but for a pipeline's 2nd datanode
+ */
+ @Test(expected = IOException.class)
+ public void hFlushFi03_b() throws IOException {
+ final String methodName = FiTestUtil.getMethodName();
+ Configuration conf = new Configuration();
+ int customPerChecksumSize = 512;
+ int customBlockSize = customPerChecksumSize * 3;
+ conf.setInt("io.bytes.per.checksum", customPerChecksumSize);
+ conf.setLong("dfs.block.size", customBlockSize);
+ runDiskErrorTest(conf, methodName,
+ customBlockSize, new DerrAction(methodName, 2));
+ }
+
+ /** Similar to {@link #hFlushFi01_c()} but for a pipeline's 2nd datanode
+ */
+ @Test(expected = IOException.class)
+ public void hFlushFi03_c() throws IOException {
+ final String methodName = FiTestUtil.getMethodName();
+ Configuration conf = new Configuration();
+ int customPerChecksumSize = 400;
+ int customBlockSize = customPerChecksumSize * 3;
+ conf.setInt("io.bytes.per.checksum", customPerChecksumSize);
+ conf.setLong("dfs.block.size", customBlockSize);
+ runDiskErrorTest(conf, methodName,
+ customBlockSize, new DerrAction(methodName, 2));
+ }
+}
Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/protocol/ClientProtocolAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/protocol/ClientProtocolAspects.aj?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/protocol/ClientProtocolAspects.aj (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/protocol/ClientProtocolAspects.aj Wed Sep 30 23:39:30 2009
@@ -27,7 +27,7 @@
public static final Log LOG = LogFactory.getLog(ClientProtocolAspects.class);
pointcut addBlock():
- call(LocatedBlock ClientProtocol.addBlock(String, String));
+ call(LocatedBlock ClientProtocol.addBlock(String, String,..));
after() returning(LocatedBlock lb): addBlock() {
PipelineTest pipelineTest = DataTransferTestUtil.getPipelineTest();
Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj Wed Sep 30 23:39:30 2009
@@ -60,8 +60,9 @@
final DataNode d = dataxceiver.getDataNode();
LOG.info("FI: statusRead " + status + ", datanode="
+ d.getDatanodeRegistration().getName());
- DataTransferTestUtil.getDataTransferTest().fiStatusRead.run(
- d.getDatanodeRegistration());
+ DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
+ if (dtTest != null)
+ dtTest.fiStatusRead.run(d.getDatanodeRegistration());
}
pointcut receiverOpWriteBlock(DataXceiver dataxceiver):
Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/FSDatasetAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/FSDatasetAspects.aj?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/FSDatasetAspects.aj (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/FSDatasetAspects.aj Wed Sep 30 23:39:30 2009
@@ -35,11 +35,9 @@
// the following will inject faults inside of the method in question
execution (* FSDataset.getBlockFile(..)) && !within(FSDatasetAspects +);
- // the following will inject faults before the actual call of the method
- // call (* FSDataset.getBlockFile(..)) && !within(FSDatasetAspects +);
-
- pointcut callCreateBlockWriteStream() :
- call (BlockWriteStreams FSDataset.createBlockWriteStreams(..))
+ pointcut callCreateBlockWriteStream(ReplicaInPipeline repl) :
+ call (BlockWriteStreams createStreams())
+ && target (repl)
&& !within(FSDatasetAspects +);
// This aspect specifies the logic of our fault point.
@@ -54,7 +52,7 @@
}
}
- before() throws DiskOutOfSpaceException : callCreateBlockWriteStream() {
+ before(ReplicaInPipeline repl) throws DiskOutOfSpaceException : callCreateBlockWriteStream(repl) {
if (ProbabilityModel.injectCriteria(FSDataset.class.getSimpleName())) {
LOG.info("Before the injection point");
Thread.dumpStack();
Modified: hadoop/hdfs/trunk/src/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/findbugsExcludeFile.xml?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/findbugsExcludeFile.xml (original)
+++ hadoop/hdfs/trunk/src/test/findbugsExcludeFile.xml Wed Sep 30 23:39:30 2009
@@ -208,17 +208,22 @@
</Match>
<!--
- CreateBlockWriteStreams and getTmpInputStreams are pretty much like a stream constructor.
+ getTmpInputStreams is pretty much like a stream constructor.
The newly created streams are not supposed to be closed in the constructor. So ignore
the OBL warning.
-->
<Match>
<Class name="org.apache.hadoop.hdfs.server.datanode.FSDataset" />
- <Or>
- <Method name="createBlockWriteStreams" />
- <Method name="getTmpInputStreams" />
- </Or>
+ <Method name="getTmpInputStreams" />
<Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
</Match>
+ <!--
+ ResponseProcessor is a thread that is designed to catch RuntimeException.
+ -->
+ <Match>
+ <Class name="org.apache.hadoop.hdfs.DFSClient$DFSOutputStream$DataStreamer$ResponseProcessor" />
+ <Method name="run" />
+ <Bug pattern="REC_CATCH_EXCEPTION" />
+ </Match>
</FindBugsFilter>
Propchange: hadoop/hdfs/trunk/src/test/hdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep 30 23:39:30 2009
@@ -1,2 +1,4 @@
/hadoop/core/branches/branch-0.19/hdfs/src/test/hdfs:713112
/hadoop/core/trunk/src/test/hdfs:776175-785643
+/hadoop/hdfs/branches/HDFS-265/src/test/hdfs:796829-820463
+/hadoop/hdfs/branches/branch-0.21/src/test/hdfs:820487
Propchange: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/fs/TestHDFSFileContextMainOperations.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/AppendTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/AppendTestUtil.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/AppendTestUtil.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/AppendTestUtil.java Wed Sep 30 23:39:30 2009
@@ -33,7 +33,7 @@
import org.apache.hadoop.security.UserGroupInformation;
/** Utilities for append-related tests */
-class AppendTestUtil {
+public class AppendTestUtil {
/** For specifying the random number generator seed,
* change the following value:
*/
@@ -84,8 +84,15 @@
LOG.info("ms=" + ms, e);
}
}
-
- static FileSystem createHdfsWithDifferentUsername(Configuration conf
+
+ /**
+ * Returns a reference to a new FileSystem instance created
+ * with a different user name
+ * @param conf current Configuration
+ * @return FileSystem instance
+ * @throws IOException
+ */
+ public static FileSystem createHdfsWithDifferentUsername(Configuration conf
) throws IOException {
Configuration conf2 = new Configuration(conf);
String username = UserGroupInformation.getCurrentUGI().getUserName()+"_XXX";
@@ -134,7 +141,7 @@
* Make sure to call close() on the returned stream
* @throws IOException an exception might be thrown
*/
- static FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
+ public static FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
throws IOException {
return fileSys.create(name, true,
fileSys.getConf().getInt("io.file.buffer.size", 4096),
@@ -146,7 +153,7 @@
* the specified byte[] buffer's content
* @throws IOException an exception might be thrown
*/
- static void checkFullFile(FileSystem fs, Path name, int len,
+ public static void checkFullFile(FileSystem fs, Path name, int len,
final byte[] compareContent, String message) throws IOException {
FSDataInputStream stm = fs.open(name);
byte[] actual = new byte[len];
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java Wed Sep 30 23:39:30 2009
@@ -110,7 +110,7 @@
/** create nFiles with random names and directory hierarchies
* with random (but reproducible) data in them.
*/
- void createFiles(FileSystem fs, String topdir,
+ public void createFiles(FileSystem fs, String topdir,
short replicationFactor) throws IOException {
files = new MyFile[nFiles];
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DataNodeCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DataNodeCluster.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DataNodeCluster.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DataNodeCluster.java Wed Sep 30 23:39:30 2009
@@ -21,6 +21,7 @@
import java.net.UnknownHostException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
+import java.util.Arrays;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
@@ -200,7 +201,7 @@
}
for (int i = 1; i <= replication; ++i) {
// inject blocks for dn_i into dn_i and replica in dn_i's neighbors
- mc.injectBlocks((i_dn + i- 1)% numDataNodes, blocks);
+ mc.injectBlocks((i_dn + i- 1)% numDataNodes, Arrays.asList(blocks));
System.out.println("Injecting blocks of dn " + i_dn + " into dn" +
((i_dn + i- 1)% numDataNodes));
}