You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2009/10/01 01:39:33 UTC
svn commit: r820497 [3/7] - in /hadoop/hdfs/trunk: ./
.eclipse.templates/.launches/ src/contrib/hdfsproxy/
src/docs/src/documentation/content/xdocs/ src/java/
src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/
src/java/org/apach...
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Wed Sep 30 23:39:30 2009
@@ -17,8 +17,11 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
@@ -26,10 +29,8 @@
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Random;
import javax.management.NotCompliantMBeanException;
@@ -41,17 +42,23 @@
import org.apache.hadoop.fs.DU;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.metrics.util.MBeanUtil;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.io.IOUtils;
/**************************************************
* FSDataset manages a set of data blocks. Each block
@@ -119,8 +126,9 @@
if ( ! metaData.renameTo( newmeta ) ||
! src.renameTo( dest ) ) {
throw new IOException( "could not move files for " + b +
- " from tmp to " +
- dest.getAbsolutePath() );
+ " from " + src + " to " +
+ dest.getAbsolutePath() + " or from"
+ + metaData + " to " + newmeta);
}
if (DataNode.LOG.isDebugEnabled()) {
DataNode.LOG.debug("addBlock: Moved " + metaData + " to " + newmeta);
@@ -185,23 +193,48 @@
return Block.GRANDFATHER_GENERATION_STAMP;
}
- void getVolumeMap(HashMap<Block, ReplicaInfo> volumeMap, FSVolume volume) {
+ void getVolumeMap(ReplicasMap volumeMap, FSVolume volume)
+ throws IOException {
if (children != null) {
for (int i = 0; i < children.length; i++) {
children[i].getVolumeMap(volumeMap, volume);
}
}
- File blockFiles[] = dir.listFiles();
- for (int i = 0; i < blockFiles.length; i++) {
- if (Block.isBlockFilename(blockFiles[i])) {
- long genStamp = getGenerationStampFromFile(blockFiles, blockFiles[i]);
- volumeMap.put(new Block(blockFiles[i], blockFiles[i].length(), genStamp),
- new ReplicaInfo(volume, blockFiles[i]));
+ recoverTempUnlinkedBlock();
+ volume.addToReplicasMap(volumeMap, dir, true);
+ }
+
+ /**
+ * Recover unlinked tmp files on datanode restart. If the original block
+ * does not exist, then the tmp file is renamed to be the
+ * original file name; otherwise the tmp file is deleted.
+ */
+ private void recoverTempUnlinkedBlock() throws IOException {
+ File files[] = dir.listFiles();
+ for (File file : files) {
+ if (!FSDataset.isUnlinkTmpFile(file)) {
+ continue;
+ }
+ File blockFile = getOrigFile(file);
+ if (blockFile.exists()) {
+ //
+ // If the original block file still exists, then no recovery
+ // is needed.
+ //
+ if (!file.delete()) {
+ throw new IOException("Unable to cleanup unlinked tmp file " +
+ file);
+ }
+ } else {
+ if (!file.renameTo(blockFile)) {
+ throw new IOException("Unable to cleanup detached file " +
+ file);
+ }
}
}
}
-
+
/**
* check if a data diretory is healthy
* @throws DiskErrorException
@@ -279,9 +312,9 @@
}
class FSVolume {
- private FSDir dataDir;
- private File tmpDir;
- private File detachDir; // copy on write for blocks in snapshot
+ private FSDir dataDir; // directory store Finalized replica
+ private File rbwDir; // directory store RBW replica
+ private File tmpDir; // directory store Temporary replica
private DF usage;
private DU dfsUsage;
private long reserved;
@@ -289,13 +322,9 @@
FSVolume(File currentDir, Configuration conf) throws IOException {
this.reserved = conf.getLong("dfs.datanode.du.reserved", 0);
- boolean supportAppends = conf.getBoolean("dfs.support.append", false);
File parent = currentDir.getParentFile();
-
- this.detachDir = new File(parent, "detach");
- if (detachDir.exists()) {
- recoverDetachedBlocks(currentDir, detachDir);
- }
+ final File finalizedDir = new File(
+ currentDir, DataStorage.STORAGE_DIR_FINALIZED);
// Files that were being written when the datanode was last shutdown
// are now moved back to the data directory. It is possible that
@@ -304,23 +333,23 @@
//
this.tmpDir = new File(parent, "tmp");
if (tmpDir.exists()) {
- if (supportAppends) {
- recoverDetachedBlocks(currentDir, tmpDir);
- } else {
- FileUtil.fullyDelete(tmpDir);
+ FileUtil.fullyDelete(tmpDir);
+ }
+ this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW);
+ if (rbwDir.exists() && !supportAppends) {
+ FileUtil.fullyDelete(rbwDir);
+ }
+ this.dataDir = new FSDir(finalizedDir);
+ if (!rbwDir.mkdirs()) { // create rbw directory if not exist
+ if (!rbwDir.isDirectory()) {
+ throw new IOException("Mkdirs failed to create " + rbwDir.toString());
}
}
- this.dataDir = new FSDir(currentDir);
if (!tmpDir.mkdirs()) {
if (!tmpDir.isDirectory()) {
throw new IOException("Mkdirs failed to create " + tmpDir.toString());
}
}
- if (!detachDir.mkdirs()) {
- if (!detachDir.isDirectory()) {
- throw new IOException("Mkdirs failed to create " + detachDir.toString());
- }
- }
this.usage = new DF(parent, conf);
this.dfsUsage = new DU(parent, conf);
this.dfsUsage.start();
@@ -360,51 +389,23 @@
}
/**
- * Temporary files. They get moved to the real block directory either when
- * the block is finalized or the datanode restarts.
+ * Temporary files. They get moved to the finalized block directory when
+ * the block is finalized.
*/
File createTmpFile(Block b) throws IOException {
File f = new File(tmpDir, b.getBlockName());
- return createTmpFile(b, f);
- }
-
- /**
- * Returns the name of the temporary file for this block.
- */
- File getTmpFile(Block b) throws IOException {
- File f = new File(tmpDir, b.getBlockName());
- return f;
+ return FSDataset.createTmpFile(b, f);
}
/**
- * Files used for copy-on-write. They need recovery when datanode
- * restarts.
+ * RBW files. They get moved to the finalized block directory when
+ * the block is finalized.
*/
- File createDetachFile(Block b, String filename) throws IOException {
- File f = new File(detachDir, filename);
- return createTmpFile(b, f);
+ File createRbwFile(Block b) throws IOException {
+ File f = new File(rbwDir, b.getBlockName());
+ return FSDataset.createTmpFile(b, f);
}
- private File createTmpFile(Block b, File f) throws IOException {
- if (f.exists()) {
- throw new IOException("Unexpected problem in creating temporary file for "+
- b + ". File " + f + " should not be present, but is.");
- }
- // Create the zero-length temp file
- //
- boolean fileCreated = false;
- try {
- fileCreated = f.createNewFile();
- } catch (IOException ioe) {
- throw (IOException)new IOException(DISK_ERROR +f).initCause(ioe);
- }
- if (!fileCreated) {
- throw new IOException("Unexpected problem in creating temporary file for "+
- b + ". File " + f + " should be creatable, but is already present.");
- }
- return f;
- }
-
File addBlock(Block b, File f) throws IOException {
File blockFile = dataDir.addBlock(b, f);
File metaFile = getMetaFile( blockFile , b);
@@ -415,55 +416,126 @@
void checkDirs() throws DiskErrorException {
dataDir.checkDirTree();
DiskChecker.checkDir(tmpDir);
+ DiskChecker.checkDir(rbwDir);
}
- void getVolumeMap(HashMap<Block, ReplicaInfo> volumeMap) {
+ void getVolumeMap(ReplicasMap volumeMap) throws IOException {
+ // add finalized replicas
dataDir.getVolumeMap(volumeMap, this);
- }
-
- void clearPath(File f) {
- dataDir.clearPath(f);
- }
-
- public String toString() {
- return dataDir.dir.getAbsolutePath();
+ // add rbw replicas
+ addToReplicasMap(volumeMap, rbwDir, false);
}
/**
- * Recover detached files on datanode restart. If a detached block
- * does not exist in the original directory, then it is moved to the
- * original directory.
+ * Add replicas under the given directory to the volume map
+ * @param volumeMap the replicas map
+ * @param dir an input directory
+ * @param isFinalized true if the directory has finalized replicas;
+ * false if the directory has rbw replicas
*/
- private void recoverDetachedBlocks(File dataDir, File dir)
- throws IOException {
- File contents[] = dir.listFiles();
- if (contents == null) {
- return;
- }
- for (int i = 0; i < contents.length; i++) {
- if (!contents[i].isFile()) {
- throw new IOException ("Found " + contents[i] + " in " + dir +
- " but it is not a file.");
+ private void addToReplicasMap(ReplicasMap volumeMap,
+ File dir, boolean isFinalized) {
+ File blockFiles[] = dir.listFiles();
+ for (File blockFile : blockFiles) {
+ if (!Block.isBlockFilename(blockFile))
+ continue;
+
+ long genStamp = getGenerationStampFromFile(blockFiles, blockFile);
+ long blockId = Block.filename2id(blockFile.getName());
+ ReplicaInfo newReplica = null;
+ if (isFinalized) {
+ newReplica = new FinalizedReplica(blockId,
+ blockFile.length(), genStamp, this, blockFile.getParentFile());
+ } else {
+ newReplica = new ReplicaWaitingToBeRecovered(blockId,
+ validateIntegrity(blockFile, genStamp),
+ genStamp, this, blockFile.getParentFile());
}
- //
- // If the original block file still exists, then no recovery
- // is needed.
- //
- File blk = new File(dataDir, contents[i].getName());
- if (!blk.exists()) {
- if (!contents[i].renameTo(blk)) {
- throw new IOException("Unable to recover detached file " +
- contents[i]);
- }
- continue;
+ ReplicaInfo oldReplica = volumeMap.add(newReplica);
+ if (oldReplica != null) {
+ DataNode.LOG.warn("Two block files with the same block id exist " +
+ "on disk: " + oldReplica.getBlockFile() +
+ " and " + blockFile );
}
- if (!contents[i].delete()) {
- throw new IOException("Unable to cleanup detached file " +
- contents[i]);
+ }
+ }
+
+ /**
+ * Find out the number of bytes in the block that match its crc.
+ *
+ * This algorithm assumes that data corruption caused by unexpected
+ * datanode shutdown occurs only in the last crc chunk. So it checks
+ * only the last chunk.
+ *
+ * @param blockFile the block file
+ * @param genStamp generation stamp of the block
+ * @return the number of valid bytes
+ */
+ private long validateIntegrity(File blockFile, long genStamp) {
+ DataInputStream checksumIn = null;
+ InputStream blockIn = null;
+ try {
+ File metaFile = new File(getMetaFileName(blockFile.toString(), genStamp));
+ long blockFileLen = blockFile.length();
+ long metaFileLen = metaFile.length();
+ int crcHeaderLen = DataChecksum.getChecksumHeaderSize();
+ if (!blockFile.exists() || blockFileLen == 0 ||
+ !metaFile.exists() || metaFileLen < (long)crcHeaderLen) {
+ return 0;
+ }
+ checksumIn = new DataInputStream(
+ new BufferedInputStream(new FileInputStream(metaFile),
+ BUFFER_SIZE));
+
+ // read and handle the common header here. For now just a version
+ BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
+ short version = header.getVersion();
+ if (version != FSDataset.METADATA_VERSION) {
+ DataNode.LOG.warn("Wrong version (" + version + ") for metadata file "
+ + metaFile + " ignoring ...");
+ }
+ DataChecksum checksum = header.getChecksum();
+ int bytesPerChecksum = checksum.getBytesPerChecksum();
+ int checksumSize = checksum.getChecksumSize();
+ long numChunks = Math.min(
+ (blockFileLen + bytesPerChecksum - 1)/bytesPerChecksum,
+ (metaFileLen - crcHeaderLen)/checksumSize);
+ if (numChunks == 0) {
+ return 0;
+ }
+ IOUtils.skipFully(checksumIn, (numChunks-1)*checksumSize);
+ blockIn = new FileInputStream(blockFile);
+ long lastChunkStartPos = (numChunks-1)*bytesPerChecksum;
+ IOUtils.skipFully(blockIn, lastChunkStartPos);
+ int lastChunkSize = (int)Math.min(
+ bytesPerChecksum, blockFileLen-lastChunkStartPos);
+ byte[] buf = new byte[lastChunkSize+checksumSize];
+ checksumIn.readFully(buf, lastChunkSize, checksumSize);
+ IOUtils.readFully(blockIn, buf, 0, lastChunkSize);
+
+ checksum.update(buf, 0, lastChunkSize);
+ if (checksum.compare(buf, lastChunkSize)) { // last chunk matches crc
+ return lastChunkStartPos + lastChunkSize;
+ } else { // last chunck is corrupt
+ return lastChunkStartPos;
}
+ } catch (IOException e) {
+ DataNode.LOG.warn(e);
+ return 0;
+ } finally {
+ IOUtils.closeStream(checksumIn);
+ IOUtils.closeStream(blockIn);
}
}
+
+ void clearPath(File f) {
+ dataDir.clearPath(f);
+ }
+
+ public String toString() {
+ return getDir().getAbsolutePath();
+ }
}
static class FSVolumeSet {
@@ -526,7 +598,7 @@
return remaining;
}
- synchronized void getVolumeMap(HashMap<Block, ReplicaInfo> volumeMap) {
+ synchronized void getVolumeMap(ReplicasMap volumeMap) throws IOException {
for (int idx = 0; idx < volumes.length; idx++) {
volumes[idx].getVolumeMap(volumeMap);
}
@@ -603,25 +675,22 @@
//Find better place?
public static final String METADATA_EXTENSION = ".meta";
public static final short METADATA_VERSION = 1;
-
+ static final String UNLINK_BLOCK_SUFFIX = ".unlinked";
- static class ActiveFile {
- final File file;
- final List<Thread> threads = new ArrayList<Thread>(2);
-
- ActiveFile(File f, List<Thread> list) {
- file = f;
- if (list != null) {
- threads.addAll(list);
- }
- threads.add(Thread.currentThread());
- }
-
- public String toString() {
- return getClass().getSimpleName() + "(file=" + file
- + ", threads=" + threads + ")";
- }
- }
+ private static boolean isUnlinkTmpFile(File f) {
+ String name = f.getName();
+ return name.endsWith(UNLINK_BLOCK_SUFFIX);
+ }
+
+ static File getUnlinkTmpFile(File f) {
+ return new File(f.getParentFile(), f.getName()+UNLINK_BLOCK_SUFFIX);
+ }
+
+ private static File getOrigFile(File unlinkTmpFile) {
+ String fileName = unlinkTmpFile.getName();
+ return new File(unlinkTmpFile.getParentFile(),
+ fileName.substring(0, fileName.length()-UNLINK_BLOCK_SUFFIX.length()));
+ }
static String getMetaFileName(String blockFileName, long genStamp) {
return blockFileName + "_" + genStamp + METADATA_EXTENSION;
@@ -635,6 +704,26 @@
return getMetaFile(getBlockFile(b), b);
}
+ /** Find the metadata file for the specified block file.
+ * Return the generation stamp from the name of the metafile.
+ */
+ private static long getGenerationStampFromFile(File[] listdir, File blockFile) {
+ String blockName = blockFile.getName();
+ for (int j = 0; j < listdir.length; j++) {
+ String path = listdir[j].getName();
+ if (!path.startsWith(blockName)) {
+ continue;
+ }
+ if (blockFile == listdir[j]) {
+ continue;
+ }
+ return Block.getGenerationStamp(listdir[j].getName());
+ }
+ DataNode.LOG.warn("Block " + blockFile +
+ " does not have a metafile!");
+ return Block.GRANDFATHER_GENERATION_STAMP;
+ }
+
/** Find the corresponding meta data file from a given block file */
private static File findMetaFile(final File blockFile) throws IOException {
final String prefix = blockFile.getName() + "_";
@@ -672,22 +761,7 @@
/** Return the block file for the given ID */
public File findBlockFile(long blockId) {
- final Block b = new Block(blockId);
- File blockfile = null;
- ActiveFile activefile = ongoingCreates.get(b);
- if (activefile != null) {
- blockfile = activefile.file;
- }
- if (blockfile == null) {
- blockfile = getFile(b);
- }
- if (blockfile == null) {
- if (DataNode.LOG.isDebugEnabled()) {
- DataNode.LOG.debug("ongoingCreates=" + ongoingCreates);
- DataNode.LOG.debug("volumeMap=" + volumeMap);
- }
- }
- return blockfile;
+ return getFile(blockId);
}
/** {@inheritDoc} */
@@ -717,26 +791,47 @@
checksumFile.length());
}
+ static File createTmpFile(Block b, File f) throws IOException {
+ if (f.exists()) {
+ throw new IOException("Unexpected problem in creating temporary file for "+
+ b + ". File " + f + " should not be present, but is.");
+ }
+ // Create the zero-length temp file
+ //
+ boolean fileCreated = false;
+ try {
+ fileCreated = f.createNewFile();
+ } catch (IOException ioe) {
+ throw (IOException)new IOException(DISK_ERROR +f).initCause(ioe);
+ }
+ if (!fileCreated) {
+ throw new IOException("Unexpected problem in creating temporary file for "+
+ b + ". File " + f + " should be creatable, but is already present.");
+ }
+ return f;
+ }
+
FSVolumeSet volumes;
- private HashMap<Block,ActiveFile> ongoingCreates = new HashMap<Block,ActiveFile>();
private int maxBlocksPerDir = 0;
- HashMap<Block,ReplicaInfo> volumeMap = null;
+ ReplicasMap volumeMap = new ReplicasMap();
static Random random = new Random();
// Used for synchronizing access to usage stats
private Object statsLock = new Object();
+ boolean supportAppends = false;
+
/**
* An FSDataset has a directory where it loads its data files.
*/
public FSDataset(DataStorage storage, Configuration conf) throws IOException {
this.maxBlocksPerDir = conf.getInt("dfs.datanode.numblocks", 64);
+ this.supportAppends = conf.getBoolean("dfs.support.append", false);
FSVolume[] volArray = new FSVolume[storage.getNumStorageDirs()];
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
volArray[idx] = new FSVolume(storage.getStorageDir(idx).getCurrentDir(), conf);
}
volumes = new FSVolumeSet(volArray);
- volumeMap = new HashMap<Block, ReplicaInfo>();
volumes.getVolumeMap(volumeMap);
registerMBean(storage.getStorageID());
}
@@ -811,22 +906,33 @@
}
/**
- * Returns handles to the block file and its metadata file
+ * Get the meta info of a block stored in volumeMap
+ * @param b block
+ * @return the meta replica information
+ * @throws IOException if no entry is in the map or
+ * there is a generation stamp mismatch
*/
- public synchronized BlockInputStreams getTmpInputStreams(Block b,
- long blkOffset, long ckoff) throws IOException {
-
+ private ReplicaInfo getReplicaInfo(Block b) throws IOException {
ReplicaInfo info = volumeMap.get(b);
if (info == null) {
throw new IOException("Block " + b + " does not exist in volumeMap.");
}
- FSVolume v = info.getVolume();
- File blockFile = v.getTmpFile(b);
+ return info;
+ }
+
+ /**
+ * Returns handles to the block file and its metadata file
+ */
+ public synchronized BlockInputStreams getTmpInputStreams(Block b,
+ long blkOffset, long ckoff) throws IOException {
+
+ ReplicaInfo info = getReplicaInfo(b);
+ File blockFile = info.getBlockFile();
RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");
if (blkOffset > 0) {
blockInFile.seek(blkOffset);
}
- File metaFile = getMetaFile(blockFile, b);
+ File metaFile = info.getMetaFile();
RandomAccessFile metaInFile = new RandomAccessFile(metaFile, "r");
if (ckoff > 0) {
metaInFile.seek(ckoff);
@@ -835,36 +941,23 @@
new FileInputStream(metaInFile.getFD()));
}
- private BlockWriteStreams createBlockWriteStreams( File f , File metafile) throws IOException {
- return new BlockWriteStreams(new FileOutputStream(new RandomAccessFile( f , "rw" ).getFD()),
- new FileOutputStream( new RandomAccessFile( metafile , "rw" ).getFD() ));
-
- }
-
/**
* Make a copy of the block if this block is linked to an existing
* snapshot. This ensures that modifying this block does not modify
* data in any existing snapshots.
* @param block Block
- * @param numLinks Detach if the number of links exceed this value
+ * @param numLinks Unlink if the number of links exceed this value
* @throws IOException
- * @return - true if the specified block was detached
+ * @return - true if the specified block was unlinked or the block
+ * is not in any snapshot.
*/
- public boolean detachBlock(Block block, int numLinks) throws IOException {
+ public boolean unlinkBlock(Block block, int numLinks) throws IOException {
ReplicaInfo info = null;
synchronized (this) {
- info = volumeMap.get(block);
- }
- return info.detachBlock(block, numLinks);
- }
-
- static private <T> void updateBlockMap(Map<Block, T> blockmap,
- Block oldblock, Block newblock) throws IOException {
- if (blockmap.containsKey(oldblock)) {
- T value = blockmap.remove(oldblock);
- blockmap.put(newblock, value);
+ info = getReplicaInfo(block);
}
+ return info.unlinkBlock(numLinks);
}
/** {@inheritDoc} */
@@ -874,60 +967,20 @@
+ ") to newblock (=" + newblock + ").");
}
- for(;;) {
- final List<Thread> threads = tryUpdateBlock(oldblock, newblock);
- if (threads == null) {
- return;
- }
-
- // interrupt and wait for all ongoing create threads
- for(Thread t : threads) {
- t.interrupt();
- }
- for(Thread t : threads) {
- try {
- t.join();
- } catch (InterruptedException e) {
- DataNode.LOG.warn("interruptOngoingCreates: t=" + t, e);
- }
- }
+ final ReplicaInfo replicaInfo = volumeMap.get(oldblock.getBlockId());
+ File blockFile = replicaInfo==null?null:replicaInfo.getBlockFile();
+ if (blockFile == null) {
+ throw new IOException("Block " + oldblock + " does not exist.");
}
- }
-
- /**
- * Try to update an old block to a new block.
- * If there are ongoing create threads running for the old block,
- * the threads will be returned without updating the block.
- *
- * @return ongoing create threads if there is any. Otherwise, return null.
- */
- private synchronized List<Thread> tryUpdateBlock(
- Block oldblock, Block newblock) throws IOException {
- //check ongoing create threads
- final ActiveFile activefile = ongoingCreates.get(oldblock);
- if (activefile != null && !activefile.threads.isEmpty()) {
- //remove dead threads
- for(Iterator<Thread> i = activefile.threads.iterator(); i.hasNext(); ) {
- final Thread t = i.next();
- if (!t.isAlive()) {
- i.remove();
- }
- }
- //return living threads
- if (!activefile.threads.isEmpty()) {
- return new ArrayList<Thread>(activefile.threads);
- }
+ //check write threads
+ if (replicaInfo instanceof ReplicaInPipeline) {
+ ((ReplicaInPipeline)replicaInfo).stopWriter();
}
//No ongoing create threads is alive. Update block.
- File blockFile = findBlockFile(oldblock.getBlockId());
- if (blockFile == null) {
- throw new IOException("Block " + oldblock + " does not exist.");
- }
-
- File oldMetaFile = findMetaFile(blockFile);
- long oldgs = parseGenerationStamp(blockFile, oldMetaFile);
+ File oldMetaFile = replicaInfo.getMetaFile();
+ long oldgs = replicaInfo.getGenerationStamp();
//rename meta file to a tmp file
File tmpMetaFile = new File(oldMetaFile.getParent(),
@@ -937,7 +990,7 @@
}
//update generation stamp
- if (oldgs > newblock.getGenerationStamp()) {
+ if (oldgs >= newblock.getGenerationStamp()) {
throw new IOException("Cannot update block (id=" + newblock.getBlockId()
+ ") generation stamp from " + oldgs
+ " to " + newblock.getGenerationStamp());
@@ -952,23 +1005,28 @@
truncateBlock(blockFile, tmpMetaFile, oldblock.getNumBytes(), newblock.getNumBytes());
}
+ // update replicaInfo
+ replicaInfo.setGenerationStamp(newblock.getGenerationStamp());
+ replicaInfo.setNumBytes(newblock.getNumBytes());
+
//rename the tmp file to the new meta file (with new generation stamp)
- File newMetaFile = getMetaFile(blockFile, newblock);
+ File newMetaFile = replicaInfo.getMetaFile();
if (!tmpMetaFile.renameTo(newMetaFile)) {
throw new IOException("Cannot rename tmp meta file to " + newMetaFile);
}
- updateBlockMap(ongoingCreates, oldblock, newblock);
- updateBlockMap(volumeMap, oldblock, newblock);
-
// paranoia! verify that the contents of the stored block
// matches the block file on disk.
validateBlockMetadata(newblock);
- return null;
}
static private void truncateBlock(File blockFile, File metaFile,
long oldlen, long newlen) throws IOException {
+ DataNode.LOG.info("truncateBlock: blockFile=" + blockFile
+ + ", metaFile=" + metaFile
+ + ", oldlen=" + oldlen
+ + ", newlen=" + newlen);
+
if (newlen == oldlen) {
return;
}
@@ -1027,125 +1085,290 @@
}
}
+ @Override // FSDatasetInterface
+ public synchronized ReplicaInPipelineInterface append(Block b,
+ long newGS, long expectedBlockLen) throws IOException {
+ // A finalized block may be reopened for one of two reasons. One is that
+ // all packets were successfully processed at the Datanode, but the acks
+ // for some of the packets were not received by the client, so the client
+ // re-opens the connection and retries sending those packets.
+ // The other reason is that an "append" is occurring to this block.
+
+ // check the validity of the parameter
+ if (newGS < b.getGenerationStamp()) {
+ throw new IOException("The new generation stamp " + newGS +
+ " should be greater than the replica " + b + "'s generation stamp");
+ }
+ ReplicaInfo replicaInfo = volumeMap.get(b);
+ if (replicaInfo == null) {
+ throw new ReplicaNotFoundException(
+ ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
+ }
+ DataNode.LOG.info("Appending to replica " + replicaInfo);
+ if (replicaInfo.getState() != ReplicaState.FINALIZED) {
+ throw new ReplicaNotFoundException(
+ ReplicaNotFoundException.UNFINALIZED_REPLICA + b);
+ }
+ if (replicaInfo.getNumBytes() != expectedBlockLen) {
+ throw new IOException("Corrupted replica " + replicaInfo +
+ " with a length of " + replicaInfo.getNumBytes() +
+ " expected length is " + expectedBlockLen);
+ }
+
+ return append((FinalizedReplica)replicaInfo, newGS, b.getNumBytes());
+ }
+
+ /** Append to a finalized replica
+ * Change a finalized replica to be a RBW replica and
+ * bump its generation stamp to be the newGS
+ *
+ * @param replicaInfo a finalized replica
+ * @param newGS new generation stamp
+ * @param estimateBlockLen estimated block length
+ * @return a RBW replica
+ * @throws IOException if moving the replica from finalized directory
+ * to rbw directory fails
+ */
+ private synchronized ReplicaBeingWritten append(FinalizedReplica replicaInfo,
+ long newGS, long estimateBlockLen) throws IOException {
+ // unlink the finalized replica
+ replicaInfo.unlinkBlock(1);
+
+ // construct a RBW replica with the new GS
+ File blkfile = replicaInfo.getBlockFile();
+ FSVolume v = volumes.getNextVolume(estimateBlockLen);
+ File newBlkFile = v.createRbwFile(replicaInfo);
+ File oldmeta = replicaInfo.getMetaFile();
+ ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(
+ replicaInfo.getBlockId(), replicaInfo.getNumBytes(), newGS,
+ v, newBlkFile.getParentFile(), Thread.currentThread());
+ File newmeta = newReplicaInfo.getMetaFile();
+
+ // rename meta file to rbw directory
+ if (DataNode.LOG.isDebugEnabled()) {
+ DataNode.LOG.debug("Renaming " + oldmeta + " to " + newmeta);
+ }
+ if (!oldmeta.renameTo(newmeta)) {
+ throw new IOException("Block " + replicaInfo + " reopen failed. " +
+ " Unable to move meta file " + oldmeta +
+ " to rbw dir " + newmeta);
+ }
+
+ // rename block file to rbw directory
+ if (DataNode.LOG.isDebugEnabled()) {
+ DataNode.LOG.debug("Renaming " + blkfile + " to " + newBlkFile);
+ DataNode.LOG.debug("Old block file length is " + blkfile.length());
+ }
+ if (!blkfile.renameTo(newBlkFile)) {
+ if (!newmeta.renameTo(oldmeta)) { // restore the meta file
+ DataNode.LOG.warn("Cannot move meta file " + newmeta +
+ "back to the finalized directory " + oldmeta);
+ }
+ throw new IOException("Block " + replicaInfo + " reopen failed. " +
+ " Unable to move block file " + blkfile +
+ " to rbw dir " + newBlkFile);
+ }
+
+ // Replace finalized replica by a RBW replica in replicas map
+ volumeMap.add(newReplicaInfo);
+
+ return newReplicaInfo;
+ }
+
+ private ReplicaInfo recoverCheck(Block b, long newGS,
+ long expectedBlockLen) throws IOException {
+ ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
+ if (replicaInfo == null) {
+ throw new ReplicaNotFoundException(
+ ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
+ }
+
+ // check state
+ if (replicaInfo.getState() != ReplicaState.FINALIZED &&
+ replicaInfo.getState() != ReplicaState.RBW) {
+ throw new ReplicaNotFoundException(
+ ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA + replicaInfo);
+ }
+
+ // check generation stamp
+ long replicaGenerationStamp = replicaInfo.getGenerationStamp();
+ if (replicaGenerationStamp < b.getGenerationStamp() ||
+ replicaGenerationStamp > newGS) {
+ throw new ReplicaNotFoundException(
+ ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + replicaGenerationStamp
+ + ". Expected GS range is [" + b.getGenerationStamp() + ", " +
+ newGS + "].");
+ }
+
+ // stop the previous writer before checking the replica's length
+ long replicaLen = replicaInfo.getNumBytes();
+ if (replicaInfo.getState() == ReplicaState.RBW) {
+ ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;
+ // kill the previous writer
+ rbw.stopWriter();
+ rbw.setWriter(Thread.currentThread());
+ // check length: bytesRcvd, bytesOnDisk, and bytesAcked should be the same
+ if (replicaLen != rbw.getBytesOnDisk()
+ || replicaLen != rbw.getBytesAcked()) {
+ throw new ReplicaAlreadyExistsException("RBW replica " + replicaInfo +
+ "bytesRcvd(" + rbw.getNumBytes() + "), bytesOnDisk(" +
+ rbw.getBytesOnDisk() + "), and bytesAcked(" + rbw.getBytesAcked() +
+ ") are not the same.");
+ }
+ }
+
+ // check block length
+ if (replicaLen != expectedBlockLen) {
+ throw new IOException("Corrupted replica " + replicaInfo +
+ " with a length of " + replicaLen +
+ " expected length is " + expectedBlockLen);
+ }
+
+ return replicaInfo;
+ }
+ @Override // FSDatasetInterface
+ public synchronized ReplicaInPipelineInterface recoverAppend(Block b,
+ long newGS, long expectedBlockLen) throws IOException {
+ DataNode.LOG.info("Recover failed append to " + b);
+
+ ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
+
+ // change the replica's state/gs etc.
+ if (replicaInfo.getState() == ReplicaState.FINALIZED ) {
+ return append((FinalizedReplica)replicaInfo, newGS, b.getNumBytes());
+ } else { //RBW
+ bumpReplicaGS(replicaInfo, newGS);
+ return (ReplicaBeingWritten)replicaInfo;
+ }
+ }
+
+ @Override
+ public void recoverClose(Block b, long newGS,
+ long expectedBlockLen) throws IOException {
+ DataNode.LOG.info("Recover failed close " + b);
+ // check replica's state
+ ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
+ // bump the replica's GS
+ bumpReplicaGS(replicaInfo, newGS);
+ // finalize the replica if RBW
+ if (replicaInfo.getState() == ReplicaState.RBW) {
+ finalizeBlock(replicaInfo);
+ }
+ }
+
/**
- * Start writing to a block file
- * If isRecovery is true and the block pre-exists, then we kill all
- volumeMap.put(b, v);
- volumeMap.put(b, v);
- * other threads that might be writing to this block, and then reopen the file.
- */
- public BlockWriteStreams writeToBlock(Block b, boolean isRecovery) throws IOException {
- //
- // Make sure the block isn't a valid one - we're still creating it!
- //
- if (isValidBlock(b)) {
- if (!isRecovery) {
- throw new BlockAlreadyExistsException("Block " + b + " is valid, and cannot be written to.");
- }
- // If the block was successfully finalized because all packets
- // were successfully processed at the Datanode but the ack for
- // some of the packets were not received by the client. The client
- // re-opens the connection and retries sending those packets.
- // The other reason is that an "append" is occurring to this block.
- detachBlock(b, 1);
+ * Bump a replica's generation stamp to a new one.
+ * Its on-disk meta file name is renamed to be the new one too.
+ *
+ * @param replicaInfo a replica
+ * @param newGS new generation stamp
+ * @throws IOException if rename fails
+ */
+ private void bumpReplicaGS(ReplicaInfo replicaInfo,
+ long newGS) throws IOException {
+ long oldGS = replicaInfo.getGenerationStamp();
+ File oldmeta = replicaInfo.getMetaFile();
+ replicaInfo.setGenerationStamp(newGS);
+ File newmeta = replicaInfo.getMetaFile();
+
+ // rename meta file to new GS
+ if (DataNode.LOG.isDebugEnabled()) {
+ DataNode.LOG.debug("Renaming " + oldmeta + " to " + newmeta);
+ }
+ if (!oldmeta.renameTo(newmeta)) {
+ replicaInfo.setGenerationStamp(oldGS); // restore old GS
+ throw new IOException("Block " + (Block)replicaInfo + " reopen failed. " +
+ " Unable to move meta file " + oldmeta +
+ " to " + newmeta);
}
- long blockSize = b.getNumBytes();
+ }
- //
- // Serialize access to /tmp, and check if file already there.
- //
- File f = null;
- List<Thread> threads = null;
- synchronized (this) {
- //
- // Is it already in the create process?
- //
- ActiveFile activeFile = ongoingCreates.get(b);
- if (activeFile != null) {
- f = activeFile.file;
- threads = activeFile.threads;
-
- if (!isRecovery) {
- throw new BlockAlreadyExistsException("Block " + b +
- " has already been started (though not completed), and thus cannot be created.");
- } else {
- for (Thread thread:threads) {
- thread.interrupt();
- }
- }
- ongoingCreates.remove(b);
- }
- FSVolume v = null;
- if (!isRecovery) {
- v = volumes.getNextVolume(blockSize);
- // create temporary file to hold block in the designated volume
- f = createTmpFile(v, b);
- volumeMap.put(b, new ReplicaInfo(v));
- } else if (f != null) {
- DataNode.LOG.info("Reopen already-open Block for append " + b);
- // create or reuse temporary file to hold block in the designated volume
- v = volumeMap.get(b).getVolume();
- volumeMap.put(b, new ReplicaInfo(v));
- } else {
- // reopening block for appending to it.
- DataNode.LOG.info("Reopen Block for append " + b);
- v = volumeMap.get(b).getVolume();
- f = createTmpFile(v, b);
- File blkfile = getBlockFile(b);
- File oldmeta = getMetaFile(b);
- File newmeta = getMetaFile(f, b);
-
- // rename meta file to tmp directory
- DataNode.LOG.debug("Renaming " + oldmeta + " to " + newmeta);
- if (!oldmeta.renameTo(newmeta)) {
- throw new IOException("Block " + b + " reopen failed. " +
- " Unable to move meta file " + oldmeta +
- " to tmp dir " + newmeta);
- }
-
- // rename block file to tmp directory
- DataNode.LOG.debug("Renaming " + blkfile + " to " + f);
- if (!blkfile.renameTo(f)) {
- if (!f.delete()) {
- throw new IOException("Block " + b + " reopen failed. " +
- " Unable to remove file " + f);
- }
- if (!blkfile.renameTo(f)) {
- throw new IOException("Block " + b + " reopen failed. " +
- " Unable to move block file " + blkfile +
- " to tmp dir " + f);
- }
- }
- volumeMap.put(b, new ReplicaInfo(v));
- }
- if (f == null) {
- DataNode.LOG.warn("Block " + b + " reopen failed " +
- " Unable to locate tmp file.");
- throw new IOException("Block " + b + " reopen failed " +
- " Unable to locate tmp file.");
- }
- ongoingCreates.put(b, new ActiveFile(f, threads));
+ @Override
+ public synchronized ReplicaInPipelineInterface createRbw(Block b)
+ throws IOException {
+ ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
+ if (replicaInfo != null) {
+ throw new ReplicaAlreadyExistsException("Block " + b +
+ " already exists in state " + replicaInfo.getState() +
+ " and thus cannot be created.");
+ }
+ // create a new block
+ FSVolume v = volumes.getNextVolume(b.getNumBytes());
+ // create a rbw file to hold block in the designated volume
+ File f = v.createRbwFile(b);
+ ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
+ b.getGenerationStamp(), v, f.getParentFile());
+ volumeMap.add(newReplicaInfo);
+ return newReplicaInfo;
+ }
+
+ @Override
+ public synchronized ReplicaInPipelineInterface recoverRbw(Block b,
+ long newGS, long minBytesRcvd, long maxBytesRcvd)
+ throws IOException {
+ DataNode.LOG.info("Recover the RBW replica " + b);
+
+ ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
+ if (replicaInfo == null) {
+ throw new ReplicaNotFoundException(
+ ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
+ }
+
+ // check the replica's state
+ if (replicaInfo.getState() != ReplicaState.RBW) {
+ throw new ReplicaNotFoundException(
+ ReplicaNotFoundException.NON_RBW_REPLICA + replicaInfo);
}
+ ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;
+
+ DataNode.LOG.info("Recovering replica " + rbw);
- try {
- if (threads != null) {
- for (Thread thread:threads) {
- thread.join();
- }
- }
- } catch (InterruptedException e) {
- throw new IOException("Recovery waiting for thread interrupted.");
+ // Stop the previous writer
+ rbw.stopWriter();
+ rbw.setWriter(Thread.currentThread());
+
+ // check generation stamp
+ long replicaGenerationStamp = rbw.getGenerationStamp();
+ if (replicaGenerationStamp < b.getGenerationStamp() ||
+ replicaGenerationStamp > newGS) {
+ throw new ReplicaNotFoundException(
+ ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + b +
+ ". Expected GS range is [" + b.getGenerationStamp() + ", " +
+ newGS + "].");
+ }
+
+ // check replica length
+ if (rbw.getBytesAcked() < minBytesRcvd || rbw.getNumBytes() > maxBytesRcvd){
+ throw new ReplicaNotFoundException("Unmatched length replica " +
+ replicaInfo + ": BytesAcked = " + rbw.getBytesAcked() +
+ " BytesRcvd = " + rbw.getNumBytes() + " are not in the range of [" +
+ minBytesRcvd + ", " + maxBytesRcvd + "].");
}
- //
- // Finally, allow a writer to the block file
- // REMIND - mjc - make this a filter stream that enforces a max
- // block size, so clients can't go crazy
- //
- File metafile = getMetaFile(f, b);
- DataNode.LOG.debug("writeTo blockfile is " + f + " of size " + f.length());
- DataNode.LOG.debug("writeTo metafile is " + metafile + " of size " + metafile.length());
- return createBlockWriteStreams( f , metafile);
+ // bump the replica's generation stamp to newGS
+ bumpReplicaGS(rbw, newGS);
+
+ return rbw;
+ }
+
+ @Override
+ public synchronized ReplicaInPipelineInterface createTemporary(Block b)
+ throws IOException {
+ ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
+ if (replicaInfo != null) {
+ throw new ReplicaAlreadyExistsException("Block " + b +
+ " already exists in state " + replicaInfo.getState() +
+ " and thus cannot be created.");
+ }
+
+ FSVolume v = volumes.getNextVolume(b.getNumBytes());
+ // create a temporary file to hold block in the designated volume
+ File f = v.createTmpFile(b);
+ ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(),
+ b.getGenerationStamp(), v, f.getParentFile());
+ volumeMap.add(newReplicaInfo);
+
+ return newReplicaInfo;
}
/**
@@ -1167,8 +1390,7 @@
throws IOException {
long size = 0;
synchronized (this) {
- FSVolume vol = volumeMap.get(b).getVolume();
- size = vol.getTmpFile(b).length();
+ size = getReplicaInfo(b).getBlockFile().length();
}
if (size < dataOffset) {
String msg = "Trying to change block file offset of block " + b +
@@ -1185,7 +1407,7 @@
synchronized File createTmpFile( FSVolume vol, Block blk ) throws IOException {
if ( vol == null ) {
- vol = volumeMap.get( blk ).getVolume();
+ vol = getReplicaInfo( blk ).getVolume();
if ( vol == null ) {
throw new IOException("Could not find volume for block " + blk);
}
@@ -1205,40 +1427,52 @@
* Complete the block write!
*/
public synchronized void finalizeBlock(Block b) throws IOException {
- ActiveFile activeFile = ongoingCreates.get(b);
- if (activeFile == null) {
- throw new IOException("Block " + b + " is already finalized.");
- }
- File f = activeFile.file;
- if (f == null || !f.exists()) {
- throw new IOException("No temporary file " + f + " for block " + b);
- }
- FSVolume v = volumeMap.get(b).getVolume();
- if (v == null) {
- throw new IOException("No volume for temporary file " + f +
- " for block " + b);
+ ReplicaInfo replicaInfo = getReplicaInfo(b);
+ if (replicaInfo.getState() == ReplicaState.FINALIZED) {
+ // this is legal, when recovery happens on a file that has
+ // been opened for append but never modified
+ return;
}
-
- File dest = null;
- dest = v.addBlock(b, f);
- volumeMap.put(b, new ReplicaInfo(v, dest));
- ongoingCreates.remove(b);
+ finalizeReplica(replicaInfo);
+ }
+
+ private synchronized FinalizedReplica finalizeReplica(ReplicaInfo replicaInfo)
+ throws IOException {
+ FinalizedReplica newReplicaInfo = null;
+ if (replicaInfo.getState() == ReplicaState.RUR &&
+ ((ReplicaUnderRecovery)replicaInfo).getOrignalReplicaState() ==
+ ReplicaState.FINALIZED) {
+ newReplicaInfo = (FinalizedReplica)
+ ((ReplicaUnderRecovery)replicaInfo).getOriginalReplica();
+ } else {
+ FSVolume v = replicaInfo.getVolume();
+ File f = replicaInfo.getBlockFile();
+ if (v == null) {
+ throw new IOException("No volume for temporary file " + f +
+ " for block " + replicaInfo);
+ }
+
+ File dest = v.addBlock(replicaInfo, f);
+ newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
+ }
+ volumeMap.add(newReplicaInfo);
+ return newReplicaInfo;
}
/**
* Remove the temporary block file (if any)
*/
public synchronized void unfinalizeBlock(Block b) throws IOException {
- // remove the block from in-memory data structure
- ActiveFile activefile = ongoingCreates.remove(b);
- if (activefile == null) {
- return;
- }
- volumeMap.remove(b);
-
- // delete the on-disk temp file
- if (delBlockFromDisk(activefile.file, getMetaFile(activefile.file, b), b)) {
- DataNode.LOG.warn("Block " + b + " unfinalized and removed. " );
+ ReplicaInfo replicaInfo = volumeMap.get(b);
+ if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) {
+ // remove from volumeMap
+ volumeMap.remove(b);
+
+ // delete the on-disk temp file
+ if (delBlockFromDisk(replicaInfo.getBlockFile(),
+ replicaInfo.getMetaFile(), b)) {
+ DataNode.LOG.warn("Block " + b + " unfinalized and removed. " );
+ }
}
}
@@ -1269,18 +1503,34 @@
}
/**
- * Return finalized blocks from the in-memory blockmap
+ * Generates a block report from the in-memory block map.
*/
- public Block[] getBlockReport() {
- ArrayList<Block> list = new ArrayList<Block>(volumeMap.size());
+ public BlockListAsLongs getBlockReport() {
+ ArrayList<ReplicaInfo> finalized =
+ new ArrayList<ReplicaInfo>(volumeMap.size());
+ ArrayList<ReplicaInfo> uc = new ArrayList<ReplicaInfo>();
synchronized(this) {
- for (Block b : volumeMap.keySet()) {
- if (!ongoingCreates.containsKey(b)) {
- list.add(new Block(b));
+ for (ReplicaInfo b : volumeMap.replicas()) {
+ switch(b.getState()) {
+ case FINALIZED:
+ finalized.add(b);
+ break;
+ case RBW:
+ case RWR:
+ uc.add(b);
+ break;
+ case RUR:
+ ReplicaUnderRecovery rur = (ReplicaUnderRecovery)b;
+ uc.add(rur.getOriginalReplica());
+ break;
+ case TEMPORARY:
+ break;
+ default:
+ assert false : "Illegal ReplicaInfo state.";
}
}
+ return new BlockListAsLongs(finalized, uc);
}
- return list.toArray(new Block[list.size()]);
}
/**
@@ -1290,7 +1540,7 @@
* is needed to handle concurrent modification to the block.
*/
synchronized Block[] getBlockList(boolean deepcopy) {
- Block[] list = volumeMap.keySet().toArray(new Block[volumeMap.size()]);
+ Block[] list = volumeMap.replicas().toArray(new Block[volumeMap.size()]);
if (deepcopy) {
for (int i = 0; i < list.length; i++) {
list[i] = new Block(list[i]);
@@ -1300,17 +1550,29 @@
}
/**
+ * Get the list of finalized blocks from the in-memory block map.
+ */
+ synchronized List<Block> getFinalizedBlocks() {
+ ArrayList<Block> finalized = new ArrayList<Block>(volumeMap.size());
+ for (ReplicaInfo b : volumeMap.replicas()) {
+ if(b.getState() == ReplicaState.FINALIZED) {
+ finalized.add(new Block(b));
+ }
+ }
+ return finalized;
+ }
+
+ /**
* Check whether the given block is a valid one.
+ * A block is valid only if its replica is finalized and its block file exists on disk.
*/
public boolean isValidBlock(Block b) {
- File f = null;;
- try {
- f = validateBlockFile(b);
- } catch(IOException e) {
- DataNode.LOG.warn("Block " + b + " is not valid:",e);
+ ReplicaInfo replicaInfo = volumeMap.get(b);
+ if (replicaInfo == null ||
+ replicaInfo.getState() != ReplicaState.FINALIZED) {
+ return false;
}
-
- return f != null;
+ return replicaInfo.getBlockFile().exists();
}
/**
@@ -1337,49 +1599,25 @@
/** {@inheritDoc} */
public void validateBlockMetadata(Block b) throws IOException {
- ReplicaInfo info = volumeMap.get(b);
- if (info == null) {
- throw new IOException("Block " + b + " does not exist in volumeMap.");
- }
- FSVolume v = info.getVolume();
- File tmp = v.getTmpFile(b);
- File f = getFile(b);
- if (f == null) {
- f = tmp;
+ checkReplicaFiles(getReplicaInfo(b));
+ }
+
+ /** Check the files of a replica. */
+ static void checkReplicaFiles(final ReplicaInfo r) throws IOException {
+ final File f = r.getBlockFile();
+ if (!f.exists()) {
+ throw new FileNotFoundException("File " + f + " not found, r=" + r);
}
- if (f == null) {
- throw new IOException("Block " + b + " does not exist on disk.");
+ if (r.getNumBytes() != f.length()) {
+ throw new IOException("File length mismatched."
+ + f + " length is " + f.length() + " but r=" + r);
+ }
+ final File metafile = getMetaFile(f, r);
+ if (!metafile.exists()) {
+ throw new IOException("Metafile " + metafile + " does not exist, r=" + r);
}
- if (!f.exists()) {
- throw new IOException("Block " + b +
- " block file " + f +
- " does not exist on disk.");
- }
- if (b.getNumBytes() != f.length()) {
- throw new IOException("Block " + b +
- " length is " + b.getNumBytes() +
- " does not match block file length " +
- f.length());
- }
- File meta = getMetaFile(f, b);
- if (meta == null) {
- throw new IOException("Block " + b +
- " metafile does not exist.");
- }
- if (!meta.exists()) {
- throw new IOException("Block " + b +
- " metafile " + meta +
- " does not exist on disk.");
- }
- if (meta.length() == 0) {
- throw new IOException("Block " + b + " metafile " + meta + " is empty.");
- }
- long stamp = parseGenerationStamp(f, meta);
- if (stamp != b.getGenerationStamp()) {
- throw new IOException("Block " + b +
- " genstamp is " + b.getGenerationStamp() +
- " does not match meta file stamp " +
- stamp);
+ if (metafile.length() == 0) {
+ throw new IOException("Metafile " + metafile + " is empty, r=" + r);
}
}
@@ -1396,7 +1634,8 @@
synchronized (this) {
f = getFile(invalidBlks[i]);
ReplicaInfo dinfo = volumeMap.get(invalidBlks[i]);
- if (dinfo == null) {
+ if (dinfo == null ||
+ dinfo.getGenerationStamp() != invalidBlks[i].getGenerationStamp()) {
DataNode.LOG.warn("Unexpected error trying to delete block "
+ invalidBlks[i] +
". BlockInfo not found in volumeMap.");
@@ -1428,7 +1667,13 @@
error = true;
continue;
}
- v.clearPath(parent);
+ ReplicaState replicaState = dinfo.getState();
+ if (replicaState == ReplicaState.FINALIZED ||
+ (replicaState == ReplicaState.RUR &&
+ ((ReplicaUnderRecovery)dinfo).getOrignalReplicaState() ==
+ ReplicaState.FINALIZED)) {
+ v.clearPath(parent);
+ }
volumeMap.remove(invalidBlks[i]);
}
File metaFile = getMetaFile( f, invalidBlks[i] );
@@ -1455,16 +1700,24 @@
}
/**
- * Turn the block identifier into a filename.
+ * Turn the block identifier into a filename; the generation stamp is ignored.
*/
public synchronized File getFile(Block b) {
- ReplicaInfo info = volumeMap.get(b);
+ return getFile(b.getBlockId());
+ }
+
+ /**
+ * Turn the block identifier into a filename
+ * @param blockId a block's id
+ * @return on disk data file path; null if the replica does not exist
+ */
+ private File getFile(long blockId) {
+ ReplicaInfo info = volumeMap.get(blockId);
if (info != null) {
- return info.getFile();
+ return info.getBlockFile();
}
- return null;
+ return null;
}
-
/**
* check if a data directory is healthy
* if some volumes failed - make sure to remove all the blocks that belong
@@ -1483,12 +1736,12 @@
// remove related blocks
long mlsec = System.currentTimeMillis();
synchronized (this) {
- Iterator<Block> ib = volumeMap.keySet().iterator();
+ Iterator<ReplicaInfo> ib = volumeMap.replicas().iterator();
while(ib.hasNext()) {
- Block b = ib.next();
+ ReplicaInfo b = ib.next();
total_blocks ++;
// check if the volume block belongs to still valid
- FSVolume vol = volumeMap.get(b).getVolume();
+ FSVolume vol = b.getVolume();
for(FSVolume fv: failed_vols) {
if(vol == fv) {
DataNode.LOG.warn("removing block " + b.getBlockId() + " from vol "
@@ -1589,11 +1842,12 @@
*/
public void checkAndUpdate(long blockId, File diskFile,
File diskMetaFile, FSVolume vol) {
- Block block = new Block(blockId);
DataNode datanode = DataNode.getDataNode();
Block corruptBlock = null;
+ ReplicaInfo memBlockInfo;
synchronized (this) {
- if (ongoingCreates.get(block) != null) {
+ memBlockInfo = volumeMap.get(blockId);
+ if (memBlockInfo != null && memBlockInfo.getState() != ReplicaState.FINALIZED) {
// Block is not finalized - ignore the difference
return;
}
@@ -1602,7 +1856,6 @@
Block.getGenerationStamp(diskMetaFile.getName()) :
Block.GRANDFATHER_GENERATION_STAMP;
- ReplicaInfo memBlockInfo = volumeMap.get(block);
if (diskFile == null || !diskFile.exists()) {
if (memBlockInfo == null) {
// Block file does not exist and block does not exist in memory
@@ -1614,14 +1867,14 @@
}
return;
}
- if (!memBlockInfo.getFile().exists()) {
+ if (!memBlockInfo.getBlockFile().exists()) {
// Block is in memory and not on the disk
// Remove the block from volumeMap
- volumeMap.remove(block);
+ volumeMap.remove(blockId);
if (datanode.blockScanner != null) {
- datanode.blockScanner.deleteBlock(block);
+ datanode.blockScanner.deleteBlock(new Block(blockId));
}
- DataNode.LOG.warn("Removed block " + block.getBlockId()
+ DataNode.LOG.warn("Removed block " + blockId
+ " from memory with missing block file on the disk");
// Finally remove the metadata file
if (diskMetaFile != null && diskMetaFile.exists()
@@ -1637,23 +1890,20 @@
*/
if (memBlockInfo == null) {
// Block is missing in memory - add the block to volumeMap
- ReplicaInfo diskBlockInfo = new ReplicaInfo(vol, diskFile);
- Block diskBlock = new Block(diskFile, diskFile.length(), diskGS);
- volumeMap.put(diskBlock, diskBlockInfo);
+ ReplicaInfo diskBlockInfo = new FinalizedReplica(blockId,
+ diskFile.length(), diskGS, vol, diskFile.getParentFile());
+ volumeMap.add(diskBlockInfo);
if (datanode.blockScanner != null) {
- datanode.blockScanner.addBlock(diskBlock);
+ datanode.blockScanner.addBlock(diskBlockInfo);
}
- DataNode.LOG.warn("Added missing block to memory " + diskBlock);
+ DataNode.LOG.warn("Added missing block to memory " + (Block)diskBlockInfo);
return;
}
/*
* Block exists in volumeMap and the block file exists on the disk
*/
- // Iterate to get key from volumeMap for the blockId
- Block memBlock = getBlockKey(blockId);
-
// Compare block files
- File memFile = memBlockInfo.getFile();
+ File memFile = memBlockInfo.getBlockFile();
if (memFile.exists()) {
if (memFile.compareTo(diskFile) != 0) {
DataNode.LOG.warn("Block file " + memFile.getAbsolutePath()
@@ -1670,19 +1920,17 @@
+ memFile.getAbsolutePath()
+ " does not exist. Updating it to the file found during scan "
+ diskFile.getAbsolutePath());
- ReplicaInfo info = volumeMap.remove(memBlock);
- info.setFile(diskFile);
+ memBlockInfo.setDir(diskFile.getParentFile());
memFile = diskFile;
DataNode.LOG.warn("Updating generation stamp for block " + blockId
- + " from " + memBlock.getGenerationStamp() + " to " + diskGS);
- memBlock.setGenerationStamp(diskGS);
- volumeMap.put(memBlock, info);
+ + " from " + memBlockInfo.getGenerationStamp() + " to " + diskGS);
+ memBlockInfo.setGenerationStamp(diskGS);
}
// Compare generation stamp
- if (memBlock.getGenerationStamp() != diskGS) {
- File memMetaFile = getMetaFile(diskFile, memBlock);
+ if (memBlockInfo.getGenerationStamp() != diskGS) {
+ File memMetaFile = getMetaFile(diskFile, memBlockInfo);
if (memMetaFile.exists()) {
if (memMetaFile.compareTo(diskMetaFile) != 0) {
DataNode.LOG.warn("Metadata file in memory "
@@ -1699,23 +1947,19 @@
: Block.GRANDFATHER_GENERATION_STAMP;
DataNode.LOG.warn("Updating generation stamp for block " + blockId
- + " from " + memBlock.getGenerationStamp() + " to " + gs);
+ + " from " + memBlockInfo.getGenerationStamp() + " to " + gs);
- ReplicaInfo info = volumeMap.remove(memBlock);
- memBlock.setGenerationStamp(gs);
- volumeMap.put(memBlock, info);
+ memBlockInfo.setGenerationStamp(gs);
}
}
// Compare block size
- if (memBlock.getNumBytes() != memFile.length()) {
+ if (memBlockInfo.getNumBytes() != memFile.length()) {
// Update the length based on the block file
- corruptBlock = new Block(memBlock);
+ corruptBlock = new Block(memBlockInfo);
DataNode.LOG.warn("Updating size of block " + blockId + " from "
- + memBlock.getNumBytes() + " to " + memFile.length());
- ReplicaInfo info = volumeMap.remove(memBlock);
- memBlock.setNumBytes(memFile.length());
- volumeMap.put(memBlock, info);
+ + memBlockInfo.getNumBytes() + " to " + memFile.length());
+ memBlockInfo.setNumBytes(memFile.length());
}
}
@@ -1734,19 +1978,158 @@
}
}
- /**
- * Get reference to the key in the volumeMap. To be called from methods that
- * are synchronized on {@link FSDataset}
- * @param blockId
- * @return key from the volumeMap
- */
- Block getBlockKey(long blockId) {
+ @Override
+ public ReplicaInfo getReplica(long blockId) {
assert(Thread.holdsLock(this));
- for (Block b : volumeMap.keySet()) {
- if (b.getBlockId() == blockId) {
- return b;
- }
+ return volumeMap.get(blockId);
+ }
+
+ @Override // FSDatasetInterface
+ public synchronized ReplicaRecoveryInfo initReplicaRecovery(
+ RecoveringBlock rBlock) throws IOException {
+ return initReplicaRecovery(
+ volumeMap, rBlock.getBlock(), rBlock.getNewGenerationStamp());
+ }
+
+ /** Static version of {@link #initReplicaRecovery(RecoveringBlock)}. */
+ static ReplicaRecoveryInfo initReplicaRecovery(
+ ReplicasMap map, Block block, long recoveryId) throws IOException {
+ final ReplicaInfo replica = map.get(block.getBlockId());
+ DataNode.LOG.info("initReplicaRecovery: block=" + block
+ + ", recoveryId=" + recoveryId
+ + ", replica=" + replica);
+
+ //check replica
+ if (replica == null) {
+ return null;
}
- return null;
+
+ //stop writer if there is any
+ if (replica instanceof ReplicaInPipeline) {
+ ((ReplicaInPipeline)replica).stopWriter();
+ }
+
+ //check generation stamp
+ if (replica.getGenerationStamp() < block.getGenerationStamp()) {
+ throw new IOException(
+ "replica.getGenerationStamp() < block.getGenerationStamp(), block="
+ + block + ", replica=" + replica);
+ }
+
+ //check recovery id
+ if (replica.getGenerationStamp() >= recoveryId) {
+ throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:"
+ + " replica.getGenerationStamp() >= recoveryId = " + recoveryId
+ + ", block=" + block + ", replica=" + replica);
+ }
+
+ //check RUR
+ final ReplicaUnderRecovery rur;
+ if (replica.getState() == ReplicaState.RUR) {
+ rur = (ReplicaUnderRecovery)replica;
+ if (rur.getRecoveryID() >= recoveryId) {
+ throw new RecoveryInProgressException(
+ "rur.getRecoveryID() >= recoveryId = " + recoveryId
+ + ", block=" + block + ", rur=" + rur);
+ }
+ final long oldRecoveryID = rur.getRecoveryID();
+ rur.setRecoveryID(recoveryId);
+ DataNode.LOG.info("initReplicaRecovery: update recovery id for " + block
+ + " from " + oldRecoveryID + " to " + recoveryId);
+ }
+ else {
+ rur = new ReplicaUnderRecovery(replica, recoveryId);
+ map.add(rur);
+ DataNode.LOG.info("initReplicaRecovery: changing replica state for "
+ + block + " from " + replica.getState()
+ + " to " + rur.getState());
+ }
+ return rur.createInfo();
+ }
+
+ /** Update a replica of a block. */
+ synchronized void updateReplica(final Block block, final long recoveryId,
+ final long newlength) throws IOException {
+ //get replica
+ final ReplicaInfo replica = volumeMap.get(block.getBlockId());
+ DataNode.LOG.info("updateReplica: block=" + block
+ + ", recoveryId=" + recoveryId
+ + ", length=" + newlength
+ + ", replica=" + replica);
+
+ //check replica
+ if (replica == null) {
+ throw new ReplicaNotFoundException(block);
+ }
+
+ //check replica state
+ if (replica.getState() != ReplicaState.RUR) {
+ throw new IOException("replica.getState() != " + ReplicaState.RUR
+ + ", replica=" + replica);
+ }
+
+ //check replica files before update
+ checkReplicaFiles(replica);
+
+ //update replica
+ final ReplicaInfo finalized = (ReplicaInfo)updateReplicaUnderRecovery(
+ replica, recoveryId, newlength);
+
+ //check replica files after update
+ checkReplicaFiles(finalized);
+ }
+
+ @Override // FSDatasetInterface
+ public synchronized FinalizedReplica updateReplicaUnderRecovery(
+ Block oldBlock,
+ long recoveryId,
+ long newlength) throws IOException {
+ Replica r = getReplica(oldBlock.getBlockId());
+ if(r.getState() != ReplicaState.RUR)
+ throw new IOException("Replica " + r + " must be under recovery.");
+ ReplicaUnderRecovery rur = (ReplicaUnderRecovery)r;
+ DataNode.LOG.info("updateReplicaUnderRecovery: recoveryId=" + recoveryId
+ + ", newlength=" + newlength
+ + ", rur=" + rur);
+
+ //check recovery id
+ if (rur.getRecoveryID() != recoveryId) {
+ throw new IOException("rur.getRecoveryID() != recoveryId = " + recoveryId
+ + ", rur=" + rur);
+ }
+
+ // bump rur's GS to be recovery id
+ bumpReplicaGS(rur, recoveryId);
+
+ //update length
+ final File replicafile = rur.getBlockFile();
+ if (rur.getNumBytes() < newlength) {
+ throw new IOException("rur.getNumBytes() < newlength = " + newlength
+ + ", rur=" + rur);
+ }
+ if (rur.getNumBytes() > newlength) {
+ rur.unlinkBlock(1);
+ truncateBlock(replicafile, rur.getMetaFile(), rur.getNumBytes(), newlength);
+ // update RUR with the new length
+ rur.setNumBytes(newlength);
+ }
+
+ // finalize the block
+ return finalizeReplica(rur);
+ }
+
+ @Override // FSDatasetInterface
+ public synchronized long getReplicaVisibleLength(final Block block)
+ throws IOException {
+ final Replica replica = getReplica(block.getBlockId());
+ if (replica == null) {
+ throw new ReplicaNotFoundException(block);
+ }
+ if (replica.getGenerationStamp() < block.getGenerationStamp()) {
+ throw new IOException(
+ "replica.getGenerationStamp() < block.getGenerationStamp(), block="
+ + block + ", replica=" + replica);
+ }
+ return replica.getVisibleLength();
}
}
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Wed Sep 30 23:39:30 2009
@@ -28,7 +28,10 @@
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -94,6 +97,14 @@
public long getLength(Block b) throws IOException;
/**
+ * Get reference to the replica meta info in the replicasMap.
+ * To be called from methods that are synchronized on {@link FSDataset}
+ * @param blockId id of the block whose replica is looked up
+ * @return replica from the replicas map
+ */
+ public Replica getReplica(long blockId);
+
+ /**
* @return the generation stamp stored with the block.
*/
public Block getStoredBlock(long blkid) throws IOException;
@@ -144,6 +155,10 @@
checksumOut = cOut;
}
+ void close() throws IOException {
+ IOUtils.closeStream(dataOut);
+ IOUtils.closeStream(checksumOut);
+ }
}
/**
@@ -167,16 +182,76 @@
}
/**
- * Creates the block and returns output streams to write data and CRC
- * @param b
- * @param isRecovery True if this is part of erro recovery, otherwise false
- * @return a BlockWriteStreams object to allow writing the block data
- * and CRC
+ * Creates a temporary replica and returns the meta information of the replica
+ *
+ * @param b block
+ * @return the meta info of the replica which is being written to
+ * @throws IOException if an error occurs
+ */
+ public ReplicaInPipelineInterface createTemporary(Block b)
+ throws IOException;
+
+ /**
+ * Creates a RBW replica and returns the meta info of the replica
+ *
+ * @param b block
+ * @return the meta info of the replica which is being written to
+ * @throws IOException if an error occurs
+ */
+ public ReplicaInPipelineInterface createRbw(Block b) throws IOException;
+
+ /**
+ * Recovers a RBW replica and returns the meta info of the replica
+ *
+ * @param b block
+ * @param newGS the new generation stamp for the replica
+ * @param minBytesRcvd the minimum number of bytes that the replica could have
+ * @param maxBytesRcvd the maximum number of bytes that the replica could have
+ * @return the meta info of the replica which is being written to
+ * @throws IOException if an error occurs
+ */
+ public ReplicaInPipelineInterface recoverRbw(Block b,
+ long newGS, long minBytesRcvd, long maxBytesRcvd)
+ throws IOException;
+
+ /**
+ * Appends to a finalized replica and returns the meta info of the replica
+ *
+ * @param b block
+ * @param newGS the new generation stamp for the replica
+ * @param expectedBlockLen the number of bytes the replica is expected to have
+ * @return the meta info of the replica which is being written to
* @throws IOException
*/
- public BlockWriteStreams writeToBlock(Block b, boolean isRecovery) throws IOException;
+ public ReplicaInPipelineInterface append(Block b,
+ long newGS, long expectedBlockLen) throws IOException;
/**
+ * Recovers a failed append to a finalized replica
+ * and returns the meta info of the replica
+ *
+ * @param b block
+ * @param newGS the new generation stamp for the replica
+ * @param expectedBlockLen the number of bytes the replica is expected to have
+ * @return the meta info of the replica which is being written to
+ * @throws IOException
+ */
+ public ReplicaInPipelineInterface recoverAppend(Block b,
+ long newGS, long expectedBlockLen) throws IOException;
+
+ /**
+ * Recover a failed pipeline close.
+ * It bumps the replica's generation stamp and finalizes the replica if it is in the RBW state.
+ *
+ * @param b block
+ * @param newGS the new generation stamp for the replica
+ * @param expectedBlockLen the number of bytes the replica is expected to have
+ * @throws IOException
+ */
+ public void recoverClose(Block b,
+ long newGS, long expectedBlockLen) throws IOException;
+
+ /**
* Update the block to the new generation stamp and length.
*/
public void updateBlock(Block oldblock, Block newblock) throws IOException;
@@ -202,7 +277,7 @@
* Returns the block report - the full list of blocks stored
* @return - the block report - the full list of blocks stored
*/
- public Block[] getBlockReport();
+ public BlockListAsLongs getBlockReport();
/**
* Is the block valid?
@@ -270,4 +345,25 @@
* @return true if more than the minimum number of valid volumes are left in the FSDataSet
*/
public boolean hasEnoughResource();
+
+ /**
+ * Get visible length of the specified replica.
+ */
+ long getReplicaVisibleLength(final Block block) throws IOException;
+
+ /**
+ * Initialize a replica recovery.
+ *
+ * @return actual state of the replica on this data-node or
+ * null if data-node does not have the replica.
+ */
+ public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
+ throws IOException;
+
+ /**
+ * Update replica's generation stamp and length and finalize it.
+ */
+ public FinalizedReplica updateReplicaUnderRecovery(Block oldBlock,
+ long recoveryId,
+ long newLength) throws IOException;
}
Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java?rev=820497&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java Wed Sep 30 23:39:30 2009
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.File;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+
+/**
+ * Describes a finalized replica: its state is FINALIZED and all of its bytes are reported as visible and on disk.
+ */
+class FinalizedReplica extends ReplicaInfo {
+ private boolean unlinked; // copy-on-write done for block; set by setUnlinked()
+
+ /**
+ * Constructor; every argument is passed through to the ReplicaInfo constructor.
+ * @param blockId block id
+ * @param len replica length
+ * @param genStamp replica generation stamp
+ * @param vol volume where replica is located
+ * @param dir directory path where block and meta files are located
+ */
+ FinalizedReplica(long blockId, long len, long genStamp,
+ FSVolume vol, File dir) {
+ super(blockId, len, genStamp, vol, dir);
+ }
+
+ /**
+ * Constructor; every argument is passed through to the ReplicaInfo constructor.
+ * @param block a block
+ * @param vol volume where replica is located
+ * @param dir directory path where block and meta files are located
+ */
+ FinalizedReplica(Block block, FSVolume vol, File dir) {
+ super(block, vol, dir);
+ }
+
+ @Override // ReplicaInfo
+ public ReplicaState getState() {
+ return ReplicaState.FINALIZED;
+ }
+
+ @Override // ReplicaInfo
+ boolean isUnlinked() {
+ return unlinked;
+ }
+
+ @Override // ReplicaInfo
+ void setUnlinked() {
+ unlinked = true;
+ }
+
+ @Override
+ public long getVisibleLength() {
+ return getNumBytes(); // all bytes are visible
+ }
+
+ @Override
+ public long getBytesOnDisk() {
+ return getNumBytes(); // same value as getVisibleLength()
+ }
+
+ @Override // Object
+ public boolean equals(Object o) {
+ return super.equals(o); // delegates entirely to the superclass
+ }
+
+ @Override // Object
+ public int hashCode() {
+ return super.hashCode(); // delegates entirely to the superclass, like equals()
+ }
+
+ @Override
+ public String toString() {
+ return super.toString()
+ + "\n unlinked=" + unlinked;
+ }
+}
Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/Replica.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/Replica.java?rev=820497&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/Replica.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/Replica.java Wed Sep 30 23:39:30 2009
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+
+/**
+ * This represents block replicas which are stored in a DataNode.
+ */
+public interface Replica {
+ /** Get the block ID */
+ public long getBlockId();
+
+ /** Get the generation stamp */
+ public long getGenerationStamp();
+
+ /**
+ * Get the replica state
+ * @return the replica state
+ */
+ public ReplicaState getState();
+
+ /**
+ * Get the number of bytes received
+ * @return the number of bytes that have been received
+ */
+ public long getNumBytes();
+
+ /**
+ * Get the number of bytes that have been written to disk
+ * @return the number of bytes that have been written to disk
+ */
+ public long getBytesOnDisk();
+
+ /**
+ * Get the number of bytes that are visible to readers
+ * @return the number of bytes that are visible to readers
+ */
+ public long getVisibleLength();
+}
Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaAlreadyExistsException.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaAlreadyExistsException.java?rev=820497&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaAlreadyExistsException.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaAlreadyExistsException.java Wed Sep 30 23:39:30 2009
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+
+/**
+ * Checked exception (an IOException) indicating that the target block already exists
+ * and is not set to be recovered/overwritten.
+ */
+class ReplicaAlreadyExistsException extends IOException {
+ private static final long serialVersionUID = 1L;
+
+ public ReplicaAlreadyExistsException() { // no detail message
+ super();
+ }
+
+ public ReplicaAlreadyExistsException(String msg) { // msg becomes the detail message
+ super(msg);
+ }
+}
Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java?rev=820497&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java Wed Sep 30 23:39:30 2009
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.File;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+
+/** This class represents replicas being written (state RBW).
+ * Those are the replicas that
+ * are created in a pipeline initiated by a dfs client.
+ */
+class ReplicaBeingWritten extends ReplicaInPipeline {
+ /**
+ * Constructor for a zero length replica
+ * @param blockId block id
+ * @param genStamp replica generation stamp
+ * @param vol volume where replica is located
+ * @param dir directory path where block and meta files are located
+ */
+ ReplicaBeingWritten(long blockId, long genStamp,
+ FSVolume vol, File dir) {
+ super( blockId, genStamp, vol, dir);
+ }
+
+ /**
+ * Constructor from an existing block
+ * @param block a block
+ * @param vol volume where replica is located
+ * @param dir directory path where block and meta files are located
+ * @param writer a thread that is writing to this replica
+ */
+ ReplicaBeingWritten(Block block,
+ FSVolume vol, File dir, Thread writer) {
+ super( block, vol, dir, writer);
+ }
+
+ /**
+ * Constructor from an explicit block id, length and generation stamp
+ * @param blockId block id
+ * @param len replica length
+ * @param genStamp replica generation stamp
+ * @param vol volume where replica is located
+ * @param dir directory path where block and meta files are located
+ * @param writer a thread that is writing to this replica
+ */
+ ReplicaBeingWritten(long blockId, long len, long genStamp,
+ FSVolume vol, File dir, Thread writer ) {
+ super( blockId, len, genStamp, vol, dir, writer);
+ }
+
+ @Override
+ public long getVisibleLength() {
+ return getBytesAcked(); // all acked bytes are visible
+ }
+
+ @Override // ReplicaInfo
+ public ReplicaState getState() {
+ return ReplicaState.RBW;
+ }
+
+ @Override // Object
+ public boolean equals(Object o) {
+ return super.equals(o); // delegates entirely to the superclass
+ }
+
+ @Override // Object
+ public int hashCode() {
+ return super.hashCode(); // delegates entirely to the superclass, like equals()
+ }
+}