Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2010/06/11 00:25:39 UTC
svn commit: r953482 [1/2] - in /hadoop/common/branches/branch-0.20-append:
./ src/hdfs/org/apache/hadoop/hdfs/
src/hdfs/org/apache/hadoop/hdfs/server/common/
src/hdfs/org/apache/hadoop/hdfs/server/datanode/
src/hdfs/org/apache/hadoop/hdfs/server/nameno...
Author: dhruba
Date: Thu Jun 10 22:25:39 2010
New Revision: 953482
URL: http://svn.apache.org/viewvc?rev=953482&view=rev
Log:
HDFS-142. Blocks that are being written by a client are stored in the
blocksBeingWritten directory.
(Dhruba Borthakur, Nicolas Spiegelberg, Todd Lipcon via dhruba)
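The gist of the change: a datanode now distinguishes client-driven writes from replication transfers when placing an in-flight block file. Client writes survive a restart under a new blocksBeingWritten/ directory, while replication transfers stay in tmp/ and are wiped on restart. A minimal sketch of that decision, using hypothetical names rather than the patch's own classes:

    import java.io.File;

    // Sketch only: choose where a new block file should live.
    class BlockDirChooserSketch {
      private final File blocksBeingWritten;
      private final File tmpDir;

      BlockDirChooserSketch(File volumeParent) {
        this.blocksBeingWritten = new File(volumeParent, "blocksBeingWritten");
        this.tmpDir = new File(volumeParent, "tmp");
      }

      File dirFor(boolean replicationRequest) {
        return replicationRequest ? tmpDir : blocksBeingWritten;
      }
    }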
Added:
hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java
hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/FSDatasetTestUtil.java
hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/namenode/FSImageAdapter.java
hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/namenode/TestDFSConcurrentFileOperations.java
Modified:
hadoop/common/branches/branch-0.20-append/CHANGES.txt
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/DFSTestUtil.java
hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery.java
hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
Modified: hadoop/common/branches/branch-0.20-append/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/CHANGES.txt?rev=953482&r1=953481&r2=953482&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-append/CHANGES.txt Thu Jun 10 22:25:39 2010
@@ -15,6 +15,10 @@ Release 0.20-append - Unreleased
HDFS-826. Allow a mechanism for an application to detect that
datanode(s) have died in the write pipeline. (dhruba)
+ HDFS-142. Blocks that are being written by a client are stored in the
+ blocksBeingWritten directory.
+ (Dhruba Borthakur, Nicolas Spiegelberg, Todd Lipcon via dhruba)
+
IMPROVEMENTS
BUG FIXES
Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=953482&r1=953481&r2=953482&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Thu Jun 10 22:25:39 2010
@@ -2803,16 +2803,24 @@ public class DFSClient implements FSCons
bytesPerChecksum);
}
- // setup pipeline to append to the last block XXX retries??
+ // setup pipeline to append to the last block
nodes = lastBlock.getLocations();
errorIndex = -1; // no errors yet.
if (nodes.length < 1) {
throw new IOException("Unable to retrieve blocks locations " +
" for last block " + block +
"of file " + src);
-
}
- processDatanodeError(true, true);
+ // keep trying to setup a pipeline until you know all DNs are dead
+ while (processDatanodeError(true, true)) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ }
+ }
+ if (lastException != null) {
+ throw lastException;
+ }
streamer.start();
}
else {
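The DFSClient hunk above replaces the single processDatanodeError call with a retry loop: keep re-attempting pipeline setup, sleeping a second between tries, until either a pipeline is established or no datanode is left to try, at which point lastException is rethrown. A standalone sketch of that pattern, with hypothetical method and field names (assuming processDatanodeError-style semantics where true means "handled an error, worth retrying"):

    import java.io.IOException;

    final class PipelineRetrySketch {
      private IOException lastFailure;   // stand-in for DFSClient's lastException

      void setupAppendPipeline() throws IOException, InterruptedException {
        while (tryToRepairPipeline()) {  // true: an error was handled, retry setup
          Thread.sleep(1000);            // brief back-off between attempts
        }
        if (lastFailure != null) {
          throw lastFailure;             // nothing left to try: surface the error
        }
      }

      private boolean tryToRepairPipeline() {
        // stand-in for processDatanodeError(true, true)
        return false;
      }
    }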
Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/common/HdfsConstants.java?rev=953482&r1=953481&r2=953482&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/common/HdfsConstants.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/common/HdfsConstants.java Thu Jun 10 22:25:39 2010
@@ -51,5 +51,9 @@ public interface HdfsConstants {
public static int WRITE_TIMEOUT = 8 * 60 * 1000;
public static int WRITE_TIMEOUT_EXTENSION = 5 * 1000; //for write pipeline
+
+ // The lease holder for recovery initiated by the NameNode
+ public static final String NN_RECOVERY_LEASEHOLDER = "NN_Recovery";
+
}
Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=953482&r1=953481&r2=953482&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Thu Jun 10 22:25:39 2010
@@ -96,7 +96,8 @@ class BlockReceiver implements java.io.C
//
// Open local disk out
//
- streams = datanode.data.writeToBlock(block, isRecovery);
+ streams = datanode.data.writeToBlock(block, isRecovery,
+ clientName == null || clientName.length() == 0);
this.finalized = false;
if (streams != null) {
this.out = streams.dataOut;
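The BlockReceiver call site above derives the new flag from the client name: data pushed by another datanode to satisfy a replication request carries an empty client name, while a real client write carries the client's id. In isolation, as a helper-method sketch (names are illustrative only):

    // An empty client name on the incoming write marks a replication transfer.
    static boolean isReplicationRequest(String clientName) {
      return clientName == null || clientName.length() == 0;
    }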
Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=953482&r1=953481&r2=953482&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Thu Jun 10 22:25:39 2010
@@ -1425,7 +1425,7 @@ public class DataNode extends Configured
+ "), datanode=" + dnRegistration.getName());
data.updateBlock(oldblock, newblock);
if (finalize) {
- data.finalizeBlock(newblock);
+ data.finalizeBlockIfNeeded(newblock);
myMetrics.blocksWritten.inc();
notifyNamenodeReceivedBlock(newblock, EMPTY_DEL_HINT);
LOG.info("Received block " + newblock +
Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=953482&r1=953481&r2=953482&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Thu Jun 10 22:25:39 2010
@@ -43,6 +43,24 @@ import org.apache.hadoop.hdfs.server.pro
***************************************************/
public class FSDataset implements FSConstants, FSDatasetInterface {
+ /**
+ * A data structure that encapsulates a Block along with the full pathname
+ * of the block file
+ */
+ static class BlockAndFile implements Comparable<BlockAndFile> {
+ final Block block;
+ final File pathfile;
+
+ BlockAndFile(File fullpathname, Block block) {
+ this.pathfile = fullpathname;
+ this.block = block;
+ }
+
+ public int compareTo(BlockAndFile o)
+ {
+ return this.block.compareTo(o.block);
+ }
+ }
/**
* A node type that can be built into a tree reflecting the
@@ -193,6 +211,28 @@ public class FSDataset implements FSCons
}
}
+ /**
+ * Populate the given blockSet with any child blocks
+ * found at this node. With each block, return the full path
+ * of the block file.
+ */
+ void getBlockAndFileInfo(TreeSet<BlockAndFile> blockSet) {
+ if (children != null) {
+ for (int i = 0; i < children.length; i++) {
+ children[i].getBlockAndFileInfo(blockSet);
+ }
+ }
+
+ File blockFiles[] = dir.listFiles();
+ for (int i = 0; i < blockFiles.length; i++) {
+ if (Block.isBlockFilename(blockFiles[i])) {
+ long genStamp = getGenerationStampFromFile(blockFiles, blockFiles[i]);
+ Block block = new Block(blockFiles[i], blockFiles[i].length(), genStamp);
+ blockSet.add(new BlockAndFile(blockFiles[i].getAbsoluteFile(), block));
+ }
+ }
+ }
+
void getVolumeMap(HashMap<Block, DatanodeBlockInfo> volumeMap, FSVolume volume) {
if (children != null) {
for (int i = 0; i < children.length; i++) {
@@ -289,6 +329,7 @@ public class FSDataset implements FSCons
class FSVolume {
private FSDir dataDir;
private File tmpDir;
+ private File blocksBeingWritten; // clients write here
private File detachDir; // copy on write for blocks in snapshot
private DF usage;
private DU dfsUsage;
@@ -297,6 +338,7 @@ public class FSDataset implements FSCons
FSVolume(File currentDir, Configuration conf) throws IOException {
this.reserved = conf.getLong("dfs.datanode.du.reserved", 0);
+ this.dataDir = new FSDir(currentDir);
boolean supportAppends = conf.getBoolean("dfs.support.append", false);
File parent = currentDir.getParentFile();
@@ -305,20 +347,30 @@ public class FSDataset implements FSCons
recoverDetachedBlocks(currentDir, detachDir);
}
- // Files that were being written when the datanode was last shutdown
- // are now moved back to the data directory. It is possible that
- // in the future, we might want to do some sort of datanode-local
- // recovery for these blocks. For example, crc validation.
- //
+ // remove all blocks from "tmp" directory. These were either created
+ // by pre-append clients (0.18.x) or are part of a replication request.
+ // They can be safely removed.
this.tmpDir = new File(parent, "tmp");
if (tmpDir.exists()) {
+ FileUtil.fullyDelete(tmpDir);
+ }
+
+ // Files that were being written when the datanode was last shutdown
+ // should not be deleted.
+ blocksBeingWritten = new File(parent, "blocksBeingWritten");
+ if (blocksBeingWritten.exists()) {
if (supportAppends) {
- recoverDetachedBlocks(currentDir, tmpDir);
+ recoverBlocksBeingWritten(blocksBeingWritten);
} else {
- FileUtil.fullyDelete(tmpDir);
+ FileUtil.fullyDelete(blocksBeingWritten);
+ }
+ }
+
+ if (!blocksBeingWritten.mkdirs()) {
+ if (!blocksBeingWritten.isDirectory()) {
+ throw new IOException("Mkdirs failed to create " + blocksBeingWritten.toString());
}
}
- this.dataDir = new FSDir(currentDir);
if (!tmpDir.mkdirs()) {
if (!tmpDir.isDirectory()) {
throw new IOException("Mkdirs failed to create " + tmpDir.toString());
@@ -371,8 +423,13 @@ public class FSDataset implements FSCons
* Temporary files. They get moved to the real block directory either when
* the block is finalized or the datanode restarts.
*/
- File createTmpFile(Block b) throws IOException {
- File f = new File(tmpDir, b.getBlockName());
+ File createTmpFile(Block b, boolean replicationRequest) throws IOException {
+ File f= null;
+ if (!replicationRequest) {
+ f = new File(blocksBeingWritten, b.getBlockName());
+ } else {
+ f = new File(tmpDir, b.getBlockName());
+ }
return createTmpFile(b, f);
}
@@ -404,6 +461,7 @@ public class FSDataset implements FSCons
try {
fileCreated = f.createNewFile();
} catch (IOException ioe) {
+ DataNode.LOG.warn("createTmpFile failed for file " + f + " Block " + b);
throw (IOException)new IOException(DISK_ERROR +f).initCause(ioe);
}
if (!fileCreated) {
@@ -423,12 +481,13 @@ public class FSDataset implements FSCons
void checkDirs() throws DiskErrorException {
dataDir.checkDirTree();
DiskChecker.checkDir(tmpDir);
+ DiskChecker.checkDir(blocksBeingWritten);
}
void getBlockInfo(TreeSet<Block> blockSet) {
dataDir.getBlockInfo(blockSet);
}
-
+
void getVolumeMap(HashMap<Block, DatanodeBlockInfo> volumeMap) {
dataDir.getVolumeMap(volumeMap, this);
}
@@ -442,6 +501,29 @@ public class FSDataset implements FSCons
}
/**
+ * Recover blocks that were being written when the datanode
+ * was earlier shut down. These blocks get re-inserted into
+ * ongoingCreates. Also, send a blockreceived message to the NN
+ * for each of these blocks because these are not part of a
+ * block report.
+ */
+ private void recoverBlocksBeingWritten(File bbw) throws IOException {
+ FSDir fsd = new FSDir(bbw);
+ TreeSet<BlockAndFile> blockSet = new TreeSet<BlockAndFile>();
+ fsd.getBlockAndFileInfo(blockSet);
+ for (BlockAndFile b : blockSet) {
+ File f = b.pathfile; // full path name of block file
+ volumeMap.put(b.block, new DatanodeBlockInfo(this, f));
+ ongoingCreates.put(b.block, new ActiveFile(f));
+ if (DataNode.LOG.isDebugEnabled()) {
+ DataNode.LOG.debug("recoverBlocksBeingWritten for block " + b.block);
+ }
+ DataNode.getDataNode().notifyNamenodeReceivedBlock(b.block,
+ DataNode.EMPTY_DEL_HINT);
+ }
+ }
+
+ /**
* Recover detached files on datanode restart. If a detached block
* does not exist in the original directory, then it is moved to the
* original directory.
@@ -572,6 +654,11 @@ public class FSDataset implements FSCons
}
threads.add(Thread.currentThread());
}
+
+ // no active threads associated with this ActiveFile
+ ActiveFile(File f) {
+ file = f;
+ }
public String toString() {
return getClass().getSimpleName() + "(file=" + file
@@ -592,7 +679,7 @@ public class FSDataset implements FSCons
}
/** Find the corresponding meta data file from a given block file */
- private static File findMetaFile(final File blockFile) throws IOException {
+ static File findMetaFile(final File blockFile) throws IOException {
final String prefix = blockFile.getName() + "_";
final File parent = blockFile.getParentFile();
File[] matches = parent.listFiles(new FilenameFilter() {
@@ -676,7 +763,7 @@ public class FSDataset implements FSCons
FSVolumeSet volumes;
private HashMap<Block,ActiveFile> ongoingCreates = new HashMap<Block,ActiveFile>();
private int maxBlocksPerDir = 0;
- private HashMap<Block,DatanodeBlockInfo> volumeMap = null;
+ HashMap<Block,DatanodeBlockInfo> volumeMap = new HashMap<Block, DatanodeBlockInfo>();;
static Random random = new Random();
/**
@@ -689,7 +776,6 @@ public class FSDataset implements FSCons
volArray[idx] = new FSVolume(storage.getStorageDir(idx).getCurrentDir(), conf);
}
volumes = new FSVolumeSet(volArray);
- volumeMap = new HashMap<Block, DatanodeBlockInfo>();
volumes.getVolumeMap(volumeMap);
registerMBean(storage.getStorageID());
}
@@ -761,7 +847,10 @@ public class FSDataset implements FSCons
throw new IOException("Block " + b + " does not exist in volumeMap.");
}
FSVolume v = info.getVolume();
- File blockFile = v.getTmpFile(b);
+ File blockFile = info.getFile();
+ if (blockFile == null) {
+ blockFile = v.getTmpFile(b);
+ }
RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");
if (blkOffset > 0) {
blockInFile.seek(blkOffset);
@@ -813,6 +902,22 @@ public class FSDataset implements FSCons
throw new IOException("Cannot update oldblock (=" + oldblock
+ ") to newblock (=" + newblock + ").");
}
+
+
+ // Protect against a straggler updateblock call moving a block backwards
+ // in time.
+ boolean isValidUpdate =
+ (newblock.getGenerationStamp() > oldblock.getGenerationStamp()) ||
+ (newblock.getGenerationStamp() == oldblock.getGenerationStamp() &&
+ newblock.getNumBytes() == oldblock.getNumBytes());
+
+ if (!isValidUpdate) {
+ throw new IOException(
+ "Cannot update oldblock=" + oldblock +
+ " to newblock=" + newblock + " since generation stamps must " +
+ "increase, or else length must not change.");
+ }
+
for(;;) {
final List<Thread> threads = tryUpdateBlock(oldblock, newblock);
@@ -829,6 +934,7 @@ public class FSDataset implements FSCons
t.join();
} catch (InterruptedException e) {
DataNode.LOG.warn("interruptOngoingCreates: t=" + t, e);
+ break; // retry with new threadlist from the beginning
}
}
}
@@ -907,13 +1013,13 @@ public class FSDataset implements FSCons
return null;
}
- static private void truncateBlock(File blockFile, File metaFile,
+ static void truncateBlock(File blockFile, File metaFile,
long oldlen, long newlen) throws IOException {
if (newlen == oldlen) {
return;
}
if (newlen > oldlen) {
- throw new IOException("Cannout truncate block to from oldlen (=" + oldlen
+ throw new IOException("Cannot truncate block to from oldlen (=" + oldlen
+ ") to newlen (=" + newlen + ")");
}
@@ -973,8 +1079,11 @@ public class FSDataset implements FSCons
volumeMap.put(b, v);
volumeMap.put(b, v);
* other threads that might be writing to this block, and then reopen the file.
+ * If replicationRequest is true, then this operation is part of a block
+ * replication request.
*/
- public BlockWriteStreams writeToBlock(Block b, boolean isRecovery) throws IOException {
+ public BlockWriteStreams writeToBlock(Block b, boolean isRecovery,
+ boolean replicationRequest) throws IOException {
//
// Make sure the block isn't a valid one - we're still creating it!
//
@@ -1019,8 +1128,7 @@ public class FSDataset implements FSCons
if (!isRecovery) {
v = volumes.getNextVolume(blockSize);
// create temporary file to hold block in the designated volume
- f = createTmpFile(v, b);
- volumeMap.put(b, new DatanodeBlockInfo(v, f));
+ f = createTmpFile(v, b, replicationRequest);
} else if (f != null) {
DataNode.LOG.info("Reopen already-open Block for append " + b);
// create or reuse temporary file to hold block in the designated volume
@@ -1030,7 +1138,7 @@ public class FSDataset implements FSCons
// reopening block for appending to it.
DataNode.LOG.info("Reopen Block for append " + b);
v = volumeMap.get(b).getVolume();
- f = createTmpFile(v, b);
+ f = createTmpFile(v, b, replicationRequest);
File blkfile = getBlockFile(b);
File oldmeta = getMetaFile(b);
File newmeta = getMetaFile(f, b);
@@ -1056,7 +1164,6 @@ public class FSDataset implements FSCons
" to tmp dir " + f);
}
}
- volumeMap.put(b, new DatanodeBlockInfo(v, f));
}
if (f == null) {
DataNode.LOG.warn("Block " + b + " reopen failed " +
@@ -1064,6 +1171,14 @@ public class FSDataset implements FSCons
throw new IOException("Block " + b + " reopen failed " +
" Unable to locate tmp file.");
}
+ // If this is a replication request, then this is not a permanent
+ // block yet, it could get removed if the datanode restarts. If this
+ // is a write or append request, then it is a valid block.
+ if (replicationRequest) {
+ volumeMap.put(b, new DatanodeBlockInfo(v));
+ } else {
+ volumeMap.put(b, new DatanodeBlockInfo(v, f));
+ }
ongoingCreates.put(b, new ActiveFile(f, threads));
}
@@ -1105,32 +1220,29 @@ public class FSDataset implements FSCons
public void setChannelPosition(Block b, BlockWriteStreams streams,
long dataOffset, long ckOffset)
throws IOException {
- long size = 0;
- synchronized (this) {
- FSVolume vol = volumeMap.get(b).getVolume();
- size = vol.getTmpFile(b).length();
- }
- if (size < dataOffset) {
+ FileOutputStream file = (FileOutputStream) streams.dataOut;
+ if (file.getChannel().size() < dataOffset) {
String msg = "Trying to change block file offset of block " + b +
+ " file " + volumeMap.get(b).getVolume().getTmpFile(b) +
" to " + dataOffset +
" but actual size of file is " +
- size;
+ file.getChannel().size();
throw new IOException(msg);
}
- FileOutputStream file = (FileOutputStream) streams.dataOut;
file.getChannel().position(dataOffset);
file = (FileOutputStream) streams.checksumOut;
file.getChannel().position(ckOffset);
}
- synchronized File createTmpFile( FSVolume vol, Block blk ) throws IOException {
+ synchronized File createTmpFile( FSVolume vol, Block blk,
+ boolean replicationRequest) throws IOException {
if ( vol == null ) {
vol = volumeMap.get( blk ).getVolume();
if ( vol == null ) {
throw new IOException("Could not find volume for block " + blk);
}
}
- return vol.createTmpFile(blk);
+ return vol.createTmpFile(blk, replicationRequest);
}
//
@@ -1141,13 +1253,29 @@ public class FSDataset implements FSCons
// we can GC it safely.
//
+
+ @Override
+ public void finalizeBlock(Block b) throws IOException {
+ finalizeBlockInternal(b, false);
+ }
+
+ @Override
+ public void finalizeBlockIfNeeded(Block b) throws IOException {
+ finalizeBlockInternal(b, true);
+ }
+
/**
* Complete the block write!
*/
- public synchronized void finalizeBlock(Block b) throws IOException {
+ private synchronized void finalizeBlockInternal(Block b, boolean reFinalizeOk)
+ throws IOException {
ActiveFile activeFile = ongoingCreates.get(b);
if (activeFile == null) {
- throw new IOException("Block " + b + " is already finalized.");
+ if (reFinalizeOk) {
+ return;
+ } else {
+ throw new IOException("Block " + b + " is already finalized.");
+ }
}
File f = activeFile.file;
if (f == null || !f.exists()) {
@@ -1166,6 +1294,28 @@ public class FSDataset implements FSCons
}
/**
+ * is this block finalized? Returns true if the block is already
+ * finalized, otherwise returns false.
+ */
+ private synchronized boolean isFinalized(Block b) {
+ FSVolume v = volumeMap.get(b).getVolume();
+ if (v == null) {
+ DataNode.LOG.warn("No volume for block " + b);
+ return false; // block is not finalized
+ }
+ ActiveFile activeFile = ongoingCreates.get(b);
+ if (activeFile == null) {
+ return true; // block is already finalized
+ }
+ File f = activeFile.file;
+ if (f == null || !f.exists()) {
+ // we should never get into this position.
+ DataNode.LOG.warn("No temporary file " + f + " for block " + b);
+ }
+ return false; // block is not finalized
+ }
+
+ /**
* Remove the temporary block file (if any)
*/
public synchronized void unfinalizeBlock(Block b) throws IOException {
@@ -1226,7 +1376,8 @@ public class FSDataset implements FSCons
* Check whether the given block is a valid one.
*/
public boolean isValidBlock(Block b) {
- return validateBlockFile(b) != null;
+ File f = validateBlockFile(b);
+ return ((f != null) ? isFinalized(b) : false);
}
/**
@@ -1244,7 +1395,7 @@ public class FSDataset implements FSCons
}
/** {@inheritDoc} */
- public void validateBlockMetadata(Block b) throws IOException {
+ public synchronized void validateBlockMetadata(Block b) throws IOException {
DatanodeBlockInfo info = volumeMap.get(b);
if (info == null) {
throw new IOException("Block " + b + " does not exist in volumeMap.");
@@ -1289,6 +1440,30 @@ public class FSDataset implements FSCons
" does not match meta file stamp " +
stamp);
}
+ // verify that the checksum file has an integral number of checksum values.
+ DataChecksum dcs = BlockMetadataHeader.readHeader(meta).getChecksum();
+ int checksumsize = dcs.getChecksumSize();
+ long actual = meta.length() - BlockMetadataHeader.getHeaderSize();
+ long numChunksInMeta = actual/checksumsize;
+ if (actual % checksumsize != 0) {
+ throw new IOException("Block " + b +
+ " has a checksum file of size " + meta.length() +
+ " but it does not align with checksum size of " +
+ checksumsize);
+ }
+ int bpc = dcs.getBytesPerChecksum();
+ long minDataSize = (numChunksInMeta - 1) * bpc;
+ long maxDataSize = numChunksInMeta * bpc;
+ if (f.length() > maxDataSize || f.length() <= minDataSize) {
+ throw new IOException("Block " + b +
+ " is of size " + f.length() +
+ " but has " + (numChunksInMeta + 1) +
+ " checksums and each checksum size is " +
+ checksumsize + " bytes.");
+ }
+ // We could crc-check the entire block here, but it will be a costly
+ // operation. Instead we rely on the above check (file length mismatch)
+ // to detect corrupt blocks.
}
/**
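The validateBlockMetadata additions above boil down to an arithmetic consistency test between a block's data file and its meta file: the meta file must hold a whole number of checksums after the header, and if it holds n checksums the data length must satisfy (n-1)*bytesPerChecksum < length <= n*bytesPerChecksum. A hedged restatement of that rule as a standalone helper (hypothetical class, not the FSDataset method):

    // Hypothetical helper restating the length/checksum consistency rule above.
    final class ChecksumLengthCheck {
      static boolean consistent(long dataLen, long metaLen, int headerSize,
                                int checksumSize, int bytesPerChecksum) {
        long checksumBytes = metaLen - headerSize;
        if (checksumBytes % checksumSize != 0) {
          return false;                                        // partial checksum in meta file
        }
        long numChunks = checksumBytes / checksumSize;
        long minDataSize = (numChunks - 1) * bytesPerChecksum; // exclusive lower bound
        long maxDataSize = numChunks * bytesPerChecksum;       // inclusive upper bound
        return dataLen > minDataSize && dataLen <= maxDataSize;
      }
    }

For example, with 512 bytes per checksum and 4-byte CRCs, a meta file holding 3 checksums is consistent with any data length from 1025 to 1536 bytes.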
Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=953482&r1=953481&r2=953482&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Thu Jun 10 22:25:39 2010
@@ -169,12 +169,14 @@ public interface FSDatasetInterface exte
/**
* Creates the block and returns output streams to write data and CRC
* @param b
- * @param isRecovery True if this is part of erro recovery, otherwise false
+ * @param isRecovery True if this is part of error recovery, otherwise false
+ * @param isReplicationRequest True if this is part of block replication request
* @return a BlockWriteStreams object to allow writing the block data
* and CRC
* @throws IOException
*/
- public BlockWriteStreams writeToBlock(Block b, boolean isRecovery) throws IOException;
+ public BlockWriteStreams writeToBlock(Block b, boolean isRecovery,
+ boolean isReplicationRequest) throws IOException;
/**
* Update the block to the new generation stamp and length.
@@ -191,6 +193,14 @@ public interface FSDatasetInterface exte
public void finalizeBlock(Block b) throws IOException;
/**
+ * Finalizes the block previously opened for writing using writeToBlock
+ * if not already finalized
+ * @param b
+ * @throws IOException
+ */
+ public void finalizeBlockIfNeeded(Block b) throws IOException;
+
+ /**
* Unfinalizes the block previously opened for writing using writeToBlock.
* The temporary file associated with this block is deleted.
* @param b
@@ -200,6 +210,7 @@ public interface FSDatasetInterface exte
/**
* Returns the block report - the full list of blocks stored
+ * Returns only finalized blocks
* @return - the block report - the full list of blocks stored
*/
public Block[] getBlockReport();
@@ -228,7 +239,7 @@ public interface FSDatasetInterface exte
* Stringifies the name of the storage
*/
public String toString();
-
+
/**
* Shutdown the FSDataset
*/
Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java?rev=953482&r1=953481&r2=953482&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java Thu Jun 10 22:25:39 2010
@@ -367,6 +367,11 @@ class BlocksMap {
return map.get(b);
}
+ /** Return the block object without matching against generation stamp. */
+ BlockInfo getStoredBlockWithoutMatchingGS(Block b) {
+ return map.get(new Block(b.getBlockId()));
+ }
+
/** Returned Iterator does not support. */
Iterator<DatanodeDescriptor> nodeIterator(Block b) {
return new NodeIterator(map.get(b));
Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java?rev=953482&r1=953481&r2=953482&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java Thu Jun 10 22:25:39 2010
@@ -421,8 +421,13 @@ public class DatanodeDescriptor extends
// collect blocks that have not been reported
// all of them are next to the delimiter
Iterator<Block> it = new BlockIterator(delimiter.getNext(0), this);
- while(it.hasNext())
- toRemove.add(it.next());
+ while(it.hasNext()) {
+ BlockInfo storedBlock = (BlockInfo)it.next();
+ INodeFile file = storedBlock.getINode();
+ if (file == null || !file.isUnderConstruction()) {
+ toRemove.add(storedBlock);
+ }
+ }
this.removeBlock(delimiter);
}
Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=953482&r1=953481&r2=953482&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Thu Jun 10 22:25:39 2010
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.*;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
@@ -236,6 +237,8 @@ public class FSNamesystem implements FSC
private int minReplication;
// Default replication
private int defaultReplication;
+ // Variable to stall new replication checks for testing purposes
+ private boolean stallReplicationWork = false;
// heartbeatRecheckInterval is how often namenode checks for expired datanodes
private long heartbeatRecheckInterval;
// heartbeatExpireInterval is how long namenode waits for datanode to report
@@ -1072,7 +1075,8 @@ public class FSNamesystem implements FSC
// period, then start lease recovery.
//
if (lease.expiredSoftLimit()) {
- LOG.info("startFile: recover lease " + lease + ", src=" + src);
+ LOG.info("startFile: recover lease " + lease + ", src=" + src +
+ " from client " + pendingFile.clientName);
internalReleaseLease(lease, src);
}
throw new AlreadyBeingCreatedException("failed to create file " + src + " for " + holder +
@@ -1174,7 +1178,9 @@ public class FSNamesystem implements FSC
//
LocatedBlock lb = null;
synchronized (this) {
- INodeFileUnderConstruction file = (INodeFileUnderConstruction)dir.getFileINode(src);
+ // Need to re-check existence here, since the file may have been deleted
+ // in between the synchronized blocks
+ INodeFileUnderConstruction file = checkLease(src, holder);
Block[] blocks = file.getBlocks();
if (blocks != null && blocks.length > 0) {
@@ -1203,9 +1209,15 @@ public class FSNamesystem implements FSC
// remove this block from the list of pending blocks to be deleted.
// This reduces the possibility of triggering HADOOP-1349.
//
- for(Collection<Block> v : recentInvalidateSets.values()) {
+ for (Iterator<Collection<Block>> iter = recentInvalidateSets.values().iterator();
+ iter.hasNext();
+ ) {
+ Collection<Block> v = iter.next();
if (v.remove(last)) {
pendingDeletionBlocksCount--;
+ if (v.isEmpty()) {
+ iter.remove();
+ }
}
}
}
@@ -1887,7 +1899,7 @@ public class FSNamesystem implements FSC
if (pendingFile.getBlocks().length == 0) {
finalizeINodeFileUnderConstruction(src, pendingFile);
NameNode.stateChangeLog.warn("BLOCK*"
- + " internalReleaseLease: No blocks found, lease removed.");
+ + " internalReleaseLease: No blocks found, lease removed for " + src);
return;
}
// setup the Inode.targets for the last block from the blocksMap
@@ -1904,11 +1916,24 @@ public class FSNamesystem implements FSC
}
// start lease recovery of the last block for this file.
pendingFile.assignPrimaryDatanode();
- leaseManager.renewLease(lease);
+ Lease reassignedLease = reassignLease(
+ lease, src, HdfsConstants.NN_RECOVERY_LEASEHOLDER, pendingFile);
+ leaseManager.renewLease(reassignedLease);
+ }
+
+ private Lease reassignLease(Lease lease, String src, String newHolder,
+ INodeFileUnderConstruction pendingFile) {
+ if(newHolder == null)
+ return lease;
+ pendingFile.setClientName(newHolder);
+ return leaseManager.reassignLease(lease, src, newHolder);
}
+
private void finalizeINodeFileUnderConstruction(String src,
INodeFileUnderConstruction pendingFile) throws IOException {
+ NameNode.stateChangeLog.info("Removing lease on file " + src +
+ " from client " + pendingFile.clientName);
leaseManager.removeLease(pendingFile.clientName, src);
// The file is no longer pending.
@@ -1968,13 +1993,22 @@ public class FSNamesystem implements FSC
// There should be no locations in the blocksMap till now because the
// file is underConstruction
DatanodeDescriptor[] descriptors = null;
- if (newtargets.length > 0) {
- descriptors = new DatanodeDescriptor[newtargets.length];
- for(int i = 0; i < newtargets.length; i++) {
- descriptors[i] = getDatanode(newtargets[i]);
- descriptors[i].addBlock(newblockinfo);
+ List<DatanodeDescriptor> descriptorsList =
+ new ArrayList<DatanodeDescriptor>(newtargets.length);
+ for(int i = 0; i < newtargets.length; i++) {
+ DatanodeDescriptor node =
+ datanodeMap.get(newtargets[i].getStorageID());
+ if (node != null) {
+ node.addBlock(newblockinfo);
+ descriptorsList.add(node);
+ } else {
+ LOG.warn("commitBlockSynchronization included a target DN " +
+ newtargets[i] + " which is not known to DN. Ignoring.");
}
}
+ if (!descriptorsList.isEmpty()) {
+ descriptors = descriptorsList.toArray(new DatanodeDescriptor[0]);
+ }
// add locations into the INodeUnderConstruction
pendingFile.setLastBlock(newblockinfo, descriptors);
}
@@ -2437,6 +2471,11 @@ public class FSNamesystem implements FSC
*/
private int computeReplicationWork(
int blocksToProcess) throws IOException {
+ // stall only useful for unit tests (see TestFileAppend4.java)
+ if (stallReplicationWork) {
+ return 0;
+ }
+
// Choose the blocks to be replicated
List<List<Block>> blocksToReplicate =
chooseUnderReplicatedBlocks(blocksToProcess);
@@ -2559,7 +2598,7 @@ public class FSNamesystem implements FSC
}
}
- // choose replication targets: NOT HODING THE GLOBAL LOCK
+ // choose replication targets: NOT HOLDING THE GLOBAL LOCK
DatanodeDescriptor targets[] = replicator.chooseTarget(
requiredReplication - numEffectiveReplicas,
srcNode, containingNodes, null, block.getNumBytes());
@@ -2970,12 +3009,14 @@ public class FSNamesystem implements FSC
DatanodeDescriptor delNodeHint) {
BlockInfo storedBlock = blocksMap.getStoredBlock(block);
if (storedBlock == null) {
- // if the block with a WILDCARD generation stamp matches and the
- // corresponding file is under construction, then accept this block.
- // This block has a diferent generation stamp on the datanode
- // because of a lease-recovery-attempt.
- Block nblk = new Block(block.getBlockId());
- storedBlock = blocksMap.getStoredBlock(nblk);
+ // If we have a block in the block map with the same ID, but a different
+ // generation stamp, and the corresponding file is under construction,
+ // then we need to do some special processing.
+ storedBlock = blocksMap.getStoredBlockWithoutMatchingGS(block);
+
+ // If the block ID is valid, and it either (a) belongs to a file under
+ // construction, or (b) the reported genstamp is higher than what we
+ // know about, then we accept the block.
if (storedBlock != null && storedBlock.getINode() != null &&
(storedBlock.getGenerationStamp() <= block.getGenerationStamp() ||
storedBlock.getINode().isUnderConstruction())) {
@@ -2994,9 +3035,8 @@ public class FSNamesystem implements FSC
+ block + " on " + node.getName()
+ " size " + block.getNumBytes()
+ " But it does not belong to any file.");
- // we could add this block to invalidate set of this datanode.
- // it will happen in next block report otherwise.
- return block;
+ addToInvalidates(block, node);
+ return block;
}
// add block to the data-node
@@ -3116,6 +3156,18 @@ public class FSNamesystem implements FSC
INodeFile fileINode = null;
fileINode = storedBlock.getINode();
if (fileINode.isUnderConstruction()) {
+ INodeFileUnderConstruction cons = (INodeFileUnderConstruction) fileINode;
+ Block[] blocks = fileINode.getBlocks();
+ // If this is the last block of this
+ // file, then set targets. This enables lease recovery to occur.
+ // This is especially important after a restart of the NN.
+ Block last = blocks[blocks.length-1];
+ if (last.equals(storedBlock)) {
+ Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(last);
+ for (int i = 0; it != null && it.hasNext(); i++) {
+ cons.addTarget(it.next());
+ }
+ }
return block;
}
@@ -3721,6 +3773,9 @@ public class FSNamesystem implements FSC
short getMaxReplication() { return (short)maxReplication; }
short getMinReplication() { return (short)minReplication; }
short getDefaultReplication() { return (short)defaultReplication; }
+
+ public void stallReplicationWork() { stallReplicationWork = true; }
+ public void restartReplicationWork() { stallReplicationWork = false; }
/**
* A immutable object that stores the number of live replicas and
@@ -4750,7 +4805,7 @@ public class FSNamesystem implements FSC
throw new IOException(msg);
}
if (!((INodeFileUnderConstruction)fileINode).setLastRecoveryTime(now())) {
- String msg = block + " is beening recovered, ignoring this request.";
+ String msg = block + " is already being recovered, ignoring this request.";
LOG.info(msg);
throw new IOException(msg);
}
Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java?rev=953482&r1=953481&r2=953482&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java Thu Jun 10 22:25:39 2010
@@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.protocol.Block;
@@ -25,7 +27,7 @@ import org.apache.hadoop.hdfs.server.nam
class INodeFileUnderConstruction extends INodeFile {
- final String clientName; // lease holder
+ String clientName; // lease holder
private final String clientMachine;
private final DatanodeDescriptor clientNode; // if client is a cluster node too.
@@ -68,6 +70,10 @@ class INodeFileUnderConstruction extends
return clientName;
}
+ void setClientName(String newName) {
+ clientName = newName;
+ }
+
String getClientMachine() {
return clientMachine;
}
@@ -93,6 +99,30 @@ class INodeFileUnderConstruction extends
this.primaryNodeIndex = -1;
}
+ /**
+ * add this target if it does not already exist
+ */
+ void addTarget(DatanodeDescriptor node) {
+ if (this.targets == null) {
+ this.targets = new DatanodeDescriptor[0];
+ }
+
+ for (int j = 0; j < this.targets.length; j++) {
+ if (this.targets[j].equals(node)) {
+ return; // target already exists
+ }
+ }
+
+ // allocate new data structure to store additional target
+ DatanodeDescriptor[] newt = new DatanodeDescriptor[targets.length + 1];
+ for (int i = 0; i < targets.length; i++) {
+ newt[i] = this.targets[i];
+ }
+ newt[targets.length] = node;
+ this.targets = newt;
+ this.primaryNodeIndex = -1;
+ }
+
//
// converts a INodeFileUnderConstruction into a INodeFile
// use the modification time as the access time
@@ -132,10 +162,29 @@ class INodeFileUnderConstruction extends
synchronized void setLastBlock(BlockInfo newblock, DatanodeDescriptor[] newtargets
) throws IOException {
- if (blocks == null) {
+ if (blocks == null || blocks.length == 0) {
throw new IOException("Trying to update non-existant block (newblock="
+ newblock + ")");
}
+ BlockInfo oldLast = blocks[blocks.length - 1];
+ if (oldLast.getBlockId() != newblock.getBlockId()) {
+ // This should not happen - this means that we're performing recovery
+ // on an internal block in the file!
+ NameNode.stateChangeLog.error(
+ "Trying to commit block synchronization for an internal block on"
+ + " inode=" + this
+ + " newblock=" + newblock + " oldLast=" + oldLast);
+ throw new IOException("Trying to update an internal block of " +
+ "pending file " + this);
+ }
+
+ if (oldLast.getGenerationStamp() > newblock.getGenerationStamp()) {
+ NameNode.stateChangeLog.warn(
+ "Updating last block " + oldLast + " of inode " +
+ "under construction " + this + " with a block that " +
+ "has an older generation stamp: " + newblock);
+ }
+
blocks[blocks.length - 1] = newblock;
setTargets(newtargets);
lastRecoveryTime = 0;
Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java?rev=953482&r1=953481&r2=953482&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java Thu Jun 10 22:25:39 2010
@@ -102,7 +102,7 @@ public class LeaseManager {
/**
* Adds (or re-adds) the lease for the specified file.
*/
- synchronized void addLease(String holder, String src) {
+ synchronized Lease addLease(String holder, String src) {
Lease lease = getLease(holder);
if (lease == null) {
lease = new Lease(holder);
@@ -113,6 +113,7 @@ public class LeaseManager {
}
sortedLeasesByPath.put(src, lease);
lease.paths.add(src);
+ return lease;
}
/**
@@ -133,6 +134,17 @@ public class LeaseManager {
}
/**
+ * Reassign lease for file src to the new holder.
+ */
+ synchronized Lease reassignLease(Lease lease, String src, String newHolder) {
+ assert newHolder != null : "new lease holder is null";
+ if (lease != null) {
+ removeLease(lease, src);
+ }
+ return addLease(newHolder, src);
+ }
+
+ /**
* Remove the lease for the specified holder and src
*/
synchronized void removeLease(String holder, String src) {
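Combined with the reassignLease call in FSNamesystem above, the effect is that when the namenode itself initiates lease recovery it takes over the lease under the NN_Recovery holder, so a dead or stalled client's lease no longer blocks recovery. A hedged sketch of that handoff, with hypothetical types standing in for the lease manager's internals:

    import java.util.HashMap;
    import java.util.Map;

    final class LeaseHandoffSketch {
      static final String NN_RECOVERY_LEASEHOLDER = "NN_Recovery";

      // path -> current holder, a stand-in for the lease manager's sorted maps
      private final Map<String, String> holderByPath = new HashMap<String, String>();

      /** Move the lease on src to the namenode-owned holder before block recovery. */
      synchronized String reassignForRecovery(String src) {
        holderByPath.put(src, NN_RECOVERY_LEASEHOLDER); // drops the previous holder, if any
        return NN_RECOVERY_LEASEHOLDER;
      }
    }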
Modified: hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=953482&r1=953481&r2=953482&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/DFSTestUtil.java Thu Jun 10 22:25:39 2010
@@ -267,4 +267,15 @@ public class DFSTestUtil extends TestCas
IOUtils.copyBytes(conn.getInputStream(), out, 4096, true);
return out.toString();
}
+
+
+ public static byte[] generateSequentialBytes(int start, int length) {
+ byte[] result = new byte[length];
+
+ for (int i = 0; i < length; i++) {
+ result[i] = (byte)((start + i) % 127);
+ }
+
+ return result;
+ }
}
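generateSequentialBytes gives the new append tests a deterministic, easily verified payload. A quick usage note, assuming the method behaves as written above: values cycle through 0..126, so for instance

    byte[] data = DFSTestUtil.generateSequentialBytes(125, 4);  // {125, 126, 0, 1}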
Modified: hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=953482&r1=953481&r2=953482&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java Thu Jun 10 22:25:39 2010
@@ -31,6 +31,7 @@ import javax.security.auth.login.LoginEx
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.*;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -725,17 +726,56 @@ public class MiniDFSCluster {
getNameNodePort());
DFSClient client = new DFSClient(addr, conf);
- // make sure all datanodes are alive
- while(client.datanodeReport(DatanodeReportType.LIVE).length
- != numDataNodes) {
+ // make sure all datanodes are alive and sent heartbeat
+ while (shouldWait(client.datanodeReport(DatanodeReportType.LIVE))) {
try {
- Thread.sleep(500);
- } catch (Exception e) {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
}
}
client.close();
}
+
+ private synchronized boolean shouldWait(DatanodeInfo[] dnInfo) {
+ if (dnInfo.length != numDataNodes) {
+ return true;
+ }
+ // make sure all datanodes have sent first heartbeat to namenode,
+ // using (capacity == 0) as proxy.
+ for (DatanodeInfo dn : dnInfo) {
+ if (dn.getCapacity() == 0) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Wait for the given datanode to heartbeat once.
+ */
+ public void waitForDNHeartbeat(int dnIndex, long timeoutMillis)
+ throws IOException, InterruptedException {
+ DataNode dn = getDataNodes().get(dnIndex);
+ InetSocketAddress addr = new InetSocketAddress("localhost",
+ getNameNodePort());
+ DFSClient client = new DFSClient(addr, conf);
+
+ long startTime = System.currentTimeMillis();
+ while (System.currentTimeMillis() < startTime + timeoutMillis) {
+ DatanodeInfo report[] = client.datanodeReport(DatanodeReportType.LIVE);
+
+ for (DatanodeInfo thisReport : report) {
+ if (thisReport.getStorageID().equals(
+ dn.dnRegistration.getStorageID())) {
+ if (thisReport.getLastUpdate() > startTime)
+ return;
+ }
+ }
+
+ Thread.sleep(500);
+ }
+ }
public void formatDataNodeDirs() throws IOException {
base_dir = new File(System.getProperty("test.build.data", "build/test/data"), "dfs/");
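waitForDNHeartbeat above follows a plain poll-until-deadline pattern: repeatedly fetch the live-datanode report, return as soon as the target datanode's lastUpdate is newer than the start time, and give up after timeoutMillis. The same pattern in isolation (hypothetical condition interface, not a MiniDFSCluster API):

    final class PollSketch {
      interface Condition { boolean holds() throws java.io.IOException; }

      static boolean waitFor(long timeoutMillis, Condition condition)
          throws java.io.IOException, InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
          if (condition.holds()) {
            return true;     // observed before the deadline
          }
          Thread.sleep(500); // same polling interval as waitForDNHeartbeat
        }
        return false;        // timed out
      }
    }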
Added: hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java?rev=953482&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java (added)
+++ hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java Thu Jun 10 22:25:39 2010
@@ -0,0 +1,944 @@
+package org.apache.hadoop.hdfs;
+
+import junit.framework.TestCase;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSClient.DFSOutputStream;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Level;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
+/* File Append tests for HDFS-200 & HDFS-142, specifically focused on:
+ * using append()/sync() to recover block information
+ */
+public class TestFileAppend4 extends TestCase {
+ static final Log LOG = LogFactory.getLog(TestFileAppend4.class);
+ static final long BLOCK_SIZE = 1024;
+ static final long BBW_SIZE = 500; // don't align on bytes/checksum
+
+ static final Object [] NO_ARGS = new Object []{};
+
+ Configuration conf;
+ MiniDFSCluster cluster;
+ Path file1;
+ FSDataOutputStream stm;
+ boolean simulatedStorage = false;
+
+ {
+ ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+ }
+
+ @Override
+ public void setUp() throws Exception {
+ this.conf = new Configuration();
+ if (simulatedStorage) {
+ conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+ }
+ conf.setBoolean("dfs.support.append", true);
+
+ // lower heartbeat interval for fast recognition of DN death
+ conf.setInt("heartbeat.recheck.interval", 1000);
+ conf.setInt("dfs.heartbeat.interval", 1);
+ conf.setInt("dfs.socket.timeout", 5000);
+ // handle under-replicated blocks quickly (for replication asserts)
+// conf.set("dfs.replication.pending.timeout.sec", Integer.toString(5));
+ conf.setInt("dfs.replication.pending.timeout.sec", 5);
+ conf.setInt("dfs.replication.interval", 1);
+ // handle failures in the DFSClient pipeline quickly
+ // (for cluster.shutdown(); fs.close() idiom)
+ conf.setInt("ipc.client.connect.max.retries", 1);
+ conf.setInt("dfs.client.block.recovery.retries", 1);
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+
+ }
+
+ private void createFile(FileSystem whichfs, String filename,
+ int rep, long fileSize) throws Exception {
+ file1 = new Path(filename);
+ stm = whichfs.create(file1, true, (int)fileSize+1, (short)rep, BLOCK_SIZE);
+ LOG.info("Created file " + filename);
+ LOG.info("Writing " + fileSize + " bytes to " + file1);
+ AppendTestUtil.write(stm, 0, (int)fileSize);
+ }
+
+ private void assertFileSize(FileSystem whichfs, long expectedSize) throws Exception {
+ LOG.info("reading length of " + file1.getName() + " on namenode");
+ long realSize = whichfs.getFileStatus(file1).getLen();
+ assertTrue("unexpected file size! received=" + realSize
+ + " , expected=" + expectedSize,
+ realSize == expectedSize);
+ }
+
+ private void assertNumCurrentReplicas(short rep) throws Exception {
+ OutputStream hdfs_out = stm.getWrappedStream();
+ Method r = hdfs_out.getClass().getMethod("getNumCurrentReplicas",
+ new Class<?> []{});
+ r.setAccessible(true);
+ int actualRepl = ((Integer)r.invoke(hdfs_out, NO_ARGS)).intValue();
+ assertTrue(file1 + " should be replicated to " + rep + " datanodes, not " +
+ actualRepl + ".", actualRepl == rep);
+ }
+
+ private void loseLeases(FileSystem whichfs) throws Exception {
+ LOG.info("leasechecker.interruptAndJoin()");
+ // lose the lease on the client
+ DistributedFileSystem dfs = (DistributedFileSystem)whichfs;
+ dfs.dfs.leasechecker.interruptAndJoin();
+ }
+
+ /*
+ * Recover file.
+ * Try and open file in append mode.
+ * Doing this, we get a hold of the file that crashed writer
+ * was writing to. Once we have it, close it. This will
+ * allow subsequent reader to see up to last sync.
+ * NOTE: This is the same algorithm that HBase uses for file recovery
+ * @param fs
+ * @throws Exception
+ */
+ private void recoverFile(final FileSystem fs) throws Exception {
+ LOG.info("Recovering File Lease");
+
+ // set the soft limit to be 1 second so that the
+ // namenode triggers lease recovery upon append request
+ cluster.setLeasePeriod(1000, FSConstants.LEASE_HARDLIMIT_PERIOD);
+
+ // Trying recovery
+ int tries = 60;
+ boolean recovered = false;
+ FSDataOutputStream out = null;
+ while (!recovered && tries-- > 0) {
+ try {
+ out = fs.append(file1);
+ LOG.info("Successfully opened for appends");
+ recovered = true;
+ } catch (IOException e) {
+ LOG.info("Failed open for append, waiting on lease recovery");
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ // ignore it and try again
+ }
+ }
+ }
+ if (out != null) {
+ try {
+ out.close();
+ LOG.info("Successfully obtained lease");
+ } catch (IOException e) {
+ LOG.info("Unable to close file after opening for appends. " + e);
+ recovered = false;
+ }
+// out.close();
+ }
+ if (!recovered) {
+ fail((tries > 0) ? "Recovery failed" : "Recovery should take < 1 min");
+ }
+ LOG.info("Past out lease recovery");
+ }
+
+ // Waits for all of the blocks to have expected replication
+ private void waitForBlockReplication(FileSystem whichfs, String filename,
+ int expected, long maxWaitSec)
+ throws IOException {
+ long start = System.currentTimeMillis();
+
+ //wait for all the blocks to be replicated;
+ LOG.info("Checking for block replication for " + filename);
+ int iters = 0;
+ while (true) {
+ boolean replOk = true;
+
+ BlockLocation[] bl = whichfs.getFileBlockLocations(
+ whichfs.getFileStatus(file1), 0, BLOCK_SIZE);
+ if(bl.length == 0) {
+ replOk = false;
+ }
+ for (BlockLocation b : bl) {
+
+ int actual = b.getNames().length;
+ if ( actual < expected ) {
+ if (true || iters > 0) {
+ LOG.info("Not enough replicas for " + b +
+ " yet. Expecting " + expected + ", got " +
+ actual + ".");
+ }
+ replOk = false;
+ break;
+ }
+ }
+
+ if (replOk) {
+ return;
+ }
+
+ iters++;
+
+ if (maxWaitSec > 0 &&
+ (System.currentTimeMillis() - start) > (maxWaitSec * 1000)) {
+ throw new IOException("Timedout while waiting for all blocks to " +
+ " be replicated for " + filename);
+ }
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ignored) {}
+ }
+ }
+
+ private void checkFile(FileSystem whichfs, long fileSize) throws Exception {
+ LOG.info("validating content from datanodes...");
+ AppendTestUtil.check(whichfs, file1, fileSize);
+ }
+
+ private void corruptDatanode(int dnNumber) throws Exception {
+ // get the FS data of the 2nd datanode
+ File data_dir = new File(System.getProperty("test.build.data"),
+ "dfs/data/data" +
+ Integer.toString(dnNumber*2 + 1) +
+ "/blocksBeingWritten");
+ int corrupted = 0;
+ for (File block : data_dir.listFiles()) {
+ // only touch the actual data, not the metadata (with CRC)
+ if (block.getName().startsWith("blk_") &&
+ !block.getName().endsWith("meta")) {
+ RandomAccessFile file = new RandomAccessFile(block, "rw");
+ FileChannel channel = file.getChannel();
+
+ Random r = new Random();
+ long lastBlockSize = channel.size() % 512;
+ long position = channel.size() - lastBlockSize;
+ int length = r.nextInt((int)(channel.size() - position + 1));
+ byte[] buffer = new byte[length];
+ r.nextBytes(buffer);
+
+ channel.write(ByteBuffer.wrap(buffer), position);
+ System.out.println("Deliberately corrupting file " + block.getName() +
+ " at offset " + position +
+ " length " + length);
+ file.close();
+ ++corrupted;
+ }
+ }
+ assertTrue("Should have some data in bbw to corrupt", corrupted > 0);
+ }
+
+ // test [1 bbw, 0 HDFS block]
+ public void testAppendSyncBbw() throws Exception {
+ LOG.info("START");
+ cluster = new MiniDFSCluster(conf, 1, true, null);
+ FileSystem fs1 = cluster.getFileSystem();
+ FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(fs1.getConf());
+ try {
+ createFile(fs1, "/bbw.test", 1, BBW_SIZE);
+ stm.sync();
+ // empty before close()
+ assertFileSize(fs1, 0);
+ loseLeases(fs1);
+ recoverFile(fs2);
+ // close() should write recovered bbw to HDFS block
+ assertFileSize(fs2, BBW_SIZE);
+ checkFile(fs2, BBW_SIZE);
+ } finally {
+ fs2.close();
+ fs1.close();
+ cluster.shutdown();
+ }
+ LOG.info("STOP");
+ }
+
+ // test [1 bbw, 0 HDFS block] with cluster restart
+ public void testAppendSyncBbwClusterRestart() throws Exception {
+ LOG.info("START");
+ cluster = new MiniDFSCluster(conf, 1, true, null);
+ FileSystem fs1 = cluster.getFileSystem();
+ FileSystem fs2 = null;
+ try {
+ createFile(fs1, "/bbwRestart.test", 1, BBW_SIZE);
+ stm.sync();
+ // empty before close()
+ assertFileSize(fs1, 0);
+
+ cluster.shutdown();
+ fs1.close(); // same as: loseLeases()
+ LOG.info("STOPPED first instance of the cluster");
+
+ cluster = new MiniDFSCluster(conf, 1, false, null);
+ cluster.waitActive();
+ LOG.info("START second instance.");
+
+ fs2 = cluster.getFileSystem();
+
+ recoverFile(fs2);
+
+ // close() should write recovered bbw to HDFS block
+ assertFileSize(fs2, BBW_SIZE);
+ checkFile(fs2, BBW_SIZE);
+
+ } finally {
+ if(fs2 != null) {
+ fs2.close();
+ }
+ fs1.close();
+ cluster.shutdown();
+ }
+ LOG.info("STOP");
+ }
+
+
+ // test [3 bbw, 0 HDFS block] with cluster restart
+ // ** previous HDFS-142 patches hit a problem with multiple outstanding bbw files on a single disk **
+ public void testAppendSync2XBbwClusterRestart() throws Exception {
+ LOG.info("START");
+ cluster = new MiniDFSCluster(conf, 1, true, null);
+ // assumption: this MiniDFSCluster starts 1 datanode with 2 data dirs to load balance across
+ assertTrue(cluster.getDataNodes().get(0).getConf().get("dfs.data.dir").matches("[^,]+,[^,]*"));
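+ // (the regex asserts that dfs.data.dir lists exactly two comma-separated dirs)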
+ FileSystem fs1 = cluster.getFileSystem();
+ FileSystem fs2 = null;
+ try {
+ // create 3 bbw files [so at least one dir has 2 files]
+ int[] files = new int[]{0,1,2};
+ Path[] paths = new Path[files.length];
+ FSDataOutputStream[] stms = new FSDataOutputStream[files.length];
+ for (int i : files ) {
+ createFile(fs1, "/bbwRestart" + i + ".test", 1, BBW_SIZE);
+ stm.sync();
+ assertFileSize(fs1, 0);
+ paths[i] = file1;
+ stms[i] = stm;
+ }
+
+ cluster.shutdown();
+ fs1.close(); // same as: loseLeases()
+ LOG.info("STOPPED first instance of the cluster");
+
+ cluster = new MiniDFSCluster(conf, 1, false, null);
+ cluster.waitActive();
+ LOG.info("START second instance.");
+
+ fs2 = cluster.getFileSystem();
+
+ // recover 3 bbw files
+ for (int i : files) {
+ file1 = paths[i];
+ recoverFile(fs2);
+ assertFileSize(fs2, BBW_SIZE);
+ checkFile(fs2, BBW_SIZE);
+ }
+ } finally {
+ if(fs2 != null) {
+ fs2.close();
+ }
+ fs1.close();
+ cluster.shutdown();
+ }
+ LOG.info("STOP");
+ }
+
+ // test [1 bbw, 1 HDFS block]
+ public void testAppendSyncBlockPlusBbw() throws Exception {
+ LOG.info("START");
+ cluster = new MiniDFSCluster(conf, 1, true, null);
+ FileSystem fs1 = cluster.getFileSystem();
+ FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(fs1.getConf());
+ try {
+ createFile(fs1, "/blockPlusBbw.test", 1, BLOCK_SIZE + BBW_SIZE);
+ // 0 before sync()
+ assertFileSize(fs1, 0);
+ stm.sync();
+ // BLOCK_SIZE after sync()
+ assertFileSize(fs1, BLOCK_SIZE);
+ loseLeases(fs1);
+ recoverFile(fs2);
+ // close() should write recovered bbw to HDFS block
+ assertFileSize(fs2, BLOCK_SIZE + BBW_SIZE);
+ checkFile(fs2, BLOCK_SIZE + BBW_SIZE);
+ } finally {
+ stm = null;
+ fs2.close();
+ fs1.close();
+ cluster.shutdown();
+ }
+ LOG.info("STOP");
+ }
+
+ // we stop a different datanode in each test to exercise
+ // the start, middle, & end of the DFSOutputStream pipeline
+ public void testAppendSyncReplication0() throws Exception {
+ replicationTest(0);
+ }
+ public void testAppendSyncReplication1() throws Exception {
+ replicationTest(1);
+ }
+ public void testAppendSyncReplication2() throws Exception {
+ replicationTest(2);
+ }
+
+ void replicationTest(int badDN) throws Exception {
+ LOG.info("START");
+ cluster = new MiniDFSCluster(conf, 3, true, null);
+ FileSystem fs1 = cluster.getFileSystem();
+ try {
+ int halfBlock = (int)BLOCK_SIZE/2;
+ short rep = 3; // replication
+ assertTrue(BLOCK_SIZE%4 == 0);
+
+ file1 = new Path("/appendWithReplication.dat");
+
+ // write 1/2 block & sync
+ stm = fs1.create(file1, true, (int)BLOCK_SIZE*2, rep, BLOCK_SIZE);
+ AppendTestUtil.write(stm, 0, halfBlock);
+ stm.sync();
+ assertNumCurrentReplicas(rep);
+
+ // close one of the datanodes
+ cluster.stopDataNode(badDN);
+
+ // write 1/4 block & sync
+ AppendTestUtil.write(stm, halfBlock, (int)BLOCK_SIZE/4);
+ stm.sync();
+ assertNumCurrentReplicas((short)(rep - 1));
+
+ // restart the cluster
+ /*
+ * we put the namenode in safe mode first so it doesn't process
+ * recoverBlock() commands from the remaining DFSClient as datanodes
+ * are serially shut down
+ */
+ cluster.getNameNode().setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+ cluster.shutdown();
+ fs1.close();
+ LOG.info("STOPPED first instance of the cluster");
+ cluster = new MiniDFSCluster(conf, 3, false, null);
+ cluster.getNameNode().getNamesystem().stallReplicationWork();
+ cluster.waitActive();
+ fs1 = cluster.getFileSystem();
+ LOG.info("START second instance.");
+
+ recoverFile(fs1);
+
+ // the 2 DNs with the larger sequence number should win
+ BlockLocation[] bl = fs1.getFileBlockLocations(
+ fs1.getFileStatus(file1), 0, BLOCK_SIZE);
+ assertTrue("Should have one block", bl.length == 1);
+ assertTrue("Should have 2 replicas for that block, not " +
+ bl[0].getNames().length, bl[0].getNames().length == 2);
+
+ assertFileSize(fs1, BLOCK_SIZE*3/4);
+ checkFile(fs1, BLOCK_SIZE*3/4);
+
+ // verify that, over time, the block has been replicated to 3 DN
+ cluster.getNameNode().getNamesystem().restartReplicationWork();
+ waitForBlockReplication(fs1, file1.toString(), 3, 20);
+ } finally {
+ fs1.close();
+ cluster.shutdown();
+ }
+ }
+
+ // we vary which datanode keeps the good replica to exercise
+ // the start, middle, & end of the DFSOutputStream pipeline
+ public void testAppendSyncChecksum0() throws Exception {
+ checksumTest(0);
+ }
+ public void testAppendSyncChecksum1() throws Exception {
+ checksumTest(1);
+ }
+ public void testAppendSyncChecksum2() throws Exception {
+ checksumTest(2);
+ }
+
+ void checksumTest(int goodDN) throws Exception {
+ int deadDN = (goodDN + 1) % 3;
+ int corruptDN = (goodDN + 2) % 3;
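+ // deadDN is stopped after the first sync (its replica goes stale);
+ // corruptDN's bbw block is corrupted while the cluster is down;
+ // recovery should keep only goodDN's intact replica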
+
+ LOG.info("START");
+ cluster = new MiniDFSCluster(conf, 3, true, null);
+ FileSystem fs1 = cluster.getFileSystem();
+ try {
+ int halfBlock = (int)BLOCK_SIZE/2;
+ short rep = 3; // replication
+ assertTrue(BLOCK_SIZE%8 == 0);
+
+ file1 = new Path("/appendWithReplication.dat");
+
+ // write 1/2 block & sync
+ stm = fs1.create(file1, true, (int)BLOCK_SIZE*2, rep, BLOCK_SIZE);
+ AppendTestUtil.write(stm, 0, halfBlock);
+ stm.sync();
+ assertNumCurrentReplicas(rep);
+
+ // close one of the datanodes
+ cluster.stopDataNode(deadDN);
+
+ // write 1/4 block & sync
+ AppendTestUtil.write(stm, halfBlock, (int)BLOCK_SIZE/4);
+ stm.sync();
+ assertNumCurrentReplicas((short)(rep - 1));
+
+ // stop the cluster
+ cluster.getNameNode().setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+ cluster.shutdown();
+ fs1.close();
+ LOG.info("STOPPED first instance of the cluster");
+
+ // give the second datanode a bad CRC
+ corruptDatanode(corruptDN);
+
+ // restart the cluster
+ cluster = new MiniDFSCluster(conf, 3, false, null);
+ cluster.getNameNode().getNamesystem().stallReplicationWork();
+ cluster.waitActive();
+ fs1 = cluster.getFileSystem();
+ LOG.info("START second instance.");
+
+ // verify that only the good datanode's file is used
+ recoverFile(fs1);
+
+ BlockLocation[] bl = fs1.getFileBlockLocations(
+ fs1.getFileStatus(file1), 0, BLOCK_SIZE);
+ assertTrue("Should have one block", bl.length == 1);
+ assertTrue("Should have 1 replica for that block, not " +
+ bl[0].getNames().length, bl[0].getNames().length == 1);
+
+ assertTrue("The replica should be the datanode with the correct CRC",
+ cluster.getDataNodes().get(goodDN).getSelfAddr().toString()
+ .endsWith(bl[0].getNames()[0]) );
+ assertFileSize(fs1, BLOCK_SIZE*3/4);
+
+ // should fail checkFile() if data with the bad CRC was used
+ checkFile(fs1, BLOCK_SIZE*3/4);
+
+ // ensure proper re-replication
+ cluster.getNameNode().getNamesystem().restartReplicationWork();
+ waitForBlockReplication(fs1, file1.toString(), 3, 20);
+ } finally {
+ fs1.close();
+ cluster.shutdown();
+ }
+ }
+
+ // we test different datanodes dying and not coming back
+ public void testDnDeath0() throws Exception {
+ dnDeathTest(0);
+ }
+ public void testDnDeath1() throws Exception {
+ dnDeathTest(1);
+ }
+ public void testDnDeath2() throws Exception {
+ dnDeathTest(2);
+ }
+
+ /**
+ * Test case that writes and completes a file, and then
+ * tries to recover the file after the old primary
+ * DN has failed.
+ */
+ void dnDeathTest(int badDN) throws Exception {
+ LOG.info("START");
+ cluster = new MiniDFSCluster(conf, 3, true, null);
+ FileSystem fs1 = cluster.getFileSystem();
+ try {
+ int halfBlock = (int)BLOCK_SIZE/2;
+ short rep = 3; // replication
+ assertTrue(BLOCK_SIZE%4 == 0);
+
+ file1 = new Path("/dnDeath.dat");
+
+ // write 1/2 block & close
+ stm = fs1.create(file1, true, (int)BLOCK_SIZE*2, rep, BLOCK_SIZE);
+ AppendTestUtil.write(stm, 0, halfBlock);
+ stm.close();
+
+ // close one of the datanodes
+ cluster.stopDataNode(badDN);
+
+ // Recover the lease
+ recoverFile(fs1);
+ checkFile(fs1, halfBlock);
+ } finally {
+ fs1.close();
+ cluster.shutdown();
+ }
+ }
+
+ /**
+ * Test case that stops a writer after finalizing a block but
+ * before calling completeFile, and then tries to recover
+ * the lease.
+ */
+ public void testRecoverFinalizedBlock() throws Throwable {
+ cluster = new MiniDFSCluster(conf, 3, true, null);
+
+ try {
+ cluster.waitActive();
+ NameNode preSpyNN = cluster.getNameNode();
+ NameNode spyNN = spy(preSpyNN);
+
+ // Delay completeFile
+ DelayAnswer delayer = new DelayAnswer();
+ doAnswer(delayer).when(spyNN).complete(anyString(), anyString());
+
+ DFSClient client = new DFSClient(null, spyNN, conf, null);
+ file1 = new Path("/testRecoverFinalized");
+ final OutputStream stm = client.create("/testRecoverFinalized", true);
+
+ // write 1/2 block
+ AppendTestUtil.write(stm, 0, 4096);
+ final AtomicReference<Throwable> err = new AtomicReference<Throwable>();
+ Thread t = new Thread() {
+ public void run() {
+ try {
+ stm.close();
+ } catch (Throwable t) {
+ err.set(t);
+ }
+ }};
+ t.start();
+ LOG.info("Waiting for close to get to latch...");
+ delayer.waitForCall();
+
+ // At this point, the block is finalized on the DNs, but the file
+ // has not been completed in the NN.
+ // Lose the leases
+ LOG.info("Killing lease checker");
+ client.leasechecker.interruptAndJoin();
+
+ FileSystem fs1 = cluster.getFileSystem();
+ FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(
+ fs1.getConf());
+
+ LOG.info("Recovering file");
+ recoverFile(fs2);
+
+ LOG.info("Telling close to proceed.");
+ delayer.proceed();
+ LOG.info("Waiting for close to finish.");
+ t.join();
+ LOG.info("Close finished.");
+
+ // We expect that close will get a "Could not complete write"
+ // error.
+ Throwable thrownByClose = err.get();
+ assertNotNull(thrownByClose);
+ assertTrue(thrownByClose instanceof IOException);
+ if (!thrownByClose.getMessage().contains("Could not complete write")) {
+ throw thrownByClose;
+ }
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ /**
+ * Test for an intermittent failure of commitBlockSynchronization.
+ * This could happen if the DN crashed between calling updateBlocks
+ * and commitBlockSynchronization.
+ */
+ public void testDatanodeFailsToCommit() throws Throwable {
+ LOG.info("START");
+ cluster = new MiniDFSCluster(conf, 1, true, null);
+ FileSystem fs1 = cluster.getFileSystem();
+ FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(fs1.getConf());
+ try {
+ createFile(fs1, "/datanodeFailsCommit.test", 1, BBW_SIZE);
+ stm.sync();
+ loseLeases(fs1);
+
+ // Make the NN fail to commitBlockSynchronization one time
+ NameNode nn = cluster.getNameNode();
+ nn.namesystem = spy(nn.namesystem);
+ doAnswer(new ThrowNTimesAnswer(IOException.class, 1)).
+ when(nn.namesystem).
+ commitBlockSynchronization((Block)anyObject(), anyInt(), anyInt(),
+ anyBoolean(), anyBoolean(),
+ (DatanodeID[])anyObject());
+
+ recoverFile(fs2);
+ // close() should write recovered bbw to HDFS block
+ assertFileSize(fs2, BBW_SIZE);
+ checkFile(fs2, BBW_SIZE);
+ } finally {
+ fs2.close();
+ fs1.close();
+ cluster.shutdown();
+ }
+ LOG.info("STOP");
+ }
+
+ /**
+ * Test that when a DN starts up with bbws from a file that got
+ * removed or finalized when it was down, the block gets deleted.
+ */
+ public void testBBWCleanupOnStartup() throws Throwable {
+ LOG.info("START");
+ cluster = new MiniDFSCluster(conf, 3, true, null);
+ FileSystem fs1 = cluster.getFileSystem();
+ try {
+ int halfBlock = (int) BLOCK_SIZE / 2;
+ short rep = 3; // replication
+ assertTrue(BLOCK_SIZE % 4 == 0);
+
+ file1 = new Path("/bbwCleanupOnStartup.dat");
+
+ // write 1/2 block & sync
+ stm = fs1.create(file1, true, (int) BLOCK_SIZE * 2, rep, BLOCK_SIZE);
+ AppendTestUtil.write(stm, 0, halfBlock);
+ stm.sync();
+
+ String dataDirs = cluster.getDataNodes().get(0).getConf().get("dfs.data.dir");
+ // close one of the datanodes
+ MiniDFSCluster.DataNodeProperties dnprops = cluster.stopDataNode(0);
+
+ stm.close();
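+ // the file is completed via the remaining datanodes, so the bbw copy
+ // left behind on the stopped datanode is now stale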
+
+ List<File> bbwFilesAfterShutdown = getBBWFiles(dataDirs);
+ assertEquals(1, bbwFilesAfterShutdown.size());
+
+ assertTrue(cluster.restartDataNode(dnprops));
+
+ List<File> bbwFilesAfterRestart = null;
+ // Wait up to 10 heartbeats for the files to get removed - it should
+ // really happen after just a couple.
+ for (int i = 0; i < 10; i++) {
+ LOG.info("Waiting for heartbeat #" + i + " after DN restart");
+ cluster.waitForDNHeartbeat(0, 10000);
+
+ // Check if it has been deleted
+ bbwFilesAfterRestart = getBBWFiles(dataDirs);
+ if (bbwFilesAfterRestart.size() == 0) {
+ break;
+ }
+ }
+
+ assertEquals(0, bbwFilesAfterRestart.size());
+
+ } finally {
+ fs1.close();
+ cluster.shutdown();
+ }
+ }
+
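+ /** Collect block files (excluding .meta files) from every data dir's blocksBeingWritten directory. */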
+ private List<File> getBBWFiles(String dfsDataDirs) {
+ ArrayList<File> files = new ArrayList<File>();
+ for (String dirString : dfsDataDirs.split(",")) {
+ File dir = new File(dirString);
+ assertTrue("data dir " + dir + " should exist",
+ dir.exists());
+ File bbwDir = new File(dir, "blocksBeingWritten");
+ assertTrue("bbw dir " + bbwDir + " should eixst",
+ bbwDir.exists());
+ for (File blockFile : bbwDir.listFiles()) {
+ if (!blockFile.getName().endsWith(".meta")) {
+ files.add(blockFile);
+ }
+ }
+ }
+ return files;
+ }
+
+ /**
+ * Test for following sequence:
+ * 1. Client finishes writing a block, but does not allocate next one
+ * 2. Client loses lease
+ * 3. Recovery process starts, but commitBlockSynchronization not called yet
+ * 4. Client calls addBlock and continues writing
+ * 5. commitBlockSynchronization proceeds
+ * 6. Original client tries to write/close
+ */
+ public void testRecoveryOnBlockBoundary() throws Throwable {
+ LOG.info("START");
+ cluster = new MiniDFSCluster(conf, 1, true, null);
+ FileSystem fs1 = cluster.getFileSystem();
+ final FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(fs1.getConf());
+
+ // Allow us to delay commitBlockSynchronization
+ DelayAnswer delayer = new DelayAnswer();
+ NameNode nn = cluster.getNameNode();
+ nn.namesystem = spy(nn.namesystem);
+ doAnswer(delayer).
+ when(nn.namesystem).
+ commitBlockSynchronization((Block) anyObject(), anyInt(), anyInt(),
+ anyBoolean(), anyBoolean(),
+ (DatanodeID[]) anyObject());
+
+ try {
+ file1 = new Path("/testWritingDuringRecovery.test");
+ stm = fs1.create(file1, true, (int) BLOCK_SIZE * 2, (short) 3, BLOCK_SIZE);
+ AppendTestUtil.write(stm, 0, (int) (BLOCK_SIZE));
+ stm.sync();
+
+ LOG.info("Losing lease");
+ loseLeases(fs1);
+
+
+ LOG.info("Triggering recovery in another thread");
+
+ final AtomicReference<Throwable> err = new AtomicReference<Throwable>();
+ Thread recoverThread = new Thread() {
+ public void run() {
+ try {
+ recoverFile(fs2);
+ } catch (Throwable t) {
+ err.set(t);
+ }
+ }
+ };
+ recoverThread.start();
+
+ LOG.info("Waiting for recovery about to call commitBlockSynchronization");
+ delayer.waitForCall();
+
+ LOG.info("Continuing to write to stream");
+ AppendTestUtil.write(stm, 0, (int) (BLOCK_SIZE));
+ try {
+ stm.sync();
+ fail("Sync was allowed after recovery started");
+ } catch (IOException ioe) {
+ LOG.info("Got expected IOE trying to write to a file from the writer " +
+ "that lost its lease", ioe);
+ }
+
+ LOG.info("Written more to stream, allowing commit to proceed");
+ delayer.proceed();
+
+ LOG.info("Joining on recovery thread");
+ recoverThread.join();
+ if (err.get() != null) {
+ throw err.get();
+ }
+
+ LOG.info("Now that recovery has finished, still expect further writes to fail.");
+ try {
+ AppendTestUtil.write(stm, 0, (int) (BLOCK_SIZE));
+ stm.sync();
+ fail("Further writes after recovery finished did not fail!");
+ } catch (IOException ioe) {
+ LOG.info("Got expected exception", ioe);
+ }
+
+
+ LOG.info("Checking that file looks good");
+
+ // only the data from the first successful writes (one block) should
+ // have been recovered on close()
+ assertFileSize(fs2, BLOCK_SIZE);
+ checkFile(fs2, BLOCK_SIZE);
+ } finally {
+ try {
+ fs2.close();
+ fs1.close();
+ cluster.shutdown();
+ } catch (Throwable t) {
+ LOG.warn("Didn't close down cleanly", t);
+ }
+ }
+ LOG.info("STOP");
+ }
+
+ /**
+ * Mockito answer helper that triggers one latch as soon as the
+ * method is called, then waits on another before continuing.
+ */
+ private static class DelayAnswer implements Answer {
+ private final CountDownLatch fireLatch = new CountDownLatch(1);
+ private final CountDownLatch waitLatch = new CountDownLatch(1);
+
+ /**
+ * Wait until the method is called.
+ */
+ public void waitForCall() throws InterruptedException {
+ fireLatch.await();
+ }
+
+ /**
+ * Tell the method to proceed.
+ * This should only be called after waitForCall()
+ */
+ public void proceed() {
+ waitLatch.countDown();
+ }
+
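+ // Signal that the stubbed method was invoked, block until proceed()
+ // releases the latch, then delegate to the real method.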
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ LOG.info("DelayAnswer firing fireLatch");
+ fireLatch.countDown();
+ try {
+ LOG.info("DelayAnswer waiting on waitLatch");
+ waitLatch.await();
+ LOG.info("DelayAnswer delay complete");
+ } catch (InterruptedException ie) {
+ throw new IOException("Interrupted waiting on latch", ie);
+ }
+ return invocation.callRealMethod();
+ }
+ }
+
+ /**
+ * Mockito answer helper that will throw an exception a given number
+ * of times before eventually succeeding.
+ */
+ private static class ThrowNTimesAnswer implements Answer {
+ private int numTimesToThrow;
+ private Class<? extends Throwable> exceptionClass;
+
+ public ThrowNTimesAnswer(Class<? extends Throwable> exceptionClass,
+ int numTimesToThrow) {
+ this.exceptionClass = exceptionClass;
+ this.numTimesToThrow = numTimesToThrow;
+ }
+
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ if (numTimesToThrow-- > 0) {
+ throw exceptionClass.newInstance();
+ }
+
+ return invocation.callRealMethod();
+ }
+ }
+
+}
Modified: hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery.java?rev=953482&r1=953481&r2=953482&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery.java Thu Jun 10 22:25:39 2010
@@ -27,6 +27,8 @@ import org.apache.hadoop.hdfs.protocol.D
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.TestInterDatanodeProtocol;
+import org.apache.hadoop.hdfs.server.datanode.FSDatasetTestUtil;
+
import org.apache.hadoop.hdfs.server.protocol.BlockMetaDataInfo;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
@@ -106,9 +108,10 @@ public class TestLeaseRecovery extends j
//update blocks with random block sizes
Block[] newblocks = new Block[REPLICATION_NUM];
for(int i = 0; i < REPLICATION_NUM; i++) {
+ DataNode dn = datanodes[i];
+ FSDatasetTestUtil.truncateBlock(dn, lastblock, newblocksizes[i]);
newblocks[i] = new Block(lastblock.getBlockId(), newblocksizes[i],
lastblock.getGenerationStamp());
- idps[i].updateBlock(lastblock, newblocks[i], false);
checkMetaInfo(newblocks[i], idps[i]);
}
Added: hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/FSDatasetTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/FSDatasetTestUtil.java?rev=953482&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/FSDatasetTestUtil.java (added)
+++ hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/FSDatasetTestUtil.java Thu Jun 10 22:25:39 2010
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+import java.io.File;
+import org.apache.hadoop.hdfs.protocol.Block;
+
+public abstract class FSDatasetTestUtil {
+
+ /**
+ * Truncate the given block in place, such that the new truncated block
+ * is still valid (i.e. checksums are updated to stay in sync with the block file)
+ */
+ public static void truncateBlock(DataNode dn,
+ Block block,
+ long newLength)
+ throws IOException
+ {
+ FSDataset ds = (FSDataset)dn.data;
+
+ File blockFile = ds.findBlockFile(block.getBlockId());
+ if (blockFile == null) {
+ throw new IOException("Can't find block file for block " +
+ block + " on DN " + dn);
+ }
+ File metaFile = ds.findMetaFile(blockFile);
+ FSDataset.truncateBlock(blockFile, metaFile,
+ block.getNumBytes(), newLength);
+ }
+
+}
\ No newline at end of file