You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/09/30 17:41:38 UTC

[38/58] [abbrv] hadoop git commit: Merge remote-tracking branch 'apache/trunk' into HDFS-7285

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 0000000,7a40d73..78eaa6c
mode 000000,100755..100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@@ -1,0 -1,917 +1,982 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hdfs;
+ 
+ import java.io.FileNotFoundException;
+ import java.io.IOException;
+ import java.io.InterruptedIOException;
+ import java.nio.channels.ClosedChannelException;
+ import java.util.EnumSet;
+ import java.util.concurrent.atomic.AtomicReference;
+ 
+ import org.apache.hadoop.HadoopIllegalArgumentException;
+ import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.crypto.CryptoProtocolVersion;
+ import org.apache.hadoop.fs.CanSetDropBehind;
+ import org.apache.hadoop.fs.CreateFlag;
+ import org.apache.hadoop.fs.FSOutputSummer;
+ import org.apache.hadoop.fs.FileAlreadyExistsException;
+ import org.apache.hadoop.fs.FileEncryptionInfo;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.ParentNotDirectoryException;
+ import org.apache.hadoop.fs.Syncable;
+ import org.apache.hadoop.fs.permission.FsPermission;
+ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+ import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+ import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
+ import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
+ import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
+ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+ import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
+ import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
+ import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
+ import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+ import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+ import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
++import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
+ import org.apache.hadoop.hdfs.server.namenode.RetryStartFileException;
+ import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
+ import org.apache.hadoop.hdfs.util.ByteArrayManager;
+ import org.apache.hadoop.io.EnumSetWritable;
+ import org.apache.hadoop.ipc.RemoteException;
+ import org.apache.hadoop.security.AccessControlException;
+ import org.apache.hadoop.security.token.Token;
+ import org.apache.hadoop.util.DataChecksum;
+ import org.apache.hadoop.util.DataChecksum.Type;
+ import org.apache.hadoop.util.Progressable;
+ import org.apache.hadoop.util.Time;
+ import org.apache.htrace.core.TraceScope;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import com.google.common.annotations.VisibleForTesting;
+ import com.google.common.base.Preconditions;
+ 
+ 
+ /****************************************************************
+  * DFSOutputStream creates files from a stream of bytes.
+  *
+  * The client application writes data that is cached internally by
+  * this stream. Data is broken up into packets, each packet is
+  * typically 64K in size. A packet comprises chunks. Each chunk
+  * is typically 512 bytes and has an associated checksum with it.
+  *
+  * When a client application fills up the currentPacket, it is
+  * enqueued into the dataQueue of DataStreamer. DataStreamer is a
+  * thread that picks up packets from the dataQueue and sends them to
+  * the first datanode in the pipeline.
+  *
+  ****************************************************************/
+ @InterfaceAudience.Private
+ public class DFSOutputStream extends FSOutputSummer
+     implements Syncable, CanSetDropBehind {
+   static final Logger LOG = LoggerFactory.getLogger(DFSOutputStream.class);
+   /**
+    * Number of times to retry creating a file when there are transient 
+    * errors (typically related to encryption zones and KeyProvider operations).
+    */
+   @VisibleForTesting
+   static final int CREATE_RETRY_COUNT = 10;
+   @VisibleForTesting
+   static CryptoProtocolVersion[] SUPPORTED_CRYPTO_VERSIONS =
+       CryptoProtocolVersion.supported();
+ 
+   protected final DFSClient dfsClient;
+   protected final ByteArrayManager byteArrayManager;
+   // closed is accessed by different threads under different locks.
+   protected volatile boolean closed = false;
+ 
+   protected final String src;
+   protected final long fileId;
+   protected final long blockSize;
+   protected final int bytesPerChecksum;
+ 
+   protected DFSPacket currentPacket = null;
 -  private DataStreamer streamer;
++  protected DataStreamer streamer;
+   protected int packetSize = 0; // write packet size, not including the header.
+   protected int chunksPerPacket = 0;
+   protected long lastFlushOffset = 0; // offset when flush was invoked
+   private long initialFileSize = 0; // at time of file open
+   private final short blockReplication; // replication factor of file
+   protected boolean shouldSyncBlock = false; // force blocks to disk upon close
+   protected final AtomicReference<CachingStrategy> cachingStrategy;
+   private FileEncryptionInfo fileEncryptionInfo;
+ 
+   /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
+   protected DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetInBlock,
+       long seqno, boolean lastPacketInBlock) throws InterruptedIOException {
+     final byte[] buf;
+     final int bufferSize = PacketHeader.PKT_MAX_HEADER_LEN + packetSize;
+ 
+     try {
+       buf = byteArrayManager.newByteArray(bufferSize);
+     } catch (InterruptedException ie) {
+       final InterruptedIOException iioe = new InterruptedIOException(
+           "seqno=" + seqno);
+       iioe.initCause(ie);
+       throw iioe;
+     }
+ 
+     return new DFSPacket(buf, chunksPerPkt, offsetInBlock, seqno,
 -                         getChecksumSize(), lastPacketInBlock);
++        getChecksumSize(), lastPacketInBlock);
+   }
+ 
+   @Override
+   protected void checkClosed() throws IOException {
+     if (isClosed()) {
+       getStreamer().getLastException().throwException4Close();
+     }
+   }
+ 
+   //
+   // returns the list of targets, if any, that is being currently used.
+   //
+   @VisibleForTesting
+   public synchronized DatanodeInfo[] getPipeline() {
+     if (getStreamer().streamerClosed()) {
+       return null;
+     }
+     DatanodeInfo[] currentNodes = getStreamer().getNodes();
+     if (currentNodes == null) {
+       return null;
+     }
+     DatanodeInfo[] value = new DatanodeInfo[currentNodes.length];
+     for (int i = 0; i < currentNodes.length; i++) {
+       value[i] = currentNodes[i];
+     }
+     return value;
+   }
+ 
 -  /** 
++  /**
+    * @return the object for computing checksum.
+    *         The type is NULL if checksum is not computed.
+    */
+   private static DataChecksum getChecksum4Compute(DataChecksum checksum,
+       HdfsFileStatus stat) {
+     if (DataStreamer.isLazyPersist(stat) && stat.getReplication() == 1) {
+       // do not compute checksum for writing to single replica to memory
+       return DataChecksum.newDataChecksum(Type.NULL,
+           checksum.getBytesPerChecksum());
+     }
+     return checksum;
+   }
 - 
++
+   private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress,
+       HdfsFileStatus stat, DataChecksum checksum) throws IOException {
+     super(getChecksum4Compute(checksum, stat));
+     this.dfsClient = dfsClient;
+     this.src = src;
+     this.fileId = stat.getFileId();
+     this.blockSize = stat.getBlockSize();
+     this.blockReplication = stat.getReplication();
+     this.fileEncryptionInfo = stat.getFileEncryptionInfo();
+     this.cachingStrategy = new AtomicReference<CachingStrategy>(
+         dfsClient.getDefaultWriteCachingStrategy());
+     if ((progress != null) && DFSClient.LOG.isDebugEnabled()) {
+       DFSClient.LOG.debug(
+           "Set non-null progress callback on DFSOutputStream " + src);
+     }
 -    
++
+     this.bytesPerChecksum = checksum.getBytesPerChecksum();
+     if (bytesPerChecksum <= 0) {
+       throw new HadoopIllegalArgumentException(
+           "Invalid value: bytesPerChecksum = " + bytesPerChecksum + " <= 0");
+     }
+     if (blockSize % bytesPerChecksum != 0) {
+       throw new HadoopIllegalArgumentException("Invalid values: "
+           + HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (=" + bytesPerChecksum
+           + ") must divide block size (=" + blockSize + ").");
+     }
+     this.byteArrayManager = dfsClient.getClientContext().getByteArrayManager();
+   }
+ 
+   /** Construct a new output stream for creating a file. */
+   protected DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
+       EnumSet<CreateFlag> flag, Progressable progress,
 -      DataChecksum checksum, String[] favoredNodes) throws IOException {
++      DataChecksum checksum, String[] favoredNodes, boolean createStreamer)
++      throws IOException {
+     this(dfsClient, src, progress, stat, checksum);
+     this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
+ 
+     computePacketChunkSize(dfsClient.getConf().getWritePacketSize(), bytesPerChecksum);
+ 
 -    streamer = new DataStreamer(stat, null, dfsClient, src, progress, checksum,
 -        cachingStrategy, byteArrayManager, favoredNodes);
++    if (createStreamer) {
++      streamer = new DataStreamer(stat, null, dfsClient, src, progress,
++          checksum, cachingStrategy, byteArrayManager, favoredNodes);
++    }
+   }
+ 
+   static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
+       FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
+       short replication, long blockSize, Progressable progress, int buffersize,
+       DataChecksum checksum, String[] favoredNodes) throws IOException {
+     TraceScope scope =
+         dfsClient.newPathTraceScope("newStreamForCreate", src);
+     try {
+       HdfsFileStatus stat = null;
+ 
+       // Retry the create if we get a RetryStartFileException up to a maximum
+       // number of times
+       boolean shouldRetry = true;
+       int retryCount = CREATE_RETRY_COUNT;
+       while (shouldRetry) {
+         shouldRetry = false;
+         try {
+           stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
+               new EnumSetWritable<CreateFlag>(flag), createParent, replication,
+               blockSize, SUPPORTED_CRYPTO_VERSIONS);
+           break;
+         } catch (RemoteException re) {
+           IOException e = re.unwrapRemoteException(
+               AccessControlException.class,
+               DSQuotaExceededException.class,
+               QuotaByStorageTypeExceededException.class,
+               FileAlreadyExistsException.class,
+               FileNotFoundException.class,
+               ParentNotDirectoryException.class,
+               NSQuotaExceededException.class,
+               RetryStartFileException.class,
+               SafeModeException.class,
+               UnresolvedPathException.class,
+               SnapshotAccessControlException.class,
+               UnknownCryptoProtocolVersionException.class);
+           if (e instanceof RetryStartFileException) {
+             if (retryCount > 0) {
+               shouldRetry = true;
+               retryCount--;
+             } else {
+               throw new IOException("Too many retries because of encryption" +
+                   " zone operations", e);
+             }
+           } else {
+             throw e;
+           }
+         }
+       }
+       Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
 -      final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
 -          flag, progress, checksum, favoredNodes);
++      final DFSOutputStream out;
++      if(stat.getErasureCodingPolicy() != null) {
++        out = new DFSStripedOutputStream(dfsClient, src, stat,
++            flag, progress, checksum, favoredNodes);
++      } else {
++        out = new DFSOutputStream(dfsClient, src, stat,
++            flag, progress, checksum, favoredNodes, true);
++      }
+       out.start();
+       return out;
+     } finally {
+       scope.close();
+     }
+   }
+ 
+   /** Construct a new output stream for append. */
+   private DFSOutputStream(DFSClient dfsClient, String src,
+       EnumSet<CreateFlag> flags, Progressable progress, LocatedBlock lastBlock,
+       HdfsFileStatus stat, DataChecksum checksum, String[] favoredNodes)
 -          throws IOException {
++      throws IOException {
+     this(dfsClient, src, progress, stat, checksum);
+     initialFileSize = stat.getLen(); // length of file when opened
+     this.shouldSyncBlock = flags.contains(CreateFlag.SYNC_BLOCK);
+ 
+     boolean toNewBlock = flags.contains(CreateFlag.NEW_BLOCK);
+ 
+     this.fileEncryptionInfo = stat.getFileEncryptionInfo();
+ 
+     // The last partial block of the file has to be filled.
+     if (!toNewBlock && lastBlock != null) {
+       // indicate that we are appending to an existing block
+       streamer = new DataStreamer(lastBlock, stat, dfsClient, src, progress, checksum,
+           cachingStrategy, byteArrayManager);
+       getStreamer().setBytesCurBlock(lastBlock.getBlockSize());
+       adjustPacketChunkSize(stat);
+       getStreamer().setPipelineInConstruction(lastBlock);
+     } else {
+       computePacketChunkSize(dfsClient.getConf().getWritePacketSize(),
+           bytesPerChecksum);
+       streamer = new DataStreamer(stat, lastBlock != null ? lastBlock.getBlock() : null,
+           dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
+           favoredNodes);
+     }
+   }
+ 
+   private void adjustPacketChunkSize(HdfsFileStatus stat) throws IOException{
+ 
+     long usedInLastBlock = stat.getLen() % blockSize;
+     int freeInLastBlock = (int)(blockSize - usedInLastBlock);
+ 
+     // calculate the amount of free space in the pre-existing
+     // last crc chunk
+     int usedInCksum = (int)(stat.getLen() % bytesPerChecksum);
+     int freeInCksum = bytesPerChecksum - usedInCksum;
+ 
+     // we can only append if there is space in the last block;
+     // a completely full last block is reported as an error
+     if (freeInLastBlock == blockSize) {
+       throw new IOException("The last block for file " +
+           src + " is full.");
+     }
+ 
+     if (usedInCksum > 0 && freeInCksum > 0) {
+       // if there is space in the last partial chunk, then
+       // setup in such a way that the next packet will have only
+       // one chunk that fills up the partial chunk.
+       //
+       computePacketChunkSize(0, freeInCksum);
+       setChecksumBufSize(freeInCksum);
+       getStreamer().setAppendChunk(true);
+     } else {
+       // if the remaining space in the block is smaller than
+       // the expected size of a packet, then create
+       // a smaller size packet.
+       //
+       computePacketChunkSize(
+           Math.min(dfsClient.getConf().getWritePacketSize(), freeInLastBlock),
+           bytesPerChecksum);
+     }
+   }
+ 
+   static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src,
+       EnumSet<CreateFlag> flags, int bufferSize, Progressable progress,
+       LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum,
+       String[] favoredNodes) throws IOException {
+     TraceScope scope =
+         dfsClient.newPathTraceScope("newStreamForAppend", src);
++    if(stat.getErasureCodingPolicy() != null) {
++      throw new IOException("Not support appending to a striping layout file yet.");
++    }
+     try {
+       final DFSOutputStream out = new DFSOutputStream(dfsClient, src, flags,
+           progress, lastBlock, stat, checksum, favoredNodes);
+       out.start();
+       return out;
+     } finally {
+       scope.close();
+     }
+   }
+ 
+   protected void computePacketChunkSize(int psize, int csize) {
+     final int bodySize = psize - PacketHeader.PKT_MAX_HEADER_LEN;
+     final int chunkSize = csize + getChecksumSize();
+     chunksPerPacket = Math.max(bodySize/chunkSize, 1);
+     packetSize = chunkSize*chunksPerPacket;
+     if (DFSClient.LOG.isDebugEnabled()) {
+       DFSClient.LOG.debug("computePacketChunkSize: src=" + src +
+                 ", chunkSize=" + chunkSize +
+                 ", chunksPerPacket=" + chunksPerPacket +
+                 ", packetSize=" + packetSize);
+     }
+   }
+ 
+   protected TraceScope createWriteTraceScope() {
+     return dfsClient.newPathTraceScope("DFSOutputStream#write", src);
+   }
+ 
+   // @see FSOutputSummer#writeChunk()
+   @Override
+   protected synchronized void writeChunk(byte[] b, int offset, int len,
+       byte[] checksum, int ckoff, int cklen) throws IOException {
+     dfsClient.checkOpen();
+     checkClosed();
+ 
+     if (len > bytesPerChecksum) {
+       throw new IOException("writeChunk() buffer size is " + len +
+                             " is larger than supported  bytesPerChecksum " +
+                             bytesPerChecksum);
+     }
+     if (cklen != 0 && cklen != getChecksumSize()) {
+       throw new IOException("writeChunk() checksum size is supposed to be " +
+                             getChecksumSize() + " but found to be " + cklen);
+     }
+ 
+     if (currentPacket == null) {
+       currentPacket = createPacket(packetSize, chunksPerPacket, getStreamer()
+           .getBytesCurBlock(), getStreamer().getAndIncCurrentSeqno(), false);
 -      if (DFSClient.LOG.isDebugEnabled()) {
 -        DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + 
++      if (LOG.isDebugEnabled()) {
++        LOG.debug("WriteChunk allocating new packet seqno=" +
+             currentPacket.getSeqno() +
+             ", src=" + src +
+             ", packetSize=" + packetSize +
+             ", chunksPerPacket=" + chunksPerPacket +
 -            ", bytesCurBlock=" + getStreamer().getBytesCurBlock());
++            ", bytesCurBlock=" + getStreamer().getBytesCurBlock() + ", " + this);
+       }
+     }
+ 
+     currentPacket.writeChecksum(checksum, ckoff, cklen);
+     currentPacket.writeData(b, offset, len);
+     currentPacket.incNumChunks();
+     getStreamer().incBytesCurBlock(len);
+ 
+     // If packet is full, enqueue it for transmission
 -    //
+     if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
+         getStreamer().getBytesCurBlock() == blockSize) {
+       enqueueCurrentPacketFull();
+     }
+   }
+ 
+   void enqueueCurrentPacket() throws IOException {
+     getStreamer().waitAndQueuePacket(currentPacket);
+     currentPacket = null;
+   }
+ 
+   void enqueueCurrentPacketFull() throws IOException {
+     LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={},"
 -        + " appendChunk={}, {}", currentPacket, src, getStreamer()
 -        .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(),
++            + " appendChunk={}, {}", currentPacket, src, getStreamer()
++            .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(),
+         getStreamer());
+     enqueueCurrentPacket();
+     adjustChunkBoundary();
+     endBlock();
+   }
+ 
+   /** create an empty packet to mark the end of the block. */
+   void setCurrentPacketToEmpty() throws InterruptedIOException {
+     currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
+         getStreamer().getAndIncCurrentSeqno(), true);
+     currentPacket.setSyncBlock(shouldSyncBlock);
+   }
+ 
+   /**
+    * If the reopened file did not end at a chunk boundary and the above
+    * write filled up its partial chunk, tell the summer to generate full
+    * crc chunks from now on.
+    */
+   protected void adjustChunkBoundary() {
+     if (getStreamer().getAppendChunk() &&
+         getStreamer().getBytesCurBlock() % bytesPerChecksum == 0) {
+       getStreamer().setAppendChunk(false);
+       resetChecksumBufSize();
+     }
+ 
+     if (!getStreamer().getAppendChunk()) {
+       int psize = Math.min((int)(blockSize- getStreamer().getBytesCurBlock()),
+           dfsClient.getConf().getWritePacketSize());
+       computePacketChunkSize(psize, bytesPerChecksum);
+     }
+   }
+ 
+   /**
+    * if encountering a block boundary, send an empty packet to
+    * indicate the end of block and reset bytesCurBlock.
+    *
+    * @throws IOException
+    */
 -  protected void endBlock() throws IOException {
++  void endBlock() throws IOException {
+     if (getStreamer().getBytesCurBlock() == blockSize) {
+       setCurrentPacketToEmpty();
+       enqueueCurrentPacket();
+       getStreamer().setBytesCurBlock(0);
+       lastFlushOffset = 0;
+     }
+   }
 -  
++
+   /**
+    * Flushes out to all replicas of the block. The data is in the buffers
+    * of the DNs but not necessarily in the DN's OS buffers.
+    *
+    * It is a synchronous operation. When it returns,
+    * it guarantees that flushed data become visible to new readers. 
+    * It is not guaranteed that data has been flushed to 
+    * persistent store on the datanode. 
+    * Block allocations are persisted on namenode.
+    */
+   @Override
+   public void hflush() throws IOException {
+     TraceScope scope =
+         dfsClient.newPathTraceScope("hflush", src);
+     try {
+       flushOrSync(false, EnumSet.noneOf(SyncFlag.class));
+     } finally {
+       scope.close();
+     }
+   }
+ 
+   @Override
+   public void hsync() throws IOException {
+     TraceScope scope =
+         dfsClient.newPathTraceScope("hsync", src);
+     try {
+       flushOrSync(true, EnumSet.noneOf(SyncFlag.class));
+     } finally {
+       scope.close();
+     }
+   }
 -  
++
+   /**
+    * The expected semantics is all data have flushed out to all replicas 
+    * and all replicas have done posix fsync equivalent - ie the OS has 
+    * flushed it to the disk device (but the disk may have it in its cache).
 -   * 
++   *
+    * Note that only the current block is flushed to the disk device.
+    * To guarantee durable sync across block boundaries the stream should
+    * be created with {@link CreateFlag#SYNC_BLOCK}.
 -   * 
++   *
+    * @param syncFlags
+    *          Indicate the semantic of the sync. Currently used to specify
+    *          whether or not to update the block length in NameNode.
+    */
+   public void hsync(EnumSet<SyncFlag> syncFlags) throws IOException {
+     TraceScope scope =
+         dfsClient.newPathTraceScope("hsync", src);
+     try {
+       flushOrSync(true, syncFlags);
+     } finally {
+       scope.close();
+     }
+   }
+ 
+   /**
+    * Flush/Sync buffered data to DataNodes.
 -   * 
++   *
+    * @param isSync
+    *          Whether or not to require all replicas to flush data to the disk
+    *          device
+    * @param syncFlags
+    *          Indicate extra detailed semantic of the flush/sync. Currently
+    *          mainly used to specify whether or not to update the file length in
+    *          the NameNode
+    * @throws IOException
+    */
+   private void flushOrSync(boolean isSync, EnumSet<SyncFlag> syncFlags)
+       throws IOException {
+     dfsClient.checkOpen();
+     checkClosed();
+     try {
+       long toWaitFor;
+       long lastBlockLength = -1L;
+       boolean updateLength = syncFlags.contains(SyncFlag.UPDATE_LENGTH);
+       boolean endBlock = syncFlags.contains(SyncFlag.END_BLOCK);
+       synchronized (this) {
+         // flush checksum buffer, but keep checksum buffer intact if we do not
+         // need to end the current block
+         int numKept = flushBuffer(!endBlock, true);
+         // bytesCurBlock potentially incremented if there was buffered data
+ 
+         if (DFSClient.LOG.isDebugEnabled()) {
+           DFSClient.LOG.debug("DFSClient flush(): "
+               + " bytesCurBlock=" + getStreamer().getBytesCurBlock()
+               + " lastFlushOffset=" + lastFlushOffset
+               + " createNewBlock=" + endBlock);
+         }
+         // Flush only if we haven't already flushed till this offset.
+         if (lastFlushOffset != getStreamer().getBytesCurBlock()) {
+           assert getStreamer().getBytesCurBlock() > lastFlushOffset;
+           // record the valid offset of this flush
+           lastFlushOffset = getStreamer().getBytesCurBlock();
+           if (isSync && currentPacket == null && !endBlock) {
+             // Nothing to send right now,
+             // but sync was requested.
+             // Send an empty packet if we do not end the block right now
+             currentPacket = createPacket(packetSize, chunksPerPacket,
+                 getStreamer().getBytesCurBlock(), getStreamer()
+                     .getAndIncCurrentSeqno(), false);
+           }
+         } else {
+           if (isSync && getStreamer().getBytesCurBlock() > 0 && !endBlock) {
+             // Nothing to send right now,
+             // and the block was partially written,
+             // and sync was requested.
+             // So send an empty sync packet if we do not end the block right
+             // now
+             currentPacket = createPacket(packetSize, chunksPerPacket,
+                 getStreamer().getBytesCurBlock(), getStreamer()
+                     .getAndIncCurrentSeqno(), false);
+           } else if (currentPacket != null) {
+             // just discard the current packet since it has already been sent.
+             currentPacket.releaseBuffer(byteArrayManager);
+             currentPacket = null;
+           }
+         }
+         if (currentPacket != null) {
+           currentPacket.setSyncBlock(isSync);
+           enqueueCurrentPacket();
+         }
+         if (endBlock && getStreamer().getBytesCurBlock() > 0) {
+           // Need to end the current block, thus send an empty packet to
+           // indicate this is the end of the block and reset bytesCurBlock
+           currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
+               getStreamer().getAndIncCurrentSeqno(), true);
+           currentPacket.setSyncBlock(shouldSyncBlock || isSync);
+           enqueueCurrentPacket();
+           getStreamer().setBytesCurBlock(0);
+           lastFlushOffset = 0;
+         } else {
+           // Restore state of stream. Record the last flush offset
+           // of the last full chunk that was flushed.
+           getStreamer().setBytesCurBlock(
+               getStreamer().getBytesCurBlock() - numKept);
+         }
+ 
+         toWaitFor = getStreamer().getLastQueuedSeqno();
+       } // end synchronized
+ 
+       getStreamer().waitForAckedSeqno(toWaitFor);
+ 
+       // update the block length first time irrespective of flag
+       if (updateLength || getStreamer().getPersistBlocks().get()) {
+         synchronized (this) {
+           if (!getStreamer().streamerClosed()
+               && getStreamer().getBlock() != null) {
+             lastBlockLength = getStreamer().getBlock().getNumBytes();
+           }
+         }
+       }
+       // If 1) any new blocks were allocated since the last flush, or 2) to
+       // update length in NN is required, then persist block locations on
+       // namenode.
+       if (getStreamer().getPersistBlocks().getAndSet(false) || updateLength) {
+         try {
+           dfsClient.namenode.fsync(src, fileId, dfsClient.clientName,
+               lastBlockLength);
+         } catch (IOException ioe) {
+           DFSClient.LOG.warn("Unable to persist blocks in hflush for " + src, ioe);
+           // If we got an error here, it might be because some other thread called
+           // close before our hflush completed. In that case, we should throw an
+           // exception that the stream is closed.
+           checkClosed();
+           // If we aren't closed but failed to sync, we should expose that to the
+           // caller.
+           throw ioe;
+         }
+       }
+ 
+       synchronized(this) {
+         if (!getStreamer().streamerClosed()) {
+           getStreamer().setHflush();
+         }
+       }
+     } catch (InterruptedIOException interrupt) {
+       // This kind of error doesn't mean that the stream itself is broken - just the
+       // flushing thread got interrupted. So, we shouldn't close down the writer,
+       // but instead just propagate the error
+       throw interrupt;
+     } catch (IOException e) {
+       DFSClient.LOG.warn("Error while syncing", e);
+       synchronized (this) {
+         if (!isClosed()) {
+           getStreamer().getLastException().set(e);
+           closeThreads(true);
+         }
+       }
+       throw e;
+     }
+   }
+ 
+   /**
+    * @deprecated use {@link HdfsDataOutputStream#getCurrentBlockReplication()}.
+    */
+   @Deprecated
+   public synchronized int getNumCurrentReplicas() throws IOException {
+     return getCurrentBlockReplication();
+   }
+ 
+   /**
+    * Note that this is not a public API;
+    * use {@link HdfsDataOutputStream#getCurrentBlockReplication()} instead.
 -   * 
++   *
+    * @return the number of valid replicas of the current block
+    */
+   public synchronized int getCurrentBlockReplication() throws IOException {
+     dfsClient.checkOpen();
+     checkClosed();
+     if (getStreamer().streamerClosed()) {
+       return blockReplication; // no pipeline, return repl factor of file
+     }
+     DatanodeInfo[] currentNodes = getStreamer().getNodes();
+     if (currentNodes == null) {
+       return blockReplication; // no pipeline, return repl factor of file
+     }
+     return currentNodes.length;
+   }
 -  
++
+   /**
+    * Waits till all existing data is flushed and confirmations 
+    * received from datanodes. 
+    */
+   protected void flushInternal() throws IOException {
+     long toWaitFor;
+     synchronized (this) {
+       dfsClient.checkOpen();
+       checkClosed();
+       //
+       // If there is data in the current buffer, send it across
+       //
+       getStreamer().queuePacket(currentPacket);
+       currentPacket = null;
+       toWaitFor = getStreamer().getLastQueuedSeqno();
+     }
+ 
+     getStreamer().waitForAckedSeqno(toWaitFor);
+   }
+ 
+   protected synchronized void start() {
+     getStreamer().start();
+   }
 -  
++
+   /**
+    * Aborts this output stream and releases any system 
+    * resources associated with this stream.
+    */
+   synchronized void abort() throws IOException {
+     if (isClosed()) {
+       return;
+     }
+     getStreamer().getLastException().set(new IOException("Lease timeout of "
+         + (dfsClient.getConf().getHdfsTimeout()/1000) + " seconds expired."));
+     closeThreads(true);
+     dfsClient.endFileLease(fileId);
+   }
+ 
+   /**
+    * Tells whether this stream counts as closed.
+    *
+    * @return true if the closed flag is set or the streamer reports that it
+    *         is closed
+    */
+   boolean isClosed() {
+     if (closed) {
+       return true;
+     }
+     return getStreamer().streamerClosed();
+   }
+ 
+   void setClosed() {
+     closed = true;
+     getStreamer().release();
+   }
+ 
+   /**
+    * Shuts down the datastreamer and responseprocessor threads: closes the
+    * streamer, joins it, and closes its socket. Regardless of the outcome,
+    * the streamer's socket reference is cleared and this stream is marked
+    * closed.
+    *
+    * @param force interrupt the datastreamer if true
+    * @throws IOException if the streamer calls fail, or if the calling thread
+    *         is interrupted while shutting the streamer down; in the latter
+    *         case the interrupt status is restored and the
+    *         InterruptedException is attached as the cause
+    */
+   protected void closeThreads(boolean force) throws IOException {
+     try {
+       getStreamer().close(force);
+       getStreamer().join();
+       getStreamer().closeSocket();
+     } catch (InterruptedException e) {
+       // Restore the interrupt status so code further up the stack can still
+       // observe it, and keep the original exception for diagnostics.
+       Thread.currentThread().interrupt();
+       throw new IOException("Failed to shutdown streamer", e);
+     } finally {
+       getStreamer().setSocketToNull();
+       setClosed();
+     }
+   }
 -  
++
+   /**
+    * Closes this output stream and releases any system
+    * resources associated with this stream.
+    * <p>
+    * The actual work is delegated to {@link #closeImpl()}, which runs inside
+    * a path trace scope that is closed afterwards in all cases.
+    *
+    * @throws IOException propagated from {@link #closeImpl()}
+    */
+   @Override
+   public synchronized void close() throws IOException {
+     final TraceScope closeScope =
+         dfsClient.newPathTraceScope("DFSOutputStream#close", src);
+     try {
+       closeImpl();
+     } finally {
+       closeScope.close();
+     }
+   }
+ 
+   protected synchronized void closeImpl() throws IOException { // body of close(): flush, stop streamer, complete file, end lease
+     if (isClosed()) {
+       getStreamer().getLastException().check(true); // presumably rethrows a previously recorded failure - TODO confirm
+       return;
+     }
+ 
+     try {
+       flushBuffer();       // flush from all upper layers
+ 
+       if (currentPacket != null) {
+         enqueueCurrentPacket();
+       }
+ 
+       if (getStreamer().getBytesCurBlock() != 0) {
+         setCurrentPacketToEmpty(); // NOTE(review): looks like the end-of-block marker packet - confirm
+       }
+ 
+       flushInternal();             // flush all data to Datanodes
+       // get last block before destroying the streamer
+       ExtendedBlock lastBlock = getStreamer().getBlock();
+       closeThreads(false); // non-forced shutdown of the streamer
+       TraceScope scope = dfsClient.getTracer().newScope("completeFile");
+       try {
+         completeFile(lastBlock);
+       } finally {
+         scope.close();
+       }
+       dfsClient.endFileLease(fileId);
+     } catch (ClosedChannelException e) { // deliberately swallowed; NOTE(review): rationale is not visible here
+     } finally {
+       setClosed(); // always mark the stream closed, even when a step above failed
+     }
+   }
+ 
+   // should be called holding (this) lock since setTestFilename() may
+   // be called during unit tests
+   protected void completeFile(ExtendedBlock last) throws IOException { // asks the namenode to complete the file, retrying with growing sleeps
+     long localstart = Time.monotonicNow();
+     final DfsClientConf conf = dfsClient.getConf();
+     long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs();
+     boolean fileComplete = false;
+     int retries = conf.getNumBlockWriteLocateFollowingRetry();
+     while (!fileComplete) { // loop until the namenode call returns true or an exception is thrown
+       fileComplete =
+           dfsClient.namenode.complete(src, dfsClient.clientName, last, fileId);
+       if (!fileComplete) {
+         final int hdfsTimeout = conf.getHdfsTimeout();
+         if (!dfsClient.clientRunning
+             || (hdfsTimeout > 0
+                 && localstart + hdfsTimeout < Time.monotonicNow())) { // client stopped, or a positive hdfsTimeout has elapsed since localstart
+             String msg = "Unable to close file because dfsclient " +
+                           " was unable to contact the HDFS servers." +
+                           " clientRunning " + dfsClient.clientRunning +
+                           " hdfsTimeout " + hdfsTimeout;
+             DFSClient.LOG.info(msg);
+             throw new IOException(msg);
+         }
+         try {
+           if (retries == 0) { // retry budget exhausted
+             throw new IOException("Unable to close file because the last block"
+                 + " does not have enough number of replicas.");
+           }
+           retries--;
+           Thread.sleep(sleeptime);
+           sleeptime *= 2; // double the sleep before the next attempt
+           if (Time.monotonicNow() - localstart > 5000) { // only log once more than 5000 ms have passed
+             DFSClient.LOG.info("Could not complete " + src + " retrying...");
+           }
+         } catch (InterruptedException ie) { // NOTE(review): interrupt is only logged; the loop continues and the interrupt status is not restored
+           DFSClient.LOG.warn("Caught exception ", ie);
+         }
+       }
+     }
+   }
+ 
+   /**
+    * Forwards an artificial slowdown period to the streamer.
+    *
+    * @param period value handed to the streamer's setArtificialSlowdown
+    */
+   @VisibleForTesting
+   public void setArtificialSlowdown(long period) {
+     final DataStreamer target = getStreamer();
+     target.setArtificialSlowdown(period);
+   }
+ 
+   /**
+    * Lowers the chunks-per-packet limit to the given value (the limit is
+    * never raised) and recomputes the packet size from it.
+    *
+    * @param value candidate limit; ignored when not below the current limit
+    */
+   @VisibleForTesting
+   public synchronized void setChunksPerPacket(int value) {
+     if (value < chunksPerPacket) {
+       chunksPerPacket = value;
+     }
+     packetSize = (bytesPerChecksum + getChecksumSize()) * chunksPerPacket;
+   }
+ 
+   /**
+    * Reports the size the file had when this stream was opened.
+    *
+    * @return the initial file size
+    */
+   public long getInitialLen() {
+     return this.initialFileSize;
+   }
+ 
+   /**
+    * Exposes the encryption information held by this stream.
+    *
+    * @return the FileEncryptionInfo for this stream, or null if not encrypted.
+    */
+   public FileEncryptionInfo getFileEncryptionInfo() {
+     return this.fileEncryptionInfo;
+   }
+ 
+   /**
+    * Returns the access token currently used by streamer, for testing only
+    */
+   synchronized Token<BlockTokenIdentifier> getBlockToken() {
+     return getStreamer().getBlockToken();
+   }
+ 
+   @Override
+   public void setDropBehind(Boolean dropBehind) throws IOException { // swaps in a caching strategy carrying the given dropBehind value
+     CachingStrategy prevStrategy, nextStrategy;
+     // CachingStrategy is immutable.  So build a new CachingStrategy with the
+     // modifications we want, and compare-and-swap it in.
+     do {
+       prevStrategy = this.cachingStrategy.get();
+       nextStrategy = new CachingStrategy.Builder(prevStrategy).
 -                        setDropBehind(dropBehind).build();
++          setDropBehind(dropBehind).build();
+     } while (!this.cachingStrategy.compareAndSet(prevStrategy, nextStrategy)); // retry when the reference changed since it was read
+   }
+ 
+   @VisibleForTesting
+   ExtendedBlock getBlock() {
+     return getStreamer().getBlock();
+   }
+ 
+   /**
+    * Exposes the file id held by this stream.
+    *
+    * @return the file id
+    */
+   @VisibleForTesting
+   public long getFileId() {
+     return this.fileId;
+   }
+ 
+   /**
+    * Exposes the source of this stream.
+    *
+    * @return the source string
+    */
+   String getSrc() {
+     return this.src;
+   }
+ 
+   /**
+    * Exposes the data streamer object used by this stream.
+    *
+    * @return the streamer field
+    */
+   protected DataStreamer getStreamer() {
+     return this.streamer;
+   }
++
++  /**
++   * Describes this stream as its simple class name, a colon, and the
++   * streamer's string form.
++   */
++  @Override
++  public String toString() {
++    final StringBuilder text = new StringBuilder(getClass().getSimpleName());
++    text.append(":").append(streamer);
++    return text.toString();
++  }
++
++  /**
++   * Asks the namenode for a new block, retrying while the namenode answers
++   * with NotReplicatedYetException.
++   * <p>
++   * A remote exception that unwraps to one of the listed exception types is
++   * rethrown unwrapped immediately. NotReplicatedYetException is retried up
++   * to the configured number of times, sleeping between attempts and
++   * doubling the sleep each time. Any other remote exception is rethrown as
++   * is. An interrupt during the sleep is only logged; the loop carries on.
++   *
++   * @param excludedNodes passed through to the namenode's addBlock call
++   * @param dfsClient supplies the configuration, namenode and client name
++   * @param src passed through to the namenode; also used in log messages
++   * @param prevBlock passed through to the namenode's addBlock call
++   * @param fileId passed through to the namenode's addBlock call
++   * @param favoredNodes passed through to the namenode's addBlock call
++   * @return the block returned by the namenode
++   * @throws IOException the unwrapped or original remote exception when no
++   *         retry applies or the retries are used up
++   */
++  static LocatedBlock addBlock(DatanodeInfo[] excludedNodes, DFSClient dfsClient,
++      String src, ExtendedBlock prevBlock, long fileId, String[] favoredNodes)
++      throws IOException {
++    final DfsClientConf clientConf = dfsClient.getConf();
++    int attemptsLeft = clientConf.getNumBlockWriteLocateFollowingRetry();
++    long backoffMs = clientConf.getBlockWriteLocateFollowingInitialDelayMs();
++    final long startedAt = Time.monotonicNow();
++    for (;;) {
++      try {
++        return dfsClient.namenode.addBlock(src, dfsClient.clientName, prevBlock,
++            excludedNodes, fileId, favoredNodes);
++      } catch (RemoteException e) {
++        final IOException unwrapped = e.unwrapRemoteException(
++            FileNotFoundException.class,
++            AccessControlException.class,
++            NSQuotaExceededException.class,
++            DSQuotaExceededException.class,
++            QuotaByStorageTypeExceededException.class,
++            UnresolvedPathException.class);
++        if (unwrapped != e) {
++          throw unwrapped; // no need to retry these exceptions
++        }
++        final boolean notReplicatedYet =
++            NotReplicatedYetException.class.getName().equals(e.getClassName());
++        if (!notReplicatedYet || attemptsLeft == 0) {
++          throw e;
++        }
++        --attemptsLeft;
++        LOG.info("Exception while adding a block", e);
++        final long waitedMs = Time.monotonicNow() - startedAt;
++        if (waitedMs > 5000) {
++          LOG.info("Waiting for replication for " + (waitedMs / 1000)
++              + " seconds");
++        }
++        try {
++          LOG.warn("NotReplicatedYetException sleeping " + src
++              + " retries left " + attemptsLeft);
++          Thread.sleep(backoffMs);
++          backoffMs *= 2;
++        } catch (InterruptedException ie) {
++          LOG.warn("Caught exception", ie);
++        }
++      }
++    }
++  }
+ }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
index 0000000,9a8ca6f..191691b
mode 000000,100755..100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
@@@ -1,0 -1,350 +1,364 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hdfs;
+ 
+ import java.io.DataOutputStream;
+ import java.io.IOException;
+ import java.nio.BufferOverflowException;
++import java.nio.ByteBuffer;
+ import java.nio.channels.ClosedChannelException;
+ import java.util.Arrays;
+ 
+ import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+ import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+ import org.apache.hadoop.hdfs.util.ByteArrayManager;
+ import org.apache.htrace.core.Span;
+ import org.apache.htrace.core.SpanId;
+ import org.apache.htrace.core.TraceScope;
+ 
+ /****************************************************************
+  * DFSPacket is used by DataStreamer and DFSOutputStream.
+  * DFSOutputStream generates packets and then asks DataStreamer
+  * to send them to datanodes.
+  ****************************************************************/
+ 
+ @InterfaceAudience.Private
 -class DFSPacket {
++public class DFSPacket {
+   public static final long HEART_BEAT_SEQNO = -1L;
+   private static final SpanId[] EMPTY = new SpanId[0];
+   private final long seqno; // sequence number of buffer in block
+   private final long offsetInBlock; // offset in block
+   private boolean syncBlock; // this packet forces the current block to disk
+   private int numChunks; // number of chunks currently in packet
+   private final int maxChunks; // max chunks in packet
+   private byte[] buf;
+   private final boolean lastPacketInBlock; // is this the last packet in block?
+ 
+   /**
+    * buf is pointed into like follows:
+    *  (C is checksum data, D is payload data)
+    *
+    * [_________CCCCCCCCC________________DDDDDDDDDDDDDDDD___]
+    *           ^        ^               ^               ^
+    *           |        checksumPos     dataStart       dataPos
+    *           checksumStart
+    *
+    * Right before sending, we move the checksum data to immediately precede
+    * the actual data, and then insert the header into the buffer immediately
+    * preceding the checksum data, so we make sure to keep enough space in
+    * front of the checksum data to support the largest conceivable header.
+    */
+   private int checksumStart;
+   private int checksumPos;
+   private final int dataStart;
+   private int dataPos;
+   private SpanId[] traceParents = EMPTY;
+   private int traceParentsUsed;
+   private TraceScope scope;
+ 
+   /**
+    * Create a new packet.
+    *
+    * @param buf the buffer storing data and checksums
+    * @param chunksPerPkt maximum number of chunks per packet.
+    * @param offsetInBlock offset in bytes into the HDFS block.
+    * @param seqno the sequence number of this packet
+    * @param checksumSize the size of checksum
+    * @param lastPacketInBlock if this is the last packet
+    */
 -  DFSPacket(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno,
++  public DFSPacket(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno,
+                    int checksumSize, boolean lastPacketInBlock) {
+     this.lastPacketInBlock = lastPacketInBlock;
+     this.numChunks = 0;
+     this.offsetInBlock = offsetInBlock;
+     this.seqno = seqno;
+ 
+     this.buf = buf;
+ 
+     // Leave room for the largest header in front of the checksum area.
+     checksumStart = PacketHeader.PKT_MAX_HEADER_LEN;
+     checksumPos = checksumStart;
+     dataStart = checksumStart + (chunksPerPkt * checksumSize);
+     dataPos = dataStart;
+     maxChunks = chunksPerPkt;
+   }
+ 
+   /**
+    * Write data to this packet.
+    *
+    * @param inarray input array of data
+    * @param off the offset of data to write
+    * @param len the length of data to write
+    * @throws ClosedChannelException if the buffer of this packet has been
+    *         released
+    * @throws BufferOverflowException if the data does not fit into the buffer
+    */
+   synchronized void writeData(byte[] inarray, int off, int len)
+       throws ClosedChannelException {
+     checkBuffer();
+     if (dataPos + len > buf.length) {
+       throw new BufferOverflowException();
+     }
+     System.arraycopy(inarray, off, buf, dataPos, len);
+     dataPos += len;
+   }
+ 
++  /**
++   * Write data from a byte buffer to this packet.
++   *
++   * At most {@code inBuffer.remaining()} bytes are copied, and the position
++   * of {@code inBuffer} advances by the number of bytes copied.
++   *
++   * @param inBuffer the buffer to read the data from
++   * @param len the requested number of bytes to write
++   * @throws ClosedChannelException if the buffer of this packet has been
++   *         released
++   * @throws BufferOverflowException if the data does not fit into the buffer
++   * @throws IndexOutOfBoundsException if {@code len} is negative
++   */
++  public synchronized void writeData(ByteBuffer inBuffer, int len)
++      throws ClosedChannelException {
++    checkBuffer();
++    len = Math.min(len, inBuffer.remaining());
++    if (dataPos + len > buf.length) {
++      throw new BufferOverflowException();
++    }
++    // Bulk copy instead of a byte-by-byte loop; it also rejects a negative
++    // len rather than silently moving dataPos backwards.
++    inBuffer.get(buf, dataPos, len);
++    dataPos += len;
++  }
++
+   /**
+    * Write checksums to this packet
+    *
+    * @param inarray input array of checksums
+    * @param off the offset of checksums to write
+    * @param len the length of checksums to write
+    * @throws ClosedChannelException if the buffer of this packet has been
+    *         released
+    * @throws BufferOverflowException if the checksums would run into the
+    *         data area
+    */
 -  synchronized void writeChecksum(byte[] inarray, int off, int len)
++  public synchronized void writeChecksum(byte[] inarray, int off, int len)
+       throws ClosedChannelException {
+     checkBuffer();
+     if (len == 0) {
+       return;
+     }
+     if (checksumPos + len > dataStart) {
+       throw new BufferOverflowException();
+     }
+     System.arraycopy(inarray, off, buf, checksumPos, len);
+     checksumPos += len;
+   }
+ 
+   /**
+    * Write the full packet, including the header, to the given output stream.
+    *
+    * @param stm the stream the packet is written to
+    * @throws IOException if the buffer of this packet has been released or
+    *         writing to the stream fails
+    */
 -  synchronized void writeTo(DataOutputStream stm) throws IOException {
++  public synchronized void writeTo(DataOutputStream stm) throws IOException {
+     checkBuffer();
+ 
+     final int dataLen = dataPos - dataStart;
+     final int checksumLen = checksumPos - checksumStart;
+     final int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen;
+ 
+     PacketHeader header = new PacketHeader(
+         pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock);
+ 
+     if (checksumPos != dataStart) {
+       // Move the checksum to cover the gap. This can happen for the last
+       // packet or during an hflush/hsync call.
+       System.arraycopy(buf, checksumStart, buf,
+           dataStart - checksumLen , checksumLen);
+       checksumPos = dataStart;
+       checksumStart = checksumPos - checksumLen;
+     }
+ 
+     final int headerStart = checksumStart - header.getSerializedSize();
+     assert checksumStart + 1 >= header.getSerializedSize();
+     assert headerStart >= 0;
+     assert headerStart + header.getSerializedSize() == checksumStart;
+ 
+     // Copy the header data into the buffer immediately preceding the checksum
+     // data.
+     System.arraycopy(header.getBytes(), 0, buf, headerStart,
+         header.getSerializedSize());
+ 
+     // corrupt the data for testing.
+     if (DFSClientFaultInjector.get().corruptPacket()) {
+       buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
+     }
+ 
+     // Write the now contiguous full packet to the output stream.
+     stm.write(buf, headerStart, header.getSerializedSize() + checksumLen + dataLen);
+ 
+     // undo corruption.
+     if (DFSClientFaultInjector.get().uncorruptPacket()) {
+       buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
+     }
+   }
+ 
+   /**
+    * Fail if the buffer of this packet has already been released.
+    *
+    * @throws ClosedChannelException if the buffer is null
+    */
+   private synchronized void checkBuffer() throws ClosedChannelException {
+     if (buf == null) {
+       throw new ClosedChannelException();
+     }
+   }
+ 
+   /**
+    * Release the buffer in this packet to ByteArrayManager.
+    *
+    * @param bam the manager the buffer is released to
+    */
+   synchronized void releaseBuffer(ByteArrayManager bam) {
+     bam.release(buf);
+     buf = null;
+   }
+ 
+   /**
+    * get the packet's last byte's offset in the block
+    *
+    * @return the packet's last byte's offset in the block
+    */
+   synchronized long getLastByteOffsetBlock() {
+     return offsetInBlock + dataPos - dataStart;
+   }
+ 
+   /**
+    * Check if this packet is a heart beat packet
+    *
+    * @return true if the sequence number is HEART_BEAT_SEQNO
+    */
+   boolean isHeartbeatPacket() {
+     return seqno == HEART_BEAT_SEQNO;
+   }
+ 
+   /**
+    * check if this packet is the last packet in block
+    *
+    * @return true if the packet is the last packet
+    */
 -  boolean isLastPacketInBlock(){
++  boolean isLastPacketInBlock() {
+     return lastPacketInBlock;
+   }
+ 
+   /**
+    * get sequence number of this packet
+    *
+    * @return the sequence number of this packet
+    */
 -  long getSeqno(){
++  long getSeqno() {
+     return seqno;
+   }
+ 
+   /**
+    * get the number of chunks this packet contains
+    *
+    * @return the number of chunks in this packet
+    */
 -  synchronized int getNumChunks(){
++  synchronized int getNumChunks() {
+     return numChunks;
+   }
+ 
+   /**
+    * increase the number of chunks by one
+    */
 -  synchronized void incNumChunks(){
++  synchronized void incNumChunks() {
+     numChunks++;
+   }
+ 
+   /**
+    * get the maximum number of chunks
+    *
+    * @return the maximum number of chunks in this packet
+    */
 -  int getMaxChunks(){
++  int getMaxChunks() {
+     return maxChunks;
+   }
+ 
+   /**
+    * set if to sync block
+    *
+    * @param syncBlock if to sync block
+    */
 -  synchronized void setSyncBlock(boolean syncBlock){
++  synchronized void setSyncBlock(boolean syncBlock) {
+     this.syncBlock = syncBlock;
+   }
+ 
+   @Override
+   public String toString() {
+     return "packet seqno: " + this.seqno +
+         " offsetInBlock: " + this.offsetInBlock +
+         " lastPacketInBlock: " + this.lastPacketInBlock +
+         " lastByteOffsetInBlock: " + this.getLastByteOffsetBlock();
+   }
+ 
+   /**
+    * Add a trace parent span for this packet.<p>
+    *
+    * Trace parent spans for a packet are the trace spans responsible for
+    * adding data to that packet.  We store them as an array of span ids for
+    * efficiency.<p>
+    *
+    * Protected by the DFSOutputStream dataQueue lock.
+    *
+    * @param span the span whose id is added; ignored when null
+    */
+   public void addTraceParent(Span span) {
+     if (span == null) {
+       return;
+     }
+     addTraceParent(span.getSpanId());
+   }
+ 
+   /**
+    * Add a trace parent span id for this packet. Ids that are not valid are
+    * ignored.
+    *
+    * @param id the span id to add
+    */
+   public void addTraceParent(SpanId id) {
+     if (!id.isValid()) {
+       return;
+     }
+     if (traceParentsUsed == traceParents.length) {
+       // Grow the array: 8 slots at first, then double.
+       int newLength = (traceParents.length == 0) ? 8 :
+           traceParents.length * 2;
+       traceParents = Arrays.copyOf(traceParents, newLength);
+     }
+     traceParents[traceParentsUsed] = id;
+     traceParentsUsed++;
+   }
+ 
+   /**
+    * Get the trace parent spans for this packet.<p>
+    *
+    * Will always be non-null.<p>
+    *
+    * Protected by the DFSOutputStream dataQueue lock.
+    *
+    * @return the sorted trace parent ids with duplicates removed
+    */
+   public SpanId[] getTraceParents() {
+     // Remove duplicates from the array.
+     int len = traceParentsUsed;
+     Arrays.sort(traceParents, 0, len);
+     int j = 0;
+     SpanId prevVal = SpanId.INVALID;
+     for (int i = 0; i < len; i++) {
+       SpanId val = traceParents[i];
+       if (!val.equals(prevVal)) {
+         traceParents[j] = val;
+         j++;
+         prevVal = val;
+       }
+     }
+     if (j < traceParents.length) {
+       // Trim the array to the distinct ids.
+       traceParents = Arrays.copyOf(traceParents, j);
+       traceParentsUsed = traceParents.length;
+     }
+     return traceParents;
+   }
+ 
+   /**
+    * Set the trace scope held by this packet.
+    *
+    * @param scope the trace scope to store
+    */
+   public void setTraceScope(TraceScope scope) {
+     this.scope = scope;
+   }
+ 
+   /**
+    * Get the trace scope held by this packet.
+    *
+    * @return the stored trace scope
+    */
+   public TraceScope getTraceScope() {
+     return scope;
+   }
+ }