You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2009/08/12 18:58:02 UTC
svn commit: r803601 - in /hadoop/hdfs/branches/HDFS-265: ./
src/java/org/apache/hadoop/hdfs/server/common/
src/java/org/apache/hadoop/hdfs/server/datanode/
src/test/hdfs/org/apache/hadoop/hdfs/
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/
Author: hairong
Date: Wed Aug 12 16:58:02 2009
New Revision: 803601
URL: http://svn.apache.org/viewvc?rev=803601&view=rev
Log:
HDFS-509. Redesign DataNode volumeMap to include all types of Replicas. Contributed by Hairong Kuang.
Added:
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java
Modified:
hadoop/hdfs/branches/HDFS-265/CHANGES.txt
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
Modified: hadoop/hdfs/branches/HDFS-265/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/CHANGES.txt?rev=803601&r1=803600&r2=803601&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-265/CHANGES.txt Wed Aug 12 16:58:02 2009
@@ -92,6 +92,9 @@
HDFS-451. Add fault injection tests, Pipeline_Fi_06,07,14,15, for
DataTransferProtocol. (szetszwo)
+ HDFS-509. Redesign DataNode volumeMap to include all types of Replicas.
+ (hairong)
+
BUG FIXES
HDFS-76. Better error message to users when commands fail because of
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java?rev=803601&r1=803600&r2=803601&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java Wed Aug 12 16:58:02 2009
@@ -79,5 +79,16 @@
return description;
}
}
+
+ /**
+ * States a replica stored on a DataNode can be in
+ */
+ static public enum ReplicaState {
+ FINALIZED, // finalized replica
+ RBW, // replica being written
+ RWR, // replica waiting to be recovered
+ RUR, // replica under recovery
+ TEMPORARY // temporary replica
+ }
}
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=803601&r1=803600&r2=803601&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Wed Aug 12 16:58:02 2009
@@ -1560,9 +1560,8 @@
// This can happen if the namenode and client start recovering the same
// file at the same time.
synchronized (ongoingRecovery) {
- Block tmp = new Block();
- tmp.set(block.getBlockId(), block.getNumBytes(), GenerationStamp.WILDCARD_STAMP);
- if (ongoingRecovery.get(tmp) != null) {
+ if (ongoingRecovery.get(new Block(block.getBlockId(), block.getNumBytes(),
+ GenerationStamp.WILDCARD_STAMP)) != null) {
String msg = "Block " + block + " is already being recovered, " +
" ignoring this request to recover it.";
LOG.info(msg);
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=803601&r1=803600&r2=803601&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Wed Aug 12 16:58:02 2009
@@ -36,6 +36,7 @@
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.conf.*;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
@@ -105,8 +106,9 @@
if ( ! metaData.renameTo( newmeta ) ||
! src.renameTo( dest ) ) {
throw new IOException( "could not move files for " + b +
- " from tmp to " +
- dest.getAbsolutePath() );
+ " from " + src + " to " +
+ dest.getAbsolutePath() + " or from"
+ + metaData + " to " + newmeta);
}
if (DataNode.LOG.isDebugEnabled()) {
DataNode.LOG.debug("addBlock: Moved " + metaData + " to " + newmeta);
@@ -171,7 +173,7 @@
return Block.GRANDFATHER_GENERATION_STAMP;
}
- void getVolumeMap(HashMap<Block, ReplicaInfo> volumeMap, FSVolume volume) {
+ void getVolumeMap(ReplicasMap volumeMap, FSVolume volume) {
if (children != null) {
for (int i = 0; i < children.length; i++) {
children[i].getVolumeMap(volumeMap, volume);
@@ -179,11 +181,18 @@
}
File blockFiles[] = dir.listFiles();
- for (int i = 0; i < blockFiles.length; i++) {
- if (Block.isBlockFilename(blockFiles[i])) {
- long genStamp = getGenerationStampFromFile(blockFiles, blockFiles[i]);
- volumeMap.put(new Block(blockFiles[i], blockFiles[i].length(), genStamp),
- new ReplicaInfo(volume, blockFiles[i]));
+ for (File blockFile : blockFiles) {
+ if (Block.isBlockFilename(blockFile)) {
+ long genStamp = getGenerationStampFromFile(blockFiles, blockFile);
+ long blockId = Block.filename2id(blockFile.getName());
+ ReplicaInfo oldReplica = volumeMap.add(
+ new FinalizedReplica(blockId, blockFile.length(), genStamp,
+ volume, blockFile.getParentFile()));
+ if (oldReplica != null) {
+ DataNode.LOG.warn("Two block files have the same block id exits " +
+ "on disk: " + oldReplica.getBlockFile() +
+ " and " + blockFile );
+ }
}
}
}
@@ -403,7 +412,7 @@
DiskChecker.checkDir(tmpDir);
}
- void getVolumeMap(HashMap<Block, ReplicaInfo> volumeMap) {
+ void getVolumeMap(ReplicasMap volumeMap) {
dataDir.getVolumeMap(volumeMap, this);
}
@@ -496,7 +505,7 @@
return remaining;
}
- synchronized void getVolumeMap(HashMap<Block, ReplicaInfo> volumeMap) {
+ synchronized void getVolumeMap(ReplicasMap volumeMap) {
for (int idx = 0; idx < volumes.length; idx++) {
volumes[idx].getVolumeMap(volumeMap);
}
@@ -538,24 +547,6 @@
public static final short METADATA_VERSION = 1;
- static class ActiveFile {
- final File file;
- final List<Thread> threads = new ArrayList<Thread>(2);
-
- ActiveFile(File f, List<Thread> list) {
- file = f;
- if (list != null) {
- threads.addAll(list);
- }
- threads.add(Thread.currentThread());
- }
-
- public String toString() {
- return getClass().getSimpleName() + "(file=" + file
- + ", threads=" + threads + ")";
- }
- }
-
static String getMetaFileName(String blockFileName, long genStamp) {
return blockFileName + "_" + genStamp + METADATA_EXTENSION;
}
@@ -605,22 +596,7 @@
/** Return the block file for the given ID */
public File findBlockFile(long blockId) {
- final Block b = new Block(blockId);
- File blockfile = null;
- ActiveFile activefile = ongoingCreates.get(b);
- if (activefile != null) {
- blockfile = activefile.file;
- }
- if (blockfile == null) {
- blockfile = getFile(b);
- }
- if (blockfile == null) {
- if (DataNode.LOG.isDebugEnabled()) {
- DataNode.LOG.debug("ongoingCreates=" + ongoingCreates);
- DataNode.LOG.debug("volumeMap=" + volumeMap);
- }
- }
- return blockfile;
+ return getFile(blockId);
}
/** {@inheritDoc} */
@@ -651,9 +627,8 @@
}
FSVolumeSet volumes;
- private HashMap<Block,ActiveFile> ongoingCreates = new HashMap<Block,ActiveFile>();
private int maxBlocksPerDir = 0;
- HashMap<Block,ReplicaInfo> volumeMap = null;
+ ReplicasMap volumeMap = new ReplicasMap();
static Random random = new Random();
// Used for synchronizing access to usage stats
@@ -669,7 +644,6 @@
volArray[idx] = new FSVolume(storage.getStorageDir(idx).getCurrentDir(), conf);
}
volumes = new FSVolumeSet(volArray);
- volumeMap = new HashMap<Block, ReplicaInfo>();
volumes.getVolumeMap(volumeMap);
registerMBean(storage.getStorageID());
}
@@ -737,15 +711,27 @@
}
/**
- * Returns handles to the block file and its metadata file
+ * Get the meta info of a block stored in volumeMap
+ * @param b block
+ * @return the meta replica information
+ * @throws IOException if no entry is in the map or
+ * there is a generation stamp mismatch
*/
- public synchronized BlockInputStreams getTmpInputStreams(Block b,
- long blkOffset, long ckoff) throws IOException {
-
+ private ReplicaInfo getReplicaInfo(Block b) throws IOException {
ReplicaInfo info = volumeMap.get(b);
if (info == null) {
throw new IOException("Block " + b + " does not exist in volumeMap.");
}
+ return info;
+ }
+
+ /**
+ * Returns handles to the block file and its metadata file
+ */
+ public synchronized BlockInputStreams getTmpInputStreams(Block b,
+ long blkOffset, long ckoff) throws IOException {
+
+ ReplicaInfo info = getReplicaInfo(b);
FSVolume v = info.getVolume();
File blockFile = v.getTmpFile(b);
RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");
@@ -774,23 +760,16 @@
* @param block Block
* @param numLinks Detach if the number of links exceed this value
* @throws IOException
- * @return - true if the specified block was detached
+ * @return - true if the specified block was detached or the block
+ * is not in any snapshot.
*/
public boolean detachBlock(Block block, int numLinks) throws IOException {
ReplicaInfo info = null;
synchronized (this) {
- info = volumeMap.get(block);
- }
- return info.detachBlock(block, numLinks);
- }
-
- static private <T> void updateBlockMap(Map<Block, T> blockmap,
- Block oldblock, Block newblock) throws IOException {
- if (blockmap.containsKey(oldblock)) {
- T value = blockmap.remove(oldblock);
- blockmap.put(newblock, value);
+ info = getReplicaInfo(block);
}
+ return info.detachBlock(numLinks);
}
/** {@inheritDoc} */
@@ -822,18 +801,24 @@
/**
* Try to update an old block to a new block.
- * If there are ongoing create threads running for the old block,
+ * If there are write threads running for the old block,
* the threads will be returned without updating the block.
*
- * @return ongoing create threads if there is any. Otherwise, return null.
+ * @return write threads if there is any. Otherwise, return null.
*/
private synchronized List<Thread> tryUpdateBlock(
Block oldblock, Block newblock) throws IOException {
- //check ongoing create threads
- final ActiveFile activefile = ongoingCreates.get(oldblock);
- if (activefile != null && !activefile.threads.isEmpty()) {
+ //check write threads
+ final ReplicaInfo replicaInfo = volumeMap.get(oldblock.getBlockId());
+ File blockFile = replicaInfo==null?null:replicaInfo.getBlockFile();
+ if (blockFile == null) {
+ throw new IOException("Block " + oldblock + " does not exist.");
+ }
+
+ if (replicaInfo instanceof ReplicaInPipeline) {
+ List<Thread> threads = ((ReplicaInPipeline)replicaInfo).getThreads();
//remove dead threads
- for(Iterator<Thread> i = activefile.threads.iterator(); i.hasNext(); ) {
+ for(Iterator<Thread> i = threads.iterator(); i.hasNext(); ) {
final Thread t = i.next();
if (!t.isAlive()) {
i.remove();
@@ -841,19 +826,14 @@
}
//return living threads
- if (!activefile.threads.isEmpty()) {
- return new ArrayList<Thread>(activefile.threads);
+ if (!threads.isEmpty()) {
+ return new ArrayList<Thread>(threads);
}
}
//No ongoing create threads is alive. Update block.
- File blockFile = findBlockFile(oldblock.getBlockId());
- if (blockFile == null) {
- throw new IOException("Block " + oldblock + " does not exist.");
- }
-
- File oldMetaFile = findMetaFile(blockFile);
- long oldgs = parseGenerationStamp(blockFile, oldMetaFile);
+ File oldMetaFile = replicaInfo.getMetaFile();
+ long oldgs = replicaInfo.getGenerationStamp();
//rename meta file to a tmp file
File tmpMetaFile = new File(oldMetaFile.getParent(),
@@ -863,7 +843,7 @@
}
//update generation stamp
- if (oldgs > newblock.getGenerationStamp()) {
+ if (oldgs >= newblock.getGenerationStamp()) {
throw new IOException("Cannot update block (id=" + newblock.getBlockId()
+ ") generation stamp from " + oldgs
+ " to " + newblock.getGenerationStamp());
@@ -878,15 +858,16 @@
truncateBlock(blockFile, tmpMetaFile, oldblock.getNumBytes(), newblock.getNumBytes());
}
+ // update replicaInfo
+ replicaInfo.setGenerationStamp(newblock.getGenerationStamp());
+ replicaInfo.setNumBytes(newblock.getNumBytes());
+
//rename the tmp file to the new meta file (with new generation stamp)
- File newMetaFile = getMetaFile(blockFile, newblock);
+ File newMetaFile = replicaInfo.getMetaFile();
if (!tmpMetaFile.renameTo(newMetaFile)) {
throw new IOException("Cannot rename tmp meta file to " + newMetaFile);
}
- updateBlockMap(ongoingCreates, oldblock, newblock);
- updateBlockMap(volumeMap, oldblock, newblock);
-
// paranoia! verify that the contents of the stored block
// matches the block file on disk.
validateBlockMetadata(newblock);
@@ -964,6 +945,7 @@
//
// Make sure the block isn't a valid one - we're still creating it!
//
+ ReplicaInfo replicaInfo = volumeMap.get(b);
if (isValidBlock(b)) {
if (!isRecovery) {
throw new BlockAlreadyExistsException("Block " + b + " is valid, and cannot be written to.");
@@ -973,7 +955,9 @@
// some of the packets were not received by the client. The client
// re-opens the connection and retries sending those packets.
// The other reason is that an "append" is occurring to this block.
- detachBlock(b, 1);
+ if (replicaInfo != null) {
+ replicaInfo.detachBlock(1);
+ }
}
long blockSize = b.getNumBytes();
@@ -986,10 +970,9 @@
//
// Is it already in the create process?
//
- ActiveFile activeFile = ongoingCreates.get(b);
- if (activeFile != null) {
- f = activeFile.file;
- threads = activeFile.threads;
+ if (replicaInfo != null && replicaInfo instanceof ReplicaInPipeline) {
+ f = replicaInfo.getBlockFile();
+ threads = ((ReplicaInPipeline)replicaInfo).getThreads();
if (!isRecovery) {
throw new BlockAlreadyExistsException("Block " + b +
@@ -999,27 +982,27 @@
thread.interrupt();
}
}
- ongoingCreates.remove(b);
}
FSVolume v = null;
- if (!isRecovery) {
+ if (!isRecovery) { // create a new block
v = volumes.getNextVolume(blockSize);
// create temporary file to hold block in the designated volume
f = createTmpFile(v, b);
- volumeMap.put(b, new ReplicaInfo(v));
+ replicaInfo = new ReplicaInPipeline(b.getBlockId(),
+ b.getGenerationStamp(), v, f.getParentFile());
+ volumeMap.add(replicaInfo);
} else if (f != null) {
DataNode.LOG.info("Reopen already-open Block for append " + b);
- // create or reuse temporary file to hold block in the designated volume
- v = volumeMap.get(b).getVolume();
- volumeMap.put(b, new ReplicaInfo(v));
} else {
// reopening block for appending to it.
DataNode.LOG.info("Reopen Block for append " + b);
- v = volumeMap.get(b).getVolume();
+ v = replicaInfo.getVolume();
f = createTmpFile(v, b);
- File blkfile = getBlockFile(b);
- File oldmeta = getMetaFile(b);
- File newmeta = getMetaFile(f, b);
+ File blkfile = replicaInfo.getBlockFile();
+ File oldmeta = replicaInfo.getMetaFile();
+ replicaInfo = new ReplicaInPipeline(replicaInfo,
+ v, f.getParentFile(), threads);
+ File newmeta = replicaInfo.getMetaFile();
// rename meta file to tmp directory
DataNode.LOG.debug("Renaming " + oldmeta + " to " + newmeta);
@@ -1042,7 +1025,7 @@
" to tmp dir " + f);
}
}
- volumeMap.put(b, new ReplicaInfo(v));
+ volumeMap.add(replicaInfo);
}
if (f == null) {
DataNode.LOG.warn("Block " + b + " reopen failed " +
@@ -1050,7 +1033,6 @@
throw new IOException("Block " + b + " reopen failed " +
" Unable to locate tmp file.");
}
- ongoingCreates.put(b, new ActiveFile(f, threads));
}
try {
@@ -1093,7 +1075,7 @@
throws IOException {
long size = 0;
synchronized (this) {
- FSVolume vol = volumeMap.get(b).getVolume();
+ FSVolume vol = getReplicaInfo(b).getVolume();
size = vol.getTmpFile(b).length();
}
if (size < dataOffset) {
@@ -1111,7 +1093,7 @@
synchronized File createTmpFile( FSVolume vol, Block blk ) throws IOException {
if ( vol == null ) {
- vol = volumeMap.get( blk ).getVolume();
+ vol = getReplicaInfo( blk ).getVolume();
if ( vol == null ) {
throw new IOException("Could not find volume for block " + blk);
}
@@ -1131,40 +1113,43 @@
* Complete the block write!
*/
public synchronized void finalizeBlock(Block b) throws IOException {
- ActiveFile activeFile = ongoingCreates.get(b);
- if (activeFile == null) {
+ ReplicaInfo replicaInfo = getReplicaInfo(b);
+ if (replicaInfo.getState() == ReplicaState.FINALIZED) {
throw new IOException("Block " + b + " is already finalized.");
}
- File f = activeFile.file;
- if (f == null || !f.exists()) {
- throw new IOException("No temporary file " + f + " for block " + b);
- }
- FSVolume v = volumeMap.get(b).getVolume();
- if (v == null) {
- throw new IOException("No volume for temporary file " + f +
- " for block " + b);
+ ReplicaInfo newReplicaInfo = null;
+ if (replicaInfo.getState() == ReplicaState.RUR &&
+ ((ReplicaUnderRecovery)replicaInfo).getOrignalReplicaState() ==
+ ReplicaState.FINALIZED) {
+ newReplicaInfo = ((ReplicaUnderRecovery)replicaInfo).getOriginalReplica();
+ } else {
+ FSVolume v = replicaInfo.getVolume();
+ File f = replicaInfo.getBlockFile();
+ if (v == null) {
+ throw new IOException("No volume for temporary file " + f +
+ " for block " + b);
+ }
+
+ File dest = v.addBlock(b, f);
+ newReplicaInfo = new FinalizedReplica(b, v, dest.getParentFile());
}
-
- File dest = null;
- dest = v.addBlock(b, f);
- volumeMap.put(b, new ReplicaInfo(v, dest));
- ongoingCreates.remove(b);
+ volumeMap.add(newReplicaInfo);
}
/**
* Remove the temporary block file (if any)
*/
public synchronized void unfinalizeBlock(Block b) throws IOException {
- // remove the block from in-memory data structure
- ActiveFile activefile = ongoingCreates.remove(b);
- if (activefile == null) {
- return;
- }
- volumeMap.remove(b);
-
- // delete the on-disk temp file
- if (delBlockFromDisk(activefile.file, getMetaFile(activefile.file, b), b)) {
- DataNode.LOG.warn("Block " + b + " unfinalized and removed. " );
+ ReplicaInfo replicaInfo = volumeMap.get(b);
+ if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) {
+ // remove from volumeMap
+ volumeMap.remove(b);
+
+ // delete the on-disk temp file
+ if (delBlockFromDisk(replicaInfo.getBlockFile(),
+ replicaInfo.getMetaFile(), b)) {
+ DataNode.LOG.warn("Block " + b + " unfinalized and removed. " );
+ }
}
}
@@ -1200,8 +1185,8 @@
public Block[] getBlockReport() {
ArrayList<Block> list = new ArrayList<Block>(volumeMap.size());
synchronized(this) {
- for (Block b : volumeMap.keySet()) {
- if (!ongoingCreates.containsKey(b)) {
+ for (ReplicaInfo b : volumeMap.replicas()) {
+ if (b.getState() == ReplicaState.FINALIZED) {
list.add(new Block(b));
}
}
@@ -1216,7 +1201,7 @@
* is needed to handle concurrent modification to the block.
*/
synchronized Block[] getBlockList(boolean deepcopy) {
- Block[] list = volumeMap.keySet().toArray(new Block[volumeMap.size()]);
+ Block[] list = volumeMap.replicas().toArray(new Block[volumeMap.size()]);
if (deepcopy) {
for (int i = 0; i < list.length; i++) {
list[i] = new Block(list[i]);
@@ -1227,9 +1212,15 @@
/**
* Check whether the given block is a valid one.
+ * Valid means the replica is FINALIZED in volumeMap and its block file
+ * exists on disk. The volumeMap lookup is done while holding the
+ * FSDataset lock (as detachBlock does); the disk check is done outside it.
+ * @param b the block to check
+ * @return true iff the replica is finalized and its block file exists
*/
public boolean isValidBlock(Block b) {
- return validateBlockFile(b) != null;
+ final ReplicaInfo replicaInfo;
+ synchronized (this) {
+ // volumeMap is not thread-safe by itself; read it under the lock
+ replicaInfo = volumeMap.get(b);
+ }
+ if (replicaInfo == null ||
+ replicaInfo.getState() != ReplicaState.FINALIZED) {
+ return false;
+ }
+ return replicaInfo.getBlockFile().exists();
}
/**
@@ -1248,10 +1239,7 @@
/** {@inheritDoc} */
public void validateBlockMetadata(Block b) throws IOException {
- ReplicaInfo info = volumeMap.get(b);
- if (info == null) {
- throw new IOException("Block " + b + " does not exist in volumeMap.");
- }
+ ReplicaInfo info = getReplicaInfo(b);
FSVolume v = info.getVolume();
File tmp = v.getTmpFile(b);
File f = getFile(b);
@@ -1307,7 +1295,8 @@
synchronized (this) {
f = getFile(invalidBlks[i]);
ReplicaInfo dinfo = volumeMap.get(invalidBlks[i]);
- if (dinfo == null) {
+ if (dinfo == null ||
+ dinfo.getGenerationStamp() != invalidBlks[i].getGenerationStamp()) {
DataNode.LOG.warn("Unexpected error trying to delete block "
+ invalidBlks[i] +
". BlockInfo not found in volumeMap.");
@@ -1366,16 +1355,24 @@
}
/**
- * Turn the block identifier into a filename.
+ * Turn the block identifier into a filename; ignore generation stamp!!!
*/
public synchronized File getFile(Block b) {
- ReplicaInfo info = volumeMap.get(b);
+ return getFile(b.getBlockId());
+ }
+
+ /**
+ * Turn the block identifier into a filename.
+ * Synchronized because public callers such as findBlockFile(long)
+ * invoke it without already holding the FSDataset lock, and
+ * volumeMap must only be read under that lock.
+ * @param blockId a block's id
+ * @return on disk data file path; null if the replica does not exist
+ */
+ private synchronized File getFile(long blockId) {
+ ReplicaInfo info = volumeMap.get(blockId);
if (info != null) {
- return info.getFile();
+ return info.getBlockFile();
}
- return null;
+ return null;
}
-
/**
* check if a data directory is healthy
* @throws DiskErrorException
@@ -1459,11 +1456,12 @@
*/
public void checkAndUpdate(long blockId, File diskFile,
File diskMetaFile, FSVolume vol) {
- Block block = new Block(blockId);
DataNode datanode = DataNode.getDataNode();
Block corruptBlock = null;
+ ReplicaInfo memBlockInfo;
synchronized (this) {
- if (ongoingCreates.get(block) != null) {
+ memBlockInfo = volumeMap.get(blockId);
+ if (memBlockInfo != null && memBlockInfo.getState() != ReplicaState.FINALIZED) {
// Block is not finalized - ignore the difference
return;
}
@@ -1472,7 +1470,6 @@
Block.getGenerationStamp(diskMetaFile.getName()) :
Block.GRANDFATHER_GENERATION_STAMP;
- ReplicaInfo memBlockInfo = volumeMap.get(block);
if (diskFile == null || !diskFile.exists()) {
if (memBlockInfo == null) {
// Block file does not exist and block does not exist in memory
@@ -1484,14 +1481,14 @@
}
return;
}
- if (!memBlockInfo.getFile().exists()) {
+ if (!memBlockInfo.getBlockFile().exists()) {
// Block is in memory and not on the disk
// Remove the block from volumeMap
- volumeMap.remove(block);
+ volumeMap.remove(blockId);
if (datanode.blockScanner != null) {
- datanode.blockScanner.deleteBlock(block);
+ datanode.blockScanner.deleteBlock(new Block(blockId));
}
- DataNode.LOG.warn("Removed block " + block.getBlockId()
+ DataNode.LOG.warn("Removed block " + blockId
+ " from memory with missing block file on the disk");
// Finally remove the metadata file
if (diskMetaFile != null && diskMetaFile.exists()
@@ -1507,23 +1504,20 @@
*/
if (memBlockInfo == null) {
// Block is missing in memory - add the block to volumeMap
- ReplicaInfo diskBlockInfo = new ReplicaInfo(vol, diskFile);
- Block diskBlock = new Block(diskFile, diskFile.length(), diskGS);
- volumeMap.put(diskBlock, diskBlockInfo);
+ ReplicaInfo diskBlockInfo = new FinalizedReplica(blockId,
+ diskFile.length(), diskGS, vol, diskFile.getParentFile());
+ volumeMap.add(diskBlockInfo);
if (datanode.blockScanner != null) {
- datanode.blockScanner.addBlock(diskBlock);
+ datanode.blockScanner.addBlock(diskBlockInfo);
}
- DataNode.LOG.warn("Added missing block to memory " + diskBlock);
+ DataNode.LOG.warn("Added missing block to memory " + (Block)diskBlockInfo);
return;
}
/*
* Block exists in volumeMap and the block file exists on the disk
*/
- // Iterate to get key from volumeMap for the blockId
- Block memBlock = getBlockKey(blockId);
-
// Compare block files
- File memFile = memBlockInfo.getFile();
+ File memFile = memBlockInfo.getBlockFile();
if (memFile.exists()) {
if (memFile.compareTo(diskFile) != 0) {
DataNode.LOG.warn("Block file " + memFile.getAbsolutePath()
@@ -1540,19 +1534,17 @@
+ memFile.getAbsolutePath()
+ " does not exist. Updating it to the file found during scan "
+ diskFile.getAbsolutePath());
- ReplicaInfo info = volumeMap.remove(memBlock);
- info.setFile(diskFile);
+ memBlockInfo.setDir(diskFile.getParentFile());
memFile = diskFile;
DataNode.LOG.warn("Updating generation stamp for block " + blockId
- + " from " + memBlock.getGenerationStamp() + " to " + diskGS);
- memBlock.setGenerationStamp(diskGS);
- volumeMap.put(memBlock, info);
+ + " from " + memBlockInfo.getGenerationStamp() + " to " + diskGS);
+ memBlockInfo.setGenerationStamp(diskGS);
}
// Compare generation stamp
- if (memBlock.getGenerationStamp() != diskGS) {
- File memMetaFile = getMetaFile(diskFile, memBlock);
+ if (memBlockInfo.getGenerationStamp() != diskGS) {
+ File memMetaFile = getMetaFile(diskFile, memBlockInfo);
if (memMetaFile.exists()) {
if (memMetaFile.compareTo(diskMetaFile) != 0) {
DataNode.LOG.warn("Metadata file in memory "
@@ -1569,23 +1561,19 @@
: Block.GRANDFATHER_GENERATION_STAMP;
DataNode.LOG.warn("Updating generation stamp for block " + blockId
- + " from " + memBlock.getGenerationStamp() + " to " + gs);
+ + " from " + memBlockInfo.getGenerationStamp() + " to " + gs);
- ReplicaInfo info = volumeMap.remove(memBlock);
- memBlock.setGenerationStamp(gs);
- volumeMap.put(memBlock, info);
+ memBlockInfo.setGenerationStamp(gs);
}
}
// Compare block size
- if (memBlock.getNumBytes() != memFile.length()) {
+ if (memBlockInfo.getNumBytes() != memFile.length()) {
// Update the length based on the block file
- corruptBlock = new Block(memBlock);
+ corruptBlock = new Block(memBlockInfo);
DataNode.LOG.warn("Updating size of block " + blockId + " from "
- + memBlock.getNumBytes() + " to " + memFile.length());
- ReplicaInfo info = volumeMap.remove(memBlock);
- memBlock.setNumBytes(memFile.length());
- volumeMap.put(memBlock, info);
+ + memBlockInfo.getNumBytes() + " to " + memFile.length());
+ memBlockInfo.setNumBytes(memFile.length());
}
}
@@ -1605,18 +1593,14 @@
}
/**
- * Get reference to the key in the volumeMap. To be called from methods that
+ * Get reference to the replica meta info in the replicasMap.
+ * To be called from methods that
* are synchronized on {@link FSDataset}
* @param blockId
- * @return key from the volumeMap
+ * @return replica's meta information from the replicas map
*/
- Block getBlockKey(long blockId) {
+ ReplicaInfo getBlock(long blockId) {
assert(Thread.holdsLock(this));
- for (Block b : volumeMap.keySet()) {
- if (b.getBlockId() == blockId) {
- return b;
- }
- }
- return null;
+ return volumeMap.get(blockId);
}
}
Added: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java?rev=803601&view=auto
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java (added)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java Wed Aug 12 16:58:02 2009
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+
+/**
+ * A replica in the FINALIZED state; all of its bytes are visible.
+ */
+class FinalizedReplica extends ReplicaInfo {
+ private boolean detached; // set by setDetached(); copy-on-write done for block
+
+ /**
+ * Create a finalized replica from its individual fields.
+ * @param blockId block id
+ * @param len replica length
+ * @param genStamp replica generation stamp
+ * @param vol volume where replica is located
+ * @param dir directory path where block and meta files are located
+ */
+ FinalizedReplica(long blockId, long len, long genStamp,
+ FSVolume vol, File dir) {
+ super(blockId, len, genStamp, vol, dir);
+ }
+
+ /**
+ * Create a finalized replica described by the given block.
+ * @param block a block
+ * @param vol volume where replica is located
+ * @param dir directory path where block and meta files are located
+ */
+ FinalizedReplica(Block block, FSVolume vol, File dir) {
+ super(block, vol, dir);
+ }
+
+ @Override // ReplicaInfo
+ ReplicaState getState() {
+ return ReplicaState.FINALIZED;
+ }
+
+ @Override // ReplicaInfo
+ boolean isDetached() {
+ return detached;
+ }
+
+ @Override // ReplicaInfo
+ void setDetached() {
+ detached = true;
+ }
+
+ @Override // ReplicaInfo
+ long getVisibleLen() throws IOException {
+ return getNumBytes(); // all bytes are visible
+ }
+
+ @Override // Object; ReplicaInfo's equality, private 'detached' not compared
+ public boolean equals(Object o) {
+ return super.equals(o);
+ }
+
+ @Override // Object; delegates to ReplicaInfo, consistent with equals()
+ public int hashCode() {
+ return super.hashCode();
+ }
+}
Added: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java?rev=803601&view=auto
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java (added)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java Wed Aug 12 16:58:02 2009
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+
+/**
+ * A replica being written (RBW): a replica created in a pipeline that was
+ * initiated by a DFS client. Only acknowledged bytes are visible to readers.
+ */
+class ReplicaBeingWritten extends ReplicaInPipeline {
+  /**
+   * Creates a zero-length replica being written.
+   * @param blockId block id
+   * @param genStamp replica generation stamp
+   * @param vol volume holding the replica
+   * @param dir directory containing the block and meta files
+   */
+  ReplicaBeingWritten(long blockId, long genStamp,
+      FSVolume vol, File dir) {
+    super(blockId, genStamp, vol, dir);
+  }
+
+  /**
+   * Creates a replica being written that mirrors the given block.
+   * @param block source of id, length and generation stamp
+   * @param vol volume holding the replica
+   * @param dir directory containing the block and meta files
+   * @param threads threads that are writing to this replica
+   */
+  ReplicaBeingWritten(Block block,
+      FSVolume vol, File dir, List<Thread> threads) {
+    super(block, vol, dir, threads);
+  }
+
+  /**
+   * Creates a replica being written from its individual attributes.
+   * @param blockId block id
+   * @param len replica length
+   * @param genStamp replica generation stamp
+   * @param vol volume holding the replica
+   * @param dir directory containing the block and meta files
+   * @param threads threads that are writing to this replica
+   */
+  ReplicaBeingWritten(long blockId, long len, long genStamp,
+      FSVolume vol, File dir, List<Thread> threads) {
+    super(blockId, len, genStamp, vol, dir, threads);
+  }
+
+  @Override // ReplicaInfo
+  ReplicaState getState() {
+    return ReplicaState.RBW;
+  }
+
+  /** @return the number of acknowledged bytes, which are the visible ones */
+  @Override // ReplicaInfo
+  long getVisibleLen() throws IOException {
+    return getBytesAcked();
+  }
+
+  /** Equality is inherited unchanged from the superclass. */
+  @Override // Object
+  public boolean equals(Object o) {
+    return super.equals(o);
+  }
+
+  /** Hash code is inherited unchanged from the superclass. */
+  @Override // Object
+  public int hashCode() {
+    return super.hashCode();
+  }
+}
Added: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java?rev=803601&view=auto
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java (added)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java Wed Aug 12 16:58:02 2009
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+
+/**
+ * This class defines a replica in a pipeline, which
+ * includes a persistent replica being written to by a dfs client or
+ * a temporary replica being replicated by a source datanode or
+ * being copied for the balancing purpose.
+ *
+ * The base class implements a temporary replica.
+ */
+class ReplicaInPipeline extends ReplicaInfo {
+  /** number of bytes that have been acked */
+  private long bytesAcked;
+  /** number of bytes that have been written to disk */
+  private long bytesOnDisk;
+  /** threads writing to this replica; never null */
+  private final List<Thread> threads = new ArrayList<Thread>();
+
+  /**
+   * Constructor for a zero length replica
+   * @param blockId block id
+   * @param genStamp replica generation stamp
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   */
+  ReplicaInPipeline(long blockId, long genStamp,
+      FSVolume vol, File dir) {
+    this(blockId, 0L, genStamp, vol, dir, null);
+  }
+
+  /**
+   * Constructor
+   * @param block a block
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   * @param threads a list of threads that are writing to this replica
+   */
+  ReplicaInPipeline(Block block,
+      FSVolume vol, File dir, List<Thread> threads) {
+    this(block.getBlockId(), block.getNumBytes(), block.getGenerationStamp(),
+        vol, dir, threads);
+  }
+
+  /**
+   * Constructor. Both the acked and the on-disk byte counts start at len.
+   * The given threads, followed by the constructing thread, are recorded
+   * as writers of this replica.
+   * @param blockId block id
+   * @param len replica length
+   * @param genStamp replica generation stamp
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   * @param threads a list of threads that are writing to this replica;
+   *                may be null
+   */
+  ReplicaInPipeline(long blockId, long len, long genStamp,
+      FSVolume vol, File dir, List<Thread> threads) {
+    super(blockId, len, genStamp, vol, dir);
+    this.bytesAcked = len;
+    this.bytesOnDisk = len;
+    setThreads(threads);
+    this.threads.add(Thread.currentThread());
+  }
+
+  /**
+   * A temporary replica exposes no bytes to readers.
+   * @throws IOException always
+   */
+  @Override //ReplicaInfo
+  long getVisibleLen() throws IOException {
+    // no bytes are visible
+    throw new IOException("No bytes are visible for temporary replicas");
+  }
+
+  @Override //ReplicaInfo
+  ReplicaState getState() {
+    return ReplicaState.TEMPORARY;
+  }
+
+  /**
+   * Get the number of bytes acked
+   * @return the number of bytes acked
+   */
+  long getBytesAcked() {
+    return bytesAcked;
+  }
+
+  /**
+   * Set the number of bytes that have been acked
+   * @param bytesAcked the number of acked bytes
+   */
+  void setBytesAcked(long bytesAcked) {
+    this.bytesAcked = bytesAcked;
+  }
+
+  /**
+   * Get the number of bytes that have been written to disk
+   * @return the number of bytes that have been written to disk
+   */
+  long getBytesOnDisk() {
+    return bytesOnDisk;
+  }
+
+  /**
+   * Set the number of bytes on disk
+   * @param bytesOnDisk number of bytes on disk
+   */
+  void setBytesOnDisk(long bytesOnDisk) {
+    this.bytesOnDisk = bytesOnDisk;
+  }
+
+  /**
+   * Record the given threads as writers of this replica.
+   * Despite its name, this method appends; threads recorded earlier are kept.
+   * @param threads threads writing to this replica; ignored if null
+   */
+  public void setThreads(List<Thread> threads) {
+    if (threads != null) {
+      // The parameter shadows the field, so the field must be qualified
+      // with "this"; otherwise the list would be added to itself and the
+      // replica's own thread list would never change.
+      this.threads.addAll(threads);
+    }
+  }
+
+  /**
+   * Get the threads writing to this replica
+   * @return the replica's own (mutable) list of writer threads
+   */
+  public List<Thread> getThreads() {
+    return threads;
+  }
+
+  @Override // Object
+  public boolean equals(Object o) {
+    return super.equals(o);
+  }
+
+  @Override // Object
+  public int hashCode() {
+    return super.hashCode();
+  }
+}
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java?rev=803601&r1=803600&r2=803601&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java Wed Aug 12 16:58:02 2009
@@ -22,61 +22,137 @@
import java.io.FileOutputStream;
import java.io.IOException;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.FileUtil.HardLink;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
import org.apache.hadoop.io.IOUtils;
/**
- * This class is used by the datanode to maintain the map from a block
- * to its metadata.
+ * This class is used by datanodes to maintain the metadata of their replicas.
+ * It provides a general interface for meta information of a replica.
+ * Subclasses define the replica state ({@link #getState()}) and the number
+ * of bytes visible to readers ({@link #getVisibleLen()}).
*/
-class ReplicaInfo {
+abstract class ReplicaInfo extends Block {
+ private FSVolume volume; // volume where the replica belongs
+ private File dir; // directory where block & meta files belong
- private FSVolume volume; // volume where the block belongs
- private File file; // block file
- private boolean detached; // copy-on-write done for block
-
- ReplicaInfo(FSVolume vol, File file) {
- this.volume = vol;
- this.file = file;
- detached = false;
+ /**
+ * Constructor for a zero length replica
+ * @param blockId block id
+ * @param genStamp replica generation stamp
+ * @param vol volume where replica is located
+ * @param dir directory path where block and meta files are located
+ */
+ ReplicaInfo(long blockId, long genStamp, FSVolume vol, File dir) {
+ this( blockId, 0L, genStamp, vol, dir);
}
-
- ReplicaInfo(FSVolume vol) {
+
+ /**
+ * Constructor that copies the id, length and generation stamp of a block
+ * @param block a block
+ * @param vol volume where replica is located
+ * @param dir directory path where block and meta files are located
+ */
+ ReplicaInfo(Block block, FSVolume vol, File dir) {
+ this(block.getBlockId(), block.getNumBytes(),
+ block.getGenerationStamp(), vol, dir);
+ }
+
+ /**
+ * Constructor from the individual replica attributes
+ * @param blockId block id
+ * @param len replica length
+ * @param genStamp replica generation stamp
+ * @param vol volume where replica is located
+ * @param dir directory path where block and meta files are located
+ */
+ ReplicaInfo(long blockId, long len, long genStamp,
+ FSVolume vol, File dir) {
+ super(blockId, len, genStamp);
this.volume = vol;
- this.file = null;
- detached = false;
+ this.dir = dir;
}
+ /**
+ * Get this replica's meta file name, formed as
+ * {@code <blockName>_<generationStamp><METADATA_EXTENSION>}
+ * @return this replica's meta file name
+ */
+ private String getMetaFileName() {
+ return getBlockName() + "_" + getGenerationStamp() + METADATA_EXTENSION;
+ }
+
+ /**
+ * Get the full path of this replica's data file
+ * (the block name inside {@link #getDir()})
+ * @return the full path of this replica's data file; never null
+ */
+ File getBlockFile() {
+ return new File(getDir(), getBlockName());
+ }
+
+ /**
+ * Get the full path of this replica's meta file
+ * (the meta file name inside {@link #getDir()})
+ * @return the full path of this replica's meta file; never null
+ */
+ File getMetaFile() {
+ return new File(getDir(), getMetaFileName());
+ }
+
+ /**
+ * Get the volume where this replica is located on disk
+ * @return the volume where this replica is located on disk
+ */
FSVolume getVolume() {
return volume;
}
-
- File getFile() {
- return file;
+
+ /**
+ * Set the volume where this replica is located on disk
+ * @param vol the new volume of this replica
+ */
+ void setVolume(FSVolume vol) {
+ this.volume = vol;
+ }
+
+ /**
+ * Return the parent directory path where this replica is located
+ * @return the parent directory path where this replica is located
+ */
+ File getDir() {
+ return dir;
}
- void setFile(File f) {
- file = f;
+ /**
+ * Set the parent directory where this replica is located
+ * @param dir the parent directory where the replica is located
+ */
+ void setDir(File dir) {
+ this.dir = dir;
}
+
/**
- * Is this block already detached?
+ * Get the replica state
+ * @return the replica state
+ */
+ abstract ReplicaState getState();
+
+ /**
+ * Check whether this replica has already been detached.
+ * The base implementation reports that no detaching is needed.
+ * @return true if the replica has already been detached or does not need
+ * to be detached; false otherwise
*/
boolean isDetached() {
- return detached;
+ return true; // no need to be detached
}
/**
- * Block has been successfully detached
+ * Mark this replica as detached. The base implementation does nothing.
*/
void setDetached() {
- detached = true;
+ // no need to be detached
}
-
- /**
+
+ /**
* Copy specified file into a temporary file. Then rename the
* temporary file to the original name. This will cause any
* hardlinks to the original file to be removed. The temporary
@@ -84,7 +160,7 @@
* be recovered (especially on Windows) on datanode restart.
*/
private void detachFile(File file, Block b) throws IOException {
- File tmpFile = volume.createDetachFile(b, file.getName());
+ File tmpFile = getVolume().createDetachFile(b, file.getName());
try {
FileInputStream in = new FileInputStream(file);
try {
@@ -114,33 +190,60 @@
}
/**
- * Returns true if this block was copied, otherwise returns false.
+ * Remove hard links to this replica's block and meta files by copying each
+ * file to a temporary place and then moving it back (copy-on-write).
+ * A file is copied only if its hard link count exceeds numLinks.
+ * @param numLinks the link count above which a file is copied
+ * @return true if the detach step ran (even if no file needed copying);
+ * false if the replica is already detached or needs no detaching
+ * @throws IOException if the volume is unknown or there is any copy error
*/
- boolean detachBlock(Block block, int numLinks) throws IOException {
+ boolean detachBlock(int numLinks) throws IOException {
if (isDetached()) {
return false;
}
- if (file == null || volume == null) {
- throw new IOException("detachBlock:Block not found. " + block);
+ File file = getBlockFile();
+ // NOTE(review): getBlockFile() and getMetaFile() always return a new
+ // File object, so the null checks on file and meta below can never be
+ // true; only a null volume triggers this exception. Consider checking
+ // file.exists() / meta.exists() instead.
+ if (file == null || getVolume() == null) {
+ throw new IOException("detachBlock:Block not found. " + this);
}
- File meta = FSDataset.getMetaFile(file, block);
+ File meta = getMetaFile();
if (meta == null) {
- throw new IOException("Meta file not found for block " + block);
+ throw new IOException("Meta file not found for block " + this);
}
if (HardLink.getLinkCount(file) > numLinks) {
- DataNode.LOG.info("CopyOnWrite for block " + block);
- detachFile(file, block);
+ DataNode.LOG.info("CopyOnWrite for block " + this);
+ detachFile(file, this);
}
if (HardLink.getLinkCount(meta) > numLinks) {
- detachFile(meta, block);
+ detachFile(meta, this);
}
setDetached();
return true;
}
+
+ /**
+ * Get the number of bytes that are visible to readers
+ * @return the number of bytes that are visible to readers
+ * @throws IOException if an implementation cannot report a visible length
+ */
+ abstract long getVisibleLen() throws IOException;
+
+ /**
+ * Set this replica's generation stamp to be a newer one
+ * @param newGS new generation stamp
+ * @throws IOException if the new generation stamp is not greater than the current one
+ */
+ void setNewerGenerationStamp(long newGS) throws IOException {
+ long curGS = getGenerationStamp();
+ if (newGS <= curGS) {
+ throw new IOException("New generation stamp (" + newGS
+ + ") must be greater than current one (" + curGS + ")");
+ }
+ setGenerationStamp(newGS);
+ }
+
+ /**
+ * @return the concrete class name, the block description, and this
+ * replica's volume and block file path
+ */
+ @Override //Object
public String toString() {
- return getClass().getSimpleName() + "(volume=" + volume
- + ", file=" + file + ", detached=" + detached + ")";
+ return getClass().getSimpleName() + " " + super.toString() +
+ "(volume=" + volume + ", file=" + getBlockFile() + ")";
}
}
Added: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java?rev=803601&view=auto
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java (added)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java Wed Aug 12 16:58:02 2009
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+
+/**
+ * This class represents replicas that are under block recovery
+ * It has a recovery id that is equal to the generation stamp
+ * that the replica will be bumped to after recovery
+ * The recovery id is used to handle multiple concurrent block recoveries.
+ * A recovery with higher recovery id preempts recoveries with a lower id.
+ *
+ */
+class ReplicaUnderRecovery extends ReplicaInfo {
+ private ReplicaInfo original; // the original replica that needs to be recovered
+ private long recoveryId; // recovery id; it is also the generation stamp
+ // that the replica will be bumped to after recovery
+
+ ReplicaUnderRecovery(ReplicaInfo replica, long recoveryId) {
+ super(replica.getBlockId(), replica.getNumBytes(), replica.getGenerationStamp(),
+ replica.getVolume(), replica.getDir());
+ if ( replica.getState() != ReplicaState.FINALIZED ||
+ replica.getState() != ReplicaState.RBW ||
+ replica.getState() != ReplicaState.RWR ) {
+ throw new IllegalArgumentException("Cannot recover replica: " + replica);
+ }
+ this.original = replica;
+ this.recoveryId = recoveryId;
+ }
+
+ /**
+ * Get the recovery id
+ * @return the generation stamp that the replica will be bumped to
+ */
+ long getRecoveryID() {
+ return recoveryId;
+ }
+
+ /**
+ * Set the recovery id
+ * @param recoveryId the new recoveryId
+ */
+ void setRecoveryID(long recoveryId) {
+ if (recoveryId > this.recoveryId) {
+ this.recoveryId = recoveryId;
+ } else {
+ throw new IllegalArgumentException("The new rcovery id: " + recoveryId
+ + " must be greater than the current one: " + this.recoveryId);
+ }
+ }
+
+ /**
+ * Get the original replica that's under recovery
+ * @return the original replica under recovery
+ */
+ ReplicaInfo getOriginalReplica() {
+ return original;
+ }
+
+ /**
+ * Get the original replica's state
+ * @return the original replica's state
+ */
+ ReplicaState getOrignalReplicaState() {
+ return original.getState();
+ }
+
+ @Override //ReplicaInfo
+ boolean isDetached() {
+ return original.isDetached();
+ }
+
+ @Override //ReplicaInfo
+ void setDetached() {
+ original.setDetached();
+ }
+
+ @Override //ReplicaInfo
+ ReplicaState getState() {
+ return ReplicaState.RUR;
+ }
+
+ @Override
+ long getVisibleLen() throws IOException {
+ return original.getVisibleLen();
+ }
+
+ @Override //org.apache.hadoop.hdfs.protocol.Block
+ public void setBlockId(long blockId) {
+ super.setBlockId(blockId);
+ original.setBlockId(blockId);
+ }
+
+ @Override //org.apache.hadoop.hdfs.protocol.Block
+ public void setGenerationStamp(long gs) {
+ super.setGenerationStamp(gs);
+ original.setGenerationStamp(gs);
+ }
+
+ @Override //org.apache.hadoop.hdfs.protocol.Block
+ public void setNumBytes(long numBytes) {
+ super.setNumBytes(numBytes);
+ original.setNumBytes(numBytes);
+ }
+
+ @Override //ReplicaInfo
+ void setDir(File dir) {
+ super.setDir(dir);
+ original.setDir(dir);
+ }
+
+ @Override //ReplicaInfo
+ void setVolume(FSVolume vol) {
+ super.setVolume(vol);
+ original.setVolume(vol);
+ }
+
+ @Override // Object
+ public boolean equals(Object o) {
+ return super.equals(o);
+ }
+
+ @Override // Object
+ public int hashCode() {
+ return super.hashCode();
+ }
+}
Added: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java?rev=803601&view=auto
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java (added)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java Wed Aug 12 16:58:02 2009
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+
+/**
+ * A replica waiting to be recovered (RWR).
+ * When a datanode restarts, every replica found in its "rbw" directory is
+ * loaded in this state. Such a replica serves no reads and takes no part in
+ * pipeline recovery. It becomes outdated if its client keeps writing, or it
+ * is recovered as a result of lease recovery.
+ */
+class ReplicaWaitingToBeRecovered extends ReplicaInfo {
+  /** set once copy-on-write has been done for this replica */
+  private boolean copyOnWriteDone;
+
+  /**
+   * Creates a replica waiting to be recovered from its individual attributes.
+   * @param blockId block id
+   * @param len replica length
+   * @param genStamp replica generation stamp
+   * @param vol volume holding the replica
+   * @param dir directory containing the block and meta files
+   */
+  ReplicaWaitingToBeRecovered(long blockId, long len, long genStamp,
+      FSVolume vol, File dir) {
+    super(blockId, len, genStamp, vol, dir);
+  }
+
+  /**
+   * Creates a replica waiting to be recovered that mirrors the given block.
+   * @param block source of id, length and generation stamp
+   * @param vol volume holding the replica
+   * @param dir directory containing the block and meta files
+   */
+  ReplicaWaitingToBeRecovered(Block block, FSVolume vol, File dir) {
+    super(block, vol, dir);
+  }
+
+  @Override // ReplicaInfo
+  ReplicaState getState() {
+    return ReplicaState.RWR;
+  }
+
+  /** @return -1, signalling that no bytes of this replica are visible */
+  @Override // ReplicaInfo
+  long getVisibleLen() throws IOException {
+    return -1;
+  }
+
+  @Override // ReplicaInfo
+  boolean isDetached() {
+    return copyOnWriteDone;
+  }
+
+  @Override // ReplicaInfo
+  void setDetached() {
+    copyOnWriteDone = true;
+  }
+
+  /** Equality is inherited unchanged from the superclass. */
+  @Override // Object
+  public boolean equals(Object o) {
+    return super.equals(o);
+  }
+
+  /** Hash code is inherited unchanged from the superclass. */
+  @Override // Object
+  public int hashCode() {
+    return super.hashCode();
+  }
+}
Added: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java?rev=803601&view=auto
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java (added)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java Wed Aug 12 16:58:02 2009
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.util.Collection;
+import java.util.HashMap;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+
+/**
+ * Maps a block id to the meta information of the replica with that id.
+ * At most one replica is kept per block id. This class performs no locking
+ * of its own.
+ */
+class ReplicasMap {
+  // maps a block id to the replica's meta info
+  private final HashMap<Long, ReplicaInfo> map = new HashMap<Long, ReplicaInfo>();
+
+  /**
+   * Get the meta information of the replica that matches both block id
+   * and generation stamp
+   * @param block block with its id as the key
+   * @return the replica's meta information, or null if there is no replica
+   *         with this id or its generation stamp differs
+   * @throws IllegalArgumentException if the input block is null
+   */
+  ReplicaInfo get(Block block) {
+    if (block == null) {
+      throw new IllegalArgumentException("Do not expect null block");
+    }
+    ReplicaInfo replicaInfo = get(block.getBlockId());
+    if (replicaInfo != null &&
+        block.getGenerationStamp() == replicaInfo.getGenerationStamp()) {
+      return replicaInfo;
+    }
+    return null;
+  }
+
+  /**
+   * Get the meta information of the replica that matches the block id
+   * @param blockId a block's id
+   * @return the replica's meta information, or null if absent
+   */
+  ReplicaInfo get(long blockId) {
+    return map.get(blockId);
+  }
+
+  /**
+   * Add a replica's meta information into the map, replacing any replica
+   * already stored under the same block id
+   *
+   * @param replicaInfo a replica's meta information
+   * @return previous meta information of the replica, or null if none
+   * @throws IllegalArgumentException if the input parameter is null
+   */
+  ReplicaInfo add(ReplicaInfo replicaInfo) {
+    if (replicaInfo == null) {
+      throw new IllegalArgumentException("Do not expect null block");
+    }
+    return map.put(replicaInfo.getBlockId(), replicaInfo);
+  }
+
+  /**
+   * Remove the replica's meta information from the map that matches
+   * the input block's id and generation stamp
+   * @param block block with its id as the key
+   * @return the removed replica's meta information, or null if nothing
+   *         matched (the map is then left unchanged)
+   * @throws IllegalArgumentException if the input block is null
+   */
+  ReplicaInfo remove(Block block) {
+    if (block == null) {
+      throw new IllegalArgumentException("Do not expect null block");
+    }
+    Long key = Long.valueOf(block.getBlockId());
+    ReplicaInfo replicaInfo = map.get(key);
+    // The generation stamp must match the requested block's, not be compared
+    // with the stored replica's own stamp (that is always equal).
+    if (replicaInfo != null &&
+        block.getGenerationStamp() == replicaInfo.getGenerationStamp()) {
+      return map.remove(key);
+    }
+    return null;
+  }
+
+  /**
+   * Remove the replica's meta information from the map if present
+   * @param blockId the block id of the replica to be removed
+   * @return the removed replica's meta information, or null if absent
+   */
+  ReplicaInfo remove(long blockId) {
+    return map.remove(blockId);
+  }
+
+  /**
+   * Get the size of the map
+   * @return the number of replicas in the map
+   */
+  int size() {
+    return map.size();
+  }
+
+  /**
+   * Get a collection of the replicas
+   * @return a live view of the replicas in the map (not a copy)
+   */
+  Collection<ReplicaInfo> replicas() {
+    return map.values();
+  }
+}
Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java?rev=803601&r1=803600&r2=803601&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java Wed Aug 12 16:58:02 2009
@@ -23,6 +23,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -103,17 +104,20 @@
}
DataNode.LOG.info("newblocksizes = " + Arrays.asList(newblocksizes));
+ DataNode.LOG.info("dfs.dfs.clientName=" + dfs.dfs.clientName);
+ cluster.getNameNode().append(filestr, dfs.dfs.clientName);
+
//update blocks with random block sizes
+ long newGS = cluster.getNameNode().nextGenerationStamp(lastblock);
Block[] newblocks = new Block[REPLICATION_NUM];
for(int i = 0; i < REPLICATION_NUM; i++) {
newblocks[i] = new Block(lastblock.getBlockId(), newblocksizes[i],
- lastblock.getGenerationStamp());
+ newGS);
idps[i].updateBlock(lastblock, newblocks[i], false);
checkMetaInfo(newblocks[i], idps[i]);
}
-
- DataNode.LOG.info("dfs.dfs.clientName=" + dfs.dfs.clientName);
- cluster.getNameNode().append(filestr, dfs.dfs.clientName);
+ cluster.getNameNode().commitBlockSynchronization(lastblock, newGS,
+ lastblocksize, false, false, new DatanodeID[]{});
//block synchronization
final int primarydatanodeindex = AppendTestUtil.nextInt(datanodes.length);
Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java?rev=803601&r1=803600&r2=803601&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java Wed Aug 12 16:58:02 2009
@@ -22,7 +22,6 @@
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.util.Random;
-import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -67,17 +66,16 @@
/** Truncate a block file */
private long truncateBlockFile() throws IOException {
synchronized (fds) {
- for (Entry<Block, ReplicaInfo> entry : fds.volumeMap.entrySet()) {
- Block b = entry.getKey();
- File f = entry.getValue().getFile();
- File mf = FSDataset.getMetaFile(f, b);
+ for (ReplicaInfo b : fds.volumeMap.replicas()) {
+ File f = b.getBlockFile();
+ File mf = b.getMetaFile();
// Truncate a block file that has a corresponding metadata file
if (f.exists() && f.length() != 0 && mf.exists()) {
FileOutputStream s = new FileOutputStream(f);
FileChannel channel = s.getChannel();
channel.truncate(0);
LOG.info("Truncated block file " + f.getAbsolutePath());
- return entry.getKey().getBlockId();
+ return b.getBlockId();
}
}
}
@@ -87,14 +85,13 @@
/** Delete a block file */
private long deleteBlockFile() {
synchronized(fds) {
- for (Entry<Block, ReplicaInfo> entry : fds.volumeMap.entrySet()) {
- Block b = entry.getKey();
- File f = entry.getValue().getFile();
- File mf = FSDataset.getMetaFile(f, b);
+ for (ReplicaInfo b : fds.volumeMap.replicas()) {
+ File f = b.getBlockFile();
+ File mf = b.getMetaFile();
// Delete a block file that has corresponding metadata file
if (f.exists() && mf.exists() && f.delete()) {
LOG.info("Deleting block file " + f.getAbsolutePath());
- return entry.getKey().getBlockId();
+ return b.getBlockId();
}
}
}
@@ -104,16 +101,12 @@
/** Delete block meta file */
private long deleteMetaFile() {
synchronized(fds) {
- for (Entry<Block, ReplicaInfo> entry : fds.volumeMap.entrySet()) {
- Block b = entry.getKey();
- String blkfile = entry.getValue().getFile().getAbsolutePath();
- long genStamp = b.getGenerationStamp();
- String metafile = FSDataset.getMetaFileName(blkfile, genStamp);
- File file = new File(metafile);
+ for (ReplicaInfo b : fds.volumeMap.replicas()) {
+ File file = b.getMetaFile();
// Delete a metadata file
if (file.exists() && file.delete()) {
LOG.info("Deleting metadata file " + file.getAbsolutePath());
- return entry.getKey().getBlockId();
+ return b.getBlockId();
}
}
}
@@ -324,7 +317,7 @@
}
private void verifyAddition(long blockId, long genStamp, long size) {
- Block memBlock = fds.getBlockKey(blockId);
+ Block memBlock = fds.getBlock(blockId);
assertNotNull(memBlock);
ReplicaInfo blockInfo;
synchronized(fds) {
@@ -334,7 +327,7 @@
// Added block has the same file as the one created by the test
File file = new File(getBlockFile(blockId));
- assertEquals(file.getName(), blockInfo.getFile().getName());
+ assertEquals(file.getName(), blockInfo.getBlockFile().getName());
// Generation stamp is same as that of created file
assertEquals(genStamp, memBlock.getGenerationStamp());
@@ -353,7 +346,7 @@
private void verifyGenStamp(long blockId, long genStamp) {
Block memBlock;
synchronized(fds) {
- memBlock = fds.getBlockKey(blockId);
+ memBlock = fds.getBlock(blockId);
}
assertNotNull(memBlock);
assertEquals(genStamp, memBlock.getGenerationStamp());