Posted to common-commits@hadoop.apache.org by xg...@apache.org on 2016/12/14 21:59:53 UTC
[1/9] hadoop git commit: HADOOP-13709. Ability to clean up subprocesses spawned by Shell when the process exits. Contributed by Eric Badger
Repository: hadoop
Updated Branches:
refs/heads/YARN-5734 ef34bf2bb -> f5e0bd30f
HADOOP-13709. Ability to clean up subprocesses spawned by Shell when the process exits. Contributed by Eric Badger
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9947aeb6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9947aeb6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9947aeb6
Branch: refs/heads/YARN-5734
Commit: 9947aeb60c3dd075544866fd6e4dab0ad8b4afa2
Parents: ef34bf2
Author: Jason Lowe <jl...@apache.org>
Authored: Tue Dec 13 22:55:09 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Tue Dec 13 22:55:09 2016 +0000
----------------------------------------------------------------------
.../main/java/org/apache/hadoop/util/Shell.java | 24 +++++++
.../java/org/apache/hadoop/util/TestShell.java | 68 ++++++++++++++++++++
2 files changed, 92 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9947aeb6/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java
index 5fc9869..83877b7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java
@@ -26,9 +26,11 @@ import java.io.InputStream;
import java.io.InterruptedIOException;
import java.nio.charset.Charset;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.WeakHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.annotations.VisibleForTesting;
@@ -48,6 +50,8 @@ import org.slf4j.LoggerFactory;
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class Shell {
+ private static final Map<Process, Object> CHILD_PROCESSES =
+ Collections.synchronizedMap(new WeakHashMap<Process, Object>());
public static final Logger LOG = LoggerFactory.getLogger(Shell.class);
/**
@@ -916,6 +920,7 @@ public abstract class Shell {
} else {
process = builder.start();
}
+ CHILD_PROCESSES.put(process, null);
if (timeOutInterval > 0) {
timeOutTimer = new Timer("Shell command timeout");
@@ -1012,6 +1017,7 @@ public abstract class Shell {
LOG.warn("Error while closing the error stream", ioe);
}
process.destroy();
+ CHILD_PROCESSES.remove(process);
lastTime = Time.monotonicNow();
}
}
@@ -1310,4 +1316,22 @@ public abstract class Shell {
}
}
}
+
+ /**
+ * Static method to destroy all running <code>Shell</code> processes.
+ * Iterates through a list of all currently running <code>Shell</code>
+ * processes and destroys them one by one. This method is thread safe and
+ * is intended to be used in a shutdown hook.
+ */
+ public static void destroyAllProcesses() {
+ synchronized (CHILD_PROCESSES) {
+ for (Process key : CHILD_PROCESSES.keySet()) {
+ Process process = key;
+ if (key != null) {
+ process.destroy();
+ }
+ }
+ CHILD_PROCESSES.clear();
+ }
+ }
}
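The new destroyAllProcesses() method is documented as intended for use in a shutdown hook. A minimal sketch of how a caller might wire it up (illustrative only, not part of this commit; the thread name is arbitrary):

    // Illustrative only: register a JVM shutdown hook that destroys any
    // Shell-spawned child processes still tracked in CHILD_PROCESSES.
    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
      @Override
      public void run() {
        org.apache.hadoop.util.Shell.destroyAllProcesses();
      }
    }, "shell-child-cleanup"));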
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9947aeb6/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java
index 67903f7..5cc011b 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.util;
+import com.google.common.base.Supplier;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.security.alias.AbstractJavaKeyStoreProvider;
import org.junit.Assert;
@@ -471,4 +472,71 @@ public class TestShell extends Assert {
assertEquals("'foo'\\''bar'", Shell.bashQuote("foo'bar"));
assertEquals("''\\''foo'\\''bar'\\'''", Shell.bashQuote("'foo'bar'"));
}
+
+ @Test(timeout=120000)
+ public void testShellKillAllProcesses() throws Throwable {
+ Assume.assumeFalse(WINDOWS);
+ StringBuffer sleepCommand = new StringBuffer();
+ sleepCommand.append("sleep 200");
+ String[] shellCmd = {"bash", "-c", sleepCommand.toString()};
+ final ShellCommandExecutor shexc1 = new ShellCommandExecutor(shellCmd);
+ final ShellCommandExecutor shexc2 = new ShellCommandExecutor(shellCmd);
+
+ Thread shellThread1 = new Thread() {
+ @Override
+ public void run() {
+ try {
+ shexc1.execute();
+ } catch(IOException ioe) {
+ //ignore IOException from thread interrupt
+ }
+ }
+ };
+ Thread shellThread2 = new Thread() {
+ @Override
+ public void run() {
+ try {
+ shexc2.execute();
+ } catch(IOException ioe) {
+ //ignore IOException from thread interrupt
+ }
+ }
+ };
+
+ shellThread1.start();
+ shellThread2.start();
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return shexc1.getProcess() != null;
+ }
+ }, 10, 10000);
+
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return shexc2.getProcess() != null;
+ }
+ }, 10, 10000);
+
+ Shell.destroyAllProcesses();
+ final Process process1 = shexc1.getProcess();
+ final Process process2 = shexc2.getProcess();
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return !process1.isAlive();
+ }
+ }, 10, 10000);
+
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return !process2.isAlive();
+ }
+ }, 10, 10000);
+
+ assertFalse("Process 1 was not killed within timeout", process1.isAlive());
+ assertFalse("Process 2 was not killed within timeout", process2.isAlive());
+ }
}
[8/9] hadoop git commit: HDFS-10958. Add instrumentation hooks around Datanode disk IO.
Posted by xg...@apache.org.
HDFS-10958. Add instrumentation hooks around Datanode disk IO.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6ba9587d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6ba9587d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6ba9587d
Branch: refs/heads/YARN-5734
Commit: 6ba9587d370fbf39c129c08c00ebbb894ccc1389
Parents: 72bff19
Author: Arpit Agarwal <ar...@apache.org>
Authored: Wed Dec 14 11:18:58 2016 -0800
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Wed Dec 14 11:18:58 2016 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/io/nativeio/NativeIO.java | 40 +-
.../server/datanode/BlockMetadataHeader.java | 29 +-
.../dev-support/findbugsExcludeFile.xml | 27 +
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 2 +
.../hdfs/server/datanode/BlockReceiver.java | 14 +-
.../hdfs/server/datanode/BlockSender.java | 10 +-
.../server/datanode/CountingFileIoEvents.java | 107 ++
.../hadoop/hdfs/server/datanode/DataNode.java | 12 +
.../hdfs/server/datanode/DataNodeMXBean.java | 5 +
.../hdfs/server/datanode/DataStorage.java | 6 +
.../hdfs/server/datanode/DatanodeUtil.java | 19 +-
.../server/datanode/DefaultFileIoEvents.java | 67 ++
.../hdfs/server/datanode/FileIoEvents.java | 97 ++
.../hdfs/server/datanode/FileIoProvider.java | 1006 ++++++++++++++++++
.../hdfs/server/datanode/LocalReplica.java | 133 +--
.../server/datanode/LocalReplicaInPipeline.java | 43 +-
.../hdfs/server/datanode/ReplicaInPipeline.java | 4 +-
.../hdfs/server/datanode/ReplicaInfo.java | 17 +-
.../server/datanode/fsdataset/FsVolumeSpi.java | 3 +
.../datanode/fsdataset/ReplicaInputStreams.java | 11 +-
.../fsdataset/ReplicaOutputStreams.java | 72 +-
.../datanode/fsdataset/impl/BlockPoolSlice.java | 122 +--
.../impl/FsDatasetAsyncDiskService.java | 6 +-
.../datanode/fsdataset/impl/FsDatasetImpl.java | 46 +-
.../datanode/fsdataset/impl/FsDatasetUtil.java | 5 +-
.../datanode/fsdataset/impl/FsVolumeImpl.java | 89 +-
.../fsdataset/impl/FsVolumeImplBuilder.java | 12 +-
.../org/apache/hadoop/hdfs/TestFileAppend.java | 2 +-
.../server/datanode/SimulatedFSDataset.java | 18 +-
.../hdfs/server/datanode/TestBlockRecovery.java | 2 +-
.../server/datanode/TestDirectoryScanner.java | 5 +
.../server/datanode/TestSimulatedFSDataset.java | 2 +-
.../extdataset/ExternalDatasetImpl.java | 2 +-
.../extdataset/ExternalReplicaInPipeline.java | 6 +-
.../datanode/extdataset/ExternalVolumeImpl.java | 6 +
.../hadoop/tools/TestHdfsConfigFields.java | 2 +
36 files changed, 1684 insertions(+), 365 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
index a123f18..f3ff1c7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
@@ -742,47 +742,19 @@ public class NativeIO {
}
/**
- * Create a FileInputStream that shares delete permission on the
- * file opened, i.e. other process can delete the file the
- * FileInputStream is reading. Only Windows implementation uses
- * the native interface.
- */
- public static FileInputStream getShareDeleteFileInputStream(File f)
- throws IOException {
- if (!Shell.WINDOWS) {
- // On Linux the default FileInputStream shares delete permission
- // on the file opened.
- //
- return new FileInputStream(f);
- } else {
- // Use Windows native interface to create a FileInputStream that
- // shares delete permission on the file opened.
- //
- FileDescriptor fd = Windows.createFile(
- f.getAbsolutePath(),
- Windows.GENERIC_READ,
- Windows.FILE_SHARE_READ |
- Windows.FILE_SHARE_WRITE |
- Windows.FILE_SHARE_DELETE,
- Windows.OPEN_EXISTING);
- return new FileInputStream(fd);
- }
- }
-
- /**
- * Create a FileInputStream that shares delete permission on the
+ * Create a FileDescriptor that shares delete permission on the
* file opened at a given offset, i.e. other process can delete
- * the file the FileInputStream is reading. Only Windows implementation
+ * the file the FileDescriptor is reading. Only Windows implementation
* uses the native interface.
*/
- public static FileInputStream getShareDeleteFileInputStream(File f, long seekOffset)
- throws IOException {
+ public static FileDescriptor getShareDeleteFileDescriptor(
+ File f, long seekOffset) throws IOException {
if (!Shell.WINDOWS) {
RandomAccessFile rf = new RandomAccessFile(f, "r");
if (seekOffset > 0) {
rf.seek(seekOffset);
}
- return new FileInputStream(rf.getFD());
+ return rf.getFD();
} else {
// Use Windows native interface to create a FileInputStream that
// shares delete permission on the file opened, and set it to the
@@ -797,7 +769,7 @@ public class NativeIO {
NativeIO.Windows.OPEN_EXISTING);
if (seekOffset > 0)
NativeIO.Windows.setFilePointer(fd, seekOffset, NativeIO.Windows.FILE_BEGIN);
- return new FileInputStream(fd);
+ return fd;
}
}
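With this change, getShareDeleteFileInputStream(File) is removed and the offset variant now returns a bare FileDescriptor, so callers that still want a stream wrap the descriptor themselves (as FileIoProvider does later in this patch). A sketch of that caller-side pattern, with a made-up path:

    File blockFile = new File("/data/dn/current/blk_1001");   // hypothetical path
    FileDescriptor fd =
        NativeIO.getShareDeleteFileDescriptor(blockFile, 0 /* seekOffset */);
    FileInputStream in = new FileInputStream(fd);
    try {
      // read as before; other processes may still delete the file
    } finally {
      in.close();
    }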
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
index eb19492..738f496 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
@@ -31,7 +31,6 @@ import java.nio.channels.FileChannel;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum;
import com.google.common.annotations.VisibleForTesting;
@@ -79,18 +78,15 @@ public class BlockMetadataHeader {
/**
* Read the checksum header from the meta file.
+ * inputStream must be closed by the caller.
* @return the data checksum obtained from the header.
*/
- public static DataChecksum readDataChecksum(File metaFile, int bufSize)
+ public static DataChecksum readDataChecksum(
+ FileInputStream inputStream, int bufSize, File metaFile)
throws IOException {
- DataInputStream in = null;
- try {
- in = new DataInputStream(new BufferedInputStream(
- new FileInputStream(metaFile), bufSize));
- return readDataChecksum(in, metaFile);
- } finally {
- IOUtils.closeStream(in);
- }
+ DataInputStream in = new DataInputStream(new BufferedInputStream(
+ inputStream, bufSize));
+ return readDataChecksum(in, metaFile);
}
/**
@@ -111,6 +107,7 @@ public class BlockMetadataHeader {
/**
* Read the header without changing the position of the FileChannel.
+ * This is used by the client for short-circuit reads.
*
* @param fc The FileChannel to read.
* @return the Metadata Header.
@@ -144,18 +141,16 @@ public class BlockMetadataHeader {
/**
* Reads header at the top of metadata file and returns the header.
+ * Closes the input stream after reading the header.
*
* @return metadata header for the block
* @throws IOException
*/
- public static BlockMetadataHeader readHeader(File file) throws IOException {
- DataInputStream in = null;
- try {
- in = new DataInputStream(new BufferedInputStream(
- new FileInputStream(file)));
+ public static BlockMetadataHeader readHeader(
+ FileInputStream fis) throws IOException {
+ try (DataInputStream in = new DataInputStream(
+ new BufferedInputStream(fis))) {
return readHeader(in);
- } finally {
- IOUtils.closeStream(in);
}
}
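Both methods now take a caller-supplied FileInputStream: readHeader closes it after reading, while readDataChecksum leaves ownership with the caller. A sketch of the new readDataChecksum calling convention (the path and buffer size below are illustrative):

    File metaFile = new File("/data/dn/current/blk_1001.meta");  // hypothetical
    FileInputStream fis = new FileInputStream(metaFile);
    try {
      DataChecksum checksum =
          BlockMetadataHeader.readDataChecksum(fis, 4096, metaFile);
      // use checksum ...
    } finally {
      fis.close();   // caller owns and closes the stream
    }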
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
index e6e4057..3fa4e8d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
@@ -74,6 +74,33 @@
</Match>
<!--
+ This class exposes stream constructors. The newly created streams are not
+ supposed to be closed in the constructor. Ignore the OBL warning.
+ -->
+ <Match>
+ <Class name="org.apache.hadoop.hdfs.server.datanode.FileIoProvider$WrappedFileOutputStream" />
+ <Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
+ </Match>
+
+ <!--
+ This class exposes stream constructors. The newly created streams are not
+ supposed to be closed in the constructor. Ignore the OBL warning.
+ -->
+ <Match>
+ <Class name="org.apache.hadoop.hdfs.server.datanode.FileIoProvider$WrappedFileInputStream" />
+ <Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
+ </Match>
+
+ <!--
+ This class exposes stream constructors. The newly created streams are not
+ supposed to be closed in the constructor. Ignore the OBL warning.
+ -->
+ <Match>
+ <Class name="org.apache.hadoop.hdfs.server.datanode.FileIoProvider$WrappedRandomAccessFile" />
+ <Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
+ </Match>
+
+ <!--
lastAppliedTxid is carefully unsynchronized in the BackupNode in a couple spots.
See the comments in BackupImage for justification.
-->
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index df21857..cffc4bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -687,6 +687,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins";
public static final String DFS_DATANODE_FSDATASET_FACTORY_KEY = "dfs.datanode.fsdataset.factory";
public static final String DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY = "dfs.datanode.fsdataset.volume.choosing.policy";
+ public static final String DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY =
+ "dfs.datanode.fileio.events.class";
public static final String DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_KEY = "dfs.datanode.available-space-volume-choosing-policy.balanced-space-threshold";
public static final long DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_DEFAULT = 1024L * 1024L * 1024L * 10L; // 10 GB
public static final String DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY = "dfs.datanode.available-space-volume-choosing-policy.balanced-space-preference-fraction";
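The new dfs.datanode.fileio.events.class key selects the FileIoEvents implementation that FileIoProvider instantiates. For example, a test could plug in the counting implementation added by this patch (sketch, not taken from the patch itself):

    Configuration conf = new HdfsConfiguration();
    conf.setClass(DFSConfigKeys.DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY,
        CountingFileIoEvents.class, FileIoEvents.class);
    // A DataNode started with this conf routes every file IO event through
    // CountingFileIoEvents (see the FileIoProvider constructor below).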
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index f372072..441bd91 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -244,8 +244,7 @@ class BlockReceiver implements Closeable {
final boolean isCreate = isDatanode || isTransfer
|| stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
- streams = replicaInfo.createStreams(isCreate, requestedChecksum,
- datanodeSlowLogThresholdMs);
+ streams = replicaInfo.createStreams(isCreate, requestedChecksum);
assert streams != null : "null streams!";
// read checksum meta information
@@ -400,9 +399,8 @@ class BlockReceiver implements Closeable {
checksumOut.flush();
long flushEndNanos = System.nanoTime();
if (isSync) {
- long fsyncStartNanos = flushEndNanos;
streams.syncChecksumOut();
- datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos);
+ datanode.metrics.addFsyncNanos(System.nanoTime() - flushEndNanos);
}
flushTotalNanos += flushEndNanos - flushStartNanos;
}
@@ -703,8 +701,10 @@ class BlockReceiver implements Closeable {
int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
// Write data to disk.
- long duration = streams.writeToDisk(dataBuf.array(),
+ long begin = Time.monotonicNow();
+ streams.writeDataToDisk(dataBuf.array(),
startByteToDisk, numBytesToDisk);
+ long duration = Time.monotonicNow() - begin;
if (duration > maxWriteToDiskMs) {
maxWriteToDiskMs = duration;
@@ -1029,9 +1029,7 @@ class BlockReceiver implements Closeable {
* will be overwritten.
*/
private void adjustCrcFilePosition() throws IOException {
- if (streams.getDataOut() != null) {
- streams.flushDataOut();
- }
+ streams.flushDataOut();
if (checksumOut != null) {
checksumOut.flush();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index 9182c88..d7aebd8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -166,6 +166,7 @@ class BlockSender implements java.io.Closeable {
private final boolean dropCacheBehindAllReads;
private long lastCacheDropOffset;
+ private final FileIoProvider fileIoProvider;
@VisibleForTesting
static long CACHE_DROP_INTERVAL_BYTES = 1024 * 1024; // 1MB
@@ -197,6 +198,7 @@ class BlockSender implements java.io.Closeable {
InputStream blockIn = null;
DataInputStream checksumIn = null;
FsVolumeReference volumeRef = null;
+ this.fileIoProvider = datanode.getFileIoProvider();
try {
this.block = block;
this.corruptChecksumOk = corruptChecksumOk;
@@ -401,7 +403,8 @@ class BlockSender implements java.io.Closeable {
DataNode.LOG.debug("replica=" + replica);
}
blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
- ris = new ReplicaInputStreams(blockIn, checksumIn, volumeRef);
+ ris = new ReplicaInputStreams(
+ blockIn, checksumIn, volumeRef, fileIoProvider);
} catch (IOException ioe) {
IOUtils.closeStream(this);
throw ioe;
@@ -568,8 +571,9 @@ class BlockSender implements java.io.Closeable {
FileChannel fileCh = ((FileInputStream)ris.getDataIn()).getChannel();
LongWritable waitTime = new LongWritable();
LongWritable transferTime = new LongWritable();
- sockOut.transferToFully(fileCh, blockInPosition, dataLen,
- waitTime, transferTime);
+ fileIoProvider.transferToSocketFully(
+ ris.getVolumeRef().getVolume(), sockOut, fileCh, blockInPosition,
+ dataLen, waitTime, transferTime);
datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get());
datanode.metrics.addSendDataPacketTransferNanos(transferTime.get());
blockInPosition += dataLen;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java
new file mode 100644
index 0000000..a70c151
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * {@link FileIoEvents} that simply counts the number of operations.
+ * Not meant to be used outside of testing.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class CountingFileIoEvents implements FileIoEvents {
+ private final Map<OPERATION, Counts> counts;
+
+ private static class Counts {
+ private final AtomicLong successes = new AtomicLong(0);
+ private final AtomicLong failures = new AtomicLong(0);
+
+ @JsonProperty("Successes")
+ public long getSuccesses() {
+ return successes.get();
+ }
+
+ @JsonProperty("Failures")
+ public long getFailures() {
+ return failures.get();
+ }
+ }
+
+ public CountingFileIoEvents() {
+ counts = new HashMap<>();
+ for (OPERATION op : OPERATION.values()) {
+ counts.put(op, new Counts());
+ }
+ }
+
+ @Override
+ public long beforeMetadataOp(
+ @Nullable FsVolumeSpi volume, OPERATION op) {
+ return 0;
+ }
+
+ @Override
+ public void afterMetadataOp(
+ @Nullable FsVolumeSpi volume, OPERATION op, long begin) {
+ counts.get(op).successes.incrementAndGet();
+ }
+
+ @Override
+ public long beforeFileIo(
+ @Nullable FsVolumeSpi volume, OPERATION op, long len) {
+ return 0;
+ }
+
+ @Override
+ public void afterFileIo(
+ @Nullable FsVolumeSpi volume, OPERATION op, long begin, long len) {
+ counts.get(op).successes.incrementAndGet();
+ }
+
+ @Override
+ public void onFailure(
+ @Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin) {
+ counts.get(op).failures.incrementAndGet();
+
+ }
+
+ @Override
+ public String getStatistics() {
+ ObjectMapper objectMapper = new ObjectMapper();
+ try {
+ return objectMapper.writeValueAsString(counts);
+ } catch (JsonProcessingException e) {
+ // Failed to serialize. Don't log the exception call stack.
+ FileIoProvider.LOG.error("Failed to serialize statistics" + e);
+ return null;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index b845da0..794b1ad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -299,6 +299,7 @@ public class DataNode extends ReconfigurableBase
public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog");
private static final String DATANODE_HTRACE_PREFIX = "datanode.htrace.";
+ private final FileIoProvider fileIoProvider;
/**
* Use {@link NetUtils#createSocketAddr(String)} instead.
@@ -411,6 +412,7 @@ public class DataNode extends ReconfigurableBase
this.tracer = createTracer(conf);
this.tracerConfigurationManager =
new TracerConfigurationManager(DATANODE_HTRACE_PREFIX, conf);
+ this.fileIoProvider = new FileIoProvider(conf);
this.fileDescriptorPassingDisabledReason = null;
this.maxNumberOfBlocksToLog = 0;
this.confVersion = null;
@@ -437,6 +439,7 @@ public class DataNode extends ReconfigurableBase
this.tracer = createTracer(conf);
this.tracerConfigurationManager =
new TracerConfigurationManager(DATANODE_HTRACE_PREFIX, conf);
+ this.fileIoProvider = new FileIoProvider(conf);
this.blockScanner = new BlockScanner(this);
this.lastDiskErrorCheck = 0;
this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
@@ -617,6 +620,10 @@ public class DataNode extends ReconfigurableBase
PipelineAck.ECN.SUPPORTED;
}
+ public FileIoProvider getFileIoProvider() {
+ return fileIoProvider;
+ }
+
/**
* Contains the StorageLocations for changed data volumes.
*/
@@ -3008,6 +3015,11 @@ public class DataNode extends ReconfigurableBase
}
}
+ @Override // DataNodeMXBean
+ public String getFileIoProviderStatistics() {
+ return fileIoProvider.getStatistics();
+ }
+
public void refreshNamenodes(Configuration conf) throws IOException {
blockPoolManager.refreshNamenodes(conf);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
index 90c38d7..37f9635 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
@@ -120,4 +120,9 @@ public interface DataNodeMXBean {
* @return DiskBalancer Status
*/
String getDiskBalancerStatus();
+
+ /**
+ * Gets the {@link FileIoProvider} statistics.
+ */
+ String getFileIoProviderStatistics();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
index f4deb6d..5163e6b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
@@ -1356,6 +1356,12 @@ public class DataStorage extends Storage {
bpStorageMap.remove(bpId);
}
+ /**
+ * Prefer FileIoProvider#fullyDelete.
+ * @param dir directory to delete
+ * @return true on success, false otherwise
+ */
+ @Deprecated
public static boolean fullyDelete(final File dir) {
boolean result = FileUtil.fullyDelete(dir);
return result;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java
index ad054a8..c98ff54 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
/** Provide utility methods for Datanode. */
@@ -55,15 +56,17 @@ public class DatanodeUtil {
* @throws IOException
* if the file already exists or if the file cannot be created.
*/
- public static File createTmpFile(Block b, File f) throws IOException {
- if (f.exists()) {
+ public static File createFileWithExistsCheck(
+ FsVolumeSpi volume, Block b, File f,
+ FileIoProvider fileIoProvider) throws IOException {
+ if (fileIoProvider.exists(volume, f)) {
throw new IOException("Failed to create temporary file for " + b
+ ". File " + f + " should not be present, but is.");
}
// Create the zero-length temp file
final boolean fileCreated;
try {
- fileCreated = f.createNewFile();
+ fileCreated = fileIoProvider.createFile(volume, f);
} catch (IOException ioe) {
throw new IOException(DISK_ERROR + "Failed to create " + f, ioe);
}
@@ -92,13 +95,17 @@ public class DatanodeUtil {
* @return true if there are no files
* @throws IOException if unable to list subdirectories
*/
- public static boolean dirNoFilesRecursive(File dir) throws IOException {
- File[] contents = dir.listFiles();
+ public static boolean dirNoFilesRecursive(
+ FsVolumeSpi volume, File dir,
+ FileIoProvider fileIoProvider) throws IOException {
+ File[] contents = fileIoProvider.listFiles(volume, dir);
if (contents == null) {
throw new IOException("Cannot list contents of " + dir);
}
for (File f : contents) {
- if (!f.isDirectory() || (f.isDirectory() && !dirNoFilesRecursive(f))) {
+ if (!f.isDirectory() ||
+ (f.isDirectory() && !dirNoFilesRecursive(
+ volume, f, fileIoProvider))) {
return false;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java
new file mode 100644
index 0000000..bd4932b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+
+import javax.annotation.Nullable;
+
+/**
+ * The default implementation of {@link FileIoEvents} that does nothing.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public final class DefaultFileIoEvents implements FileIoEvents {
+ @Override
+ public long beforeMetadataOp(
+ @Nullable FsVolumeSpi volume, OPERATION op) {
+ return 0;
+ }
+
+ @Override
+ public void afterMetadataOp(
+ @Nullable FsVolumeSpi volume, OPERATION op, long begin) {
+ }
+
+ @Override
+ public long beforeFileIo(
+ @Nullable FsVolumeSpi volume, OPERATION op, long len) {
+ return 0;
+ }
+
+ @Override
+ public void afterFileIo(
+ @Nullable FsVolumeSpi volume, OPERATION op, long begin, long len) {
+ }
+
+ @Override
+ public void onFailure(
+ @Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin) {
+ }
+
+ @Override
+ public @Nullable String getStatistics() {
+ // null is valid JSON.
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java
new file mode 100644
index 0000000..48e703f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+
+import javax.annotation.Nullable;
+
+/**
+ * The following hooks can be implemented for instrumentation/fault
+ * injection.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface FileIoEvents {
+
+ /**
+ * Invoked before a filesystem metadata operation.
+ *
+ * @param volume target volume for the operation. Null if unavailable.
+ * @param op type of operation.
+ * @return timestamp at which the operation was started. 0 if
+ * unavailable.
+ */
+ long beforeMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op);
+
+ /**
+ * Invoked after a filesystem metadata operation has completed.
+ *
+ * @param volume target volume for the operation. Null if unavailable.
+ * @param op type of operation.
+ * @param begin timestamp at which the operation was started. 0
+ * if unavailable.
+ */
+ void afterMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op, long begin);
+
+ /**
+ * Invoked before a read/write/flush/channel transfer operation.
+ *
+ * @param volume target volume for the operation. Null if unavailable.
+ * @param op type of operation.
+ * @param len length of the file IO. 0 for flush.
+ * @return timestamp at which the operation was started. 0 if
+ * unavailable.
+ */
+ long beforeFileIo(@Nullable FsVolumeSpi volume, OPERATION op, long len);
+
+
+ /**
+ * Invoked after a read/write/flush/channel transfer operation
+ * has completed.
+ *
+ * @param volume target volume for the operation. Null if unavailable.
+ * @param op type of operation.
+ * @param begin timestamp at which the operation was started. 0 if
+ * unavailable.
+ * @param len length of the file IO. 0 for flush.
+ */
+ void afterFileIo(@Nullable FsVolumeSpi volume, OPERATION op,
+ long begin, long len);
+
+ /**
+ * Invoked if an operation fails with an exception.
+ * @param volume target volume for the operation. Null if unavailable.
+ * @param op type of operation.
+ * @param e Exception encountered during the operation.
+ * @param begin time at which the operation was started.
+ */
+ void onFailure(
+ @Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin);
+
+ /**
+ * Return statistics as a JSON string.
+ * @return statistics as a JSON string. May be null.
+ */
+ @Nullable String getStatistics();
+}
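As an illustration of the hook contract (purely hypothetical, not part of this patch), an implementation could record a timestamp in the before* methods and warn on slow operations in the after* methods. The sketch below assumes imports of org.apache.hadoop.util.Time, FsVolumeSpi and FileIoProvider.OPERATION from the surrounding package:

    public class SlowIoLoggingFileIoEvents implements FileIoEvents {
      private static final long THRESHOLD_MS = 300;   // arbitrary threshold

      @Override
      public long beforeMetadataOp(FsVolumeSpi volume, OPERATION op) {
        return Time.monotonicNow();
      }

      @Override
      public void afterMetadataOp(FsVolumeSpi volume, OPERATION op, long begin) {
        logIfSlow(op, begin);
      }

      @Override
      public long beforeFileIo(FsVolumeSpi volume, OPERATION op, long len) {
        return Time.monotonicNow();
      }

      @Override
      public void afterFileIo(FsVolumeSpi volume, OPERATION op, long begin,
          long len) {
        logIfSlow(op, begin);
      }

      @Override
      public void onFailure(FsVolumeSpi volume, OPERATION op, Exception e,
          long begin) {
        FileIoProvider.LOG.warn("{} failed after {} ms", op,
            Time.monotonicNow() - begin, e);
      }

      @Override
      public String getStatistics() {
        return null;   // no statistics kept in this sketch; null is valid JSON
      }

      private static void logIfSlow(OPERATION op, long begin) {
        long elapsed = Time.monotonicNow() - begin;
        if (elapsed > THRESHOLD_MS) {
          FileIoProvider.LOG.warn("Slow {} took {} ms", op, elapsed);
        }
      }
    }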
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java
new file mode 100644
index 0000000..2344114
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java
@@ -0,0 +1,1006 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.HardLink;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.NativeIOException;
+import org.apache.hadoop.net.SocketOutputStream;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.Flushable;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.file.CopyOption;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+
+import static org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION.*;
+
+/**
+ * This class abstracts out various file IO operations performed by the
+ * DataNode and invokes event hooks before and after each file IO.
+ *
+ * Behavior can be injected into these events by implementing
+ * {@link FileIoEvents} and replacing the default implementation
+ * with {@link DFSConfigKeys#DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY}.
+ *
+ * Most functions accept an optional {@link FsVolumeSpi} parameter for
+ * instrumentation/logging.
+ *
+ * Some methods may look redundant, especially the multiple variations of
+ * move/rename/list. They exist to retain behavior compatibility for existing
+ * code.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class FileIoProvider {
+ public static final Logger LOG = LoggerFactory.getLogger(
+ FileIoProvider.class);
+
+ private final FileIoEvents eventHooks;
+
+ /**
+ * @param conf Configuration object. May be null. When null,
+ * the event handlers are no-ops.
+ */
+ public FileIoProvider(@Nullable Configuration conf) {
+ if (conf != null) {
+ final Class<? extends FileIoEvents> clazz = conf.getClass(
+ DFSConfigKeys.DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY,
+ DefaultFileIoEvents.class,
+ FileIoEvents.class);
+ eventHooks = ReflectionUtils.newInstance(clazz, conf);
+ } else {
+ eventHooks = new DefaultFileIoEvents();
+ }
+ }
+
+ /**
+ * Lists the types of file system operations. Passed to the
+ * IO hooks so implementations can choose behavior based on
+ * specific operations.
+ */
+ public enum OPERATION {
+ OPEN,
+ EXISTS,
+ LIST,
+ DELETE,
+ MOVE,
+ MKDIRS,
+ TRANSFER,
+ SYNC,
+ FADVISE,
+ READ,
+ WRITE,
+ FLUSH,
+ NATIVE_COPY
+ }
+
+ /**
+ * Retrieve statistics from the underlying {@link FileIoEvents}
+ * implementation as a JSON string, if it maintains them.
+ * @return statistics as a JSON string. May be null.
+ */
+ public @Nullable String getStatistics() {
+ return eventHooks.getStatistics();
+ }
+
+ /**
+ * See {@link Flushable#flush()}.
+ *
+ * @param volume target volume. null if unavailable.
+ * @throws IOException
+ */
+ public void flush(
+ @Nullable FsVolumeSpi volume, Flushable f) throws IOException {
+ final long begin = eventHooks.beforeFileIo(volume, FLUSH, 0);
+ try {
+ f.flush();
+ eventHooks.afterFileIo(volume, FLUSH, begin, 0);
+ } catch (Exception e) {
+ eventHooks.onFailure(volume, FLUSH, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Sync the given {@link FileOutputStream}.
+ *
+ * @param volume target volume. null if unavailable.
+ * @throws IOException
+ */
+ public void sync(
+ @Nullable FsVolumeSpi volume, FileOutputStream fos) throws IOException {
+ final long begin = eventHooks.beforeFileIo(volume, SYNC, 0);
+ try {
+ fos.getChannel().force(true);
+ eventHooks.afterFileIo(volume, SYNC, begin, 0);
+ } catch (Exception e) {
+ eventHooks.onFailure(volume, SYNC, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Call sync_file_range on the given file descriptor.
+ *
+ * @param volume target volume. null if unavailable.
+ * @throws IOException
+ */
+ public void syncFileRange(
+ @Nullable FsVolumeSpi volume, FileDescriptor outFd,
+ long offset, long numBytes, int flags) throws NativeIOException {
+ final long begin = eventHooks.beforeFileIo(volume, SYNC, 0);
+ try {
+ NativeIO.POSIX.syncFileRangeIfPossible(outFd, offset, numBytes, flags);
+ eventHooks.afterFileIo(volume, SYNC, begin, 0);
+ } catch (Exception e) {
+ eventHooks.onFailure(volume, SYNC, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Call posix_fadvise on the given file descriptor.
+ *
+ * @param volume target volume. null if unavailable.
+ * @throws IOException
+ */
+ public void posixFadvise(
+ @Nullable FsVolumeSpi volume, String identifier, FileDescriptor outFd,
+ long offset, long length, int flags) throws NativeIOException {
+ final long begin = eventHooks.beforeMetadataOp(volume, FADVISE);
+ try {
+ NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
+ identifier, outFd, offset, length, flags);
+ eventHooks.afterMetadataOp(volume, FADVISE, begin);
+ } catch (Exception e) {
+ eventHooks.onFailure(volume, FADVISE, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Delete a file.
+ * @param volume target volume. null if unavailable.
+ * @param f File to delete.
+ * @return true if the file was successfully deleted.
+ */
+ public boolean delete(@Nullable FsVolumeSpi volume, File f) {
+ final long begin = eventHooks.beforeMetadataOp(volume, DELETE);
+ try {
+ boolean deleted = f.delete();
+ eventHooks.afterMetadataOp(volume, DELETE, begin);
+ return deleted;
+ } catch (Exception e) {
+ eventHooks.onFailure(volume, DELETE, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Delete a file, first checking to see if it exists.
+ * @param volume target volume. null if unavailable.
+ * @param f File to delete
+ * @return true if the file was successfully deleted or if it never
+ * existed.
+ */
+ public boolean deleteWithExistsCheck(@Nullable FsVolumeSpi volume, File f) {
+ final long begin = eventHooks.beforeMetadataOp(volume, DELETE);
+ try {
+ boolean deleted = !f.exists() || f.delete();
+ eventHooks.afterMetadataOp(volume, DELETE, begin);
+ if (!deleted) {
+ LOG.warn("Failed to delete file {}", f);
+ }
+ return deleted;
+ } catch (Exception e) {
+ eventHooks.onFailure(volume, DELETE, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Transfer data from a FileChannel to a SocketOutputStream.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param sockOut SocketOutputStream to write the data.
+ * @param fileCh FileChannel from which to read data.
+ * @param position position within the channel where the transfer begins.
+ * @param count number of bytes to transfer.
+ * @param waitTime returns the nanoseconds spent waiting for the socket
+ * to become writable.
+ * @param transferTime returns the nanoseconds spent transferring data.
+ * @throws IOException
+ */
+ public void transferToSocketFully(
+ @Nullable FsVolumeSpi volume, SocketOutputStream sockOut,
+ FileChannel fileCh, long position, int count,
+ LongWritable waitTime, LongWritable transferTime) throws IOException {
+ final long begin = eventHooks.beforeFileIo(volume, TRANSFER, count);
+ try {
+ sockOut.transferToFully(fileCh, position, count,
+ waitTime, transferTime);
+ eventHooks.afterFileIo(volume, TRANSFER, begin, count);
+ } catch (Exception e) {
+ eventHooks.onFailure(volume, TRANSFER, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Create a file.
+ * @param volume target volume. null if unavailable.
+ * @param f File to be created.
+ * @return true if the file does not exist and was successfully created.
+ * false if the file already exists.
+ * @throws IOException
+ */
+ public boolean createFile(
+ @Nullable FsVolumeSpi volume, File f) throws IOException {
+ final long begin = eventHooks.beforeMetadataOp(volume, OPEN);
+ try {
+ boolean created = f.createNewFile();
+ eventHooks.afterMetadataOp(volume, OPEN, begin);
+ return created;
+ } catch (Exception e) {
+ eventHooks.onFailure(volume, OPEN, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Create a FileInputStream using
+ * {@link FileInputStream#FileInputStream(File)}.
+ *
+ * Wraps the created input stream to intercept read calls
+ * before delegating to the wrapped stream.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param f File object.
+ * @return FileInputStream to the given file.
+ * @throws FileNotFoundException
+ */
+ public FileInputStream getFileInputStream(
+ @Nullable FsVolumeSpi volume, File f) throws FileNotFoundException {
+ final long begin = eventHooks.beforeMetadataOp(volume, OPEN);
+ FileInputStream fis = null;
+ try {
+ fis = new WrappedFileInputStream(volume, f);
+ eventHooks.afterMetadataOp(volume, OPEN, begin);
+ return fis;
+ } catch(Exception e) {
+ org.apache.commons.io.IOUtils.closeQuietly(fis);
+ eventHooks.onFailure(volume, OPEN, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Create a FileOutputStream using
+ * {@link FileOutputStream#FileOutputStream(File, boolean)}.
+ *
+ * Wraps the created output stream to intercept write calls
+ * before delegating to the wrapped stream.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param f File object.
+ * @param append if true, then bytes will be written to the end of the
+ * file rather than the beginning.
+ * @return FileOutputStream to the given file object.
+ * @throws FileNotFoundException
+ */
+ public FileOutputStream getFileOutputStream(
+ @Nullable FsVolumeSpi volume, File f,
+ boolean append) throws FileNotFoundException {
+ final long begin = eventHooks.beforeMetadataOp(volume, OPEN);
+ FileOutputStream fos = null;
+ try {
+ fos = new WrappedFileOutputStream(volume, f, append);
+ eventHooks.afterMetadataOp(volume, OPEN, begin);
+ return fos;
+ } catch(Exception e) {
+ org.apache.commons.io.IOUtils.closeQuietly(fos);
+ eventHooks.onFailure(volume, OPEN, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Create a FileOutputStream using
+ * {@link FileOutputStream#FileOutputStream(File, boolean)}.
+ *
+ * Wraps the created output stream to intercept write calls
+ * before delegating to the wrapped stream.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param f File object.
+ * @return FileOutputStream to the given file object.
+ * @throws FileNotFoundException
+ */
+ public FileOutputStream getFileOutputStream(
+ @Nullable FsVolumeSpi volume, File f) throws FileNotFoundException {
+ return getFileOutputStream(volume, f, false);
+ }
+
+ /**
+ * Create a FileOutputStream using
+ * {@link FileOutputStream#FileOutputStream(FileDescriptor)}.
+ *
+ * Wraps the created output stream to intercept write calls
+ * before delegating to the wrapped stream.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param fd file descriptor to wrap.
+ * @return FileOutputStream to the given file object.
+ * @throws FileNotFoundException
+ */
+ public FileOutputStream getFileOutputStream(
+ @Nullable FsVolumeSpi volume, FileDescriptor fd) {
+ return new WrappedFileOutputStream(volume, fd);
+ }
+
+ /**
+ * Create a FileInputStream using
+ * {@link NativeIO#getShareDeleteFileDescriptor}.
+ * Wraps the created input stream to intercept input calls
+ * before delegating to the wrapped stream.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param f File object.
+ * @param offset the offset position, measured in bytes from the
+ * beginning of the file, at which to set the file
+ * pointer.
+ * @return FileInputStream to the given file object.
+ * @throws FileNotFoundException
+ */
+ public FileInputStream getShareDeleteFileInputStream(
+ @Nullable FsVolumeSpi volume, File f,
+ long offset) throws IOException {
+ final long begin = eventHooks.beforeMetadataOp(volume, OPEN);
+ FileInputStream fis = null;
+ try {
+ fis = new WrappedFileInputStream(volume,
+ NativeIO.getShareDeleteFileDescriptor(f, offset));
+ eventHooks.afterMetadataOp(volume, OPEN, begin);
+ return fis;
+ } catch(Exception e) {
+ org.apache.commons.io.IOUtils.closeQuietly(fis);
+ eventHooks.onFailure(volume, OPEN, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Create a FileInputStream using
+ * {@link FileInputStream#FileInputStream(File)} and position
+ * it at the given offset.
+ *
+ * Wraps the created input stream to intercept read calls
+ * before delegating to the wrapped stream.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param f File object.
+ * @param offset the offset position, measured in bytes from the
+ * beginning of the file, at which to set the file
+ * pointer.
+ * @throws FileNotFoundException
+ */
+ public FileInputStream openAndSeek(
+ @Nullable FsVolumeSpi volume, File f, long offset) throws IOException {
+ final long begin = eventHooks.beforeMetadataOp(volume, OPEN);
+ FileInputStream fis = null;
+ try {
+ fis = new WrappedFileInputStream(volume,
+ FsDatasetUtil.openAndSeek(f, offset));
+ eventHooks.afterMetadataOp(volume, OPEN, begin);
+ return fis;
+ } catch(Exception e) {
+ org.apache.commons.io.IOUtils.closeQuietly(fis);
+ eventHooks.onFailure(volume, OPEN, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Create a RandomAccessFile using
+ * {@link RandomAccessFile#RandomAccessFile(File, String)}.
+ *
+ * Wraps the created input stream to intercept IO calls
+ * before delegating to the wrapped RandomAccessFile.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param f File object.
+ * @param mode See {@link RandomAccessFile} for a description
+ * of the mode string.
+ * @return RandomAccessFile representing the given file.
+ * @throws FileNotFoundException
+ */
+ public RandomAccessFile getRandomAccessFile(
+ @Nullable FsVolumeSpi volume, File f,
+ String mode) throws FileNotFoundException {
+ final long begin = eventHooks.beforeMetadataOp(volume, OPEN);
+ RandomAccessFile raf = null;
+ try {
+ raf = new WrappedRandomAccessFile(volume, f, mode);
+ eventHooks.afterMetadataOp(volume, OPEN, begin);
+ return raf;
+ } catch(Exception e) {
+ org.apache.commons.io.IOUtils.closeQuietly(raf);
+ eventHooks.onFailure(volume, OPEN, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Delete the given directory using {@link FileUtil#fullyDelete(File)}.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param dir directory to be deleted.
+ * @return true on success false on failure.
+ */
+ public boolean fullyDelete(@Nullable FsVolumeSpi volume, File dir) {
+ final long begin = eventHooks.beforeMetadataOp(volume, DELETE);
+ try {
+ boolean deleted = FileUtil.fullyDelete(dir);
+ eventHooks.afterMetadataOp(volume, DELETE, begin);
+ return deleted;
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, DELETE, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Move the src file to the target using
+ * {@link FileUtil#replaceFile(File, File)}.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param src source path.
+ * @param target target path.
+ * @throws IOException
+ */
+ public void replaceFile(
+ @Nullable FsVolumeSpi volume, File src, File target) throws IOException {
+ final long begin = eventHooks.beforeMetadataOp(volume, MOVE);
+ try {
+ FileUtil.replaceFile(src, target);
+ eventHooks.afterMetadataOp(volume, MOVE, begin);
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, MOVE, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Move the src file to the target using
+ * {@link Storage#rename(File, File)}.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param src source path.
+ * @param target target path.
+ * @throws IOException
+ */
+ public void rename(
+ @Nullable FsVolumeSpi volume, File src, File target)
+ throws IOException {
+ final long begin = eventHooks.beforeMetadataOp(volume, MOVE);
+ try {
+ Storage.rename(src, target);
+ eventHooks.afterMetadataOp(volume, MOVE, begin);
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, MOVE, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Move the src file to the target using
+ * {@link FileUtils#moveFile(File, File)}.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param src source path.
+ * @param target target path.
+ * @throws IOException
+ */
+ public void moveFile(
+ @Nullable FsVolumeSpi volume, File src, File target)
+ throws IOException {
+ final long begin = eventHooks.beforeMetadataOp(volume, MOVE);
+ try {
+ FileUtils.moveFile(src, target);
+ eventHooks.afterMetadataOp(volume, MOVE, begin);
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, MOVE, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Move the src file to the target using
+ * {@link Files#move(Path, Path, CopyOption...)}.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param src source path.
+ * @param target target path.
+ * @param options See {@link Files#move} for a description
+ * of the options.
+ * @throws IOException
+ */
+ public void move(
+ @Nullable FsVolumeSpi volume, Path src, Path target,
+ CopyOption... options) throws IOException {
+ final long begin = eventHooks.beforeMetadataOp(volume, MOVE);
+ try {
+ Files.move(src, target, options);
+ eventHooks.afterMetadataOp(volume, MOVE, begin);
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, MOVE, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * See {@link Storage#nativeCopyFileUnbuffered(File, File, boolean)}.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param src an existing file to copy, must not be {@code null}
+ * @param target the new file, must not be {@code null}
+ * @param preserveFileDate true if the file date of the copy
+ * should be the same as the original
+ * @throws IOException
+ */
+ public void nativeCopyFileUnbuffered(
+ @Nullable FsVolumeSpi volume, File src, File target,
+ boolean preserveFileDate) throws IOException {
+ final long length = src.length();
+ final long begin = eventHooks.beforeFileIo(volume, NATIVE_COPY, length);
+ try {
+ Storage.nativeCopyFileUnbuffered(src, target, preserveFileDate);
+ eventHooks.afterFileIo(volume, NATIVE_COPY, begin, length);
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, NATIVE_COPY, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * See {@link File#mkdirs()}.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param dir directory to be created.
+ * @return true only if the directory was created. false if
+ * the directory already exists.
+ * @throws IOException if a directory with the given name does
+ * not exist and could not be created.
+ */
+ public boolean mkdirs(
+ @Nullable FsVolumeSpi volume, File dir) throws IOException {
+ final long begin = eventHooks.beforeMetadataOp(volume, MKDIRS);
+ boolean created = false;
+ boolean isDirectory;
+ try {
+ created = dir.mkdirs();
+ isDirectory = !created && dir.isDirectory();
+ eventHooks.afterMetadataOp(volume, MKDIRS, begin);
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, MKDIRS, e, begin);
+ throw e;
+ }
+
+ if (!created && !isDirectory) {
+ throw new IOException("Mkdirs failed to create " + dir);
+ }
+ return created;
+ }
+
+ /**
+ * Create the target directory using {@link File#mkdirs()} only if
+ * it doesn't exist already.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param dir directory to be created.
+ * @throws IOException if the directory could not be created.
+ */
+ public void mkdirsWithExistsCheck(
+ @Nullable FsVolumeSpi volume, File dir) throws IOException {
+ final long begin = eventHooks.beforeMetadataOp(volume, MKDIRS);
+ boolean succeeded = false;
+ try {
+ succeeded = dir.isDirectory() || dir.mkdirs();
+ eventHooks.afterMetadataOp(volume, MKDIRS, begin);
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, MKDIRS, e, begin);
+ throw e;
+ }
+
+ if (!succeeded) {
+ throw new IOException("Mkdirs failed to create " + dir);
+ }
+ }
+
+ /**
+ * Get a listing of the given directory using
+ * {@link FileUtil#listFiles(File)}.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param dir Directory to be listed.
+ * @return array of file objects representing the directory entries.
+ * @throws IOException
+ */
+ public File[] listFiles(
+ @Nullable FsVolumeSpi volume, File dir) throws IOException {
+ final long begin = eventHooks.beforeMetadataOp(volume, LIST);
+ try {
+ File[] children = FileUtil.listFiles(dir);
+ eventHooks.afterMetadataOp(volume, LIST, begin);
+ return children;
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, LIST, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Get a listing of the given directory using
+ * {@link FileUtil#list(File)}.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param dir directory to be listed.
+ * @return array of strings representing the directory entries.
+ * @throws IOException
+ */
+ public String[] list(
+ @Nullable FsVolumeSpi volume, File dir) throws IOException {
+ final long begin = eventHooks.beforeMetadataOp(volume, LIST);
+ try {
+ String[] children = FileUtil.list(dir);
+ eventHooks.afterMetadataOp(volume, LIST, begin);
+ return children;
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, LIST, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Get a listing of the given directory using
+ * {@link IOUtils#listDirectory(File, FilenameFilter)}.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param dir Directory to list.
+ * @param filter {@link FilenameFilter} to filter the directory entries.
+ * @throws IOException
+ */
+ public List<String> listDirectory(
+ @Nullable FsVolumeSpi volume, File dir,
+ FilenameFilter filter) throws IOException {
+ final long begin = eventHooks.beforeMetadataOp(volume, LIST);
+ try {
+ List<String> children = IOUtils.listDirectory(dir, filter);
+ eventHooks.afterMetadataOp(volume, LIST, begin);
+ return children;
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, LIST, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Retrieves the number of links to the specified file.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param f file whose link count is being queried.
+ * @return number of hard-links to the given file, including the
+ * given path itself.
+ * @throws IOException
+ */
+ public int getHardLinkCount(
+ @Nullable FsVolumeSpi volume, File f) throws IOException {
+ final long begin = eventHooks.beforeMetadataOp(volume, LIST);
+ try {
+ int count = HardLink.getLinkCount(f);
+ eventHooks.afterMetadataOp(volume, LIST, begin);
+ return count;
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, LIST, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * Check for file existence using {@link File#exists()}.
+ *
+ * @param volume target volume. null if unavailable.
+ * @param f file object.
+ * @return true if the file exists.
+ */
+ public boolean exists(@Nullable FsVolumeSpi volume, File f) {
+ final long begin = eventHooks.beforeMetadataOp(volume, EXISTS);
+ try {
+ boolean exists = f.exists();
+ eventHooks.afterMetadataOp(volume, EXISTS, begin);
+ return exists;
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, EXISTS, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * A thin wrapper over {@link FileInputStream} that allows
+ * instrumenting disk IO.
+ */
+ private final class WrappedFileInputStream extends FileInputStream {
+ private @Nullable final FsVolumeSpi volume;
+
+ /**
+ * {@inheritDoc}.
+ */
+ private WrappedFileInputStream(@Nullable FsVolumeSpi volume, File f)
+ throws FileNotFoundException {
+ super(f);
+ this.volume = volume;
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ private WrappedFileInputStream(
+ @Nullable FsVolumeSpi volume, FileDescriptor fd) {
+ super(fd);
+ this.volume = volume;
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public int read() throws IOException {
+ final long begin = eventHooks.beforeFileIo(volume, READ, 1);
+ try {
+ int b = super.read();
+ eventHooks.afterFileIo(volume, READ, begin, 1);
+ return b;
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, READ, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public int read(@Nonnull byte[] b) throws IOException {
+ final long begin = eventHooks.beforeFileIo(volume, READ, b.length);
+ try {
+ int numBytesRead = super.read(b);
+ eventHooks.afterFileIo(volume, READ, begin, numBytesRead);
+ return numBytesRead;
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, READ, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public int read(@Nonnull byte[] b, int off, int len) throws IOException {
+ final long begin = eventHooks.beforeFileIo(volume, READ, len);
+ try {
+ int numBytesRead = super.read(b, off, len);
+ eventHooks.afterFileIo(volume, READ, begin, numBytesRead);
+ return numBytesRead;
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, READ, e, begin);
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * A thin wrapper over {@link FileOutputStream} that allows
+ * instrumenting disk IO.
+ */
+ private final class WrappedFileOutputStream extends FileOutputStream {
+ private @Nullable final FsVolumeSpi volume;
+
+ /**
+ * {@inheritDoc}.
+ */
+ private WrappedFileOutputStream(
+ @Nullable FsVolumeSpi volume, File f,
+ boolean append) throws FileNotFoundException {
+ super(f, append);
+ this.volume = volume;
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ private WrappedFileOutputStream(
+ @Nullable FsVolumeSpi volume, FileDescriptor fd) {
+ super(fd);
+ this.volume = volume;
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public void write(int b) throws IOException {
+ final long begin = eventHooks.beforeFileIo(volume, WRITE, 1);
+ try {
+ super.write(b);
+ eventHooks.afterFileIo(volume, WRITE, begin, 1);
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, WRITE, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public void write(@Nonnull byte[] b) throws IOException {
+ final long begin = eventHooks.beforeFileIo(volume, WRITE, b.length);
+ try {
+ super.write(b);
+ eventHooks.afterFileIo(volume, WRITE, begin, b.length);
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, WRITE, e, begin);
+ throw e;
+ }
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public void write(@Nonnull byte[] b, int off, int len) throws IOException {
+ final long begin = eventHooks.beforeFileIo(volume, WRITE, len);
+ try {
+ super.write(b, off, len);
+ eventHooks.afterFileIo(volume, WRITE, begin, len);
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, WRITE, e, begin);
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * A thin wrapper over {@link RandomAccessFile} that allows
+ * instrumenting IO.
+ */
+ private final class WrappedRandomAccessFile extends RandomAccessFile {
+ private @Nullable final FsVolumeSpi volume;
+
+ public WrappedRandomAccessFile(
+ @Nullable FsVolumeSpi volume, File f, String mode)
+ throws FileNotFoundException {
+ super(f, mode);
+ this.volume = volume;
+ }
+
+ @Override
+ public int read() throws IOException {
+ final long begin = eventHooks.beforeFileIo(volume, READ, 1);
+ try {
+ int b = super.read();
+ eventHooks.afterFileIo(volume, READ, begin, 1);
+ return b;
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, READ, e, begin);
+ throw e;
+ }
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ final long begin = eventHooks.beforeFileIo(volume, READ, len);
+ try {
+ int numBytesRead = super.read(b, off, len);
+ eventHooks.afterFileIo(volume, READ, begin, numBytesRead);
+ return numBytesRead;
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, READ, e, begin);
+ throw e;
+ }
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ final long begin = eventHooks.beforeFileIo(volume, READ, b.length);
+ try {
+ int numBytesRead = super.read(b);
+ eventHooks.afterFileIo(volume, READ, begin, numBytesRead);
+ return numBytesRead;
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, READ, e, begin);
+ throw e;
+ }
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ final long begin = eventHooks.beforeFileIo(volume, WRITE, 1);
+ try {
+ super.write(b);
+ eventHooks.afterFileIo(volume, WRITE, begin, 1);
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, WRITE, e, begin);
+ throw e;
+ }
+ }
+
+ @Override
+ public void write(@Nonnull byte[] b) throws IOException {
+ final long begin = eventHooks.beforeFileIo(volume, WRITE, b.length);
+ try {
+ super.write(b);
+ eventHooks.afterFileIo(volume, WRITE, begin, b.length);
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, WRITE, e, begin);
+ throw e;
+ }
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ final long begin = eventHooks.beforeFileIo(volume, WRITE, len);
+ try {
+ super.write(b, off, len);
+ eventHooks.afterFileIo(volume, WRITE, begin, len);
+ } catch(Exception e) {
+ eventHooks.onFailure(volume, WRITE, e, begin);
+ throw e;
+ }
+ }
+ }
+}
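Every method in the new FileIoProvider follows the same shape: a before-hook returns a start timestamp, the after-hook is invoked on success, and onFailure is invoked when the operation throws. As a rough, self-contained sketch of an object with those call shapes (the class name, the Op enum and the timing/logging bodies below are placeholders for illustration, not the committed hooks interface):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Placeholder sketch of a hooks object with the call shapes used by
 * FileIoProvider above. Names and bodies are illustrative only.
 */
class SampleFileIoEvents {
  private static final Logger LOG =
      LoggerFactory.getLogger(SampleFileIoEvents.class);

  enum Op { OPEN, EXISTS, LIST, DELETE, MOVE, MKDIRS, READ, WRITE, NATIVE_COPY }

  long beforeMetadataOp(Object volume, Op op) {
    return System.nanoTime();            // returned to the caller as 'begin'
  }

  void afterMetadataOp(Object volume, Op op, long begin) {
    LOG.debug("{} completed in {} ns", op, System.nanoTime() - begin);
  }

  long beforeFileIo(Object volume, Op op, long len) {
    return System.nanoTime();
  }

  void afterFileIo(Object volume, Op op, long begin, long len) {
    LOG.debug("{} of {} bytes completed in {} ns",
        op, len, System.nanoTime() - begin);
  }

  void onFailure(Object volume, Op op, Exception e, long begin) {
    LOG.warn("{} failed after {} ns", op, System.nanoTime() - begin, e);
  }
}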
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java
index e6f7e12..1d46ddd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java
@@ -29,17 +29,13 @@ import java.net.URI;
import java.util.HashMap;
import java.util.Map;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.HardLink;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.DataChecksum;
@@ -187,20 +183,23 @@ abstract public class LocalReplica extends ReplicaInfo {
* be recovered (especially on Windows) on datanode restart.
*/
private void breakHardlinks(File file, Block b) throws IOException {
- File tmpFile = DatanodeUtil.createTmpFile(b, DatanodeUtil.getUnlinkTmpFile(file));
- try (FileInputStream in = new FileInputStream(file)) {
- try (FileOutputStream out = new FileOutputStream(tmpFile)){
- copyBytes(in, out, 16 * 1024);
+ final FileIoProvider fileIoProvider = getFileIoProvider();
+ final File tmpFile = DatanodeUtil.createFileWithExistsCheck(
+ getVolume(), b, DatanodeUtil.getUnlinkTmpFile(file), fileIoProvider);
+ try (FileInputStream in = fileIoProvider.getFileInputStream(
+ getVolume(), file)) {
+ try (FileOutputStream out = fileIoProvider.getFileOutputStream(
+ getVolume(), tmpFile)) {
+ IOUtils.copyBytes(in, out, 16 * 1024);
}
if (file.length() != tmpFile.length()) {
throw new IOException("Copy of file " + file + " size " + file.length()+
" into file " + tmpFile +
" resulted in a size of " + tmpFile.length());
}
- replaceFile(tmpFile, file);
+ fileIoProvider.replaceFile(getVolume(), tmpFile, file);
} catch (IOException e) {
- boolean done = tmpFile.delete();
- if (!done) {
+ if (!fileIoProvider.delete(getVolume(), tmpFile)) {
DataNode.LOG.info("detachFile failed to delete temporary file " +
tmpFile);
}
@@ -226,19 +225,20 @@ abstract public class LocalReplica extends ReplicaInfo {
* @throws IOException
*/
public boolean breakHardLinksIfNeeded() throws IOException {
- File file = getBlockFile();
+ final File file = getBlockFile();
+ final FileIoProvider fileIoProvider = getFileIoProvider();
if (file == null || getVolume() == null) {
throw new IOException("detachBlock:Block not found. " + this);
}
File meta = getMetaFile();
- int linkCount = getHardLinkCount(file);
+ int linkCount = fileIoProvider.getHardLinkCount(getVolume(), file);
if (linkCount > 1) {
DataNode.LOG.info("Breaking hardlink for " + linkCount + "x-linked " +
"block " + this);
breakHardlinks(file, this);
}
- if (getHardLinkCount(meta) > 1) {
+ if (fileIoProvider.getHardLinkCount(getVolume(), meta) > 1) {
breakHardlinks(meta, this);
}
return true;
@@ -256,17 +256,18 @@ abstract public class LocalReplica extends ReplicaInfo {
@Override
public OutputStream getDataOutputStream(boolean append) throws IOException {
- return new FileOutputStream(getBlockFile(), append);
+ return getFileIoProvider().getFileOutputStream(
+ getVolume(), getBlockFile(), append);
}
@Override
public boolean blockDataExists() {
- return getBlockFile().exists();
+ return getFileIoProvider().exists(getVolume(), getBlockFile());
}
@Override
public boolean deleteBlockData() {
- return fullyDelete(getBlockFile());
+ return getFileIoProvider().fullyDelete(getVolume(), getBlockFile());
}
@Override
@@ -282,9 +283,10 @@ abstract public class LocalReplica extends ReplicaInfo {
@Override
public LengthInputStream getMetadataInputStream(long offset)
throws IOException {
- File meta = getMetaFile();
+ final File meta = getMetaFile();
return new LengthInputStream(
- FsDatasetUtil.openAndSeek(meta, offset), meta.length());
+ getFileIoProvider().openAndSeek(getVolume(), meta, offset),
+ meta.length());
}
@Override
@@ -295,12 +297,12 @@ abstract public class LocalReplica extends ReplicaInfo {
@Override
public boolean metadataExists() {
- return getMetaFile().exists();
+ return getFileIoProvider().exists(getVolume(), getMetaFile());
}
@Override
public boolean deleteMetadata() {
- return fullyDelete(getMetaFile());
+ return getFileIoProvider().fullyDelete(getVolume(), getMetaFile());
}
@Override
@@ -320,7 +322,7 @@ abstract public class LocalReplica extends ReplicaInfo {
private boolean renameFile(File srcfile, File destfile) throws IOException {
try {
- rename(srcfile, destfile);
+ getFileIoProvider().rename(getVolume(), srcfile, destfile);
return true;
} catch (IOException e) {
throw new IOException("Failed to move block file for " + this
@@ -360,9 +362,9 @@ abstract public class LocalReplica extends ReplicaInfo {
@Override
public void bumpReplicaGS(long newGS) throws IOException {
long oldGS = getGenerationStamp();
- File oldmeta = getMetaFile();
+ final File oldmeta = getMetaFile();
setGenerationStamp(newGS);
- File newmeta = getMetaFile();
+ final File newmeta = getMetaFile();
// rename meta file to new GS
if (LOG.isDebugEnabled()) {
@@ -370,7 +372,7 @@ abstract public class LocalReplica extends ReplicaInfo {
}
try {
// calling renameMeta on the ReplicaInfo doesn't work here
- rename(oldmeta, newmeta);
+ getFileIoProvider().rename(getVolume(), oldmeta, newmeta);
} catch (IOException e) {
setGenerationStamp(oldGS); // restore old GS
throw new IOException("Block " + this + " reopen failed. " +
@@ -381,7 +383,8 @@ abstract public class LocalReplica extends ReplicaInfo {
@Override
public void truncateBlock(long newLength) throws IOException {
- truncateBlock(getBlockFile(), getMetaFile(), getNumBytes(), newLength);
+ truncateBlock(getVolume(), getBlockFile(), getMetaFile(),
+ getNumBytes(), newLength, getFileIoProvider());
}
@Override
@@ -392,32 +395,15 @@ abstract public class LocalReplica extends ReplicaInfo {
@Override
public void copyMetadata(URI destination) throws IOException {
//for local replicas, we assume the destination URI is file
- nativeCopyFileUnbuffered(getMetaFile(), new File(destination), true);
+ getFileIoProvider().nativeCopyFileUnbuffered(
+ getVolume(), getMetaFile(), new File(destination), true);
}
@Override
public void copyBlockdata(URI destination) throws IOException {
//for local replicas, we assume the destination URI is file
- nativeCopyFileUnbuffered(getBlockFile(), new File(destination), true);
- }
-
- public void renameMeta(File newMetaFile) throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Renaming " + getMetaFile() + " to " + newMetaFile);
- }
- renameFile(getMetaFile(), newMetaFile);
- }
-
- public void renameBlock(File newBlockFile) throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Renaming " + getBlockFile() + " to " + newBlockFile
- + ", file length=" + getBlockFile().length());
- }
- renameFile(getBlockFile(), newBlockFile);
- }
-
- public static void rename(File from, File to) throws IOException {
- Storage.rename(from, to);
+ getFileIoProvider().nativeCopyFileUnbuffered(
+ getVolume(), getBlockFile(), new File(destination), true);
}
/**
@@ -430,11 +416,13 @@ abstract public class LocalReplica extends ReplicaInfo {
private FileInputStream getDataInputStream(File f, long seekOffset)
throws IOException {
FileInputStream fis;
+ final FileIoProvider fileIoProvider = getFileIoProvider();
if (NativeIO.isAvailable()) {
- fis = NativeIO.getShareDeleteFileInputStream(f, seekOffset);
+ fis = fileIoProvider.getShareDeleteFileInputStream(
+ getVolume(), f, seekOffset);
} else {
try {
- fis = FsDatasetUtil.openAndSeek(f, seekOffset);
+ fis = fileIoProvider.openAndSeek(getVolume(), f, seekOffset);
} catch (FileNotFoundException fnfe) {
throw new IOException("Expected block file at " + f +
" does not exist.");
@@ -443,30 +431,6 @@ abstract public class LocalReplica extends ReplicaInfo {
return fis;
}
- private void nativeCopyFileUnbuffered(File srcFile, File destFile,
- boolean preserveFileDate) throws IOException {
- Storage.nativeCopyFileUnbuffered(srcFile, destFile, preserveFileDate);
- }
-
- private void copyBytes(InputStream in, OutputStream out, int
- buffSize) throws IOException{
- IOUtils.copyBytes(in, out, buffSize);
- }
-
- private void replaceFile(File src, File target) throws IOException {
- FileUtil.replaceFile(src, target);
- }
-
- public static boolean fullyDelete(final File dir) {
- boolean result = DataStorage.fullyDelete(dir);
- return result;
- }
-
- public static int getHardLinkCount(File fileName) throws IOException {
- int linkCount = HardLink.getLinkCount(fileName);
- return linkCount;
- }
-
/**
* Get pin status of a file by checking the sticky bit.
* @param localFS local file system
@@ -495,8 +459,10 @@ abstract public class LocalReplica extends ReplicaInfo {
localFS.setPermission(path, permission);
}
- public static void truncateBlock(File blockFile, File metaFile,
- long oldlen, long newlen) throws IOException {
+ public static void truncateBlock(
+ FsVolumeSpi volume, File blockFile, File metaFile,
+ long oldlen, long newlen, FileIoProvider fileIoProvider)
+ throws IOException {
LOG.info("truncateBlock: blockFile=" + blockFile
+ ", metaFile=" + metaFile
+ ", oldlen=" + oldlen
@@ -510,7 +476,10 @@ abstract public class LocalReplica extends ReplicaInfo {
+ ") to newlen (=" + newlen + ")");
}
- DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum();
+ // fis is closed by BlockMetadataHeader.readHeader.
+ final FileInputStream fis = fileIoProvider.getFileInputStream(
+ volume, metaFile);
+ DataChecksum dcs = BlockMetadataHeader.readHeader(fis).getChecksum();
int checksumsize = dcs.getChecksumSize();
int bpc = dcs.getBytesPerChecksum();
long n = (newlen - 1)/bpc + 1;
@@ -519,16 +488,14 @@ abstract public class LocalReplica extends ReplicaInfo {
int lastchunksize = (int)(newlen - lastchunkoffset);
byte[] b = new byte[Math.max(lastchunksize, checksumsize)];
- RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");
- try {
+ try (RandomAccessFile blockRAF = fileIoProvider.getRandomAccessFile(
+ volume, blockFile, "rw")) {
//truncate blockFile
blockRAF.setLength(newlen);
//read last chunk
blockRAF.seek(lastchunkoffset);
blockRAF.readFully(b, 0, lastchunksize);
- } finally {
- blockRAF.close();
}
//compute checksum
@@ -536,13 +503,11 @@ abstract public class LocalReplica extends ReplicaInfo {
dcs.writeValue(b, 0, false);
//update metaFile
- RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
- try {
+ try (RandomAccessFile metaRAF = fileIoProvider.getRandomAccessFile(
+ volume, metaFile, "rw")) {
metaRAF.setLength(newmetalen);
metaRAF.seek(newmetalen - checksumsize);
metaRAF.write(b, 0, checksumsize);
- } finally {
- metaRAF.close();
}
}
}
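The call-site changes in this file all follow one migration pattern: a direct java.io construction is replaced by the matching FileIoProvider factory method, keyed by the replica's volume, so the returned stream or RandomAccessFile is the instrumented wrapper defined above. A condensed sketch of that before/after shape (the method and variable names below are placeholders, not a specific hunk):

// Illustrative fragment only; assumes java.io and FsVolumeSpi imports.
void truncateSketch(FileIoProvider fileIoProvider, FsVolumeSpi volume,
    File blockFile, long newlen) throws IOException {
  // Before: RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");
  try (RandomAccessFile blockRAF =
           fileIoProvider.getRandomAccessFile(volume, blockFile, "rw")) {
    blockRAF.setLength(newlen);  // reads/writes now go through the wrapper
  }
}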
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java
index 1387155..003f96f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java
@@ -245,10 +245,9 @@ public class LocalReplicaInPipeline extends LocalReplica
@Override // ReplicaInPipeline
public ReplicaOutputStreams createStreams(boolean isCreate,
- DataChecksum requestedChecksum, long slowLogThresholdMs)
- throws IOException {
- File blockFile = getBlockFile();
- File metaFile = getMetaFile();
+ DataChecksum requestedChecksum) throws IOException {
+ final File blockFile = getBlockFile();
+ final File metaFile = getMetaFile();
if (DataNode.LOG.isDebugEnabled()) {
DataNode.LOG.debug("writeTo blockfile is " + blockFile +
" of size " + blockFile.length());
@@ -262,14 +261,16 @@ public class LocalReplicaInPipeline extends LocalReplica
// may differ from requestedChecksum for appends.
final DataChecksum checksum;
- RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
+ final RandomAccessFile metaRAF =
+ getFileIoProvider().getRandomAccessFile(getVolume(), metaFile, "rw");
if (!isCreate) {
// For append or recovery, we must enforce the existing checksum.
// Also, verify that the file has correct lengths, etc.
boolean checkedMeta = false;
try {
- BlockMetadataHeader header = BlockMetadataHeader.readHeader(metaRAF);
+ BlockMetadataHeader header =
+ BlockMetadataHeader.readHeader(metaRAF);
checksum = header.getChecksum();
if (checksum.getBytesPerChecksum() !=
@@ -302,20 +303,24 @@ public class LocalReplicaInPipeline extends LocalReplica
checksum = requestedChecksum;
}
+ final FileIoProvider fileIoProvider = getFileIoProvider();
FileOutputStream blockOut = null;
FileOutputStream crcOut = null;
try {
- blockOut = new FileOutputStream(
- new RandomAccessFile(blockFile, "rw").getFD());
- crcOut = new FileOutputStream(metaRAF.getFD());
+ blockOut = fileIoProvider.getFileOutputStream(
+ getVolume(),
+ fileIoProvider.getRandomAccessFile(getVolume(), blockFile, "rw")
+ .getFD());
+ crcOut = fileIoProvider.getFileOutputStream(getVolume(), metaRAF.getFD());
if (!isCreate) {
blockOut.getChannel().position(blockDiskSize);
crcOut.getChannel().position(crcDiskSize);
}
return new ReplicaOutputStreams(blockOut, crcOut, checksum,
- getVolume().isTransientStorage(), slowLogThresholdMs);
+ getVolume(), fileIoProvider);
} catch (IOException e) {
IOUtils.closeStream(blockOut);
+ IOUtils.closeStream(crcOut);
IOUtils.closeStream(metaRAF);
throw e;
}
@@ -326,11 +331,11 @@ public class LocalReplicaInPipeline extends LocalReplica
File blockFile = getBlockFile();
File restartMeta = new File(blockFile.getParent() +
File.pathSeparator + "." + blockFile.getName() + ".restart");
- if (restartMeta.exists() && !restartMeta.delete()) {
+ if (!getFileIoProvider().deleteWithExistsCheck(getVolume(), restartMeta)) {
DataNode.LOG.warn("Failed to delete restart meta file: " +
restartMeta.getPath());
}
- return new FileOutputStream(restartMeta);
+ return getFileIoProvider().getFileOutputStream(getVolume(), restartMeta);
}
@Override
@@ -373,12 +378,14 @@ public class LocalReplicaInPipeline extends LocalReplica
+ " should be derived from LocalReplica");
}
- LocalReplica oldReplica = (LocalReplica) oldReplicaInfo;
- File oldmeta = oldReplica.getMetaFile();
- File newmeta = getMetaFile();
+ final LocalReplica oldReplica = (LocalReplica) oldReplicaInfo;
+ final File oldBlockFile = oldReplica.getBlockFile();
+ final File oldmeta = oldReplica.getMetaFile();
+ final File newmeta = getMetaFile();
+ final FileIoProvider fileIoProvider = getFileIoProvider();
try {
- oldReplica.renameMeta(newmeta);
+ fileIoProvider.rename(getVolume(), oldmeta, newmeta);
} catch (IOException e) {
throw new IOException("Block " + oldReplicaInfo + " reopen failed. " +
" Unable to move meta file " + oldmeta +
@@ -386,10 +393,10 @@ public class LocalReplicaInPipeline extends LocalReplica
}
try {
- oldReplica.renameBlock(newBlkFile);
+ fileIoProvider.rename(getVolume(), oldBlockFile, newBlkFile);
} catch (IOException e) {
try {
- renameMeta(oldmeta);
+ fileIoProvider.rename(getVolume(), newmeta, oldmeta);
} catch (IOException ex) {
LOG.warn("Cannot move meta file " + newmeta +
"back to the finalized directory " + oldmeta, ex);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
index 5fdbec0..efa6ea6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
@@ -69,13 +69,11 @@ public interface ReplicaInPipeline extends Replica {
*
* @param isCreate if it is for creation
* @param requestedChecksum the checksum the writer would prefer to use
- * @param slowLogThresholdMs slow io threshold for logging
* @return output streams for writing
* @throws IOException if any error occurs
*/
public ReplicaOutputStreams createStreams(boolean isCreate,
- DataChecksum requestedChecksum, long slowLogThresholdMs)
- throws IOException;
+ DataChecksum requestedChecksum) throws IOException;
/**
* Create an output stream to write restart metadata in case of datanode
---------------------------------------------------------------------
[9/9] hadoop git commit: HADOOP-13890. Maintain HTTP/host as SPNEGO
SPN support and fix KerberosName parsing. Contributed by Xiaoyu Yao.
Posted by xg...@apache.org.
HADOOP-13890. Maintain HTTP/host as SPNEGO SPN support and fix KerberosName parsing. Contributed by Xiaoyu Yao.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f5e0bd30
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f5e0bd30
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f5e0bd30
Branch: refs/heads/YARN-5734
Commit: f5e0bd30fde654ed48fe73e5c0523030365385a4
Parents: 6ba9587
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Wed Dec 14 13:41:40 2016 -0800
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Wed Dec 14 13:45:21 2016 -0800
----------------------------------------------------------------------
.../server/KerberosAuthenticationHandler.java | 19 +++++++++--------
.../authentication/util/KerberosName.java | 4 ++--
.../authentication/util/TestKerberosName.java | 22 ++++++++++++++++++++
.../delegation/web/TestWebDelegationToken.java | 4 ++++
4 files changed, 38 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5e0bd30/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/server/KerberosAuthenticationHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/server/KerberosAuthenticationHandler.java b/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/server/KerberosAuthenticationHandler.java
index f51bbd6..e0ee227 100644
--- a/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/server/KerberosAuthenticationHandler.java
+++ b/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/server/KerberosAuthenticationHandler.java
@@ -73,7 +73,7 @@ import static org.apache.hadoop.util.PlatformName.IBM_JAVA;
* </ul>
*/
public class KerberosAuthenticationHandler implements AuthenticationHandler {
- private static final Logger LOG = LoggerFactory.getLogger(
+ public static final Logger LOG = LoggerFactory.getLogger(
KerberosAuthenticationHandler.class);
/**
@@ -274,14 +274,14 @@ public class KerberosAuthenticationHandler implements AuthenticationHandler {
loginContexts.add(loginContext);
KerberosName kerbName = new KerberosName(spnegoPrincipal);
if (kerbName.getHostName() != null
- && kerbName.getRealm() != null
&& kerbName.getServiceName() != null
&& kerbName.getServiceName().equals("HTTP")) {
- LOG.trace("Map server: {} to principal: {}", kerbName.getHostName(),
+ boolean added = serverPrincipalMap.put(kerbName.getHostName(),
spnegoPrincipal);
- serverPrincipalMap.put(kerbName.getHostName(), spnegoPrincipal);
+ LOG.info("Map server: {} to principal: [{}], added = {}",
+ kerbName.getHostName(), spnegoPrincipal, added);
} else {
- LOG.warn("HTTP principal: {} is invalid for SPNEGO!",
+ LOG.warn("HTTP principal: [{}] is invalid for SPNEGO!",
spnegoPrincipal);
}
}
@@ -419,8 +419,8 @@ public class KerberosAuthenticationHandler implements AuthenticationHandler {
@Override
public AuthenticationToken run() throws Exception {
if (LOG.isTraceEnabled()) {
- LOG.trace("SPNEGO with principals: {}",
- serverPrincipals.toString());
+ LOG.trace("SPNEGO with server principals: {} for {}",
+ serverPrincipals.toString(), serverName);
}
AuthenticationToken token = null;
Exception lastException = null;
@@ -464,7 +464,7 @@ public class KerberosAuthenticationHandler implements AuthenticationHandler {
GSSCredential gssCreds = null;
AuthenticationToken token = null;
try {
- LOG.trace("SPNEGO initiated with principal {}", serverPrincipal);
+ LOG.trace("SPNEGO initiated with server principal [{}]", serverPrincipal);
gssCreds = this.gssManager.createCredential(
this.gssManager.createName(serverPrincipal,
KerberosUtil.getOidInstance("NT_GSS_KRB5_PRINCIPAL")),
@@ -491,7 +491,8 @@ public class KerberosAuthenticationHandler implements AuthenticationHandler {
String userName = kerberosName.getShortName();
token = new AuthenticationToken(userName, clientPrincipal, getType());
response.setStatus(HttpServletResponse.SC_OK);
- LOG.trace("SPNEGO completed for principal [{}]", clientPrincipal);
+ LOG.trace("SPNEGO completed for client principal [{}]",
+ clientPrincipal);
}
} finally {
if (gssContext != null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5e0bd30/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/KerberosName.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/KerberosName.java b/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/KerberosName.java
index 0b668f1..6d15b6b 100644
--- a/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/KerberosName.java
+++ b/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/KerberosName.java
@@ -54,7 +54,7 @@ public class KerberosName {
* A pattern that matches a Kerberos name with at most 2 components.
*/
private static final Pattern nameParser =
- Pattern.compile("([^/@]*)(/([^/@]*))?@([^/@]*)");
+ Pattern.compile("([^/@]+)(/([^/@]+))?(@([^/@]+))?");
/**
* A pattern that matches a string with out '$' and then a single
@@ -109,7 +109,7 @@ public class KerberosName {
} else {
serviceName = match.group(1);
hostName = match.group(3);
- realm = match.group(4);
+ realm = match.group(5);
}
}
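The relaxed pattern makes both the host and the realm components optional, so principals such as HTTP/abc.com (no realm) and HTTP@EXAMPLE.COM (no host) now parse instead of falling through. A standalone check of the new capture groups, mirroring the principals exercised by the new test below (the pattern string is copied from this hunk; the wrapper class is illustrative):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Standalone demonstration of the new nameParser groups.
public class KerberosNameRegexCheck {
  private static final Pattern NAME_PARSER =
      Pattern.compile("([^/@]+)(/([^/@]+))?(@([^/@]+))?");

  public static void main(String[] args) {
    for (String p : new String[] {
        "HTTP/abc.com@EXAMPLE.COM",   // service + host + realm
        "HTTP/abc.com",               // realm omitted -> group(5) is null
        "HTTP@EXAMPLE.COM"}) {        // host omitted  -> group(3) is null
      Matcher m = NAME_PARSER.matcher(p);
      if (m.matches()) {
        System.out.println(p + " -> service=" + m.group(1)
            + ", host=" + m.group(3) + ", realm=" + m.group(5));
      }
    }
  }
}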
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5e0bd30/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/util/TestKerberosName.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/util/TestKerberosName.java b/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/util/TestKerberosName.java
index f85b3e1..a375bc9 100644
--- a/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/util/TestKerberosName.java
+++ b/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/util/TestKerberosName.java
@@ -83,6 +83,28 @@ public class TestKerberosName {
}
@Test
+ public void testParsing() throws Exception {
+ final String principalNameFull = "HTTP/abc.com@EXAMPLE.COM";
+ final String principalNameWoRealm = "HTTP/abc.com";
+ final String principalNameWoHost = "HTTP@EXAMPLE.COM";
+
+ final KerberosName kerbNameFull = new KerberosName(principalNameFull);
+ Assert.assertEquals("HTTP", kerbNameFull.getServiceName());
+ Assert.assertEquals("abc.com", kerbNameFull.getHostName());
+ Assert.assertEquals("EXAMPLE.COM", kerbNameFull.getRealm());
+
+ final KerberosName kerbNamewoRealm = new KerberosName(principalNameWoRealm);
+ Assert.assertEquals("HTTP", kerbNamewoRealm.getServiceName());
+ Assert.assertEquals("abc.com", kerbNamewoRealm.getHostName());
+ Assert.assertEquals(null, kerbNamewoRealm.getRealm());
+
+ final KerberosName kerbNameWoHost = new KerberosName(principalNameWoHost);
+ Assert.assertEquals("HTTP", kerbNameWoHost.getServiceName());
+ Assert.assertEquals(null, kerbNameWoHost.getHostName());
+ Assert.assertEquals("EXAMPLE.COM", kerbNameWoHost.getRealm());
+ }
+
+ @Test
public void testToLowerCase() throws Exception {
String rules =
"RULE:[1:$1]/L\n" +
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f5e0bd30/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/web/TestWebDelegationToken.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/web/TestWebDelegationToken.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/web/TestWebDelegationToken.java
index 89f15da..7319e4c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/web/TestWebDelegationToken.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/web/TestWebDelegationToken.java
@@ -31,6 +31,8 @@ import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHa
import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
import org.apache.hadoop.security.authentication.util.KerberosUtil;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
@@ -197,6 +199,8 @@ public class TestWebDelegationToken {
UserGroupInformation.setConfiguration(conf);
jetty = createJettyServer();
+ GenericTestUtils.setLogLevel(KerberosAuthenticationHandler.LOG,
+ Level.TRACE);
}
@After
---------------------------------------------------------------------
[2/9] hadoop git commit: HDFS-11164: Mover should avoid unnecessary
retries if the block is pinned. Contributed by Rakesh R
Posted by xg...@apache.org.
HDFS-11164: Mover should avoid unnecessary retries if the block is pinned. Contributed by Rakesh R
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e24a923d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e24a923d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e24a923d
Branch: refs/heads/YARN-5734
Commit: e24a923db50879f7dbe5d2afac0e6757089fb07d
Parents: 9947aeb
Author: Uma Maheswara Rao G <um...@intel.com>
Authored: Tue Dec 13 17:09:58 2016 -0800
Committer: Uma Maheswara Rao G <um...@intel.com>
Committed: Tue Dec 13 17:09:58 2016 -0800
----------------------------------------------------------------------
.../datatransfer/BlockPinningException.java | 33 ++++
.../datatransfer/DataTransferProtoUtil.java | 17 +-
.../src/main/proto/datatransfer.proto | 1 +
.../hadoop/hdfs/server/balancer/Dispatcher.java | 62 ++++++-
.../hdfs/server/datanode/DataXceiver.java | 8 +-
.../apache/hadoop/hdfs/server/mover/Mover.java | 26 ++-
.../hdfs/server/datanode/DataNodeTestUtils.java | 28 ++++
.../server/datanode/TestBlockReplacement.java | 70 +++++++-
.../hadoop/hdfs/server/mover/TestMover.java | 163 ++++++++++++++++++-
9 files changed, 395 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e24a923d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockPinningException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockPinningException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockPinningException.java
new file mode 100644
index 0000000..c2f12f9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockPinningException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import java.io.IOException;
+
+/**
+ * Indicates a failure due to block pinning.
+ */
+public class BlockPinningException extends IOException {
+
+ // Required by {@link java.io.Serializable}.
+ private static final long serialVersionUID = 1L;
+
+ public BlockPinningException(String errMsg) {
+ super(errMsg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e24a923d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
index 6801149..287928c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
@@ -24,11 +24,11 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -107,6 +107,11 @@ public abstract class DataTransferProtoUtil {
public static void checkBlockOpStatus(
BlockOpResponseProto response,
String logInfo) throws IOException {
+ checkBlockOpStatus(response, logInfo, false);
+ }
+
+ public static void checkBlockOpStatus(BlockOpResponseProto response,
+ String logInfo, boolean checkBlockPinningErr) throws IOException {
if (response.getStatus() != Status.SUCCESS) {
if (response.getStatus() == Status.ERROR_ACCESS_TOKEN) {
throw new InvalidBlockTokenException(
@@ -114,6 +119,14 @@ public abstract class DataTransferProtoUtil {
+ ", status message " + response.getMessage()
+ ", " + logInfo
);
+ } else if (checkBlockPinningErr
+ && response.getStatus() == Status.ERROR_BLOCK_PINNED) {
+ throw new BlockPinningException(
+ "Got error"
+ + ", status=" + response.getStatus().name()
+ + ", status message " + response.getMessage()
+ + ", " + logInfo
+ );
} else {
throw new IOException(
"Got error"
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e24a923d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
index 290b158..889361a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
@@ -243,6 +243,7 @@ enum Status {
OOB_RESERVED2 = 10; // Reserved
OOB_RESERVED3 = 11; // Reserved
IN_PROGRESS = 12;
+ ERROR_BLOCK_PINNED = 13;
}
enum ShortCircuitFdResponse {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e24a923d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index eb3ed87..0e62da2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -36,6 +36,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -55,6 +56,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockPinningException;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -224,6 +226,10 @@ public class Dispatcher {
this.target = target;
}
+ public DatanodeInfo getSource() {
+ return source.getDatanodeInfo();
+ }
+
@Override
public String toString() {
final Block b = reportedBlock != null ? reportedBlock.getBlock() : null;
@@ -367,6 +373,15 @@ public class Dispatcher {
} catch (IOException e) {
LOG.warn("Failed to move " + this, e);
target.getDDatanode().setHasFailure();
+ // Check that the failure is due to block pinning errors.
+ if (e instanceof BlockPinningException) {
+ // A pinned block can't be moved. Add it to the failure list so that
+ // the mover excludes it from pending moves in the next iteration.
+ target.getDDatanode().addBlockPinningFailures(this);
+ return;
+ }
+
// Proxy or target may have some issues, delay before using these nodes
// further in order to avoid a potential storm of "threads quota
// exceeded" warnings when the dispatcher gets out of sync with work
@@ -419,7 +434,7 @@ public class Dispatcher {
}
}
String logInfo = "reportedBlock move is failed";
- DataTransferProtoUtil.checkBlockOpStatus(response, logInfo);
+ DataTransferProtoUtil.checkBlockOpStatus(response, logInfo, true);
}
/** reset the object */
@@ -600,6 +615,7 @@ public class Dispatcher {
/** blocks being moved but not confirmed yet */
private final List<PendingMove> pendings;
private volatile boolean hasFailure = false;
+ private Map<Long, Set<DatanodeInfo>> blockPinningFailures = new HashMap<>();
private volatile boolean hasSuccess = false;
private ExecutorService moveExecutor;
@@ -685,6 +701,22 @@ public class Dispatcher {
this.hasFailure = true;
}
+ void addBlockPinningFailures(PendingMove pendingBlock) {
+ synchronized (blockPinningFailures) {
+ long blockId = pendingBlock.reportedBlock.getBlock().getBlockId();
+ Set<DatanodeInfo> pinnedLocations = blockPinningFailures.get(blockId);
+ if (pinnedLocations == null) {
+ pinnedLocations = new HashSet<>();
+ blockPinningFailures.put(blockId, pinnedLocations);
+ }
+ pinnedLocations.add(pendingBlock.getSource());
+ }
+ }
+
+ Map<Long, Set<DatanodeInfo>> getBlockPinningFailureList() {
+ return blockPinningFailures;
+ }
+
void setHasSuccess() {
this.hasSuccess = true;
}
@@ -1155,6 +1187,34 @@ public class Dispatcher {
}
/**
+ * Check whether any of the block movements failed due to block pinning
+ * errors. If so, add the failed blockId and its respective source node
+ * locations to the excluded list.
+ */
+ public static void checkForBlockPinningFailures(
+ Map<Long, Set<DatanodeInfo>> excludedPinnedBlocks,
+ Iterable<? extends StorageGroup> targets) {
+ for (StorageGroup t : targets) {
+ Map<Long, Set<DatanodeInfo>> blockPinningFailureList = t.getDDatanode()
+ .getBlockPinningFailureList();
+ Set<Entry<Long, Set<DatanodeInfo>>> entrySet = blockPinningFailureList
+ .entrySet();
+ for (Entry<Long, Set<DatanodeInfo>> entry : entrySet) {
+ Long blockId = entry.getKey();
+ Set<DatanodeInfo> locs = excludedPinnedBlocks.get(blockId);
+ if (locs == null) {
+ // blockId doesn't exist in the excluded list.
+ locs = entry.getValue();
+ excludedPinnedBlocks.put(blockId, locs);
+ } else {
+ // blockId already exists in the excluded list, add the pinned node.
+ locs.addAll(entry.getValue());
+ }
+ }
+ }
+ }
+
+ /**
* @return true if some moves are success.
*/
public static boolean checkForSuccess(
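Taken together, the Dispatcher-side changes record pinned-block failures per target DataNode and then fold them into a shared map keyed by block ID (block ID -> set of source DatanodeInfo on which the replica is pinned). On the next iteration the Mover consults that map and skips re-scheduling a move whose source is already known to be pinned (see the Mover.java hunks below). A minimal sketch of that lookup, assuming local variable names similar to the ones in the Mover hunk:

// Illustrative fragment; the committed version iterates the set explicitly.
long blockId = db.getBlock().getBlockId();
Set<DatanodeInfo> pinnedLocs = excludedPinnedBlocks.get(blockId);
if (pinnedLocs != null && pinnedLocs.contains(source.getDatanodeInfo())) {
  return false;  // this source holds a pinned replica; do not retry from it
}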
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e24a923d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index fee16b3..a35a5b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockPinningException;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
@@ -1022,7 +1023,7 @@ class DataXceiver extends Receiver implements Runnable {
String msg = "Not able to copy block " + block.getBlockId() + " " +
"to " + peer.getRemoteAddressString() + " because it's pinned ";
LOG.info(msg);
- sendResponse(ERROR, msg);
+ sendResponse(Status.ERROR_BLOCK_PINNED, msg);
return;
}
@@ -1156,7 +1157,7 @@ class DataXceiver extends Receiver implements Runnable {
String logInfo = "copy block " + block + " from "
+ proxySock.getRemoteSocketAddress();
- DataTransferProtoUtil.checkBlockOpStatus(copyResponse, logInfo);
+ DataTransferProtoUtil.checkBlockOpStatus(copyResponse, logInfo, true);
// get checksum info about the block we're copying
ReadOpChecksumInfoProto checksumInfo = copyResponse.getReadOpChecksumInfo();
@@ -1183,6 +1184,9 @@ class DataXceiver extends Receiver implements Runnable {
}
} catch (IOException ioe) {
opStatus = ERROR;
+ if (ioe instanceof BlockPinningException) {
+ opStatus = Status.ERROR_BLOCK_PINNED;
+ }
errMsg = "opReplaceBlock " + block + " received exception " + ioe;
LOG.info(errMsg);
if (!IoeDuringCopyBlockOperation) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e24a923d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
index 4ab55d3..bc75f0f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.security.SecurityUtil;
@@ -117,10 +116,12 @@ public class Mover {
private final List<Path> targetPaths;
private final int retryMaxAttempts;
private final AtomicInteger retryCount;
+ private final Map<Long, Set<DatanodeInfo>> excludedPinnedBlocks;
private final BlockStoragePolicy[] blockStoragePolicies;
- Mover(NameNodeConnector nnc, Configuration conf, AtomicInteger retryCount) {
+ Mover(NameNodeConnector nnc, Configuration conf, AtomicInteger retryCount,
+ Map<Long, Set<DatanodeInfo>> excludedPinnedBlocks) {
final long movedWinWidth = conf.getLong(
DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_KEY,
DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_DEFAULT);
@@ -144,6 +145,7 @@ public class Mover {
this.targetPaths = nnc.getTargetPaths();
this.blockStoragePolicies = new BlockStoragePolicy[1 <<
BlockStoragePolicySuite.ID_BIT_LENGTH];
+ this.excludedPinnedBlocks = excludedPinnedBlocks;
}
void init() throws IOException {
@@ -292,6 +294,8 @@ public class Mover {
// wait for pending move to finish and retry the failed migration
boolean hasFailed = Dispatcher.waitForMoveCompletion(storages.targets
.values());
+ Dispatcher.checkForBlockPinningFailures(excludedPinnedBlocks,
+ storages.targets.values());
boolean hasSuccess = Dispatcher.checkForSuccess(storages.targets
.values());
if (hasFailed && !hasSuccess) {
@@ -461,6 +465,19 @@ public class Mover {
return true;
}
+ // Check whether the given block is pinned in the source datanode. A pinned
+ // block can't be moved to a different datanode, so skip scheduling such
+ // blocks to other nodes.
+ long blockId = db.getBlock().getBlockId();
+ if (excludedPinnedBlocks.containsKey(blockId)) {
+ Set<DatanodeInfo> locs = excludedPinnedBlocks.get(blockId);
+ for (DatanodeInfo dn : locs) {
+ if (source.getDatanodeInfo().equals(dn)) {
+ return false;
+ }
+ }
+ }
+
if (dispatcher.getCluster().isNodeGroupAware()) {
if (chooseTarget(db, source, targetTypes, Matcher.SAME_NODE_GROUP)) {
return true;
@@ -614,6 +631,8 @@ public class Mover {
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT,
TimeUnit.SECONDS) * 1000;
AtomicInteger retryCount = new AtomicInteger(0);
+ // TODO: Need to limit the size of the pinned blocks to limit memory usage
+ Map<Long, Set<DatanodeInfo>> excludedPinnedBlocks = new HashMap<>();
LOG.info("namenodes = " + namenodes);
checkKeytabAndInit(conf);
@@ -628,7 +647,8 @@ public class Mover {
Iterator<NameNodeConnector> iter = connectors.iterator();
while (iter.hasNext()) {
NameNodeConnector nnc = iter.next();
- final Mover m = new Mover(nnc, conf, retryCount);
+ final Mover m = new Mover(nnc, conf, retryCount,
+ excludedPinnedBlocks);
final ExitStatus r = m.run();
if (r == ExitStatus.SUCCESS) {
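As an illustrative aside, the excludedPinnedBlocks map passed into the Mover constructor above is shared across retry iterations, so a block found to be pinned on a source datanode is never scheduled from that node again. A minimal sketch of that bookkeeping under assumed names (the real Mover keys the set on DatanodeInfo rather than a UUID string):

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class PinnedBlockTracker {
  private final Map<Long, Set<String>> excluded = new HashMap<>();

  /** Remember that this block is pinned on the given datanode. */
  void exclude(long blockId, String datanodeUuid) {
    excluded.computeIfAbsent(blockId, id -> new HashSet<>()).add(datanodeUuid);
  }

  /** True if the block must not be moved away from this datanode. */
  boolean isPinnedOn(long blockId, String datanodeUuid) {
    Set<String> nodes = excluded.get(blockId);
    return nodes != null && nodes.contains(datanodeUuid);
  }
}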
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e24a923d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
index e2755f9..3501ed3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
@@ -25,10 +25,17 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
/**
* Utility class for accessing package-private DataNode information during tests.
@@ -175,4 +182,25 @@ public class DataNodeTestUtils {
dn.getDirectoryScanner().reconcile();
}
}
+
+ /**
+ * This method is used to mock the data node block pinning API.
+ *
+ * @param dn datanode
+ * @param pinned true if the block is pinned, false otherwise
+ * @throws IOException
+ */
+ public static void mockDatanodeBlkPinning(final DataNode dn,
+ final boolean pinned) throws IOException {
+ final FsDatasetSpi<? extends FsVolumeSpi> data = dn.data;
+ dn.data = Mockito.spy(data);
+
+ doAnswer(new Answer<Object>() {
+ public Object answer(InvocationOnMock invocation) throws IOException {
+ // Bypass the argument to FsDatasetImpl#getPinning to show that
+ // the block is pinned.
+ return pinned;
+ }
+ }).when(dn.data).getPinning(any(ExtendedBlock.class));
+ }
}
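A hedged usage sketch of the helper added above: a test that already has a running MiniDFSCluster can make every datanode report its blocks as pinned, which is how the tests in the following diffs use it.

import java.io.IOException;

import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;

final class PinAllBlocks {
  /** Mock block pinning on every datanode of the given cluster. */
  static void pinAll(MiniDFSCluster cluster) throws IOException {
    for (DataNode dn : cluster.getDataNodes()) {
      DataNodeTestUtils.mockDatanodeBlkPinning(dn, true);
    }
  }
}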
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e24a923d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
index 597dc46..f811bd8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
@@ -208,6 +208,67 @@ public class TestBlockReplacement {
}
}
+ /**
+ * Test to verify that the copying of pinned block to a different destination
+ * datanode will throw IOException with error code Status.ERROR_BLOCK_PINNED.
+ *
+ */
+ @Test(timeout = 90000)
+ public void testBlockReplacementWithPinnedBlocks() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+
+ // Create datanodes in the cluster with DISK and ARCHIVE storage types.
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+ .storageTypes(
+ new StorageType[] {StorageType.DISK, StorageType.ARCHIVE})
+ .build();
+
+ try {
+ cluster.waitActive();
+
+ final DistributedFileSystem dfs = cluster.getFileSystem();
+ String fileName = "/testBlockReplacementWithPinnedBlocks/file";
+ final Path file = new Path(fileName);
+ DFSTestUtil.createFile(dfs, file, 1024, (short) 1, 1024);
+
+ LocatedBlock lb = dfs.getClient().getLocatedBlocks(fileName, 0).get(0);
+ DatanodeInfo[] oldNodes = lb.getLocations();
+ assertEquals("Wrong block locations", oldNodes.length, 1);
+ DatanodeInfo source = oldNodes[0];
+ ExtendedBlock b = lb.getBlock();
+
+ DatanodeInfo[] datanodes = dfs.getDataNodeStats();
+ DatanodeInfo destin = null;
+ for (DatanodeInfo datanodeInfo : datanodes) {
+ // choose different destination node
+ if (!oldNodes[0].equals(datanodeInfo)) {
+ destin = datanodeInfo;
+ break;
+ }
+ }
+
+ assertNotNull("Failed to choose destination datanode!", destin);
+
+ assertFalse("Source and destin datanode should be different",
+ source.equals(destin));
+
+ // Mock FsDatasetSpi#getPinning to show that the block is pinned.
+ for (int i = 0; i < cluster.getDataNodes().size(); i++) {
+ DataNode dn = cluster.getDataNodes().get(i);
+ LOG.info("Simulate block pinning in datanode " + dn);
+ DataNodeTestUtils.mockDatanodeBlkPinning(dn, true);
+ }
+
+ // Block movement to a different datanode should fail as the block is
+ // pinned.
+ assertTrue("Status code mismatches!", replaceBlock(b, source, source,
+ destin, StorageType.ARCHIVE, Status.ERROR_BLOCK_PINNED));
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
@Test
public void testBlockMoveAcrossStorageInSameNode() throws Exception {
final Configuration conf = new HdfsConfiguration();
@@ -236,7 +297,7 @@ public class TestBlockReplacement {
// move block to ARCHIVE by using same DataNodeInfo for source, proxy and
// destination so that movement happens within datanode
assertTrue(replaceBlock(block, source, source, source,
- StorageType.ARCHIVE));
+ StorageType.ARCHIVE, Status.SUCCESS));
// wait till namenode notified
Thread.sleep(3000);
@@ -311,7 +372,7 @@ public class TestBlockReplacement {
private boolean replaceBlock( ExtendedBlock block, DatanodeInfo source,
DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException {
return replaceBlock(block, source, sourceProxy, destination,
- StorageType.DEFAULT);
+ StorageType.DEFAULT, Status.SUCCESS);
}
/*
@@ -322,7 +383,8 @@ public class TestBlockReplacement {
DatanodeInfo source,
DatanodeInfo sourceProxy,
DatanodeInfo destination,
- StorageType targetStorageType) throws IOException, SocketException {
+ StorageType targetStorageType,
+ Status opStatus) throws IOException, SocketException {
Socket sock = new Socket();
try {
sock.connect(NetUtils.createSocketAddr(destination.getXferAddr()),
@@ -342,7 +404,7 @@ public class TestBlockReplacement {
while (proto.getStatus() == Status.IN_PROGRESS) {
proto = BlockOpResponseProto.parseDelimitedFrom(reply);
}
- return proto.getStatus() == Status.SUCCESS;
+ return proto.getStatus() == opStatus;
} finally {
sock.close();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e24a923d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
index 20a6959..d565548 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
@@ -37,11 +37,13 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSF
import java.io.File;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -64,6 +66,7 @@ import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -72,6 +75,8 @@ import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DBlock;
import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.mover.Mover.MLocation;
import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
@@ -121,7 +126,7 @@ public class TestMover {
final List<NameNodeConnector> nncs = NameNodeConnector.newNameNodeConnectors(
nnMap, Mover.class.getSimpleName(), Mover.MOVER_ID_PATH, conf,
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
- return new Mover(nncs.get(0), conf, new AtomicInteger(0));
+ return new Mover(nncs.get(0), conf, new AtomicInteger(0), new HashMap<>());
}
@Test
@@ -705,4 +710,160 @@ public class TestMover {
UserGroupInformation.setConfiguration(new Configuration());
}
}
+
+ /**
+ * Test to verify that mover can't move pinned blocks.
+ */
+ @Test(timeout = 90000)
+ public void testMoverWithPinnedBlocks() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ initConf(conf);
+
+ // Set a bigger retry max attempts value so that the test case will time out
+ // if block pinning errors are not handled properly during block movement.
+ conf.setInt(DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY, 10000);
+
+ final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(3)
+ .build();
+ try {
+ cluster.waitActive();
+ final DistributedFileSystem dfs = cluster.getFileSystem();
+ final String file = "/testMoverWithPinnedBlocks/file";
+ Path dir = new Path("/testMoverWithPinnedBlocks");
+ dfs.mkdirs(dir);
+
+ // write to DISK
+ dfs.setStoragePolicy(dir, "HOT");
+ final FSDataOutputStream out = dfs.create(new Path(file));
+ byte[] fileData = StripedFileTestUtil
+ .generateBytes(DEFAULT_BLOCK_SIZE * 3);
+ out.write(fileData);
+ out.close();
+
+ // verify before movement
+ LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
+ StorageType[] storageTypes = lb.getStorageTypes();
+ for (StorageType storageType : storageTypes) {
+ Assert.assertTrue(StorageType.DISK == storageType);
+ }
+
+ // Adding one SSD based data node to the cluster.
+ StorageType[][] newtypes = new StorageType[][] {{StorageType.SSD}};
+ startAdditionalDNs(conf, 1, newtypes, cluster);
+
+ // Mock FsDatasetSpi#getPinning to show that the block is pinned.
+ for (int i = 0; i < cluster.getDataNodes().size(); i++) {
+ DataNode dn = cluster.getDataNodes().get(i);
+ LOG.info("Simulate block pinning in datanode {}", dn);
+ DataNodeTestUtils.mockDatanodeBlkPinning(dn, true);
+ }
+
+ // move file blocks to ONE_SSD policy
+ dfs.setStoragePolicy(dir, "ONE_SSD");
+ int rc = ToolRunner.run(conf, new Mover.Cli(),
+ new String[] {"-p", dir.toString()});
+
+ int exitcode = ExitStatus.NO_MOVE_BLOCK.getExitCode();
+ Assert.assertEquals("Movement should fail", exitcode, rc);
+
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ /**
+ * Test to verify that the mover handles pinned blocks as well as failed
+ * blocks, and keeps retrying only the failed blocks.
+ */
+ @Test(timeout = 90000)
+ public void testMoverFailedRetryWithPinnedBlocks() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ initConf(conf);
+ conf.set(DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY, "2");
+ final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(2)
+ .storageTypes(
+ new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE},
+ {StorageType.DISK, StorageType.ARCHIVE}}).build();
+ try {
+ cluster.waitActive();
+ final DistributedFileSystem dfs = cluster.getFileSystem();
+ final String parenDir = "/parent";
+ dfs.mkdirs(new Path(parenDir));
+ final String file1 = "/parent/testMoverFailedRetryWithPinnedBlocks1";
+ // write to DISK
+ final FSDataOutputStream out = dfs.create(new Path(file1), (short) 2);
+ byte[] fileData = StripedFileTestUtil
+ .generateBytes(DEFAULT_BLOCK_SIZE * 2);
+ out.write(fileData);
+ out.close();
+
+ // Adding pinned blocks.
+ createFileWithFavoredDatanodes(conf, cluster, dfs);
+
+ // Delete the block file so the block move will fail with FileNotFoundException
+ LocatedBlocks locatedBlocks = dfs.getClient().getLocatedBlocks(file1, 0);
+ Assert.assertEquals("Wrong block count", 2,
+ locatedBlocks.locatedBlockCount());
+ LocatedBlock lb = locatedBlocks.get(0);
+ cluster.corruptBlockOnDataNodesByDeletingBlockFile(lb.getBlock());
+
+ // move to ARCHIVE
+ dfs.setStoragePolicy(new Path(parenDir), "COLD");
+ int rc = ToolRunner.run(conf, new Mover.Cli(),
+ new String[] {"-p", parenDir.toString()});
+ Assert.assertEquals("Movement should fail after some retry",
+ ExitStatus.NO_MOVE_PROGRESS.getExitCode(), rc);
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ private void createFileWithFavoredDatanodes(final Configuration conf,
+ final MiniDFSCluster cluster, final DistributedFileSystem dfs)
+ throws IOException {
+ // Add two DISK-based datanodes to the cluster.
+ // Also, ensure that blocks are pinned in these new data nodes.
+ StorageType[][] newtypes =
+ new StorageType[][] {{StorageType.DISK}, {StorageType.DISK}};
+ startAdditionalDNs(conf, 2, newtypes, cluster);
+ ArrayList<DataNode> dataNodes = cluster.getDataNodes();
+ InetSocketAddress[] favoredNodes = new InetSocketAddress[2];
+ int j = 0;
+ for (int i = dataNodes.size() - 1; i >= 2; i--) {
+ favoredNodes[j++] = dataNodes.get(i).getXferAddress();
+ }
+ final String file = "/parent/testMoverFailedRetryWithPinnedBlocks2";
+ final FSDataOutputStream out = dfs.create(new Path(file),
+ FsPermission.getDefault(), true, DEFAULT_BLOCK_SIZE, (short) 2,
+ DEFAULT_BLOCK_SIZE, null, favoredNodes);
+ byte[] fileData = StripedFileTestUtil.generateBytes(DEFAULT_BLOCK_SIZE * 2);
+ out.write(fileData);
+ out.close();
+
+ // Mock FsDatasetSpi#getPinning to show that the block is pinned.
+ LocatedBlocks locatedBlocks = dfs.getClient().getLocatedBlocks(file, 0);
+ Assert.assertEquals("Wrong block count", 2,
+ locatedBlocks.locatedBlockCount());
+ LocatedBlock lb = locatedBlocks.get(0);
+ DatanodeInfo datanodeInfo = lb.getLocations()[0];
+ for (DataNode dn : cluster.getDataNodes()) {
+ if (dn.getDatanodeId().getDatanodeUuid()
+ .equals(datanodeInfo.getDatanodeUuid())) {
+ LOG.info("Simulate block pinning in datanode {}", datanodeInfo);
+ DataNodeTestUtils.mockDatanodeBlkPinning(dn, true);
+ break;
+ }
+ }
+ }
+
+ private void startAdditionalDNs(final Configuration conf,
+ int newNodesRequired, StorageType[][] newTypes,
+ final MiniDFSCluster cluster) throws IOException {
+
+ cluster.startDataNodes(conf, newNodesRequired, newTypes, true, null, null,
+ null, null, null, false, false, false, null);
+ cluster.triggerHeartbeats();
+ }
}
[5/9] hadoop git commit: HDFS-8411. Add bytes count metrics to
datanode for ECWorker. Contributed by Sammi Chen and Andrew Wang
Posted by xg...@apache.org.
HDFS-8411. Add bytes count metrics to datanode for ECWorker. Contributed by Sammi Chen and Andrew Wang
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1f14f6d0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1f14f6d0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1f14f6d0
Branch: refs/heads/YARN-5734
Commit: 1f14f6d038aecad55a5398c6fa4137c9d2f44729
Parents: ada876c
Author: Kai Zheng <ka...@intel.com>
Authored: Wed Dec 14 14:50:50 2016 +0800
Committer: Kai Zheng <ka...@intel.com>
Committed: Wed Dec 14 14:50:50 2016 +0800
----------------------------------------------------------------------
.../erasurecode/StripedBlockReader.java | 1 +
.../erasurecode/StripedBlockReconstructor.java | 6 +-
.../erasurecode/StripedBlockWriter.java | 1 +
.../datanode/erasurecode/StripedReader.java | 4 +
.../erasurecode/StripedReconstructor.java | 21 +++
.../datanode/erasurecode/StripedWriter.java | 4 +
.../datanode/metrics/DataNodeMetrics.java | 18 ++-
.../apache/hadoop/hdfs/StripedFileTestUtil.java | 24 +++
.../TestDataNodeErasureCodingMetrics.java | 149 +++++++++----------
9 files changed, 147 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
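As an illustrative aside before the per-file diffs, this is the accounting flow the commit adds, condensed under assumed names: the striped readers and writers report their byte counts into per-task AtomicLong counters, and the totals are folded into the datanode-wide metrics once the reconstruction task completes.

import java.util.concurrent.atomic.AtomicLong;

class ReconstructionTask {
  private final AtomicLong bytesRead = new AtomicLong();
  private final AtomicLong bytesWritten = new AtomicLong();

  // Called by the block reader/writer for every chunk of I/O.
  void incrBytesRead(long delta) { bytesRead.addAndGet(delta); }
  void incrBytesWritten(long delta) { bytesWritten.addAndGet(delta); }

  // Called once, from the task's finally block, to update cumulative metrics.
  void publish(Metrics metrics) {
    metrics.addBytesRead(bytesRead.get());
    metrics.addBytesWritten(bytesWritten.get());
  }

  /** Stand-in for DataNodeMetrics; only the two byte counters matter here. */
  interface Metrics {
    void addBytesRead(long bytes);
    void addBytesWritten(long bytes);
  }
}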
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f14f6d0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
index a27de9b..0f7c5c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
@@ -187,6 +187,7 @@ class StripedBlockReader {
break;
}
n += nread;
+ stripedReader.getReconstructor().incrBytesRead(nread);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f14f6d0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
index a8e9d30..5554d68 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
/**
* StripedBlockReconstructor reconstruct one or more missed striped block in
@@ -66,7 +67,10 @@ class StripedBlockReconstructor extends StripedReconstructor
getDatanode().getMetrics().incrECFailedReconstructionTasks();
} finally {
getDatanode().decrementXmitsInProgress();
- getDatanode().getMetrics().incrECReconstructionTasks();
+ final DataNodeMetrics metrics = getDatanode().getMetrics();
+ metrics.incrECReconstructionTasks();
+ metrics.incrECReconstructionBytesRead(getBytesRead());
+ metrics.incrECReconstructionBytesWritten(getBytesWritten());
getStripedReader().close();
stripedWriter.close();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f14f6d0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
index 592be45..d999202 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
@@ -196,6 +196,7 @@ class StripedBlockWriter {
packet.writeTo(targetOutputStream);
blockOffset4Target += toWrite;
+ stripedWriter.getReconstructor().incrBytesWritten(toWrite);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f14f6d0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
index 238c628..f6f343a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
@@ -435,6 +435,10 @@ class StripedReader {
}
}
+ StripedReconstructor getReconstructor() {
+ return reconstructor;
+ }
+
StripedBlockReader getReader(int i) {
return readers.get(i);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f14f6d0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
index 5641c35..68769f7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
@@ -41,6 +41,7 @@ import java.util.BitSet;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicLong;
/**
* StripedReconstructor reconstruct one or more missed striped block in the
@@ -114,6 +115,10 @@ abstract class StripedReconstructor {
private long maxTargetLength = 0L;
private final BitSet liveBitSet;
+ // metrics
+ private AtomicLong bytesRead = new AtomicLong(0);
+ private AtomicLong bytesWritten = new AtomicLong(0);
+
StripedReconstructor(ErasureCodingWorker worker,
StripedReconstructionInfo stripedReconInfo) {
this.stripedReadPool = worker.getStripedReadPool();
@@ -133,6 +138,22 @@ abstract class StripedReconstructor {
positionInBlock = 0L;
}
+ public void incrBytesRead(long delta) {
+ bytesRead.addAndGet(delta);
+ }
+
+ public void incrBytesWritten(long delta) {
+ bytesWritten.addAndGet(delta);
+ }
+
+ public long getBytesRead() {
+ return bytesRead.get();
+ }
+
+ public long getBytesWritten() {
+ return bytesWritten.get();
+ }
+
/**
* Reconstruct one or more missed striped block in the striped block group,
* the minimum number of live striped blocks should be no less than data
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f14f6d0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
index c099bc1..225a7ed 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
@@ -280,6 +280,10 @@ class StripedWriter {
return reconstructor.getSocketAddress4Transfer(target);
}
+ StripedReconstructor getReconstructor() {
+ return reconstructor;
+ }
+
boolean hasValidTargets() {
return hasValidTargets;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f14f6d0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
index 23e15a2..e09a85f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.server.datanode.metrics;
import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
-import static org.apache.hadoop.metrics2.lib.Interns.info;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@@ -135,8 +134,12 @@ public class DataNodeMetrics {
MutableCounterLong ecReconstructionTasks;
@Metric("Count of erasure coding failed reconstruction tasks")
MutableCounterLong ecFailedReconstructionTasks;
- // Nanoseconds spent by decoding tasks.
+ @Metric("Nanoseconds spent by decoding tasks")
MutableCounterLong ecDecodingTimeNanos;
+ @Metric("Bytes read by erasure coding worker")
+ MutableCounterLong ecReconstructionBytesRead;
+ @Metric("Bytes written by erasure coding worker")
+ MutableCounterLong ecReconstructionBytesWritten;
final MetricsRegistry registry = new MetricsRegistry("datanode");
final String name;
@@ -156,9 +159,6 @@ public class DataNodeMetrics {
sendDataPacketTransferNanosQuantiles = new MutableQuantiles[len];
ramDiskBlocksEvictionWindowMsQuantiles = new MutableQuantiles[len];
ramDiskBlocksLazyPersistWindowMsQuantiles = new MutableQuantiles[len];
- ecDecodingTimeNanos = registry.newCounter(
- info("ecDecodingTimeNanos", "Nanoseconds spent by decoding tasks"),
- (long) 0);
for (int i = 0; i < len; i++) {
int interval = intervals[i];
@@ -454,4 +454,12 @@ public class DataNodeMetrics {
public void incrECDecodingTime(long decodingTimeNanos) {
ecDecodingTimeNanos.incr(decodingTimeNanos);
}
+
+ public void incrECReconstructionBytesRead(long bytes) {
+ ecReconstructionBytesRead.incr(bytes);
+ }
+
+ public void incrECReconstructionBytesWritten(long bytes) {
+ ecReconstructionBytesWritten.incr(bytes);
+ }
}
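A hedged usage sketch (not part of the commit): the new counters can be read back in a test through the same MetricsAsserts helpers that the test changes below already import.

import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;

import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;

final class EcMetricsCheck {
  /** Read the cumulative EC reconstruction bytes read on one datanode. */
  static long reconstructionBytesRead(DataNode dn) {
    MetricsRecordBuilder rb = getMetrics(dn.getMetrics().name());
    return getLongCounter("EcReconstructionBytesRead", rb);
  }
}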
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f14f6d0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
index 311ba7c..520d0e3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
@@ -527,6 +527,30 @@ public class StripedFileTestUtil {
}
/**
+ * Wait for the reconstruction to finish when the file has corrupted
+ * blocks. The function can handle a file of any length.
+ */
+ public static void waitForAllReconstructionFinished(Path file,
+ DistributedFileSystem fs, long expectedBlocks) throws Exception {
+ LOG.info("Waiting for reconstruction to be finished for the file:" + file
+ + ", expectedBlocks:" + expectedBlocks);
+ final int attempts = 60;
+ for (int i = 0; i < attempts; i++) {
+ int totalBlocks = 0;
+ LocatedBlocks locatedBlocks = getLocatedBlocks(file, fs);
+ for (LocatedBlock locatedBlock: locatedBlocks.getLocatedBlocks()) {
+ DatanodeInfo[] storageInfos = locatedBlock.getLocations();
+ totalBlocks += storageInfos.length;
+ }
+ if (totalBlocks >= expectedBlocks) {
+ return;
+ }
+ Thread.sleep(1000);
+ }
+ throw new IOException("Time out waiting for EC block reconstruction.");
+ }
+
+ /**
* Get the located blocks of a file.
*/
public static LocatedBlocks getLocatedBlocks(Path file,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f14f6d0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeErasureCodingMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeErasureCodingMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeErasureCodingMetrics.java
index 1b0526b..64ddbd7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeErasureCodingMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeErasureCodingMetrics.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
-import com.google.common.base.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -28,7 +27,6 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
@@ -38,21 +36,16 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
-import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-
import java.io.IOException;
-import java.util.Arrays;
/**
* This file tests the erasure coding metrics in DataNode.
@@ -65,8 +58,9 @@ public class TestDataNodeErasureCodingMetrics {
private final int dataBlocks = ecPolicy.getNumDataUnits();
private final int parityBlocks = ecPolicy.getNumParityUnits();
private final int cellSize = ecPolicy.getCellSize();
- private final int blockSize = cellSize;
+ private final int blockSize = cellSize * 2;
private final int groupSize = dataBlocks + parityBlocks;
+ private final int blockGroupSize = blockSize * dataBlocks;
private final int numDNs = groupSize + 1;
private MiniDFSCluster cluster;
@@ -76,7 +70,6 @@ public class TestDataNodeErasureCodingMetrics {
@Before
public void setup() throws IOException {
conf = new Configuration();
-
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
@@ -93,82 +86,86 @@ public class TestDataNodeErasureCodingMetrics {
}
@Test(timeout = 120000)
- public void testEcTasks() throws Exception {
- DataNode workerDn = doTest("/testEcTasks");
- MetricsRecordBuilder rb = getMetrics(workerDn.getMetrics().name());
-
- // Ensure that reconstruction task is finished
- GenericTestUtils.waitFor(new Supplier<Boolean>() {
- @Override
- public Boolean get() {
- long taskMetricValue = getLongCounter("EcReconstructionTasks", rb);
- return (taskMetricValue > 0);
- }
- }, 500, 10000);
+ public void testFullBlock() throws Exception {
+ doTest("/testEcMetrics", blockGroupSize, 0);
+
+ Assert.assertEquals("EcReconstructionTasks should be ",
+ 1, getLongMetric("EcReconstructionTasks"));
+ Assert.assertEquals("EcFailedReconstructionTasks should be ",
+ 0, getLongMetric("EcFailedReconstructionTasks"));
+ Assert.assertTrue(getLongMetric("EcDecodingTimeNanos") > 0);
+ Assert.assertEquals("EcReconstructionBytesRead should be ",
+ blockGroupSize, getLongMetric("EcReconstructionBytesRead"));
+ Assert.assertEquals("EcReconstructionBytesWritten should be ",
+ blockSize, getLongMetric("EcReconstructionBytesWritten"));
+ }
- assertCounter("EcReconstructionTasks", (long) 1, rb);
- assertCounter("EcFailedReconstructionTasks", (long) 0, rb);
+ // A partial block, reconstruct the partial block
+ @Test(timeout = 120000)
+ public void testReconstructionBytesPartialGroup1() throws Exception {
+ final int fileLen = blockSize / 10;
+ doTest("/testEcBytes", fileLen, 0);
+
+ Assert.assertEquals("EcReconstructionBytesRead should be ",
+ fileLen, getLongMetric("EcReconstructionBytesRead"));
+ Assert.assertEquals("EcReconstructionBytesWritten should be ",
+ fileLen, getLongMetric("EcReconstructionBytesWritten"));
}
+ // 1 full block + 5 partial block, reconstruct the full block
@Test(timeout = 120000)
- public void testEcCodingTime() throws Exception {
- DataNode workerDn = doTest("/testEcCodingTime");
- MetricsRecordBuilder rb = getMetrics(workerDn.getMetrics().name());
-
- // Ensure that reconstruction task is finished
- GenericTestUtils.waitFor(new Supplier<Boolean>() {
- @Override
- public Boolean get() {
- long taskMetricValue = getLongCounter("EcReconstructionTasks", rb);
- return (taskMetricValue > 0);
- }
- }, 500, 10000);
+ public void testReconstructionBytesPartialGroup2() throws Exception {
+ final int fileLen = cellSize * dataBlocks + cellSize + cellSize / 10;
+ doTest("/testEcBytes", fileLen, 0);
+
+ Assert.assertEquals("ecReconstructionBytesRead should be ",
+ cellSize * dataBlocks + cellSize + cellSize / 10,
+ getLongMetric("EcReconstructionBytesRead"));
+ Assert.assertEquals("ecReconstructionBytesWritten should be ",
+ blockSize, getLongMetric("EcReconstructionBytesWritten"));
+ }
- long decodeTime = getLongCounter("ecDecodingTimeNanos", rb);
- Assert.assertTrue(decodeTime > 0);
+ // 1 full block + 5 partial block, reconstruct the partial block
+ @Test(timeout = 120000)
+ public void testReconstructionBytesPartialGroup3() throws Exception {
+ final int fileLen = cellSize * dataBlocks + cellSize + cellSize / 10;
+ doTest("/testEcBytes", fileLen, 1);
+
+ Assert.assertEquals("ecReconstructionBytesRead should be ",
+ cellSize * dataBlocks + (cellSize / 10) * 2 ,
+ getLongMetric("EcReconstructionBytesRead"));
+ Assert.assertEquals("ecReconstructionBytesWritten should be ",
+ cellSize + cellSize / 10,
+ getLongMetric("EcReconstructionBytesWritten"));
}
- private DataNode doTest(String fileName) throws Exception {
+ private long getLongMetric(String metricName) {
+ long metricValue = 0;
+ // Add all reconstruction metric value from all data nodes
+ for (DataNode dn : cluster.getDataNodes()) {
+ MetricsRecordBuilder rb = getMetrics(dn.getMetrics().name());
+ metricValue += getLongCounter(metricName, rb);
+ }
+ return metricValue;
+ }
+ private void doTest(String fileName, int fileLen,
+ int deadNodeIndex) throws Exception {
+ assertTrue(fileLen > 0);
+ assertTrue(deadNodeIndex >= 0 && deadNodeIndex < numDNs);
Path file = new Path(fileName);
- long fileLen = dataBlocks * blockSize;
- final byte[] data = StripedFileTestUtil.generateBytes((int) fileLen);
+ final byte[] data = StripedFileTestUtil.generateBytes(fileLen);
DFSTestUtil.writeFile(fs, file, data);
StripedFileTestUtil.waitBlockGroupsReported(fs, fileName);
- LocatedBlocks locatedBlocks =
+ final LocatedBlocks locatedBlocks =
StripedFileTestUtil.getLocatedBlocks(file, fs);
- //only one block group
- LocatedStripedBlock lastBlock =
+ final LocatedStripedBlock lastBlock =
(LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
- DataNode workerDn = null;
- DatanodeInfo[] locations = lastBlock.getLocations();
- assertEquals(locations.length, groupSize);
-
- // we have ONE extra datanode in addition to the GROUPSIZE datanodes, here
- // is to find the extra datanode that the reconstruction task will run on,
- // according to the current block placement logic for striped files.
- // This can be improved later to be flexible regardless wherever the task
- // runs.
- for (DataNode dn : cluster.getDataNodes()) {
- boolean appear = false;
- for (DatanodeInfo info : locations) {
- if (dn.getDatanodeUuid().equals(info.getDatanodeUuid())) {
- appear = true;
- break;
- }
- }
- if (!appear) {
- workerDn = dn;
- break;
- }
- }
- // Get a datanode from the block locations.
- LOG.info("Block locations: " + Arrays.asList(locations));
- LOG.info("Erasure coding worker datanode: " + workerDn);
- assertNotNull("Failed to find a worker datanode", workerDn);
+ assertTrue(lastBlock.getLocations().length > deadNodeIndex);
- DataNode toCorruptDn = cluster.getDataNode(locations[0].getIpcPort());
+ final DataNode toCorruptDn = cluster.getDataNode(
+ lastBlock.getLocations()[deadNodeIndex].getIpcPort());
LOG.info("Datanode to be corrupted: " + toCorruptDn);
assertNotNull("Failed to find a datanode to be corrupted", toCorruptDn);
toCorruptDn.shutdown();
@@ -176,12 +173,15 @@ public class TestDataNodeErasureCodingMetrics {
DFSTestUtil.waitForDatanodeState(cluster, toCorruptDn.getDatanodeUuid(),
false, 10000);
- int workCount = getComputedDatanodeWork();
+ final int workCount = getComputedDatanodeWork();
assertTrue("Wrongly computed block reconstruction work", workCount > 0);
cluster.triggerHeartbeats();
- StripedFileTestUtil.waitForReconstructionFinished(file, fs, groupSize);
-
- return workerDn;
+ int totalBlocks = (fileLen / blockGroupSize) * groupSize;
+ final int remainder = fileLen % blockGroupSize;
+ totalBlocks += (remainder == 0) ? 0 :
+ (remainder % blockSize == 0) ? remainder / blockSize + parityBlocks :
+ remainder / blockSize + 1 + parityBlocks;
+ StripedFileTestUtil.waitForAllReconstructionFinished(file, fs, totalBlocks);
}
private int getComputedDatanodeWork()
@@ -209,5 +209,4 @@ public class TestDataNodeErasureCodingMetrics {
BlockManagerTestUtil.checkHeartbeat(
cluster.getNamesystem().getBlockManager());
}
-
}
[7/9] hadoop git commit: HDFS-10958. Add instrumentation hooks around
Datanode disk IO.
Posted by xg...@apache.org.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
index dc63238..d3006c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
@@ -45,6 +45,10 @@ abstract public class ReplicaInfo extends Block
/** volume where the replica belongs. */
private FsVolumeSpi volume;
+ /** This is used by some tests and FsDatasetUtil#computeChecksum. */
+ private static final FileIoProvider DEFAULT_FILE_IO_PROVIDER =
+ new FileIoProvider(null);
+
/**
* Constructor
* @param vol volume where replica is located
@@ -64,7 +68,18 @@ abstract public class ReplicaInfo extends Block
public FsVolumeSpi getVolume() {
return volume;
}
-
+
+ /**
+ * Get the {@link FileIoProvider} for disk IO operations.
+ */
+ public FileIoProvider getFileIoProvider() {
+ // In tests and when invoked via FsDatasetUtil#computeChecksum, the
+ // target volume for this replica may be unknown and hence null.
+ // Use the DEFAULT_FILE_IO_PROVIDER with no-op hooks.
+ return (volume != null) ? volume.getFileIoProvider()
+ : DEFAULT_FILE_IO_PROVIDER;
+ }
+
/**
* Set the volume where this replica is located on disk.
*/
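The DEFAULT_FILE_IO_PROVIDER fallback above is a null-object pattern: a replica whose volume is unknown (tests, FsDatasetUtil#computeChecksum) still gets a provider whose hooks do nothing, so callers never need a null check. A minimal sketch of the pattern with invented names:

interface IoHooks {
  /** Hook invoked before a disk operation; does nothing by default. */
  default void beforeIo(String operation) { }
}

class ReplicaSketch {
  private static final IoHooks DEFAULT_HOOKS = new IoHooks() { };
  private IoHooks volumeHooks; // may be null when no volume is attached

  IoHooks getIoHooks() {
    return volumeHooks != null ? volumeHooks : DEFAULT_HOOKS;
  }
}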
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
index a11a207..4947ecf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.checker.Checkable;
@@ -418,4 +419,6 @@ public interface FsVolumeSpi
*/
class VolumeCheckContext {
}
+
+ FileIoProvider getFileIoProvider();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java
index 54d0e96..f40315a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java
@@ -24,8 +24,8 @@ import java.io.InputStream;
import java.io.IOException;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIOException;
import org.slf4j.Logger;
@@ -38,12 +38,15 @@ public class ReplicaInputStreams implements Closeable {
private InputStream dataIn;
private InputStream checksumIn;
private FsVolumeReference volumeRef;
+ private final FileIoProvider fileIoProvider;
private FileDescriptor dataInFd = null;
/** Create an object with a data input stream and a checksum input stream. */
- public ReplicaInputStreams(InputStream dataStream,
- InputStream checksumStream, FsVolumeReference volumeRef) {
+ public ReplicaInputStreams(
+ InputStream dataStream, InputStream checksumStream,
+ FsVolumeReference volumeRef, FileIoProvider fileIoProvider) {
this.volumeRef = volumeRef;
+ this.fileIoProvider = fileIoProvider;
this.dataIn = dataStream;
this.checksumIn = checksumStream;
if (dataIn instanceof FileInputStream) {
@@ -103,7 +106,7 @@ public class ReplicaInputStreams implements Closeable {
public void dropCacheBehindReads(String identifier, long offset, long len,
int flags) throws NativeIOException {
assert this.dataInFd != null : "null dataInFd!";
- NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
+ fileIoProvider.posixFadvise(getVolumeRef().getVolume(),
identifier, dataInFd, offset, len, flags);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java
index a66847a..1614ba2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java
@@ -24,11 +24,10 @@ import java.io.OutputStream;
import java.io.IOException;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIOException;
import org.apache.hadoop.util.DataChecksum;
-import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
/**
@@ -43,21 +42,22 @@ public class ReplicaOutputStreams implements Closeable {
/** Stream to checksum. */
private final OutputStream checksumOut;
private final DataChecksum checksum;
- private final boolean isTransientStorage;
- private final long slowLogThresholdMs;
+ private final FsVolumeSpi volume;
+ private final FileIoProvider fileIoProvider;
/**
* Create an object with a data output stream, a checksum output stream
* and a checksum.
*/
- public ReplicaOutputStreams(OutputStream dataOut,
- OutputStream checksumOut, DataChecksum checksum,
- boolean isTransientStorage, long slowLogThresholdMs) {
+ public ReplicaOutputStreams(
+ OutputStream dataOut, OutputStream checksumOut, DataChecksum checksum,
+ FsVolumeSpi volume, FileIoProvider fileIoProvider) {
+
this.dataOut = dataOut;
this.checksum = checksum;
- this.slowLogThresholdMs = slowLogThresholdMs;
- this.isTransientStorage = isTransientStorage;
this.checksumOut = checksumOut;
+ this.volume = volume;
+ this.fileIoProvider = fileIoProvider;
try {
if (this.dataOut instanceof FileOutputStream) {
@@ -93,7 +93,7 @@ public class ReplicaOutputStreams implements Closeable {
/** @return is writing to a transient storage? */
public boolean isTransientStorage() {
- return isTransientStorage;
+ return volume.isTransientStorage();
}
@Override
@@ -112,7 +112,7 @@ public class ReplicaOutputStreams implements Closeable {
*/
public void syncDataOut() throws IOException {
if (dataOut instanceof FileOutputStream) {
- sync((FileOutputStream)dataOut);
+ fileIoProvider.sync(volume, (FileOutputStream) dataOut);
}
}
@@ -121,7 +121,7 @@ public class ReplicaOutputStreams implements Closeable {
*/
public void syncChecksumOut() throws IOException {
if (checksumOut instanceof FileOutputStream) {
- sync((FileOutputStream)checksumOut);
+ fileIoProvider.sync(volume, (FileOutputStream) checksumOut);
}
}
@@ -129,60 +129,34 @@ public class ReplicaOutputStreams implements Closeable {
* Flush the data stream if it supports it.
*/
public void flushDataOut() throws IOException {
- flush(dataOut);
+ if (dataOut != null) {
+ fileIoProvider.flush(volume, dataOut);
+ }
}
/**
* Flush the checksum stream if it supports it.
*/
public void flushChecksumOut() throws IOException {
- flush(checksumOut);
- }
-
- private void flush(OutputStream dos) throws IOException {
- long begin = Time.monotonicNow();
- dos.flush();
- long duration = Time.monotonicNow() - begin;
- LOG.trace("ReplicaOutputStreams#flush takes {} ms.", duration);
- if (duration > slowLogThresholdMs) {
- LOG.warn("Slow flush took {} ms (threshold={} ms)", duration,
- slowLogThresholdMs);
+ if (checksumOut != null) {
+ fileIoProvider.flush(volume, checksumOut);
}
}
- private void sync(FileOutputStream fos) throws IOException {
- long begin = Time.monotonicNow();
- fos.getChannel().force(true);
- long duration = Time.monotonicNow() - begin;
- LOG.trace("ReplicaOutputStreams#sync takes {} ms.", duration);
- if (duration > slowLogThresholdMs) {
- LOG.warn("Slow fsync took {} ms (threshold={} ms)", duration,
- slowLogThresholdMs);
- }
- }
-
- public long writeToDisk(byte[] b, int off, int len) throws IOException {
- long begin = Time.monotonicNow();
+ public void writeDataToDisk(byte[] b, int off, int len)
+ throws IOException {
dataOut.write(b, off, len);
- long duration = Time.monotonicNow() - begin;
- LOG.trace("DatanodeIO#writeToDisk takes {} ms.", duration);
- if (duration > slowLogThresholdMs) {
- LOG.warn("Slow BlockReceiver write data to disk cost: {} ms " +
- "(threshold={} ms)", duration, slowLogThresholdMs);
- }
- return duration;
}
public void syncFileRangeIfPossible(long offset, long nbytes,
int flags) throws NativeIOException {
- assert this.outFd != null : "null outFd!";
- NativeIO.POSIX.syncFileRangeIfPossible(outFd, offset, nbytes, flags);
+ fileIoProvider.syncFileRange(
+ volume, outFd, offset, nbytes, flags);
}
public void dropCacheBehindWrites(String identifier,
long offset, long len, int flags) throws NativeIOException {
- assert this.outFd != null : "null outFd!";
- NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
- identifier, outFd, offset, len, flags);
+ fileIoProvider.posixFadvise(
+ volume, identifier, outFd, offset, len, flags);
}
}
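The flush and sync timing removed from ReplicaOutputStreams above is the kind of logic the FileIoProvider hooks centralize: every disk operation can get the same slow-I/O measurement and logging without each caller re-implementing it. A hedged, standalone sketch of such a wrapper (the class name and threshold handling are assumptions, not the Hadoop API):

import java.io.IOException;
import java.io.OutputStream;

class TimedIoWrapper {
  private final long slowThresholdMs;

  TimedIoWrapper(long slowThresholdMs) {
    this.slowThresholdMs = slowThresholdMs;
  }

  /** Flush the stream and warn if it took longer than the threshold. */
  void flush(OutputStream out) throws IOException {
    long begin = System.nanoTime();
    out.flush();
    long elapsedMs = (System.nanoTime() - begin) / 1_000_000;
    if (elapsedMs > slowThresholdMs) {
      System.err.println("Slow flush took " + elapsedMs + " ms (threshold="
          + slowThresholdMs + " ms)");
    }
  }
}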
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 63e82f3..8273ebb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -32,13 +32,11 @@ import java.util.Iterator;
import java.util.Scanner;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CachingGetSpaceUsed;
import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.GetSpaceUsed;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
@@ -46,10 +44,10 @@ import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
-import org.apache.hadoop.hdfs.server.datanode.LocalReplica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
@@ -64,7 +62,6 @@ import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.Timer;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.io.Files;
/**
* A block pool slice represents a portion of a block pool stored on a volume.
@@ -96,6 +93,7 @@ class BlockPoolSlice {
private final long cachedDfsUsedCheckTime;
private final Timer timer;
private final int maxDataLength;
+ private final FileIoProvider fileIoProvider;
// TODO:FEDERATION scalability issue - a thread per DU is needed
private final GetSpaceUsed dfsUsage;
@@ -113,6 +111,7 @@ class BlockPoolSlice {
Configuration conf, Timer timer) throws IOException {
this.bpid = bpid;
this.volume = volume;
+ this.fileIoProvider = volume.getFileIoProvider();
this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
this.finalizedDir = new File(
currentDir, DataStorage.STORAGE_DIR_FINALIZED);
@@ -147,19 +146,14 @@ class BlockPoolSlice {
//
this.tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP);
if (tmpDir.exists()) {
- DataStorage.fullyDelete(tmpDir);
+ fileIoProvider.fullyDelete(volume, tmpDir);
}
this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW);
- if (!rbwDir.mkdirs()) { // create rbw directory if not exist
- if (!rbwDir.isDirectory()) {
- throw new IOException("Mkdirs failed to create " + rbwDir.toString());
- }
- }
- if (!tmpDir.mkdirs()) {
- if (!tmpDir.isDirectory()) {
- throw new IOException("Mkdirs failed to create " + tmpDir.toString());
- }
- }
+
+ // create the rbw and tmp directories if they don't exist.
+ fileIoProvider.mkdirs(volume, rbwDir);
+ fileIoProvider.mkdirs(volume, tmpDir);
+
// Use cached value initially if available. Or the following call will
// block until the initial du command completes.
this.dfsUsage = new CachingGetSpaceUsed.Builder().setPath(bpDir)
@@ -266,7 +260,7 @@ class BlockPoolSlice {
*/
void saveDfsUsed() {
File outFile = new File(currentDir, DU_CACHE_FILE);
- if (outFile.exists() && !outFile.delete()) {
+ if (!fileIoProvider.deleteWithExistsCheck(volume, outFile)) {
FsDatasetImpl.LOG.warn("Failed to delete old dfsUsed file in " +
outFile.getParent());
}
@@ -277,7 +271,7 @@ class BlockPoolSlice {
new FileOutputStream(outFile), "UTF-8")) {
// mtime is written last, so that truncated writes won't be valid.
out.write(Long.toString(used) + " " + Long.toString(timer.now()));
- out.flush();
+ fileIoProvider.flush(volume, out);
}
} catch (IOException ioe) {
// If write failed, the volume might be bad. Since the cache file is
@@ -292,7 +286,8 @@ class BlockPoolSlice {
*/
File createTmpFile(Block b) throws IOException {
File f = new File(tmpDir, b.getBlockName());
- File tmpFile = DatanodeUtil.createTmpFile(b, f);
+ File tmpFile = DatanodeUtil.createFileWithExistsCheck(
+ volume, b, f, fileIoProvider);
// If any exception during creation, its expected that counter will not be
// incremented, So no need to decrement
incrNumBlocks();
@@ -305,7 +300,8 @@ class BlockPoolSlice {
*/
File createRbwFile(Block b) throws IOException {
File f = new File(rbwDir, b.getBlockName());
- File rbwFile = DatanodeUtil.createTmpFile(b, f);
+ File rbwFile = DatanodeUtil.createFileWithExistsCheck(
+ volume, b, f, fileIoProvider);
// If any exception during creation, its expected that counter will not be
// incremented, So no need to decrement
incrNumBlocks();
@@ -314,11 +310,7 @@ class BlockPoolSlice {
File addFinalizedBlock(Block b, ReplicaInfo replicaInfo) throws IOException {
File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
- if (!blockDir.exists()) {
- if (!blockDir.mkdirs()) {
- throw new IOException("Failed to mkdirs " + blockDir);
- }
- }
+ fileIoProvider.mkdirsWithExistsCheck(volume, blockDir);
File blockFile = FsDatasetImpl.moveBlockFiles(b, replicaInfo, blockDir);
File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp());
if (dfsUsage instanceof CachingGetSpaceUsed) {
@@ -340,9 +332,9 @@ class BlockPoolSlice {
final File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, blockId);
final File targetBlockFile = new File(blockDir, blockFile.getName());
final File targetMetaFile = new File(blockDir, metaFile.getName());
- FileUtils.moveFile(blockFile, targetBlockFile);
+ fileIoProvider.moveFile(volume, blockFile, targetBlockFile);
FsDatasetImpl.LOG.info("Moved " + blockFile + " to " + targetBlockFile);
- FileUtils.moveFile(metaFile, targetMetaFile);
+ fileIoProvider.moveFile(volume, metaFile, targetMetaFile);
FsDatasetImpl.LOG.info("Moved " + metaFile + " to " + targetMetaFile);
ReplicaInfo newReplicaInfo =
@@ -394,16 +386,13 @@ class BlockPoolSlice {
File blockFile = FsDatasetUtil.getOrigFile(unlinkedTmp);
if (blockFile.exists()) {
// If the original block file still exists, then no recovery is needed.
- if (!unlinkedTmp.delete()) {
+ if (!fileIoProvider.delete(volume, unlinkedTmp)) {
throw new IOException("Unable to cleanup unlinked tmp file " +
unlinkedTmp);
}
return null;
} else {
- if (!unlinkedTmp.renameTo(blockFile)) {
- throw new IOException("Unable to rename unlinked tmp file " +
- unlinkedTmp);
- }
+ fileIoProvider.rename(volume, unlinkedTmp, blockFile);
return blockFile;
}
}
@@ -416,7 +405,7 @@ class BlockPoolSlice {
*/
private int moveLazyPersistReplicasToFinalized(File source)
throws IOException {
- File files[] = FileUtil.listFiles(source);
+ File[] files = fileIoProvider.listFiles(volume, source);
int numRecovered = 0;
for (File file : files) {
if (file.isDirectory()) {
@@ -431,24 +420,25 @@ class BlockPoolSlice {
if (blockFile.exists()) {
- if (!targetDir.exists() && !targetDir.mkdirs()) {
+ try {
+ fileIoProvider.mkdirsWithExistsCheck(volume, targetDir);
+ } catch(IOException ioe) {
LOG.warn("Failed to mkdirs " + targetDir);
continue;
}
final File targetMetaFile = new File(targetDir, metaFile.getName());
try {
- LocalReplica.rename(metaFile, targetMetaFile);
+ fileIoProvider.rename(volume, metaFile, targetMetaFile);
} catch (IOException e) {
LOG.warn("Failed to move meta file from "
+ metaFile + " to " + targetMetaFile, e);
continue;
-
}
final File targetBlockFile = new File(targetDir, blockFile.getName());
try {
- LocalReplica.rename(blockFile, targetBlockFile);
+ fileIoProvider.rename(volume, blockFile, targetBlockFile);
} catch (IOException e) {
LOG.warn("Failed to move block file from "
+ blockFile + " to " + targetBlockFile, e);
@@ -465,7 +455,7 @@ class BlockPoolSlice {
}
}
- FileUtil.fullyDelete(source);
+ fileIoProvider.fullyDelete(volume, source);
return numRecovered;
}
@@ -508,7 +498,7 @@ class BlockPoolSlice {
loadRwr = false;
}
sc.close();
- if (!restartMeta.delete()) {
+ if (!fileIoProvider.delete(volume, restartMeta)) {
FsDatasetImpl.LOG.warn("Failed to delete restart meta file: " +
restartMeta.getPath());
}
@@ -568,7 +558,7 @@ class BlockPoolSlice {
final RamDiskReplicaTracker lazyWriteReplicaMap,
boolean isFinalized)
throws IOException {
- File files[] = FileUtil.listFiles(dir);
+ File[] files = fileIoProvider.listFiles(volume, dir);
for (File file : files) {
if (file.isDirectory()) {
addToReplicasMap(volumeMap, file, lazyWriteReplicaMap, isFinalized);
@@ -581,8 +571,9 @@ class BlockPoolSlice {
continue;
}
}
- if (!Block.isBlockFilename(file))
+ if (!Block.isBlockFilename(file)) {
continue;
+ }
long genStamp = FsDatasetUtil.getGenerationStampFromFile(
files, file);
@@ -700,7 +691,8 @@ class BlockPoolSlice {
return 0;
}
try (DataInputStream checksumIn = new DataInputStream(
- new BufferedInputStream(new FileInputStream(metaFile),
+ new BufferedInputStream(
+ fileIoProvider.getFileInputStream(volume, metaFile),
ioFileBufferSize))) {
// read and handle the common header here. For now just a version
final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(
@@ -713,9 +705,10 @@ class BlockPoolSlice {
if (numChunks == 0) {
return 0;
}
- try (InputStream blockIn = new FileInputStream(blockFile);
+ try (InputStream blockIn = fileIoProvider.getFileInputStream(
+ volume, blockFile);
ReplicaInputStreams ris = new ReplicaInputStreams(blockIn,
- checksumIn, volume.obtainReference())) {
+ checksumIn, volume.obtainReference(), fileIoProvider)) {
ris.skipChecksumFully((numChunks - 1) * checksumSize);
long lastChunkStartPos = (numChunks - 1) * bytesPerChecksum;
ris.skipDataFully(lastChunkStartPos);
@@ -734,7 +727,8 @@ class BlockPoolSlice {
// truncate if extra bytes are present without CRC
if (blockFile.length() > validFileLength) {
try (RandomAccessFile blockRAF =
- new RandomAccessFile(blockFile, "rw")) {
+ fileIoProvider.getRandomAccessFile(
+ volume, blockFile, "rw")) {
// truncate blockFile
blockRAF.setLength(validFileLength);
}
@@ -786,12 +780,14 @@ class BlockPoolSlice {
}
FileInputStream inputStream = null;
try {
- inputStream = new FileInputStream(replicaFile);
+ inputStream = fileIoProvider.getFileInputStream(volume, replicaFile);
BlockListAsLongs blocksList =
BlockListAsLongs.readFrom(inputStream, maxDataLength);
- Iterator<BlockReportReplica> iterator = blocksList.iterator();
- while (iterator.hasNext()) {
- BlockReportReplica replica = iterator.next();
+ if (blocksList == null) {
+ return false;
+ }
+
+ for (BlockReportReplica replica : blocksList) {
switch (replica.getState()) {
case FINALIZED:
addReplicaToReplicasMap(replica, tmpReplicaMap, lazyWriteReplicaMap, true);
@@ -828,7 +824,7 @@ class BlockPoolSlice {
return false;
}
finally {
- if (!replicaFile.delete()) {
+ if (!fileIoProvider.delete(volume, replicaFile)) {
LOG.info("Failed to delete replica cache file: " +
replicaFile.getPath());
}
@@ -842,41 +838,29 @@ class BlockPoolSlice {
blocksListToPersist.getNumberOfBlocks()== 0) {
return;
}
- File tmpFile = new File(currentDir, REPLICA_CACHE_FILE + ".tmp");
- if (tmpFile.exists() && !tmpFile.delete()) {
- LOG.warn("Failed to delete tmp replicas file in " +
- tmpFile.getPath());
- return;
- }
- File replicaCacheFile = new File(currentDir, REPLICA_CACHE_FILE);
- if (replicaCacheFile.exists() && !replicaCacheFile.delete()) {
- LOG.warn("Failed to delete replicas file in " +
- replicaCacheFile.getPath());
+ final File tmpFile = new File(currentDir, REPLICA_CACHE_FILE + ".tmp");
+ final File replicaCacheFile = new File(currentDir, REPLICA_CACHE_FILE);
+ if (!fileIoProvider.deleteWithExistsCheck(volume, tmpFile) ||
+ !fileIoProvider.deleteWithExistsCheck(volume, replicaCacheFile)) {
return;
}
FileOutputStream out = null;
try {
- out = new FileOutputStream(tmpFile);
+ out = fileIoProvider.getFileOutputStream(volume, tmpFile);
blocksListToPersist.writeTo(out);
out.close();
// Renaming the tmp file to replicas
- Files.move(tmpFile, replicaCacheFile);
+ fileIoProvider.moveFile(volume, tmpFile, replicaCacheFile);
} catch (Exception e) {
// If write failed, the volume might be bad. Since the cache file is
// not critical, log the error, delete both the files (tmp and cache)
// and continue.
LOG.warn("Failed to write replicas to cache ", e);
- if (replicaCacheFile.exists() && !replicaCacheFile.delete()) {
- LOG.warn("Failed to delete replicas file: " +
- replicaCacheFile.getPath());
- }
+ fileIoProvider.deleteWithExistsCheck(volume, replicaCacheFile);
} finally {
IOUtils.closeStream(out);
- if (tmpFile.exists() && !tmpFile.delete()) {
- LOG.warn("Failed to delete tmp file in " +
- tmpFile.getPath());
- }
+ fileIoProvider.deleteWithExistsCheck(volume, tmpFile);
}
}
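
The BlockPoolSlice changes above reroute direct java.io calls (mkdirs, delete, rename, stream creation) through the volume's FileIoProvider, so all disk I/O for a volume passes through one instrumentable layer. Below is a minimal, self-contained sketch of that wrapping pattern; the SketchFileIoProvider class and its method names are illustrative assumptions, not Hadoop's actual FileIoProvider API.

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

// Illustrative sketch only: a provider that centralizes a volume's file
// operations so callers never touch java.io.File directly. The class and
// method names are assumptions, not Hadoop's FileIoProvider API.
class SketchFileIoProvider {
  // Create a directory (and parents); succeed if it already exists.
  void mkdirsWithExistsCheck(File dir) throws IOException {
    if (!dir.mkdirs() && !dir.isDirectory()) {
      throw new IOException("Mkdirs failed to create " + dir);
    }
  }

  // Delete a file if present; report success when nothing is left behind.
  boolean deleteWithExistsCheck(File f) {
    return !f.exists() || f.delete();
  }

  // Open an output stream; a real provider could time the call or inject faults here.
  FileOutputStream getFileOutputStream(File f) throws IOException {
    return new FileOutputStream(f);
  }
}

public class SketchBlockPoolSliceInit {
  public static void main(String[] args) throws IOException {
    SketchFileIoProvider provider = new SketchFileIoProvider();
    // Hypothetical layout; mirrors the rewritten constructor, which creates the
    // rbw and tmp directories through the provider instead of calling mkdirs().
    File rbwDir = new File("volume1/bp-1/current/rbw");
    File tmpDir = new File("volume1/bp-1/tmp");
    provider.mkdirsWithExistsCheck(rbwDir);
    provider.mkdirsWithExistsCheck(tmpDir);
    System.out.println("rbw and tmp directories ready");
  }
}
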
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
index 97dcf8d..416609d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
@@ -272,8 +272,10 @@ class FsDatasetAsyncDiskService {
}
File trashDirFile = new File(trashDirectory);
- if (!trashDirFile.exists() && !trashDirFile.mkdirs()) {
- LOG.error("Failed to create trash directory " + trashDirectory);
+ try {
+ volume.getFileIoProvider().mkdirsWithExistsCheck(
+ volume, trashDirFile);
+ } catch (IOException e) {
return false;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 6065df2..35561cd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -21,6 +21,7 @@ import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
+import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -57,6 +58,7 @@ import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -418,6 +420,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
.setDataset(this)
.setStorageID(sd.getStorageUuid())
.setStorageDirectory(sd)
+ .setFileIoProvider(datanode.getFileIoProvider())
.setConf(this.conf)
.build();
FsVolumeReference ref = fsVolume.obtainReference();
@@ -437,6 +440,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
.setDataset(this)
.setStorageID(storageUuid)
.setStorageDirectory(sd)
+ .setFileIoProvider(datanode.getFileIoProvider())
.setConf(conf)
.build();
}
@@ -818,7 +822,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
InputStream blockInStream = info.getDataInputStream(blkOffset);
try {
InputStream metaInStream = info.getMetadataInputStream(metaOffset);
- return new ReplicaInputStreams(blockInStream, metaInStream, ref);
+ return new ReplicaInputStreams(
+ blockInStream, metaInStream, ref, datanode.getFileIoProvider());
} catch (IOException e) {
IOUtils.cleanup(null, blockInStream);
throw e;
@@ -1027,9 +1032,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
static void computeChecksum(ReplicaInfo srcReplica, File dstMeta,
int smallBufferSize, final Configuration conf)
throws IOException {
- File srcMeta = new File(srcReplica.getMetadataURI());
- final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(srcMeta,
- DFSUtilClient.getIoFileBufferSize(conf));
+ final File srcMeta = new File(srcReplica.getMetadataURI());
+
+ DataChecksum checksum;
+ try (FileInputStream fis =
+ srcReplica.getFileIoProvider().getFileInputStream(
+ srcReplica.getVolume(), srcMeta)) {
+ checksum = BlockMetadataHeader.readDataChecksum(
+ fis, DFSUtilClient.getIoFileBufferSize(conf), srcMeta);
+ }
+
final byte[] data = new byte[1 << 16];
final byte[] crcs = new byte[checksum.getChecksumSize(data.length)];
@@ -2161,16 +2173,21 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
return;
}
- final long diskGS = diskMetaFile != null && diskMetaFile.exists() ?
+ final FileIoProvider fileIoProvider = datanode.getFileIoProvider();
+ final boolean diskMetaFileExists = diskMetaFile != null &&
+ fileIoProvider.exists(vol, diskMetaFile);
+ final boolean diskFileExists = diskFile != null &&
+ fileIoProvider.exists(vol, diskFile);
+
+ final long diskGS = diskMetaFileExists ?
Block.getGenerationStamp(diskMetaFile.getName()) :
- HdfsConstants.GRANDFATHER_GENERATION_STAMP;
+ HdfsConstants.GRANDFATHER_GENERATION_STAMP;
- if (diskFile == null || !diskFile.exists()) {
+ if (!diskFileExists) {
if (memBlockInfo == null) {
// Block file does not exist and block does not exist in memory
// If metadata file exists then delete it
- if (diskMetaFile != null && diskMetaFile.exists()
- && diskMetaFile.delete()) {
+ if (diskMetaFileExists && fileIoProvider.delete(vol, diskMetaFile)) {
LOG.warn("Deleted a metadata file without a block "
+ diskMetaFile.getAbsolutePath());
}
@@ -2186,8 +2203,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
LOG.warn("Removed block " + blockId
+ " from memory with missing block file on the disk");
// Finally remove the metadata file
- if (diskMetaFile != null && diskMetaFile.exists()
- && diskMetaFile.delete()) {
+ if (diskMetaFileExists && fileIoProvider.delete(vol, diskMetaFile)) {
LOG.warn("Deleted a metadata file for the deleted block "
+ diskMetaFile.getAbsolutePath());
}
@@ -2223,7 +2239,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// Compare block files
if (memBlockInfo.blockDataExists()) {
if (memBlockInfo.getBlockURI().compareTo(diskFile.toURI()) != 0) {
- if (diskMetaFile.exists()) {
+ if (diskMetaFileExists) {
if (memBlockInfo.metadataExists()) {
// We have two sets of block+meta files. Decide which one to
// keep.
@@ -2239,7 +2255,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
memBlockInfo, diskBlockInfo, volumeMap);
}
} else {
- if (!diskFile.delete()) {
+ if (!fileIoProvider.delete(vol, diskFile)) {
LOG.warn("Failed to delete " + diskFile);
}
}
@@ -2278,8 +2294,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// as the block file, then use the generation stamp from it
try {
File memFile = new File(memBlockInfo.getBlockURI());
- long gs = diskMetaFile != null && diskMetaFile.exists()
- && diskMetaFile.getParent().equals(memFile.getParent()) ? diskGS
+ long gs = diskMetaFileExists &&
+ diskMetaFile.getParent().equals(memFile.getParent()) ? diskGS
: HdfsConstants.GRANDFATHER_GENERATION_STAMP;
LOG.warn("Updating generation stamp for block " + blockId
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
index 563f66a..32759c4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.File;
+import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.FilenameFilter;
import java.io.IOException;
@@ -80,7 +81,7 @@ public class FsDatasetUtil {
return matches[0];
}
- public static FileInputStream openAndSeek(File file, long offset)
+ public static FileDescriptor openAndSeek(File file, long offset)
throws IOException {
RandomAccessFile raf = null;
try {
@@ -88,7 +89,7 @@ public class FsDatasetUtil {
if (offset > 0) {
raf.seek(offset);
}
- return new FileInputStream(raf.getFD());
+ return raf.getFD();
} catch(IOException ioe) {
IOUtils.cleanup(null, raf);
throw ioe;
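
The FsDatasetUtil change returns the seeked FileDescriptor instead of a FileInputStream, leaving it to the caller to wrap the descriptor. A small sketch of that open-and-seek pattern, with a hypothetical file path and offset, is:

import java.io.File;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;

// Sketch of the open-and-seek pattern: seek with a RandomAccessFile, then hand out
// its FileDescriptor for the caller to wrap. Illustrative only.
public class SketchOpenAndSeek {

  static FileDescriptor openAndSeek(File file, long offset) throws IOException {
    RandomAccessFile raf = new RandomAccessFile(file, "r");
    try {
      if (offset > 0) {
        raf.seek(offset);          // the descriptor shares this file position
      }
      return raf.getFD();
    } catch (IOException ioe) {
      raf.close();                 // only close on failure; the caller owns the fd
      throw ioe;
    }
  }

  public static void main(String[] args) throws IOException {
    // Hypothetical file and offset; reads start at byte 16 of the file.
    FileDescriptor fd = openAndSeek(new File(args[0]), 16);
    try (FileInputStream in = new FileInputStream(fd)) {
      System.out.println("first byte after the offset: " + in.read());
    }
  }
}
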
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index c317715..74ee063 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -19,14 +19,12 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.BufferedWriter;
import java.io.File;
-import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.RandomAccessFile;
import java.net.URI;
import java.nio.channels.ClosedChannelException;
-import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Collections;
@@ -46,8 +44,8 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DF;
-import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -75,7 +73,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
-import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.CloseableReferenceCount;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.StringUtils;
@@ -132,6 +129,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
// limit the visible capacity for tests. If negative, then we just
// query from the filesystem.
protected volatile long configuredCapacity;
+ private final FileIoProvider fileIoProvider;
/**
* Per-volume worker pool that processes new blocks to cache.
@@ -141,8 +139,9 @@ public class FsVolumeImpl implements FsVolumeSpi {
*/
protected ThreadPoolExecutor cacheExecutor;
- FsVolumeImpl(FsDatasetImpl dataset, String storageID, StorageDirectory sd,
- Configuration conf) throws IOException {
+ FsVolumeImpl(
+ FsDatasetImpl dataset, String storageID, StorageDirectory sd,
+ FileIoProvider fileIoProvider, Configuration conf) throws IOException {
if (sd.getStorageLocation() == null) {
throw new IOException("StorageLocation specified for storage directory " +
@@ -162,6 +161,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT));
this.configuredCapacity = -1;
this.conf = conf;
+ this.fileIoProvider = fileIoProvider;
cacheExecutor = initializeCacheExecutor(parent);
}
@@ -664,8 +664,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
*/
private String getNextSubDir(String prev, File dir)
throws IOException {
- List<String> children =
- IOUtils.listDirectory(dir, SubdirFilter.INSTANCE);
+ List<String> children = fileIoProvider.listDirectory(
+ FsVolumeImpl.this, dir, SubdirFilter.INSTANCE);
cache = null;
cacheMs = 0;
if (children.size() == 0) {
@@ -718,8 +718,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
}
File dir = Paths.get(bpidDir.getAbsolutePath(), "current", "finalized",
state.curFinalizedDir, state.curFinalizedSubDir).toFile();
- List<String> entries =
- IOUtils.listDirectory(dir, BlockFileFilter.INSTANCE);
+ List<String> entries = fileIoProvider.listDirectory(
+ FsVolumeImpl.this, dir, BlockFileFilter.INSTANCE);
if (entries.size() == 0) {
entries = null;
} else {
@@ -839,19 +839,18 @@ public class FsVolumeImpl implements FsVolumeSpi {
public void save() throws IOException {
state.lastSavedMs = Time.now();
boolean success = false;
- try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
- new FileOutputStream(getTempSaveFile(), false), "UTF-8"))) {
+ try (BufferedWriter writer = new BufferedWriter(
+ new OutputStreamWriter(fileIoProvider.getFileOutputStream(
+ FsVolumeImpl.this, getTempSaveFile()), "UTF-8"))) {
WRITER.writeValue(writer, state);
success = true;
} finally {
if (!success) {
- if (getTempSaveFile().delete()) {
- LOG.debug("save({}, {}): error deleting temporary file.",
- storageID, bpid);
- }
+ fileIoProvider.delete(FsVolumeImpl.this, getTempSaveFile());
}
}
- Files.move(getTempSaveFile().toPath(), getSaveFile().toPath(),
+ fileIoProvider.move(FsVolumeImpl.this,
+ getTempSaveFile().toPath(), getSaveFile().toPath(),
StandardCopyOption.ATOMIC_MOVE);
if (LOG.isTraceEnabled()) {
LOG.trace("save({}, {}): saved {}", storageID, bpid,
@@ -1042,11 +1041,12 @@ public class FsVolumeImpl implements FsVolumeSpi {
File finalizedDir = new File(bpCurrentDir,
DataStorage.STORAGE_DIR_FINALIZED);
File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
- if (finalizedDir.exists() && !DatanodeUtil.dirNoFilesRecursive(
- finalizedDir)) {
+ if (fileIoProvider.exists(this, finalizedDir) &&
+ !DatanodeUtil.dirNoFilesRecursive(this, finalizedDir, fileIoProvider)) {
return false;
}
- if (rbwDir.exists() && FileUtil.list(rbwDir).length != 0) {
+ if (fileIoProvider.exists(this, rbwDir) &&
+ fileIoProvider.list(this, rbwDir).length != 0) {
return false;
}
return true;
@@ -1067,35 +1067,38 @@ public class FsVolumeImpl implements FsVolumeSpi {
DataStorage.STORAGE_DIR_LAZY_PERSIST);
File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
if (force) {
- DataStorage.fullyDelete(bpDir);
+ fileIoProvider.fullyDelete(this, bpDir);
} else {
- if (!rbwDir.delete()) {
+ if (!fileIoProvider.delete(this, rbwDir)) {
throw new IOException("Failed to delete " + rbwDir);
}
- if (!DatanodeUtil.dirNoFilesRecursive(finalizedDir) ||
- !FileUtil.fullyDelete(finalizedDir)) {
+ if (!DatanodeUtil.dirNoFilesRecursive(
+ this, finalizedDir, fileIoProvider) ||
+ !fileIoProvider.fullyDelete(
+ this, finalizedDir)) {
throw new IOException("Failed to delete " + finalizedDir);
}
if (lazypersistDir.exists() &&
- ((!DatanodeUtil.dirNoFilesRecursive(lazypersistDir) ||
- !FileUtil.fullyDelete(lazypersistDir)))) {
+ ((!DatanodeUtil.dirNoFilesRecursive(
+ this, lazypersistDir, fileIoProvider) ||
+ !fileIoProvider.fullyDelete(this, lazypersistDir)))) {
throw new IOException("Failed to delete " + lazypersistDir);
}
- DataStorage.fullyDelete(tmpDir);
- for (File f : FileUtil.listFiles(bpCurrentDir)) {
- if (!f.delete()) {
+ fileIoProvider.fullyDelete(this, tmpDir);
+ for (File f : fileIoProvider.listFiles(this, bpCurrentDir)) {
+ if (!fileIoProvider.delete(this, f)) {
throw new IOException("Failed to delete " + f);
}
}
- if (!bpCurrentDir.delete()) {
+ if (!fileIoProvider.delete(this, bpCurrentDir)) {
throw new IOException("Failed to delete " + bpCurrentDir);
}
- for (File f : FileUtil.listFiles(bpDir)) {
- if (!f.delete()) {
+ for (File f : fileIoProvider.listFiles(this, bpDir)) {
+ if (!fileIoProvider.delete(this, f)) {
throw new IOException("Failed to delete " + f);
}
}
- if (!bpDir.delete()) {
+ if (!fileIoProvider.delete(this, bpDir)) {
throw new IOException("Failed to delete " + bpDir);
}
}
@@ -1118,7 +1121,10 @@ public class FsVolumeImpl implements FsVolumeSpi {
private byte[] loadLastPartialChunkChecksum(
File blockFile, File metaFile) throws IOException {
- DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum();
+ // readHeader closes the temporary FileInputStream.
+ DataChecksum dcs = BlockMetadataHeader
+ .readHeader(fileIoProvider.getFileInputStream(this, metaFile))
+ .getChecksum();
final int checksumSize = dcs.getChecksumSize();
final long onDiskLen = blockFile.length();
final int bytesPerChecksum = dcs.getBytesPerChecksum();
@@ -1132,7 +1138,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
int offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
(int)(onDiskLen / bytesPerChecksum * checksumSize);
byte[] lastChecksum = new byte[checksumSize];
- try (RandomAccessFile raf = new RandomAccessFile(metaFile, "r")) {
+ try (RandomAccessFile raf = fileIoProvider.getRandomAccessFile(
+ this, metaFile, "r")) {
raf.seek(offsetInChecksum);
raf.read(lastChecksum, 0, checksumSize);
}
@@ -1246,8 +1253,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
copyReplicaWithNewBlockIdAndGS(rur, bpid, newBlockId, recoveryId);
File blockFile = copiedReplicaFiles[1];
File metaFile = copiedReplicaFiles[0];
- LocalReplica.truncateBlock(blockFile, metaFile,
- rur.getNumBytes(), newlength);
+ LocalReplica.truncateBlock(rur.getVolume(), blockFile, metaFile,
+ rur.getNumBytes(), newlength, fileIoProvider);
LocalReplicaInPipeline newReplicaInfo = new ReplicaBuilder(ReplicaState.RBW)
.setBlockId(newBlockId)
@@ -1283,6 +1290,11 @@ public class FsVolumeImpl implements FsVolumeSpi {
getFinalizedDir(bpid), report, reportCompiler);
}
+ @Override
+ public FileIoProvider getFileIoProvider() {
+ return fileIoProvider;
+ }
+
private LinkedList<ScanInfo> compileReport(File bpFinalizedDir,
File dir, LinkedList<ScanInfo> report, ReportCompiler reportCompiler)
throws InterruptedException {
@@ -1291,7 +1303,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
List <String> fileNames;
try {
- fileNames = IOUtils.listDirectory(dir, BlockDirFilter.INSTANCE);
+ fileNames = fileIoProvider.listDirectory(
+ this, dir, BlockDirFilter.INSTANCE);
} catch (IOException ioe) {
LOG.warn("Exception occured while compiling report: ", ioe);
// Initiate a check on disk failure.
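
The loadLastPartialChunkChecksum change above seeks to the checksum entry that covers the last, possibly partial, chunk. A standalone sketch of that offset arithmetic, using assumed header and checksum sizes, is:

// Standalone sketch of the offset arithmetic in loadLastPartialChunkChecksum:
// the checksum for the last (possibly partial) chunk sits after the header plus
// one checksum entry per full chunk already on disk. All sizes are assumptions.
public class SketchPartialChunkOffset {
  public static void main(String[] args) {
    final int headerSize = 7;           // assumed metadata header size in bytes
    final int bytesPerChecksum = 512;   // assumed chunk size
    final int checksumSize = 4;         // e.g. a CRC32 checksum is 4 bytes
    final long onDiskLen = 1300;        // block length: 2 full chunks + 276 bytes

    long fullChunks = onDiskLen / bytesPerChecksum;                  // = 2
    long offsetInChecksum = headerSize + fullChunks * checksumSize;  // = 15
    System.out.println("seek to byte " + offsetInChecksum
        + " of the meta file and read " + checksumSize + " bytes");
  }
}
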
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java
index a1f7e91..5371eda 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
/**
* This class is to be used as a builder for {@link FsVolumeImpl} objects.
@@ -31,6 +32,7 @@ public class FsVolumeImplBuilder {
private String storageID;
private StorageDirectory sd;
private Configuration conf;
+ private FileIoProvider fileIoProvider;
public FsVolumeImplBuilder() {
dataset = null;
@@ -59,7 +61,15 @@ public class FsVolumeImplBuilder {
return this;
}
+ FsVolumeImplBuilder setFileIoProvider(FileIoProvider fileIoProvider) {
+ this.fileIoProvider = fileIoProvider;
+ return this;
+ }
+
FsVolumeImpl build() throws IOException {
- return new FsVolumeImpl(dataset, storageID, sd, conf);
+ return new FsVolumeImpl(
+ dataset, storageID, sd,
+ fileIoProvider != null ? fileIoProvider : new FileIoProvider(null),
+ conf);
}
}
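
The builder above gains a setFileIoProvider step and substitutes a default provider when none was supplied. A generic sketch of that defaulting-builder pattern follows; the Provider and Volume classes are hypothetical stand-ins, not the Hadoop types.

// Hypothetical types used only to illustrate the defaulting-builder pattern.
class Provider {
  private final String name;
  Provider(String name) { this.name = name; }
  @Override public String toString() { return "Provider(" + name + ")"; }
}

class Volume {
  private final Provider provider;
  Volume(Provider provider) { this.provider = provider; }
  Provider getProvider() { return provider; }
}

public class SketchVolumeBuilder {
  private Provider provider;            // optional; defaulted at build time

  SketchVolumeBuilder setProvider(Provider p) {
    this.provider = p;
    return this;                        // fluent setters, like the real builder
  }

  Volume build() {
    // Fall back to a default provider when the caller never set one.
    return new Volume(provider != null ? provider : new Provider("default"));
  }

  public static void main(String[] args) {
    Volume withDefault = new SketchVolumeBuilder().build();
    Volume withCustom = new SketchVolumeBuilder()
        .setProvider(new Provider("metering")).build();
    System.out.println(withDefault.getProvider() + " / " + withCustom.getProvider());
  }
}
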
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
index e963d41..20cec6a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
@@ -701,7 +701,7 @@ public class TestFileAppend{
ReplicaBeingWritten rbw =
(ReplicaBeingWritten)replicaHandler.getReplica();
ReplicaOutputStreams
- outputStreams = rbw.createStreams(false, DEFAULT_CHECKSUM, 300);
+ outputStreams = rbw.createStreams(false, DEFAULT_CHECKSUM);
OutputStream dataOutput = outputStreams.getDataOut();
byte[] appendBytes = new byte[1];
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index ae52905..a0041dd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -122,6 +122,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
static final byte[] nullCrcFileData;
private final AutoCloseableLock datasetLock;
+ private final FileIoProvider fileIoProvider;
static {
DataChecksum checksum = DataChecksum.newDataChecksum(
@@ -260,7 +261,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override
synchronized public ReplicaOutputStreams createStreams(boolean isCreate,
- DataChecksum requestedChecksum, long slowLogThresholdMs)
+ DataChecksum requestedChecksum)
throws IOException {
if (finalized) {
throw new IOException("Trying to write to a finalized replica "
@@ -268,7 +269,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
} else {
SimulatedOutputStream crcStream = new SimulatedOutputStream();
return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum,
- volume.isTransientStorage(), slowLogThresholdMs);
+ volume, fileIoProvider);
}
}
@@ -474,9 +475,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
static class SimulatedVolume implements FsVolumeSpi {
private final SimulatedStorage storage;
+ private final FileIoProvider fileIoProvider;
- SimulatedVolume(final SimulatedStorage storage) {
+ SimulatedVolume(final SimulatedStorage storage,
+ final FileIoProvider fileIoProvider) {
this.storage = storage;
+ this.fileIoProvider = fileIoProvider;
}
@Override
@@ -560,6 +564,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
@Override
+ public FileIoProvider getFileIoProvider() {
+ return fileIoProvider;
+ }
+
+ @Override
public VolumeCheckResult check(VolumeCheckContext context)
throws Exception {
return VolumeCheckResult.HEALTHY;
@@ -590,10 +599,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
registerMBean(datanodeUuid);
+ this.fileIoProvider = new FileIoProvider(conf);
this.storage = new SimulatedStorage(
conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY),
conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE));
- this.volume = new SimulatedVolume(this.storage);
+ this.volume = new SimulatedVolume(this.storage, this.fileIoProvider);
this.datasetLock = new AutoCloseableLock();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index 8439991..619eda0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -673,7 +673,7 @@ public class TestBlockRecovery {
ReplicaOutputStreams streams = null;
try {
streams = replicaInfo.createStreams(true,
- DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512), 300);
+ DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512));
streams.getChecksumOut().write('a');
dn.data.initReplicaRecovery(new RecoveringBlock(block, null, RECOVERY_ID+1));
BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index d7c8383..cc0915d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -905,6 +905,11 @@ public class TestDirectoryScanner {
return null;
}
+ @Override
+ public FileIoProvider getFileIoProvider() {
+ return null;
+ }
+
@Override
public VolumeCheckResult check(VolumeCheckContext context)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
index fa980c2..4e724bc7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
@@ -83,7 +83,7 @@ public class TestSimulatedFSDataset {
ReplicaInPipeline bInfo = fsdataset.createRbw(
StorageType.DEFAULT, b, false).getReplica();
ReplicaOutputStreams out = bInfo.createStreams(true,
- DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512), 300);
+ DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512));
try {
OutputStream dataOut = out.getDataOut();
assertEquals(0, fsdataset.getLength(b));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index 2417c9d..5cd86e2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -134,7 +134,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
@Override
public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff,
long ckoff) throws IOException {
- return new ReplicaInputStreams(null, null, null);
+ return new ReplicaInputStreams(null, null, null, null);
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java
index 6fa2830..5c172e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java
@@ -58,10 +58,10 @@ public class ExternalReplicaInPipeline implements ReplicaInPipeline {
@Override
public ReplicaOutputStreams createStreams(boolean isCreate,
- DataChecksum requestedChecksum, long slowLogThresholdMs)
+ DataChecksum requestedChecksum)
throws IOException {
- return new ReplicaOutputStreams(null, null, requestedChecksum, false,
- slowLogThresholdMs);
+ return new ReplicaOutputStreams(null, null, requestedChecksum,
+ null, null);
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
index 2753a61..e607de5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
@@ -26,6 +26,7 @@ import java.util.LinkedList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
@@ -115,6 +116,11 @@ public class ExternalVolumeImpl implements FsVolumeSpi {
}
@Override
+ public FileIoProvider getFileIoProvider() {
+ return null;
+ }
+
+ @Override
public VolumeCheckResult check(VolumeCheckContext context)
throws Exception {
return VolumeCheckResult.HEALTHY;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ba9587d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
index a089d39..3bac7b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
@@ -99,6 +99,8 @@ public class TestHdfsConfigFields extends TestConfigurationFieldsBase {
.add(DFSConfigKeys.DFS_DATANODE_STARTUP_KEY);
configurationPropsToSkipCompare
.add(DFSConfigKeys.DFS_NAMENODE_STARTUP_KEY);
+ configurationPropsToSkipCompare
+ .add(DFSConfigKeys.DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY);
// Allocate
xmlPropsToSkipCompare = new HashSet<String>();
[6/9] hadoop git commit: HDFS-11204. Document the missing options of hdfs zkfc command. Contributed by Yiqun Lin.
Posted by xg...@apache.org.
HDFS-11204. Document the missing options of hdfs zkfc command. Contributed by Yiqun Lin.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/72bff192
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/72bff192
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/72bff192
Branch: refs/heads/YARN-5734
Commit: 72bff192cd37ff97442e0f8dd477fbc2e58fc12d
Parents: 1f14f6d
Author: Akira Ajisaka <aa...@apache.org>
Authored: Wed Dec 14 18:50:43 2016 +0900
Committer: Akira Ajisaka <aa...@apache.org>
Committed: Wed Dec 14 18:50:43 2016 +0900
----------------------------------------------------------------------
.../main/java/org/apache/hadoop/ha/ZKFailoverController.java | 7 +++++--
.../hadoop-hdfs/src/site/markdown/HDFSCommands.md | 2 +-
2 files changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/72bff192/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
index 30ec8f2..0ed9158 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
@@ -84,8 +84,11 @@ public abstract class ZKFailoverController {
ZK_AUTH_KEY
};
- protected static final String USAGE =
- "Usage: hdfs zkfc [ -formatZK [-force] [-nonInteractive] ]";
+ protected static final String USAGE =
+ "Usage: hdfs zkfc [ -formatZK [-force] [-nonInteractive] ]\n"
+ + "\t-force: formats the znode if the znode exists.\n"
+ + "\t-nonInteractive: formats the znode aborts if the znode exists,\n"
+ + "\tunless -force option is specified.";
/** Unable to format the parent znode in ZK */
static final int ERR_CODE_FORMAT_DENIED = 2;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/72bff192/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
index 7f7dcde..a0d0ed7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
@@ -599,7 +599,7 @@ Usage: `hdfs zkfc [-formatZK [-force] [-nonInteractive]]`
| COMMAND\_OPTION | Description |
|:---- |:---- |
-| `-formatZK` | Format the Zookeeper instance |
+| `-formatZK` | Format the Zookeeper instance. -force: formats the znode if the znode exists. -nonInteractive: formats the znode, aborting if the znode already exists, unless the -force option is specified. |
| `-h` | Display help |
This command starts a Zookeeper Failover Controller process for use with [HDFS HA with QJM](./HDFSHighAvailabilityWithQJM.html#Administrative_commands).
[4/9] hadoop git commit: Revert YARN-4126. RM should not issue delegation tokens in unsecure mode.
Posted by xg...@apache.org.
Revert YARN-4126. RM should not issue delegation tokens in unsecure mode.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ada876cd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ada876cd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ada876cd
Branch: refs/heads/YARN-5734
Commit: ada876cd1d22b61f237603cf339bbed65285dab8
Parents: fbdbbd5
Author: Jian He <ji...@apache.org>
Authored: Tue Dec 13 20:49:54 2016 -0800
Committer: Jian He <ji...@apache.org>
Committed: Tue Dec 13 20:49:54 2016 -0800
----------------------------------------------------------------------
.../apache/hadoop/yarn/server/resourcemanager/ClientRMService.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ada876cd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
index 1bc40c4..3dc7e38 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
@@ -1238,7 +1238,7 @@ public class ClientRMService extends AbstractService implements
.contains(UserGroupInformation.getCurrentUser()
.getRealAuthenticationMethod());
} else {
- return false;
+ return true;
}
}
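
The revert restores the earlier behavior: delegation-token RPCs are permitted when security is disabled instead of being rejected. A hedged, standalone sketch of that decision is below; the helper method is illustrative and is not the actual ClientRMService code.

// Illustrative sketch only: allow delegation-token operations either when the
// caller authenticated with Kerberos, or unconditionally when security is off.
public class SketchTokenOpCheck {
  static boolean isAllowedDelegationTokenOp(boolean securityEnabled,
                                            boolean callerUsedKerberos) {
    if (securityEnabled) {
      return callerUsedKerberos;   // secure cluster: require a Kerberos caller
    } else {
      return true;                 // reverted behavior: unsecure mode allows it
    }
  }

  public static void main(String[] args) {
    System.out.println(isAllowedDelegationTokenOp(true, false));   // false
    System.out.println(isAllowedDelegationTokenOp(false, false));  // true
  }
}
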
[3/9] hadoop git commit: HDFS-10684. WebHDFS DataNode calls fail without parameter createparent. Contributed by John Zhuge.
Posted by xg...@apache.org.
HDFS-10684. WebHDFS DataNode calls fail without parameter createparent. Contributed by John Zhuge.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fbdbbd57
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fbdbbd57
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fbdbbd57
Branch: refs/heads/YARN-5734
Commit: fbdbbd57cdc3d8c778fca9266a7cadf298c8ff6c
Parents: e24a923
Author: Andrew Wang <wa...@apache.org>
Authored: Tue Dec 13 18:01:31 2016 -0800
Committer: Andrew Wang <wa...@apache.org>
Committed: Tue Dec 13 18:01:31 2016 -0800
----------------------------------------------------------------------
.../hdfs/web/resources/CreateParentParam.java | 2 +-
.../hdfs/web/resources/OverwriteParam.java | 2 +-
.../hdfs/web/TestWebHdfsFileSystemContract.java | 40 ++++++++++++++++++++
3 files changed, 42 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fbdbbd57/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/CreateParentParam.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/CreateParentParam.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/CreateParentParam.java
index eaa5e8d..029efa0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/CreateParentParam.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/CreateParentParam.java
@@ -39,7 +39,7 @@ public class CreateParentParam extends BooleanParam {
* @param str a string representation of the parameter value.
*/
public CreateParentParam(final String str) {
- this(DOMAIN.parse(str));
+ this(DOMAIN.parse(str == null ? DEFAULT : str));
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fbdbbd57/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/OverwriteParam.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/OverwriteParam.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/OverwriteParam.java
index 9610b93..d7f5fb0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/OverwriteParam.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/OverwriteParam.java
@@ -39,7 +39,7 @@ public class OverwriteParam extends BooleanParam {
* @param str a string representation of the parameter value.
*/
public OverwriteParam(final String str) {
- this(DOMAIN.parse(str));
+ super(DOMAIN, DOMAIN.parse(str == null ? DEFAULT : str));
}
@Override
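
Both parameter classes now substitute their DEFAULT value when the query string omits the parameter, so datanode-side CREATE calls no longer fail on a missing createparent or overwrite. A minimal sketch of that null-to-default substitution, using a hypothetical class rather than the WebHDFS BooleanParam hierarchy:

// Hypothetical boolean query parameter with a textual default; not the actual
// WebHDFS BooleanParam class. Absent parameters fall back to DEFAULT.
public class SketchBooleanParam {
  static final String DEFAULT = "false";
  private final boolean value;

  SketchBooleanParam(String str) {
    // A missing query parameter arrives as null; substitute the default string.
    this.value = Boolean.parseBoolean(str == null ? DEFAULT : str);
  }

  boolean getValue() { return value; }

  public static void main(String[] args) {
    System.out.println(new SketchBooleanParam(null).getValue());    // false (default)
    System.out.println(new SketchBooleanParam("true").getValue());  // true
  }
}
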
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fbdbbd57/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java
index a68b1ac..4854471 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java
@@ -25,6 +25,7 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
+import java.text.MessageFormat;
import java.util.Arrays;
import java.util.Map;
import java.util.Random;
@@ -48,6 +49,7 @@ import org.apache.hadoop.hdfs.web.resources.NamenodeAddressParam;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Test;
@@ -531,6 +533,44 @@ public class TestWebHdfsFileSystemContract extends FileSystemContractBaseTest {
}
}
+ public void testDatanodeCreateMissingParameter() throws IOException {
+ final WebHdfsFileSystem webhdfs = (WebHdfsFileSystem) fs;
+ final Path testDir = new Path(MessageFormat.format("/test/{0}/{1}",
+ TestWebHdfsFileSystemContract.class,
+ GenericTestUtils.getMethodName()));
+ assertTrue(webhdfs.mkdirs(testDir));
+
+ for (String dnCreateParam : new String[]{
+ CreateFlagParam.NAME,
+ CreateParentParam.NAME,
+ OverwriteParam.NAME
+ }) {
+ final HttpOpParam.Op op = PutOpParam.Op.CREATE;
+ final Path newfile = new Path(testDir, "newfile_" + dnCreateParam);
+ final URL url = webhdfs.toUrl(op, newfile);
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ conn.setRequestMethod(op.getType().toString());
+ conn.setDoOutput(false);
+ conn.setInstanceFollowRedirects(false);
+ conn.connect();
+ final String redirect = conn.getHeaderField("Location");
+ conn.disconnect();
+
+ //remove createparent
+ WebHdfsFileSystem.LOG.info("redirect = " + redirect);
+ String re = "&" + dnCreateParam + "=[^&]*";
+ String modified = redirect.replaceAll(re, "");
+ WebHdfsFileSystem.LOG.info("modified = " + modified);
+
+ //connect to datanode
+ conn = (HttpURLConnection)new URL(modified).openConnection();
+ conn.setRequestMethod(op.getType().toString());
+ conn.setDoOutput(op.getDoOutput());
+ conn.connect();
+ assertEquals(HttpServletResponse.SC_CREATED, conn.getResponseCode());
+ }
+ }
+
@Test
public void testAccess() throws IOException, InterruptedException {
Path p1 = new Path("/pathX");