Posted to common-commits@hadoop.apache.org by as...@apache.org on 2016/12/18 02:16:47 UTC
[13/29] hadoop git commit: HDFS-10958. Add instrumentation hooks around Datanode disk IO.
HDFS-10958. Add instrumentation hooks around Datanode disk IO.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6ba9587d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6ba9587d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6ba9587d
Branch: refs/heads/YARN-5085
Commit: 6ba9587d370fbf39c129c08c00ebbb894ccc1389
Parents: 72bff19
Author: Arpit Agarwal <ar...@apache.org>
Authored: Wed Dec 14 11:18:58 2016 -0800
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Wed Dec 14 11:18:58 2016 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/io/nativeio/NativeIO.java | 40 +-
.../server/datanode/BlockMetadataHeader.java | 29 +-
.../dev-support/findbugsExcludeFile.xml | 27 +
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 2 +
.../hdfs/server/datanode/BlockReceiver.java | 14 +-
.../hdfs/server/datanode/BlockSender.java | 10 +-
.../server/datanode/CountingFileIoEvents.java | 107 ++
.../hadoop/hdfs/server/datanode/DataNode.java | 12 +
.../hdfs/server/datanode/DataNodeMXBean.java | 5 +
.../hdfs/server/datanode/DataStorage.java | 6 +
.../hdfs/server/datanode/DatanodeUtil.java | 19 +-
.../server/datanode/DefaultFileIoEvents.java | 67 ++
.../hdfs/server/datanode/FileIoEvents.java | 97 ++
.../hdfs/server/datanode/FileIoProvider.java | 1006 ++++++++++++++++++
.../hdfs/server/datanode/LocalReplica.java | 133 +--
.../server/datanode/LocalReplicaInPipeline.java | 43 +-
.../hdfs/server/datanode/ReplicaInPipeline.java | 4 +-
.../hdfs/server/datanode/ReplicaInfo.java | 17 +-
.../server/datanode/fsdataset/FsVolumeSpi.java | 3 +
.../datanode/fsdataset/ReplicaInputStreams.java | 11 +-
.../fsdataset/ReplicaOutputStreams.java | 72 +-
.../datanode/fsdataset/impl/BlockPoolSlice.java | 122 +--
.../impl/FsDatasetAsyncDiskService.java | 6 +-
.../datanode/fsdataset/impl/FsDatasetImpl.java | 46 +-
.../datanode/fsdataset/impl/FsDatasetUtil.java | 5 +-
.../datanode/fsdataset/impl/FsVolumeImpl.java | 89 +-
.../fsdataset/impl/FsVolumeImplBuilder.java | 12 +-
.../org/apache/hadoop/hdfs/TestFileAppend.java | 2 +-
.../server/datanode/SimulatedFSDataset.java | 18 +-
.../hdfs/server/datanode/TestBlockRecovery.java | 2 +-
.../server/datanode/TestDirectoryScanner.java | 5 +
.../server/datanode/TestSimulatedFSDataset.java | 2 +-
.../extdataset/ExternalDatasetImpl.java | 2 +-
.../extdataset/ExternalReplicaInPipeline.java | 6 +-
.../datanode/extdataset/ExternalVolumeImpl.java | 6 +
.../hadoop/tools/TestHdfsConfigFields.java | 2 +
36 files changed, 1684 insertions(+), 365 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
index a123f18..f3ff1c7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
@@ -742,47 +742,19 @@ public class NativeIO {
}
/**
- * Create a FileInputStream that shares delete permission on the
- * file opened, i.e. other process can delete the file the
- * FileInputStream is reading. Only Windows implementation uses
- * the native interface.
- */
- public static FileInputStream getShareDeleteFileInputStream(File f)
- throws IOException {
- if (!Shell.WINDOWS) {
- // On Linux the default FileInputStream shares delete permission
- // on the file opened.
- //
- return new FileInputStream(f);
- } else {
- // Use Windows native interface to create a FileInputStream that
- // shares delete permission on the file opened.
- //
- FileDescriptor fd = Windows.createFile(
- f.getAbsolutePath(),
- Windows.GENERIC_READ,
- Windows.FILE_SHARE_READ |
- Windows.FILE_SHARE_WRITE |
- Windows.FILE_SHARE_DELETE,
- Windows.OPEN_EXISTING);
- return new FileInputStream(fd);
- }
- }
-
- /**
- * Create a FileInputStream that shares delete permission on the
+ * Create a FileDescriptor that shares delete permission on the
* file opened at a given offset, i.e. other process can delete
- * the file the FileInputStream is reading. Only Windows implementation
+ * the file the FileDescriptor is reading. Only Windows implementation
* uses the native interface.
*/
- public static FileInputStream getShareDeleteFileInputStream(File f, long seekOffset)
- throws IOException {
+ public static FileDescriptor getShareDeleteFileDescriptor(
+ File f, long seekOffset) throws IOException {
if (!Shell.WINDOWS) {
RandomAccessFile rf = new RandomAccessFile(f, "r");
if (seekOffset > 0) {
rf.seek(seekOffset);
}
- return new FileInputStream(rf.getFD());
+ return rf.getFD();
} else {
// Use Windows native interface to create a FileInputStream that
// shares delete permission on the file opened, and set it to the
@@ -797,7 +769,7 @@ public class NativeIO {
NativeIO.Windows.OPEN_EXISTING);
if (seekOffset > 0)
NativeIO.Windows.setFilePointer(fd, seekOffset, NativeIO.Windows.FILE_BEGIN);
- return new FileInputStream(fd);
+ return fd;
}
}
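The refactored API returns a raw FileDescriptor instead of a FileInputStream, leaving stream construction to the caller (FileIoProvider wraps it, as shown further below). A minimal caller sketch, assuming the new NativeIO method above; the class name, path, and offset are illustrative:

import java.io.File;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.IOException;

import org.apache.hadoop.io.nativeio.NativeIO;

public class ShareDeleteExample {
  public static void main(String[] args) throws IOException {
    File blockFile = new File(args[0]);   // e.g. a block file path
    long seekOffset = 0L;                 // illustrative offset
    // Open with shared-delete semantics; other processes may delete
    // the file while we read it. The caller now wraps the descriptor.
    FileDescriptor fd =
        NativeIO.getShareDeleteFileDescriptor(blockFile, seekOffset);
    try (FileInputStream in = new FileInputStream(fd)) {
      System.out.println("first byte: " + in.read());
    }
  }
}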
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
index eb19492..738f496 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
@@ -31,7 +31,6 @@ import java.nio.channels.FileChannel;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum;
import com.google.common.annotations.VisibleForTesting;
@@ -79,18 +78,15 @@ public class BlockMetadataHeader {
/**
* Read the checksum header from the meta file.
+ * inputStream must be closed by the caller.
* @return the data checksum obtained from the header.
*/
- public static DataChecksum readDataChecksum(File metaFile, int bufSize)
+ public static DataChecksum readDataChecksum(
+ FileInputStream inputStream, int bufSize, File metaFile)
throws IOException {
- DataInputStream in = null;
- try {
- in = new DataInputStream(new BufferedInputStream(
- new FileInputStream(metaFile), bufSize));
- return readDataChecksum(in, metaFile);
- } finally {
- IOUtils.closeStream(in);
- }
+ DataInputStream in = new DataInputStream(new BufferedInputStream(
+ inputStream, bufSize));
+ return readDataChecksum(in, metaFile);
}
/**
@@ -111,6 +107,7 @@ public class BlockMetadataHeader {
/**
* Read the header without changing the position of the FileChannel.
+ * This is used by the client for short-circuit reads.
*
* @param fc The FileChannel to read.
* @return the Metadata Header.
@@ -144,18 +141,16 @@ public class BlockMetadataHeader {
/**
* Reads header at the top of metadata file and returns the header.
+ * Closes the input stream after reading the header.
*
* @return metadata header for the block
* @throws IOException
*/
- public static BlockMetadataHeader readHeader(File file) throws IOException {
- DataInputStream in = null;
- try {
- in = new DataInputStream(new BufferedInputStream(
- new FileInputStream(file)));
+ public static BlockMetadataHeader readHeader(
+ FileInputStream fis) throws IOException {
+ try (DataInputStream in = new DataInputStream(
+ new BufferedInputStream(fis))) {
return readHeader(in);
- } finally {
- IOUtils.closeStream(in);
}
}
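With this change the caller owns the input stream's lifetime: readDataChecksum no longer opens or closes the file itself. A usage sketch under that contract (the class name and buffer size are illustrative):

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.util.DataChecksum;

public class ReadMetaChecksum {
  public static void main(String[] args) throws IOException {
    File metaFile = new File(args[0]);  // path to a block .meta file
    // readDataChecksum no longer closes the stream; use
    // try-with-resources so the caller releases it.
    try (FileInputStream fis = new FileInputStream(metaFile)) {
      DataChecksum checksum =
          BlockMetadataHeader.readDataChecksum(fis, 4096, metaFile);
      System.out.println(checksum);
    }
  }
}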
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
index e6e4057..3fa4e8d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
@@ -74,6 +74,33 @@
</Match>
<!--
+ This class exposes stream constructors. The newly created streams are not
+ supposed to be closed in the constructor. Ignore the OBL warning.
+ -->
+ <Match>
+ <Class name="org.apache.hadoop.hdfs.server.datanode.FileIoProvider$WrappedFileOutputStream" />
+ <Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
+ </Match>
+
+ <!--
+ This class exposes stream constructors. The newly created streams are not
+ supposed to be closed in the constructor. Ignore the OBL warning.
+ -->
+ <Match>
+ <Class name="org.apache.hadoop.hdfs.server.datanode.FileIoProvider$WrappedFileInputStream" />
+ <Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
+ </Match>
+
+ <!--
+ This class exposes stream constructors. The newly created streams are not
+ supposed to be closed in the constructor. Ignore the OBL warning.
+ -->
+ <Match>
+ <Class name="org.apache.hadoop.hdfs.server.datanode.FileIoProvider$WrappedRandomAccessFile" />
+ <Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
+ </Match>
+
+ <!--
lastAppliedTxid is carefully unsynchronized in the BackupNode in a couple spots.
See the comments in BackupImage for justification.
-->
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index df21857..cffc4bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -687,6 +687,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins";
public static final String DFS_DATANODE_FSDATASET_FACTORY_KEY = "dfs.datanode.fsdataset.factory";
public static final String DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY = "dfs.datanode.fsdataset.volume.choosing.policy";
+ public static final String DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY =
+ "dfs.datanode.fileio.events.class";
public static final String DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_KEY = "dfs.datanode.available-space-volume-choosing-policy.balanced-space-threshold";
public static final long DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_DEFAULT = 1024L * 1024L * 1024L * 10L; // 10 GB
public static final String DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY = "dfs.datanode.available-space-volume-choosing-policy.balanced-space-preference-fraction";
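The new key selects the FileIoEvents implementation that the DataNode's FileIoProvider loads via reflection (see FileIoProvider below); the default is the no-op DefaultFileIoEvents. A sketch of switching to the counting hooks in a test setup (the class name is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class ConfigureIoEvents {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Equivalent to setting dfs.datanode.fileio.events.class in
    // hdfs-site.xml. CountingFileIoEvents is intended for testing.
    conf.set(DFSConfigKeys.DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY,
        "org.apache.hadoop.hdfs.server.datanode.CountingFileIoEvents");
    System.out.println(conf.get(
        DFSConfigKeys.DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY));
  }
}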
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index f372072..441bd91 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -244,8 +244,7 @@ class BlockReceiver implements Closeable {
final boolean isCreate = isDatanode || isTransfer
|| stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
- streams = replicaInfo.createStreams(isCreate, requestedChecksum,
- datanodeSlowLogThresholdMs);
+ streams = replicaInfo.createStreams(isCreate, requestedChecksum);
assert streams != null : "null streams!";
// read checksum meta information
@@ -400,9 +399,8 @@ class BlockReceiver implements Closeable {
checksumOut.flush();
long flushEndNanos = System.nanoTime();
if (isSync) {
- long fsyncStartNanos = flushEndNanos;
streams.syncChecksumOut();
- datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos);
+ datanode.metrics.addFsyncNanos(System.nanoTime() - flushEndNanos);
}
flushTotalNanos += flushEndNanos - flushStartNanos;
}
@@ -703,8 +701,10 @@ class BlockReceiver implements Closeable {
int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
// Write data to disk.
- long duration = streams.writeToDisk(dataBuf.array(),
+ long begin = Time.monotonicNow();
+ streams.writeDataToDisk(dataBuf.array(),
startByteToDisk, numBytesToDisk);
+ long duration = Time.monotonicNow() - begin;
if (duration > maxWriteToDiskMs) {
maxWriteToDiskMs = duration;
@@ -1029,9 +1029,7 @@ class BlockReceiver implements Closeable {
* will be overwritten.
*/
private void adjustCrcFilePosition() throws IOException {
- if (streams.getDataOut() != null) {
- streams.flushDataOut();
- }
+ streams.flushDataOut();
if (checksumOut != null) {
checksumOut.flush();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index 9182c88..d7aebd8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -166,6 +166,7 @@ class BlockSender implements java.io.Closeable {
private final boolean dropCacheBehindAllReads;
private long lastCacheDropOffset;
+ private final FileIoProvider fileIoProvider;
@VisibleForTesting
static long CACHE_DROP_INTERVAL_BYTES = 1024 * 1024; // 1MB
@@ -197,6 +198,7 @@ class BlockSender implements java.io.Closeable {
InputStream blockIn = null;
DataInputStream checksumIn = null;
FsVolumeReference volumeRef = null;
+ this.fileIoProvider = datanode.getFileIoProvider();
try {
this.block = block;
this.corruptChecksumOk = corruptChecksumOk;
@@ -401,7 +403,8 @@ class BlockSender implements java.io.Closeable {
DataNode.LOG.debug("replica=" + replica);
}
blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
- ris = new ReplicaInputStreams(blockIn, checksumIn, volumeRef);
+ ris = new ReplicaInputStreams(
+ blockIn, checksumIn, volumeRef, fileIoProvider);
} catch (IOException ioe) {
IOUtils.closeStream(this);
throw ioe;
@@ -568,8 +571,9 @@ class BlockSender implements java.io.Closeable {
FileChannel fileCh = ((FileInputStream)ris.getDataIn()).getChannel();
LongWritable waitTime = new LongWritable();
LongWritable transferTime = new LongWritable();
- sockOut.transferToFully(fileCh, blockInPosition, dataLen,
- waitTime, transferTime);
+ fileIoProvider.transferToSocketFully(
+ ris.getVolumeRef().getVolume(), sockOut, fileCh, blockInPosition,
+ dataLen, waitTime, transferTime);
datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get());
datanode.metrics.addSendDataPacketTransferNanos(transferTime.get());
blockInPosition += dataLen;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java
new file mode 100644
index 0000000..a70c151
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * {@link FileIoEvents} that simply counts the number of operations.
+ * Not meant to be used outside of testing.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class CountingFileIoEvents implements FileIoEvents {
+ private final Map<OPERATION, Counts> counts;
+
+ private static class Counts {
+ private final AtomicLong successes = new AtomicLong(0);
+ private final AtomicLong failures = new AtomicLong(0);
+
+ @JsonProperty("Successes")
+ public long getSuccesses() {
+ return successes.get();
+ }
+
+ @JsonProperty("Failures")
+ public long getFailures() {
+ return failures.get();
+ }
+ }
+
+ public CountingFileIoEvents() {
+ counts = new HashMap<>();
+ for (OPERATION op : OPERATION.values()) {
+ counts.put(op, new Counts());
+ }
+ }
+
+ @Override
+ public long beforeMetadataOp(
+ @Nullable FsVolumeSpi volume, OPERATION op) {
+ return 0;
+ }
+
+ @Override
+ public void afterMetadataOp(
+ @Nullable FsVolumeSpi volume, OPERATION op, long begin) {
+ counts.get(op).successes.incrementAndGet();
+ }
+
+ @Override
+ public long beforeFileIo(
+ @Nullable FsVolumeSpi volume, OPERATION op, long len) {
+ return 0;
+ }
+
+ @Override
+ public void afterFileIo(
+ @Nullable FsVolumeSpi volume, OPERATION op, long begin, long len) {
+ counts.get(op).successes.incrementAndGet();
+ }
+
+ @Override
+ public void onFailure(
+ @Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin) {
+ counts.get(op).failures.incrementAndGet();
+
+ }
+
+ @Override
+ public String getStatistics() {
+ ObjectMapper objectMapper = new ObjectMapper();
+ try {
+ return objectMapper.writeValueAsString(counts);
+ } catch (JsonProcessingException e) {
+ // Failed to serialize. Don't log the exception call stack.
+ FileIoProvider.LOG.error("Failed to serialize statistics: " + e);
+ return null;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index b845da0..794b1ad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -299,6 +299,7 @@ public class DataNode extends ReconfigurableBase
public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog");
private static final String DATANODE_HTRACE_PREFIX = "datanode.htrace.";
+ private final FileIoProvider fileIoProvider;
/**
* Use {@link NetUtils#createSocketAddr(String)} instead.
@@ -411,6 +412,7 @@ public class DataNode extends ReconfigurableBase
this.tracer = createTracer(conf);
this.tracerConfigurationManager =
new TracerConfigurationManager(DATANODE_HTRACE_PREFIX, conf);
+ this.fileIoProvider = new FileIoProvider(conf);
this.fileDescriptorPassingDisabledReason = null;
this.maxNumberOfBlocksToLog = 0;
this.confVersion = null;
@@ -437,6 +439,7 @@ public class DataNode extends ReconfigurableBase
this.tracer = createTracer(conf);
this.tracerConfigurationManager =
new TracerConfigurationManager(DATANODE_HTRACE_PREFIX, conf);
+ this.fileIoProvider = new FileIoProvider(conf);
this.blockScanner = new BlockScanner(this);
this.lastDiskErrorCheck = 0;
this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
@@ -617,6 +620,10 @@ public class DataNode extends ReconfigurableBase
PipelineAck.ECN.SUPPORTED;
}
+ public FileIoProvider getFileIoProvider() {
+ return fileIoProvider;
+ }
+
/**
* Contains the StorageLocations for changed data volumes.
*/
@@ -3008,6 +3015,11 @@ public class DataNode extends ReconfigurableBase
}
}
+ @Override // DataNodeMXBean
+ public String getFileIoProviderStatistics() {
+ return fileIoProvider.getStatistics();
+ }
+
public void refreshNamenodes(Configuration conf) throws IOException {
blockPoolManager.refreshNamenodes(conf);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
index 90c38d7..37f9635 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
@@ -120,4 +120,9 @@ public interface DataNodeMXBean {
* @return DiskBalancer Status
*/
String getDiskBalancerStatus();
+
+ /**
+ * Gets the {@link FileIoProvider} statistics.
+ */
+ String getFileIoProviderStatistics();
}
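Because DataNodeMXBean is registered as an MBean, the statistics are reachable over JMX. A sketch of a standalone client, assuming the DataNode exposes a JMX remote port and registers under the usual Hadoop:service=DataNode,name=DataNodeInfo name (the endpoint and class name below are illustrative):

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class FetchFileIoStats {
  public static void main(String[] args) throws Exception {
    JMXServiceURL url = new JMXServiceURL(
        "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
    try (JMXConnector jmxc = JMXConnectorFactory.connect(url)) {
      MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
      ObjectName name =
          new ObjectName("Hadoop:service=DataNode,name=DataNodeInfo");
      // Attribute name is derived from getFileIoProviderStatistics().
      Object stats = mbsc.getAttribute(name, "FileIoProviderStatistics");
      System.out.println(stats);  // JSON, or null for the default hooks
    }
  }
}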
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
index f4deb6d..5163e6b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
@@ -1356,6 +1356,12 @@ public class DataStorage extends Storage {
bpStorageMap.remove(bpId);
}
+ /**
+ * Prefer {@link FileIoProvider#fullyDelete}.
+ * @param dir the directory to delete.
+ * @return true if dir was successfully deleted.
+ */
+ @Deprecated
public static boolean fullyDelete(final File dir) {
boolean result = FileUtil.fullyDelete(dir);
return result;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java
index ad054a8..c98ff54 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
/** Provide utility methods for Datanode. */
@@ -55,15 +56,17 @@ public class DatanodeUtil {
* @throws IOException
* if the file already exists or if the file cannot be created.
*/
- public static File createTmpFile(Block b, File f) throws IOException {
- if (f.exists()) {
+ public static File createFileWithExistsCheck(
+ FsVolumeSpi volume, Block b, File f,
+ FileIoProvider fileIoProvider) throws IOException {
+ if (fileIoProvider.exists(volume, f)) {
throw new IOException("Failed to create temporary file for " + b
+ ". File " + f + " should not be present, but is.");
}
// Create the zero-length temp file
final boolean fileCreated;
try {
- fileCreated = f.createNewFile();
+ fileCreated = fileIoProvider.createFile(volume, f);
} catch (IOException ioe) {
throw new IOException(DISK_ERROR + "Failed to create " + f, ioe);
}
@@ -92,13 +95,17 @@ public class DatanodeUtil {
* @return true if there are no files
* @throws IOException if unable to list subdirectories
*/
- public static boolean dirNoFilesRecursive(File dir) throws IOException {
- File[] contents = dir.listFiles();
+ public static boolean dirNoFilesRecursive(
+ FsVolumeSpi volume, File dir,
+ FileIoProvider fileIoProvider) throws IOException {
+ File[] contents = fileIoProvider.listFiles(volume, dir);
if (contents == null) {
throw new IOException("Cannot list contents of " + dir);
}
for (File f : contents) {
- if (!f.isDirectory() || (f.isDirectory() && !dirNoFilesRecursive(f))) {
+ if (!f.isDirectory() ||
+ (f.isDirectory() && !dirNoFilesRecursive(
+ volume, f, fileIoProvider))) {
return false;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java
new file mode 100644
index 0000000..bd4932b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+
+import javax.annotation.Nullable;
+
+/**
+ * The default implementation of {@link FileIoEvents} that does nothing.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public final class DefaultFileIoEvents implements FileIoEvents {
+ @Override
+ public long beforeMetadataOp(
+ @Nullable FsVolumeSpi volume, OPERATION op) {
+ return 0;
+ }
+
+ @Override
+ public void afterMetadataOp(
+ @Nullable FsVolumeSpi volume, OPERATION op, long begin) {
+ }
+
+ @Override
+ public long beforeFileIo(
+ @Nullable FsVolumeSpi volume, OPERATION op, long len) {
+ return 0;
+ }
+
+ @Override
+ public void afterFileIo(
+ @Nullable FsVolumeSpi volume, OPERATION op, long begin, long len) {
+ }
+
+ @Override
+ public void onFailure(
+ @Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin) {
+ }
+
+ @Override
+ public @Nullable String getStatistics() {
+ // null is valid JSON.
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java
new file mode 100644
index 0000000..48e703f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+
+import javax.annotation.Nullable;
+
+/**
+ * Hooks that can be implemented for instrumentation/fault injection
+ * around DataNode file IO operations.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface FileIoEvents {
+
+ /**
+ * Invoked before a filesystem metadata operation.
+ *
+ * @param volume target volume for the operation. Null if unavailable.
+ * @param op type of operation.
+ * @return timestamp at which the operation was started. 0 if
+ * unavailable.
+ */
+ long beforeMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op);
+
+ /**
+ * Invoked after a filesystem metadata operation has completed.
+ *
+ * @param volume target volume for the operation. Null if unavailable.
+ * @param op type of operation.
+ * @param begin timestamp at which the operation was started. 0
+ * if unavailable.
+ */
+ void afterMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op, long begin);
+
+ /**
+ * Invoked before a read/write/flush/channel transfer operation.
+ *
+ * @param volume target volume for the operation. Null if unavailable.
+ * @param op type of operation.
+ * @param len length of the file IO. 0 for flush.
+ * @return timestamp at which the operation was started. 0 if
+ * unavailable.
+ */
+ long beforeFileIo(@Nullable FsVolumeSpi volume, OPERATION op, long len);
+
+
+ /**
+ * Invoked after a read/write/flush/channel transfer operation
+ * has completed.
+ *
+ * @param volume target volume for the operation. Null if unavailable.
+ * @param op type of operation.
+ * @param len length of the file IO. 0 for flush.
+ * @param begin timestamp at which the operation was started. 0 if
+ * unavailable.
+ */
+ void afterFileIo(@Nullable FsVolumeSpi volume, OPERATION op,
+ long begin, long len);
+
+ /**
+ * Invoked if an operation fails with an exception.
+ * @param volume target volume for the operation. Null if unavailable.
+ * @param op type of operation.
+ * @param e Exception encountered during the operation.
+ * @param begin time at which the operation was started.
+ */
+ void onFailure(
+ @Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin);
+
+ /**
+ * Return statistics as a JSON string.
+ * @return statistics as a JSON string. May be null.
+ */
+ @Nullable String getStatistics();
+}
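To illustrate the extension point, here is a hypothetical FileIoEvents implementation (not part of this commit) that logs operations slower than a fixed threshold; the class name and threshold are assumptions:

package org.apache.hadoop.hdfs.server.datanode;

import javax.annotation.Nullable;

import org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SlowIoLoggingEvents implements FileIoEvents {
  private static final Logger LOG =
      LoggerFactory.getLogger(SlowIoLoggingEvents.class);
  private static final long THRESHOLD_MS = 300;  // illustrative threshold

  @Override
  public long beforeMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op) {
    return Time.monotonicNow();
  }

  @Override
  public void afterMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op,
      long begin) {
    logIfSlow(volume, op, begin);
  }

  @Override
  public long beforeFileIo(@Nullable FsVolumeSpi volume, OPERATION op,
      long len) {
    return Time.monotonicNow();
  }

  @Override
  public void afterFileIo(@Nullable FsVolumeSpi volume, OPERATION op,
      long begin, long len) {
    logIfSlow(volume, op, begin);
  }

  @Override
  public void onFailure(@Nullable FsVolumeSpi volume, OPERATION op,
      Exception e, long begin) {
    LOG.warn("{} failed on volume {}", op, volume, e);
  }

  @Override
  public @Nullable String getStatistics() {
    return null;  // no aggregate statistics maintained
  }

  private void logIfSlow(@Nullable FsVolumeSpi volume, OPERATION op,
      long begin) {
    long elapsedMs = Time.monotonicNow() - begin;
    if (elapsedMs > THRESHOLD_MS) {
      LOG.warn("Slow {} on volume {}: {} ms", op, volume, elapsedMs);
    }
  }
}

Selecting it works the same way as above: point dfs.datanode.fileio.events.class at the implementing class.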
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java
new file mode 100644
index 0000000..2344114
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java
@@ -0,0 +1,1006 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.HardLink;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.NativeIOException;
+import org.apache.hadoop.net.SocketOutputStream;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.Flushable;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.file.CopyOption;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+
+import static org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION.*;
+
+/**
+ * This class abstracts out various file IO operations performed by the
+ * DataNode and invokes event hooks before and after each file IO.
+ *
+ * Behavior can be injected into these events by implementing
+ * {@link FileIoEvents} and replacing the default implementation
+ * with {@link DFSConfigKeys#DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY}.
+ *
+ * Most functions accept an optional {@link FsVolumeSpi} parameter for
+ * instrumentation/logging.
+ *
+ * Some methods may look redundant, especially the multiple variations of
+ * move/rename/list. They exist to retain behavior compatibility for existing
+ * code.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class FileIoProvider {
+ public static final Logger LOG = LoggerFactory.getLogger(
+ FileIoProvider.class);
+
+ private final FileIoEvents eventHooks;
+
+ /**
+ * @param conf Configuration object. May be null. When null,
+ * the event handlers are no-ops.
+ */
+ public FileIoProvider(@Nullable Configuration conf) {
+ if (conf != null) {
+ final Class<? extends FileIoEvents> clazz = conf.getClass(
+ DFSConfigKeys.DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY,
+ DefaultFileIoEvents.class,
+ FileIoEvents.class);
+ eventHooks = ReflectionUtils.newInstance(clazz, conf);
+ } else {
+ eventHooks = new DefaultFileIoEvents();
+ }
+ }
+
+ /**
+ * Lists the types of file system operations. Passed to the
+ * IO hooks so implementations can choose behavior based on
+ * specific operations.
+ */
+ public enum OPERATION {
+ OPEN,
+ EXISTS,
+ LIST,
+ DELETE,
+ MOVE,
+ MKDIRS,
+ TRANSFER,
+ SYNC,
+ FADVISE,
+ READ,
+ WRITE,
+ FLUSH,
+ NATIVE_COPY
+ }
+
+ /**
+ * Retrieve statistics from the underlying {@link FileIoEvents}
+ * implementation as a JSON string, if it maintains them.
+ * @return statistics as a JSON string. May be null.
+ */
+ public @Nullable String getStatistics() {
+ return eventHooks.getStatistics();
+ }
+
+ /**
+ * See {@link Flushable#flush()}.
+ *
+ * @param volume target volume. null if unavailable.
+ * @throws IOException
+ */
+ public void flush(
+ @Nullable FsVolumeSpi volume, Flushable f) throws IOException {
+ final long begin = eventHooks.beforeFileIo(volume, FLUSH, 0);
+ try {
+ f.flush();
+ eventHooks.afterFileIo(volume, FLUSH, begin, 0);
+ } catch (Exception e) {
+ eventHooks.onFailure(volume, FLUSH, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Sync the given {@link FileOutputStream}.
+ *
+ * @param volume target volume. null if unavailable.
+ * @throws IOException
+ */
+ public void sync(
+ @Nullable FsVolumeSpi volume, FileOutputStream fos) throws IOException {
+ final long begin = eventHooks.beforeFileIo(volume, SYNC, 0);
+ try {
+ fos.getChannel().force(true);
+ eventHooks.afterFileIo(volume, SYNC, begin, 0);
+ } catch (Exception e) {
+ eventHooks.onFailure(volume, SYNC, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Call sync_file_range on the given file descriptor.
+ *
+ * @param volume target volume. null if unavailable.
+ * @throws IOException
+ */
+ public void syncFileRange(
+ @Nullable FsVolumeSpi volume, FileDescriptor outFd,
+ long offset, long numBytes, int flags) throws NativeIOException {
+ final long begin = eventHooks.beforeFileIo(volume, SYNC, 0);
+ try {
+ NativeIO.POSIX.syncFileRangeIfPossible(outFd, offset, numBytes, flags);
+ eventHooks.afterFileIo(volume, SYNC, begin, 0);
+ } catch (Exception e) {
+ eventHooks.onFailure(volume, SYNC, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Call posix_fadvise on the given file descriptor.
+ *
+ * @param volume target volume. null if unavailable.
+ * @throws IOException
+ */
+ public void posixFadvise(
+ @Nullable FsVolumeSpi volume, String identifier, FileDescriptor outFd,
+ long offset, long length, int flags) throws NativeIOException {
+ final long begin = eventHooks.beforeMetadataOp(volume, FADVISE);
+ try {
+ NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
+ identifier, outFd, offset, length, flags);
+ eventHooks.afterMetadataOp(volume, FADVISE, begin);
+ } catch (Exception e) {
+ eventHooks.onFailure(volume, FADVISE, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Delete a file.
+ * @param volume target volume. null if unavailable.
+ * @param f File to delete.
+ * @return true if the file was successfully deleted.
+ */
+ public boolean delete(@Nullable FsVolumeSpi volume, File f) {
+ final long begin = eventHooks.beforeMetadataOp(volume, DELETE);
+ try {
+ boolean deleted = f.delete();
+ eventHooks.afterMetadataOp(volume, DELETE, begin);
+ return deleted;
+ } catch (Exception e) {
+ eventHooks.onFailure(volume, DELETE, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Delete a file, first checking to see if it exists.
+ * @param volume target volume. null if unavailable.
+ * @param f File to delete
+ * @return true if the file was successfully deleted or if it never
+ * existed.
+ */
+ public boolean deleteWithExistsCheck(@Nullable FsVolumeSpi volume, File f) {
+ final long begin = eventHooks.beforeMetadataOp(volume, DELETE);
+ try {
+ boolean deleted = !f.exists() || f.delete();
+ eventHooks.afterMetadataOp(volume, DELETE, begin);
+ if (!deleted) {
+ LOG.warn("Failed to delete file {}", f);
+ }
+ return deleted;
+ } catch (Exception e) {
+ eventHooks.onFailure(volume, DELETE, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Transfer data from a FileChannel to a SocketOutputStream.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param sockOut SocketOutputStream to write the data.
+ * @param fileCh FileChannel from which to read data.
+ * @param position position within the channel where the transfer begins.
+ * @param count number of bytes to transfer.
+ * @param waitTime returns the nanoseconds spent waiting for the socket
+ * to become writable.
+ * @param transferTime returns the nanoseconds spent transferring data.
+ * @throws IOException
+ */
+ public void transferToSocketFully(
+ @Nullable FsVolumeSpi volume, SocketOutputStream sockOut,
+ FileChannel fileCh, long position, int count,
+ LongWritable waitTime, LongWritable transferTime) throws IOException {
+ final long begin = eventHooks.beforeFileIo(volume, TRANSFER, count);
+ try {
+ sockOut.transferToFully(fileCh, position, count,
+ waitTime, transferTime);
+ eventHooks.afterFileIo(volume, TRANSFER, begin, count);
+ } catch (Exception e) {
+ eventHooks.onFailure(volume, TRANSFER, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Create a file.
+ * @param volume target volume. null if unavailable.
+ * @param f File to be created.
+ * @return true if the file does not exist and was successfully created.
+ * false if the file already exists.
+ * @throws IOException
+ */
+ public boolean createFile(
+ @Nullable FsVolumeSpi volume, File f) throws IOException {
+ final long begin = eventHooks.beforeMetadataOp(volume, OPEN);
+ try {
+ boolean created = f.createNewFile();
+ eventHooks.afterMetadataOp(volume, OPEN, begin);
+ return created;
+ } catch (Exception e) {
+ eventHooks.onFailure(volume, OPEN, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Create a FileInputStream using
+ * {@link FileInputStream#FileInputStream(File)}.
+ *
+ * Wraps the created input stream to intercept read calls
+ * before delegating to the wrapped stream.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param f File object.
+ * @return FileInputStream to the given file.
+ * @throws FileNotFoundException
+ */
+ public FileInputStream getFileInputStream(
+ @Nullable FsVolumeSpi volume, File f) throws FileNotFoundException {
+ final long begin = eventHooks.beforeMetadataOp(volume, OPEN);
+ FileInputStream fis = null;
+ try {
+ fis = new WrappedFileInputStream(volume, f);
+ eventHooks.afterMetadataOp(volume, OPEN, begin);
+ return fis;
+ } catch(Exception e) {
+ org.apache.commons.io.IOUtils.closeQuietly(fis);
+ eventHooks.onFailure(volume, OPEN, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Create a FileOutputStream using
+ * {@link FileOutputStream#FileOutputStream(File, boolean)}.
+ *
+ * Wraps the created output stream to intercept write calls
+ * before delegating to the wrapped stream.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param f File object.
+ * @param append if true, then bytes will be written to the end of the
+ * file rather than the beginning.
+ * @return FileOutputStream to the given file object.
+ * @throws FileNotFoundException
+ */
+ public FileOutputStream getFileOutputStream(
+ @Nullable FsVolumeSpi volume, File f,
+ boolean append) throws FileNotFoundException {
+ final long begin = eventHooks.beforeMetadataOp(volume, OPEN);
+ FileOutputStream fos = null;
+ try {
+ fos = new WrappedFileOutputStream(volume, f, append);
+ eventHooks.afterMetadataOp(volume, OPEN, begin);
+ return fos;
+ } catch(Exception e) {
+ org.apache.commons.io.IOUtils.closeQuietly(fos);
+ eventHooks.onFailure(volume, OPEN, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Create a FileOutputStream using
+ * {@link FileOutputStream#FileOutputStream(File, boolean)}.
+ *
+ * Wraps the created output stream to intercept write calls
+ * before delegating to the wrapped stream.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param f File object.
+ * @return FileOutputStream to the given file object.
+ * @throws FileNotFoundException
+ */
+ public FileOutputStream getFileOutputStream(
+ @Nullable FsVolumeSpi volume, File f) throws FileNotFoundException {
+ return getFileOutputStream(volume, f, false);
+ }
+
+ /**
+ * Create a FileOutputStream using
+ * {@link FileOutputStream#FileOutputStream(FileDescriptor)}.
+ *
+ * Wraps the created output stream to intercept write calls
+ * before delegating to the wrapped stream.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param fd file descriptor of the already-open file.
+ * @return FileOutputStream to the given file descriptor.
+ */
+ public FileOutputStream getFileOutputStream(
+ @Nullable FsVolumeSpi volume, FileDescriptor fd) {
+ return new WrappedFileOutputStream(volume, fd);
+ }
+
+ /**
+ * Create a FileInputStream using
+ * {@link NativeIO#getShareDeleteFileDescriptor}.
+ * Wraps the created input stream to intercept input calls
+ * before delegating to the wrapped stream.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param f File object.
+ * @param offset the offset position, measured in bytes from the
+ * beginning of the file, at which to set the file
+ * pointer.
+ * @return FileInputStream to the given file object.
+ * @throws FileNotFoundException
+ */
+ public FileInputStream getShareDeleteFileInputStream(
+ @Nullable FsVolumeSpi volume, File f,
+ long offset) throws IOException {
+ final long begin = eventHooks.beforeMetadataOp(volume, OPEN);
+ FileInputStream fis = null;
+ try {
+ fis = new WrappedFileInputStream(volume,
+ NativeIO.getShareDeleteFileDescriptor(f, offset));
+ eventHooks.afterMetadataOp(volume, OPEN, begin);
+ return fis;
+ } catch(Exception e) {
+ org.apache.commons.io.IOUtils.closeQuietly(fis);
+ eventHooks.onFailure(volume, OPEN, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Create a FileInputStream using
+ * {@link FileInputStream#FileInputStream(File)} and position
+ * it at the given offset.
+ *
+ * Wraps the created input stream to intercept read calls
+ * before delegating to the wrapped stream.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param f File object.
+ * @param offset the offset position, measured in bytes from the
+ * beginning of the file, at which to set the file
+ * pointer.
+ * @return FileInputStream positioned at the given offset.
+ * @throws IOException
+ */
+ public FileInputStream openAndSeek(
+ @Nullable FsVolumeSpi volume, File f, long offset) throws IOException {
+ final long begin = eventHooks.beforeMetadataOp(volume, OPEN);
+ FileInputStream fis = null;
+ try {
+ fis = new WrappedFileInputStream(volume,
+ FsDatasetUtil.openAndSeek(f, offset));
+ eventHooks.afterMetadataOp(volume, OPEN, begin);
+ return fis;
+ } catch(Exception e) {
+ org.apache.commons.io.IOUtils.closeQuietly(fis);
+ eventHooks.onFailure(volume, OPEN, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Create a RandomAccessFile using
+ * {@link RandomAccessFile#RandomAccessFile(File, String)}.
+ *
+ * Wraps the created input stream to intercept IO calls
+ * before delegating to the wrapped RandomAccessFile.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param f File object.
+ * @param mode See {@link RandomAccessFile} for a description
+ * of the mode string.
+ * @return RandomAccessFile representing the given file.
+ * @throws FileNotFoundException
+ */
+ public RandomAccessFile getRandomAccessFile(
+ @Nullable FsVolumeSpi volume, File f,
+ String mode) throws FileNotFoundException {
+ final long begin = eventHooks.beforeMetadataOp(volume, OPEN);
+ RandomAccessFile raf = null;
+ try {
+ raf = new WrappedRandomAccessFile(volume, f, mode);
+ eventHooks.afterMetadataOp(volume, OPEN, begin);
+ return raf;
+ } catch(Exception e) {
+ org.apache.commons.io.IOUtils.closeQuietly(raf);
+ eventHooks.onFailure(volume, OPEN, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Delete the given directory using {@link FileUtil#fullyDelete(File)}.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param dir directory to be deleted.
+ * @return true on success, false on failure.
+ */
+ public boolean fullyDelete(@Nullable FsVolumeSpi volume, File dir) {
+ final long begin = eventHooks.beforeMetadataOp(volume, DELETE);
+ try {
+ boolean deleted = FileUtil.fullyDelete(dir);
+ eventHooks.afterMetadataOp(volume, DELETE, begin);
+ return deleted;
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, DELETE, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Move the src file to the target using
+ * {@link FileUtil#replaceFile(File, File)}.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param src source path.
+ * @param target target path.
+ * @throws IOException
+ */
+ public void replaceFile(
+ @Nullable FsVolumeSpi volume, File src, File target) throws IOException {
+ final long begin = eventHooks.beforeMetadataOp(volume, MOVE);
+ try {
+ FileUtil.replaceFile(src, target);
+ eventHooks.afterMetadataOp(volume, MOVE, begin);
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, MOVE, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Move the src file to the target using
+ * {@link Storage#rename(File, File)}.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param src source path.
+ * @param target target path.
+ * @throws IOException
+ */
+ public void rename(
+ @Nullable FsVolumeSpi volume, File src, File target)
+ throws IOException {
+ final long begin = eventHooks.beforeMetadataOp(volume, MOVE);
+ try {
+ Storage.rename(src, target);
+ eventHooks.afterMetadataOp(volume, MOVE, begin);
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, MOVE, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Move the src file to the target using
+ * {@link FileUtils#moveFile(File, File)}.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param src source path.
+ * @param target target path.
+ * @throws IOException
+ */
+ public void moveFile(
+ @Nullable FsVolumeSpi volume, File src, File target)
+ throws IOException {
+ final long begin = eventHooks.beforeMetadataOp(volume, MOVE);
+ try {
+ FileUtils.moveFile(src, target);
+ eventHooks.afterMetadataOp(volume, MOVE, begin);
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, MOVE, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Move the src file to the target using
+ * {@link Files#move(Path, Path, CopyOption...)}.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param src source path.
+ * @param target target path.
+ * @param options See {@link Files#move} for a description
+ * of the options.
+ * @throws IOException
+ */
+ public void move(
+ @Nullable FsVolumeSpi volume, Path src, Path target,
+ CopyOption... options) throws IOException {
+ final long begin = eventHooks.beforeMetadataOp(volume, MOVE);
+ try {
+ Files.move(src, target, options);
+ eventHooks.afterMetadataOp(volume, MOVE, begin);
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, MOVE, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * See {@link Storage#nativeCopyFileUnbuffered(File, File, boolean)}.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param src an existing file to copy, must not be {@code null}
+ * @param target the new file, must not be {@code null}
+ * @param preserveFileDate true if the file date of the copy
+ * should be the same as the original
+ * @throws IOException
+ */
+ public void nativeCopyFileUnbuffered(
+ @Nullable FsVolumeSpi volume, File src, File target,
+ boolean preserveFileDate) throws IOException {
+ final long length = src.length();
+ final long begin = eventHooks.beforeFileIo(volume, NATIVE_COPY, length);
+ try {
+ Storage.nativeCopyFileUnbuffered(src, target, preserveFileDate);
+ eventHooks.afterFileIo(volume, NATIVE_COPY, begin, length);
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, NATIVE_COPY, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * See {@link File#mkdirs()}.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param dir directory to be created.
+ * @return true only if the directory was created. false if
+ * the directory already exists.
+ * @throws IOException if a directory with the given name does
+ * not exist and could not be created.
+ */
+ public boolean mkdirs(
+ @Nullable FsVolumeSpi volume, File dir) throws IOException {
+ final long begin = eventHooks.beforeMetadataOp(volume, MKDIRS);
+ boolean created = false;
+ boolean isDirectory;
+ try {
+ created = dir.mkdirs();
+ isDirectory = !created && dir.isDirectory();
+ eventHooks.afterMetadataOp(volume, MKDIRS, begin);
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, MKDIRS, e, begin);
+ throw e;
+ }
+
+ if (!created && !isDirectory) {
+ throw new IOException("Mkdirs failed to create " + dir);
+ }
+ return created;
+ }
+
+ /**
+ * Create the target directory using {@link File#mkdirs()} only if
+ * it doesn't exist already.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param dir directory to be created.
+ * @throws IOException if the directory could not be created.
+ */
+ public void mkdirsWithExistsCheck(
+ @Nullable FsVolumeSpi volume, File dir) throws IOException {
+ final long begin = eventHooks.beforeMetadataOp(volume, MKDIRS);
+ boolean succeeded = false;
+ try {
+ succeeded = dir.isDirectory() || dir.mkdirs();
+ eventHooks.afterMetadataOp(volume, MKDIRS, begin);
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, MKDIRS, e, begin);
+ throw e;
+ }
+
+ if (!succeeded) {
+ throw new IOException("Mkdirs failed to create " + dir);
+ }
+ }
+
+ /**
+ * Get a listing of the given directory using
+ * {@link FileUtil#listFiles(File)}.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param dir Directory to be listed.
+ * @return array of file objects representing the directory entries.
+ * @throws IOException
+ */
+ public File[] listFiles(
+ @Nullable FsVolumeSpi volume, File dir) throws IOException {
+ final long begin = eventHooks.beforeMetadataOp(volume, LIST);
+ try {
+ File[] children = FileUtil.listFiles(dir);
+ eventHooks.afterMetadataOp(volume, LIST, begin);
+ return children;
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, LIST, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Get a listing of the given directory using
+ * {@link FileUtil#list(File)}.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param dir Directory to be listed.
+ * @return array of strings representing the directory entries.
+ * @throws IOException
+ */
+ public String[] list(
+ @Nullable FsVolumeSpi volume, File dir) throws IOException {
+ final long begin = eventHooks.beforeMetadataOp(volume, LIST);
+ try {
+ String[] children = FileUtil.list(dir);
+ eventHooks.afterMetadataOp(volume, LIST, begin);
+ return children;
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, LIST, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Get a listing of the given directory using
+ * {@link IOUtils#listDirectory(File, FilenameFilter)}.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param dir Directory to list.
+ * @param filter {@link FilenameFilter} to filter the directory entries.
+ * @return names of the directory entries accepted by the filter.
+ * @throws IOException
+ */
+ public List<String> listDirectory(
+ @Nullable FsVolumeSpi volume, File dir,
+ FilenameFilter filter) throws IOException {
+ final long begin = eventHooks.beforeMetadataOp(volume, LIST);
+ try {
+ List<String> children = IOUtils.listDirectory(dir, filter);
+ eventHooks.afterMetadataOp(volume, LIST, begin);
+ return children;
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, LIST, e, begin);
+ throw e;
+ }
+ }
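
As a usage sketch, the three listing wrappers above differ only in return
type and filtering (variable names here are hypothetical):

    File[] entries = fileIoProvider.listFiles(volume, dir);   // File objects
    String[] names = fileIoProvider.list(volume, dir);        // plain names
    List<String> blockNames = fileIoProvider.listDirectory(
        volume, dir, (d, name) -> name.startsWith("blk_"));   // filtered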
+
+ /**
+ * Retrieves the number of links to the specified file.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param f file whose link count is being queried.
+ * @return number of hard-links to the given file, including the
+ * given path itself.
+ * @throws IOException
+ */
+ public int getHardLinkCount(
+ @Nullable FsVolumeSpi volume, File f) throws IOException {
+ final long begin = eventHooks.beforeMetadataOp(volume, LIST);
+ try {
+ int count = HardLink.getLinkCount(f);
+ eventHooks.afterMetadataOp(volume, LIST, begin);
+ return count;
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, LIST, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Check for file existence using {@link File#exists()}.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param f file object.
+ * @return true if the file exists.
+ */
+ public boolean exists(@Nullable FsVolumeSpi volume, File f) {
+ final long begin = eventHooks.beforeMetadataOp(volume, EXISTS);
+ try {
+ boolean exists = f.exists();
+ eventHooks.afterMetadataOp(volume, EXISTS, begin);
+ return exists;
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, EXISTS, e, begin);
+ throw e;
+ }
+ }
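
These metadata wrappers compose naturally; for example, the hardlink-breaking
logic in LocalReplica further down in this patch relies on the link count
including the queried path itself. A sketch only, with variable names assumed:

    // A count greater than 1 means the block data is shared with at least
    // one other path and must be copied before it can be modified in place.
    if (fileIoProvider.exists(volume, blockFile)
        && fileIoProvider.getHardLinkCount(volume, blockFile) > 1) {
      // copy to a temporary file, then replace the original
      // (see LocalReplica#breakHardlinks below)
    }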
+
+ /**
+ * A thin wrapper over {@link FileInputStream} that allows
+ * instrumenting disk IO.
+ */
+ private final class WrappedFileInputStream extends FileInputStream {
+ private @Nullable final FsVolumeSpi volume;
+
+ /**
+ * {@inheritDoc}.
+ */
+ private WrappedFileInputStream(@Nullable FsVolumeSpi volume, File f)
+ throws FileNotFoundException {
+ super(f);
+ this.volume = volume;
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ private WrappedFileInputStream(
+ @Nullable FsVolumeSpi volume, FileDescriptor fd) {
+ super(fd);
+ this.volume = volume;
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public int read() throws IOException {
+ final long begin = eventHooks.beforeFileIo(volume, READ, 1);
+ try {
+ int b = super.read();
+ eventHooks.afterFileIo(volume, READ, begin, 1);
+ return b;
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, READ, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public int read(@Nonnull byte[] b) throws IOException {
+ final long begin = eventHooks.beforeFileIo(volume, READ, b.length);
+ try {
+ int numBytesRead = super.read(b);
+ eventHooks.afterFileIo(volume, READ, begin, numBytesRead);
+ return numBytesRead;
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, READ, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public int read(@Nonnull byte[] b, int off, int len) throws IOException {
+ final long begin = eventHooks.beforeFileIo(volume, READ, len);
+ try {
+ int numBytesRead = super.read(b, off, len);
+ eventHooks.afterFileIo(volume, READ, begin, numBytesRead);
+ return numBytesRead;
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, READ, e, begin);
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * A thin wrapper over {@link FileOutputStream} that allows
+ * instrumenting disk IO.
+ */
+ private final class WrappedFileOutputStream extends FileOutputStream {
+ private @Nullable final FsVolumeSpi volume;
+
+ /**
+ * {@inheritDoc}.
+ */
+ private WrappedFileOutputStream(
+ @Nullable FsVolumeSpi volume, File f,
+ boolean append) throws FileNotFoundException {
+ super(f, append);
+ this.volume = volume;
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ private WrappedFileOutputStream(
+ @Nullable FsVolumeSpi volume, FileDescriptor fd) {
+ super(fd);
+ this.volume = volume;
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public void write(int b) throws IOException {
+ final long begin = eventHooks.beforeFileIo(volume, WRITE, 1);
+ try {
+ super.write(b);
+ eventHooks.afterFileIo(volume, WRITE, begin, 1);
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, WRITE, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public void write(@Nonnull byte[] b) throws IOException {
+ final long begin = eventHooks.beforeFileIo(volume, WRITE, b.length);
+ try {
+ super.write(b);
+ eventHooks.afterFileIo(volume, WRITE, begin, b.length);
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, WRITE, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public void write(@Nonnull byte[] b, int off, int len) throws IOException {
+ final long begin = eventHooks.beforeFileIo(volume, WRITE, len);
+ try {
+ super.write(b, off, len);
+ eventHooks.afterFileIo(volume, WRITE, begin, len);
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, WRITE, e, begin);
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * A thin wrapper over {@link RandomAccessFile} that allows
+ * instrumenting disk IO.
+ */
+ private final class WrappedRandomAccessFile extends RandomAccessFile {
+ private @Nullable final FsVolumeSpi volume;
+
+ public WrappedRandomAccessFile(
+ @Nullable FsVolumeSpi volume, File f, String mode)
+ throws FileNotFoundException {
+ super(f, mode);
+ this.volume = volume;
+ }
+
+ @Override
+ public int read() throws IOException {
+ final long begin = eventHooks.beforeFileIo(volume, READ, 1);
+ try {
+ int b = super.read();
+ eventHooks.afterFileIo(volume, READ, begin, 1);
+ return b;
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, READ, e, begin);
+ throw e;
+ }
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ final long begin = eventHooks.beforeFileIo(volume, READ, len);
+ try {
+ int numBytesRead = super.read(b, off, len);
+ eventHooks.afterFileIo(volume, READ, begin, numBytesRead);
+ return numBytesRead;
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, READ, e, begin);
+ throw e;
+ }
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ final long begin = eventHooks.beforeFileIo(volume, READ, b.length);
+ try {
+ int numBytesRead = super.read(b);
+ eventHooks.afterFileIo(volume, READ, begin, numBytesRead);
+ return numBytesRead;
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, READ, e, begin);
+ throw e;
+ }
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ final long begin = eventHooks.beforeFileIo(volume, WRITE, 1);
+ try {
+ super.write(b);
+ eventHooks.afterFileIo(volume, WRITE, begin, 1);
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, WRITE, e, begin);
+ throw e;
+ }
+ }
+
+ @Override
+ public void write(@Nonnull byte[] b) throws IOException {
+ final long begin = eventHooks.beforeFileIo(volume, WRITE, b.length);
+ try {
+ super.write(b);
+ eventHooks.afterFileIo(volume, WRITE, begin, b.length);
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, WRITE, e, begin);
+ throw e;
+ }
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ final long begin = eventHooks.beforeFileIo(volume, WRITE, len);
+ try {
+ super.write(b, off, len);
+ eventHooks.afterFileIo(volume, WRITE, begin, len);
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, WRITE, e, begin);
+ throw e;
+ }
+ }
+ }
+}
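
Every wrapper above follows the same pattern: take a begin token from the
hook, perform the JDK call, then report success or failure. Below is a
minimal sketch of a hook implementation consistent with those call sites;
the class name and the Operation enum are hypothetical stand-ins, and the
FileIoEvents/CountingFileIoEvents classes added elsewhere in this commit
may differ:

    import java.util.concurrent.atomic.AtomicLong;

    // Hypothetical hook: 'Operation' stands in for the op enum (MKDIRS,
    // LIST, EXISTS, READ, WRITE, NATIVE_COPY, ...) used by the wrappers.
    class TimingFileIoEvents {
      private final AtomicLong totalNanos = new AtomicLong();
      private final AtomicLong failures = new AtomicLong();

      // The returned value is the 'begin' token handed back in the
      // after/failure callbacks; nanoTime keeps it monotonic.
      public long beforeMetadataOp(FsVolumeSpi volume, Operation op) {
        return System.nanoTime();
      }

      public void afterMetadataOp(FsVolumeSpi volume, Operation op,
          long begin) {
        totalNanos.addAndGet(System.nanoTime() - begin);
      }

      public long beforeFileIo(FsVolumeSpi volume, Operation op, long len) {
        return System.nanoTime();
      }

      public void afterFileIo(FsVolumeSpi volume, Operation op, long begin,
          long len) {
        totalNanos.addAndGet(System.nanoTime() - begin);
      }

      public void onFailure(FsVolumeSpi volume, Operation op, Exception e,
          long begin) {
        failures.incrementAndGet();
      }
    }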
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java
index e6f7e12..1d46ddd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java
@@ -29,17 +29,13 @@ import java.net.URI;
import java.util.HashMap;
import java.util.Map;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.HardLink;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.DataChecksum;
@@ -187,20 +183,23 @@ abstract public class LocalReplica extends ReplicaInfo {
* be recovered (especially on Windows) on datanode restart.
*/
private void breakHardlinks(File file, Block b) throws IOException {
- File tmpFile = DatanodeUtil.createTmpFile(b, DatanodeUtil.getUnlinkTmpFile(file));
- try (FileInputStream in = new FileInputStream(file)) {
- try (FileOutputStream out = new FileOutputStream(tmpFile)){
- copyBytes(in, out, 16 * 1024);
+ final FileIoProvider fileIoProvider = getFileIoProvider();
+ final File tmpFile = DatanodeUtil.createFileWithExistsCheck(
+ getVolume(), b, DatanodeUtil.getUnlinkTmpFile(file), fileIoProvider);
+ try (FileInputStream in = fileIoProvider.getFileInputStream(
+ getVolume(), file)) {
+ try (FileOutputStream out = fileIoProvider.getFileOutputStream(
+ getVolume(), tmpFile)) {
+ IOUtils.copyBytes(in, out, 16 * 1024);
}
if (file.length() != tmpFile.length()) {
throw new IOException("Copy of file " + file + " size " + file.length()+
" into file " + tmpFile +
" resulted in a size of " + tmpFile.length());
}
- replaceFile(tmpFile, file);
+ fileIoProvider.replaceFile(getVolume(), tmpFile, file);
} catch (IOException e) {
- boolean done = tmpFile.delete();
- if (!done) {
+ if (!fileIoProvider.delete(getVolume(), tmpFile)) {
DataNode.LOG.info("detachFile failed to delete temporary file " +
tmpFile);
}
@@ -226,19 +225,20 @@ abstract public class LocalReplica extends ReplicaInfo {
* @throws IOException
*/
public boolean breakHardLinksIfNeeded() throws IOException {
- File file = getBlockFile();
+ final File file = getBlockFile();
+ final FileIoProvider fileIoProvider = getFileIoProvider();
if (file == null || getVolume() == null) {
throw new IOException("detachBlock:Block not found. " + this);
}
File meta = getMetaFile();
- int linkCount = getHardLinkCount(file);
+ int linkCount = fileIoProvider.getHardLinkCount(getVolume(), file);
if (linkCount > 1) {
DataNode.LOG.info("Breaking hardlink for " + linkCount + "x-linked " +
"block " + this);
breakHardlinks(file, this);
}
- if (getHardLinkCount(meta) > 1) {
+ if (fileIoProvider.getHardLinkCount(getVolume(), meta) > 1) {
breakHardlinks(meta, this);
}
return true;
@@ -256,17 +256,18 @@ abstract public class LocalReplica extends ReplicaInfo {
@Override
public OutputStream getDataOutputStream(boolean append) throws IOException {
- return new FileOutputStream(getBlockFile(), append);
+ return getFileIoProvider().getFileOutputStream(
+ getVolume(), getBlockFile(), append);
}
@Override
public boolean blockDataExists() {
- return getBlockFile().exists();
+ return getFileIoProvider().exists(getVolume(), getBlockFile());
}
@Override
public boolean deleteBlockData() {
- return fullyDelete(getBlockFile());
+ return getFileIoProvider().fullyDelete(getVolume(), getBlockFile());
}
@Override
@@ -282,9 +283,10 @@ abstract public class LocalReplica extends ReplicaInfo {
@Override
public LengthInputStream getMetadataInputStream(long offset)
throws IOException {
- File meta = getMetaFile();
+ final File meta = getMetaFile();
return new LengthInputStream(
- FsDatasetUtil.openAndSeek(meta, offset), meta.length());
+ getFileIoProvider().openAndSeek(getVolume(), meta, offset),
+ meta.length());
}
@Override
@@ -295,12 +297,12 @@ abstract public class LocalReplica extends ReplicaInfo {
@Override
public boolean metadataExists() {
- return getMetaFile().exists();
+ return getFileIoProvider().exists(getVolume(), getMetaFile());
}
@Override
public boolean deleteMetadata() {
- return fullyDelete(getMetaFile());
+ return getFileIoProvider().fullyDelete(getVolume(), getMetaFile());
}
@Override
@@ -320,7 +322,7 @@ abstract public class LocalReplica extends ReplicaInfo {
private boolean renameFile(File srcfile, File destfile) throws IOException {
try {
- rename(srcfile, destfile);
+ getFileIoProvider().rename(getVolume(), srcfile, destfile);
return true;
} catch (IOException e) {
throw new IOException("Failed to move block file for " + this
@@ -360,9 +362,9 @@ abstract public class LocalReplica extends ReplicaInfo {
@Override
public void bumpReplicaGS(long newGS) throws IOException {
long oldGS = getGenerationStamp();
- File oldmeta = getMetaFile();
+ final File oldmeta = getMetaFile();
setGenerationStamp(newGS);
- File newmeta = getMetaFile();
+ final File newmeta = getMetaFile();
// rename meta file to new GS
if (LOG.isDebugEnabled()) {
@@ -370,7 +372,7 @@ abstract public class LocalReplica extends ReplicaInfo {
}
try {
// calling renameMeta on the ReplicaInfo doesn't work here
- rename(oldmeta, newmeta);
+ getFileIoProvider().rename(getVolume(), oldmeta, newmeta);
} catch (IOException e) {
setGenerationStamp(oldGS); // restore old GS
throw new IOException("Block " + this + " reopen failed. " +
@@ -381,7 +383,8 @@ abstract public class LocalReplica extends ReplicaInfo {
@Override
public void truncateBlock(long newLength) throws IOException {
- truncateBlock(getBlockFile(), getMetaFile(), getNumBytes(), newLength);
+ truncateBlock(getVolume(), getBlockFile(), getMetaFile(),
+ getNumBytes(), newLength, getFileIoProvider());
}
@Override
@@ -392,32 +395,15 @@ abstract public class LocalReplica extends ReplicaInfo {
@Override
public void copyMetadata(URI destination) throws IOException {
//for local replicas, we assume the destination URI is file
- nativeCopyFileUnbuffered(getMetaFile(), new File(destination), true);
+ getFileIoProvider().nativeCopyFileUnbuffered(
+ getVolume(), getMetaFile(), new File(destination), true);
}
@Override
public void copyBlockdata(URI destination) throws IOException {
//for local replicas, we assume the destination URI is file
- nativeCopyFileUnbuffered(getBlockFile(), new File(destination), true);
- }
-
- public void renameMeta(File newMetaFile) throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Renaming " + getMetaFile() + " to " + newMetaFile);
- }
- renameFile(getMetaFile(), newMetaFile);
- }
-
- public void renameBlock(File newBlockFile) throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Renaming " + getBlockFile() + " to " + newBlockFile
- + ", file length=" + getBlockFile().length());
- }
- renameFile(getBlockFile(), newBlockFile);
- }
-
- public static void rename(File from, File to) throws IOException {
- Storage.rename(from, to);
+ getFileIoProvider().nativeCopyFileUnbuffered(
+ getVolume(), getBlockFile(), new File(destination), true);
}
/**
@@ -430,11 +416,13 @@ abstract public class LocalReplica extends ReplicaInfo {
private FileInputStream getDataInputStream(File f, long seekOffset)
throws IOException {
FileInputStream fis;
+ final FileIoProvider fileIoProvider = getFileIoProvider();
if (NativeIO.isAvailable()) {
- fis = NativeIO.getShareDeleteFileInputStream(f, seekOffset);
+ fis = fileIoProvider.getShareDeleteFileInputStream(
+ getVolume(), f, seekOffset);
} else {
try {
- fis = FsDatasetUtil.openAndSeek(f, seekOffset);
+ fis = fileIoProvider.openAndSeek(getVolume(), f, seekOffset);
} catch (FileNotFoundException fnfe) {
throw new IOException("Expected block file at " + f +
" does not exist.");
@@ -443,30 +431,6 @@ abstract public class LocalReplica extends ReplicaInfo {
return fis;
}
- private void nativeCopyFileUnbuffered(File srcFile, File destFile,
- boolean preserveFileDate) throws IOException {
- Storage.nativeCopyFileUnbuffered(srcFile, destFile, preserveFileDate);
- }
-
- private void copyBytes(InputStream in, OutputStream out, int
- buffSize) throws IOException{
- IOUtils.copyBytes(in, out, buffSize);
- }
-
- private void replaceFile(File src, File target) throws IOException {
- FileUtil.replaceFile(src, target);
- }
-
- public static boolean fullyDelete(final File dir) {
- boolean result = DataStorage.fullyDelete(dir);
- return result;
- }
-
- public static int getHardLinkCount(File fileName) throws IOException {
- int linkCount = HardLink.getLinkCount(fileName);
- return linkCount;
- }
-
/**
* Get pin status of a file by checking the sticky bit.
* @param localFS local file system
@@ -495,8 +459,10 @@ abstract public class LocalReplica extends ReplicaInfo {
localFS.setPermission(path, permission);
}
- public static void truncateBlock(File blockFile, File metaFile,
- long oldlen, long newlen) throws IOException {
+ public static void truncateBlock(
+ FsVolumeSpi volume, File blockFile, File metaFile,
+ long oldlen, long newlen, FileIoProvider fileIoProvider)
+ throws IOException {
LOG.info("truncateBlock: blockFile=" + blockFile
+ ", metaFile=" + metaFile
+ ", oldlen=" + oldlen
@@ -510,7 +476,10 @@ abstract public class LocalReplica extends ReplicaInfo {
+ ") to newlen (=" + newlen + ")");
}
- DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum();
+ // fis is closed by BlockMetadataHeader.readHeader.
+ final FileInputStream fis = fileIoProvider.getFileInputStream(
+ volume, metaFile);
+ DataChecksum dcs = BlockMetadataHeader.readHeader(fis).getChecksum();
int checksumsize = dcs.getChecksumSize();
int bpc = dcs.getBytesPerChecksum();
long n = (newlen - 1)/bpc + 1;
@@ -519,16 +488,14 @@ abstract public class LocalReplica extends ReplicaInfo {
int lastchunksize = (int)(newlen - lastchunkoffset);
byte[] b = new byte[Math.max(lastchunksize, checksumsize)];
- RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");
- try {
+ try (RandomAccessFile blockRAF = fileIoProvider.getRandomAccessFile(
+ volume, blockFile, "rw")) {
//truncate blockFile
blockRAF.setLength(newlen);
//read last chunk
blockRAF.seek(lastchunkoffset);
blockRAF.readFully(b, 0, lastchunksize);
- } finally {
- blockRAF.close();
}
//compute checksum
@@ -536,13 +503,11 @@ abstract public class LocalReplica extends ReplicaInfo {
dcs.writeValue(b, 0, false);
//update metaFile
- RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
- try {
+ try (RandomAccessFile metaRAF = fileIoProvider.getRandomAccessFile(
+ volume, metaFile, "rw")) {
metaRAF.setLength(newmetalen);
metaRAF.seek(newmetalen - checksumsize);
metaRAF.write(b, 0, checksumsize);
- } finally {
- metaRAF.close();
}
}
}
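
The truncation arithmetic in truncateBlock is easiest to follow with concrete
numbers; a sketch assuming the common HDFS defaults of 512 bytes per checksum
and 4-byte CRCs (the lastchunkoffset computation is reconstructed from the
surrounding code):

    long newlen = 1000;                    // requested block length
    int bpc = 512, checksumsize = 4;       // illustrative defaults
    long n = (newlen - 1) / bpc + 1;       // = 2 chunks cover 1000 bytes
    long lastchunkoffset = (n - 1) * bpc;  // = 512, start of the last chunk
    int lastchunksize = (int) (newlen - lastchunkoffset);  // = 488 bytes
    // The block file is truncated to 1000 bytes and bytes [512, 1000)
    // re-read; their checksum is recomputed, the meta file is truncated,
    // and the new 4-byte checksum is written into the last slot.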
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java
index 1387155..003f96f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java
@@ -245,10 +245,9 @@ public class LocalReplicaInPipeline extends LocalReplica
@Override // ReplicaInPipeline
public ReplicaOutputStreams createStreams(boolean isCreate,
- DataChecksum requestedChecksum, long slowLogThresholdMs)
- throws IOException {
- File blockFile = getBlockFile();
- File metaFile = getMetaFile();
+ DataChecksum requestedChecksum) throws IOException {
+ final File blockFile = getBlockFile();
+ final File metaFile = getMetaFile();
if (DataNode.LOG.isDebugEnabled()) {
DataNode.LOG.debug("writeTo blockfile is " + blockFile +
" of size " + blockFile.length());
@@ -262,14 +261,16 @@ public class LocalReplicaInPipeline extends LocalReplica
// may differ from requestedChecksum for appends.
final DataChecksum checksum;
- RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
+ final RandomAccessFile metaRAF =
+ getFileIoProvider().getRandomAccessFile(getVolume(), metaFile, "rw");
if (!isCreate) {
// For append or recovery, we must enforce the existing checksum.
// Also, verify that the file has correct lengths, etc.
boolean checkedMeta = false;
try {
- BlockMetadataHeader header = BlockMetadataHeader.readHeader(metaRAF);
+ BlockMetadataHeader header =
+ BlockMetadataHeader.readHeader(metaRAF);
checksum = header.getChecksum();
if (checksum.getBytesPerChecksum() !=
@@ -302,20 +303,24 @@ public class LocalReplicaInPipeline extends LocalReplica
checksum = requestedChecksum;
}
+ final FileIoProvider fileIoProvider = getFileIoProvider();
FileOutputStream blockOut = null;
FileOutputStream crcOut = null;
try {
- blockOut = new FileOutputStream(
- new RandomAccessFile(blockFile, "rw").getFD());
- crcOut = new FileOutputStream(metaRAF.getFD());
+ blockOut = fileIoProvider.getFileOutputStream(
+ getVolume(),
+ fileIoProvider.getRandomAccessFile(getVolume(), blockFile, "rw")
+ .getFD());
+ crcOut = fileIoProvider.getFileOutputStream(getVolume(), metaRAF.getFD());
if (!isCreate) {
blockOut.getChannel().position(blockDiskSize);
crcOut.getChannel().position(crcDiskSize);
}
return new ReplicaOutputStreams(blockOut, crcOut, checksum,
- getVolume().isTransientStorage(), slowLogThresholdMs);
+ getVolume(), fileIoProvider);
} catch (IOException e) {
IOUtils.closeStream(blockOut);
+ IOUtils.closeStream(crcOut);
IOUtils.closeStream(metaRAF);
throw e;
}
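
The descriptor juggling above is what enables resuming a write mid-file; in
plain JDK terms (a sketch without the instrumentation, with blockFile and
blockDiskSize as in the surrounding code):

    // FileOutputStream(file, append) can only start at the end of the file.
    // Wrapping the descriptor of a RandomAccessFile opened "rw" lets the
    // stream's channel be positioned anywhere, e.g. for append or recovery.
    RandomAccessFile raf = new RandomAccessFile(blockFile, "rw");
    FileOutputStream blockOut = new FileOutputStream(raf.getFD());
    blockOut.getChannel().position(blockDiskSize);  // resume at this offset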
@@ -326,11 +331,11 @@ public class LocalReplicaInPipeline extends LocalReplica
File blockFile = getBlockFile();
File restartMeta = new File(blockFile.getParent() +
File.pathSeparator + "." + blockFile.getName() + ".restart");
- if (restartMeta.exists() && !restartMeta.delete()) {
+ if (!getFileIoProvider().deleteWithExistsCheck(getVolume(), restartMeta)) {
DataNode.LOG.warn("Failed to delete restart meta file: " +
restartMeta.getPath());
}
- return new FileOutputStream(restartMeta);
+ return getFileIoProvider().getFileOutputStream(getVolume(), restartMeta);
}
@Override
@@ -373,12 +378,14 @@ public class LocalReplicaInPipeline extends LocalReplica
+ " should be derived from LocalReplica");
}
- LocalReplica oldReplica = (LocalReplica) oldReplicaInfo;
- File oldmeta = oldReplica.getMetaFile();
- File newmeta = getMetaFile();
+ final LocalReplica oldReplica = (LocalReplica) oldReplicaInfo;
+ final File oldBlockFile = oldReplica.getBlockFile();
+ final File oldmeta = oldReplica.getMetaFile();
+ final File newmeta = getMetaFile();
+ final FileIoProvider fileIoProvider = getFileIoProvider();
try {
- oldReplica.renameMeta(newmeta);
+ fileIoProvider.rename(getVolume(), oldmeta, newmeta);
} catch (IOException e) {
throw new IOException("Block " + oldReplicaInfo + " reopen failed. " +
" Unable to move meta file " + oldmeta +
@@ -386,10 +393,10 @@ public class LocalReplicaInPipeline extends LocalReplica
}
try {
- oldReplica.renameBlock(newBlkFile);
+ fileIoProvider.rename(getVolume(), oldBlockFile, newBlkFile);
} catch (IOException e) {
try {
- renameMeta(oldmeta);
+ fileIoProvider.rename(getVolume(), newmeta, oldmeta);
} catch (IOException ex) {
LOG.warn("Cannot move meta file " + newmeta +
"back to the finalized directory " + oldmeta, ex);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
index 5fdbec0..efa6ea6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
@@ -69,13 +69,11 @@ public interface ReplicaInPipeline extends Replica {
*
* @param isCreate if it is for creation
* @param requestedChecksum the checksum the writer would prefer to use
- * @param slowLogThresholdMs slow io threshold for logging
* @return output streams for writing
* @throws IOException if any error occurs
*/
public ReplicaOutputStreams createStreams(boolean isCreate,
- DataChecksum requestedChecksum, long slowLogThresholdMs)
- throws IOException;
+ DataChecksum requestedChecksum) throws IOException;
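
A hedged sketch of a caller against the narrowed interface; the slow-I/O
threshold no longer travels with each call because timing is now observed
inside FileIoProvider (the stream handling shown is assumed, not prescribed
by this patch):

    ReplicaOutputStreams streams =
        replicaInPipeline.createStreams(true /* isCreate */,
            requestedChecksum);
    try {
      // write block data and checksums through the returned streams
    } finally {
      streams.close();  // assumed to be Closeable
    }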
/**
* Create an output stream to write restart metadata in case of datanode