You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2012/04/02 19:38:58 UTC
svn commit: r1308437 [2/3] - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/
src/main/java/org/apache/hadoop/hdfs/server/datanode/
src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/
src/main/java/org/apache/hadoop/...
Copied: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (from r1308436, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java)
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?p2=hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java&p1=hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java&r1=1308436&r2=1308437&rev=1308437&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Mon Apr 2 17:38:56 2012
@@ -15,52 +15,53 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.server.datanode;
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
-import java.io.BufferedInputStream;
-import java.io.BufferedReader;
-import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
-import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
-import java.io.PrintStream;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import javax.management.StandardMBean;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.DF;
-import org.apache.hadoop.fs.DU;
-import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
+import org.apache.hadoop.hdfs.server.datanode.Replica;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
@@ -72,10 +73,8 @@ import org.apache.hadoop.hdfs.server.dat
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
-import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.util.DataChecksum;
-import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.util.ReflectionUtils;
@@ -86,951 +85,18 @@ import org.apache.hadoop.util.Reflection
*
***************************************************/
@InterfaceAudience.Private
-public class FSDataset implements FsDatasetSpi<FSDataset.FSVolume> {
- /**
- * A factory for creating FSDataset objects.
- */
- public static class Factory extends FsDatasetSpi.Factory<FSDataset> {
- @Override
- public FSDataset newInstance(DataNode datanode,
- DataStorage storage, Configuration conf) throws IOException {
- return new FSDataset(datanode, storage, conf);
- }
- }
-
- /**
- * A node type that can be built into a tree reflecting the
- * hierarchy of blocks on the local disk.
- */
- private static class FSDir {
- final int maxBlocksPerDir;
- final File dir;
- int numBlocks = 0;
- FSDir children[];
- int lastChildIdx = 0;
-
- private FSDir(File dir, int maxBlocksPerDir)
- throws IOException {
- this.dir = dir;
- this.maxBlocksPerDir = maxBlocksPerDir;
-
- this.children = null;
- if (!dir.exists()) {
- if (!dir.mkdirs()) {
- throw new IOException("Mkdirs failed to create " +
- dir.toString());
- }
- } else {
- File[] files = FileUtil.listFiles(dir);
- List<FSDir> dirList = new ArrayList<FSDir>();
- for (int idx = 0; idx < files.length; idx++) {
- if (files[idx].isDirectory()) {
- dirList.add(new FSDir(files[idx], maxBlocksPerDir));
- } else if (Block.isBlockFilename(files[idx])) {
- numBlocks++;
- }
- }
- if (dirList.size() > 0) {
- children = dirList.toArray(new FSDir[dirList.size()]);
- }
- }
- }
-
- private File addBlock(Block b, File src) throws IOException {
- //First try without creating subdirectories
- File file = addBlock(b, src, false, false);
- return (file != null) ? file : addBlock(b, src, true, true);
- }
-
- private File addBlock(Block b, File src, boolean createOk,
- boolean resetIdx) throws IOException {
- if (numBlocks < maxBlocksPerDir) {
- final File dest = moveBlockFiles(b, src, dir);
- numBlocks += 1;
- return dest;
- }
-
- if (lastChildIdx < 0 && resetIdx) {
- //reset so that all children will be checked
- lastChildIdx = DFSUtil.getRandom().nextInt(children.length);
- }
-
- if (lastChildIdx >= 0 && children != null) {
- //Check if any child-tree has room for a block.
- for (int i=0; i < children.length; i++) {
- int idx = (lastChildIdx + i)%children.length;
- File file = children[idx].addBlock(b, src, false, resetIdx);
- if (file != null) {
- lastChildIdx = idx;
- return file;
- }
- }
- lastChildIdx = -1;
- }
-
- if (!createOk) {
- return null;
- }
-
- if (children == null || children.length == 0) {
- children = new FSDir[maxBlocksPerDir];
- for (int idx = 0; idx < maxBlocksPerDir; idx++) {
- final File sub = new File(dir, DataStorage.BLOCK_SUBDIR_PREFIX+idx);
- children[idx] = new FSDir(sub, maxBlocksPerDir);
- }
- }
-
- //now pick a child randomly for creating a new set of subdirs.
- lastChildIdx = DFSUtil.getRandom().nextInt(children.length);
- return children[ lastChildIdx ].addBlock(b, src, true, false);
- }
-
- private void getVolumeMap(String bpid, ReplicasMap volumeMap, FSVolume volume)
- throws IOException {
- if (children != null) {
- for (int i = 0; i < children.length; i++) {
- children[i].getVolumeMap(bpid, volumeMap, volume);
- }
- }
-
- recoverTempUnlinkedBlock();
- volume.addToReplicasMap(bpid, volumeMap, dir, true);
- }
-
- /**
- * Recover unlinked tmp files on datanode restart. If the original block
- * does not exist, then the tmp file is renamed to be the
- * original file name; otherwise the tmp file is deleted.
- */
- private void recoverTempUnlinkedBlock() throws IOException {
- File files[] = FileUtil.listFiles(dir);
- for (File file : files) {
- if (!FSDataset.isUnlinkTmpFile(file)) {
- continue;
- }
- File blockFile = getOrigFile(file);
- if (blockFile.exists()) {
- //
- // If the original block file still exists, then no recovery
- // is needed.
- //
- if (!file.delete()) {
- throw new IOException("Unable to cleanup unlinked tmp file " +
- file);
- }
- } else {
- if (!file.renameTo(blockFile)) {
- throw new IOException("Unable to cleanup detached file " +
- file);
- }
- }
- }
- }
-
- /**
- * check if a data diretory is healthy
- * @throws DiskErrorException
- */
- private void checkDirTree() throws DiskErrorException {
- DiskChecker.checkDir(dir);
-
- if (children != null) {
- for (int i = 0; i < children.length; i++) {
- children[i].checkDirTree();
- }
- }
- }
-
- private void clearPath(File f) {
- String root = dir.getAbsolutePath();
- String dir = f.getAbsolutePath();
- if (dir.startsWith(root)) {
- String[] dirNames = dir.substring(root.length()).
- split(File.separator + "subdir");
- if (clearPath(f, dirNames, 1))
- return;
- }
- clearPath(f, null, -1);
- }
-
- /*
- * dirNames is an array of string integers derived from
- * usual directory structure data/subdirN/subdirXY/subdirM ...
- * If dirName array is non-null, we only check the child at
- * the children[dirNames[idx]]. This avoids iterating over
- * children in common case. If directory structure changes
- * in later versions, we need to revisit this.
- */
- private boolean clearPath(File f, String[] dirNames, int idx) {
- if ((dirNames == null || idx == dirNames.length) &&
- dir.compareTo(f) == 0) {
- numBlocks--;
- return true;
- }
-
- if (dirNames != null) {
- //guess the child index from the directory name
- if (idx > (dirNames.length - 1) || children == null) {
- return false;
- }
- int childIdx;
- try {
- childIdx = Integer.parseInt(dirNames[idx]);
- } catch (NumberFormatException ignored) {
- // layout changed? we could print a warning.
- return false;
- }
- return (childIdx >= 0 && childIdx < children.length) ?
- children[childIdx].clearPath(f, dirNames, idx+1) : false;
- }
-
- //guesses failed. back to blind iteration.
- if (children != null) {
- for(int i=0; i < children.length; i++) {
- if (children[i].clearPath(f, null, -1)){
- return true;
- }
- }
- }
- return false;
- }
-
- @Override
- public String toString() {
- return "FSDir{" +
- "dir=" + dir +
- ", children=" + (children == null ? null : Arrays.asList(children)) +
- "}";
- }
- }
-
- /**
- * A BlockPoolSlice represents a portion of a BlockPool stored on a volume.
- * Taken together, all BlockPoolSlices sharing a block pool ID across a
- * cluster represent a single block pool.
- *
- * This class is synchronized by {@link FSVolume}.
- */
- private static class BlockPoolSlice {
- private final String bpid;
- private final FSVolume volume; // volume to which this BlockPool belongs to
- private final File currentDir; // StorageDirectory/current/bpid/current
- private final FSDir finalizedDir; // directory store Finalized replica
- private final File rbwDir; // directory store RBW replica
- private final File tmpDir; // directory store Temporary replica
-
- // TODO:FEDERATION scalability issue - a thread per DU is needed
- private final DU dfsUsage;
-
- /**
- *
- * @param bpid Block pool Id
- * @param volume {@link FSVolume} to which this BlockPool belongs to
- * @param bpDir directory corresponding to the BlockPool
- * @param conf
- * @throws IOException
- */
- BlockPoolSlice(String bpid, FSVolume volume, File bpDir, Configuration conf)
- throws IOException {
- this.bpid = bpid;
- this.volume = volume;
- this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
- final File finalizedDir = new File(
- currentDir, DataStorage.STORAGE_DIR_FINALIZED);
-
- // Files that were being written when the datanode was last shutdown
- // are now moved back to the data directory. It is possible that
- // in the future, we might want to do some sort of datanode-local
- // recovery for these blocks. For example, crc validation.
- //
- this.tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP);
- if (tmpDir.exists()) {
- FileUtil.fullyDelete(tmpDir);
- }
- this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW);
- final boolean supportAppends = conf.getBoolean(
- DFSConfigKeys.DFS_SUPPORT_APPEND_KEY,
- DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT);
- if (rbwDir.exists() && !supportAppends) {
- FileUtil.fullyDelete(rbwDir);
- }
- final int maxBlocksPerDir = conf.getInt(
- DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_KEY,
- DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_DEFAULT);
- this.finalizedDir = new FSDir(finalizedDir, maxBlocksPerDir);
- if (!rbwDir.mkdirs()) { // create rbw directory if not exist
- if (!rbwDir.isDirectory()) {
- throw new IOException("Mkdirs failed to create " + rbwDir.toString());
- }
- }
- if (!tmpDir.mkdirs()) {
- if (!tmpDir.isDirectory()) {
- throw new IOException("Mkdirs failed to create " + tmpDir.toString());
- }
- }
- this.dfsUsage = new DU(bpDir, conf);
- this.dfsUsage.start();
- }
-
- File getDirectory() {
- return currentDir.getParentFile();
- }
-
- File getFinalizedDir() {
- return finalizedDir.dir;
- }
-
- File getRbwDir() {
- return rbwDir;
- }
-
- /**
- * This should be used only by {@link FSVolume#decDfsUsed(String, long)}
- * and it will be synchronized there.
- */
- void decDfsUsed(long value) {
- dfsUsage.decDfsUsed(value);
- }
-
- long getDfsUsed() throws IOException {
- return dfsUsage.getUsed();
- }
-
- /**
- * Temporary files. They get moved to the finalized block directory when
- * the block is finalized.
- */
- File createTmpFile(Block b) throws IOException {
- File f = new File(tmpDir, b.getBlockName());
- return DatanodeUtil.createTmpFile(b, f);
- }
-
- /**
- * RBW files. They get moved to the finalized block directory when
- * the block is finalized.
- */
- File createRbwFile(Block b) throws IOException {
- File f = new File(rbwDir, b.getBlockName());
- return DatanodeUtil.createTmpFile(b, f);
- }
-
- File addBlock(Block b, File f) throws IOException {
- File blockFile = finalizedDir.addBlock(b, f);
- File metaFile = DatanodeUtil.getMetaFile(blockFile, b.getGenerationStamp());
- dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
- return blockFile;
- }
-
- void checkDirs() throws DiskErrorException {
- finalizedDir.checkDirTree();
- DiskChecker.checkDir(tmpDir);
- DiskChecker.checkDir(rbwDir);
- }
-
- void getVolumeMap(ReplicasMap volumeMap) throws IOException {
- // add finalized replicas
- finalizedDir.getVolumeMap(bpid, volumeMap, volume);
- // add rbw replicas
- addToReplicasMap(volumeMap, rbwDir, false);
- }
-
- /**
- * Add replicas under the given directory to the volume map
- * @param volumeMap the replicas map
- * @param dir an input directory
- * @param isFinalized true if the directory has finalized replicas;
- * false if the directory has rbw replicas
- */
- private void addToReplicasMap(ReplicasMap volumeMap, File dir,
- boolean isFinalized) throws IOException {
- File blockFiles[] = FileUtil.listFiles(dir);
- for (File blockFile : blockFiles) {
- if (!Block.isBlockFilename(blockFile))
- continue;
-
- long genStamp = getGenerationStampFromFile(blockFiles, blockFile);
- long blockId = Block.filename2id(blockFile.getName());
- ReplicaInfo newReplica = null;
- if (isFinalized) {
- newReplica = new FinalizedReplica(blockId,
- blockFile.length(), genStamp, volume, blockFile.getParentFile());
- } else {
- newReplica = new ReplicaWaitingToBeRecovered(blockId,
- validateIntegrity(blockFile, genStamp),
- genStamp, volume, blockFile.getParentFile());
- }
-
- ReplicaInfo oldReplica = volumeMap.add(bpid, newReplica);
- if (oldReplica != null) {
- DataNode.LOG.warn("Two block files with the same block id exist " +
- "on disk: " + oldReplica.getBlockFile() +
- " and " + blockFile );
- }
- }
- }
-
- /**
- * Find out the number of bytes in the block that match its crc.
- *
- * This algorithm assumes that data corruption caused by unexpected
- * datanode shutdown occurs only in the last crc chunk. So it checks
- * only the last chunk.
- *
- * @param blockFile the block file
- * @param genStamp generation stamp of the block
- * @return the number of valid bytes
- */
- private long validateIntegrity(File blockFile, long genStamp) {
- DataInputStream checksumIn = null;
- InputStream blockIn = null;
- try {
- final File metaFile = DatanodeUtil.getMetaFile(blockFile, genStamp);
- long blockFileLen = blockFile.length();
- long metaFileLen = metaFile.length();
- int crcHeaderLen = DataChecksum.getChecksumHeaderSize();
- if (!blockFile.exists() || blockFileLen == 0 ||
- !metaFile.exists() || metaFileLen < crcHeaderLen) {
- return 0;
- }
- checksumIn = new DataInputStream(
- new BufferedInputStream(new FileInputStream(metaFile),
- HdfsConstants.IO_FILE_BUFFER_SIZE));
-
- // read and handle the common header here. For now just a version
- BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
- short version = header.getVersion();
- if (version != BlockMetadataHeader.VERSION) {
- DataNode.LOG.warn("Wrong version (" + version + ") for metadata file "
- + metaFile + " ignoring ...");
- }
- DataChecksum checksum = header.getChecksum();
- int bytesPerChecksum = checksum.getBytesPerChecksum();
- int checksumSize = checksum.getChecksumSize();
- long numChunks = Math.min(
- (blockFileLen + bytesPerChecksum - 1)/bytesPerChecksum,
- (metaFileLen - crcHeaderLen)/checksumSize);
- if (numChunks == 0) {
- return 0;
- }
- IOUtils.skipFully(checksumIn, (numChunks-1)*checksumSize);
- blockIn = new FileInputStream(blockFile);
- long lastChunkStartPos = (numChunks-1)*bytesPerChecksum;
- IOUtils.skipFully(blockIn, lastChunkStartPos);
- int lastChunkSize = (int)Math.min(
- bytesPerChecksum, blockFileLen-lastChunkStartPos);
- byte[] buf = new byte[lastChunkSize+checksumSize];
- checksumIn.readFully(buf, lastChunkSize, checksumSize);
- IOUtils.readFully(blockIn, buf, 0, lastChunkSize);
-
- checksum.update(buf, 0, lastChunkSize);
- if (checksum.compare(buf, lastChunkSize)) { // last chunk matches crc
- return lastChunkStartPos + lastChunkSize;
- } else { // last chunck is corrupt
- return lastChunkStartPos;
- }
- } catch (IOException e) {
- DataNode.LOG.warn(e);
- return 0;
- } finally {
- IOUtils.closeStream(checksumIn);
- IOUtils.closeStream(blockIn);
- }
- }
-
- void clearPath(File f) {
- finalizedDir.clearPath(f);
- }
-
- public String toString() {
- return currentDir.getAbsolutePath();
- }
-
- public void shutdown() {
- dfsUsage.shutdown();
- }
- }
-
- /**
- * The underlying volume used to store replica.
- *
- * It uses the {@link FSDataset} object for synchronization.
- */
- static class FSVolume implements FsVolumeSpi {
- private final FSDataset dataset;
- private final String storageID;
- private final Map<String, BlockPoolSlice> map = new HashMap<String, BlockPoolSlice>();
- private final File currentDir; // <StorageDirectory>/current
- private final DF usage;
- private final long reserved;
-
- FSVolume(FSDataset dataset, String storageID, File currentDir,
- Configuration conf) throws IOException {
- this.dataset = dataset;
- this.storageID = storageID;
- this.reserved = conf.getLong(DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY,
- DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT);
- this.currentDir = currentDir;
- File parent = currentDir.getParentFile();
- this.usage = new DF(parent, conf);
- }
-
- File getCurrentDir() {
- return currentDir;
- }
-
- File getRbwDir(String bpid) throws IOException {
- BlockPoolSlice bp = getBlockPoolSlice(bpid);
- return bp.getRbwDir();
- }
-
- void decDfsUsed(String bpid, long value) {
- synchronized(dataset) {
- BlockPoolSlice bp = map.get(bpid);
- if (bp != null) {
- bp.decDfsUsed(value);
- }
- }
- }
-
- long getDfsUsed() throws IOException {
- long dfsUsed = 0;
- synchronized(dataset) {
- for(BlockPoolSlice s : map.values()) {
- dfsUsed += s.getDfsUsed();
- }
- }
- return dfsUsed;
- }
-
- long getBlockPoolUsed(String bpid) throws IOException {
- return getBlockPoolSlice(bpid).getDfsUsed();
- }
-
- /**
- * Calculate the capacity of the filesystem, after removing any
- * reserved capacity.
- * @return the unreserved number of bytes left in this filesystem. May be zero.
- */
- long getCapacity() {
- long remaining = usage.getCapacity() - reserved;
- return remaining > 0 ? remaining : 0;
- }
-
- @Override
- public long getAvailable() throws IOException {
- long remaining = getCapacity()-getDfsUsed();
- long available = usage.getAvailable();
- if (remaining>available) {
- remaining = available;
- }
- return (remaining > 0) ? remaining : 0;
- }
-
- long getReserved(){
- return reserved;
- }
-
- String getMount() throws IOException {
- return usage.getMount();
- }
-
- private BlockPoolSlice getBlockPoolSlice(String bpid) throws IOException {
- BlockPoolSlice bp = map.get(bpid);
- if (bp == null) {
- throw new IOException("block pool " + bpid + " is not found");
- }
- return bp;
- }
-
- @Override
- public String getPath(String bpid) throws IOException {
- return getBlockPoolSlice(bpid).getDirectory().getAbsolutePath();
- }
-
- @Override
- public File getFinalizedDir(String bpid) throws IOException {
- return getBlockPoolSlice(bpid).getFinalizedDir();
- }
-
- /**
- * Make a deep copy of the list of currently active BPIDs
- */
- @Override
- public String[] getBlockPoolList() {
- synchronized(dataset) {
- return map.keySet().toArray(new String[map.keySet().size()]);
- }
- }
-
- /**
- * Temporary files. They get moved to the finalized block directory when
- * the block is finalized.
- */
- File createTmpFile(String bpid, Block b) throws IOException {
- BlockPoolSlice bp = getBlockPoolSlice(bpid);
- return bp.createTmpFile(b);
- }
-
- /**
- * RBW files. They get moved to the finalized block directory when
- * the block is finalized.
- */
- File createRbwFile(String bpid, Block b) throws IOException {
- BlockPoolSlice bp = getBlockPoolSlice(bpid);
- return bp.createRbwFile(b);
- }
-
- File addBlock(String bpid, Block b, File f) throws IOException {
- BlockPoolSlice bp = getBlockPoolSlice(bpid);
- return bp.addBlock(b, f);
- }
-
- /**
- * This should be used only by {@link FSVolumeSet#checkDirs()}
- * and it will be synchronized there.
- */
- void checkDirs() throws DiskErrorException {
- // TODO:FEDERATION valid synchronization
- for(BlockPoolSlice s : map.values()) {
- s.checkDirs();
- }
- }
-
- void getVolumeMap(ReplicasMap volumeMap) throws IOException {
- Set<Entry<String, BlockPoolSlice>> set = map.entrySet();
- for (Entry<String, BlockPoolSlice> entry : set) {
- entry.getValue().getVolumeMap(volumeMap);
- }
- }
-
- void getVolumeMap(String bpid, ReplicasMap volumeMap) throws IOException {
- BlockPoolSlice bp = getBlockPoolSlice(bpid);
- bp.getVolumeMap(volumeMap);
- }
-
- /**
- * Add replicas under the given directory to the volume map
- * @param volumeMap the replicas map
- * @param dir an input directory
- * @param isFinalized true if the directory has finalized replicas;
- * false if the directory has rbw replicas
- * @throws IOException
- */
- private void addToReplicasMap(String bpid, ReplicasMap volumeMap,
- File dir, boolean isFinalized) throws IOException {
- BlockPoolSlice bp = getBlockPoolSlice(bpid);
- // TODO move this up
- // dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
- bp.addToReplicasMap(volumeMap, dir, isFinalized);
- }
-
- void clearPath(String bpid, File f) throws IOException {
- BlockPoolSlice bp = getBlockPoolSlice(bpid);
- bp.clearPath(f);
- }
-
- @Override
- public String toString() {
- return currentDir.getAbsolutePath();
- }
-
- public void shutdown() {
- Set<Entry<String, BlockPoolSlice>> set = map.entrySet();
- for (Entry<String, BlockPoolSlice> entry : set) {
- entry.getValue().shutdown();
- }
- }
-
- public void addBlockPool(String bpid, Configuration conf)
- throws IOException {
- File bpdir = new File(currentDir, bpid);
- BlockPoolSlice bp = new BlockPoolSlice(bpid, this, bpdir, conf);
- map.put(bpid, bp);
- }
-
- public void shutdownBlockPool(String bpid) {
- BlockPoolSlice bp = map.get(bpid);
- if (bp!=null) {
- bp.shutdown();
- }
- map.remove(bpid);
- }
-
- private boolean isBPDirEmpty(String bpid)
- throws IOException {
- File volumeCurrentDir = this.getCurrentDir();
- File bpDir = new File(volumeCurrentDir, bpid);
- File bpCurrentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
- File finalizedDir = new File(bpCurrentDir,
- DataStorage.STORAGE_DIR_FINALIZED);
- File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
- if (finalizedDir.exists() && FileUtil.list(finalizedDir).length != 0) {
- return false;
- }
- if (rbwDir.exists() && FileUtil.list(rbwDir).length != 0) {
- return false;
- }
- return true;
- }
-
- private void deleteBPDirectories(String bpid, boolean force)
- throws IOException {
- File volumeCurrentDir = this.getCurrentDir();
- File bpDir = new File(volumeCurrentDir, bpid);
- if (!bpDir.isDirectory()) {
- // nothing to be deleted
- return;
- }
- File tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP);
- File bpCurrentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
- File finalizedDir = new File(bpCurrentDir,
- DataStorage.STORAGE_DIR_FINALIZED);
- File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
- if (force) {
- FileUtil.fullyDelete(bpDir);
- } else {
- if (!rbwDir.delete()) {
- throw new IOException("Failed to delete " + rbwDir);
- }
- if (!finalizedDir.delete()) {
- throw new IOException("Failed to delete " + finalizedDir);
- }
- FileUtil.fullyDelete(tmpDir);
- for (File f : FileUtil.listFiles(bpCurrentDir)) {
- if (!f.delete()) {
- throw new IOException("Failed to delete " + f);
- }
- }
- if (!bpCurrentDir.delete()) {
- throw new IOException("Failed to delete " + bpCurrentDir);
- }
- for (File f : FileUtil.listFiles(bpDir)) {
- if (!f.delete()) {
- throw new IOException("Failed to delete " + f);
- }
- }
- if (!bpDir.delete()) {
- throw new IOException("Failed to delete " + bpDir);
- }
- }
- }
-
- String getStorageID() {
- return storageID;
- }
- }
-
- static class FSVolumeSet {
- /*
- * Read access to this unmodifiable list is not synchronized.
- * This list is replaced on modification holding "this" lock.
- */
- private volatile List<FSVolume> volumes = null;
-
- final VolumeChoosingPolicy<FSVolume> blockChooser;
- int numFailedVolumes;
-
- FSVolumeSet(List<FSVolume> volumes, int failedVols,
- VolumeChoosingPolicy<FSVolume> blockChooser) {
- this.volumes = Collections.unmodifiableList(volumes);
- this.blockChooser = blockChooser;
- this.numFailedVolumes = failedVols;
- }
-
- private int numberOfFailedVolumes() {
- return numFailedVolumes;
- }
-
- /**
- * Get next volume. Synchronized to ensure {@link #curVolume} is updated
- * by a single thread and next volume is chosen with no concurrent
- * update to {@link #volumes}.
- * @param blockSize free space needed on the volume
- * @return next volume to store the block in.
- */
- synchronized FSVolume getNextVolume(long blockSize) throws IOException {
- return blockChooser.chooseVolume(volumes, blockSize);
- }
-
- private long getDfsUsed() throws IOException {
- long dfsUsed = 0L;
- for (FSVolume v : volumes) {
- dfsUsed += v.getDfsUsed();
- }
- return dfsUsed;
- }
-
- private long getBlockPoolUsed(String bpid) throws IOException {
- long dfsUsed = 0L;
- for (FSVolume v : volumes) {
- dfsUsed += v.getBlockPoolUsed(bpid);
- }
- return dfsUsed;
- }
-
- private long getCapacity() {
- long capacity = 0L;
- for (FSVolume v : volumes) {
- capacity += v.getCapacity();
- }
- return capacity;
- }
-
- private long getRemaining() throws IOException {
- long remaining = 0L;
- for (FsVolumeSpi vol : volumes) {
- remaining += vol.getAvailable();
- }
- return remaining;
- }
-
- private void getVolumeMap(ReplicasMap volumeMap) throws IOException {
- for (FSVolume v : volumes) {
- v.getVolumeMap(volumeMap);
- }
- }
-
- private void getVolumeMap(String bpid, ReplicasMap volumeMap)
- throws IOException {
- for (FSVolume v : volumes) {
- v.getVolumeMap(bpid, volumeMap);
- }
- }
-
- /**
- * Calls {@link FSVolume#checkDirs()} on each volume, removing any
- * volumes from the active list that result in a DiskErrorException.
- *
- * This method is synchronized to allow only one instance of checkDirs()
- * call
- * @return list of all the removed volumes.
- */
- private synchronized List<FSVolume> checkDirs() {
- ArrayList<FSVolume> removedVols = null;
-
- // Make a copy of volumes for performing modification
- final List<FSVolume> volumeList = new ArrayList<FSVolume>(volumes);
-
- for(Iterator<FSVolume> i = volumeList.iterator(); i.hasNext(); ) {
- final FSVolume fsv = i.next();
- try {
- fsv.checkDirs();
- } catch (DiskErrorException e) {
- DataNode.LOG.warn("Removing failed volume " + fsv + ": ",e);
- if (removedVols == null) {
- removedVols = new ArrayList<FSVolume>(2);
- }
- removedVols.add(fsv);
- fsv.shutdown();
- i.remove(); // Remove the volume
- numFailedVolumes++;
- }
- }
-
- if (removedVols != null && removedVols.size() > 0) {
- // Replace volume list
- volumes = Collections.unmodifiableList(volumeList);
- DataNode.LOG.info("Completed FSVolumeSet.checkDirs. Removed "
- + removedVols.size() + " volumes. List of current volumes: "
- + this);
- }
-
- return removedVols;
- }
-
- @Override
- public String toString() {
- return volumes.toString();
- }
-
-
- private void addBlockPool(String bpid, Configuration conf)
- throws IOException {
- for (FSVolume v : volumes) {
- v.addBlockPool(bpid, conf);
- }
- }
-
- private void removeBlockPool(String bpid) {
- for (FSVolume v : volumes) {
- v.shutdownBlockPool(bpid);
- }
- }
-
- private void shutdown() {
- for (FSVolume volume : volumes) {
- if(volume != null) {
- volume.shutdown();
- }
- }
- }
- }
-
- //////////////////////////////////////////////////////
- //
- // FSDataSet
- //
- //////////////////////////////////////////////////////
-
- private static boolean isUnlinkTmpFile(File f) {
- String name = f.getName();
- return name.endsWith(DatanodeUtil.UNLINK_BLOCK_SUFFIX);
- }
-
- private static File getOrigFile(File unlinkTmpFile) {
- String fileName = unlinkTmpFile.getName();
- return new File(unlinkTmpFile.getParentFile(),
- fileName.substring(0,
- fileName.length() - DatanodeUtil.UNLINK_BLOCK_SUFFIX.length()));
- }
-
- protected File getMetaFile(ExtendedBlock b) throws IOException {
- return DatanodeUtil.getMetaFile(getBlockFile(b), b.getGenerationStamp());
- }
-
- /** Find the metadata file for the specified block file.
- * Return the generation stamp from the name of the metafile.
- */
- private static long getGenerationStampFromFile(File[] listdir, File blockFile) {
- String blockName = blockFile.getName();
- for (int j = 0; j < listdir.length; j++) {
- String path = listdir[j].getName();
- if (!path.startsWith(blockName)) {
- continue;
- }
- if (blockFile == listdir[j]) {
- continue;
- }
- return Block.getGenerationStamp(listdir[j].getName());
- }
- DataNode.LOG.warn("Block " + blockFile +
- " does not have a metafile!");
- return GenerationStamp.GRANDFATHER_GENERATION_STAMP;
- }
-
- /** Find the corresponding meta data file from a given block file */
- private static long parseGenerationStamp(File blockFile, File metaFile
- ) throws IOException {
- String metaname = metaFile.getName();
- String gs = metaname.substring(blockFile.getName().length() + 1,
- metaname.length() - DatanodeUtil.METADATA_EXTENSION.length());
- try {
- return Long.parseLong(gs);
- } catch(NumberFormatException nfe) {
- throw (IOException)new IOException("blockFile=" + blockFile
- + ", metaFile=" + metaFile).initCause(nfe);
- }
- }
+class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
+ static final Log LOG = LogFactory.getLog(FsDatasetImpl.class);
@Override // FsDatasetSpi
- public List<FSVolume> getVolumes() {
+ public List<FsVolumeImpl> getVolumes() {
return volumes.volumes;
}
@Override
- public synchronized FSVolume getVolume(final ExtendedBlock b) {
+ public synchronized FsVolumeImpl getVolume(final ExtendedBlock b) {
final ReplicaInfo r = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
- return r != null? (FSVolume)r.getVolume(): null;
+ return r != null? (FsVolumeImpl)r.getVolume(): null;
}
@Override // FsDatasetSpi
@@ -1040,11 +106,12 @@ public class FSDataset implements FsData
if (blockfile == null) {
return null;
}
- final File metafile = DatanodeUtil.findMetaFile(blockfile);
- return new Block(blkid, blockfile.length(),
- parseGenerationStamp(blockfile, metafile));
+ final File metafile = FsDatasetUtil.findMetaFile(blockfile);
+ final long gs = FsDatasetUtil.parseGenerationStamp(blockfile, metafile);
+ return new Block(blkid, blockfile.length(), gs);
}
+
/**
* Returns a clone of a replica stored in data-node memory.
* Should be primarily used for testing.
@@ -1069,21 +136,21 @@ public class FSDataset implements FsData
}
return null;
}
-
+
@Override // FsDatasetSpi
public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
throws IOException {
- final File meta = getMetaFile(b);
+ File meta = FsDatasetUtil.getMetaFile(getBlockFile(b), b.getGenerationStamp());
if (meta == null || !meta.exists()) {
return null;
}
return new LengthInputStream(new FileInputStream(meta), meta.length());
}
- private final DataNode datanode;
- final FSVolumeSet volumes;
- final ReplicasMap volumeMap;
- final FSDatasetAsyncDiskService asyncDiskService;
+ final DataNode datanode;
+ final FsVolumeList volumes;
+ final ReplicaMap volumeMap;
+ final FsDatasetAsyncDiskService asyncDiskService;
private final int validVolsRequired;
// Used for synchronizing access to usage stats
@@ -1092,7 +159,7 @@ public class FSDataset implements FsData
/**
* An FSDataset has a directory where it loads its data files.
*/
- private FSDataset(DataNode datanode, DataStorage storage, Configuration conf
+ FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf
) throws IOException {
this.datanode = datanode;
// The number of volumes required for operation is the total number
@@ -1119,29 +186,29 @@ public class FSDataset implements FsData
+ ", volume failures tolerated: " + volFailuresTolerated);
}
- final List<FSVolume> volArray = new ArrayList<FSVolume>(
+ final List<FsVolumeImpl> volArray = new ArrayList<FsVolumeImpl>(
storage.getNumStorageDirs());
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
final File dir = storage.getStorageDir(idx).getCurrentDir();
- volArray.add(new FSVolume(this, storage.getStorageID(), dir, conf));
- DataNode.LOG.info("FSDataset added volume - " + dir);
+ volArray.add(new FsVolumeImpl(this, storage.getStorageID(), dir, conf));
+ LOG.info("Added volume - " + dir);
}
- volumeMap = new ReplicasMap(this);
+ volumeMap = new ReplicaMap(this);
@SuppressWarnings("unchecked")
- final VolumeChoosingPolicy<FSVolume> blockChooserImpl =
+ final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
ReflectionUtils.newInstance(conf.getClass(
DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
RoundRobinVolumeChoosingPolicy.class,
VolumeChoosingPolicy.class), conf);
- volumes = new FSVolumeSet(volArray, volsFailed, blockChooserImpl);
+ volumes = new FsVolumeList(volArray, volsFailed, blockChooserImpl);
volumes.getVolumeMap(volumeMap);
File[] roots = new File[storage.getNumStorageDirs()];
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
roots[idx] = storage.getStorageDir(idx).getCurrentDir();
}
- asyncDiskService = new FSDatasetAsyncDiskService(this, roots);
+ asyncDiskService = new FsDatasetAsyncDiskService(datanode, roots);
registerMBean(storage.getStorageID());
}
@@ -1221,8 +288,8 @@ public class FSDataset implements FsData
File getBlockFile(String bpid, Block b) throws IOException {
File f = validateBlockFile(bpid, b);
if(f == null) {
- if (DataNode.LOG.isDebugEnabled()) {
- DataNode.LOG.debug("b=" + b + ", volumeMap=" + volumeMap);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("b=" + b + ", volumeMap=" + volumeMap);
}
throw new IOException("Block " + b + " is not valid.");
}
@@ -1322,27 +389,12 @@ public class FSDataset implements FsData
return new ReplicaInputStreams(new FileInputStream(blockInFile.getFD()),
new FileInputStream(metaInFile.getFD()));
}
-
- /**
- * Make a copy of the block if this block is linked to an existing
- * snapshot. This ensures that modifying this block does not modify
- * data in any existing snapshots.
- * @param block Block
- * @param numLinks Unlink if the number of links exceed this value
- * @throws IOException
- * @return - true if the specified block was unlinked or the block
- * is not in any snapshot.
- */
- public boolean unlinkBlock(ExtendedBlock block, int numLinks) throws IOException {
- ReplicaInfo info = getReplicaInfo(block);
- return info.unlinkBlock(numLinks);
- }
- private static File moveBlockFiles(Block b, File srcfile, File destdir
+ static File moveBlockFiles(Block b, File srcfile, File destdir
) throws IOException {
final File dstfile = new File(destdir, b.getBlockName());
- final File srcmeta = DatanodeUtil.getMetaFile(srcfile, b.getGenerationStamp());
- final File dstmeta = DatanodeUtil.getMetaFile(dstfile, b.getGenerationStamp());
+ final File srcmeta = FsDatasetUtil.getMetaFile(srcfile, b.getGenerationStamp());
+ final File dstmeta = FsDatasetUtil.getMetaFile(dstfile, b.getGenerationStamp());
if (!srcmeta.renameTo(dstmeta)) {
throw new IOException("Failed to move meta file for " + b
+ " from " + srcmeta + " to " + dstmeta);
@@ -1351,16 +403,16 @@ public class FSDataset implements FsData
throw new IOException("Failed to move block file for " + b
+ " from " + srcfile + " to " + dstfile.getAbsolutePath());
}
- if (DataNode.LOG.isDebugEnabled()) {
- DataNode.LOG.debug("addBlock: Moved " + srcmeta + " to " + dstmeta);
- DataNode.LOG.debug("addBlock: Moved " + srcfile + " to " + dstfile);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("addBlock: Moved " + srcmeta + " to " + dstmeta
+ + " and " + srcfile + " to " + dstfile);
}
return dstfile;
}
static private void truncateBlock(File blockFile, File metaFile,
long oldlen, long newlen) throws IOException {
- DataNode.LOG.info("truncateBlock: blockFile=" + blockFile
+ LOG.info("truncateBlock: blockFile=" + blockFile
+ ", metaFile=" + metaFile
+ ", oldlen=" + oldlen
+ ", newlen=" + newlen);
@@ -1411,7 +463,7 @@ public class FSDataset implements FsData
@Override // FsDatasetSpi
- public synchronized ReplicaInPipelineInterface append(ExtendedBlock b,
+ public synchronized ReplicaInPipeline append(ExtendedBlock b,
long newGS, long expectedBlockLen) throws IOException {
// If the block was successfully finalized because all packets
// were successfully processed at the Datanode but the ack for
@@ -1425,7 +477,7 @@ public class FSDataset implements FsData
" should be greater than the replica " + b + "'s generation stamp");
}
ReplicaInfo replicaInfo = getReplicaInfo(b);
- DataNode.LOG.info("Appending to replica " + replicaInfo);
+ LOG.info("Appending to replica " + replicaInfo);
if (replicaInfo.getState() != ReplicaState.FINALIZED) {
throw new ReplicaNotFoundException(
ReplicaNotFoundException.UNFINALIZED_REPLICA + b);
@@ -1460,7 +512,7 @@ public class FSDataset implements FsData
// construct a RBW replica with the new GS
File blkfile = replicaInfo.getBlockFile();
- FSVolume v = (FSVolume)replicaInfo.getVolume();
+ FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume();
if (v.getAvailable() < estimateBlockLen - replicaInfo.getNumBytes()) {
throw new DiskOutOfSpaceException("Insufficient space for appending to "
+ replicaInfo);
@@ -1473,8 +525,8 @@ public class FSDataset implements FsData
File newmeta = newReplicaInfo.getMetaFile();
// rename meta file to rbw directory
- if (DataNode.LOG.isDebugEnabled()) {
- DataNode.LOG.debug("Renaming " + oldmeta + " to " + newmeta);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Renaming " + oldmeta + " to " + newmeta);
}
if (!oldmeta.renameTo(newmeta)) {
throw new IOException("Block " + replicaInfo + " reopen failed. " +
@@ -1483,13 +535,13 @@ public class FSDataset implements FsData
}
// rename block file to rbw directory
- if (DataNode.LOG.isDebugEnabled()) {
- DataNode.LOG.debug("Renaming " + blkfile + " to " + newBlkFile);
- DataNode.LOG.debug("Old block file length is " + blkfile.length());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Renaming " + blkfile + " to " + newBlkFile
+ + ", file length=" + blkfile.length());
}
if (!blkfile.renameTo(newBlkFile)) {
if (!newmeta.renameTo(oldmeta)) { // restore the meta file
- DataNode.LOG.warn("Cannot move meta file " + newmeta +
+ LOG.warn("Cannot move meta file " + newmeta +
"back to the finalized directory " + oldmeta);
}
throw new IOException("Block " + replicaInfo + " reopen failed. " +
@@ -1552,9 +604,9 @@ public class FSDataset implements FsData
}
@Override // FsDatasetSpi
- public synchronized ReplicaInPipelineInterface recoverAppend(ExtendedBlock b,
+ public synchronized ReplicaInPipeline recoverAppend(ExtendedBlock b,
long newGS, long expectedBlockLen) throws IOException {
- DataNode.LOG.info("Recover failed append to " + b);
+ LOG.info("Recover failed append to " + b);
ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
@@ -1571,10 +623,9 @@ public class FSDataset implements FsData
@Override // FsDatasetSpi
public void recoverClose(ExtendedBlock b, long newGS,
long expectedBlockLen) throws IOException {
- DataNode.LOG.info("Recover failed close " + b);
+ LOG.info("Recover failed close " + b);
// check replica's state
- ReplicaInfo replicaInfo = recoverCheck(b, newGS,
- expectedBlockLen);
+ ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
// bump the replica's GS
bumpReplicaGS(replicaInfo, newGS);
// finalize the replica if RBW
@@ -1599,8 +650,8 @@ public class FSDataset implements FsData
File newmeta = replicaInfo.getMetaFile();
// rename meta file to new GS
- if (DataNode.LOG.isDebugEnabled()) {
- DataNode.LOG.debug("Renaming " + oldmeta + " to " + newmeta);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Renaming " + oldmeta + " to " + newmeta);
}
if (!oldmeta.renameTo(newmeta)) {
replicaInfo.setGenerationStamp(oldGS); // restore old GS
@@ -1611,7 +662,7 @@ public class FSDataset implements FsData
}
@Override // FsDatasetSpi
- public synchronized ReplicaInPipelineInterface createRbw(ExtendedBlock b)
+ public synchronized ReplicaInPipeline createRbw(ExtendedBlock b)
throws IOException {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getBlockId());
@@ -1621,7 +672,7 @@ public class FSDataset implements FsData
" and thus cannot be created.");
}
// create a new block
- FSVolume v = volumes.getNextVolume(b.getNumBytes());
+ FsVolumeImpl v = volumes.getNextVolume(b.getNumBytes());
// create a rbw file to hold block in the designated volume
File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
@@ -1631,10 +682,10 @@ public class FSDataset implements FsData
}
@Override // FsDatasetSpi
- public synchronized ReplicaInPipelineInterface recoverRbw(ExtendedBlock b,
+ public synchronized ReplicaInPipeline recoverRbw(ExtendedBlock b,
long newGS, long minBytesRcvd, long maxBytesRcvd)
throws IOException {
- DataNode.LOG.info("Recover the RBW replica " + b);
+ LOG.info("Recover the RBW replica " + b);
ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
@@ -1645,7 +696,7 @@ public class FSDataset implements FsData
}
ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;
- DataNode.LOG.info("Recovering replica " + rbw);
+ LOG.info("Recovering replica " + rbw);
// Stop the previous writer
rbw.stopWriter();
@@ -1676,12 +727,12 @@ public class FSDataset implements FsData
}
@Override // FsDatasetSpi
- public synchronized ReplicaInPipelineInterface convertTemporaryToRbw(
+ public synchronized ReplicaInPipeline convertTemporaryToRbw(
final ExtendedBlock b) throws IOException {
final long blockId = b.getBlockId();
final long expectedGs = b.getGenerationStamp();
final long visible = b.getNumBytes();
- DataNode.LOG.info("Convert replica " + b
+ LOG.info("Convert replica " + b
+ " from Temporary to RBW, visible length=" + visible);
final ReplicaInPipeline temp;
@@ -1717,7 +768,7 @@ public class FSDataset implements FsData
+ visible + ", temp=" + temp);
}
// check volume
- final FSVolume v = (FSVolume)temp.getVolume();
+ final FsVolumeImpl v = (FsVolumeImpl)temp.getVolume();
if (v == null) {
throw new IOException("r.getVolume() = null, temp=" + temp);
}
@@ -1737,7 +788,7 @@ public class FSDataset implements FsData
}
@Override // FsDatasetSpi
- public synchronized ReplicaInPipelineInterface createTemporary(ExtendedBlock b)
+ public synchronized ReplicaInPipeline createTemporary(ExtendedBlock b)
throws IOException {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId());
if (replicaInfo != null) {
@@ -1746,7 +797,7 @@ public class FSDataset implements FsData
" and thus cannot be created.");
}
- FSVolume v = volumes.getNextVolume(b.getNumBytes());
+ FsVolumeImpl v = volumes.getNextVolume(b.getNumBytes());
// create a temporary file to hold block in the designated volume
File f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(),
@@ -1763,12 +814,12 @@ public class FSDataset implements FsData
@Override // FsDatasetSpi
public void adjustCrcChannelPosition(ExtendedBlock b, ReplicaOutputStreams streams,
int checksumSize) throws IOException {
- FileOutputStream file = (FileOutputStream) streams.getChecksumOut();
+ FileOutputStream file = (FileOutputStream)streams.getChecksumOut();
FileChannel channel = file.getChannel();
long oldPos = channel.position();
long newPos = oldPos - checksumSize;
- if (DataNode.LOG.isDebugEnabled()) {
- DataNode.LOG.debug("Changing meta file offset of block " + b + " from " +
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Changing meta file offset of block " + b + " from " +
oldPos + " to " + newPos);
}
channel.position(newPos);
@@ -1805,7 +856,7 @@ public class FSDataset implements FsData
newReplicaInfo = (FinalizedReplica)
((ReplicaUnderRecovery)replicaInfo).getOriginalReplica();
} else {
- FSVolume v = (FSVolume)replicaInfo.getVolume();
+ FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume();
File f = replicaInfo.getBlockFile();
if (v == null) {
throw new IOException("No volume for temporary file " + f +
@@ -1833,7 +884,7 @@ public class FSDataset implements FsData
// delete the on-disk temp file
if (delBlockFromDisk(replicaInfo.getBlockFile(),
replicaInfo.getMetaFile(), b.getLocalBlock())) {
- DataNode.LOG.warn("Block " + b + " unfinalized and removed. " );
+ LOG.warn("Block " + b + " unfinalized and removed. " );
}
}
}
@@ -1847,17 +898,16 @@ public class FSDataset implements FsData
*/
private boolean delBlockFromDisk(File blockFile, File metaFile, Block b) {
if (blockFile == null) {
- DataNode.LOG.warn("No file exists for block: " + b);
+ LOG.warn("No file exists for block: " + b);
return true;
}
if (!blockFile.delete()) {
- DataNode.LOG.warn("Not able to delete the block file: " + blockFile);
+ LOG.warn("Not able to delete the block file: " + blockFile);
return false;
} else { // remove the meta file
if (metaFile != null && !metaFile.delete()) {
- DataNode.LOG.warn(
- "Not able to delete the meta block file: " + metaFile);
+ LOG.warn("Not able to delete the meta block file: " + metaFile);
return false;
}
}
@@ -1958,8 +1008,8 @@ public class FSDataset implements FsData
datanode.checkDiskError();
}
- if (DataNode.LOG.isDebugEnabled()) {
- DataNode.LOG.debug("b=" + b + ", f=" + f);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("b=" + b + ", f=" + f);
}
return null;
}
@@ -1977,7 +1027,7 @@ public class FSDataset implements FsData
}
//check replica's meta file
- final File metafile = DatanodeUtil.getMetaFile(f, r.getGenerationStamp());
+ final File metafile = FsDatasetUtil.getMetaFile(f, r.getGenerationStamp());
if (!metafile.exists()) {
throw new IOException("Metafile " + metafile + " does not exist, r=" + r);
}
@@ -1995,69 +1045,64 @@ public class FSDataset implements FsData
public void invalidate(String bpid, Block invalidBlks[]) throws IOException {
boolean error = false;
for (int i = 0; i < invalidBlks.length; i++) {
- File f = null;
- final FSVolume v;
+ final File f;
+ final FsVolumeImpl v;
synchronized (this) {
f = getFile(bpid, invalidBlks[i].getBlockId());
- ReplicaInfo dinfo = volumeMap.get(bpid, invalidBlks[i]);
- if (dinfo == null ||
- dinfo.getGenerationStamp() != invalidBlks[i].getGenerationStamp()) {
- DataNode.LOG.warn("Unexpected error trying to delete block "
- + invalidBlks[i] +
- ". BlockInfo not found in volumeMap.");
+ ReplicaInfo info = volumeMap.get(bpid, invalidBlks[i]);
+ if (info == null) {
+ LOG.warn("Failed to delete replica " + invalidBlks[i]
+ + ": ReplicaInfo not found.");
+ error = true;
+ continue;
+ }
+ if (info.getGenerationStamp() != invalidBlks[i].getGenerationStamp()) {
+ LOG.warn("Failed to delete replica " + invalidBlks[i]
+ + ": GenerationStamp not matched, info=" + info);
error = true;
continue;
}
- v = (FSVolume)dinfo.getVolume();
+ v = (FsVolumeImpl)info.getVolume();
if (f == null) {
- DataNode.LOG.warn("Unexpected error trying to delete block "
- + invalidBlks[i] +
- ". Block not found in blockMap." +
- ((v == null) ? " " : " Block found in volumeMap."));
+ LOG.warn("Failed to delete replica " + invalidBlks[i]
+ + ": File not found, volume=" + v);
error = true;
continue;
}
if (v == null) {
- DataNode.LOG.warn("Unexpected error trying to delete block "
- + invalidBlks[i] +
- ". No volume for this block." +
- " Block found in blockMap. " + f + ".");
+ LOG.warn("Failed to delete replica " + invalidBlks[i]
+ + ". No volume for this replica, file=" + f + ".");
error = true;
continue;
}
File parent = f.getParentFile();
if (parent == null) {
- DataNode.LOG.warn("Unexpected error trying to delete block "
- + invalidBlks[i] +
- ". Parent not found for file " + f + ".");
+ LOG.warn("Failed to delete replica " + invalidBlks[i]
+ + ". Parent not found for file " + f + ".");
error = true;
continue;
}
- ReplicaState replicaState = dinfo.getState();
+ ReplicaState replicaState = info.getState();
if (replicaState == ReplicaState.FINALIZED ||
(replicaState == ReplicaState.RUR &&
- ((ReplicaUnderRecovery)dinfo).getOriginalReplica().getState() ==
+ ((ReplicaUnderRecovery)info).getOriginalReplica().getState() ==
ReplicaState.FINALIZED)) {
v.clearPath(bpid, parent);
}
volumeMap.remove(bpid, invalidBlks[i]);
}
- File metaFile = DatanodeUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp());
// Delete the block asynchronously to make sure we can do it fast enough
- asyncDiskService.deleteAsync(v, f, metaFile,
+ asyncDiskService.deleteAsync(v, f,
+ FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()),
new ExtendedBlock(bpid, invalidBlks[i]));
}
if (error) {
throw new IOException("Error in deleting blocks.");
}
}
-
- public void notifyNamenodeDeletedBlock(ExtendedBlock block){
- datanode.notifyNamenodeDeletedBlock(block);
- }
- @Override // {@link FsDatasetSpi}
+ @Override // FsDatasetSpi
public synchronized boolean contains(final ExtendedBlock block) {
final long blockId = block.getLocalBlock().getBlockId();
return getFile(block.getBlockPoolId(), blockId) != null;
@@ -2085,7 +1130,7 @@ public class FSDataset implements FsData
@Override // FsDatasetSpi
public void checkDataDir() throws DiskErrorException {
long totalBlocks=0, removedBlocks=0;
- List<FSVolume> failedVols = volumes.checkDirs();
+ List<FsVolumeImpl> failedVols = volumes.checkDirs();
// If there no failed volumes return
if (failedVols == null) {
@@ -2095,16 +1140,16 @@ public class FSDataset implements FsData
// Otherwise remove blocks for the failed volumes
long mlsec = System.currentTimeMillis();
synchronized (this) {
- for (FSVolume fv: failedVols) {
- for (String bpid : fv.map.keySet()) {
+ for (FsVolumeImpl fv: failedVols) {
+ for (String bpid : fv.getBlockPoolList()) {
Iterator<ReplicaInfo> ib = volumeMap.replicas(bpid).iterator();
while(ib.hasNext()) {
ReplicaInfo b = ib.next();
totalBlocks++;
// check if the volume block belongs to still valid
if(b.getVolume() == fv) {
- DataNode.LOG.warn("Removing replica " + bpid + ":" + b.getBlockId()
- + " on failed volume " + fv.currentDir.getAbsolutePath());
+ LOG.warn("Removing replica " + bpid + ":" + b.getBlockId()
+ + " on failed volume " + fv.getCurrentDir().getAbsolutePath());
ib.remove();
removedBlocks++;
}
@@ -2113,16 +1158,15 @@ public class FSDataset implements FsData
}
} // end of sync
mlsec = System.currentTimeMillis() - mlsec;
- DataNode.LOG.warn("Removed " + removedBlocks + " out of " + totalBlocks +
+ LOG.warn("Removed " + removedBlocks + " out of " + totalBlocks +
"(took " + mlsec + " millisecs)");
// report the error
StringBuilder sb = new StringBuilder();
- for (FSVolume fv : failedVols) {
- sb.append(fv.currentDir.getAbsolutePath() + ";");
+ for (FsVolumeImpl fv : failedVols) {
+ sb.append(fv.getCurrentDir().getAbsolutePath() + ";");
}
-
- throw new DiskErrorException("DataNode failed volumes:" + sb);
+ throw new DiskErrorException("DataNode failed volumes:" + sb);
}
@@ -2152,9 +1196,9 @@ public class FSDataset implements FsData
bean = new StandardMBean(this,FSDatasetMBean.class);
mbeanName = MBeans.register("DataNode", "FSDatasetState-" + storageName, bean);
} catch (NotCompliantMBeanException e) {
- DataNode.LOG.warn("Error registering FSDatasetState MBean", e);
+ LOG.warn("Error registering FSDatasetState MBean", e);
}
- DataNode.LOG.info("Registered FSDatasetState MBean");
+ LOG.info("Registered FSDatasetState MBean");
}
@Override // FsDatasetSpi
@@ -2221,7 +1265,7 @@ public class FSDataset implements FsData
// If metadata file exists then delete it
if (diskMetaFile != null && diskMetaFile.exists()
&& diskMetaFile.delete()) {
- DataNode.LOG.warn("Deleted a metadata file without a block "
+ LOG.warn("Deleted a metadata file without a block "
+ diskMetaFile.getAbsolutePath());
}
return;
@@ -2230,15 +1274,16 @@ public class FSDataset implements FsData
// Block is in memory and not on the disk
// Remove the block from volumeMap
volumeMap.remove(bpid, blockId);
- if (datanode.blockScanner != null) {
- datanode.blockScanner.deleteBlock(bpid, new Block(blockId));
+ final DataBlockScanner blockScanner = datanode.getBlockScanner();
+ if (blockScanner != null) {
+ blockScanner.deleteBlock(bpid, new Block(blockId));
}
- DataNode.LOG.warn("Removed block " + blockId
+ LOG.warn("Removed block " + blockId
+ " from memory with missing block file on the disk");
// Finally remove the metadata file
if (diskMetaFile != null && diskMetaFile.exists()
&& diskMetaFile.delete()) {
- DataNode.LOG.warn("Deleted a metadata file for the deleted block "
+ LOG.warn("Deleted a metadata file for the deleted block "
+ diskMetaFile.getAbsolutePath());
}
}
@@ -2252,10 +1297,11 @@ public class FSDataset implements FsData
ReplicaInfo diskBlockInfo = new FinalizedReplica(blockId,
diskFile.length(), diskGS, vol, diskFile.getParentFile());
volumeMap.add(bpid, diskBlockInfo);
- if (datanode.blockScanner != null) {
- datanode.blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo));
+ final DataBlockScanner blockScanner = datanode.getBlockScanner();
+ if (blockScanner != null) {
+ blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo));
}
- DataNode.LOG.warn("Added missing block to memory " + diskBlockInfo);
+ LOG.warn("Added missing block to memory " + diskBlockInfo);
return;
}
/*
@@ -2265,7 +1311,7 @@ public class FSDataset implements FsData
File memFile = memBlockInfo.getBlockFile();
if (memFile.exists()) {
if (memFile.compareTo(diskFile) != 0) {
- DataNode.LOG.warn("Block file " + memFile.getAbsolutePath()
+ LOG.warn("Block file " + memFile.getAbsolutePath()
+ " does not match file found by scan "
+ diskFile.getAbsolutePath());
// TODO: Should the diskFile be deleted?
@@ -2275,25 +1321,25 @@ public class FSDataset implements FsData
// Update the block with the file found on the disk. Since the block
// file and metadata file are found as a pair on the disk, update
// the block based on the metadata file found on the disk
- DataNode.LOG.warn("Block file in volumeMap "
+ LOG.warn("Block file in volumeMap "
+ memFile.getAbsolutePath()
+ " does not exist. Updating it to the file found during scan "
+ diskFile.getAbsolutePath());
memBlockInfo.setDir(diskFile.getParentFile());
memFile = diskFile;
- DataNode.LOG.warn("Updating generation stamp for block " + blockId
+ LOG.warn("Updating generation stamp for block " + blockId
+ " from " + memBlockInfo.getGenerationStamp() + " to " + diskGS);
memBlockInfo.setGenerationStamp(diskGS);
}
// Compare generation stamp
if (memBlockInfo.getGenerationStamp() != diskGS) {
- File memMetaFile = DatanodeUtil.getMetaFile(diskFile,
+ File memMetaFile = FsDatasetUtil.getMetaFile(diskFile,
memBlockInfo.getGenerationStamp());
if (memMetaFile.exists()) {
if (memMetaFile.compareTo(diskMetaFile) != 0) {
- DataNode.LOG.warn("Metadata file in memory "
+ LOG.warn("Metadata file in memory "
+ memMetaFile.getAbsolutePath()
+ " does not match file found by scan "
+ (diskMetaFile == null? null: diskMetaFile.getAbsolutePath()));
@@ -2306,7 +1352,7 @@ public class FSDataset implements FsData
&& diskMetaFile.getParent().equals(memFile.getParent()) ? diskGS
: GenerationStamp.GRANDFATHER_GENERATION_STAMP;
- DataNode.LOG.warn("Updating generation stamp for block " + blockId
+ LOG.warn("Updating generation stamp for block " + blockId
+ " from " + memBlockInfo.getGenerationStamp() + " to " + gs);
memBlockInfo.setGenerationStamp(gs);
@@ -2317,7 +1363,7 @@ public class FSDataset implements FsData
if (memBlockInfo.getNumBytes() != memFile.length()) {
// Update the length based on the block file
corruptBlock = new Block(memBlockInfo);
- DataNode.LOG.warn("Updating size of block " + blockId + " from "
+ LOG.warn("Updating size of block " + blockId + " from "
+ memBlockInfo.getNumBytes() + " to " + memFile.length());
memBlockInfo.setNumBytes(memFile.length());
}
@@ -2325,12 +1371,12 @@ public class FSDataset implements FsData
// Send corrupt block report outside the lock
if (corruptBlock != null) {
- DataNode.LOG.warn("Reporting the block " + corruptBlock
+ LOG.warn("Reporting the block " + corruptBlock
+ " as corrupt due to length mismatch");
try {
datanode.reportBadBlocks(new ExtendedBlock(bpid, corruptBlock));
} catch (IOException e) {
- DataNode.LOG.warn("Failed to repot bad block " + corruptBlock, e);
+ LOG.warn("Failed to repot bad block " + corruptBlock, e);
}
}
}
@@ -2359,9 +1405,9 @@ public class FSDataset implements FsData
/** static version of {@link #initReplicaRecovery(Block, long)}. */
static ReplicaRecoveryInfo initReplicaRecovery(String bpid,
- ReplicasMap map, Block block, long recoveryId) throws IOException {
+ ReplicaMap map, Block block, long recoveryId) throws IOException {
final ReplicaInfo replica = map.get(bpid, block.getBlockId());
- DataNode.LOG.info("initReplicaRecovery: block=" + block
+ LOG.info("initReplicaRecovery: block=" + block
+ ", recoveryId=" + recoveryId
+ ", replica=" + replica);
@@ -2410,13 +1456,13 @@ public class FSDataset implements FsData
}
final long oldRecoveryID = rur.getRecoveryID();
rur.setRecoveryID(recoveryId);
- DataNode.LOG.info("initReplicaRecovery: update recovery id for " + block
+ LOG.info("initReplicaRecovery: update recovery id for " + block
+ " from " + oldRecoveryID + " to " + recoveryId);
}
else {
rur = new ReplicaUnderRecovery(replica, recoveryId);
map.add(bpid, rur);
- DataNode.LOG.info("initReplicaRecovery: changing replica state for "
+ LOG.info("initReplicaRecovery: changing replica state for "
+ block + " from " + replica.getState()
+ " to " + rur.getState());
}
@@ -2431,7 +1477,7 @@ public class FSDataset implements FsData
//get replica
final String bpid = oldBlock.getBlockPoolId();
final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
- DataNode.LOG.info("updateReplica: block=" + oldBlock
+ LOG.info("updateReplica: block=" + oldBlock
+ ", recoveryId=" + recoveryId
+ ", length=" + newlength
+ ", replica=" + replica);
@@ -2518,16 +1564,18 @@ public class FSDataset implements FsData
return replica.getVisibleLength();
}
+ @Override
public synchronized void addBlockPool(String bpid, Configuration conf)
throws IOException {
- DataNode.LOG.info("Adding block pool " + bpid);
+ LOG.info("Adding block pool " + bpid);
volumes.addBlockPool(bpid, conf);
volumeMap.initBlockPool(bpid);
volumes.getVolumeMap(bpid, volumeMap);
}
-
+
+ @Override
public synchronized void shutdownBlockPool(String bpid) {
- DataNode.LOG.info("Removing block pool " + bpid);
+ LOG.info("Removing block pool " + bpid);
volumeMap.cleanUpBlockPool(bpid);
volumes.removeBlockPool(bpid);
}
@@ -2546,30 +1594,29 @@ public class FSDataset implements FsData
final long freeSpace;
final long reservedSpace;
- VolumeInfo(String dir, long usedSpace, long freeSpace, long reservedSpace) {
- this.directory = dir;
+ VolumeInfo(FsVolumeImpl v, long usedSpace, long freeSpace) {
+ this.directory = v.toString();
this.usedSpace = usedSpace;
this.freeSpace = freeSpace;
- this.reservedSpace = reservedSpace;
+ this.reservedSpace = v.getReserved();
}
}
private Collection<VolumeInfo> getVolumeInfo() {
Collection<VolumeInfo> info = new ArrayList<VolumeInfo>();
- for (FSVolume volume : volumes.volumes) {
+ for (FsVolumeImpl volume : volumes.volumes) {
long used = 0;
long free = 0;
try {
used = volume.getDfsUsed();
free = volume.getAvailable();
} catch (IOException e) {
- DataNode.LOG.warn(e.getMessage());
+ LOG.warn(e.getMessage());
used = 0;
free = 0;
}
- info.add(new VolumeInfo(volume.toString(), used, free,
- volume.getReserved()));
+ info.add(new VolumeInfo(volume, used, free));
}
return info;
}
@@ -2592,16 +1639,15 @@ public class FSDataset implements FsData
public synchronized void deleteBlockPool(String bpid, boolean force)
throws IOException {
if (!force) {
- for (FSVolume volume : volumes.volumes) {
+ for (FsVolumeImpl volume : volumes.volumes) {
if (!volume.isBPDirEmpty(bpid)) {
- DataNode.LOG.warn(bpid
- + " has some block files, cannot delete unless forced");
+ LOG.warn(bpid + " has some block files, cannot delete unless forced");
throw new IOException("Cannot delete block pool, "
+ "it contains some block files");
}
}
}
- for (FSVolume volume : volumes.volumes) {
+ for (FsVolumeImpl volume : volumes.volumes) {
volume.deleteBPDirectories(bpid, force);
}
}
@@ -2610,7 +1656,7 @@ public class FSDataset implements FsData
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
throws IOException {
File datafile = getBlockFile(block);
- File metafile = DatanodeUtil.getMetaFile(datafile, block.getGenerationStamp());
+ File metafile = FsDatasetUtil.getMetaFile(datafile, block.getGenerationStamp());
BlockLocalPathInfo info = new BlockLocalPathInfo(block,
datafile.getAbsolutePath(), metafile.getAbsolutePath());
return info;
@@ -2620,8 +1666,8 @@ public class FSDataset implements FsData
public RollingLogs createRollingLogs(String bpid, String prefix
) throws IOException {
String dir = null;
- final List<FSVolume> volumes = getVolumes();
- for (FSVolume vol : volumes) {
+ final List<FsVolumeImpl> volumes = getVolumes();
+ for (FsVolumeImpl vol : volumes) {
String bpDir = vol.getPath(bpid);
if (RollingLogsImpl.isFilePresent(bpDir, prefix)) {
dir = bpDir;
@@ -2633,202 +1679,4 @@ public class FSDataset implements FsData
}
return new RollingLogsImpl(dir, prefix);
}
-
- static class RollingLogsImpl implements RollingLogs {
- private static final String CURR_SUFFIX = ".curr";
- private static final String PREV_SUFFIX = ".prev";
-
- static boolean isFilePresent(String dir, String filePrefix) {
- return new File(dir, filePrefix + CURR_SUFFIX).exists() ||
- new File(dir, filePrefix + PREV_SUFFIX).exists();
- }
-
- private final File curr;
- private final File prev;
- private PrintStream out; //require synchronized access
-
- private Appender appender = new Appender() {
- @Override
- public Appendable append(CharSequence csq) {
- synchronized(RollingLogsImpl.this) {
- if (out == null) {
- throw new IllegalStateException(RollingLogsImpl.this
- + " is not yet opened.");
- }
- out.print(csq);
- }
- return this;
- }
-
- @Override
- public Appendable append(char c) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Appendable append(CharSequence csq, int start, int end) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void close() {
- synchronized(RollingLogsImpl.this) {
- if (out != null) {
- out.close();
- out = null;
- }
- }
- }
- };
-
-
- private final AtomicInteger numReaders = new AtomicInteger();
-
- private RollingLogsImpl(String dir, String filePrefix) throws FileNotFoundException{
- curr = new File(dir, filePrefix + CURR_SUFFIX);
- prev = new File(dir, filePrefix + PREV_SUFFIX);
- out = new PrintStream(new FileOutputStream(curr, true));
- }
-
- @Override
- public Reader iterator(boolean skipPrevFile) throws IOException {
- numReaders.incrementAndGet();
- return new Reader(skipPrevFile);
- }
-
- @Override
- public Appender appender() {
- return appender;
- }
-
- @Override
- public boolean roll() throws IOException {
- if (numReaders.get() > 0) {
- return false;
- }
- if (!prev.delete() && prev.exists()) {
- throw new IOException("Failed to delete " + prev);
- }
-
- synchronized(this) {
- appender.close();
- final boolean renamed = curr.renameTo(prev);
- out = new PrintStream(new FileOutputStream(curr, true));
- if (!renamed) {
- throw new IOException("Failed to rename " + curr + " to " + prev);
- }
- }
- return true;
- }
-
- @Override
- public String toString() {
- return curr.toString();
- }
-
- /**
- * This is used to read the lines in order.
- * If the data is not read completely (i.e, untill hasNext() returns
- * false), it needs to be explicitly
- */
- private class Reader implements RollingLogs.LineIterator {
- private File file;
- private BufferedReader reader;
- private String line;
- private boolean closed = false;
-
- private Reader(boolean skipPrevFile) throws IOException {
- reader = null;
- file = skipPrevFile? curr : prev;
- readNext();
- }
-
- @Override
- public boolean isPrevious() {
- return file == prev;
- }
-
- private boolean openFile() throws IOException {
-
- for(int i=0; i<2; i++) {
- if (reader != null || i > 0) {
- // move to next file
- file = isPrevious()? curr : null;
- }
- if (file == null) {
- return false;
- }
- if (file.exists()) {
- break;
- }
- }
-
- if (reader != null ) {
- reader.close();
- reader = null;
- }
-
- reader = new BufferedReader(new FileReader(file));
- return true;
- }
-
- // read next line if possible.
- private void readNext() throws IOException {
- line = null;
- try {
- if (reader != null && (line = reader.readLine()) != null) {
- return;
- }
- if (line == null) {
- // move to the next file.
- if (openFile()) {
- readNext();
- }
- }
- } finally {
- if (!hasNext()) {
- close();
- }
- }
- }
-
- @Override
- public boolean hasNext() {
- return line != null;
- }
-
- @Override
- public String next() {
- String curLine = line;
- try {
- readNext();
- } catch (IOException e) {
- DataBlockScanner.LOG.warn("Failed to read next line.", e);
- }
- return curLine;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void close() throws IOException {
- if (!closed) {
- try {
- if (reader != null) {
- reader.close();
- }
- } finally {
- file = null;
- reader = null;
- closed = true;
- final int n = numReaders.decrementAndGet();
- assert(n >= 0);
- }
- }
- }
- }
- }
}
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java?rev=1308437&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java Mon Apr 2 17:38:56 2012
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
+
+/** Utility methods. */
+@InterfaceAudience.Private
+public class FsDatasetUtil {
+ static boolean isUnlinkTmpFile(File f) {
+ return f.getName().endsWith(DatanodeUtil.UNLINK_BLOCK_SUFFIX);
+ }
+
+ static File getOrigFile(File unlinkTmpFile) {
+ final String name = unlinkTmpFile.getName();
+ if (!name.endsWith(DatanodeUtil.UNLINK_BLOCK_SUFFIX)) {
+ throw new IllegalArgumentException("unlinkTmpFile=" + unlinkTmpFile
+ + " does not end with " + DatanodeUtil.UNLINK_BLOCK_SUFFIX);
+ }
+ final int n = name.length() - DatanodeUtil.UNLINK_BLOCK_SUFFIX.length();
+ return new File(unlinkTmpFile.getParentFile(), name.substring(0, n));
+ }
+
+ static File getMetaFile(File f, long gs) {
+ return new File(f.getParent(),
+ DatanodeUtil.getMetaName(f.getName(), gs));
+ }
+
+ /** Find the corresponding meta data file from a given block file */
+ static File findMetaFile(final File blockFile) throws IOException {
+ final String prefix = blockFile.getName() + "_";
+ final File parent = blockFile.getParentFile();
+ final File[] matches = parent.listFiles(new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ return dir.equals(parent) && name.startsWith(prefix)
+ && name.endsWith(Block.METADATA_EXTENSION);
+ }
+ });
+
+ if (matches == null || matches.length == 0) {
+ throw new IOException("Meta file not found, blockFile=" + blockFile);
+ }
+ if (matches.length > 1) {
+ throw new IOException("Found more than one meta files: "
+ + Arrays.asList(matches));
+ }
+ return matches[0];
+ }
+
+ /**
+ * Find the meta-file for the specified block file
+ * and then return the generation stamp from the name of the meta-file.
+ */
+ static long getGenerationStampFromFile(File[] listdir, File blockFile) {
+ String blockName = blockFile.getName();
+ for (int j = 0; j < listdir.length; j++) {
+ String path = listdir[j].getName();
+ if (!path.startsWith(blockName)) {
+ continue;
+ }
+ if (blockFile == listdir[j]) {
+ continue;
+ }
+ return Block.getGenerationStamp(listdir[j].getName());
+ }
+ FsDatasetImpl.LOG.warn("Block " + blockFile + " does not have a metafile!");
+ return GenerationStamp.GRANDFATHER_GENERATION_STAMP;
+ }
+
+ /** Find the corresponding meta data file from a given block file */
+ static long parseGenerationStamp(File blockFile, File metaFile
+ ) throws IOException {
+ final String metaname = metaFile.getName();
+ final String gs = metaname.substring(blockFile.getName().length() + 1,
+ metaname.length() - Block.METADATA_EXTENSION.length());
+ try {
+ return Long.parseLong(gs);
+ } catch(NumberFormatException nfe) {
+ throw new IOException("Failed to parse generation stamp: blockFile="
+ + blockFile + ", metaFile=" + metaFile, nfe);
+ }
+ }
+}