You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2016/12/08 22:29:34 UTC
[27/50] [abbrv] hadoop git commit: HDFS-10930. Refactor: Wrap
Datanode IO related operations. Contributed by Xiaoyu Yao.
HDFS-10930. Refactor: Wrap Datanode IO related operations. Contributed by Xiaoyu Yao.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/df983b52
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/df983b52
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/df983b52
Branch: refs/heads/HADOOP-13345
Commit: df983b524ab68ea0c70cee9033bfff2d28052cbf
Parents: 43cb167
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Mon Dec 5 13:04:39 2016 -0800
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Tue Dec 6 11:05:47 2016 -0800
----------------------------------------------------------------------
.../hdfs/server/datanode/BlockReceiver.java | 66 +++----
.../hdfs/server/datanode/BlockSender.java | 105 ++++-------
.../hadoop/hdfs/server/datanode/DNConf.java | 4 +
.../hdfs/server/datanode/DataStorage.java | 5 +
.../hdfs/server/datanode/LocalReplica.java | 179 +++++++++++++------
.../server/datanode/LocalReplicaInPipeline.java | 30 ++--
.../hdfs/server/datanode/ReplicaInPipeline.java | 4 +-
.../server/datanode/fsdataset/FsDatasetSpi.java | 3 +-
.../datanode/fsdataset/ReplicaInputStreams.java | 102 ++++++++++-
.../fsdataset/ReplicaOutputStreams.java | 107 ++++++++++-
.../datanode/fsdataset/impl/BlockPoolSlice.java | 97 +++++-----
.../impl/FsDatasetAsyncDiskService.java | 7 +-
.../datanode/fsdataset/impl/FsDatasetImpl.java | 5 +-
.../datanode/fsdataset/impl/FsVolumeImpl.java | 5 +-
.../org/apache/hadoop/hdfs/TestFileAppend.java | 2 +-
.../server/datanode/SimulatedFSDataset.java | 8 +-
.../hdfs/server/datanode/TestBlockRecovery.java | 2 +-
.../server/datanode/TestSimulatedFSDataset.java | 2 +-
.../extdataset/ExternalDatasetImpl.java | 4 +-
.../extdataset/ExternalReplicaInPipeline.java | 6 +-
20 files changed, 470 insertions(+), 273 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 39419c1..f372072 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -24,10 +24,7 @@ import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
-import java.io.FileDescriptor;
-import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.ByteBuffer;
@@ -53,7 +50,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.StringUtils;
@@ -88,8 +84,6 @@ class BlockReceiver implements Closeable {
* the DataNode needs to recalculate checksums before writing.
*/
private final boolean needsChecksumTranslation;
- private OutputStream out = null; // to block file at local disk
- private FileDescriptor outFd;
private DataOutputStream checksumOut = null; // to crc file at local disk
private final int bytesPerChecksum;
private final int checksumSize;
@@ -250,7 +244,8 @@ class BlockReceiver implements Closeable {
final boolean isCreate = isDatanode || isTransfer
|| stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
- streams = replicaInfo.createStreams(isCreate, requestedChecksum);
+ streams = replicaInfo.createStreams(isCreate, requestedChecksum,
+ datanodeSlowLogThresholdMs);
assert streams != null : "null streams!";
// read checksum meta information
@@ -260,13 +255,6 @@ class BlockReceiver implements Closeable {
this.bytesPerChecksum = diskChecksum.getBytesPerChecksum();
this.checksumSize = diskChecksum.getChecksumSize();
- this.out = streams.getDataOut();
- if (out instanceof FileOutputStream) {
- this.outFd = ((FileOutputStream)out).getFD();
- } else {
- LOG.warn("Could not get file descriptor for outputstream of class " +
- out.getClass());
- }
this.checksumOut = new DataOutputStream(new BufferedOutputStream(
streams.getChecksumOut(), DFSUtilClient.getSmallBufferSize(
datanode.getConf())));
@@ -319,7 +307,7 @@ class BlockReceiver implements Closeable {
packetReceiver.close();
IOException ioe = null;
- if (syncOnClose && (out != null || checksumOut != null)) {
+ if (syncOnClose && (streams.getDataOut() != null || checksumOut != null)) {
datanode.metrics.incrFsyncCount();
}
long flushTotalNanos = 0;
@@ -348,9 +336,9 @@ class BlockReceiver implements Closeable {
}
// close block file
try {
- if (out != null) {
+ if (streams.getDataOut() != null) {
long flushStartNanos = System.nanoTime();
- out.flush();
+ streams.flushDataOut();
long flushEndNanos = System.nanoTime();
if (syncOnClose) {
long fsyncStartNanos = flushEndNanos;
@@ -359,14 +347,13 @@ class BlockReceiver implements Closeable {
}
flushTotalNanos += flushEndNanos - flushStartNanos;
measuredFlushTime = true;
- out.close();
- out = null;
+ streams.closeDataStream();
}
} catch (IOException e) {
ioe = e;
}
finally{
- IOUtils.closeStream(out);
+ streams.close();
}
if (replicaHandler != null) {
IOUtils.cleanup(null, replicaHandler);
@@ -419,9 +406,9 @@ class BlockReceiver implements Closeable {
}
flushTotalNanos += flushEndNanos - flushStartNanos;
}
- if (out != null) {
+ if (streams.getDataOut() != null) {
long flushStartNanos = System.nanoTime();
- out.flush();
+ streams.flushDataOut();
long flushEndNanos = System.nanoTime();
if (isSync) {
long fsyncStartNanos = flushEndNanos;
@@ -430,10 +417,10 @@ class BlockReceiver implements Closeable {
}
flushTotalNanos += flushEndNanos - flushStartNanos;
}
- if (checksumOut != null || out != null) {
+ if (checksumOut != null || streams.getDataOut() != null) {
datanode.metrics.addFlushNanos(flushTotalNanos);
if (isSync) {
- datanode.metrics.incrFsyncCount();
+ datanode.metrics.incrFsyncCount();
}
}
long duration = Time.monotonicNow() - begin;
@@ -716,16 +703,12 @@ class BlockReceiver implements Closeable {
int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
// Write data to disk.
- long begin = Time.monotonicNow();
- out.write(dataBuf.array(), startByteToDisk, numBytesToDisk);
- long duration = Time.monotonicNow() - begin;
+ long duration = streams.writeToDisk(dataBuf.array(),
+ startByteToDisk, numBytesToDisk);
+
if (duration > maxWriteToDiskMs) {
maxWriteToDiskMs = duration;
}
- if (duration > datanodeSlowLogThresholdMs) {
- LOG.warn("Slow BlockReceiver write data to disk cost:" + duration
- + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
- }
final byte[] lastCrc;
if (shouldNotWriteChecksum) {
@@ -842,7 +825,7 @@ class BlockReceiver implements Closeable {
private void manageWriterOsCache(long offsetInBlock) {
try {
- if (outFd != null &&
+ if (streams.getOutFd() != null &&
offsetInBlock > lastCacheManagementOffset + CACHE_DROP_LAG_BYTES) {
long begin = Time.monotonicNow();
//
@@ -857,12 +840,11 @@ class BlockReceiver implements Closeable {
if (syncBehindWrites) {
if (syncBehindWritesInBackground) {
this.datanode.getFSDataset().submitBackgroundSyncFileRangeRequest(
- block, outFd, lastCacheManagementOffset,
+ block, streams, lastCacheManagementOffset,
offsetInBlock - lastCacheManagementOffset,
SYNC_FILE_RANGE_WRITE);
} else {
- NativeIO.POSIX.syncFileRangeIfPossible(outFd,
- lastCacheManagementOffset,
+ streams.syncFileRangeIfPossible(lastCacheManagementOffset,
offsetInBlock - lastCacheManagementOffset,
SYNC_FILE_RANGE_WRITE);
}
@@ -879,8 +861,8 @@ class BlockReceiver implements Closeable {
//
long dropPos = lastCacheManagementOffset - CACHE_DROP_LAG_BYTES;
if (dropPos > 0 && dropCacheBehindWrites) {
- NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
- block.getBlockName(), outFd, 0, dropPos, POSIX_FADV_DONTNEED);
+ streams.dropCacheBehindWrites(block.getBlockName(), 0, dropPos,
+ POSIX_FADV_DONTNEED);
}
lastCacheManagementOffset = offsetInBlock;
long duration = Time.monotonicNow() - begin;
@@ -989,7 +971,7 @@ class BlockReceiver implements Closeable {
// The worst case is not recovering this RBW replica.
// Client will fall back to regular pipeline recovery.
} finally {
- IOUtils.closeStream(out);
+ IOUtils.closeStream(streams.getDataOut());
}
try {
// Even if the connection is closed after the ack packet is
@@ -1047,8 +1029,8 @@ class BlockReceiver implements Closeable {
* will be overwritten.
*/
private void adjustCrcFilePosition() throws IOException {
- if (out != null) {
- out.flush();
+ if (streams.getDataOut() != null) {
+ streams.flushDataOut();
}
if (checksumOut != null) {
checksumOut.flush();
@@ -1094,10 +1076,10 @@ class BlockReceiver implements Closeable {
byte[] crcbuf = new byte[checksumSize];
try (ReplicaInputStreams instr =
datanode.data.getTmpInputStreams(block, blkoff, ckoff)) {
- IOUtils.readFully(instr.getDataIn(), buf, 0, sizePartialChunk);
+ instr.readDataFully(buf, 0, sizePartialChunk);
// open meta file and read in crc value computer earlier
- IOUtils.readFully(instr.getChecksumIn(), crcbuf, 0, crcbuf.length);
+ instr.readChecksumFully(crcbuf, 0, crcbuf.length);
}
// compute crc of partial chunk from data read in the block file.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index a1b1f86..9182c88 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.datanode;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
-import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -42,11 +41,11 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
-import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.net.SocketOutputStream;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.DataChecksum;
@@ -120,12 +119,11 @@ class BlockSender implements java.io.Closeable {
/** the block to read from */
private final ExtendedBlock block;
- /** Stream to read block data from */
- private InputStream blockIn;
+
+ /** InputStreams and file descriptors to read block/checksum. */
+ private ReplicaInputStreams ris;
/** updated while using transferTo() */
private long blockInPosition = -1;
- /** Stream to read checksum */
- private DataInputStream checksumIn;
/** Checksum utility */
private final DataChecksum checksum;
/** Initial position to read */
@@ -152,11 +150,6 @@ class BlockSender implements java.io.Closeable {
private final String clientTraceFmt;
private volatile ChunkChecksum lastChunkChecksum = null;
private DataNode datanode;
-
- /** The file descriptor of the block being sent */
- private FileDescriptor blockInFd;
- /** The reference to the volume where the block is located */
- private FsVolumeReference volumeRef;
/** The replica of the block that is being read. */
private final Replica replica;
@@ -201,6 +194,9 @@ class BlockSender implements java.io.Closeable {
boolean sendChecksum, DataNode datanode, String clientTraceFmt,
CachingStrategy cachingStrategy)
throws IOException {
+ InputStream blockIn = null;
+ DataInputStream checksumIn = null;
+ FsVolumeReference volumeRef = null;
try {
this.block = block;
this.corruptChecksumOk = corruptChecksumOk;
@@ -281,7 +277,7 @@ class BlockSender implements java.io.Closeable {
(!is32Bit || length <= Integer.MAX_VALUE);
// Obtain a reference before reading data
- this.volumeRef = datanode.data.getVolume(block).obtainReference();
+ volumeRef = datanode.data.getVolume(block).obtainReference();
/*
* (corruptChecksumOK, meta_file_exist): operation
@@ -405,14 +401,9 @@ class BlockSender implements java.io.Closeable {
DataNode.LOG.debug("replica=" + replica);
}
blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
- if (blockIn instanceof FileInputStream) {
- blockInFd = ((FileInputStream)blockIn).getFD();
- } else {
- blockInFd = null;
- }
+ ris = new ReplicaInputStreams(blockIn, checksumIn, volumeRef);
} catch (IOException ioe) {
IOUtils.closeStream(this);
- IOUtils.closeStream(blockIn);
throw ioe;
}
}
@@ -422,12 +413,11 @@ class BlockSender implements java.io.Closeable {
*/
@Override
public void close() throws IOException {
- if (blockInFd != null &&
+ if (ris.getDataInFd() != null &&
((dropCacheBehindAllReads) ||
(dropCacheBehindLargeReads && isLongRead()))) {
try {
- NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
- block.getBlockName(), blockInFd, lastCacheDropOffset,
+ ris.dropCacheBehindReads(block.getBlockName(), lastCacheDropOffset,
offset - lastCacheDropOffset, POSIX_FADV_DONTNEED);
} catch (Exception e) {
LOG.warn("Unable to drop cache on file close", e);
@@ -436,32 +426,12 @@ class BlockSender implements java.io.Closeable {
if (curReadahead != null) {
curReadahead.cancel();
}
-
- IOException ioe = null;
- if(checksumIn!=null) {
- try {
- checksumIn.close(); // close checksum file
- } catch (IOException e) {
- ioe = e;
- }
- checksumIn = null;
- }
- if(blockIn!=null) {
- try {
- blockIn.close(); // close data file
- } catch (IOException e) {
- ioe = e;
- }
- blockIn = null;
- blockInFd = null;
- }
- if (volumeRef != null) {
- IOUtils.cleanup(null, volumeRef);
- volumeRef = null;
- }
- // throw IOException if there is any
- if(ioe!= null) {
- throw ioe;
+
+ try {
+ ris.closeStreams();
+ } finally {
+ IOUtils.closeStream(ris);
+ ris = null;
}
}
@@ -565,7 +535,7 @@ class BlockSender implements java.io.Closeable {
int checksumOff = pkt.position();
byte[] buf = pkt.array();
- if (checksumSize > 0 && checksumIn != null) {
+ if (checksumSize > 0 && ris.getChecksumIn() != null) {
readChecksum(buf, checksumOff, checksumDataLen);
// write in progress that we need to use to get last checksum
@@ -581,7 +551,7 @@ class BlockSender implements java.io.Closeable {
int dataOff = checksumOff + checksumDataLen;
if (!transferTo) { // normal transfer
- IOUtils.readFully(blockIn, buf, dataOff, dataLen);
+ ris.readDataFully(buf, dataOff, dataLen);
if (verifyChecksum) {
verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff);
@@ -593,12 +563,12 @@ class BlockSender implements java.io.Closeable {
SocketOutputStream sockOut = (SocketOutputStream)out;
// First write header and checksums
sockOut.write(buf, headerOff, dataOff - headerOff);
-
+
// no need to flush since we know out is not a buffered stream
- FileChannel fileCh = ((FileInputStream)blockIn).getChannel();
+ FileChannel fileCh = ((FileInputStream)ris.getDataIn()).getChannel();
LongWritable waitTime = new LongWritable();
LongWritable transferTime = new LongWritable();
- sockOut.transferToFully(fileCh, blockInPosition, dataLen,
+ sockOut.transferToFully(fileCh, blockInPosition, dataLen,
waitTime, transferTime);
datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get());
datanode.metrics.addSendDataPacketTransferNanos(transferTime.get());
@@ -630,7 +600,7 @@ class BlockSender implements java.io.Closeable {
if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) {
LOG.error("BlockSender.sendChunks() exception: ", e);
datanode.getBlockScanner().markSuspectBlock(
- volumeRef.getVolume().getStorageID(),
+ ris.getVolumeRef().getVolume().getStorageID(),
block);
}
}
@@ -653,16 +623,15 @@ class BlockSender implements java.io.Closeable {
*/
private void readChecksum(byte[] buf, final int checksumOffset,
final int checksumLen) throws IOException {
- if (checksumSize <= 0 && checksumIn == null) {
+ if (checksumSize <= 0 && ris.getChecksumIn() == null) {
return;
}
try {
- checksumIn.readFully(buf, checksumOffset, checksumLen);
+ ris.readChecksumFully(buf, checksumOffset, checksumLen);
} catch (IOException e) {
LOG.warn(" Could not read or failed to verify checksum for data"
+ " at offset " + offset + " for block " + block, e);
- IOUtils.closeStream(checksumIn);
- checksumIn = null;
+ ris.closeChecksumStream();
if (corruptChecksumOk) {
if (checksumOffset < checksumLen) {
// Just fill the array with zeros.
@@ -746,10 +715,10 @@ class BlockSender implements java.io.Closeable {
lastCacheDropOffset = initialOffset;
- if (isLongRead() && blockInFd != null) {
+ if (isLongRead() && ris.getDataInFd() != null) {
// Advise that this file descriptor will be accessed sequentially.
- NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
- block.getBlockName(), blockInFd, 0, 0, POSIX_FADV_SEQUENTIAL);
+ ris.dropCacheBehindReads(block.getBlockName(), 0, 0,
+ POSIX_FADV_SEQUENTIAL);
}
// Trigger readahead of beginning of file if configured.
@@ -761,9 +730,10 @@ class BlockSender implements java.io.Closeable {
int pktBufSize = PacketHeader.PKT_MAX_HEADER_LEN;
boolean transferTo = transferToAllowed && !verifyChecksum
&& baseStream instanceof SocketOutputStream
- && blockIn instanceof FileInputStream;
+ && ris.getDataIn() instanceof FileInputStream;
if (transferTo) {
- FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();
+ FileChannel fileChannel =
+ ((FileInputStream)ris.getDataIn()).getChannel();
blockInPosition = fileChannel.position();
streamForSendChunks = baseStream;
maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE);
@@ -818,14 +788,16 @@ class BlockSender implements java.io.Closeable {
private void manageOsCache() throws IOException {
// We can't manage the cache for this block if we don't have a file
// descriptor to work with.
- if (blockInFd == null) return;
+ if (ris.getDataInFd() == null) {
+ return;
+ }
// Perform readahead if necessary
if ((readaheadLength > 0) && (datanode.readaheadPool != null) &&
(alwaysReadahead || isLongRead())) {
curReadahead = datanode.readaheadPool.readaheadStream(
- clientTraceFmt, blockInFd, offset, readaheadLength, Long.MAX_VALUE,
- curReadahead);
+ clientTraceFmt, ris.getDataInFd(), offset, readaheadLength,
+ Long.MAX_VALUE, curReadahead);
}
// Drop what we've just read from cache, since we aren't
@@ -835,8 +807,7 @@ class BlockSender implements java.io.Closeable {
long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES;
if (offset >= nextCacheDropOffset) {
long dropLength = offset - lastCacheDropOffset;
- NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
- block.getBlockName(), blockInFd, lastCacheDropOffset,
+ ris.dropCacheBehindReads(block.getBlockName(), lastCacheDropOffset,
dropLength, POSIX_FADV_DONTNEED);
lastCacheDropOffset = offset;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index 823d05c..c1487b1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -402,6 +402,10 @@ public class DNConf {
return volsConfigured;
}
+ public long getSlowIoWarningThresholdMs() {
+ return datanodeSlowIoWarningThresholdMs;
+ }
+
int getMaxDataLength() {
return maxDataLength;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
index 29b14e7..f4deb6d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
@@ -1355,4 +1355,9 @@ public class DataStorage extends Storage {
synchronized void removeBlockPoolStorage(String bpId) {
bpStorageMap.remove(bpId);
}
+
+ public static boolean fullyDelete(final File dir) {
+ boolean result = FileUtil.fullyDelete(dir);
+ return result;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java
index f829111..e6f7e12 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java
@@ -29,9 +29,6 @@ import java.net.URI;
import java.util.HashMap;
import java.util.Map;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.HardLink;
import org.apache.hadoop.fs.LocalFileSystem;
@@ -46,6 +43,8 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.DataChecksum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
@@ -69,15 +68,7 @@ abstract public class LocalReplica extends ReplicaInfo {
private static final Map<String, File> internedBaseDirs = new HashMap<String, File>();
- static final Log LOG = LogFactory.getLog(LocalReplica.class);
- private final static boolean IS_NATIVE_IO_AVAIL;
- static {
- IS_NATIVE_IO_AVAIL = NativeIO.isAvailable();
- if (Path.WINDOWS && !IS_NATIVE_IO_AVAIL) {
- LOG.warn("Data node cannot fully support concurrent reading"
- + " and writing without native code extensions on Windows.");
- }
- }
+ static final Logger LOG = LoggerFactory.getLogger(LocalReplica.class);
/**
* Constructor
@@ -199,14 +190,14 @@ abstract public class LocalReplica extends ReplicaInfo {
File tmpFile = DatanodeUtil.createTmpFile(b, DatanodeUtil.getUnlinkTmpFile(file));
try (FileInputStream in = new FileInputStream(file)) {
try (FileOutputStream out = new FileOutputStream(tmpFile)){
- IOUtils.copyBytes(in, out, 16 * 1024);
+ copyBytes(in, out, 16 * 1024);
}
if (file.length() != tmpFile.length()) {
throw new IOException("Copy of file " + file + " size " + file.length()+
" into file " + tmpFile +
" resulted in a size of " + tmpFile.length());
}
- FileUtil.replaceFile(tmpFile, file);
+ replaceFile(tmpFile, file);
} catch (IOException e) {
boolean done = tmpFile.delete();
if (!done) {
@@ -241,13 +232,13 @@ abstract public class LocalReplica extends ReplicaInfo {
}
File meta = getMetaFile();
- int linkCount = HardLink.getLinkCount(file);
+ int linkCount = getHardLinkCount(file);
if (linkCount > 1) {
DataNode.LOG.info("Breaking hardlink for " + linkCount + "x-linked " +
"block " + this);
breakHardlinks(file, this);
}
- if (HardLink.getLinkCount(meta) > 1) {
+ if (getHardLinkCount(meta) > 1) {
breakHardlinks(meta, this);
}
return true;
@@ -260,18 +251,7 @@ abstract public class LocalReplica extends ReplicaInfo {
@Override
public InputStream getDataInputStream(long seekOffset) throws IOException {
-
- File blockFile = getBlockFile();
- if (IS_NATIVE_IO_AVAIL) {
- return NativeIO.getShareDeleteFileInputStream(blockFile, seekOffset);
- } else {
- try {
- return FsDatasetUtil.openAndSeek(blockFile, seekOffset);
- } catch (FileNotFoundException fnfe) {
- throw new IOException("Block " + this + " is not valid. " +
- "Expected block file at " + blockFile + " does not exist.");
- }
- }
+ return getDataInputStream(getBlockFile(), seekOffset);
}
@Override
@@ -286,7 +266,7 @@ abstract public class LocalReplica extends ReplicaInfo {
@Override
public boolean deleteBlockData() {
- return getBlockFile().delete();
+ return fullyDelete(getBlockFile());
}
@Override
@@ -320,7 +300,7 @@ abstract public class LocalReplica extends ReplicaInfo {
@Override
public boolean deleteMetadata() {
- return getMetaFile().delete();
+ return fullyDelete(getMetaFile());
}
@Override
@@ -340,7 +320,7 @@ abstract public class LocalReplica extends ReplicaInfo {
private boolean renameFile(File srcfile, File destfile) throws IOException {
try {
- NativeIO.renameTo(srcfile, destfile);
+ rename(srcfile, destfile);
return true;
} catch (IOException e) {
throw new IOException("Failed to move block file for " + this
@@ -367,22 +347,14 @@ abstract public class LocalReplica extends ReplicaInfo {
@Override
public boolean getPinning(LocalFileSystem localFS) throws IOException {
- FileStatus fss =
- localFS.getFileStatus(new Path(getBlockFile().getAbsolutePath()));
- return fss.getPermission().getStickyBit();
+ return getPinning(localFS, new Path(getBlockFile().getAbsolutePath()));
}
@Override
public void setPinning(LocalFileSystem localFS) throws IOException {
File f = getBlockFile();
Path p = new Path(f.getAbsolutePath());
-
- FsPermission oldPermission = localFS.getFileStatus(
- new Path(f.getAbsolutePath())).getPermission();
- //sticky bit is used for pinning purpose
- FsPermission permission = new FsPermission(oldPermission.getUserAction(),
- oldPermission.getGroupAction(), oldPermission.getOtherAction(), true);
- localFS.setPermission(p, permission);
+ setPinning(localFS, p);
}
@Override
@@ -398,7 +370,7 @@ abstract public class LocalReplica extends ReplicaInfo {
}
try {
// calling renameMeta on the ReplicaInfo doesn't work here
- NativeIO.renameTo(oldmeta, newmeta);
+ rename(oldmeta, newmeta);
} catch (IOException e) {
setGenerationStamp(oldGS); // restore old GS
throw new IOException("Block " + this + " reopen failed. " +
@@ -417,7 +389,113 @@ abstract public class LocalReplica extends ReplicaInfo {
return info.getBlockFile().compareTo(getBlockFile());
}
- static public void truncateBlock(File blockFile, File metaFile,
+ @Override
+ public void copyMetadata(URI destination) throws IOException {
+ //for local replicas, we assume the destination URI is file
+ nativeCopyFileUnbuffered(getMetaFile(), new File(destination), true);
+ }
+
+ @Override
+ public void copyBlockdata(URI destination) throws IOException {
+ //for local replicas, we assume the destination URI is file
+ nativeCopyFileUnbuffered(getBlockFile(), new File(destination), true);
+ }
+
+ public void renameMeta(File newMetaFile) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Renaming " + getMetaFile() + " to " + newMetaFile);
+ }
+ renameFile(getMetaFile(), newMetaFile);
+ }
+
+ public void renameBlock(File newBlockFile) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Renaming " + getBlockFile() + " to " + newBlockFile
+ + ", file length=" + getBlockFile().length());
+ }
+ renameFile(getBlockFile(), newBlockFile);
+ }
+
+ public static void rename(File from, File to) throws IOException {
+ Storage.rename(from, to);
+ }
+
+ /**
+ * Get input stream for a local file and optionally seek to the offset.
+ * @param f path to the file
+ * @param seekOffset offset to seek
+ * @return input stream for read
+ * @throws IOException
+ */
+ private FileInputStream getDataInputStream(File f, long seekOffset)
+ throws IOException {
+ FileInputStream fis;
+ if (NativeIO.isAvailable()) {
+ fis = NativeIO.getShareDeleteFileInputStream(f, seekOffset);
+ } else {
+ try {
+ fis = FsDatasetUtil.openAndSeek(f, seekOffset);
+ } catch (FileNotFoundException fnfe) {
+ throw new IOException("Expected block file at " + f +
+ " does not exist.");
+ }
+ }
+ return fis;
+ }
+
+ private void nativeCopyFileUnbuffered(File srcFile, File destFile,
+ boolean preserveFileDate) throws IOException {
+ Storage.nativeCopyFileUnbuffered(srcFile, destFile, preserveFileDate);
+ }
+
+ private void copyBytes(InputStream in, OutputStream out, int
+ buffSize) throws IOException{
+ IOUtils.copyBytes(in, out, buffSize);
+ }
+
+ private void replaceFile(File src, File target) throws IOException {
+ FileUtil.replaceFile(src, target);
+ }
+
+ public static boolean fullyDelete(final File dir) {
+ boolean result = DataStorage.fullyDelete(dir);
+ return result;
+ }
+
+ public static int getHardLinkCount(File fileName) throws IOException {
+ int linkCount = HardLink.getLinkCount(fileName);
+ return linkCount;
+ }
+
+ /**
+ * Get pin status of a file by checking the sticky bit.
+ * @param localFS local file system
+ * @param path path to be checked
+ * @return true if the file is pinned with sticky bit
+ * @throws IOException
+ */
+ public boolean getPinning(LocalFileSystem localFS, Path path) throws
+ IOException {
+ boolean stickyBit =
+ localFS.getFileStatus(path).getPermission().getStickyBit();
+ return stickyBit;
+ }
+
+ /**
+ * Set sticky bit on path to pin file.
+ * @param localFS local file system
+ * @param path path to be pinned with sticky bit
+ * @throws IOException
+ */
+ public void setPinning(LocalFileSystem localFS, Path path) throws
+ IOException {
+ FsPermission oldPermission = localFS.getFileStatus(path).getPermission();
+ FsPermission permission = new FsPermission(oldPermission.getUserAction(),
+ oldPermission.getGroupAction(), oldPermission.getOtherAction(), true);
+ localFS.setPermission(path, permission);
+ }
+
+ public static void truncateBlock(File blockFile, File metaFile,
long oldlen, long newlen) throws IOException {
LOG.info("truncateBlock: blockFile=" + blockFile
+ ", metaFile=" + metaFile
@@ -467,19 +545,4 @@ abstract public class LocalReplica extends ReplicaInfo {
metaRAF.close();
}
}
-
- @Override
- public void copyMetadata(URI destination) throws IOException {
- //for local replicas, we assume the destination URI is file
- Storage.nativeCopyFileUnbuffered(getMetaFile(),
- new File(destination), true);
- }
-
- @Override
- public void copyBlockdata(URI destination) throws IOException {
- //for local replicas, we assume the destination URI is file
- Storage.nativeCopyFileUnbuffered(getBlockFile(),
- new File(destination), true);
- }
-
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java
index bc7bc6d..1387155 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.StringUtils;
@@ -246,7 +245,8 @@ public class LocalReplicaInPipeline extends LocalReplica
@Override // ReplicaInPipeline
public ReplicaOutputStreams createStreams(boolean isCreate,
- DataChecksum requestedChecksum) throws IOException {
+ DataChecksum requestedChecksum, long slowLogThresholdMs)
+ throws IOException {
File blockFile = getBlockFile();
File metaFile = getMetaFile();
if (DataNode.LOG.isDebugEnabled()) {
@@ -313,7 +313,7 @@ public class LocalReplicaInPipeline extends LocalReplica
crcOut.getChannel().position(crcDiskSize);
}
return new ReplicaOutputStreams(blockOut, crcOut, checksum,
- getVolume().isTransientStorage());
+ getVolume().isTransientStorage(), slowLogThresholdMs);
} catch (IOException e) {
IOUtils.closeStream(blockOut);
IOUtils.closeStream(metaRAF);
@@ -373,40 +373,30 @@ public class LocalReplicaInPipeline extends LocalReplica
+ " should be derived from LocalReplica");
}
- LocalReplica localReplica = (LocalReplica) oldReplicaInfo;
-
- File oldmeta = localReplica.getMetaFile();
+ LocalReplica oldReplica = (LocalReplica) oldReplicaInfo;
+ File oldmeta = oldReplica.getMetaFile();
File newmeta = getMetaFile();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Renaming " + oldmeta + " to " + newmeta);
- }
try {
- NativeIO.renameTo(oldmeta, newmeta);
+ oldReplica.renameMeta(newmeta);
} catch (IOException e) {
throw new IOException("Block " + oldReplicaInfo + " reopen failed. " +
" Unable to move meta file " + oldmeta +
" to rbw dir " + newmeta, e);
}
- File blkfile = localReplica.getBlockFile();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Renaming " + blkfile + " to " + newBlkFile
- + ", file length=" + blkfile.length());
- }
try {
- NativeIO.renameTo(blkfile, newBlkFile);
+ oldReplica.renameBlock(newBlkFile);
} catch (IOException e) {
try {
- NativeIO.renameTo(newmeta, oldmeta);
+ renameMeta(oldmeta);
} catch (IOException ex) {
LOG.warn("Cannot move meta file " + newmeta +
"back to the finalized directory " + oldmeta, ex);
}
throw new IOException("Block " + oldReplicaInfo + " reopen failed. " +
- " Unable to move block file " + blkfile +
- " to rbw dir " + newBlkFile, e);
+ " Unable to move block file " + oldReplica.getBlockFile() +
+ " to rbw dir " + newBlkFile, e);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
index efa6ea6..5fdbec0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
@@ -69,11 +69,13 @@ public interface ReplicaInPipeline extends Replica {
*
* @param isCreate if it is for creation
* @param requestedChecksum the checksum the writer would prefer to use
+ * @param slowLogThresholdMs slow io threshold for logging
* @return output streams for writing
* @throws IOException if any error occurs
*/
public ReplicaOutputStreams createStreams(boolean isCreate,
- DataChecksum requestedChecksum) throws IOException;
+ DataChecksum requestedChecksum, long slowLogThresholdMs)
+ throws IOException;
/**
* Create an output stream to write restart metadata in case of datanode
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 57ec2b4..30f045f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset;
import java.io.Closeable;
import java.io.EOFException;
import java.io.File;
-import java.io.FileDescriptor;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
@@ -605,7 +604,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
* submit a sync_file_range request to AsyncDiskService.
*/
void submitBackgroundSyncFileRangeRequest(final ExtendedBlock block,
- final FileDescriptor fd, final long offset, final long nbytes,
+ final ReplicaOutputStreams outs, final long offset, final long nbytes,
final int flags);
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java
index 227179d..54d0e96 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java
@@ -18,24 +18,45 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset;
import java.io.Closeable;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
import java.io.InputStream;
+import java.io.IOException;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.NativeIOException;
+import org.slf4j.Logger;
/**
* Contains the input streams for the data and checksum of a replica.
*/
public class ReplicaInputStreams implements Closeable {
- private final InputStream dataIn;
- private final InputStream checksumIn;
- private final FsVolumeReference volumeRef;
+ public static final Logger LOG = DataNode.LOG;
+
+ private InputStream dataIn;
+ private InputStream checksumIn;
+ private FsVolumeReference volumeRef;
+ private FileDescriptor dataInFd = null;
/** Create an object with a data input stream and a checksum input stream. */
- public ReplicaInputStreams(InputStream dataStream, InputStream checksumStream,
- FsVolumeReference volumeRef) {
+ public ReplicaInputStreams(InputStream dataStream,
+ InputStream checksumStream, FsVolumeReference volumeRef) {
this.volumeRef = volumeRef;
this.dataIn = dataStream;
this.checksumIn = checksumStream;
+ if (dataIn instanceof FileInputStream) {
+ try {
+ dataInFd = ((FileInputStream) dataIn).getFD();
+ } catch (Exception e) {
+ LOG.warn("Could not get file descriptor for inputstream of class " +
+ this.dataIn.getClass());
+ }
+ } else {
+ LOG.debug("Could not get file descriptor for inputstream of class " +
+ this.dataIn.getClass());
+ }
}
/** @return the data input stream. */
@@ -48,10 +69,81 @@ public class ReplicaInputStreams implements Closeable {
return checksumIn;
}
+ public FileDescriptor getDataInFd() {
+ return dataInFd;
+ }
+
+ public FsVolumeReference getVolumeRef() {
+ return volumeRef;
+ }
+
+ public void readDataFully(byte[] buf, int off, int len)
+ throws IOException {
+ IOUtils.readFully(dataIn, buf, off, len);
+ }
+
+ public void readChecksumFully(byte[] buf, int off, int len)
+ throws IOException {
+ IOUtils.readFully(checksumIn, buf, off, len);
+ }
+
+ public void skipDataFully(long len) throws IOException {
+ IOUtils.skipFully(dataIn, len);
+ }
+
+ public void skipChecksumFully(long len) throws IOException {
+ IOUtils.skipFully(checksumIn, len);
+ }
+
+ public void closeChecksumStream() throws IOException {
+ IOUtils.closeStream(checksumIn);
+ checksumIn = null;
+ }
+
+ public void dropCacheBehindReads(String identifier, long offset, long len,
+ int flags) throws NativeIOException {
+ assert this.dataInFd != null : "null dataInFd!";
+ NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
+ identifier, dataInFd, offset, len, flags);
+ }
+
+ public void closeStreams() throws IOException {
+ IOException ioe = null;
+ if(checksumIn!=null) {
+ try {
+ checksumIn.close(); // close checksum file
+ } catch (IOException e) {
+ ioe = e;
+ }
+ checksumIn = null;
+ }
+ if(dataIn!=null) {
+ try {
+ dataIn.close(); // close data file
+ } catch (IOException e) {
+ ioe = e;
+ }
+ dataIn = null;
+ dataInFd = null;
+ }
+ if (volumeRef != null) {
+ IOUtils.cleanup(null, volumeRef);
+ volumeRef = null;
+ }
+ // throw IOException if there is any
+ if(ioe!= null) {
+ throw ioe;
+ }
+ }
+
@Override
public void close() {
IOUtils.closeStream(dataIn);
+ dataIn = null;
+ dataInFd = null;
IOUtils.closeStream(checksumIn);
+ checksumIn = null;
IOUtils.cleanup(null, volumeRef);
+ volumeRef = null;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java
index bd1461a..a66847a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java
@@ -18,32 +18,62 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset;
import java.io.Closeable;
+import java.io.FileDescriptor;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.io.IOException;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.NativeIOException;
import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
/**
* Contains the output streams for the data and checksum of a replica.
*/
public class ReplicaOutputStreams implements Closeable {
- private final OutputStream dataOut;
+ public static final Logger LOG = DataNode.LOG;
+
+ private FileDescriptor outFd = null;
+ /** Stream to block. */
+ private OutputStream dataOut;
+ /** Stream to checksum. */
private final OutputStream checksumOut;
private final DataChecksum checksum;
private final boolean isTransientStorage;
+ private final long slowLogThresholdMs;
/**
* Create an object with a data output stream, a checksum output stream
* and a checksum.
*/
- public ReplicaOutputStreams(OutputStream dataOut, OutputStream checksumOut,
- DataChecksum checksum, boolean isTransientStorage) {
+ public ReplicaOutputStreams(OutputStream dataOut,
+ OutputStream checksumOut, DataChecksum checksum,
+ boolean isTransientStorage, long slowLogThresholdMs) {
this.dataOut = dataOut;
- this.checksumOut = checksumOut;
this.checksum = checksum;
+ this.slowLogThresholdMs = slowLogThresholdMs;
this.isTransientStorage = isTransientStorage;
+ this.checksumOut = checksumOut;
+
+ try {
+ if (this.dataOut instanceof FileOutputStream) {
+ this.outFd = ((FileOutputStream)this.dataOut).getFD();
+ } else {
+ LOG.debug("Could not get file descriptor for outputstream of class " +
+ this.dataOut.getClass());
+ }
+ } catch (IOException e) {
+ LOG.warn("Could not get file descriptor for outputstream of class " +
+ this.dataOut.getClass());
+ }
+ }
+
+ public FileDescriptor getOutFd() {
+ return outFd;
}
/** @return the data output stream. */
@@ -72,12 +102,17 @@ public class ReplicaOutputStreams implements Closeable {
IOUtils.closeStream(checksumOut);
}
+ public void closeDataStream() throws IOException {
+ dataOut.close();
+ dataOut = null;
+ }
+
/**
* Sync the data stream if it supports it.
*/
public void syncDataOut() throws IOException {
if (dataOut instanceof FileOutputStream) {
- ((FileOutputStream)dataOut).getChannel().force(true);
+ sync((FileOutputStream)dataOut);
}
}
@@ -86,8 +121,68 @@ public class ReplicaOutputStreams implements Closeable {
*/
public void syncChecksumOut() throws IOException {
if (checksumOut instanceof FileOutputStream) {
- ((FileOutputStream)checksumOut).getChannel().force(true);
+ sync((FileOutputStream)checksumOut);
}
}
+ /**
+ * Flush the data stream if it supports it.
+ */
+ public void flushDataOut() throws IOException {
+ flush(dataOut);
+ }
+
+ /**
+ * Flush the checksum stream if it supports it.
+ */
+ public void flushChecksumOut() throws IOException {
+ flush(checksumOut);
+ }
+
+ private void flush(OutputStream dos) throws IOException {
+ long begin = Time.monotonicNow();
+ dos.flush();
+ long duration = Time.monotonicNow() - begin;
+ LOG.trace("ReplicaOutputStreams#flush takes {} ms.", duration);
+ if (duration > slowLogThresholdMs) {
+ LOG.warn("Slow flush took {} ms (threshold={} ms)", duration,
+ slowLogThresholdMs);
+ }
+ }
+
+ private void sync(FileOutputStream fos) throws IOException {
+ long begin = Time.monotonicNow();
+ fos.getChannel().force(true);
+ long duration = Time.monotonicNow() - begin;
+ LOG.trace("ReplicaOutputStreams#sync takes {} ms.", duration);
+ if (duration > slowLogThresholdMs) {
+ LOG.warn("Slow fsync took {} ms (threshold={} ms)", duration,
+ slowLogThresholdMs);
+ }
+ }
+
+ public long writeToDisk(byte[] b, int off, int len) throws IOException {
+ long begin = Time.monotonicNow();
+ dataOut.write(b, off, len);
+ long duration = Time.monotonicNow() - begin;
+ LOG.trace("DatanodeIO#writeToDisk takes {} ms.", duration);
+ if (duration > slowLogThresholdMs) {
+ LOG.warn("Slow BlockReceiver write data to disk cost: {} ms " +
+ "(threshold={} ms)", duration, slowLogThresholdMs);
+ }
+ return duration;
+ }
+
+ public void syncFileRangeIfPossible(long offset, long nbytes,
+ int flags) throws NativeIOException {
+ assert this.outFd != null : "null outFd!";
+ NativeIO.POSIX.syncFileRangeIfPossible(outFd, offset, nbytes, flags);
+ }
+
+ public void dropCacheBehindWrites(String identifier,
+ long offset, long len, int flags) throws NativeIOException {
+ assert this.outFd != null : "null outFd!";
+ NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
+ identifier, outFd, offset, len, flags);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 29dbb29..63e82f3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -49,11 +49,13 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
+import org.apache.hadoop.hdfs.server.datanode.LocalReplica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
+
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker;
@@ -145,7 +147,7 @@ class BlockPoolSlice {
//
this.tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP);
if (tmpDir.exists()) {
- FileUtil.fullyDelete(tmpDir);
+ DataStorage.fullyDelete(tmpDir);
}
this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW);
if (!rbwDir.mkdirs()) { // create rbw directory if not exist
@@ -436,7 +438,7 @@ class BlockPoolSlice {
final File targetMetaFile = new File(targetDir, metaFile.getName());
try {
- NativeIO.renameTo(metaFile, targetMetaFile);
+ LocalReplica.rename(metaFile, targetMetaFile);
} catch (IOException e) {
LOG.warn("Failed to move meta file from "
+ metaFile + " to " + targetMetaFile, e);
@@ -446,7 +448,7 @@ class BlockPoolSlice {
final File targetBlockFile = new File(targetDir, blockFile.getName());
try {
- NativeIO.renameTo(blockFile, targetBlockFile);
+ LocalReplica.rename(blockFile, targetBlockFile);
} catch (IOException e) {
LOG.warn("Failed to move block file from "
+ blockFile + " to " + targetBlockFile, e);
@@ -688,8 +690,6 @@ class BlockPoolSlice {
* @return the number of valid bytes
*/
private long validateIntegrityAndSetLength(File blockFile, long genStamp) {
- DataInputStream checksumIn = null;
- InputStream blockIn = null;
try {
final File metaFile = FsDatasetUtil.getMetaFile(blockFile, genStamp);
long blockFileLen = blockFile.length();
@@ -699,57 +699,52 @@ class BlockPoolSlice {
!metaFile.exists() || metaFileLen < crcHeaderLen) {
return 0;
}
- checksumIn = new DataInputStream(
+ try (DataInputStream checksumIn = new DataInputStream(
new BufferedInputStream(new FileInputStream(metaFile),
- ioFileBufferSize));
-
- // read and handle the common header here. For now just a version
- final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(
- checksumIn, metaFile);
- int bytesPerChecksum = checksum.getBytesPerChecksum();
- int checksumSize = checksum.getChecksumSize();
- long numChunks = Math.min(
- (blockFileLen + bytesPerChecksum - 1)/bytesPerChecksum,
- (metaFileLen - crcHeaderLen)/checksumSize);
- if (numChunks == 0) {
- return 0;
- }
- IOUtils.skipFully(checksumIn, (numChunks-1)*checksumSize);
- blockIn = new FileInputStream(blockFile);
- long lastChunkStartPos = (numChunks-1)*bytesPerChecksum;
- IOUtils.skipFully(blockIn, lastChunkStartPos);
- int lastChunkSize = (int)Math.min(
- bytesPerChecksum, blockFileLen-lastChunkStartPos);
- byte[] buf = new byte[lastChunkSize+checksumSize];
- checksumIn.readFully(buf, lastChunkSize, checksumSize);
- IOUtils.readFully(blockIn, buf, 0, lastChunkSize);
-
- checksum.update(buf, 0, lastChunkSize);
- long validFileLength;
- if (checksum.compare(buf, lastChunkSize)) { // last chunk matches crc
- validFileLength = lastChunkStartPos + lastChunkSize;
- } else { // last chunck is corrupt
- validFileLength = lastChunkStartPos;
- }
-
- // truncate if extra bytes are present without CRC
- if (blockFile.length() > validFileLength) {
- RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");
- try {
- // truncate blockFile
- blockRAF.setLength(validFileLength);
- } finally {
- blockRAF.close();
+ ioFileBufferSize))) {
+ // read and handle the common header here. For now just a version
+ final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(
+ checksumIn, metaFile);
+ int bytesPerChecksum = checksum.getBytesPerChecksum();
+ int checksumSize = checksum.getChecksumSize();
+ long numChunks = Math.min(
+ (blockFileLen + bytesPerChecksum - 1) / bytesPerChecksum,
+ (metaFileLen - crcHeaderLen) / checksumSize);
+ if (numChunks == 0) {
+ return 0;
+ }
+ try (InputStream blockIn = new FileInputStream(blockFile);
+ ReplicaInputStreams ris = new ReplicaInputStreams(blockIn,
+ checksumIn, volume.obtainReference())) {
+ ris.skipChecksumFully((numChunks - 1) * checksumSize);
+ long lastChunkStartPos = (numChunks - 1) * bytesPerChecksum;
+ ris.skipDataFully(lastChunkStartPos);
+ int lastChunkSize = (int) Math.min(
+ bytesPerChecksum, blockFileLen - lastChunkStartPos);
+ byte[] buf = new byte[lastChunkSize + checksumSize];
+ ris.readChecksumFully(buf, lastChunkSize, checksumSize);
+ ris.readDataFully(buf, 0, lastChunkSize);
+ checksum.update(buf, 0, lastChunkSize);
+ long validFileLength;
+ if (checksum.compare(buf, lastChunkSize)) { // last chunk matches crc
+ validFileLength = lastChunkStartPos + lastChunkSize;
+ } else { // last chunk is corrupt
+ validFileLength = lastChunkStartPos;
+ }
+ // truncate if extra bytes are present without CRC
+ if (blockFile.length() > validFileLength) {
+ try (RandomAccessFile blockRAF =
+ new RandomAccessFile(blockFile, "rw")) {
+ // truncate blockFile
+ blockRAF.setLength(validFileLength);
+ }
+ }
+ return validFileLength;
}
}
-
- return validFileLength;
} catch (IOException e) {
FsDatasetImpl.LOG.warn(e);
return 0;
- } finally {
- IOUtils.closeStream(checksumIn);
- IOUtils.closeStream(blockIn);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
index b9c731b..97dcf8d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.File;
-import java.io.FileDescriptor;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
@@ -38,9 +37,9 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIOException;
/**
@@ -202,13 +201,13 @@ class FsDatasetAsyncDiskService {
}
public void submitSyncFileRangeRequest(FsVolumeImpl volume,
- final FileDescriptor fd, final long offset, final long nbytes,
+ final ReplicaOutputStreams streams, final long offset, final long nbytes,
final int flags) {
execute(volume, new Runnable() {
@Override
public void run() {
try {
- NativeIO.POSIX.syncFileRangeIfPossible(fd, offset, nbytes, flags);
+ streams.syncFileRangeIfPossible(offset, nbytes, flags);
} catch (NativeIOException e) {
LOG.warn("sync_file_range error", e);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 954d6ef..6065df2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -21,7 +21,6 @@ import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
-import java.io.FileDescriptor;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -2755,9 +2754,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override
public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block,
- FileDescriptor fd, long offset, long nbytes, int flags) {
+ ReplicaOutputStreams outs, long offset, long nbytes, int flags) {
FsVolumeImpl fsVolumeImpl = this.getVolume(block);
- asyncDiskService.submitSyncFileRangeRequest(fsVolumeImpl, fd, offset,
+ asyncDiskService.submitSyncFileRangeRequest(fsVolumeImpl, outs, offset,
nbytes, flags);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index a231e03..08564de 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -1067,7 +1067,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
DataStorage.STORAGE_DIR_LAZY_PERSIST);
File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
if (force) {
- FileUtil.fullyDelete(bpDir);
+ DataStorage.fullyDelete(bpDir);
} else {
if (!rbwDir.delete()) {
throw new IOException("Failed to delete " + rbwDir);
@@ -1081,7 +1081,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
!FileUtil.fullyDelete(lazypersistDir)))) {
throw new IOException("Failed to delete " + lazypersistDir);
}
- FileUtil.fullyDelete(tmpDir);
+ DataStorage.fullyDelete(tmpDir);
for (File f : FileUtil.listFiles(bpCurrentDir)) {
if (!f.delete()) {
throw new IOException("Failed to delete " + f);
@@ -1437,4 +1437,3 @@ public class FsVolumeImpl implements FsVolumeSpi {
replicaState);
}
}
-
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
index 20cec6a..e963d41 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
@@ -701,7 +701,7 @@ public class TestFileAppend{
ReplicaBeingWritten rbw =
(ReplicaBeingWritten)replicaHandler.getReplica();
ReplicaOutputStreams
- outputStreams = rbw.createStreams(false, DEFAULT_CHECKSUM);
+ outputStreams = rbw.createStreams(false, DEFAULT_CHECKSUM, 300);
OutputStream dataOutput = outputStreams.getDataOut();
byte[] appendBytes = new byte[1];
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 5d63d07..ae52905 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.server.datanode;
import java.io.File;
-import java.io.FileDescriptor;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -261,14 +260,15 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override
synchronized public ReplicaOutputStreams createStreams(boolean isCreate,
- DataChecksum requestedChecksum) throws IOException {
+ DataChecksum requestedChecksum, long slowLogThresholdMs)
+ throws IOException {
if (finalized) {
throw new IOException("Trying to write to a finalized replica "
+ theBlock);
} else {
SimulatedOutputStream crcStream = new SimulatedOutputStream();
return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum,
- volume.isTransientStorage());
+ volume.isTransientStorage(), slowLogThresholdMs);
}
}
@@ -1364,7 +1364,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override
public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block,
- FileDescriptor fd, long offset, long nbytes, int flags) {
+ ReplicaOutputStreams outs, long offset, long nbytes, int flags) {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index 619eda0..8439991 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -673,7 +673,7 @@ public class TestBlockRecovery {
ReplicaOutputStreams streams = null;
try {
streams = replicaInfo.createStreams(true,
- DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512));
+ DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512), 300);
streams.getChecksumOut().write('a');
dn.data.initReplicaRecovery(new RecoveringBlock(block, null, RECOVERY_ID+1));
BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
index 4e724bc7..fa980c2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
@@ -83,7 +83,7 @@ public class TestSimulatedFSDataset {
ReplicaInPipeline bInfo = fsdataset.createRbw(
StorageType.DEFAULT, b, false).getReplica();
ReplicaOutputStreams out = bInfo.createStreams(true,
- DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512));
+ DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512), 300);
try {
OutputStream dataOut = out.getDataOut();
assertEquals(0, fsdataset.getLength(b));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index 4166346..2417c9d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -318,8 +318,8 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
}
@Override
- public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block, FileDescriptor fd, long offset, long nbytes, int flags) {
-
+ public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block,
+ ReplicaOutputStreams outs, long offset, long nbytes, int flags) {
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df983b52/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java
index 90c3b8a..6fa2830 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java
@@ -58,8 +58,10 @@ public class ExternalReplicaInPipeline implements ReplicaInPipeline {
@Override
public ReplicaOutputStreams createStreams(boolean isCreate,
- DataChecksum requestedChecksum) throws IOException {
- return new ReplicaOutputStreams(null, null, requestedChecksum, false);
+ DataChecksum requestedChecksum, long slowLogThresholdMs)
+ throws IOException {
+ return new ReplicaOutputStreams(null, null, requestedChecksum, false,
+ slowLogThresholdMs);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org