You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2012/08/26 06:00:27 UTC

svn commit: r1377372 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: CHANGES.txt src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

Author: suresh
Date: Sun Aug 26 04:00:26 2012
New Revision: 1377372

URL: http://svn.apache.org/viewvc?rev=1377372&view=rev
Log:
HDFS-3851. DFSOutputStream class code cleanup. Contributed by Jing Zhao.

Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1377372&r1=1377371&r2=1377372&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Sun Aug 26 04:00:26 2012
@@ -129,6 +129,8 @@ Trunk (unreleased changes)
     HDFS-3844. Add @Override and remove {@inheritdoc} and unnecessary
     imports. (Jing Zhao via suresh)
 
+    HDFS-3851. DFSOutputStream class code cleanup. (Jing Zhao via suresh)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1377372&r1=1377371&r2=1377372&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Sun Aug 26 04:00:26 2012
@@ -56,8 +56,8 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
@@ -107,8 +107,8 @@ import com.google.common.annotations.Vis
 ****************************************************************/
 @InterfaceAudience.Private
 public class DFSOutputStream extends FSOutputSummer implements Syncable {
-  private final DFSClient dfsClient;
   private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB
+  private final DFSClient dfsClient;
   private Socket s;
   // closed is accessed by different threads under different locks.
   private volatile boolean closed = false;
@@ -138,15 +138,15 @@ public class DFSOutputStream extends FSO
   private final short blockReplication; // replication factor of file
   private boolean shouldSyncBlock = false; // force blocks to disk upon close
   
-  private class Packet {
-    long    seqno;               // sequencenumber of buffer in block
-    long    offsetInBlock;       // offset in block
-    private boolean lastPacketInBlock;   // is this the last packet in block?
-    boolean syncBlock;          // this packet forces the current block to disk
-    int     numChunks;           // number of chunks currently in packet
-    int     maxChunks;           // max chunks in packet
-
+  private static class Packet {
+    private static final long HEART_BEAT_SEQNO = -1L;
+    long seqno; // sequencenumber of buffer in block
+    final long offsetInBlock; // offset in block
+    boolean syncBlock; // this packet forces the current block to disk
+    int numChunks; // number of chunks currently in packet
+    final int maxChunks; // max chunks in packet
     byte[]  buf;
+    private boolean lastPacketInBlock; // is this the last packet in block?
 
     /**
      * buf is pointed into like follows:
@@ -164,45 +164,36 @@ public class DFSOutputStream extends FSO
      */
     int checksumStart;
     int checksumPos;
-    int dataStart;
+    final int dataStart;
     int dataPos;
 
-    private static final long HEART_BEAT_SEQNO = -1L;
-
     /**
      * Create a heartbeat packet.
      */
-    Packet() {
-      this.lastPacketInBlock = false;
-      this.numChunks = 0;
-      this.offsetInBlock = 0;
-      this.seqno = HEART_BEAT_SEQNO;
-      
-      buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN];
-      
-      checksumStart = checksumPos = dataPos = dataStart = PacketHeader.PKT_MAX_HEADER_LEN;
-      maxChunks = 0;
+    Packet(int checksumSize) {
+      this(0, 0, 0, HEART_BEAT_SEQNO, checksumSize);
     }
     
     /**
      * Create a new packet.
      * 
-     * @param pktSize maximum size of the packet, including checksum data and actual data.
+     * @param pktSize maximum size of the packet, 
+     *                including checksum data and actual data.
      * @param chunksPerPkt maximum number of chunks per packet.
      * @param offsetInBlock offset in bytes into the HDFS block.
      */
-    Packet(int pktSize, int chunksPerPkt, long offsetInBlock) {
+    Packet(int pktSize, int chunksPerPkt, long offsetInBlock, 
+                              long seqno, int checksumSize) {
       this.lastPacketInBlock = false;
       this.numChunks = 0;
       this.offsetInBlock = offsetInBlock;
-      this.seqno = currentSeqno;
-      currentSeqno++;
+      this.seqno = seqno;
       
       buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN + pktSize];
       
       checksumStart = PacketHeader.PKT_MAX_HEADER_LEN;
       checksumPos = checksumStart;
-      dataStart = checksumStart + (chunksPerPkt * checksum.getChecksumSize());
+      dataStart = checksumStart + (chunksPerPkt * checksumSize);
       dataPos = dataStart;
       maxChunks = chunksPerPkt;
     }
@@ -412,6 +403,7 @@ public class DFSOutputStream extends FSO
             response.join();
             response = null;
           } catch (InterruptedException  e) {
+            DFSClient.LOG.warn("Caught exception ", e);
           }
         }
 
@@ -439,6 +431,7 @@ public class DFSOutputStream extends FSO
               try {
                 dataQueue.wait(timeout);
               } catch (InterruptedException  e) {
+                DFSClient.LOG.warn("Caught exception ", e);
               }
               doSleep = false;
               now = Time.now();
@@ -448,7 +441,7 @@ public class DFSOutputStream extends FSO
             }
             // get packet to be sent.
             if (dataQueue.isEmpty()) {
-              one = new Packet();  // heartbeat packet
+              one = new Packet(checksum.getChecksumSize());  // heartbeat packet
             } else {
               one = dataQueue.getFirst(); // regular data packet
             }
@@ -488,6 +481,7 @@ public class DFSOutputStream extends FSO
                   // wait for acks to arrive from datanodes
                   dataQueue.wait(1000);
                 } catch (InterruptedException  e) {
+                  DFSClient.LOG.warn("Caught exception ", e);
                 }
               }
             }
@@ -518,7 +512,7 @@ public class DFSOutputStream extends FSO
             blockStream.flush();   
           } catch (IOException e) {
             // HDFS-3398 treat primary DN is down since client is unable to 
-            // write to primary DN 
+            // write to primary DN
             errorIndex = 0;
             throw e;
           }
@@ -607,6 +601,7 @@ public class DFSOutputStream extends FSO
           response.close();
           response.join();
         } catch (InterruptedException  e) {
+          DFSClient.LOG.warn("Caught exception ", e);
         } finally {
           response = null;
         }
@@ -1178,6 +1173,7 @@ public class DFSOutputStream extends FSO
                   Thread.sleep(sleeptime);
                   sleeptime *= 2;
                 } catch (InterruptedException ie) {
+                  DFSClient.LOG.warn("Caught exception ", ie);
                 }
               }
             } else {
@@ -1421,7 +1417,7 @@ public class DFSOutputStream extends FSO
 
     if (currentPacket == null) {
       currentPacket = new Packet(packetSize, chunksPerPacket, 
-          bytesCurBlock);
+          bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
       if (DFSClient.LOG.isDebugEnabled()) {
         DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + 
             currentPacket.seqno +
@@ -1468,7 +1464,8 @@ public class DFSOutputStream extends FSO
       // indicate the end of block and reset bytesCurBlock.
       //
       if (bytesCurBlock == blockSize) {
-        currentPacket = new Packet(0, 0, bytesCurBlock);
+        currentPacket = new Packet(0, 0, bytesCurBlock, 
+            currentSeqno++, this.checksum.getChecksumSize());
         currentPacket.lastPacketInBlock = true;
         currentPacket.syncBlock = shouldSyncBlock;
         waitAndQueueCurrentPacket();
@@ -1540,7 +1537,7 @@ public class DFSOutputStream extends FSO
             // but sync was requested.
             // Send an empty packet
             currentPacket = new Packet(packetSize, chunksPerPacket,
-                bytesCurBlock);
+                bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
           }
         } else {
           // We already flushed up to this offset.
@@ -1557,7 +1554,7 @@ public class DFSOutputStream extends FSO
             // and sync was requested.
             // So send an empty sync packet.
             currentPacket = new Packet(packetSize, chunksPerPacket,
-                bytesCurBlock);
+                bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
           } else {
             // just discard the current packet since it is already been sent.
             currentPacket = null;
@@ -1738,7 +1735,8 @@ public class DFSOutputStream extends FSO
 
       if (bytesCurBlock != 0) {
         // send an empty packet to mark the end of the block
-        currentPacket = new Packet(0, 0, bytesCurBlock);
+        currentPacket = new Packet(0, 0, bytesCurBlock, 
+            currentSeqno++, this.checksum.getChecksumSize());
         currentPacket.lastPacketInBlock = true;
         currentPacket.syncBlock = shouldSyncBlock;
       }
@@ -1778,6 +1776,7 @@ public class DFSOutputStream extends FSO
             DFSClient.LOG.info("Could not complete file " + src + " retrying...");
           }
         } catch (InterruptedException ie) {
+          DFSClient.LOG.warn("Caught exception ", ie);
         }
       }
     }