You are viewing a plain-text version of this content. The canonical link to the original message was omitted from this plain-text copy.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/06/03 00:42:34 UTC
hadoop git commit: HDFS-8386. Improve synchronization of 'streamer' reference in DFSOutputStream. Contributed by Rakesh R.
Repository: hadoop
Updated Branches:
refs/heads/trunk 03fb5c642 -> efc510a57
HDFS-8386. Improve synchronization of 'streamer' reference in DFSOutputStream. Contributed by Rakesh R.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/efc510a5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/efc510a5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/efc510a5
Branch: refs/heads/trunk
Commit: efc510a570cf880e7df1b69932aa41932658ee51
Parents: 03fb5c6
Author: Andrew Wang <wa...@apache.org>
Authored: Tue Jun 2 15:39:24 2015 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Tue Jun 2 15:39:24 2015 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../org/apache/hadoop/hdfs/DFSOutputStream.java | 159 +++++++++++--------
2 files changed, 92 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/efc510a5/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 0822f90..9d427ff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -591,6 +591,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8489. Subclass BlockInfo to represent contiguous blocks.
(Zhe Zhang via jing9)
+ HDFS-8386. Improve synchronization of 'streamer' reference in
+ DFSOutputStream. (Rakesh R via wang)
+
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
http://git-wip-us.apache.org/repos/asf/hadoop/blob/efc510a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index ae5d3eb..1dc4a9f 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -139,7 +139,7 @@ public class DFSOutputStream extends FSOutputSummer
@Override
protected void checkClosed() throws IOException {
if (isClosed()) {
- streamer.getLastException().throwException4Close();
+ getStreamer().getLastException().throwException4Close();
}
}
@@ -148,10 +148,10 @@ public class DFSOutputStream extends FSOutputSummer
//
@VisibleForTesting
public synchronized DatanodeInfo[] getPipeline() {
- if (streamer.streamerClosed()) {
+ if (getStreamer().streamerClosed()) {
return null;
}
- DatanodeInfo[] currentNodes = streamer.getNodes();
+ DatanodeInfo[] currentNodes = getStreamer().getNodes();
if (currentNodes == null) {
return null;
}
@@ -293,9 +293,9 @@ public class DFSOutputStream extends FSOutputSummer
// indicate that we are appending to an existing block
streamer = new DataStreamer(lastBlock, stat, dfsClient, src, progress, checksum,
cachingStrategy, byteArrayManager);
- streamer.setBytesCurBlock(lastBlock.getBlockSize());
+ getStreamer().setBytesCurBlock(lastBlock.getBlockSize());
adjustPacketChunkSize(stat);
- streamer.setPipelineInConstruction(lastBlock);
+ getStreamer().setPipelineInConstruction(lastBlock);
} else {
computePacketChunkSize(dfsClient.getConf().getWritePacketSize(),
bytesPerChecksum);
@@ -329,7 +329,7 @@ public class DFSOutputStream extends FSOutputSummer
//
computePacketChunkSize(0, freeInCksum);
setChecksumBufSize(freeInCksum);
- streamer.setAppendChunk(true);
+ getStreamer().setAppendChunk(true);
} else {
// if the remaining space in the block is smaller than
// that expected size of of a packet, then create
@@ -392,36 +392,36 @@ public class DFSOutputStream extends FSOutputSummer
}
if (currentPacket == null) {
- currentPacket = createPacket(packetSize, chunksPerPacket,
- streamer.getBytesCurBlock(), streamer.getAndIncCurrentSeqno(), false);
+ currentPacket = createPacket(packetSize, chunksPerPacket, getStreamer()
+ .getBytesCurBlock(), getStreamer().getAndIncCurrentSeqno(), false);
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" +
currentPacket.getSeqno() +
", src=" + src +
", packetSize=" + packetSize +
", chunksPerPacket=" + chunksPerPacket +
- ", bytesCurBlock=" + streamer.getBytesCurBlock());
+ ", bytesCurBlock=" + getStreamer().getBytesCurBlock());
}
}
currentPacket.writeChecksum(checksum, ckoff, cklen);
currentPacket.writeData(b, offset, len);
currentPacket.incNumChunks();
- streamer.incBytesCurBlock(len);
+ getStreamer().incBytesCurBlock(len);
// If packet is full, enqueue it for transmission
//
if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
- streamer.getBytesCurBlock() == blockSize) {
+ getStreamer().getBytesCurBlock() == blockSize) {
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
currentPacket.getSeqno() +
", src=" + src +
- ", bytesCurBlock=" + streamer.getBytesCurBlock() +
+ ", bytesCurBlock=" + getStreamer().getBytesCurBlock() +
", blockSize=" + blockSize +
- ", appendChunk=" + streamer.getAppendChunk());
+ ", appendChunk=" + getStreamer().getAppendChunk());
}
- streamer.waitAndQueuePacket(currentPacket);
+ getStreamer().waitAndQueuePacket(currentPacket);
currentPacket = null;
adjustChunkBoundary();
@@ -436,14 +436,14 @@ public class DFSOutputStream extends FSOutputSummer
* crc chunks from now on.
*/
protected void adjustChunkBoundary() {
- if (streamer.getAppendChunk() &&
- streamer.getBytesCurBlock() % bytesPerChecksum == 0) {
- streamer.setAppendChunk(false);
+ if (getStreamer().getAppendChunk() &&
+ getStreamer().getBytesCurBlock() % bytesPerChecksum == 0) {
+ getStreamer().setAppendChunk(false);
resetChecksumBufSize();
}
- if (!streamer.getAppendChunk()) {
- int psize = Math.min((int)(blockSize- streamer.getBytesCurBlock()),
+ if (!getStreamer().getAppendChunk()) {
+ int psize = Math.min((int)(blockSize- getStreamer().getBytesCurBlock()),
dfsClient.getConf().getWritePacketSize());
computePacketChunkSize(psize, bytesPerChecksum);
}
@@ -456,13 +456,13 @@ public class DFSOutputStream extends FSOutputSummer
* @throws IOException
*/
protected void endBlock() throws IOException {
- if (streamer.getBytesCurBlock() == blockSize) {
- currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
- streamer.getAndIncCurrentSeqno(), true);
+ if (getStreamer().getBytesCurBlock() == blockSize) {
+ currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
+ getStreamer().getAndIncCurrentSeqno(), true);
currentPacket.setSyncBlock(shouldSyncBlock);
- streamer.waitAndQueuePacket(currentPacket);
+ getStreamer().waitAndQueuePacket(currentPacket);
currentPacket = null;
- streamer.setBytesCurBlock(0);
+ getStreamer().setBytesCurBlock(0);
lastFlushOffset = 0;
}
}
@@ -551,30 +551,33 @@ public class DFSOutputStream extends FSOutputSummer
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DFSClient flush(): "
- + " bytesCurBlock=" + streamer.getBytesCurBlock()
+ + " bytesCurBlock=" + getStreamer().getBytesCurBlock()
+ " lastFlushOffset=" + lastFlushOffset
+ " createNewBlock=" + endBlock);
}
// Flush only if we haven't already flushed till this offset.
- if (lastFlushOffset != streamer.getBytesCurBlock()) {
- assert streamer.getBytesCurBlock() > lastFlushOffset;
+ if (lastFlushOffset != getStreamer().getBytesCurBlock()) {
+ assert getStreamer().getBytesCurBlock() > lastFlushOffset;
// record the valid offset of this flush
- lastFlushOffset = streamer.getBytesCurBlock();
+ lastFlushOffset = getStreamer().getBytesCurBlock();
if (isSync && currentPacket == null && !endBlock) {
// Nothing to send right now,
// but sync was requested.
// Send an empty packet if we do not end the block right now
currentPacket = createPacket(packetSize, chunksPerPacket,
- streamer.getBytesCurBlock(), streamer.getAndIncCurrentSeqno(), false);
+ getStreamer().getBytesCurBlock(), getStreamer()
+ .getAndIncCurrentSeqno(), false);
}
} else {
- if (isSync && streamer.getBytesCurBlock() > 0 && !endBlock) {
+ if (isSync && getStreamer().getBytesCurBlock() > 0 && !endBlock) {
// Nothing to send right now,
// and the block was partially written,
// and sync was requested.
- // So send an empty sync packet if we do not end the block right now
+ // So send an empty sync packet if we do not end the block right
+ // now
currentPacket = createPacket(packetSize, chunksPerPacket,
- streamer.getBytesCurBlock(), streamer.getAndIncCurrentSeqno(), false);
+ getStreamer().getBytesCurBlock(), getStreamer()
+ .getAndIncCurrentSeqno(), false);
} else if (currentPacket != null) {
// just discard the current packet since it is already been sent.
currentPacket.releaseBuffer(byteArrayManager);
@@ -583,42 +586,44 @@ public class DFSOutputStream extends FSOutputSummer
}
if (currentPacket != null) {
currentPacket.setSyncBlock(isSync);
- streamer.waitAndQueuePacket(currentPacket);
+ getStreamer().waitAndQueuePacket(currentPacket);
currentPacket = null;
}
- if (endBlock && streamer.getBytesCurBlock() > 0) {
+ if (endBlock && getStreamer().getBytesCurBlock() > 0) {
// Need to end the current block, thus send an empty packet to
// indicate this is the end of the block and reset bytesCurBlock
- currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
- streamer.getAndIncCurrentSeqno(), true);
+ currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
+ getStreamer().getAndIncCurrentSeqno(), true);
currentPacket.setSyncBlock(shouldSyncBlock || isSync);
- streamer.waitAndQueuePacket(currentPacket);
+ getStreamer().waitAndQueuePacket(currentPacket);
currentPacket = null;
- streamer.setBytesCurBlock(0);
+ getStreamer().setBytesCurBlock(0);
lastFlushOffset = 0;
} else {
// Restore state of stream. Record the last flush offset
// of the last full chunk that was flushed.
- streamer.setBytesCurBlock(streamer.getBytesCurBlock() - numKept);
+ getStreamer().setBytesCurBlock(
+ getStreamer().getBytesCurBlock() - numKept);
}
- toWaitFor = streamer.getLastQueuedSeqno();
+ toWaitFor = getStreamer().getLastQueuedSeqno();
} // end synchronized
- streamer.waitForAckedSeqno(toWaitFor);
+ getStreamer().waitForAckedSeqno(toWaitFor);
// update the block length first time irrespective of flag
- if (updateLength || streamer.getPersistBlocks().get()) {
+ if (updateLength || getStreamer().getPersistBlocks().get()) {
synchronized (this) {
- if (!streamer.streamerClosed() && streamer.getBlock() != null) {
- lastBlockLength = streamer.getBlock().getNumBytes();
+ if (!getStreamer().streamerClosed()
+ && getStreamer().getBlock() != null) {
+ lastBlockLength = getStreamer().getBlock().getNumBytes();
}
}
}
// If 1) any new blocks were allocated since the last flush, or 2) to
// update length in NN is required, then persist block locations on
// namenode.
- if (streamer.getPersistBlocks().getAndSet(false) || updateLength) {
+ if (getStreamer().getPersistBlocks().getAndSet(false) || updateLength) {
try {
dfsClient.namenode.fsync(src, fileId, dfsClient.clientName,
lastBlockLength);
@@ -635,8 +640,8 @@ public class DFSOutputStream extends FSOutputSummer
}
synchronized(this) {
- if (!streamer.streamerClosed()) {
- streamer.setHflush();
+ if (!getStreamer().streamerClosed()) {
+ getStreamer().setHflush();
}
}
} catch (InterruptedIOException interrupt) {
@@ -648,7 +653,7 @@ public class DFSOutputStream extends FSOutputSummer
DFSClient.LOG.warn("Error while syncing", e);
synchronized (this) {
if (!isClosed()) {
- streamer.getLastException().set(e);
+ getStreamer().getLastException().set(e);
closeThreads(true);
}
}
@@ -673,10 +678,10 @@ public class DFSOutputStream extends FSOutputSummer
public synchronized int getCurrentBlockReplication() throws IOException {
dfsClient.checkOpen();
checkClosed();
- if (streamer.streamerClosed()) {
+ if (getStreamer().streamerClosed()) {
return blockReplication; // no pipeline, return repl factor of file
}
- DatanodeInfo[] currentNodes = streamer.getNodes();
+ DatanodeInfo[] currentNodes = getStreamer().getNodes();
if (currentNodes == null) {
return blockReplication; // no pipeline, return repl factor of file
}
@@ -695,16 +700,16 @@ public class DFSOutputStream extends FSOutputSummer
//
// If there is data in the current buffer, send it across
//
- streamer.queuePacket(currentPacket);
+ getStreamer().queuePacket(currentPacket);
currentPacket = null;
- toWaitFor = streamer.getLastQueuedSeqno();
+ toWaitFor = getStreamer().getLastQueuedSeqno();
}
- streamer.waitForAckedSeqno(toWaitFor);
+ getStreamer().waitForAckedSeqno(toWaitFor);
}
protected synchronized void start() {
- streamer.start();
+ getStreamer().start();
}
/**
@@ -715,32 +720,32 @@ public class DFSOutputStream extends FSOutputSummer
if (isClosed()) {
return;
}
- streamer.getLastException().set(new IOException("Lease timeout of "
+ getStreamer().getLastException().set(new IOException("Lease timeout of "
+ (dfsClient.getConf().getHdfsTimeout()/1000) + " seconds expired."));
closeThreads(true);
dfsClient.endFileLease(fileId);
}
boolean isClosed() {
- return closed || streamer.streamerClosed();
+ return closed || getStreamer().streamerClosed();
}
void setClosed() {
closed = true;
- streamer.release();
+ getStreamer().release();
}
// shutdown datastreamer and responseprocessor threads.
// interrupt datastreamer if force is true
protected void closeThreads(boolean force) throws IOException {
try {
- streamer.close(force);
- streamer.join();
- streamer.closeSocket();
+ getStreamer().close(force);
+ getStreamer().join();
+ getStreamer().closeSocket();
} catch (InterruptedException e) {
throw new IOException("Failed to shutdown streamer");
} finally {
- streamer.setSocketToNull();
+ getStreamer().setSocketToNull();
setClosed();
}
}
@@ -762,7 +767,7 @@ public class DFSOutputStream extends FSOutputSummer
protected synchronized void closeImpl() throws IOException {
if (isClosed()) {
- streamer.getLastException().check(true);
+ getStreamer().getLastException().check(true);
return;
}
@@ -770,20 +775,20 @@ public class DFSOutputStream extends FSOutputSummer
flushBuffer(); // flush from all upper layers
if (currentPacket != null) {
- streamer.waitAndQueuePacket(currentPacket);
+ getStreamer().waitAndQueuePacket(currentPacket);
currentPacket = null;
}
- if (streamer.getBytesCurBlock() != 0) {
+ if (getStreamer().getBytesCurBlock() != 0) {
// send an empty packet to mark the end of the block
- currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
- streamer.getAndIncCurrentSeqno(), true);
+ currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
+ getStreamer().getAndIncCurrentSeqno(), true);
currentPacket.setSyncBlock(shouldSyncBlock);
}
flushInternal(); // flush all data to Datanodes
// get last block before destroying the streamer
- ExtendedBlock lastBlock = streamer.getBlock();
+ ExtendedBlock lastBlock = getStreamer().getBlock();
closeThreads(false);
TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER);
try {
@@ -841,7 +846,7 @@ public class DFSOutputStream extends FSOutputSummer
@VisibleForTesting
public void setArtificialSlowdown(long period) {
- streamer.setArtificialSlowdown(period);
+ getStreamer().setArtificialSlowdown(period);
}
@VisibleForTesting
@@ -868,7 +873,7 @@ public class DFSOutputStream extends FSOutputSummer
* Returns the access token currently used by streamer, for testing only
*/
synchronized Token<BlockTokenIdentifier> getBlockToken() {
- return streamer.getBlockToken();
+ return getStreamer().getBlockToken();
}
@Override
@@ -885,11 +890,25 @@ public class DFSOutputStream extends FSOutputSummer
@VisibleForTesting
ExtendedBlock getBlock() {
- return streamer.getBlock();
+ return getStreamer().getBlock();
}
@VisibleForTesting
public long getFileId() {
return fileId;
}
+
+ /**
+ * Set the data streamer object.
+ */
+ protected synchronized void setStreamer(DataStreamer streamer) {
+ this.streamer = streamer;
+ }
+
+ /**
+ * Returns the data streamer object.
+ */
+ protected synchronized DataStreamer getStreamer() {
+ return streamer;
+ }
}