You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 21:06:08 UTC
svn commit: r885143 [10/18] - in /hadoop/hdfs/branches/HDFS-326: ./
.eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/
src/ant/org/apache/hadoop/ant/ src/ant/org/apache/hadoop/ant/condition/
src/c++/ src/c++/libhdfs/ src/c++/libhdfs/docs...
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Sat Nov 28 20:05:56 2009
@@ -17,19 +17,21 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Random;
import javax.management.NotCompliantMBeanException;
@@ -41,18 +43,24 @@
import org.apache.hadoop.fs.DU;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.metrics.util.MBeanUtil;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
-import org.mortbay.log.Log;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.io.IOUtils;
/**************************************************
* FSDataset manages a set of data blocks. Each block
@@ -120,8 +128,9 @@
if ( ! metaData.renameTo( newmeta ) ||
! src.renameTo( dest ) ) {
throw new IOException( "could not move files for " + b +
- " from tmp to " +
- dest.getAbsolutePath() );
+ " from " + src + " to " +
+ dest.getAbsolutePath() + " or from"
+ + metaData + " to " + newmeta);
}
if (DataNode.LOG.isDebugEnabled()) {
DataNode.LOG.debug("addBlock: Moved " + metaData + " to " + newmeta);
@@ -166,46 +175,48 @@
return children[ lastChildIdx ].addBlock(b, src, true, false);
}
- /** Find the metadata file for the specified block file.
- * Return the generation stamp from the name of the metafile.
- */
- long getGenerationStampFromFile(File[] listdir, File blockFile) {
- String blockName = blockFile.getName();
- for (int j = 0; j < listdir.length; j++) {
- String path = listdir[j].getName();
- if (!path.startsWith(blockName)) {
- continue;
- }
- if (blockFile == listdir[j]) {
- continue;
- }
- return Block.getGenerationStamp(listdir[j].getName());
- }
- DataNode.LOG.warn("Block " + blockFile +
- " does not have a metafile!");
- return Block.GRANDFATHER_GENERATION_STAMP;
- }
-
- void getVolumeMap(HashMap<Block, ReplicaInfo> volumeMap, FSVolume volume) {
+ void getVolumeMap(ReplicasMap volumeMap, FSVolume volume)
+ throws IOException {
if (children != null) {
for (int i = 0; i < children.length; i++) {
children[i].getVolumeMap(volumeMap, volume);
}
}
- File blockFiles[] = dir.listFiles();
- if (blockFiles == null) {
- throw new IllegalStateException("Not a valid directory: " + dir);
- }
- for (int i = 0; i < blockFiles.length; i++) {
- if (Block.isBlockFilename(blockFiles[i])) {
- long genStamp = getGenerationStampFromFile(blockFiles, blockFiles[i]);
- volumeMap.put(new Block(blockFiles[i], blockFiles[i].length(), genStamp),
- new ReplicaInfo(volume, blockFiles[i]));
+ recoverTempUnlinkedBlock();
+ volume.addToReplicasMap(volumeMap, dir, true);
+ }
+
+ /**
+ * Recover unlinked tmp files on datanode restart. If the original block
+ * does not exist, then the tmp file is renamed to be the
+ * original file name; otherwise the tmp file is deleted.
+ */
+ private void recoverTempUnlinkedBlock() throws IOException {
+ File files[] = dir.listFiles();
+ for (File file : files) {
+ if (!FSDataset.isUnlinkTmpFile(file)) {
+ continue;
+ }
+ File blockFile = getOrigFile(file);
+ if (blockFile.exists()) {
+ //
+ // If the original block file still exists, then no recovery
+ // is needed.
+ //
+ if (!file.delete()) {
+ throw new IOException("Unable to cleanup unlinked tmp file " +
+ file);
+ }
+ } else {
+ if (!file.renameTo(blockFile)) {
+ throw new IOException("Unable to cleanup detached file " +
+ file);
+ }
}
}
}
-
+
/**
   * check if a data directory is healthy
* @throws DiskErrorException
@@ -283,9 +294,10 @@
}
class FSVolume {
- private FSDir dataDir;
- private File tmpDir;
- private File detachDir; // copy on write for blocks in snapshot
+ private File currentDir;
+ private FSDir dataDir; // directory store Finalized replica
+ private File rbwDir; // directory store RBW replica
+ private File tmpDir; // directory store Temporary replica
private DF usage;
private DU dfsUsage;
private long reserved;
@@ -293,13 +305,10 @@
FSVolume(File currentDir, Configuration conf) throws IOException {
this.reserved = conf.getLong("dfs.datanode.du.reserved", 0);
- boolean supportAppends = conf.getBoolean("dfs.support.append", false);
+ this.currentDir = currentDir;
File parent = currentDir.getParentFile();
-
- this.detachDir = new File(parent, "detach");
- if (detachDir.exists()) {
- recoverDetachedBlocks(currentDir, detachDir);
- }
+ final File finalizedDir = new File(
+ currentDir, DataStorage.STORAGE_DIR_FINALIZED);
// Files that were being written when the datanode was last shutdown
// are now moved back to the data directory. It is possible that
@@ -308,30 +317,38 @@
//
this.tmpDir = new File(parent, "tmp");
if (tmpDir.exists()) {
- if (supportAppends) {
- recoverDetachedBlocks(currentDir, tmpDir);
- } else {
- FileUtil.fullyDelete(tmpDir);
+ FileUtil.fullyDelete(tmpDir);
+ }
+ this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW);
+ if (rbwDir.exists() && !supportAppends) {
+ FileUtil.fullyDelete(rbwDir);
+ }
+ this.dataDir = new FSDir(finalizedDir);
+ if (!rbwDir.mkdirs()) { // create rbw directory if not exist
+ if (!rbwDir.isDirectory()) {
+ throw new IOException("Mkdirs failed to create " + rbwDir.toString());
}
}
- this.dataDir = new FSDir(currentDir);
if (!tmpDir.mkdirs()) {
if (!tmpDir.isDirectory()) {
throw new IOException("Mkdirs failed to create " + tmpDir.toString());
}
}
- if (!detachDir.mkdirs()) {
- if (!detachDir.isDirectory()) {
- throw new IOException("Mkdirs failed to create " + detachDir.toString());
- }
- }
this.usage = new DF(parent, conf);
this.dfsUsage = new DU(parent, conf);
this.dfsUsage.start();
}
+ File getCurrentDir() {
+ return currentDir;
+ }
+
void decDfsUsed(long value) {
- dfsUsage.decDfsUsed(value);
+ // The caller to this method (BlockFileDeleteTask.run()) does
+ // not have locked FSDataset.this yet.
+ synchronized(FSDataset.this) {
+ dfsUsage.decDfsUsed(value);
+ }
}
long getDfsUsed() throws IOException {
@@ -364,51 +381,23 @@
}
/**
- * Temporary files. They get moved to the real block directory either when
- * the block is finalized or the datanode restarts.
+ * Temporary files. They get moved to the finalized block directory when
+ * the block is finalized.
*/
File createTmpFile(Block b) throws IOException {
File f = new File(tmpDir, b.getBlockName());
- return createTmpFile(b, f);
+ return FSDataset.createTmpFile(b, f);
}
/**
- * Returns the name of the temporary file for this block.
+ * RBW files. They get moved to the finalized block directory when
+ * the block is finalized.
*/
- File getTmpFile(Block b) throws IOException {
- File f = new File(tmpDir, b.getBlockName());
- return f;
+ File createRbwFile(Block b) throws IOException {
+ File f = new File(rbwDir, b.getBlockName());
+ return FSDataset.createTmpFile(b, f);
}
- /**
- * Files used for copy-on-write. They need recovery when datanode
- * restarts.
- */
- File createDetachFile(Block b, String filename) throws IOException {
- File f = new File(detachDir, filename);
- return createTmpFile(b, f);
- }
-
- private File createTmpFile(Block b, File f) throws IOException {
- if (f.exists()) {
- throw new IOException("Unexpected problem in creating temporary file for "+
- b + ". File " + f + " should not be present, but is.");
- }
- // Create the zero-length temp file
- //
- boolean fileCreated = false;
- try {
- fileCreated = f.createNewFile();
- } catch (IOException ioe) {
- throw (IOException)new IOException(DISK_ERROR +f).initCause(ioe);
- }
- if (!fileCreated) {
- throw new IOException("Unexpected problem in creating temporary file for "+
- b + ". File " + f + " should be creatable, but is already present.");
- }
- return f;
- }
-
File addBlock(Block b, File f) throws IOException {
File blockFile = dataDir.addBlock(b, f);
File metaFile = getMetaFile( blockFile , b);
@@ -419,55 +408,126 @@
void checkDirs() throws DiskErrorException {
dataDir.checkDirTree();
DiskChecker.checkDir(tmpDir);
+ DiskChecker.checkDir(rbwDir);
}
- void getVolumeMap(HashMap<Block, ReplicaInfo> volumeMap) {
+ void getVolumeMap(ReplicasMap volumeMap) throws IOException {
+ // add finalized replicas
dataDir.getVolumeMap(volumeMap, this);
- }
-
- void clearPath(File f) {
- dataDir.clearPath(f);
- }
-
- public String toString() {
- return dataDir.dir.getAbsolutePath();
+ // add rbw replicas
+ addToReplicasMap(volumeMap, rbwDir, false);
}
/**
- * Recover detached files on datanode restart. If a detached block
- * does not exist in the original directory, then it is moved to the
- * original directory.
+ * Add replicas under the given directory to the volume map
+ * @param volumeMap the replicas map
+ * @param dir an input directory
+ * @param isFinalized true if the directory has finalized replicas;
+ * false if the directory has rbw replicas
*/
- private void recoverDetachedBlocks(File dataDir, File dir)
- throws IOException {
- File contents[] = dir.listFiles();
- if (contents == null) {
- return;
- }
- for (int i = 0; i < contents.length; i++) {
- if (!contents[i].isFile()) {
- throw new IOException ("Found " + contents[i] + " in " + dir +
- " but it is not a file.");
+ private void addToReplicasMap(ReplicasMap volumeMap,
+ File dir, boolean isFinalized) {
+ File blockFiles[] = dir.listFiles();
+ for (File blockFile : blockFiles) {
+ if (!Block.isBlockFilename(blockFile))
+ continue;
+
+ long genStamp = getGenerationStampFromFile(blockFiles, blockFile);
+ long blockId = Block.filename2id(blockFile.getName());
+ ReplicaInfo newReplica = null;
+ if (isFinalized) {
+ newReplica = new FinalizedReplica(blockId,
+ blockFile.length(), genStamp, this, blockFile.getParentFile());
+ } else {
+ newReplica = new ReplicaWaitingToBeRecovered(blockId,
+ validateIntegrity(blockFile, genStamp),
+ genStamp, this, blockFile.getParentFile());
}
- //
- // If the original block file still exists, then no recovery
- // is needed.
- //
- File blk = new File(dataDir, contents[i].getName());
- if (!blk.exists()) {
- if (!contents[i].renameTo(blk)) {
- throw new IOException("Unable to recover detached file " +
- contents[i]);
- }
- continue;
+ ReplicaInfo oldReplica = volumeMap.add(newReplica);
+ if (oldReplica != null) {
+ DataNode.LOG.warn("Two block files with the same block id exist " +
+ "on disk: " + oldReplica.getBlockFile() +
+ " and " + blockFile );
}
- if (!contents[i].delete()) {
- throw new IOException("Unable to cleanup detached file " +
- contents[i]);
+ }
+ }
+
+ /**
+ * Find out the number of bytes in the block that match its crc.
+ *
+ * This algorithm assumes that data corruption caused by unexpected
+ * datanode shutdown occurs only in the last crc chunk. So it checks
+ * only the last chunk.
+ *
+ * @param blockFile the block file
+ * @param genStamp generation stamp of the block
+ * @return the number of valid bytes
+ */
+ private long validateIntegrity(File blockFile, long genStamp) {
+ DataInputStream checksumIn = null;
+ InputStream blockIn = null;
+ try {
+ File metaFile = new File(getMetaFileName(blockFile.toString(), genStamp));
+ long blockFileLen = blockFile.length();
+ long metaFileLen = metaFile.length();
+ int crcHeaderLen = DataChecksum.getChecksumHeaderSize();
+ if (!blockFile.exists() || blockFileLen == 0 ||
+ !metaFile.exists() || metaFileLen < (long)crcHeaderLen) {
+ return 0;
+ }
+ checksumIn = new DataInputStream(
+ new BufferedInputStream(new FileInputStream(metaFile),
+ BUFFER_SIZE));
+
+ // read and handle the common header here. For now just a version
+ BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
+ short version = header.getVersion();
+ if (version != FSDataset.METADATA_VERSION) {
+ DataNode.LOG.warn("Wrong version (" + version + ") for metadata file "
+ + metaFile + " ignoring ...");
+ }
+ DataChecksum checksum = header.getChecksum();
+ int bytesPerChecksum = checksum.getBytesPerChecksum();
+ int checksumSize = checksum.getChecksumSize();
+ long numChunks = Math.min(
+ (blockFileLen + bytesPerChecksum - 1)/bytesPerChecksum,
+ (metaFileLen - crcHeaderLen)/checksumSize);
+ if (numChunks == 0) {
+ return 0;
+ }
+ IOUtils.skipFully(checksumIn, (numChunks-1)*checksumSize);
+ blockIn = new FileInputStream(blockFile);
+ long lastChunkStartPos = (numChunks-1)*bytesPerChecksum;
+ IOUtils.skipFully(blockIn, lastChunkStartPos);
+ int lastChunkSize = (int)Math.min(
+ bytesPerChecksum, blockFileLen-lastChunkStartPos);
+ byte[] buf = new byte[lastChunkSize+checksumSize];
+ checksumIn.readFully(buf, lastChunkSize, checksumSize);
+ IOUtils.readFully(blockIn, buf, 0, lastChunkSize);
+
+ checksum.update(buf, 0, lastChunkSize);
+ if (checksum.compare(buf, lastChunkSize)) { // last chunk matches crc
+ return lastChunkStartPos + lastChunkSize;
+      } else { // last chunk is corrupt
+ return lastChunkStartPos;
}
+ } catch (IOException e) {
+ DataNode.LOG.warn(e);
+ return 0;
+ } finally {
+ IOUtils.closeStream(checksumIn);
+ IOUtils.closeStream(blockIn);
}
}
+
+ void clearPath(File f) {
+ dataDir.clearPath(f);
+ }
+
+ public String toString() {
+ return getDir().getAbsolutePath();
+ }
}
static class FSVolumeSet {
@@ -530,7 +590,7 @@
return remaining;
}
- synchronized void getVolumeMap(HashMap<Block, ReplicaInfo> volumeMap) {
+ synchronized void getVolumeMap(ReplicasMap volumeMap) throws IOException {
for (int idx = 0; idx < volumes.length; idx++) {
volumes[idx].getVolumeMap(volumeMap);
}
@@ -571,10 +631,11 @@
}
}
volumes = fsvs; // replace array of volumes
+ DataNode.LOG.info("Completed FSVolumeSet.checkDirs. Removed "
+ + removed_vols.size() + " volumes. List of current volumes: "
+ + this);
}
- Log.info("Completed FSVolumeSet.checkDirs. Removed=" + removed_size +
- "volumes. List of current volumes: " + toString());
-
+
return removed_vols;
}
@@ -606,25 +667,22 @@
//Find better place?
public static final String METADATA_EXTENSION = ".meta";
public static final short METADATA_VERSION = 1;
-
+ static final String UNLINK_BLOCK_SUFFIX = ".unlinked";
- static class ActiveFile {
- final File file;
- final List<Thread> threads = new ArrayList<Thread>(2);
-
- ActiveFile(File f, List<Thread> list) {
- file = f;
- if (list != null) {
- threads.addAll(list);
- }
- threads.add(Thread.currentThread());
- }
-
- public String toString() {
- return getClass().getSimpleName() + "(file=" + file
- + ", threads=" + threads + ")";
- }
- }
+ private static boolean isUnlinkTmpFile(File f) {
+ String name = f.getName();
+ return name.endsWith(UNLINK_BLOCK_SUFFIX);
+ }
+
+ static File getUnlinkTmpFile(File f) {
+ return new File(f.getParentFile(), f.getName()+UNLINK_BLOCK_SUFFIX);
+ }
+
+ private static File getOrigFile(File unlinkTmpFile) {
+ String fileName = unlinkTmpFile.getName();
+ return new File(unlinkTmpFile.getParentFile(),
+ fileName.substring(0, fileName.length()-UNLINK_BLOCK_SUFFIX.length()));
+ }
static String getMetaFileName(String blockFileName, long genStamp) {
return blockFileName + "_" + genStamp + METADATA_EXTENSION;
@@ -638,6 +696,26 @@
return getMetaFile(getBlockFile(b), b);
}
+ /** Find the metadata file for the specified block file.
+ * Return the generation stamp from the name of the metafile.
+ */
+ private static long getGenerationStampFromFile(File[] listdir, File blockFile) {
+ String blockName = blockFile.getName();
+ for (int j = 0; j < listdir.length; j++) {
+ String path = listdir[j].getName();
+ if (!path.startsWith(blockName)) {
+ continue;
+ }
+ if (blockFile == listdir[j]) {
+ continue;
+ }
+ return Block.getGenerationStamp(listdir[j].getName());
+ }
+ DataNode.LOG.warn("Block " + blockFile +
+ " does not have a metafile!");
+ return GenerationStamp.GRANDFATHER_GENERATION_STAMP;
+ }
+
/** Find the corresponding meta data file from a given block file */
private static File findMetaFile(final File blockFile) throws IOException {
final String prefix = blockFile.getName() + "_";
@@ -675,22 +753,7 @@
/** Return the block file for the given ID */
public File findBlockFile(long blockId) {
- final Block b = new Block(blockId);
- File blockfile = null;
- ActiveFile activefile = ongoingCreates.get(b);
- if (activefile != null) {
- blockfile = activefile.file;
- }
- if (blockfile == null) {
- blockfile = getFile(b);
- }
- if (blockfile == null) {
- if (DataNode.LOG.isDebugEnabled()) {
- DataNode.LOG.debug("ongoingCreates=" + ongoingCreates);
- DataNode.LOG.debug("volumeMap=" + volumeMap);
- }
- }
- return blockfile;
+ return getFile(blockId);
}
/** {@inheritDoc} */
@@ -704,6 +767,31 @@
parseGenerationStamp(blockfile, metafile));
}
+ /**
+ * Returns a clone of a replica stored in data-node memory.
+ * Should be primarily used for testing.
+ * @param blockId
+ * @return
+ */
+ synchronized ReplicaInfo fetchReplicaInfo(long blockId) {
+ ReplicaInfo r = volumeMap.get(blockId);
+ if(r == null)
+ return null;
+ switch(r.getState()) {
+ case FINALIZED:
+ return new FinalizedReplica((FinalizedReplica)r);
+ case RBW:
+ return new ReplicaBeingWritten((ReplicaBeingWritten)r);
+ case RWR:
+ return new ReplicaWaitingToBeRecovered((ReplicaWaitingToBeRecovered)r);
+ case RUR:
+ return new ReplicaUnderRecovery((ReplicaUnderRecovery)r);
+ case TEMPORARY:
+ return new ReplicaInPipeline((ReplicaInPipeline)r);
+ }
+ return null;
+ }
+
public boolean metaFileExists(Block b) throws IOException {
return getMetaFile(b).exists();
}
@@ -720,27 +808,54 @@
checksumFile.length());
}
+ static File createTmpFile(Block b, File f) throws IOException {
+ if (f.exists()) {
+ throw new IOException("Unexpected problem in creating temporary file for "+
+ b + ". File " + f + " should not be present, but is.");
+ }
+ // Create the zero-length temp file
+ //
+ boolean fileCreated = false;
+ try {
+ fileCreated = f.createNewFile();
+ } catch (IOException ioe) {
+ throw (IOException)new IOException(DISK_ERROR +f).initCause(ioe);
+ }
+ if (!fileCreated) {
+ throw new IOException("Unexpected problem in creating temporary file for "+
+ b + ". File " + f + " should be creatable, but is already present.");
+ }
+ return f;
+ }
+
FSVolumeSet volumes;
- private HashMap<Block,ActiveFile> ongoingCreates = new HashMap<Block,ActiveFile>();
private int maxBlocksPerDir = 0;
- HashMap<Block,ReplicaInfo> volumeMap = null;
+ ReplicasMap volumeMap = new ReplicasMap();
static Random random = new Random();
+ FSDatasetAsyncDiskService asyncDiskService;
// Used for synchronizing access to usage stats
private Object statsLock = new Object();
+ boolean supportAppends = false;
+
/**
* An FSDataset has a directory where it loads its data files.
*/
public FSDataset(DataStorage storage, Configuration conf) throws IOException {
this.maxBlocksPerDir = conf.getInt("dfs.datanode.numblocks", 64);
+ this.supportAppends = conf.getBoolean("dfs.support.append", false);
FSVolume[] volArray = new FSVolume[storage.getNumStorageDirs()];
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
volArray[idx] = new FSVolume(storage.getStorageDir(idx).getCurrentDir(), conf);
}
volumes = new FSVolumeSet(volArray);
- volumeMap = new HashMap<Block, ReplicaInfo>();
volumes.getVolumeMap(volumeMap);
+ File[] roots = new File[storage.getNumStorageDirs()];
+ for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
+ roots[idx] = storage.getStorageDir(idx).getCurrentDir();
+ }
+ asyncDiskService = new FSDatasetAsyncDiskService(roots);
registerMBean(storage.getStorageID());
}
@@ -814,22 +929,33 @@
}
/**
- * Returns handles to the block file and its metadata file
+ * Get the meta info of a block stored in volumeMap
+ * @param b block
+ * @return the meta replica information
+ * @throws IOException if no entry is in the map or
+ * there is a generation stamp mismatch
*/
- public synchronized BlockInputStreams getTmpInputStreams(Block b,
- long blkOffset, long ckoff) throws IOException {
-
+ private ReplicaInfo getReplicaInfo(Block b) throws IOException {
ReplicaInfo info = volumeMap.get(b);
if (info == null) {
throw new IOException("Block " + b + " does not exist in volumeMap.");
}
- FSVolume v = info.getVolume();
- File blockFile = v.getTmpFile(b);
+ return info;
+ }
+
+ /**
+ * Returns handles to the block file and its metadata file
+ */
+ public synchronized BlockInputStreams getTmpInputStreams(Block b,
+ long blkOffset, long ckoff) throws IOException {
+
+ ReplicaInfo info = getReplicaInfo(b);
+ File blockFile = info.getBlockFile();
RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");
if (blkOffset > 0) {
blockInFile.seek(blkOffset);
}
- File metaFile = getMetaFile(blockFile, b);
+ File metaFile = info.getMetaFile();
RandomAccessFile metaInFile = new RandomAccessFile(metaFile, "r");
if (ckoff > 0) {
metaInFile.seek(ckoff);
@@ -838,140 +964,32 @@
new FileInputStream(metaInFile.getFD()));
}
- private BlockWriteStreams createBlockWriteStreams( File f , File metafile) throws IOException {
- return new BlockWriteStreams(new FileOutputStream(new RandomAccessFile( f , "rw" ).getFD()),
- new FileOutputStream( new RandomAccessFile( metafile , "rw" ).getFD() ));
-
- }
-
/**
* Make a copy of the block if this block is linked to an existing
* snapshot. This ensures that modifying this block does not modify
* data in any existing snapshots.
* @param block Block
- * @param numLinks Detach if the number of links exceed this value
+ * @param numLinks Unlink if the number of links exceed this value
* @throws IOException
- * @return - true if the specified block was detached
+ * @return - true if the specified block was unlinked or the block
+ * is not in any snapshot.
*/
- public boolean detachBlock(Block block, int numLinks) throws IOException {
+ public boolean unlinkBlock(Block block, int numLinks) throws IOException {
ReplicaInfo info = null;
synchronized (this) {
- info = volumeMap.get(block);
+ info = getReplicaInfo(block);
}
- return info.detachBlock(block, numLinks);
- }
-
- static private <T> void updateBlockMap(Map<Block, T> blockmap,
- Block oldblock, Block newblock) throws IOException {
- if (blockmap.containsKey(oldblock)) {
- T value = blockmap.remove(oldblock);
- blockmap.put(newblock, value);
- }
- }
-
- /** {@inheritDoc} */
- public void updateBlock(Block oldblock, Block newblock) throws IOException {
- if (oldblock.getBlockId() != newblock.getBlockId()) {
- throw new IOException("Cannot update oldblock (=" + oldblock
- + ") to newblock (=" + newblock + ").");
- }
-
- for(;;) {
- final List<Thread> threads = tryUpdateBlock(oldblock, newblock);
- if (threads == null) {
- return;
- }
-
- // interrupt and wait for all ongoing create threads
- for(Thread t : threads) {
- t.interrupt();
- }
- for(Thread t : threads) {
- try {
- t.join();
- } catch (InterruptedException e) {
- DataNode.LOG.warn("interruptOngoingCreates: t=" + t, e);
- }
- }
- }
- }
-
- /**
- * Try to update an old block to a new block.
- * If there are ongoing create threads running for the old block,
- * the threads will be returned without updating the block.
- *
- * @return ongoing create threads if there is any. Otherwise, return null.
- */
- private synchronized List<Thread> tryUpdateBlock(
- Block oldblock, Block newblock) throws IOException {
- //check ongoing create threads
- final ActiveFile activefile = ongoingCreates.get(oldblock);
- if (activefile != null && !activefile.threads.isEmpty()) {
- //remove dead threads
- for(Iterator<Thread> i = activefile.threads.iterator(); i.hasNext(); ) {
- final Thread t = i.next();
- if (!t.isAlive()) {
- i.remove();
- }
- }
-
- //return living threads
- if (!activefile.threads.isEmpty()) {
- return new ArrayList<Thread>(activefile.threads);
- }
- }
-
- //No ongoing create threads is alive. Update block.
- File blockFile = findBlockFile(oldblock.getBlockId());
- if (blockFile == null) {
- throw new IOException("Block " + oldblock + " does not exist.");
- }
-
- File oldMetaFile = findMetaFile(blockFile);
- long oldgs = parseGenerationStamp(blockFile, oldMetaFile);
-
- //rename meta file to a tmp file
- File tmpMetaFile = new File(oldMetaFile.getParent(),
- oldMetaFile.getName()+"_tmp" + newblock.getGenerationStamp());
- if (!oldMetaFile.renameTo(tmpMetaFile)){
- throw new IOException("Cannot rename block meta file to " + tmpMetaFile);
- }
-
- //update generation stamp
- if (oldgs > newblock.getGenerationStamp()) {
- throw new IOException("Cannot update block (id=" + newblock.getBlockId()
- + ") generation stamp from " + oldgs
- + " to " + newblock.getGenerationStamp());
- }
-
- //update length
- if (newblock.getNumBytes() > oldblock.getNumBytes()) {
- throw new IOException("Cannot update block file (=" + blockFile
- + ") length from " + oldblock.getNumBytes() + " to " + newblock.getNumBytes());
- }
- if (newblock.getNumBytes() < oldblock.getNumBytes()) {
- truncateBlock(blockFile, tmpMetaFile, oldblock.getNumBytes(), newblock.getNumBytes());
- }
-
- //rename the tmp file to the new meta file (with new generation stamp)
- File newMetaFile = getMetaFile(blockFile, newblock);
- if (!tmpMetaFile.renameTo(newMetaFile)) {
- throw new IOException("Cannot rename tmp meta file to " + newMetaFile);
- }
-
- updateBlockMap(ongoingCreates, oldblock, newblock);
- updateBlockMap(volumeMap, oldblock, newblock);
-
- // paranoia! verify that the contents of the stored block
- // matches the block file on disk.
- validateBlockMetadata(newblock);
- return null;
+ return info.unlinkBlock(numLinks);
}
static private void truncateBlock(File blockFile, File metaFile,
long oldlen, long newlen) throws IOException {
+ DataNode.LOG.info("truncateBlock: blockFile=" + blockFile
+ + ", metaFile=" + metaFile
+ + ", oldlen=" + oldlen
+ + ", newlen=" + newlen);
+
if (newlen == oldlen) {
return;
}
@@ -1030,165 +1048,310 @@
}
}
+ @Override // FSDatasetInterface
+ public synchronized ReplicaInPipelineInterface append(Block b,
+ long newGS, long expectedBlockLen) throws IOException {
+ // One reason for an append: the block was successfully finalized because
+ // all packets were successfully processed at the Datanode, but the acks
+ // for some of the packets were not received by the client; the client
+ // then re-opens the connection and retries sending those packets.
+ // The other reason is that an "append" is occurring to this block.
+
+ // check the validity of the parameter
+ if (newGS < b.getGenerationStamp()) {
+ throw new IOException("The new generation stamp " + newGS +
+ " should be greater than the replica " + b + "'s generation stamp");
+ }
+ ReplicaInfo replicaInfo = volumeMap.get(b);
+ if (replicaInfo == null) {
+ throw new ReplicaNotFoundException(
+ ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
+ }
+ DataNode.LOG.info("Appending to replica " + replicaInfo);
+ if (replicaInfo.getState() != ReplicaState.FINALIZED) {
+ throw new ReplicaNotFoundException(
+ ReplicaNotFoundException.UNFINALIZED_REPLICA + b);
+ }
+ if (replicaInfo.getNumBytes() != expectedBlockLen) {
+ throw new IOException("Corrupted replica " + replicaInfo +
+ " with a length of " + replicaInfo.getNumBytes() +
+ " expected length is " + expectedBlockLen);
+ }
+
+ return append((FinalizedReplica)replicaInfo, newGS, b.getNumBytes());
+ }
+
+ /** Append to a finalized replica
+ * Change a finalized replica to be a RBW replica and
+ * bump its generation stamp to be the newGS
+ *
+ * @param replicaInfo a finalized replica
+ * @param newGS new generation stamp
+ * @param estimateBlockLen estimated block length, used to choose a volume
+ * @return a RBW replica
+ * @throws IOException if moving the replica from finalized directory
+ * to rbw directory fails
+ */
+ private synchronized ReplicaBeingWritten append(FinalizedReplica replicaInfo,
+ long newGS, long estimateBlockLen) throws IOException {
+ // unlink the finalized replica
+ replicaInfo.unlinkBlock(1);
+
+ // construct a RBW replica with the new GS
+ File blkfile = replicaInfo.getBlockFile();
+ FSVolume v = volumes.getNextVolume(estimateBlockLen);
+ File newBlkFile = new File(v.rbwDir, replicaInfo.getBlockName());
+ File oldmeta = replicaInfo.getMetaFile();
+ ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(
+ replicaInfo.getBlockId(), replicaInfo.getNumBytes(), newGS,
+ v, newBlkFile.getParentFile(), Thread.currentThread());
+ File newmeta = newReplicaInfo.getMetaFile();
+
+ // rename meta file to rbw directory
+ if (DataNode.LOG.isDebugEnabled()) {
+ DataNode.LOG.debug("Renaming " + oldmeta + " to " + newmeta);
+ }
+ if (!oldmeta.renameTo(newmeta)) {
+ throw new IOException("Block " + replicaInfo + " reopen failed. " +
+ " Unable to move meta file " + oldmeta +
+ " to rbw dir " + newmeta);
+ }
+
+ // rename block file to rbw directory
+ if (DataNode.LOG.isDebugEnabled()) {
+ DataNode.LOG.debug("Renaming " + blkfile + " to " + newBlkFile);
+ DataNode.LOG.debug("Old block file length is " + blkfile.length());
+ }
+ if (!blkfile.renameTo(newBlkFile)) {
+ if (!newmeta.renameTo(oldmeta)) { // restore the meta file
+ DataNode.LOG.warn("Cannot move meta file " + newmeta +
+ "back to the finalized directory " + oldmeta);
+ }
+ throw new IOException("Block " + replicaInfo + " reopen failed. " +
+ " Unable to move block file " + blkfile +
+ " to rbw dir " + newBlkFile);
+ }
+
+ // Replace finalized replica by a RBW replica in replicas map
+ volumeMap.add(newReplicaInfo);
+
+ return newReplicaInfo;
+ }
+
+ private ReplicaInfo recoverCheck(Block b, long newGS,
+ long expectedBlockLen) throws IOException {
+ ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
+ if (replicaInfo == null) {
+ throw new ReplicaNotFoundException(
+ ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
+ }
+
+ // check state
+ if (replicaInfo.getState() != ReplicaState.FINALIZED &&
+ replicaInfo.getState() != ReplicaState.RBW) {
+ throw new ReplicaNotFoundException(
+ ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA + replicaInfo);
+ }
+
+ // check generation stamp
+ long replicaGenerationStamp = replicaInfo.getGenerationStamp();
+ if (replicaGenerationStamp < b.getGenerationStamp() ||
+ replicaGenerationStamp > newGS) {
+ throw new ReplicaNotFoundException(
+ ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + replicaGenerationStamp
+ + ". Expected GS range is [" + b.getGenerationStamp() + ", " +
+ newGS + "].");
+ }
+
+ // stop the previous writer before checking a replica's length
+ long replicaLen = replicaInfo.getNumBytes();
+ if (replicaInfo.getState() == ReplicaState.RBW) {
+ ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;
+ // kill the previous writer
+ rbw.stopWriter();
+ rbw.setWriter(Thread.currentThread());
+ // check length: bytesRcvd, bytesOnDisk, and bytesAcked should be the same
+ if (replicaLen != rbw.getBytesOnDisk()
+ || replicaLen != rbw.getBytesAcked()) {
+ throw new ReplicaAlreadyExistsException("RBW replica " + replicaInfo +
+ "bytesRcvd(" + rbw.getNumBytes() + "), bytesOnDisk(" +
+ rbw.getBytesOnDisk() + "), and bytesAcked(" + rbw.getBytesAcked() +
+ ") are not the same.");
+ }
+ }
+
+ // check block length
+ if (replicaLen != expectedBlockLen) {
+ throw new IOException("Corrupted replica " + replicaInfo +
+ " with a length of " + replicaLen +
+ " expected length is " + expectedBlockLen);
+ }
+
+ return replicaInfo;
+ }
+ @Override // FSDatasetInterface
+ public synchronized ReplicaInPipelineInterface recoverAppend(Block b,
+ long newGS, long expectedBlockLen) throws IOException {
+ DataNode.LOG.info("Recover failed append to " + b);
+
+ ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
+
+ // change the replica's state/gs etc.
+ if (replicaInfo.getState() == ReplicaState.FINALIZED ) {
+ return append((FinalizedReplica)replicaInfo, newGS, b.getNumBytes());
+ } else { //RBW
+ bumpReplicaGS(replicaInfo, newGS);
+ return (ReplicaBeingWritten)replicaInfo;
+ }
+ }
+
+ @Override
+ public void recoverClose(Block b, long newGS,
+ long expectedBlockLen) throws IOException {
+ DataNode.LOG.info("Recover failed close " + b);
+ // check replica's state
+ ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
+ // bump the replica's GS
+ bumpReplicaGS(replicaInfo, newGS);
+ // finalize the replica if RBW
+ if (replicaInfo.getState() == ReplicaState.RBW) {
+ finalizeBlock(replicaInfo);
+ }
+ }
+
/**
- * Start writing to a block file
- * If isRecovery is true and the block pre-exists, then we kill all
- volumeMap.put(b, v);
- volumeMap.put(b, v);
- * other threads that might be writing to this block, and then reopen the file.
- */
- public BlockWriteStreams writeToBlock(Block b, boolean isRecovery) throws IOException {
- //
- // Make sure the block isn't a valid one - we're still creating it!
- //
- if (isValidBlock(b)) {
- if (!isRecovery) {
- throw new BlockAlreadyExistsException("Block " + b + " is valid, and cannot be written to.");
- }
- // If the block was successfully finalized because all packets
- // were successfully processed at the Datanode but the ack for
- // some of the packets were not received by the client. The client
- // re-opens the connection and retries sending those packets.
- // The other reason is that an "append" is occurring to this block.
- detachBlock(b, 1);
+ * Bump a replica's generation stamp to a new one.
+ * Its on-disk meta file name is renamed to be the new one too.
+ *
+ * @param replicaInfo a replica
+ * @param newGS new generation stamp
+ * @throws IOException if rename fails
+ */
+ private void bumpReplicaGS(ReplicaInfo replicaInfo,
+ long newGS) throws IOException {
+ long oldGS = replicaInfo.getGenerationStamp();
+ File oldmeta = replicaInfo.getMetaFile();
+ replicaInfo.setGenerationStamp(newGS);
+ File newmeta = replicaInfo.getMetaFile();
+
+ // rename meta file to new GS
+ if (DataNode.LOG.isDebugEnabled()) {
+ DataNode.LOG.debug("Renaming " + oldmeta + " to " + newmeta);
+ }
+ if (!oldmeta.renameTo(newmeta)) {
+ replicaInfo.setGenerationStamp(oldGS); // restore old GS
+ throw new IOException("Block " + (Block)replicaInfo + " reopen failed. " +
+ " Unable to move meta file " + oldmeta +
+ " to " + newmeta);
}
- long blockSize = b.getNumBytes();
+ }
- //
- // Serialize access to /tmp, and check if file already there.
- //
- File f = null;
- List<Thread> threads = null;
- synchronized (this) {
- //
- // Is it already in the create process?
- //
- ActiveFile activeFile = ongoingCreates.get(b);
- if (activeFile != null) {
- f = activeFile.file;
- threads = activeFile.threads;
-
- if (!isRecovery) {
- throw new BlockAlreadyExistsException("Block " + b +
- " has already been started (though not completed), and thus cannot be created.");
- } else {
- for (Thread thread:threads) {
- thread.interrupt();
- }
- }
- ongoingCreates.remove(b);
- }
- FSVolume v = null;
- if (!isRecovery) {
- v = volumes.getNextVolume(blockSize);
- // create temporary file to hold block in the designated volume
- f = createTmpFile(v, b);
- volumeMap.put(b, new ReplicaInfo(v));
- } else if (f != null) {
- DataNode.LOG.info("Reopen already-open Block for append " + b);
- // create or reuse temporary file to hold block in the designated volume
- v = volumeMap.get(b).getVolume();
- volumeMap.put(b, new ReplicaInfo(v));
- } else {
- // reopening block for appending to it.
- DataNode.LOG.info("Reopen Block for append " + b);
- v = volumeMap.get(b).getVolume();
- f = createTmpFile(v, b);
- File blkfile = getBlockFile(b);
- File oldmeta = getMetaFile(b);
- File newmeta = getMetaFile(f, b);
-
- // rename meta file to tmp directory
- DataNode.LOG.debug("Renaming " + oldmeta + " to " + newmeta);
- if (!oldmeta.renameTo(newmeta)) {
- throw new IOException("Block " + b + " reopen failed. " +
- " Unable to move meta file " + oldmeta +
- " to tmp dir " + newmeta);
- }
-
- // rename block file to tmp directory
- DataNode.LOG.debug("Renaming " + blkfile + " to " + f);
- if (!blkfile.renameTo(f)) {
- if (!f.delete()) {
- throw new IOException("Block " + b + " reopen failed. " +
- " Unable to remove file " + f);
- }
- if (!blkfile.renameTo(f)) {
- throw new IOException("Block " + b + " reopen failed. " +
- " Unable to move block file " + blkfile +
- " to tmp dir " + f);
- }
- }
- volumeMap.put(b, new ReplicaInfo(v));
- }
- if (f == null) {
- DataNode.LOG.warn("Block " + b + " reopen failed " +
- " Unable to locate tmp file.");
- throw new IOException("Block " + b + " reopen failed " +
- " Unable to locate tmp file.");
- }
- ongoingCreates.put(b, new ActiveFile(f, threads));
+ @Override
+ public synchronized ReplicaInPipelineInterface createRbw(Block b)
+ throws IOException {
+ ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
+ if (replicaInfo != null) {
+ throw new ReplicaAlreadyExistsException("Block " + b +
+ " already exists in state " + replicaInfo.getState() +
+ " and thus cannot be created.");
+ }
+ // create a new block
+ FSVolume v = volumes.getNextVolume(b.getNumBytes());
+ // create a rbw file to hold block in the designated volume
+ File f = v.createRbwFile(b);
+ ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
+ b.getGenerationStamp(), v, f.getParentFile());
+ volumeMap.add(newReplicaInfo);
+ return newReplicaInfo;
+ }
+
+ @Override
+ public synchronized ReplicaInPipelineInterface recoverRbw(Block b,
+ long newGS, long minBytesRcvd, long maxBytesRcvd)
+ throws IOException {
+ DataNode.LOG.info("Recover the RBW replica " + b);
+
+ ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
+ if (replicaInfo == null) {
+ throw new ReplicaNotFoundException(
+ ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
+ }
+
+ // check the replica's state
+ if (replicaInfo.getState() != ReplicaState.RBW) {
+ throw new ReplicaNotFoundException(
+ ReplicaNotFoundException.NON_RBW_REPLICA + replicaInfo);
}
+ ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;
+
+ DataNode.LOG.info("Recovering replica " + rbw);
- try {
- if (threads != null) {
- for (Thread thread:threads) {
- thread.join();
- }
- }
- } catch (InterruptedException e) {
- throw new IOException("Recovery waiting for thread interrupted.");
+ // Stop the previous writer
+ rbw.stopWriter();
+ rbw.setWriter(Thread.currentThread());
+
+ // check generation stamp
+ long replicaGenerationStamp = rbw.getGenerationStamp();
+ if (replicaGenerationStamp < b.getGenerationStamp() ||
+ replicaGenerationStamp > newGS) {
+ throw new ReplicaNotFoundException(
+ ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + b +
+ ". Expected GS range is [" + b.getGenerationStamp() + ", " +
+ newGS + "].");
+ }
+
+ // check replica length
+ if (rbw.getBytesAcked() < minBytesRcvd || rbw.getNumBytes() > maxBytesRcvd){
+ throw new ReplicaNotFoundException("Unmatched length replica " +
+ replicaInfo + ": BytesAcked = " + rbw.getBytesAcked() +
+ " BytesRcvd = " + rbw.getNumBytes() + " are not in the range of [" +
+ minBytesRcvd + ", " + maxBytesRcvd + "].");
}
- //
- // Finally, allow a writer to the block file
- // REMIND - mjc - make this a filter stream that enforces a max
- // block size, so clients can't go crazy
- //
- File metafile = getMetaFile(f, b);
- DataNode.LOG.debug("writeTo blockfile is " + f + " of size " + f.length());
- DataNode.LOG.debug("writeTo metafile is " + metafile + " of size " + metafile.length());
- return createBlockWriteStreams( f , metafile);
+ // bump the replica's generation stamp to newGS
+ bumpReplicaGS(rbw, newGS);
+
+ return rbw;
}
-
- /**
- * Retrieves the offset in the block to which the
- * the next write will write data to.
- */
- public long getChannelPosition(Block b, BlockWriteStreams streams)
- throws IOException {
- FileOutputStream file = (FileOutputStream) streams.dataOut;
- return file.getChannel().position();
+
+ @Override
+ public synchronized ReplicaInPipelineInterface createTemporary(Block b)
+ throws IOException {
+ ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
+ if (replicaInfo != null) {
+ throw new ReplicaAlreadyExistsException("Block " + b +
+ " already exists in state " + replicaInfo.getState() +
+ " and thus cannot be created.");
+ }
+
+ FSVolume v = volumes.getNextVolume(b.getNumBytes());
+ // create a temporary file to hold block in the designated volume
+ File f = v.createTmpFile(b);
+ ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(),
+ b.getGenerationStamp(), v, f.getParentFile());
+ volumeMap.add(newReplicaInfo);
+
+ return newReplicaInfo;
}
/**
- * Sets the offset in the block to which the
- * the next write will write data to.
- */
- public void setChannelPosition(Block b, BlockWriteStreams streams,
- long dataOffset, long ckOffset)
- throws IOException {
- long size = 0;
- synchronized (this) {
- FSVolume vol = volumeMap.get(b).getVolume();
- size = vol.getTmpFile(b).length();
- }
- if (size < dataOffset) {
- String msg = "Trying to change block file offset of block " + b +
- " to " + dataOffset +
- " but actual size of file is " +
- size;
- throw new IOException(msg);
- }
- FileOutputStream file = (FileOutputStream) streams.dataOut;
- file.getChannel().position(dataOffset);
- file = (FileOutputStream) streams.checksumOut;
- file.getChannel().position(ckOffset);
+ * Sets the offset in the meta file so that the
+ * last checksum will be overwritten.
+ */
+ public void adjustCrcChannelPosition(Block b, BlockWriteStreams streams,
+ int checksumSize) throws IOException {
+ FileOutputStream file = (FileOutputStream) streams.checksumOut;
+ FileChannel channel = file.getChannel();
+ long oldPos = channel.position();
+ long newPos = oldPos - checksumSize;
+ DataNode.LOG.info("Changing meta file offset of block " + b + " from " +
+ oldPos + " to " + newPos);
+ channel.position(newPos);
}
synchronized File createTmpFile( FSVolume vol, Block blk ) throws IOException {
if ( vol == null ) {
- vol = volumeMap.get( blk ).getVolume();
+ vol = getReplicaInfo( blk ).getVolume();
if ( vol == null ) {
throw new IOException("Could not find volume for block " + blk);
}
@@ -1208,40 +1371,52 @@
* Complete the block write!
*/
public synchronized void finalizeBlock(Block b) throws IOException {
- ActiveFile activeFile = ongoingCreates.get(b);
- if (activeFile == null) {
- throw new IOException("Block " + b + " is already finalized.");
- }
- File f = activeFile.file;
- if (f == null || !f.exists()) {
- throw new IOException("No temporary file " + f + " for block " + b);
- }
- FSVolume v = volumeMap.get(b).getVolume();
- if (v == null) {
- throw new IOException("No volume for temporary file " + f +
- " for block " + b);
+ ReplicaInfo replicaInfo = getReplicaInfo(b);
+ if (replicaInfo.getState() == ReplicaState.FINALIZED) {
+ // this is legal, when recovery happens on a file that has
+ // been opened for append but never modified
+ return;
}
-
- File dest = null;
- dest = v.addBlock(b, f);
- volumeMap.put(b, new ReplicaInfo(v, dest));
- ongoingCreates.remove(b);
+ finalizeReplica(replicaInfo);
+ }
+
+ private synchronized FinalizedReplica finalizeReplica(ReplicaInfo replicaInfo)
+ throws IOException {
+ FinalizedReplica newReplicaInfo = null;
+ if (replicaInfo.getState() == ReplicaState.RUR &&
+ ((ReplicaUnderRecovery)replicaInfo).getOrignalReplicaState() ==
+ ReplicaState.FINALIZED) {
+ newReplicaInfo = (FinalizedReplica)
+ ((ReplicaUnderRecovery)replicaInfo).getOriginalReplica();
+ } else {
+ FSVolume v = replicaInfo.getVolume();
+ File f = replicaInfo.getBlockFile();
+ if (v == null) {
+ throw new IOException("No volume for temporary file " + f +
+ " for block " + replicaInfo);
+ }
+
+ File dest = v.addBlock(replicaInfo, f);
+ newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
+ }
+ volumeMap.add(newReplicaInfo);
+ return newReplicaInfo;
}
/**
* Remove the temporary block file (if any)
*/
public synchronized void unfinalizeBlock(Block b) throws IOException {
- // remove the block from in-memory data structure
- ActiveFile activefile = ongoingCreates.remove(b);
- if (activefile == null) {
- return;
- }
- volumeMap.remove(b);
-
- // delete the on-disk temp file
- if (delBlockFromDisk(activefile.file, getMetaFile(activefile.file, b), b)) {
- DataNode.LOG.warn("Block " + b + " unfinalized and removed. " );
+ ReplicaInfo replicaInfo = volumeMap.get(b);
+ if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) {
+ // remove from volumeMap
+ volumeMap.remove(b);
+
+ // delete the on-disk temp file
+ if (delBlockFromDisk(replicaInfo.getBlockFile(),
+ replicaInfo.getMetaFile(), b)) {
+ DataNode.LOG.warn("Block " + b + " unfinalized and removed. " );
+ }
}
}
@@ -1272,18 +1447,34 @@
}
/**
- * Return finalized blocks from the in-memory blockmap
+ * Generates a block report from the in-memory block map.
*/
- public Block[] getBlockReport() {
- ArrayList<Block> list = new ArrayList<Block>(volumeMap.size());
+ public BlockListAsLongs getBlockReport() {
+ ArrayList<ReplicaInfo> finalized =
+ new ArrayList<ReplicaInfo>(volumeMap.size());
+ ArrayList<ReplicaInfo> uc = new ArrayList<ReplicaInfo>();
synchronized(this) {
- for (Block b : volumeMap.keySet()) {
- if (!ongoingCreates.containsKey(b)) {
- list.add(new Block(b));
+ for (ReplicaInfo b : volumeMap.replicas()) {
+ switch(b.getState()) {
+ case FINALIZED:
+ finalized.add(b);
+ break;
+ case RBW:
+ case RWR:
+ uc.add(b);
+ break;
+ case RUR:
+ ReplicaUnderRecovery rur = (ReplicaUnderRecovery)b;
+ uc.add(rur.getOriginalReplica());
+ break;
+ case TEMPORARY:
+ break;
+ default:
+ assert false : "Illegal ReplicaInfo state.";
}
}
+ return new BlockListAsLongs(finalized, uc);
}
- return list.toArray(new Block[list.size()]);
}
/**
@@ -1293,7 +1484,7 @@
* is needed to handle concurrent modification to the block.
*/
synchronized Block[] getBlockList(boolean deepcopy) {
- Block[] list = volumeMap.keySet().toArray(new Block[volumeMap.size()]);
+ Block[] list = volumeMap.replicas().toArray(new Block[volumeMap.size()]);
if (deepcopy) {
for (int i = 0; i < list.length; i++) {
list[i] = new Block(list[i]);
@@ -1303,17 +1494,29 @@
}
/**
+ * Get the list of finalized blocks from in-memory blockmap.
+ */
+ synchronized List<Block> getFinalizedBlocks() {
+ ArrayList<Block> finalized = new ArrayList<Block>(volumeMap.size());
+ for (ReplicaInfo b : volumeMap.replicas()) {
+ if(b.getState() == ReplicaState.FINALIZED) {
+ finalized.add(new Block(b));
+ }
+ }
+ return finalized;
+ }
+
+ /**
* Check whether the given block is a valid one.
+ * valid means finalized
*/
public boolean isValidBlock(Block b) {
- File f = null;;
- try {
- f = validateBlockFile(b);
- } catch(IOException e) {
- Log.warn("Block " + b + " is not valid:",e);
+ ReplicaInfo replicaInfo = volumeMap.get(b);
+ if (replicaInfo == null ||
+ replicaInfo.getState() != ReplicaState.FINALIZED) {
+ return false;
}
-
- return f != null;
+ return replicaInfo.getBlockFile().exists();
}
/**
@@ -1338,51 +1541,25 @@
return null;
}
- /** {@inheritDoc} */
- public void validateBlockMetadata(Block b) throws IOException {
- ReplicaInfo info = volumeMap.get(b);
- if (info == null) {
- throw new IOException("Block " + b + " does not exist in volumeMap.");
+ /** Check the files of a replica. */
+ static void checkReplicaFiles(final ReplicaInfo r) throws IOException {
+ //check replica's file
+ final File f = r.getBlockFile();
+ if (!f.exists()) {
+ throw new FileNotFoundException("File " + f + " not found, r=" + r);
}
- FSVolume v = info.getVolume();
- File tmp = v.getTmpFile(b);
- File f = getFile(b);
- if (f == null) {
- f = tmp;
+ if (r.getBytesOnDisk() != f.length()) {
+ throw new IOException("File length mismatched. The length of "
+ + f + " is " + f.length() + " but r=" + r);
}
- if (f == null) {
- throw new IOException("Block " + b + " does not exist on disk.");
+
+ //check replica's meta file
+ final File metafile = getMetaFile(f, r);
+ if (!metafile.exists()) {
+ throw new IOException("Metafile " + metafile + " does not exist, r=" + r);
}
- if (!f.exists()) {
- throw new IOException("Block " + b +
- " block file " + f +
- " does not exist on disk.");
- }
- if (b.getNumBytes() != f.length()) {
- throw new IOException("Block " + b +
- " length is " + b.getNumBytes() +
- " does not match block file length " +
- f.length());
- }
- File meta = getMetaFile(f, b);
- if (meta == null) {
- throw new IOException("Block " + b +
- " metafile does not exist.");
- }
- if (!meta.exists()) {
- throw new IOException("Block " + b +
- " metafile " + meta +
- " does not exist on disk.");
- }
- if (meta.length() == 0) {
- throw new IOException("Block " + b + " metafile " + meta + " is empty.");
- }
- long stamp = parseGenerationStamp(f, meta);
- if (stamp != b.getGenerationStamp()) {
- throw new IOException("Block " + b +
- " genstamp is " + b.getGenerationStamp() +
- " does not match meta file stamp " +
- stamp);
+ if (metafile.length() == 0) {
+ throw new IOException("Metafile " + metafile + " is empty, r=" + r);
}
}
@@ -1399,7 +1576,8 @@
synchronized (this) {
f = getFile(invalidBlks[i]);
ReplicaInfo dinfo = volumeMap.get(invalidBlks[i]);
- if (dinfo == null) {
+ if (dinfo == null ||
+ dinfo.getGenerationStamp() != invalidBlks[i].getGenerationStamp()) {
DataNode.LOG.warn("Unexpected error trying to delete block "
+ invalidBlks[i] +
". BlockInfo not found in volumeMap.");
@@ -1431,26 +1609,20 @@
error = true;
continue;
}
- v.clearPath(parent);
+ ReplicaState replicaState = dinfo.getState();
+ if (replicaState == ReplicaState.FINALIZED ||
+ (replicaState == ReplicaState.RUR &&
+ ((ReplicaUnderRecovery)dinfo).getOrignalReplicaState() ==
+ ReplicaState.FINALIZED)) {
+ v.clearPath(parent);
+ }
volumeMap.remove(invalidBlks[i]);
}
File metaFile = getMetaFile( f, invalidBlks[i] );
- long blockSize = f.length()+metaFile.length();
- if ( !f.delete() || ( !metaFile.delete() && metaFile.exists() ) ) {
- DataNode.LOG.warn("Unexpected error trying to delete block "
- + invalidBlks[i] + " at file " + f);
- error = true;
- continue;
- }
- v.decDfsUsed(blockSize);
- DataNode.LOG.info("Deleting block " + invalidBlks[i] + " file " + f);
- if (f.exists()) {
- //
- // This is a temporary check especially for hadoop-1220.
- // This will go away in the future.
- //
- DataNode.LOG.info("File " + f + " was deleted but still exists!");
- }
+ long dfsBytes = f.length() + metaFile.length();
+
+ // Delete the block asynchronously to make sure we can do it fast enough
+ asyncDiskService.deleteAsync(v, f, metaFile, dfsBytes, invalidBlks[i].toString());
}
if (error) {
throw new IOException("Error in deleting blocks.");
@@ -1458,16 +1630,24 @@
}
/**
- * Turn the block identifier into a filename.
+ * Turn the block identifier into a filename; ignore generation stamp!!!
*/
public synchronized File getFile(Block b) {
- ReplicaInfo info = volumeMap.get(b);
+ return getFile(b.getBlockId());
+ }
+
+ /**
+ * Turn the block identifier into a filename
+ * @param blockId a block's id
+ * @return on disk data file path; null if the replica does not exist
+ */
+ private File getFile(long blockId) {
+ ReplicaInfo info = volumeMap.get(blockId);
if (info != null) {
- return info.getFile();
+ return info.getBlockFile();
}
- return null;
+ return null;
}
-
/**
* check if a data directory is healthy
* if some volumes failed - make sure to remove all the blocks that belong
@@ -1486,12 +1666,12 @@
// remove related blocks
long mlsec = System.currentTimeMillis();
synchronized (this) {
- Iterator<Block> ib = volumeMap.keySet().iterator();
+ Iterator<ReplicaInfo> ib = volumeMap.replicas().iterator();
while(ib.hasNext()) {
- Block b = ib.next();
+ ReplicaInfo b = ib.next();
total_blocks ++;
// check if the volume block belongs to still valid
- FSVolume vol = volumeMap.get(b).getVolume();
+ FSVolume vol = b.getVolume();
for(FSVolume fv: failed_vols) {
if(vol == fv) {
DataNode.LOG.warn("removing block " + b.getBlockId() + " from vol "
@@ -1504,7 +1684,7 @@
}
} // end of sync
mlsec = System.currentTimeMillis() - mlsec;
- DataNode.LOG.warn(">>>>>>>>>>>>Removed " + removed_blocks + " out of " + total_blocks +
+ DataNode.LOG.warn("Removed " + removed_blocks + " out of " + total_blocks +
"(took " + mlsec + " millisecs)");
// report the error
@@ -1554,6 +1734,10 @@
if (mbeanName != null)
MBeanUtil.unregisterMBean(mbeanName);
+ if (asyncDiskService != null) {
+ asyncDiskService.shutdown();
+ }
+
if(volumes != null) {
for (FSVolume volume : volumes.volumes) {
if(volume != null) {
@@ -1592,20 +1776,20 @@
*/
public void checkAndUpdate(long blockId, File diskFile,
File diskMetaFile, FSVolume vol) {
- Block block = new Block(blockId);
DataNode datanode = DataNode.getDataNode();
Block corruptBlock = null;
+ ReplicaInfo memBlockInfo;
synchronized (this) {
- if (ongoingCreates.get(block) != null) {
+ memBlockInfo = volumeMap.get(blockId);
+ if (memBlockInfo != null && memBlockInfo.getState() != ReplicaState.FINALIZED) {
// Block is not finalized - ignore the difference
return;
}
final long diskGS = diskMetaFile != null && diskMetaFile.exists() ?
Block.getGenerationStamp(diskMetaFile.getName()) :
- Block.GRANDFATHER_GENERATION_STAMP;
+ GenerationStamp.GRANDFATHER_GENERATION_STAMP;
- ReplicaInfo memBlockInfo = volumeMap.get(block);
if (diskFile == null || !diskFile.exists()) {
if (memBlockInfo == null) {
// Block file does not exist and block does not exist in memory
@@ -1617,14 +1801,14 @@
}
return;
}
- if (!memBlockInfo.getFile().exists()) {
+ if (!memBlockInfo.getBlockFile().exists()) {
// Block is in memory and not on the disk
// Remove the block from volumeMap
- volumeMap.remove(block);
+ volumeMap.remove(blockId);
if (datanode.blockScanner != null) {
- datanode.blockScanner.deleteBlock(block);
+ datanode.blockScanner.deleteBlock(new Block(blockId));
}
- DataNode.LOG.warn("Removed block " + block.getBlockId()
+ DataNode.LOG.warn("Removed block " + blockId
+ " from memory with missing block file on the disk");
// Finally remove the metadata file
if (diskMetaFile != null && diskMetaFile.exists()
@@ -1640,23 +1824,20 @@
*/
if (memBlockInfo == null) {
// Block is missing in memory - add the block to volumeMap
- ReplicaInfo diskBlockInfo = new ReplicaInfo(vol, diskFile);
- Block diskBlock = new Block(diskFile, diskFile.length(), diskGS);
- volumeMap.put(diskBlock, diskBlockInfo);
+ ReplicaInfo diskBlockInfo = new FinalizedReplica(blockId,
+ diskFile.length(), diskGS, vol, diskFile.getParentFile());
+ volumeMap.add(diskBlockInfo);
if (datanode.blockScanner != null) {
- datanode.blockScanner.addBlock(diskBlock);
+ datanode.blockScanner.addBlock(diskBlockInfo);
}
- DataNode.LOG.warn("Added missing block to memory " + diskBlock);
+ DataNode.LOG.warn("Added missing block to memory " + (Block)diskBlockInfo);
return;
}
/*
* Block exists in volumeMap and the block file exists on the disk
*/
- // Iterate to get key from volumeMap for the blockId
- Block memBlock = getBlockKey(blockId);
-
// Compare block files
- File memFile = memBlockInfo.getFile();
+ File memFile = memBlockInfo.getBlockFile();
if (memFile.exists()) {
if (memFile.compareTo(diskFile) != 0) {
DataNode.LOG.warn("Block file " + memFile.getAbsolutePath()
@@ -1673,19 +1854,17 @@
+ memFile.getAbsolutePath()
+ " does not exist. Updating it to the file found during scan "
+ diskFile.getAbsolutePath());
- ReplicaInfo info = volumeMap.remove(memBlock);
- info.setFile(diskFile);
+ memBlockInfo.setDir(diskFile.getParentFile());
memFile = diskFile;
DataNode.LOG.warn("Updating generation stamp for block " + blockId
- + " from " + memBlock.getGenerationStamp() + " to " + diskGS);
- memBlock.setGenerationStamp(diskGS);
- volumeMap.put(memBlock, info);
+ + " from " + memBlockInfo.getGenerationStamp() + " to " + diskGS);
+ memBlockInfo.setGenerationStamp(diskGS);
}
// Compare generation stamp
- if (memBlock.getGenerationStamp() != diskGS) {
- File memMetaFile = getMetaFile(diskFile, memBlock);
+ if (memBlockInfo.getGenerationStamp() != diskGS) {
+ File memMetaFile = getMetaFile(diskFile, memBlockInfo);
if (memMetaFile.exists()) {
if (memMetaFile.compareTo(diskMetaFile) != 0) {
DataNode.LOG.warn("Metadata file in memory "
@@ -1699,26 +1878,22 @@
// as the block file, then use the generation stamp from it
long gs = diskMetaFile != null && diskMetaFile.exists()
&& diskMetaFile.getParent().equals(memFile.getParent()) ? diskGS
- : Block.GRANDFATHER_GENERATION_STAMP;
+ : GenerationStamp.GRANDFATHER_GENERATION_STAMP;
DataNode.LOG.warn("Updating generation stamp for block " + blockId
- + " from " + memBlock.getGenerationStamp() + " to " + gs);
+ + " from " + memBlockInfo.getGenerationStamp() + " to " + gs);
- ReplicaInfo info = volumeMap.remove(memBlock);
- memBlock.setGenerationStamp(gs);
- volumeMap.put(memBlock, info);
+ memBlockInfo.setGenerationStamp(gs);
}
}
// Compare block size
- if (memBlock.getNumBytes() != memFile.length()) {
+ if (memBlockInfo.getNumBytes() != memFile.length()) {
// Update the length based on the block file
- corruptBlock = new Block(memBlock);
+ corruptBlock = new Block(memBlockInfo);
DataNode.LOG.warn("Updating size of block " + blockId + " from "
- + memBlock.getNumBytes() + " to " + memFile.length());
- ReplicaInfo info = volumeMap.remove(memBlock);
- memBlock.setNumBytes(memFile.length());
- volumeMap.put(memBlock, info);
+ + memBlockInfo.getNumBytes() + " to " + memFile.length());
+ memBlockInfo.setNumBytes(memFile.length());
}
}
@@ -1738,18 +1913,172 @@
}
/**
- * Get reference to the key in the volumeMap. To be called from methods that
- * are synchronized on {@link FSDataset}
- * @param blockId
- * @return key from the volumeMap
+ * @deprecated use {@link #fetchReplicaInfo(long)} instead.
*/
- Block getBlockKey(long blockId) {
+ @Override
+ @Deprecated
+ public ReplicaInfo getReplica(long blockId) {
assert(Thread.holdsLock(this));
- for (Block b : volumeMap.keySet()) {
- if (b.getBlockId() == blockId) {
- return b;
+ return volumeMap.get(blockId);
+ }
+
+ @Override // FSDatasetInterface
+ public synchronized ReplicaRecoveryInfo initReplicaRecovery(
+ RecoveringBlock rBlock) throws IOException {
+ return initReplicaRecovery(
+ volumeMap, rBlock.getBlock(), rBlock.getNewGenerationStamp());
+ }
+
+ /** static version of {@link #initReplicaRecovery(Block, long)}. */
+ static ReplicaRecoveryInfo initReplicaRecovery(
+ ReplicasMap map, Block block, long recoveryId) throws IOException {
+ final ReplicaInfo replica = map.get(block.getBlockId());
+ DataNode.LOG.info("initReplicaRecovery: block=" + block
+ + ", recoveryId=" + recoveryId
+ + ", replica=" + replica);
+
+ //check replica
+ if (replica == null) {
+ return null;
+ }
+
+ //stop writer if there is any
+ if (replica instanceof ReplicaInPipeline) {
+ final ReplicaInPipeline rip = (ReplicaInPipeline)replica;
+ rip.stopWriter();
+
+ //check replica bytes on disk.
+ if (rip.getBytesOnDisk() < rip.getVisibleLength()) {
+ throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:"
+ + " getBytesOnDisk() < getVisibleLength(), rip=" + rip);
}
+
+ //check the replica's files
+ checkReplicaFiles(rip);
}
- return null;
+
+ //check generation stamp
+ if (replica.getGenerationStamp() < block.getGenerationStamp()) {
+ throw new IOException(
+ "replica.getGenerationStamp() < block.getGenerationStamp(), block="
+ + block + ", replica=" + replica);
+ }
+
+ //check recovery id
+ if (replica.getGenerationStamp() >= recoveryId) {
+ throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:"
+ + " replica.getGenerationStamp() >= recoveryId = " + recoveryId
+ + ", block=" + block + ", replica=" + replica);
+ }
+
+ //check RUR
+ final ReplicaUnderRecovery rur;
+ if (replica.getState() == ReplicaState.RUR) {
+ rur = (ReplicaUnderRecovery)replica;
+ if (rur.getRecoveryID() >= recoveryId) {
+ throw new RecoveryInProgressException(
+ "rur.getRecoveryID() >= recoveryId = " + recoveryId
+ + ", block=" + block + ", rur=" + rur);
+ }
+ final long oldRecoveryID = rur.getRecoveryID();
+ rur.setRecoveryID(recoveryId);
+ DataNode.LOG.info("initReplicaRecovery: update recovery id for " + block
+ + " from " + oldRecoveryID + " to " + recoveryId);
+ }
+ else {
+ rur = new ReplicaUnderRecovery(replica, recoveryId);
+ map.add(rur);
+ DataNode.LOG.info("initReplicaRecovery: changing replica state for "
+ + block + " from " + replica.getState()
+ + " to " + rur.getState());
+ }
+ return rur.createInfo();
+ }
+
+ @Override // FSDatasetInterface
+ public synchronized ReplicaInfo updateReplicaUnderRecovery(
+ final Block oldBlock,
+ final long recoveryId,
+ final long newlength) throws IOException {
+ //get replica
+ final ReplicaInfo replica = volumeMap.get(oldBlock.getBlockId());
+ DataNode.LOG.info("updateReplica: block=" + oldBlock
+ + ", recoveryId=" + recoveryId
+ + ", length=" + newlength
+ + ", replica=" + replica);
+
+ //check replica
+ if (replica == null) {
+ throw new ReplicaNotFoundException(oldBlock);
+ }
+
+ //check replica state
+ if (replica.getState() != ReplicaState.RUR) {
+ throw new IOException("replica.getState() != " + ReplicaState.RUR
+ + ", replica=" + replica);
+ }
+
+ //check replica's byte on disk
+ if (replica.getBytesOnDisk() != oldBlock.getNumBytes()) {
+ throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:"
+ + " replica.getBytesOnDisk() != block.getNumBytes(), block="
+ + oldBlock + ", replica=" + replica);
+ }
+
+ //check replica files before update
+ checkReplicaFiles(replica);
+
+ //update replica
+ final FinalizedReplica finalized = updateReplicaUnderRecovery(
+ (ReplicaUnderRecovery)replica, recoveryId, newlength);
+
+ //check replica files after update
+ checkReplicaFiles(finalized);
+ return finalized;
+ }
+
+ private FinalizedReplica updateReplicaUnderRecovery(
+ ReplicaUnderRecovery rur,
+ long recoveryId,
+ long newlength) throws IOException {
+ //check recovery id
+ if (rur.getRecoveryID() != recoveryId) {
+ throw new IOException("rur.getRecoveryID() != recoveryId = " + recoveryId
+ + ", rur=" + rur);
+ }
+
+ // bump rur's GS to be recovery id
+ bumpReplicaGS(rur, recoveryId);
+
+ //update length
+ final File replicafile = rur.getBlockFile();
+ if (rur.getNumBytes() < newlength) {
+ throw new IOException("rur.getNumBytes() < newlength = " + newlength
+ + ", rur=" + rur);
+ }
+ if (rur.getNumBytes() > newlength) {
+ rur.unlinkBlock(1);
+ truncateBlock(replicafile, rur.getMetaFile(), rur.getNumBytes(), newlength);
+ // update RUR with the new length
+ rur.setNumBytes(newlength);
+ }
+
+ // finalize the block
+ return finalizeReplica(rur);
+ }
+
+ @Override // FSDatasetInterface
+ public synchronized long getReplicaVisibleLength(final Block block)
+ throws IOException {
+ final Replica replica = volumeMap.get(block.getBlockId());
+ if (replica == null) {
+ throw new ReplicaNotFoundException(block);
+ }
+ if (replica.getGenerationStamp() < block.getGenerationStamp()) {
+ throw new IOException(
+ "replica.getGenerationStamp() < block.getGenerationStamp(), block="
+ + block + ", replica=" + replica);
+ }
+ return replica.getVisibleLength();
}
}
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Sat Nov 28 20:05:56 2009
@@ -28,7 +28,10 @@
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -94,6 +97,15 @@
public long getLength(Block b) throws IOException;
/**
+ * Get reference to the replica meta info in the replicasMap.
+ * To be called from methods that are synchronized on {@link FSDataset}
+ * @param blockId
+ * @return replica from the replicas map
+ */
+ @Deprecated
+ public Replica getReplica(long blockId);
+
+ /**
* @return the generation stamp stored with the block.
*/
public Block getStoredBlock(long blkid) throws IOException;
@@ -144,6 +156,10 @@
checksumOut = cOut;
}
+ void close() throws IOException {
+ IOUtils.closeStream(dataOut);
+ IOUtils.closeStream(checksumOut);
+ }
}
/**
@@ -167,19 +183,74 @@
}
/**
- * Creates the block and returns output streams to write data and CRC
- * @param b
- * @param isRecovery True if this is part of erro recovery, otherwise false
- * @return a BlockWriteStreams object to allow writing the block data
- * and CRC
+ * Creates a temporary replica and returns the meta information of the replica
+ *
+ * @param b block
+ * @return the meta info of the replica which is being written to
+ * @throws IOException if an error occurs
+ */
+ public ReplicaInPipelineInterface createTemporary(Block b)
+ throws IOException;
+
+ /**
+ * Creates a RBW replica and returns the meta info of the replica
+ *
+ * @param b block
+ * @return the meta info of the replica which is being written to
+ * @throws IOException if an error occurs
+ */
+ public ReplicaInPipelineInterface createRbw(Block b) throws IOException;
+
+ /**
+ * Recovers a RBW replica and returns the meta info of the replica
+ *
+ * @param b block
+ * @param newGS the new generation stamp for the replica
+ * @param minBytesRcvd the minimum number of bytes that the replica could have
+ * @param maxBytesRcvd the maximum number of bytes that the replica could have
+ * @return the meta info of the replica which is being written to
+ * @throws IOException if an error occurs
+ */
+ public ReplicaInPipelineInterface recoverRbw(Block b,
+ long newGS, long minBytesRcvd, long maxBytesRcvd)
+ throws IOException;
+
+ /**
+ * Append to a finalized replica and returns the meta info of the replica
+ *
+ * @param b block
+ * @param newGS the new generation stamp for the replica
+ * @param expectedBlockLen the number of bytes the replica is expected to have
+ * @return the meta info of the replica which is being written to
* @throws IOException
*/
- public BlockWriteStreams writeToBlock(Block b, boolean isRecovery) throws IOException;
+ public ReplicaInPipelineInterface append(Block b,
+ long newGS, long expectedBlockLen) throws IOException;
/**
- * Update the block to the new generation stamp and length.
+ * Recover a failed append to a finalized replica
+ * and returns the meta info of the replica
+ *
+ * @param b block
+ * @param newGS the new generation stamp for the replica
+ * @param expectedBlockLen the number of bytes the replica is expected to have
+ * @return the meta info of the replica which is being written to
+ * @throws IOException
+ */
+ public ReplicaInPipelineInterface recoverAppend(Block b,
+ long newGS, long expectedBlockLen) throws IOException;
+
+ /**
+ * Recover a failed pipeline close
+ * It bumps the replica's generation stamp and finalizes it if it is an RBW replica
+ *
+ * @param b block
+ * @param newGS the new generation stamp for the replica
+ * @param expectedBlockLen the number of bytes the replica is expected to have
+ * @throws IOException
*/
- public void updateBlock(Block oldblock, Block newblock) throws IOException;
+ public void recoverClose(Block b,
+ long newGS, long expectedBlockLen) throws IOException;
/**
* Finalizes the block previously opened for writing using writeToBlock.
@@ -202,7 +273,7 @@
* Returns the block report - the full list of blocks stored
* @return - the block report - the full list of blocks stored
*/
- public Block[] getBlockReport();
+ public BlockListAsLongs getBlockReport();
/**
* Is the block valid?
@@ -235,39 +306,41 @@
public void shutdown();
/**
- * Returns the current offset in the data stream.
- * @param b
- * @param stream The stream to the data file and checksum file
- * @return the position of the file pointer in the data stream
+ * Sets the file pointer of the checksum stream so that the last checksum
+ * will be overwritten
+ * @param b block
+ * @param stream The stream for the data file and checksum file
+ * @param checksumSize number of bytes each checksum has
* @throws IOException
*/
- public long getChannelPosition(Block b, BlockWriteStreams stream) throws IOException;
+ public void adjustCrcChannelPosition(Block b, BlockWriteStreams stream,
+ int checksumSize) throws IOException;
/**
- * Sets the file pointer of the data stream and checksum stream to
- * the specified values.
- * @param b
- * @param stream The stream for the data file and checksum file
- * @param dataOffset The position to which the file pointre for the data stream
- * should be set
- * @param ckOffset The position to which the file pointre for the checksum stream
- * should be set
- * @throws IOException
+ * checks how many valid storage volumes are there in the DataNode
+ * @return true if more than the minimum valid volumes are left in the FSDataSet
*/
- public void setChannelPosition(Block b, BlockWriteStreams stream, long dataOffset,
- long ckOffset) throws IOException;
+ public boolean hasEnoughResource();
/**
- * Validate that the contents in the Block matches
- * the file on disk. Returns true if everything is fine.
- * @param b The block to be verified.
- * @throws IOException
+ * Get visible length of the specified replica.
*/
- public void validateBlockMetadata(Block b) throws IOException;
+ long getReplicaVisibleLength(final Block block) throws IOException;
/**
- * checks how many valid storage volumes are there in the DataNode
- * @return true if more then minimum valid volumes left in the FSDataSet
+ * Initialize a replica recovery.
+ *
+ * @return actual state of the replica on this data-node or
+ * null if data-node does not have the replica.
*/
- public boolean hasEnoughResource();
+ public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
+ throws IOException;
+
+ /**
+ * Update replica's generation stamp and length and finalize it.
+ */
+ public ReplicaInfo updateReplicaUnderRecovery(
+ Block oldBlock,
+ long recoveryId,
+ long newLength) throws IOException;
}