Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2016/12/15 09:05:04 UTC
[36/50] [abbrv] hadoop git commit: HDFS-10958. Add instrumentation hooks around Datanode disk IO.
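
The essence of this patch is that Datanode file operations are routed through a
FileIoProvider so that timing and failure hooks can observe every disk call,
replacing the ad-hoc slow-IO logging previously scattered through individual
classes. As a simplified standalone sketch of that wrapper pattern (the class
and field names below are illustrative, not the actual Hadoop API), compare it
with the sync/flush timing code that the ReplicaOutputStreams hunks below
delete:

    import java.io.FileOutputStream;
    import java.io.IOException;

    // Illustrative stand-in for the FileIoProvider idea: wrap a disk
    // operation, time it, and warn when it exceeds a slowness threshold.
    class SimpleFileIoWrapper {
      private final long slowThresholdMs;

      SimpleFileIoWrapper(long slowThresholdMs) {
        this.slowThresholdMs = slowThresholdMs;
      }

      // Equivalent of the removed ReplicaOutputStreams#sync: force data
      // to disk, then log if the fsync exceeded the threshold.
      void sync(FileOutputStream fos) throws IOException {
        long begin = System.nanoTime();
        fos.getChannel().force(true);
        long durationMs = (System.nanoTime() - begin) / 1_000_000;
        if (durationMs > slowThresholdMs) {
          System.err.println("Slow fsync took " + durationMs
              + " ms (threshold=" + slowThresholdMs + " ms)");
        }
      }
    }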
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
index dc63238..d3006c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
@@ -45,6 +45,10 @@ abstract public class ReplicaInfo extends Block
/** volume where the replica belongs. */
private FsVolumeSpi volume;
+ /** This is used by some tests and FsDatasetUtil#computeChecksum. */
+ private static final FileIoProvider DEFAULT_FILE_IO_PROVIDER =
+ new FileIoProvider(null);
+
/**
* Constructor
* @param vol volume where replica is located
@@ -64,7 +68,18 @@ abstract public class ReplicaInfo extends Block
public FsVolumeSpi getVolume() {
return volume;
}
-
+
+ /**
+ * Get the {@link FileIoProvider} for disk IO operations.
+ */
+ public FileIoProvider getFileIoProvider() {
+ // In tests and when invoked via FsDatasetUtil#computeChecksum, the
+ // target volume for this replica may be unknown and hence null.
+ // Use the DEFAULT_FILE_IO_PROVIDER with no-op hooks.
+ return (volume != null) ? volume.getFileIoProvider()
+ : DEFAULT_FILE_IO_PROVIDER;
+ }
+
/**
* Set the volume where this replica is located on disk.
*/
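
The static DEFAULT_FILE_IO_PROVIDER is a null-object fallback: callers can use
the returned provider unconditionally, with no null checks. For instance,
FsDatasetUtil#computeChecksum (see the FsDatasetImpl hunk later in this mail)
reads replica metadata even when no volume is attached. A hypothetical call
site, assuming a ReplicaInfo replicaInfo and a File srcMeta in scope:

    // With a null volume this returns the shared DEFAULT_FILE_IO_PROVIDER
    // (no-op hooks) rather than failing with an NPE.
    FileIoProvider provider = replicaInfo.getFileIoProvider();
    FileInputStream fis = provider.getFileInputStream(
        replicaInfo.getVolume(), srcMeta);  // volume may legitimately be null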
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
index a11a207..4947ecf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.checker.Checkable;
@@ -418,4 +419,6 @@ public interface FsVolumeSpi
*/
class VolumeCheckContext {
}
+
+ FileIoProvider getFileIoProvider();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java
index 54d0e96..f40315a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java
@@ -24,8 +24,8 @@ import java.io.InputStream;
import java.io.IOException;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIOException;
import org.slf4j.Logger;
@@ -38,12 +38,15 @@ public class ReplicaInputStreams implements Closeable {
private InputStream dataIn;
private InputStream checksumIn;
private FsVolumeReference volumeRef;
+ private final FileIoProvider fileIoProvider;
private FileDescriptor dataInFd = null;
/** Create an object with a data input stream and a checksum input stream. */
- public ReplicaInputStreams(InputStream dataStream,
- InputStream checksumStream, FsVolumeReference volumeRef) {
+ public ReplicaInputStreams(
+ InputStream dataStream, InputStream checksumStream,
+ FsVolumeReference volumeRef, FileIoProvider fileIoProvider) {
this.volumeRef = volumeRef;
+ this.fileIoProvider = fileIoProvider;
this.dataIn = dataStream;
this.checksumIn = checksumStream;
if (dataIn instanceof FileInputStream) {
@@ -103,7 +106,7 @@ public class ReplicaInputStreams implements Closeable {
public void dropCacheBehindReads(String identifier, long offset, long len,
int flags) throws NativeIOException {
assert this.dataInFd != null : "null dataInFd!";
- NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
+ fileIoProvider.posixFadvise(getVolumeRef().getVolume(),
identifier, dataInFd, offset, len, flags);
}
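
With the fadvise call delegated to the provider, cache-management syscalls
become visible to the same hook layer as reads and writes. A hypothetical call
site, assuming ris is a ReplicaInputStreams whose data stream is backed by a
FileInputStream:

    // Advise the OS to drop cached pages behind a finished sequential read.
    // POSIX_FADV_DONTNEED is the existing NativeIO flag for this purpose.
    ris.dropCacheBehindReads(block.getBlockName(), 0, bytesRead,
        NativeIO.POSIX.POSIX_FADV_DONTNEED);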
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java
index a66847a..1614ba2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java
@@ -24,11 +24,10 @@ import java.io.OutputStream;
import java.io.IOException;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIOException;
import org.apache.hadoop.util.DataChecksum;
-import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
/**
@@ -43,21 +42,22 @@ public class ReplicaOutputStreams implements Closeable {
/** Stream to checksum. */
private final OutputStream checksumOut;
private final DataChecksum checksum;
- private final boolean isTransientStorage;
- private final long slowLogThresholdMs;
+ private final FsVolumeSpi volume;
+ private final FileIoProvider fileIoProvider;
/**
* Create an object with a data output stream, a checksum output stream
* and a checksum.
*/
- public ReplicaOutputStreams(OutputStream dataOut,
- OutputStream checksumOut, DataChecksum checksum,
- boolean isTransientStorage, long slowLogThresholdMs) {
+ public ReplicaOutputStreams(
+ OutputStream dataOut, OutputStream checksumOut, DataChecksum checksum,
+ FsVolumeSpi volume, FileIoProvider fileIoProvider) {
+
this.dataOut = dataOut;
this.checksum = checksum;
- this.slowLogThresholdMs = slowLogThresholdMs;
- this.isTransientStorage = isTransientStorage;
this.checksumOut = checksumOut;
+ this.volume = volume;
+ this.fileIoProvider = fileIoProvider;
try {
if (this.dataOut instanceof FileOutputStream) {
@@ -93,7 +93,7 @@ public class ReplicaOutputStreams implements Closeable {
/** @return is writing to a transient storage? */
public boolean isTransientStorage() {
- return isTransientStorage;
+ return volume.isTransientStorage();
}
@Override
@@ -112,7 +112,7 @@ public class ReplicaOutputStreams implements Closeable {
*/
public void syncDataOut() throws IOException {
if (dataOut instanceof FileOutputStream) {
- sync((FileOutputStream)dataOut);
+ fileIoProvider.sync(volume, (FileOutputStream) dataOut);
}
}
@@ -121,7 +121,7 @@ public class ReplicaOutputStreams implements Closeable {
*/
public void syncChecksumOut() throws IOException {
if (checksumOut instanceof FileOutputStream) {
- sync((FileOutputStream)checksumOut);
+ fileIoProvider.sync(volume, (FileOutputStream) checksumOut);
}
}
@@ -129,60 +129,34 @@ public class ReplicaOutputStreams implements Closeable {
* Flush the data stream if it supports it.
*/
public void flushDataOut() throws IOException {
- flush(dataOut);
+ if (dataOut != null) {
+ fileIoProvider.flush(volume, dataOut);
+ }
}
/**
* Flush the checksum stream if it supports it.
*/
public void flushChecksumOut() throws IOException {
- flush(checksumOut);
- }
-
- private void flush(OutputStream dos) throws IOException {
- long begin = Time.monotonicNow();
- dos.flush();
- long duration = Time.monotonicNow() - begin;
- LOG.trace("ReplicaOutputStreams#flush takes {} ms.", duration);
- if (duration > slowLogThresholdMs) {
- LOG.warn("Slow flush took {} ms (threshold={} ms)", duration,
- slowLogThresholdMs);
+ if (checksumOut != null) {
+ fileIoProvider.flush(volume, checksumOut);
}
}
- private void sync(FileOutputStream fos) throws IOException {
- long begin = Time.monotonicNow();
- fos.getChannel().force(true);
- long duration = Time.monotonicNow() - begin;
- LOG.trace("ReplicaOutputStreams#sync takes {} ms.", duration);
- if (duration > slowLogThresholdMs) {
- LOG.warn("Slow fsync took {} ms (threshold={} ms)", duration,
- slowLogThresholdMs);
- }
- }
-
- public long writeToDisk(byte[] b, int off, int len) throws IOException {
- long begin = Time.monotonicNow();
+ public void writeDataToDisk(byte[] b, int off, int len)
+ throws IOException {
dataOut.write(b, off, len);
- long duration = Time.monotonicNow() - begin;
- LOG.trace("DatanodeIO#writeToDisk takes {} ms.", duration);
- if (duration > slowLogThresholdMs) {
- LOG.warn("Slow BlockReceiver write data to disk cost: {} ms " +
- "(threshold={} ms)", duration, slowLogThresholdMs);
- }
- return duration;
}
public void syncFileRangeIfPossible(long offset, long nbytes,
int flags) throws NativeIOException {
- assert this.outFd != null : "null outFd!";
- NativeIO.POSIX.syncFileRangeIfPossible(outFd, offset, nbytes, flags);
+ fileIoProvider.syncFileRange(
+ volume, outFd, offset, nbytes, flags);
}
public void dropCacheBehindWrites(String identifier,
long offset, long len, int flags) throws NativeIOException {
- assert this.outFd != null : "null outFd!";
- NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
- identifier, outFd, offset, len, flags);
+ fileIoProvider.posixFadvise(
+ volume, identifier, outFd, offset, len, flags);
}
}
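
Note that ReplicaOutputStreams no longer stores isTransientStorage or a
slow-log threshold; both behaviors now derive from the volume and its
FileIoProvider. A hypothetical construction site, mirroring the
SimulatedFSDataset hunk further down:

    // The volume supplies the transient-storage flag on demand, and its
    // FileIoProvider owns the slow-IO instrumentation that used to live here.
    ReplicaOutputStreams streams = new ReplicaOutputStreams(
        dataOut, checksumOut, requestedChecksum,
        volume, volume.getFileIoProvider());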
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 63e82f3..8273ebb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -32,13 +32,11 @@ import java.util.Iterator;
import java.util.Scanner;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CachingGetSpaceUsed;
import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.GetSpaceUsed;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
@@ -46,10 +44,10 @@ import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
-import org.apache.hadoop.hdfs.server.datanode.LocalReplica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
@@ -64,7 +62,6 @@ import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.Timer;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.io.Files;
/**
* A block pool slice represents a portion of a block pool stored on a volume.
@@ -96,6 +93,7 @@ class BlockPoolSlice {
private final long cachedDfsUsedCheckTime;
private final Timer timer;
private final int maxDataLength;
+ private final FileIoProvider fileIoProvider;
// TODO:FEDERATION scalability issue - a thread per DU is needed
private final GetSpaceUsed dfsUsage;
@@ -113,6 +111,7 @@ class BlockPoolSlice {
Configuration conf, Timer timer) throws IOException {
this.bpid = bpid;
this.volume = volume;
+ this.fileIoProvider = volume.getFileIoProvider();
this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
this.finalizedDir = new File(
currentDir, DataStorage.STORAGE_DIR_FINALIZED);
@@ -147,19 +146,14 @@ class BlockPoolSlice {
//
this.tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP);
if (tmpDir.exists()) {
- DataStorage.fullyDelete(tmpDir);
+ fileIoProvider.fullyDelete(volume, tmpDir);
}
this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW);
- if (!rbwDir.mkdirs()) { // create rbw directory if not exist
- if (!rbwDir.isDirectory()) {
- throw new IOException("Mkdirs failed to create " + rbwDir.toString());
- }
- }
- if (!tmpDir.mkdirs()) {
- if (!tmpDir.isDirectory()) {
- throw new IOException("Mkdirs failed to create " + tmpDir.toString());
- }
- }
+
+ // create the rbw and tmp directories if they don't exist.
+ fileIoProvider.mkdirs(volume, rbwDir);
+ fileIoProvider.mkdirs(volume, tmpDir);
+
// Use cached value initially if available. Or the following call will
// block until the initial du command completes.
this.dfsUsage = new CachingGetSpaceUsed.Builder().setPath(bpDir)
@@ -266,7 +260,7 @@ class BlockPoolSlice {
*/
void saveDfsUsed() {
File outFile = new File(currentDir, DU_CACHE_FILE);
- if (outFile.exists() && !outFile.delete()) {
+ if (!fileIoProvider.deleteWithExistsCheck(volume, outFile)) {
FsDatasetImpl.LOG.warn("Failed to delete old dfsUsed file in " +
outFile.getParent());
}
@@ -277,7 +271,7 @@ class BlockPoolSlice {
new FileOutputStream(outFile), "UTF-8")) {
// mtime is written last, so that truncated writes won't be valid.
out.write(Long.toString(used) + " " + Long.toString(timer.now()));
- out.flush();
+ fileIoProvider.flush(volume, out);
}
} catch (IOException ioe) {
// If write failed, the volume might be bad. Since the cache file is
@@ -292,7 +286,8 @@ class BlockPoolSlice {
*/
File createTmpFile(Block b) throws IOException {
File f = new File(tmpDir, b.getBlockName());
- File tmpFile = DatanodeUtil.createTmpFile(b, f);
+ File tmpFile = DatanodeUtil.createFileWithExistsCheck(
+ volume, b, f, fileIoProvider);
// If any exception during creation, its expected that counter will not be
// incremented, So no need to decrement
incrNumBlocks();
@@ -305,7 +300,8 @@ class BlockPoolSlice {
*/
File createRbwFile(Block b) throws IOException {
File f = new File(rbwDir, b.getBlockName());
- File rbwFile = DatanodeUtil.createTmpFile(b, f);
+ File rbwFile = DatanodeUtil.createFileWithExistsCheck(
+ volume, b, f, fileIoProvider);
// If any exception during creation, its expected that counter will not be
// incremented, So no need to decrement
incrNumBlocks();
@@ -314,11 +310,7 @@ class BlockPoolSlice {
File addFinalizedBlock(Block b, ReplicaInfo replicaInfo) throws IOException {
File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
- if (!blockDir.exists()) {
- if (!blockDir.mkdirs()) {
- throw new IOException("Failed to mkdirs " + blockDir);
- }
- }
+ fileIoProvider.mkdirsWithExistsCheck(volume, blockDir);
File blockFile = FsDatasetImpl.moveBlockFiles(b, replicaInfo, blockDir);
File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp());
if (dfsUsage instanceof CachingGetSpaceUsed) {
@@ -340,9 +332,9 @@ class BlockPoolSlice {
final File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, blockId);
final File targetBlockFile = new File(blockDir, blockFile.getName());
final File targetMetaFile = new File(blockDir, metaFile.getName());
- FileUtils.moveFile(blockFile, targetBlockFile);
+ fileIoProvider.moveFile(volume, blockFile, targetBlockFile);
FsDatasetImpl.LOG.info("Moved " + blockFile + " to " + targetBlockFile);
- FileUtils.moveFile(metaFile, targetMetaFile);
+ fileIoProvider.moveFile(volume, metaFile, targetMetaFile);
FsDatasetImpl.LOG.info("Moved " + metaFile + " to " + targetMetaFile);
ReplicaInfo newReplicaInfo =
@@ -394,16 +386,13 @@ class BlockPoolSlice {
File blockFile = FsDatasetUtil.getOrigFile(unlinkedTmp);
if (blockFile.exists()) {
// If the original block file still exists, then no recovery is needed.
- if (!unlinkedTmp.delete()) {
+ if (!fileIoProvider.delete(volume, unlinkedTmp)) {
throw new IOException("Unable to cleanup unlinked tmp file " +
unlinkedTmp);
}
return null;
} else {
- if (!unlinkedTmp.renameTo(blockFile)) {
- throw new IOException("Unable to rename unlinked tmp file " +
- unlinkedTmp);
- }
+ fileIoProvider.rename(volume, unlinkedTmp, blockFile);
return blockFile;
}
}
@@ -416,7 +405,7 @@ class BlockPoolSlice {
*/
private int moveLazyPersistReplicasToFinalized(File source)
throws IOException {
- File files[] = FileUtil.listFiles(source);
+ File[] files = fileIoProvider.listFiles(volume, source);
int numRecovered = 0;
for (File file : files) {
if (file.isDirectory()) {
@@ -431,24 +420,25 @@ class BlockPoolSlice {
if (blockFile.exists()) {
- if (!targetDir.exists() && !targetDir.mkdirs()) {
+ try {
+ fileIoProvider.mkdirsWithExistsCheck(volume, targetDir);
+ } catch(IOException ioe) {
LOG.warn("Failed to mkdirs " + targetDir);
continue;
}
final File targetMetaFile = new File(targetDir, metaFile.getName());
try {
- LocalReplica.rename(metaFile, targetMetaFile);
+ fileIoProvider.rename(volume, metaFile, targetMetaFile);
} catch (IOException e) {
LOG.warn("Failed to move meta file from "
+ metaFile + " to " + targetMetaFile, e);
continue;
-
}
final File targetBlockFile = new File(targetDir, blockFile.getName());
try {
- LocalReplica.rename(blockFile, targetBlockFile);
+ fileIoProvider.rename(volume, blockFile, targetBlockFile);
} catch (IOException e) {
LOG.warn("Failed to move block file from "
+ blockFile + " to " + targetBlockFile, e);
@@ -465,7 +455,7 @@ class BlockPoolSlice {
}
}
- FileUtil.fullyDelete(source);
+ fileIoProvider.fullyDelete(volume, source);
return numRecovered;
}
@@ -508,7 +498,7 @@ class BlockPoolSlice {
loadRwr = false;
}
sc.close();
- if (!restartMeta.delete()) {
+ if (!fileIoProvider.delete(volume, restartMeta)) {
FsDatasetImpl.LOG.warn("Failed to delete restart meta file: " +
restartMeta.getPath());
}
@@ -568,7 +558,7 @@ class BlockPoolSlice {
final RamDiskReplicaTracker lazyWriteReplicaMap,
boolean isFinalized)
throws IOException {
- File files[] = FileUtil.listFiles(dir);
+ File[] files = fileIoProvider.listFiles(volume, dir);
for (File file : files) {
if (file.isDirectory()) {
addToReplicasMap(volumeMap, file, lazyWriteReplicaMap, isFinalized);
@@ -581,8 +571,9 @@ class BlockPoolSlice {
continue;
}
}
- if (!Block.isBlockFilename(file))
+ if (!Block.isBlockFilename(file)) {
continue;
+ }
long genStamp = FsDatasetUtil.getGenerationStampFromFile(
files, file);
@@ -700,7 +691,8 @@ class BlockPoolSlice {
return 0;
}
try (DataInputStream checksumIn = new DataInputStream(
- new BufferedInputStream(new FileInputStream(metaFile),
+ new BufferedInputStream(
+ fileIoProvider.getFileInputStream(volume, metaFile),
ioFileBufferSize))) {
// read and handle the common header here. For now just a version
final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(
@@ -713,9 +705,10 @@ class BlockPoolSlice {
if (numChunks == 0) {
return 0;
}
- try (InputStream blockIn = new FileInputStream(blockFile);
+ try (InputStream blockIn = fileIoProvider.getFileInputStream(
+ volume, blockFile);
ReplicaInputStreams ris = new ReplicaInputStreams(blockIn,
- checksumIn, volume.obtainReference())) {
+ checksumIn, volume.obtainReference(), fileIoProvider)) {
ris.skipChecksumFully((numChunks - 1) * checksumSize);
long lastChunkStartPos = (numChunks - 1) * bytesPerChecksum;
ris.skipDataFully(lastChunkStartPos);
@@ -734,7 +727,8 @@ class BlockPoolSlice {
// truncate if extra bytes are present without CRC
if (blockFile.length() > validFileLength) {
try (RandomAccessFile blockRAF =
- new RandomAccessFile(blockFile, "rw")) {
+ fileIoProvider.getRandomAccessFile(
+ volume, blockFile, "rw")) {
// truncate blockFile
blockRAF.setLength(validFileLength);
}
@@ -786,12 +780,14 @@ class BlockPoolSlice {
}
FileInputStream inputStream = null;
try {
- inputStream = new FileInputStream(replicaFile);
+ inputStream = fileIoProvider.getFileInputStream(volume, replicaFile);
BlockListAsLongs blocksList =
BlockListAsLongs.readFrom(inputStream, maxDataLength);
- Iterator<BlockReportReplica> iterator = blocksList.iterator();
- while (iterator.hasNext()) {
- BlockReportReplica replica = iterator.next();
+ if (blocksList == null) {
+ return false;
+ }
+
+ for (BlockReportReplica replica : blocksList) {
switch (replica.getState()) {
case FINALIZED:
addReplicaToReplicasMap(replica, tmpReplicaMap, lazyWriteReplicaMap, true);
@@ -828,7 +824,7 @@ class BlockPoolSlice {
return false;
}
finally {
- if (!replicaFile.delete()) {
+ if (!fileIoProvider.delete(volume, replicaFile)) {
LOG.info("Failed to delete replica cache file: " +
replicaFile.getPath());
}
@@ -842,41 +838,29 @@ class BlockPoolSlice {
blocksListToPersist.getNumberOfBlocks()== 0) {
return;
}
- File tmpFile = new File(currentDir, REPLICA_CACHE_FILE + ".tmp");
- if (tmpFile.exists() && !tmpFile.delete()) {
- LOG.warn("Failed to delete tmp replicas file in " +
- tmpFile.getPath());
- return;
- }
- File replicaCacheFile = new File(currentDir, REPLICA_CACHE_FILE);
- if (replicaCacheFile.exists() && !replicaCacheFile.delete()) {
- LOG.warn("Failed to delete replicas file in " +
- replicaCacheFile.getPath());
+ final File tmpFile = new File(currentDir, REPLICA_CACHE_FILE + ".tmp");
+ final File replicaCacheFile = new File(currentDir, REPLICA_CACHE_FILE);
+ if (!fileIoProvider.deleteWithExistsCheck(volume, tmpFile) ||
+ !fileIoProvider.deleteWithExistsCheck(volume, replicaCacheFile)) {
return;
}
FileOutputStream out = null;
try {
- out = new FileOutputStream(tmpFile);
+ out = fileIoProvider.getFileOutputStream(volume, tmpFile);
blocksListToPersist.writeTo(out);
out.close();
// Renaming the tmp file to replicas
- Files.move(tmpFile, replicaCacheFile);
+ fileIoProvider.moveFile(volume, tmpFile, replicaCacheFile);
} catch (Exception e) {
// If write failed, the volume might be bad. Since the cache file is
// not critical, log the error, delete both the files (tmp and cache)
// and continue.
LOG.warn("Failed to write replicas to cache ", e);
- if (replicaCacheFile.exists() && !replicaCacheFile.delete()) {
- LOG.warn("Failed to delete replicas file: " +
- replicaCacheFile.getPath());
- }
+ fileIoProvider.deleteWithExistsCheck(volume, replicaCacheFile);
} finally {
IOUtils.closeStream(out);
- if (tmpFile.exists() && !tmpFile.delete()) {
- LOG.warn("Failed to delete tmp file in " +
- tmpFile.getPath());
- }
+ fileIoProvider.deleteWithExistsCheck(volume, tmpFile);
}
}
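
The *WithExistsCheck helpers absorb the exists()/delete() and exists()/mkdirs()
boilerplate this file used to repeat. Judging from the call sites above, their
contracts are roughly the following (a simplified standalone sketch, not the
actual FileIoProvider source):

    import java.io.File;
    import java.io.IOException;

    final class ExistsCheckSketch {
      // True if the file is already absent or was deleted; false only when
      // it exists and could not be removed (call sites then log a warning).
      static boolean deleteWithExistsCheck(File f) {
        return !f.exists() || f.delete();
      }

      // Create the directory if missing; throw when it cannot be created
      // and does not already exist (call sites wrap this in try/catch).
      static void mkdirsWithExistsCheck(File dir) throws IOException {
        if (!dir.mkdirs() && !dir.isDirectory()) {
          throw new IOException("Mkdirs failed to create " + dir);
        }
      }

      private ExistsCheckSketch() {}
    }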
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
index 97dcf8d..416609d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
@@ -272,8 +272,10 @@ class FsDatasetAsyncDiskService {
}
File trashDirFile = new File(trashDirectory);
- if (!trashDirFile.exists() && !trashDirFile.mkdirs()) {
- LOG.error("Failed to create trash directory " + trashDirectory);
+ try {
+ volume.getFileIoProvider().mkdirsWithExistsCheck(
+ volume, trashDirFile);
+ } catch (IOException e) {
return false;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 6065df2..35561cd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -21,6 +21,7 @@ import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
+import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -57,6 +58,7 @@ import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -418,6 +420,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
.setDataset(this)
.setStorageID(sd.getStorageUuid())
.setStorageDirectory(sd)
+ .setFileIoProvider(datanode.getFileIoProvider())
.setConf(this.conf)
.build();
FsVolumeReference ref = fsVolume.obtainReference();
@@ -437,6 +440,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
.setDataset(this)
.setStorageID(storageUuid)
.setStorageDirectory(sd)
+ .setFileIoProvider(datanode.getFileIoProvider())
.setConf(conf)
.build();
}
@@ -818,7 +822,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
InputStream blockInStream = info.getDataInputStream(blkOffset);
try {
InputStream metaInStream = info.getMetadataInputStream(metaOffset);
- return new ReplicaInputStreams(blockInStream, metaInStream, ref);
+ return new ReplicaInputStreams(
+ blockInStream, metaInStream, ref, datanode.getFileIoProvider());
} catch (IOException e) {
IOUtils.cleanup(null, blockInStream);
throw e;
@@ -1027,9 +1032,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
static void computeChecksum(ReplicaInfo srcReplica, File dstMeta,
int smallBufferSize, final Configuration conf)
throws IOException {
- File srcMeta = new File(srcReplica.getMetadataURI());
- final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(srcMeta,
- DFSUtilClient.getIoFileBufferSize(conf));
+ final File srcMeta = new File(srcReplica.getMetadataURI());
+
+ DataChecksum checksum;
+ try (FileInputStream fis =
+ srcReplica.getFileIoProvider().getFileInputStream(
+ srcReplica.getVolume(), srcMeta)) {
+ checksum = BlockMetadataHeader.readDataChecksum(
+ fis, DFSUtilClient.getIoFileBufferSize(conf), srcMeta);
+ }
+
final byte[] data = new byte[1 << 16];
final byte[] crcs = new byte[checksum.getChecksumSize(data.length)];
@@ -2161,16 +2173,21 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
return;
}
- final long diskGS = diskMetaFile != null && diskMetaFile.exists() ?
+ final FileIoProvider fileIoProvider = datanode.getFileIoProvider();
+ final boolean diskMetaFileExists = diskMetaFile != null &&
+ fileIoProvider.exists(vol, diskMetaFile);
+ final boolean diskFileExists = diskFile != null &&
+ fileIoProvider.exists(vol, diskFile);
+
+ final long diskGS = diskMetaFileExists ?
Block.getGenerationStamp(diskMetaFile.getName()) :
- HdfsConstants.GRANDFATHER_GENERATION_STAMP;
+ HdfsConstants.GRANDFATHER_GENERATION_STAMP;
- if (diskFile == null || !diskFile.exists()) {
+ if (!diskFileExists) {
if (memBlockInfo == null) {
// Block file does not exist and block does not exist in memory
// If metadata file exists then delete it
- if (diskMetaFile != null && diskMetaFile.exists()
- && diskMetaFile.delete()) {
+ if (diskMetaFileExists && fileIoProvider.delete(vol, diskMetaFile)) {
LOG.warn("Deleted a metadata file without a block "
+ diskMetaFile.getAbsolutePath());
}
@@ -2186,8 +2203,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
LOG.warn("Removed block " + blockId
+ " from memory with missing block file on the disk");
// Finally remove the metadata file
- if (diskMetaFile != null && diskMetaFile.exists()
- && diskMetaFile.delete()) {
+ if (diskMetaFileExists && fileIoProvider.delete(vol, diskMetaFile)) {
LOG.warn("Deleted a metadata file for the deleted block "
+ diskMetaFile.getAbsolutePath());
}
@@ -2223,7 +2239,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// Compare block files
if (memBlockInfo.blockDataExists()) {
if (memBlockInfo.getBlockURI().compareTo(diskFile.toURI()) != 0) {
- if (diskMetaFile.exists()) {
+ if (diskMetaFileExists) {
if (memBlockInfo.metadataExists()) {
// We have two sets of block+meta files. Decide which one to
// keep.
@@ -2239,7 +2255,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
memBlockInfo, diskBlockInfo, volumeMap);
}
} else {
- if (!diskFile.delete()) {
+ if (!fileIoProvider.delete(vol, diskFile)) {
LOG.warn("Failed to delete " + diskFile);
}
}
@@ -2278,8 +2294,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// as the block file, then use the generation stamp from it
try {
File memFile = new File(memBlockInfo.getBlockURI());
- long gs = diskMetaFile != null && diskMetaFile.exists()
- && diskMetaFile.getParent().equals(memFile.getParent()) ? diskGS
+ long gs = diskMetaFileExists &&
+ diskMetaFile.getParent().equals(memFile.getParent()) ? diskGS
: HdfsConstants.GRANDFATHER_GENERATION_STAMP;
LOG.warn("Updating generation stamp for block " + blockId
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
index 563f66a..32759c4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.File;
+import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.FilenameFilter;
import java.io.IOException;
@@ -80,7 +81,7 @@ public class FsDatasetUtil {
return matches[0];
}
- public static FileInputStream openAndSeek(File file, long offset)
+ public static FileDescriptor openAndSeek(File file, long offset)
throws IOException {
RandomAccessFile raf = null;
try {
@@ -88,7 +89,7 @@ public class FsDatasetUtil {
if (offset > 0) {
raf.seek(offset);
}
- return new FileInputStream(raf.getFD());
+ return raf.getFD();
} catch(IOException ioe) {
IOUtils.cleanup(null, raf);
throw ioe;
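
openAndSeek now returns the positioned FileDescriptor rather than a
FileInputStream, leaving the choice of wrapper to the caller (for example a
provider-instrumented stream). A hypothetical caller, assuming blockFile and
offset are in scope:

    // Wrap the already-seeked descriptor in whatever stream the call site
    // needs; closing the wrapper releases the underlying descriptor.
    FileDescriptor fd = FsDatasetUtil.openAndSeek(blockFile, offset);
    FileInputStream in = new FileInputStream(fd);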
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index c317715..74ee063 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -19,14 +19,12 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.BufferedWriter;
import java.io.File;
-import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.RandomAccessFile;
import java.net.URI;
import java.nio.channels.ClosedChannelException;
-import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Collections;
@@ -46,8 +44,8 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DF;
-import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -75,7 +73,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
-import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.CloseableReferenceCount;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.StringUtils;
@@ -132,6 +129,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
// limit the visible capacity for tests. If negative, then we just
// query from the filesystem.
protected volatile long configuredCapacity;
+ private final FileIoProvider fileIoProvider;
/**
* Per-volume worker pool that processes new blocks to cache.
@@ -141,8 +139,9 @@ public class FsVolumeImpl implements FsVolumeSpi {
*/
protected ThreadPoolExecutor cacheExecutor;
- FsVolumeImpl(FsDatasetImpl dataset, String storageID, StorageDirectory sd,
- Configuration conf) throws IOException {
+ FsVolumeImpl(
+ FsDatasetImpl dataset, String storageID, StorageDirectory sd,
+ FileIoProvider fileIoProvider, Configuration conf) throws IOException {
if (sd.getStorageLocation() == null) {
throw new IOException("StorageLocation specified for storage directory " +
@@ -162,6 +161,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT));
this.configuredCapacity = -1;
this.conf = conf;
+ this.fileIoProvider = fileIoProvider;
cacheExecutor = initializeCacheExecutor(parent);
}
@@ -664,8 +664,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
*/
private String getNextSubDir(String prev, File dir)
throws IOException {
- List<String> children =
- IOUtils.listDirectory(dir, SubdirFilter.INSTANCE);
+ List<String> children = fileIoProvider.listDirectory(
+ FsVolumeImpl.this, dir, SubdirFilter.INSTANCE);
cache = null;
cacheMs = 0;
if (children.size() == 0) {
@@ -718,8 +718,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
}
File dir = Paths.get(bpidDir.getAbsolutePath(), "current", "finalized",
state.curFinalizedDir, state.curFinalizedSubDir).toFile();
- List<String> entries =
- IOUtils.listDirectory(dir, BlockFileFilter.INSTANCE);
+ List<String> entries = fileIoProvider.listDirectory(
+ FsVolumeImpl.this, dir, BlockFileFilter.INSTANCE);
if (entries.size() == 0) {
entries = null;
} else {
@@ -839,19 +839,18 @@ public class FsVolumeImpl implements FsVolumeSpi {
public void save() throws IOException {
state.lastSavedMs = Time.now();
boolean success = false;
- try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
- new FileOutputStream(getTempSaveFile(), false), "UTF-8"))) {
+ try (BufferedWriter writer = new BufferedWriter(
+ new OutputStreamWriter(fileIoProvider.getFileOutputStream(
+ FsVolumeImpl.this, getTempSaveFile()), "UTF-8"))) {
WRITER.writeValue(writer, state);
success = true;
} finally {
if (!success) {
- if (getTempSaveFile().delete()) {
- LOG.debug("save({}, {}): error deleting temporary file.",
- storageID, bpid);
- }
+ fileIoProvider.delete(FsVolumeImpl.this, getTempSaveFile());
}
}
- Files.move(getTempSaveFile().toPath(), getSaveFile().toPath(),
+ fileIoProvider.move(FsVolumeImpl.this,
+ getTempSaveFile().toPath(), getSaveFile().toPath(),
StandardCopyOption.ATOMIC_MOVE);
if (LOG.isTraceEnabled()) {
LOG.trace("save({}, {}): saved {}", storageID, bpid,
@@ -1042,11 +1041,12 @@ public class FsVolumeImpl implements FsVolumeSpi {
File finalizedDir = new File(bpCurrentDir,
DataStorage.STORAGE_DIR_FINALIZED);
File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
- if (finalizedDir.exists() && !DatanodeUtil.dirNoFilesRecursive(
- finalizedDir)) {
+ if (fileIoProvider.exists(this, finalizedDir) &&
+ !DatanodeUtil.dirNoFilesRecursive(this, finalizedDir, fileIoProvider)) {
return false;
}
- if (rbwDir.exists() && FileUtil.list(rbwDir).length != 0) {
+ if (fileIoProvider.exists(this, rbwDir) &&
+ fileIoProvider.list(this, rbwDir).length != 0) {
return false;
}
return true;
@@ -1067,35 +1067,38 @@ public class FsVolumeImpl implements FsVolumeSpi {
DataStorage.STORAGE_DIR_LAZY_PERSIST);
File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
if (force) {
- DataStorage.fullyDelete(bpDir);
+ fileIoProvider.fullyDelete(this, bpDir);
} else {
- if (!rbwDir.delete()) {
+ if (!fileIoProvider.delete(this, rbwDir)) {
throw new IOException("Failed to delete " + rbwDir);
}
- if (!DatanodeUtil.dirNoFilesRecursive(finalizedDir) ||
- !FileUtil.fullyDelete(finalizedDir)) {
+ if (!DatanodeUtil.dirNoFilesRecursive(
+ this, finalizedDir, fileIoProvider) ||
+ !fileIoProvider.fullyDelete(
+ this, finalizedDir)) {
throw new IOException("Failed to delete " + finalizedDir);
}
if (lazypersistDir.exists() &&
- ((!DatanodeUtil.dirNoFilesRecursive(lazypersistDir) ||
- !FileUtil.fullyDelete(lazypersistDir)))) {
+ ((!DatanodeUtil.dirNoFilesRecursive(
+ this, lazypersistDir, fileIoProvider) ||
+ !fileIoProvider.fullyDelete(this, lazypersistDir)))) {
throw new IOException("Failed to delete " + lazypersistDir);
}
- DataStorage.fullyDelete(tmpDir);
- for (File f : FileUtil.listFiles(bpCurrentDir)) {
- if (!f.delete()) {
+ fileIoProvider.fullyDelete(this, tmpDir);
+ for (File f : fileIoProvider.listFiles(this, bpCurrentDir)) {
+ if (!fileIoProvider.delete(this, f)) {
throw new IOException("Failed to delete " + f);
}
}
- if (!bpCurrentDir.delete()) {
+ if (!fileIoProvider.delete(this, bpCurrentDir)) {
throw new IOException("Failed to delete " + bpCurrentDir);
}
- for (File f : FileUtil.listFiles(bpDir)) {
- if (!f.delete()) {
+ for (File f : fileIoProvider.listFiles(this, bpDir)) {
+ if (!fileIoProvider.delete(this, f)) {
throw new IOException("Failed to delete " + f);
}
}
- if (!bpDir.delete()) {
+ if (!fileIoProvider.delete(this, bpDir)) {
throw new IOException("Failed to delete " + bpDir);
}
}
@@ -1118,7 +1121,10 @@ public class FsVolumeImpl implements FsVolumeSpi {
private byte[] loadLastPartialChunkChecksum(
File blockFile, File metaFile) throws IOException {
- DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum();
+ // readHeader closes the temporary FileInputStream.
+ DataChecksum dcs = BlockMetadataHeader
+ .readHeader(fileIoProvider.getFileInputStream(this, metaFile))
+ .getChecksum();
final int checksumSize = dcs.getChecksumSize();
final long onDiskLen = blockFile.length();
final int bytesPerChecksum = dcs.getBytesPerChecksum();
@@ -1132,7 +1138,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
int offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
(int)(onDiskLen / bytesPerChecksum * checksumSize);
byte[] lastChecksum = new byte[checksumSize];
- try (RandomAccessFile raf = new RandomAccessFile(metaFile, "r")) {
+ try (RandomAccessFile raf = fileIoProvider.getRandomAccessFile(
+ this, metaFile, "r")) {
raf.seek(offsetInChecksum);
raf.read(lastChecksum, 0, checksumSize);
}
@@ -1246,8 +1253,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
copyReplicaWithNewBlockIdAndGS(rur, bpid, newBlockId, recoveryId);
File blockFile = copiedReplicaFiles[1];
File metaFile = copiedReplicaFiles[0];
- LocalReplica.truncateBlock(blockFile, metaFile,
- rur.getNumBytes(), newlength);
+ LocalReplica.truncateBlock(rur.getVolume(), blockFile, metaFile,
+ rur.getNumBytes(), newlength, fileIoProvider);
LocalReplicaInPipeline newReplicaInfo = new ReplicaBuilder(ReplicaState.RBW)
.setBlockId(newBlockId)
@@ -1283,6 +1290,11 @@ public class FsVolumeImpl implements FsVolumeSpi {
getFinalizedDir(bpid), report, reportCompiler);
}
+ @Override
+ public FileIoProvider getFileIoProvider() {
+ return fileIoProvider;
+ }
+
private LinkedList<ScanInfo> compileReport(File bpFinalizedDir,
File dir, LinkedList<ScanInfo> report, ReportCompiler reportCompiler)
throws InterruptedException {
@@ -1291,7 +1303,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
List <String> fileNames;
try {
- fileNames = IOUtils.listDirectory(dir, BlockDirFilter.INSTANCE);
+ fileNames = fileIoProvider.listDirectory(
+ this, dir, BlockDirFilter.INSTANCE);
} catch (IOException ioe) {
LOG.warn("Exception occured while compiling report: ", ioe);
// Initiate a check on disk failure.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java
index a1f7e91..5371eda 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
/**
* This class is to be used as a builder for {@link FsVolumeImpl} objects.
@@ -31,6 +32,7 @@ public class FsVolumeImplBuilder {
private String storageID;
private StorageDirectory sd;
private Configuration conf;
+ private FileIoProvider fileIoProvider;
public FsVolumeImplBuilder() {
dataset = null;
@@ -59,7 +61,15 @@ public class FsVolumeImplBuilder {
return this;
}
+ FsVolumeImplBuilder setFileIoProvider(FileIoProvider fileIoProvider) {
+ this.fileIoProvider = fileIoProvider;
+ return this;
+ }
+
FsVolumeImpl build() throws IOException {
- return new FsVolumeImpl(dataset, storageID, sd, conf);
+ return new FsVolumeImpl(
+ dataset, storageID, sd,
+ fileIoProvider != null ? fileIoProvider : new FileIoProvider(null),
+ conf);
}
}
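
The builder applies the same fallback as ReplicaInfo above: when
setFileIoProvider is never called (as in some tests), build() substitutes a
FileIoProvider with no-op hooks. The production path, assembled from the
FsDatasetImpl hunk earlier in this mail, looks roughly like:

    // Volumes created by FsDatasetImpl inherit the Datanode's provider;
    // tests that skip setFileIoProvider get the no-op default instead.
    FsVolumeImpl fsVolume = new FsVolumeImplBuilder()
        .setDataset(this)
        .setStorageID(sd.getStorageUuid())
        .setStorageDirectory(sd)
        .setFileIoProvider(datanode.getFileIoProvider())
        .setConf(conf)
        .build();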
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
index e963d41..20cec6a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
@@ -701,7 +701,7 @@ public class TestFileAppend{
ReplicaBeingWritten rbw =
(ReplicaBeingWritten)replicaHandler.getReplica();
ReplicaOutputStreams
- outputStreams = rbw.createStreams(false, DEFAULT_CHECKSUM, 300);
+ outputStreams = rbw.createStreams(false, DEFAULT_CHECKSUM);
OutputStream dataOutput = outputStreams.getDataOut();
byte[] appendBytes = new byte[1];
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index ae52905..a0041dd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -122,6 +122,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
static final byte[] nullCrcFileData;
private final AutoCloseableLock datasetLock;
+ private final FileIoProvider fileIoProvider;
static {
DataChecksum checksum = DataChecksum.newDataChecksum(
@@ -260,7 +261,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override
synchronized public ReplicaOutputStreams createStreams(boolean isCreate,
- DataChecksum requestedChecksum, long slowLogThresholdMs)
+ DataChecksum requestedChecksum)
throws IOException {
if (finalized) {
throw new IOException("Trying to write to a finalized replica "
@@ -268,7 +269,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
} else {
SimulatedOutputStream crcStream = new SimulatedOutputStream();
return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum,
- volume.isTransientStorage(), slowLogThresholdMs);
+ volume, fileIoProvider);
}
}
@@ -474,9 +475,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
static class SimulatedVolume implements FsVolumeSpi {
private final SimulatedStorage storage;
+ private final FileIoProvider fileIoProvider;
- SimulatedVolume(final SimulatedStorage storage) {
+ SimulatedVolume(final SimulatedStorage storage,
+ final FileIoProvider fileIoProvider) {
this.storage = storage;
+ this.fileIoProvider = fileIoProvider;
}
@Override
@@ -560,6 +564,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
@Override
+ public FileIoProvider getFileIoProvider() {
+ return fileIoProvider;
+ }
+
+ @Override
public VolumeCheckResult check(VolumeCheckContext context)
throws Exception {
return VolumeCheckResult.HEALTHY;
@@ -590,10 +599,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
registerMBean(datanodeUuid);
+ this.fileIoProvider = new FileIoProvider(conf);
this.storage = new SimulatedStorage(
conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY),
conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE));
- this.volume = new SimulatedVolume(this.storage);
+ this.volume = new SimulatedVolume(this.storage, this.fileIoProvider);
this.datasetLock = new AutoCloseableLock();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index 8439991..619eda0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -673,7 +673,7 @@ public class TestBlockRecovery {
ReplicaOutputStreams streams = null;
try {
streams = replicaInfo.createStreams(true,
- DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512), 300);
+ DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512));
streams.getChecksumOut().write('a');
dn.data.initReplicaRecovery(new RecoveringBlock(block, null, RECOVERY_ID+1));
BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index d7c8383..cc0915d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -905,6 +905,11 @@ public class TestDirectoryScanner {
return null;
}
+ @Override
+ public FileIoProvider getFileIoProvider() {
+ return null;
+ }
+
@Override
public VolumeCheckResult check(VolumeCheckContext context)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
index fa980c2..4e724bc7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
@@ -83,7 +83,7 @@ public class TestSimulatedFSDataset {
ReplicaInPipeline bInfo = fsdataset.createRbw(
StorageType.DEFAULT, b, false).getReplica();
ReplicaOutputStreams out = bInfo.createStreams(true,
- DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512), 300);
+ DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512));
try {
OutputStream dataOut = out.getDataOut();
assertEquals(0, fsdataset.getLength(b));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index 2417c9d..5cd86e2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -134,7 +134,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
@Override
public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff,
long ckoff) throws IOException {
- return new ReplicaInputStreams(null, null, null);
+ return new ReplicaInputStreams(null, null, null, null);
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java
index 6fa2830..5c172e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java
@@ -58,10 +58,10 @@ public class ExternalReplicaInPipeline implements ReplicaInPipeline {
@Override
public ReplicaOutputStreams createStreams(boolean isCreate,
- DataChecksum requestedChecksum, long slowLogThresholdMs)
+ DataChecksum requestedChecksum)
throws IOException {
- return new ReplicaOutputStreams(null, null, requestedChecksum, false,
- slowLogThresholdMs);
+ return new ReplicaOutputStreams(null, null, requestedChecksum,
+ null, null);
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
index 2753a61..e607de5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
@@ -26,6 +26,7 @@ import java.util.LinkedList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
@@ -115,6 +116,11 @@ public class ExternalVolumeImpl implements FsVolumeSpi {
}
@Override
+ public FileIoProvider getFileIoProvider() {
+ return null;
+ }
+
+ @Override
public VolumeCheckResult check(VolumeCheckContext context)
throws Exception {
return VolumeCheckResult.HEALTHY;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
index a089d39..3bac7b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
@@ -99,6 +99,8 @@ public class TestHdfsConfigFields extends TestConfigurationFieldsBase {
.add(DFSConfigKeys.DFS_DATANODE_STARTUP_KEY);
configurationPropsToSkipCompare
.add(DFSConfigKeys.DFS_NAMENODE_STARTUP_KEY);
+ configurationPropsToSkipCompare
+ .add(DFSConfigKeys.DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY);
// Allocate
xmlPropsToSkipCompare = new HashSet<String>();