Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/06/18 17:48:31 UTC

hadoop git commit: HDFS-8605. Merge Refactor of DFSOutputStream from HDFS-7285 branch. (vinayakumarb)

Repository: hadoop
Updated Branches:
  refs/heads/trunk 2ad668748 -> 1c13519e1


HDFS-8605. Merge Refactor of DFSOutputStream from HDFS-7285 branch. (vinayakumarb)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1c13519e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1c13519e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1c13519e

Branch: refs/heads/trunk
Commit: 1c13519e1e7588c3e2974138d37bf3449ca8b3df
Parents: 2ad6687
Author: Andrew Wang <wa...@apache.org>
Authored: Thu Jun 18 08:48:09 2015 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Thu Jun 18 08:48:09 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 +
 .../org/apache/hadoop/hdfs/DFSOutputStream.java | 59 ++++++++++----------
 .../org/apache/hadoop/hdfs/DataStreamer.java    |  7 ++-
 3 files changed, 40 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c13519e/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 2545bcf..a61cf78 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -656,6 +656,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-6249. Output AclEntry in PBImageXmlWriter.
     (surendra singh lilhore via aajisaka)
 
+    HDFS-8605. Merge Refactor of DFSOutputStream from HDFS-7285 branch.
+    (vinayakumarb via wang)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c13519e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 4622be6..c16aef2 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -64,6 +64,8 @@ import org.apache.hadoop.util.Time;
 import org.apache.htrace.Sampler;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -86,6 +88,7 @@ import com.google.common.base.Preconditions;
 @InterfaceAudience.Private
 public class DFSOutputStream extends FSOutputSummer
     implements Syncable, CanSetDropBehind {
+  static final Logger LOG = LoggerFactory.getLogger(DFSOutputStream.class);
   /**
    * Number of times to retry creating a file when there are transient 
    * errors (typically related to encryption zones and KeyProvider operations).
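
The hunk above gives DFSOutputStream its own SLF4J logger instead of
borrowing DFSClient.LOG. A minimal sketch of the difference that enables
the guard removal later in this diff (class name and message are
illustrative, not from the patch):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class LoggingSketch {
      static final Logger LOG = LoggerFactory.getLogger(LoggingSketch.class);

      void before(long seqno) {
        // Guarded style: the guard exists only to skip building the
        // message string when debug logging is disabled.
        if (LOG.isDebugEnabled()) {
          LOG.debug("packet full seqno=" + seqno);
        }
      }

      void after(long seqno) {
        // Parameterized style: SLF4J assembles the message only if the
        // debug level is enabled, so no explicit guard is needed. The
        // arguments themselves are still evaluated eagerly.
        LOG.debug("packet full seqno={}", seqno);
      }
    }
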
@@ -413,21 +416,30 @@ public class DFSOutputStream extends FSOutputSummer
     //
     if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
         getStreamer().getBytesCurBlock() == blockSize) {
-      if (DFSClient.LOG.isDebugEnabled()) {
-        DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
-            currentPacket.getSeqno() +
-            ", src=" + src +
-            ", bytesCurBlock=" + getStreamer().getBytesCurBlock() +
-            ", blockSize=" + blockSize +
-            ", appendChunk=" + getStreamer().getAppendChunk());
-      }
-      getStreamer().waitAndQueuePacket(currentPacket);
-      currentPacket = null;
+      enqueueCurrentPacketFull();
+    }
+  }
 
-      adjustChunkBoundary();
+  void enqueueCurrentPacket() throws IOException {
+    getStreamer().waitAndQueuePacket(currentPacket);
+    currentPacket = null;
+  }
 
-      endBlock();
-    }
+  void enqueueCurrentPacketFull() throws IOException {
+    LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={},"
+        + " appendChunk={}, {}", currentPacket, src, getStreamer()
+        .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(),
+        getStreamer());
+    enqueueCurrentPacket();
+    adjustChunkBoundary();
+    endBlock();
+  }
+
+  /** create an empty packet to mark the end of the block. */
+  void setCurrentPacketToEmpty() throws InterruptedIOException {
+    currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
+        getStreamer().getAndIncCurrentSeqno(), true);
+    currentPacket.setSyncBlock(shouldSyncBlock);
   }
 
   /**
@@ -457,11 +469,8 @@ public class DFSOutputStream extends FSOutputSummer
    */
   protected void endBlock() throws IOException {
     if (getStreamer().getBytesCurBlock() == blockSize) {
-      currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
-          getStreamer().getAndIncCurrentSeqno(), true);
-      currentPacket.setSyncBlock(shouldSyncBlock);
-      getStreamer().waitAndQueuePacket(currentPacket);
-      currentPacket = null;
+      setCurrentPacketToEmpty();
+      enqueueCurrentPacket();
       getStreamer().setBytesCurBlock(0);
       lastFlushOffset = 0;
     }
@@ -586,8 +595,7 @@ public class DFSOutputStream extends FSOutputSummer
         }
         if (currentPacket != null) {
           currentPacket.setSyncBlock(isSync);
-          getStreamer().waitAndQueuePacket(currentPacket);
-          currentPacket = null;
+          enqueueCurrentPacket();
         }
         if (endBlock && getStreamer().getBytesCurBlock() > 0) {
           // Need to end the current block, thus send an empty packet to
@@ -595,8 +603,7 @@ public class DFSOutputStream extends FSOutputSummer
           currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
               getStreamer().getAndIncCurrentSeqno(), true);
           currentPacket.setSyncBlock(shouldSyncBlock || isSync);
-          getStreamer().waitAndQueuePacket(currentPacket);
-          currentPacket = null;
+          enqueueCurrentPacket();
           getStreamer().setBytesCurBlock(0);
           lastFlushOffset = 0;
         } else {
@@ -775,15 +782,11 @@ public class DFSOutputStream extends FSOutputSummer
       flushBuffer();       // flush from all upper layers
 
       if (currentPacket != null) {
-        getStreamer().waitAndQueuePacket(currentPacket);
-        currentPacket = null;
+        enqueueCurrentPacket();
       }
 
       if (getStreamer().getBytesCurBlock() != 0) {
-        // send an empty packet to mark the end of the block
-        currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
-            getStreamer().getAndIncCurrentSeqno(), true);
-        currentPacket.setSyncBlock(shouldSyncBlock);
+        setCurrentPacketToEmpty();
       }
 
       flushInternal();             // flush all data to Datanodes
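
Taken together, the DFSOutputStream hunks are a behavior-preserving
deduplication: the "wait-and-queue, then clear" sequence and the empty
end-of-block packet, previously inlined at several call sites
(writeChunk, endBlock, flushOrSync, and the close path), each collapse
into a single helper. A condensed sketch of the resulting shape, using
simplified stand-in types rather than the real HDFS signatures:

    import java.io.IOException;

    // Streamer and Packet are stand-ins for DataStreamer and DFSPacket;
    // createEmptyPacket() abbreviates the createPacket(0, 0, ...) call
    // in the real code.
    class OutputStreamSketch {
      interface Packet { void setSyncBlock(boolean sync); }
      interface Streamer {
        void waitAndQueuePacket(Packet p) throws IOException;
        Packet createEmptyPacket();
      }

      private final Streamer streamer;
      private final boolean shouldSyncBlock;
      private Packet currentPacket;

      OutputStreamSketch(Streamer streamer, boolean shouldSyncBlock) {
        this.streamer = streamer;
        this.shouldSyncBlock = shouldSyncBlock;
      }

      // Hand the current packet to the streamer and clear the field so
      // the same packet cannot be queued twice.
      void enqueueCurrentPacket() throws IOException {
        streamer.waitAndQueuePacket(currentPacket);
        currentPacket = null;
      }

      // An empty packet marks the end of the current block; the sync
      // flag is carried over from the stream's configuration.
      void setCurrentPacketToEmpty() {
        currentPacket = streamer.createEmptyPacket();
        currentPacket.setSyncBlock(shouldSyncBlock);
      }
    }
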

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c13519e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index cecd5a0..8dd85b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -44,7 +44,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
 import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
@@ -1901,4 +1900,10 @@ class DataStreamer extends Daemon {
       s.close();
     }
   }
+
+  @Override
+  public String toString() {
+    return  (block == null? null: block.getLocalBlock())
+        + "@" + Arrays.toString(getNodes());
+  }
 }
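
The new toString() gives DataStreamer a compact identity for the
parameterized debug line added in enqueueCurrentPacketFull() above. A
minimal self-contained sketch of the same pattern, with Block standing
in for the HDFS block type:

    import java.util.Arrays;

    class StreamerSketch {
      static class Block {
        private final String name;
        Block(String name) { this.name = name; }
        String getLocalBlock() { return name; }
      }

      private final Block block;     // may be null before pipeline setup
      private final String[] nodes;  // datanodes in the write pipeline

      StreamerSketch(Block block, String[] nodes) {
        this.block = block;
        this.nodes = nodes;
      }

      @Override
      public String toString() {
        // Mirrors the patch: the ternary avoids an NPE on a null block
        // (string concatenation then prints the literal "null"), and
        // Arrays.toString renders the pipeline.
        return (block == null ? null : block.getLocalBlock())
            + "@" + Arrays.toString(nodes);
      }
    }

With this in place, the getStreamer() argument logged above renders as
something like blk_1073741825_1001@[dn1:50010, dn2:50010]; the exact
node formatting depends on DatanodeInfo's own toString().
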