Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2015/04/02 19:59:48 UTC

hadoop git commit: HDFS-7888. Change DFSOutputStream and DataStreamer for convenience of subclassing. Contributed by Li Bo

Repository: hadoop
Updated Branches:
  refs/heads/trunk 867d5d267 -> 9ed43f218


HDFS-7888. Change DFSOutputStream and DataStreamer for convenience of subclassing. Contributed by Li Bo


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9ed43f21
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9ed43f21
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9ed43f21

Branch: refs/heads/trunk
Commit: 9ed43f2189fb4674b7379e8e995d53d4970d5c3a
Parents: 867d5d2
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Thu Apr 2 10:59:26 2015 -0700
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Thu Apr 2 10:59:26 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../org/apache/hadoop/hdfs/DFSOutputStream.java | 116 ++++++++++---------
 .../org/apache/hadoop/hdfs/DataStreamer.java    |  13 +--
 3 files changed, 69 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
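The change widens a number of private fields and helper methods in
DFSOutputStream (and locateFollowingBlock in DataStreamer) from private to
protected, and pulls the tail of writeChunk() out into two overridable hooks,
adjustChunkBoundary() and endBlock(). As a rough illustration of what that
enables, the hypothetical subclass below (the class name and log messages are
invented for this sketch and are not part of the patch) forwards the now
protected constructor shown in the diff and taps into start() and endBlock()
using members the patch makes visible to subclasses:

    package org.apache.hadoop.hdfs;

    import java.io.IOException;
    import java.util.EnumSet;

    import org.apache.hadoop.fs.CreateFlag;
    import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
    import org.apache.hadoop.util.DataChecksum;
    import org.apache.hadoop.util.Progressable;

    /** Hypothetical subclass sketch; compiles only against a tree with this patch. */
    class LoggingDFSOutputStream extends DFSOutputStream {

      LoggingDFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
          EnumSet<CreateFlag> flag, Progressable progress,
          DataChecksum checksum, String[] favoredNodes) throws IOException {
        // This constructor is protected after the patch, so forwarding works.
        super(dfsClient, src, stat, flag, progress, checksum, favoredNodes);
      }

      @Override
      protected synchronized void start() {
        // src and blockSize are protected now and visible here.
        DFSClient.LOG.info("starting streamer for " + src
            + ", blockSize=" + blockSize);
        super.start();
      }

      @Override
      protected void endBlock() throws IOException {
        // streamer is also protected after the patch.
        if (streamer.getBytesCurBlock() == blockSize) {
          DFSClient.LOG.info("block boundary reached for " + src);
        }
        super.endBlock(); // still queues the empty end-of-block packet
      }
    }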


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ed43f21/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 80d958d..0c66309 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -376,6 +376,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-7978. Add LOG.isDebugEnabled() guard for some LOG.debug(..).
     (Walter Su via wang)
 
+    HDFS-7888. Change DFSOutputStream and DataStreamer for convenience of
+    subclassing. (Li Bo via szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ed43f21/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index c88639d..f6733e3 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.net.Socket;
 import java.nio.channels.ClosedChannelException;
 import java.util.EnumSet;
 import java.util.concurrent.atomic.AtomicReference;
@@ -95,29 +94,29 @@ public class DFSOutputStream extends FSOutputSummer
   static CryptoProtocolVersion[] SUPPORTED_CRYPTO_VERSIONS =
       CryptoProtocolVersion.supported();
 
-  private final DFSClient dfsClient;
-  private final ByteArrayManager byteArrayManager;
+  protected final DFSClient dfsClient;
+  protected final ByteArrayManager byteArrayManager;
   // closed is accessed by different threads under different locks.
-  private volatile boolean closed = false;
-
-  private final String src;
-  private final long fileId;
-  private final long blockSize;
-  private final int bytesPerChecksum;
-
-  private DFSPacket currentPacket = null;
-  private DataStreamer streamer;
-  private int packetSize = 0; // write packet size, not including the header.
-  private int chunksPerPacket = 0;
-  private long lastFlushOffset = 0; // offset when flush was invoked
+  protected volatile boolean closed = false;
+
+  protected final String src;
+  protected final long fileId;
+  protected final long blockSize;
+  protected final int bytesPerChecksum;
+
+  protected DFSPacket currentPacket = null;
+  protected DataStreamer streamer;
+  protected int packetSize = 0; // write packet size, not including the header.
+  protected int chunksPerPacket = 0;
+  protected long lastFlushOffset = 0; // offset when flush was invoked
   private long initialFileSize = 0; // at time of file open
   private final short blockReplication; // replication factor of file
-  private boolean shouldSyncBlock = false; // force blocks to disk upon close
-  private final AtomicReference<CachingStrategy> cachingStrategy;
+  protected boolean shouldSyncBlock = false; // force blocks to disk upon close
+  protected final AtomicReference<CachingStrategy> cachingStrategy;
   private FileEncryptionInfo fileEncryptionInfo;
 
   /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
-  private DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetInBlock,
+  protected DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetInBlock,
       long seqno, boolean lastPacketInBlock) throws InterruptedIOException {
     final byte[] buf;
     final int bufferSize = PacketHeader.PKT_MAX_HEADER_LEN + packetSize;
@@ -206,7 +205,7 @@ public class DFSOutputStream extends FSOutputSummer
   }
 
   /** Construct a new output stream for creating a file. */
-  private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
+  protected DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
       EnumSet<CreateFlag> flag, Progressable progress,
       DataChecksum checksum, String[] favoredNodes) throws IOException {
     this(dfsClient, src, progress, stat, checksum);
@@ -359,7 +358,7 @@ public class DFSOutputStream extends FSOutputSummer
     }
   }
 
-  private void computePacketChunkSize(int psize, int csize) {
+  protected void computePacketChunkSize(int psize, int csize) {
     final int bodySize = psize - PacketHeader.PKT_MAX_HEADER_LEN;
     final int chunkSize = csize + getChecksumSize();
     chunksPerPacket = Math.max(bodySize/chunkSize, 1);
@@ -426,33 +425,46 @@ public class DFSOutputStream extends FSOutputSummer
       streamer.waitAndQueuePacket(currentPacket);
       currentPacket = null;
 
-      // If the reopened file did not end at chunk boundary and the above
-      // write filled up its partial chunk. Tell the summer to generate full 
-      // crc chunks from now on.
-      if (streamer.getAppendChunk() &&
-          streamer.getBytesCurBlock() % bytesPerChecksum == 0) {
-        streamer.setAppendChunk(false);
-        resetChecksumBufSize();
-      }
+      adjustChunkBoundary();
 
-      if (!streamer.getAppendChunk()) {
-        int psize = Math.min((int)(blockSize-streamer.getBytesCurBlock()),
-            dfsClient.getConf().writePacketSize);
-        computePacketChunkSize(psize, bytesPerChecksum);
-      }
-      //
-      // if encountering a block boundary, send an empty packet to 
-      // indicate the end of block and reset bytesCurBlock.
-      //
-      if (streamer.getBytesCurBlock() == blockSize) {
-        currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
-            streamer.getAndIncCurrentSeqno(), true);
-        currentPacket.setSyncBlock(shouldSyncBlock);
-        streamer.waitAndQueuePacket(currentPacket);
-        currentPacket = null;
-        streamer.setBytesCurBlock(0);
-        lastFlushOffset = 0;
-      }
+      endBlock();
+    }
+  }
+
+  /**
+   * If the reopened file did not end at a chunk boundary and the above
+   * write filled up its partial chunk, tell the summer to generate full
+   * crc chunks from now on.
+   */
+  protected void adjustChunkBoundary() {
+    if (streamer.getAppendChunk() &&
+        streamer.getBytesCurBlock() % bytesPerChecksum == 0) {
+      streamer.setAppendChunk(false);
+      resetChecksumBufSize();
+    }
+
+    if (!streamer.getAppendChunk()) {
+      int psize = Math.min((int)(blockSize- streamer.getBytesCurBlock()),
+          dfsClient.getConf().writePacketSize);
+      computePacketChunkSize(psize, bytesPerChecksum);
+    }
+  }
+
+  /**
+   * If encountering a block boundary, send an empty packet to
+   * indicate the end of block and reset bytesCurBlock.
+   *
+   * @throws IOException
+   */
+  protected void endBlock() throws IOException {
+    if (streamer.getBytesCurBlock() == blockSize) {
+      currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
+          streamer.getAndIncCurrentSeqno(), true);
+      currentPacket.setSyncBlock(shouldSyncBlock);
+      streamer.waitAndQueuePacket(currentPacket);
+      currentPacket = null;
+      streamer.setBytesCurBlock(0);
+      lastFlushOffset = 0;
     }
   }
   
@@ -676,7 +688,7 @@ public class DFSOutputStream extends FSOutputSummer
    * Waits till all existing data is flushed and confirmations 
    * received from datanodes. 
    */
-  private void flushInternal() throws IOException {
+  protected void flushInternal() throws IOException {
     long toWaitFor;
     synchronized (this) {
       dfsClient.checkOpen();
@@ -692,7 +704,7 @@ public class DFSOutputStream extends FSOutputSummer
     streamer.waitForAckedSeqno(toWaitFor);
   }
 
-  private synchronized void start() {
+  protected synchronized void start() {
     streamer.start();
   }
   
@@ -721,7 +733,7 @@ public class DFSOutputStream extends FSOutputSummer
 
   // shutdown datastreamer and responseprocessor threads.
   // interrupt datastreamer if force is true
-  private void closeThreads(boolean force) throws IOException {
+  protected void closeThreads(boolean force) throws IOException {
     try {
       streamer.close(force);
       streamer.join();
@@ -749,7 +761,7 @@ public class DFSOutputStream extends FSOutputSummer
     }
   }
 
-  private synchronized void closeImpl() throws IOException {
+  protected synchronized void closeImpl() throws IOException {
     if (isClosed()) {
       IOException e = streamer.getLastException().getAndSet(null);
       if (e == null)
@@ -761,7 +773,7 @@ public class DFSOutputStream extends FSOutputSummer
     try {
       flushBuffer();       // flush from all upper layers
 
-      if (currentPacket != null) { 
+      if (currentPacket != null) {
         streamer.waitAndQueuePacket(currentPacket);
         currentPacket = null;
       }
@@ -792,7 +804,7 @@ public class DFSOutputStream extends FSOutputSummer
 
   // should be called holding (this) lock since setTestFilename() may 
   // be called during unit tests
-  private void completeFile(ExtendedBlock last) throws IOException {
+  protected void completeFile(ExtendedBlock last) throws IOException {
     long localstart = Time.monotonicNow();
     long sleeptime = dfsClient.getConf().
         blockWriteLocateFollowingInitialDelayMs;
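
For callers of the public FileSystem API the refactoring above is intended to
be behaviour-preserving: writeChunk() performs the same steps as before, now
routed through adjustChunkBoundary() and endBlock(). A minimal client-side
sketch that exercises this path (the path name is illustrative, and an HDFS
configuration is assumed to be on the classpath):

    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class Hdfs7888Demo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        try (FileSystem fs = FileSystem.get(conf);
             FSDataOutputStream out = fs.create(new Path("/tmp/hdfs-7888-demo"))) {
          // When the default FS is HDFS, each write eventually reaches
          // DFSOutputStream#writeChunk(), which now calls adjustChunkBoundary()
          // and endBlock() instead of the previously inlined code.
          out.write("hello, hdfs".getBytes(StandardCharsets.UTF_8));
          out.hsync(); // flushInternal() and the streamer hand-off are unchanged
        }
      }
    }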

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ed43f21/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 6ff4c24..6bcbfde 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -1519,7 +1519,7 @@ class DataStreamer extends Daemon {
     }
   }
 
-  private LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
+  protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
       throws IOException {
     int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
     long sleeptime = dfsClient.getConf().
@@ -1729,15 +1729,6 @@ class DataStreamer extends Daemon {
   }
 
   /**
-   * get the socket connecting to the first datanode in pipeline
-   *
-   * @return socket connecting to the first datanode in pipeline
-   */
-  Socket getSocket() {
-    return s;
-  }
-
-  /**
    * set socket to null
    */
   void setSocketToNull() {
@@ -1814,4 +1805,4 @@ class DataStreamer extends Daemon {
       s.close();
     }
   }
-}
\ No newline at end of file
+}
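
The DataStreamer side of the patch is a visibility change plus the removal of
the getSocket() accessor and a trailing-newline fix: locateFollowingBlock(
DatanodeInfo[]) becomes protected so a subclass can influence how the next
block is obtained from the NameNode. The retries/sleeptime locals visible in
that hunk suggest a retry-with-backoff loop; below is a generic, standalone
sketch of that pattern (not the DataStreamer code itself; the class and method
names are invented for illustration):

    import java.io.IOException;
    import java.util.concurrent.Callable;

    /** Generic retry-with-backoff helper, sketching the shape used above. */
    public final class RetryWithBackoff {
      public static <T> T run(Callable<T> action, int retries, long initialSleepMs)
          throws IOException {
        long sleepMs = initialSleepMs;
        while (true) {
          try {
            return action.call();
          } catch (Exception e) {
            if (retries-- <= 0) {
              throw new IOException("retries exhausted", e);
            }
            try {
              Thread.sleep(sleepMs);
            } catch (InterruptedException ie) {
              Thread.currentThread().interrupt();
              throw new IOException("interrupted while retrying", ie);
            }
            sleepMs *= 2; // back off before trying again
          }
        }
      }
    }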