You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/06/03 00:42:34 UTC

hadoop git commit: HDFS-8386. Improve synchronization of 'streamer' reference in DFSOutputStream. Contributed by Rakesh R.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 03fb5c642 -> efc510a57


HDFS-8386. Improve synchronization of 'streamer' reference in DFSOutputStream. Contributed by Rakesh R.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/efc510a5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/efc510a5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/efc510a5

Branch: refs/heads/trunk
Commit: efc510a570cf880e7df1b69932aa41932658ee51
Parents: 03fb5c6
Author: Andrew Wang <wa...@apache.org>
Authored: Tue Jun 2 15:39:24 2015 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Tue Jun 2 15:39:24 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../org/apache/hadoop/hdfs/DFSOutputStream.java | 159 +++++++++++--------
 2 files changed, 92 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/efc510a5/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 0822f90..9d427ff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -591,6 +591,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8489. Subclass BlockInfo to represent contiguous blocks.
     (Zhe Zhang via jing9)
 
+    HDFS-8386. Improve synchronization of 'streamer' reference in
+    DFSOutputStream. (Rakesh R via wang)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/efc510a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index ae5d3eb..1dc4a9f 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -139,7 +139,7 @@ public class DFSOutputStream extends FSOutputSummer
   @Override
   protected void checkClosed() throws IOException {
     if (isClosed()) {
-      streamer.getLastException().throwException4Close();
+      getStreamer().getLastException().throwException4Close();
     }
   }
 
@@ -148,10 +148,10 @@ public class DFSOutputStream extends FSOutputSummer
   //
   @VisibleForTesting
   public synchronized DatanodeInfo[] getPipeline() {
-    if (streamer.streamerClosed()) {
+    if (getStreamer().streamerClosed()) {
       return null;
     }
-    DatanodeInfo[] currentNodes = streamer.getNodes();
+    DatanodeInfo[] currentNodes = getStreamer().getNodes();
     if (currentNodes == null) {
       return null;
     }
@@ -293,9 +293,9 @@ public class DFSOutputStream extends FSOutputSummer
       // indicate that we are appending to an existing block
       streamer = new DataStreamer(lastBlock, stat, dfsClient, src, progress, checksum,
           cachingStrategy, byteArrayManager);
-      streamer.setBytesCurBlock(lastBlock.getBlockSize());
+      getStreamer().setBytesCurBlock(lastBlock.getBlockSize());
       adjustPacketChunkSize(stat);
-      streamer.setPipelineInConstruction(lastBlock);
+      getStreamer().setPipelineInConstruction(lastBlock);
     } else {
       computePacketChunkSize(dfsClient.getConf().getWritePacketSize(),
           bytesPerChecksum);
@@ -329,7 +329,7 @@ public class DFSOutputStream extends FSOutputSummer
       //
       computePacketChunkSize(0, freeInCksum);
       setChecksumBufSize(freeInCksum);
-      streamer.setAppendChunk(true);
+      getStreamer().setAppendChunk(true);
     } else {
       // if the remaining space in the block is smaller than
       // that expected size of of a packet, then create
@@ -392,36 +392,36 @@ public class DFSOutputStream extends FSOutputSummer
     }
 
     if (currentPacket == null) {
-      currentPacket = createPacket(packetSize, chunksPerPacket, 
-          streamer.getBytesCurBlock(), streamer.getAndIncCurrentSeqno(), false);
+      currentPacket = createPacket(packetSize, chunksPerPacket, getStreamer()
+          .getBytesCurBlock(), getStreamer().getAndIncCurrentSeqno(), false);
       if (DFSClient.LOG.isDebugEnabled()) {
         DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + 
             currentPacket.getSeqno() +
             ", src=" + src +
             ", packetSize=" + packetSize +
             ", chunksPerPacket=" + chunksPerPacket +
-            ", bytesCurBlock=" + streamer.getBytesCurBlock());
+            ", bytesCurBlock=" + getStreamer().getBytesCurBlock());
       }
     }
 
     currentPacket.writeChecksum(checksum, ckoff, cklen);
     currentPacket.writeData(b, offset, len);
     currentPacket.incNumChunks();
-    streamer.incBytesCurBlock(len);
+    getStreamer().incBytesCurBlock(len);
 
     // If packet is full, enqueue it for transmission
     //
     if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
-        streamer.getBytesCurBlock() == blockSize) {
+        getStreamer().getBytesCurBlock() == blockSize) {
       if (DFSClient.LOG.isDebugEnabled()) {
         DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
             currentPacket.getSeqno() +
             ", src=" + src +
-            ", bytesCurBlock=" + streamer.getBytesCurBlock() +
+            ", bytesCurBlock=" + getStreamer().getBytesCurBlock() +
             ", blockSize=" + blockSize +
-            ", appendChunk=" + streamer.getAppendChunk());
+            ", appendChunk=" + getStreamer().getAppendChunk());
       }
-      streamer.waitAndQueuePacket(currentPacket);
+      getStreamer().waitAndQueuePacket(currentPacket);
       currentPacket = null;
 
       adjustChunkBoundary();
@@ -436,14 +436,14 @@ public class DFSOutputStream extends FSOutputSummer
    * crc chunks from now on.
    */
   protected void adjustChunkBoundary() {
-    if (streamer.getAppendChunk() &&
-        streamer.getBytesCurBlock() % bytesPerChecksum == 0) {
-      streamer.setAppendChunk(false);
+    if (getStreamer().getAppendChunk() &&
+        getStreamer().getBytesCurBlock() % bytesPerChecksum == 0) {
+      getStreamer().setAppendChunk(false);
       resetChecksumBufSize();
     }
 
-    if (!streamer.getAppendChunk()) {
-      int psize = Math.min((int)(blockSize- streamer.getBytesCurBlock()),
+    if (!getStreamer().getAppendChunk()) {
+      int psize = Math.min((int)(blockSize- getStreamer().getBytesCurBlock()),
           dfsClient.getConf().getWritePacketSize());
       computePacketChunkSize(psize, bytesPerChecksum);
     }
@@ -456,13 +456,13 @@ public class DFSOutputStream extends FSOutputSummer
    * @throws IOException
    */
   protected void endBlock() throws IOException {
-    if (streamer.getBytesCurBlock() == blockSize) {
-      currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
-          streamer.getAndIncCurrentSeqno(), true);
+    if (getStreamer().getBytesCurBlock() == blockSize) {
+      currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
+          getStreamer().getAndIncCurrentSeqno(), true);
       currentPacket.setSyncBlock(shouldSyncBlock);
-      streamer.waitAndQueuePacket(currentPacket);
+      getStreamer().waitAndQueuePacket(currentPacket);
       currentPacket = null;
-      streamer.setBytesCurBlock(0);
+      getStreamer().setBytesCurBlock(0);
       lastFlushOffset = 0;
     }
   }
@@ -551,30 +551,33 @@ public class DFSOutputStream extends FSOutputSummer
 
         if (DFSClient.LOG.isDebugEnabled()) {
           DFSClient.LOG.debug("DFSClient flush(): "
-              + " bytesCurBlock=" + streamer.getBytesCurBlock()
+              + " bytesCurBlock=" + getStreamer().getBytesCurBlock()
               + " lastFlushOffset=" + lastFlushOffset
               + " createNewBlock=" + endBlock);
         }
         // Flush only if we haven't already flushed till this offset.
-        if (lastFlushOffset != streamer.getBytesCurBlock()) {
-          assert streamer.getBytesCurBlock() > lastFlushOffset;
+        if (lastFlushOffset != getStreamer().getBytesCurBlock()) {
+          assert getStreamer().getBytesCurBlock() > lastFlushOffset;
           // record the valid offset of this flush
-          lastFlushOffset = streamer.getBytesCurBlock();
+          lastFlushOffset = getStreamer().getBytesCurBlock();
           if (isSync && currentPacket == null && !endBlock) {
             // Nothing to send right now,
             // but sync was requested.
             // Send an empty packet if we do not end the block right now
             currentPacket = createPacket(packetSize, chunksPerPacket,
-                streamer.getBytesCurBlock(), streamer.getAndIncCurrentSeqno(), false);
+                getStreamer().getBytesCurBlock(), getStreamer()
+                    .getAndIncCurrentSeqno(), false);
           }
         } else {
-          if (isSync && streamer.getBytesCurBlock() > 0 && !endBlock) {
+          if (isSync && getStreamer().getBytesCurBlock() > 0 && !endBlock) {
             // Nothing to send right now,
             // and the block was partially written,
             // and sync was requested.
-            // So send an empty sync packet if we do not end the block right now
+            // So send an empty sync packet if we do not end the block right
+            // now
             currentPacket = createPacket(packetSize, chunksPerPacket,
-                streamer.getBytesCurBlock(), streamer.getAndIncCurrentSeqno(), false);
+                getStreamer().getBytesCurBlock(), getStreamer()
+                    .getAndIncCurrentSeqno(), false);
           } else if (currentPacket != null) {
             // just discard the current packet since it is already been sent.
             currentPacket.releaseBuffer(byteArrayManager);
@@ -583,42 +586,44 @@ public class DFSOutputStream extends FSOutputSummer
         }
         if (currentPacket != null) {
           currentPacket.setSyncBlock(isSync);
-          streamer.waitAndQueuePacket(currentPacket);
+          getStreamer().waitAndQueuePacket(currentPacket);
           currentPacket = null;
         }
-        if (endBlock && streamer.getBytesCurBlock() > 0) {
+        if (endBlock && getStreamer().getBytesCurBlock() > 0) {
           // Need to end the current block, thus send an empty packet to
           // indicate this is the end of the block and reset bytesCurBlock
-          currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
-              streamer.getAndIncCurrentSeqno(), true);
+          currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
+              getStreamer().getAndIncCurrentSeqno(), true);
           currentPacket.setSyncBlock(shouldSyncBlock || isSync);
-          streamer.waitAndQueuePacket(currentPacket);
+          getStreamer().waitAndQueuePacket(currentPacket);
           currentPacket = null;
-          streamer.setBytesCurBlock(0);
+          getStreamer().setBytesCurBlock(0);
           lastFlushOffset = 0;
         } else {
           // Restore state of stream. Record the last flush offset
           // of the last full chunk that was flushed.
-          streamer.setBytesCurBlock(streamer.getBytesCurBlock() - numKept);
+          getStreamer().setBytesCurBlock(
+              getStreamer().getBytesCurBlock() - numKept);
         }
 
-        toWaitFor = streamer.getLastQueuedSeqno();
+        toWaitFor = getStreamer().getLastQueuedSeqno();
       } // end synchronized
 
-      streamer.waitForAckedSeqno(toWaitFor);
+      getStreamer().waitForAckedSeqno(toWaitFor);
 
       // update the block length first time irrespective of flag
-      if (updateLength || streamer.getPersistBlocks().get()) {
+      if (updateLength || getStreamer().getPersistBlocks().get()) {
         synchronized (this) {
-          if (!streamer.streamerClosed() && streamer.getBlock() != null) {
-            lastBlockLength = streamer.getBlock().getNumBytes();
+          if (!getStreamer().streamerClosed()
+              && getStreamer().getBlock() != null) {
+            lastBlockLength = getStreamer().getBlock().getNumBytes();
           }
         }
       }
       // If 1) any new blocks were allocated since the last flush, or 2) to
       // update length in NN is required, then persist block locations on
       // namenode.
-      if (streamer.getPersistBlocks().getAndSet(false) || updateLength) {
+      if (getStreamer().getPersistBlocks().getAndSet(false) || updateLength) {
         try {
           dfsClient.namenode.fsync(src, fileId, dfsClient.clientName,
               lastBlockLength);
@@ -635,8 +640,8 @@ public class DFSOutputStream extends FSOutputSummer
       }
 
       synchronized(this) {
-        if (!streamer.streamerClosed()) {
-          streamer.setHflush();
+        if (!getStreamer().streamerClosed()) {
+          getStreamer().setHflush();
         }
       }
     } catch (InterruptedIOException interrupt) {
@@ -648,7 +653,7 @@ public class DFSOutputStream extends FSOutputSummer
       DFSClient.LOG.warn("Error while syncing", e);
       synchronized (this) {
         if (!isClosed()) {
-          streamer.getLastException().set(e);
+          getStreamer().getLastException().set(e);
           closeThreads(true);
         }
       }
@@ -673,10 +678,10 @@ public class DFSOutputStream extends FSOutputSummer
   public synchronized int getCurrentBlockReplication() throws IOException {
     dfsClient.checkOpen();
     checkClosed();
-    if (streamer.streamerClosed()) {
+    if (getStreamer().streamerClosed()) {
       return blockReplication; // no pipeline, return repl factor of file
     }
-    DatanodeInfo[] currentNodes = streamer.getNodes();
+    DatanodeInfo[] currentNodes = getStreamer().getNodes();
     if (currentNodes == null) {
       return blockReplication; // no pipeline, return repl factor of file
     }
@@ -695,16 +700,16 @@ public class DFSOutputStream extends FSOutputSummer
       //
       // If there is data in the current buffer, send it across
       //
-      streamer.queuePacket(currentPacket);
+      getStreamer().queuePacket(currentPacket);
       currentPacket = null;
-      toWaitFor = streamer.getLastQueuedSeqno();
+      toWaitFor = getStreamer().getLastQueuedSeqno();
     }
 
-    streamer.waitForAckedSeqno(toWaitFor);
+    getStreamer().waitForAckedSeqno(toWaitFor);
   }
 
   protected synchronized void start() {
-    streamer.start();
+    getStreamer().start();
   }
   
   /**
@@ -715,32 +720,32 @@ public class DFSOutputStream extends FSOutputSummer
     if (isClosed()) {
       return;
     }
-    streamer.getLastException().set(new IOException("Lease timeout of "
+    getStreamer().getLastException().set(new IOException("Lease timeout of "
         + (dfsClient.getConf().getHdfsTimeout()/1000) + " seconds expired."));
     closeThreads(true);
     dfsClient.endFileLease(fileId);
   }
 
   boolean isClosed() {
-    return closed || streamer.streamerClosed();
+    return closed || getStreamer().streamerClosed();
   }
 
   void setClosed() {
     closed = true;
-    streamer.release();
+    getStreamer().release();
   }
 
   // shutdown datastreamer and responseprocessor threads.
   // interrupt datastreamer if force is true
   protected void closeThreads(boolean force) throws IOException {
     try {
-      streamer.close(force);
-      streamer.join();
-      streamer.closeSocket();
+      getStreamer().close(force);
+      getStreamer().join();
+      getStreamer().closeSocket();
     } catch (InterruptedException e) {
       throw new IOException("Failed to shutdown streamer");
     } finally {
-      streamer.setSocketToNull();
+      getStreamer().setSocketToNull();
       setClosed();
     }
   }
@@ -762,7 +767,7 @@ public class DFSOutputStream extends FSOutputSummer
 
   protected synchronized void closeImpl() throws IOException {
     if (isClosed()) {
-      streamer.getLastException().check(true);
+      getStreamer().getLastException().check(true);
       return;
     }
 
@@ -770,20 +775,20 @@ public class DFSOutputStream extends FSOutputSummer
       flushBuffer();       // flush from all upper layers
 
       if (currentPacket != null) {
-        streamer.waitAndQueuePacket(currentPacket);
+        getStreamer().waitAndQueuePacket(currentPacket);
         currentPacket = null;
       }
 
-      if (streamer.getBytesCurBlock() != 0) {
+      if (getStreamer().getBytesCurBlock() != 0) {
         // send an empty packet to mark the end of the block
-        currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
-            streamer.getAndIncCurrentSeqno(), true);
+        currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
+            getStreamer().getAndIncCurrentSeqno(), true);
         currentPacket.setSyncBlock(shouldSyncBlock);
       }
 
       flushInternal();             // flush all data to Datanodes
       // get last block before destroying the streamer
-      ExtendedBlock lastBlock = streamer.getBlock();
+      ExtendedBlock lastBlock = getStreamer().getBlock();
       closeThreads(false);
       TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER);
       try {
@@ -841,7 +846,7 @@ public class DFSOutputStream extends FSOutputSummer
 
   @VisibleForTesting
   public void setArtificialSlowdown(long period) {
-    streamer.setArtificialSlowdown(period);
+    getStreamer().setArtificialSlowdown(period);
   }
 
   @VisibleForTesting
@@ -868,7 +873,7 @@ public class DFSOutputStream extends FSOutputSummer
    * Returns the access token currently used by streamer, for testing only
    */
   synchronized Token<BlockTokenIdentifier> getBlockToken() {
-    return streamer.getBlockToken();
+    return getStreamer().getBlockToken();
   }
 
   @Override
@@ -885,11 +890,25 @@ public class DFSOutputStream extends FSOutputSummer
 
   @VisibleForTesting
   ExtendedBlock getBlock() {
-    return streamer.getBlock();
+    return getStreamer().getBlock();
   }
 
   @VisibleForTesting
   public long getFileId() {
     return fileId;
   }
+
+  /**
+   * Set the data streamer object.
+   */
+  protected synchronized void setStreamer(DataStreamer streamer) {
+    this.streamer = streamer;
+  }
+
+  /**
+   * Returns the data streamer object.
+   */
+  protected synchronized DataStreamer getStreamer() {
+    return streamer;
+  }
 }