Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/09/30 17:41:09 UTC

[09/58] [abbrv] hadoop git commit: HDFS-8053. Move DFSIn/OutputStream and related classes to hadoop-hdfs-client. Contributed by Mingliang Liu.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
new file mode 100755
index 0000000..de1d1ee
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -0,0 +1,918 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.channels.ClosedChannelException;
+import java.util.EnumSet;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
+import org.apache.hadoop.fs.CanSetDropBehind;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSOutputSummer;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
+import org.apache.hadoop.fs.Syncable;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
+import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
+import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
+import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.server.namenode.RetryStartFileException;
+import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
+import org.apache.hadoop.hdfs.util.ByteArrayManager;
+import org.apache.hadoop.io.EnumSetWritable;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.DataChecksum.Type;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.Time;
+import org.apache.htrace.Sampler;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+
+/****************************************************************
+ * DFSOutputStream creates files from a stream of bytes.
+ *
+ * The client application writes data that is cached internally by
+ * this stream. Data is broken up into packets, each typically
+ * 64K in size. A packet comprises chunks. Each chunk is typically
+ * 512 bytes and has an associated checksum.
+ *
+ * When a client application fills up the currentPacket, it is
+ * enqueued into the dataQueue of DataStreamer. DataStreamer is a
+ * thread that picks up packets from the dataQueue and sends them to
+ * the first datanode in the pipeline.
+ *
+ ****************************************************************/
+@InterfaceAudience.Private
+public class DFSOutputStream extends FSOutputSummer
+    implements Syncable, CanSetDropBehind {
+  static final Logger LOG = LoggerFactory.getLogger(DFSOutputStream.class);
+  /**
+   * Number of times to retry creating a file when there are transient 
+   * errors (typically related to encryption zones and KeyProvider operations).
+   */
+  @VisibleForTesting
+  static final int CREATE_RETRY_COUNT = 10;
+  @VisibleForTesting
+  static CryptoProtocolVersion[] SUPPORTED_CRYPTO_VERSIONS =
+      CryptoProtocolVersion.supported();
+
+  protected final DFSClient dfsClient;
+  protected final ByteArrayManager byteArrayManager;
+  // closed is accessed by different threads under different locks.
+  protected volatile boolean closed = false;
+
+  protected final String src;
+  protected final long fileId;
+  protected final long blockSize;
+  protected final int bytesPerChecksum;
+
+  protected DFSPacket currentPacket = null;
+  private DataStreamer streamer;
+  protected int packetSize = 0; // write packet size, not including the header.
+  protected int chunksPerPacket = 0;
+  protected long lastFlushOffset = 0; // offset when flush was invoked
+  private long initialFileSize = 0; // at time of file open
+  private final short blockReplication; // replication factor of file
+  protected boolean shouldSyncBlock = false; // force blocks to disk upon close
+  protected final AtomicReference<CachingStrategy> cachingStrategy;
+  private FileEncryptionInfo fileEncryptionInfo;
+
+  /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
+  protected DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetInBlock,
+      long seqno, boolean lastPacketInBlock) throws InterruptedIOException {
+    final byte[] buf;
+    final int bufferSize = PacketHeader.PKT_MAX_HEADER_LEN + packetSize;
+
+    try {
+      buf = byteArrayManager.newByteArray(bufferSize);
+    } catch (InterruptedException ie) {
+      final InterruptedIOException iioe = new InterruptedIOException(
+          "seqno=" + seqno);
+      iioe.initCause(ie);
+      throw iioe;
+    }
+
+    return new DFSPacket(buf, chunksPerPkt, offsetInBlock, seqno,
+                         getChecksumSize(), lastPacketInBlock);
+  }
+
+  @Override
+  protected void checkClosed() throws IOException {
+    if (isClosed()) {
+      getStreamer().getLastException().throwException4Close();
+    }
+  }
+
+  //
+  // returns the list of targets, if any, that are currently being used.
+  //
+  @VisibleForTesting
+  public synchronized DatanodeInfo[] getPipeline() {
+    if (getStreamer().streamerClosed()) {
+      return null;
+    }
+    DatanodeInfo[] currentNodes = getStreamer().getNodes();
+    if (currentNodes == null) {
+      return null;
+    }
+    DatanodeInfo[] value = new DatanodeInfo[currentNodes.length];
+    for (int i = 0; i < currentNodes.length; i++) {
+      value[i] = currentNodes[i];
+    }
+    return value;
+  }
+
+  /** 
+   * @return the object for computing checksum.
+   *         The type is NULL if checksum is not computed.
+   */
+  private static DataChecksum getChecksum4Compute(DataChecksum checksum,
+      HdfsFileStatus stat) {
+    if (DataStreamer.isLazyPersist(stat) && stat.getReplication() == 1) {
+      // do not compute checksum when writing a single replica to memory
+      return DataChecksum.newDataChecksum(Type.NULL,
+          checksum.getBytesPerChecksum());
+    }
+    return checksum;
+  }
+ 
+  private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress,
+      HdfsFileStatus stat, DataChecksum checksum) throws IOException {
+    super(getChecksum4Compute(checksum, stat));
+    this.dfsClient = dfsClient;
+    this.src = src;
+    this.fileId = stat.getFileId();
+    this.blockSize = stat.getBlockSize();
+    this.blockReplication = stat.getReplication();
+    this.fileEncryptionInfo = stat.getFileEncryptionInfo();
+    this.cachingStrategy = new AtomicReference<CachingStrategy>(
+        dfsClient.getDefaultWriteCachingStrategy());
+    if ((progress != null) && DFSClient.LOG.isDebugEnabled()) {
+      DFSClient.LOG.debug(
+          "Set non-null progress callback on DFSOutputStream " + src);
+    }
+    
+    this.bytesPerChecksum = checksum.getBytesPerChecksum();
+    if (bytesPerChecksum <= 0) {
+      throw new HadoopIllegalArgumentException(
+          "Invalid value: bytesPerChecksum = " + bytesPerChecksum + " <= 0");
+    }
+    if (blockSize % bytesPerChecksum != 0) {
+      throw new HadoopIllegalArgumentException("Invalid values: "
+          + HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (=" + bytesPerChecksum
+          + ") must divide block size (=" + blockSize + ").");
+    }
+    this.byteArrayManager = dfsClient.getClientContext().getByteArrayManager();
+  }
+
+  /** Construct a new output stream for creating a file. */
+  protected DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
+      EnumSet<CreateFlag> flag, Progressable progress,
+      DataChecksum checksum, String[] favoredNodes) throws IOException {
+    this(dfsClient, src, progress, stat, checksum);
+    this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
+
+    computePacketChunkSize(dfsClient.getConf().getWritePacketSize(), bytesPerChecksum);
+
+    streamer = new DataStreamer(stat, null, dfsClient, src, progress, checksum,
+        cachingStrategy, byteArrayManager, favoredNodes);
+  }
+
+  static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
+      FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
+      short replication, long blockSize, Progressable progress, int buffersize,
+      DataChecksum checksum, String[] favoredNodes) throws IOException {
+    TraceScope scope =
+        dfsClient.getPathTraceScope("newStreamForCreate", src);
+    try {
+      HdfsFileStatus stat = null;
+
+      // Retry the create if we get a RetryStartFileException up to a maximum
+      // number of times
+      boolean shouldRetry = true;
+      int retryCount = CREATE_RETRY_COUNT;
+      while (shouldRetry) {
+        shouldRetry = false;
+        try {
+          stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
+              new EnumSetWritable<CreateFlag>(flag), createParent, replication,
+              blockSize, SUPPORTED_CRYPTO_VERSIONS);
+          break;
+        } catch (RemoteException re) {
+          IOException e = re.unwrapRemoteException(
+              AccessControlException.class,
+              DSQuotaExceededException.class,
+              QuotaByStorageTypeExceededException.class,
+              FileAlreadyExistsException.class,
+              FileNotFoundException.class,
+              ParentNotDirectoryException.class,
+              NSQuotaExceededException.class,
+              RetryStartFileException.class,
+              SafeModeException.class,
+              UnresolvedPathException.class,
+              SnapshotAccessControlException.class,
+              UnknownCryptoProtocolVersionException.class);
+          if (e instanceof RetryStartFileException) {
+            if (retryCount > 0) {
+              shouldRetry = true;
+              retryCount--;
+            } else {
+              throw new IOException("Too many retries because of encryption" +
+                  " zone operations", e);
+            }
+          } else {
+            throw e;
+          }
+        }
+      }
+      Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
+      final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
+          flag, progress, checksum, favoredNodes);
+      out.start();
+      return out;
+    } finally {
+      scope.close();
+    }
+  }
+
+  /** Construct a new output stream for append. */
+  private DFSOutputStream(DFSClient dfsClient, String src,
+      EnumSet<CreateFlag> flags, Progressable progress, LocatedBlock lastBlock,
+      HdfsFileStatus stat, DataChecksum checksum, String[] favoredNodes)
+          throws IOException {
+    this(dfsClient, src, progress, stat, checksum);
+    initialFileSize = stat.getLen(); // length of file when opened
+    this.shouldSyncBlock = flags.contains(CreateFlag.SYNC_BLOCK);
+
+    boolean toNewBlock = flags.contains(CreateFlag.NEW_BLOCK);
+
+    this.fileEncryptionInfo = stat.getFileEncryptionInfo();
+
+    // The last partial block of the file has to be filled.
+    if (!toNewBlock && lastBlock != null) {
+      // indicate that we are appending to an existing block
+      streamer = new DataStreamer(lastBlock, stat, dfsClient, src, progress, checksum,
+          cachingStrategy, byteArrayManager);
+      getStreamer().setBytesCurBlock(lastBlock.getBlockSize());
+      adjustPacketChunkSize(stat);
+      getStreamer().setPipelineInConstruction(lastBlock);
+    } else {
+      computePacketChunkSize(dfsClient.getConf().getWritePacketSize(),
+          bytesPerChecksum);
+      streamer = new DataStreamer(stat, lastBlock != null ? lastBlock.getBlock() : null,
+          dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
+          favoredNodes);
+    }
+  }
+
+  private void adjustPacketChunkSize(HdfsFileStatus stat) throws IOException{
+
+    long usedInLastBlock = stat.getLen() % blockSize;
+    int freeInLastBlock = (int)(blockSize - usedInLastBlock);
+
+    // calculate the amount of free space in the pre-existing
+    // last crc chunk
+    int usedInCksum = (int)(stat.getLen() % bytesPerChecksum);
+    int freeInCksum = bytesPerChecksum - usedInCksum;
+
+    // if there is space in the last block, then we have to
+    // append to that block
+    if (freeInLastBlock == blockSize) {
+      throw new IOException("The last block for file " +
+          src + " is full.");
+    }
+
+    if (usedInCksum > 0 && freeInCksum > 0) {
+      // if there is space in the last partial chunk, then
+      // setup in such a way that the next packet will have only
+      // one chunk that fills up the partial chunk.
+      //
+      computePacketChunkSize(0, freeInCksum);
+      setChecksumBufSize(freeInCksum);
+      getStreamer().setAppendChunk(true);
+    } else {
+      // if the remaining space in the block is smaller than
+      // the expected size of a packet, then create a
+      // smaller packet.
+      //
+      computePacketChunkSize(
+          Math.min(dfsClient.getConf().getWritePacketSize(), freeInLastBlock),
+          bytesPerChecksum);
+    }
+  }
+
+  static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src,
+      EnumSet<CreateFlag> flags, int bufferSize, Progressable progress,
+      LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum,
+      String[] favoredNodes) throws IOException {
+    TraceScope scope =
+        dfsClient.getPathTraceScope("newStreamForAppend", src);
+    try {
+      final DFSOutputStream out = new DFSOutputStream(dfsClient, src, flags,
+          progress, lastBlock, stat, checksum, favoredNodes);
+      out.start();
+      return out;
+    } finally {
+      scope.close();
+    }
+  }
+
+  protected void computePacketChunkSize(int psize, int csize) {
+    final int bodySize = psize - PacketHeader.PKT_MAX_HEADER_LEN;
+    final int chunkSize = csize + getChecksumSize();
+    chunksPerPacket = Math.max(bodySize/chunkSize, 1);
+    packetSize = chunkSize*chunksPerPacket;
+    if (DFSClient.LOG.isDebugEnabled()) {
+      DFSClient.LOG.debug("computePacketChunkSize: src=" + src +
+                ", chunkSize=" + chunkSize +
+                ", chunksPerPacket=" + chunksPerPacket +
+                ", packetSize=" + packetSize);
+    }
+  }
+
+  protected TraceScope createWriteTraceScope() {
+    return dfsClient.getPathTraceScope("DFSOutputStream#write", src);
+  }
+
+  // @see FSOutputSummer#writeChunk()
+  @Override
+  protected synchronized void writeChunk(byte[] b, int offset, int len,
+      byte[] checksum, int ckoff, int cklen) throws IOException {
+    dfsClient.checkOpen();
+    checkClosed();
+
+    if (len > bytesPerChecksum) {
+      throw new IOException("writeChunk() buffer size is " + len +
+                            " is larger than supported  bytesPerChecksum " +
+                            bytesPerChecksum);
+    }
+    if (cklen != 0 && cklen != getChecksumSize()) {
+      throw new IOException("writeChunk() checksum size is supposed to be " +
+                            getChecksumSize() + " but found to be " + cklen);
+    }
+
+    if (currentPacket == null) {
+      currentPacket = createPacket(packetSize, chunksPerPacket, getStreamer()
+          .getBytesCurBlock(), getStreamer().getAndIncCurrentSeqno(), false);
+      if (DFSClient.LOG.isDebugEnabled()) {
+        DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + 
+            currentPacket.getSeqno() +
+            ", src=" + src +
+            ", packetSize=" + packetSize +
+            ", chunksPerPacket=" + chunksPerPacket +
+            ", bytesCurBlock=" + getStreamer().getBytesCurBlock());
+      }
+    }
+
+    currentPacket.writeChecksum(checksum, ckoff, cklen);
+    currentPacket.writeData(b, offset, len);
+    currentPacket.incNumChunks();
+    getStreamer().incBytesCurBlock(len);
+
+    // If packet is full, enqueue it for transmission
+    //
+    if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
+        getStreamer().getBytesCurBlock() == blockSize) {
+      enqueueCurrentPacketFull();
+    }
+  }
+
+  void enqueueCurrentPacket() throws IOException {
+    getStreamer().waitAndQueuePacket(currentPacket);
+    currentPacket = null;
+  }
+
+  void enqueueCurrentPacketFull() throws IOException {
+    LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={},"
+        + " appendChunk={}, {}", currentPacket, src, getStreamer()
+        .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(),
+        getStreamer());
+    enqueueCurrentPacket();
+    adjustChunkBoundary();
+    endBlock();
+  }
+
+  /** create an empty packet to mark the end of the block. */
+  void setCurrentPacketToEmpty() throws InterruptedIOException {
+    currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
+        getStreamer().getAndIncCurrentSeqno(), true);
+    currentPacket.setSyncBlock(shouldSyncBlock);
+  }
+
+  /**
+   * If the reopened file did not end at a chunk boundary and the above
+   * write filled up its partial chunk, tell the summer to generate full
+   * crc chunks from now on.
+   */
+  protected void adjustChunkBoundary() {
+    if (getStreamer().getAppendChunk() &&
+        getStreamer().getBytesCurBlock() % bytesPerChecksum == 0) {
+      getStreamer().setAppendChunk(false);
+      resetChecksumBufSize();
+    }
+
+    if (!getStreamer().getAppendChunk()) {
+      int psize = Math.min((int)(blockSize- getStreamer().getBytesCurBlock()),
+          dfsClient.getConf().getWritePacketSize());
+      computePacketChunkSize(psize, bytesPerChecksum);
+    }
+  }
+
+  /**
+   * If a block boundary is reached, send an empty packet to
+   * indicate the end of the block and reset bytesCurBlock.
+   *
+   * @throws IOException
+   */
+  protected void endBlock() throws IOException {
+    if (getStreamer().getBytesCurBlock() == blockSize) {
+      setCurrentPacketToEmpty();
+      enqueueCurrentPacket();
+      getStreamer().setBytesCurBlock(0);
+      lastFlushOffset = 0;
+    }
+  }
+  
+  /**
+   * Flushes out to all replicas of the block. The data is in the buffers
+   * of the DNs but not necessarily in the DN's OS buffers.
+   *
+   * It is a synchronous operation. When it returns,
+   * it guarantees that flushed data become visible to new readers. 
+   * It is not guaranteed that data has been flushed to 
+   * persistent store on the datanode. 
+   * Block allocations are persisted on namenode.
+   */
+  @Override
+  public void hflush() throws IOException {
+    TraceScope scope =
+        dfsClient.getPathTraceScope("hflush", src);
+    try {
+      flushOrSync(false, EnumSet.noneOf(SyncFlag.class));
+    } finally {
+      scope.close();
+    }
+  }
+
+  @Override
+  public void hsync() throws IOException {
+    TraceScope scope =
+        dfsClient.getPathTraceScope("hsync", src);
+    try {
+      flushOrSync(true, EnumSet.noneOf(SyncFlag.class));
+    } finally {
+      scope.close();
+    }
+  }
+  
+  /**
+   * The expected semantics are that all data has been flushed out to all
+   * replicas and all replicas have done the equivalent of a POSIX fsync,
+   * i.e. the OS has flushed it to the disk device (but the disk may have
+   * it in its cache).
+   * 
+   * Note that only the current block is flushed to the disk device.
+   * To guarantee durable sync across block boundaries the stream should
+   * be created with {@link CreateFlag#SYNC_BLOCK}.
+   * 
+   * @param syncFlags
+   *          Indicate the semantic of the sync. Currently used to specify
+   *          whether or not to update the block length in NameNode.
+   */
+  public void hsync(EnumSet<SyncFlag> syncFlags) throws IOException {
+    TraceScope scope =
+        dfsClient.getPathTraceScope("hsync", src);
+    try {
+      flushOrSync(true, syncFlags);
+    } finally {
+      scope.close();
+    }
+  }
+
+  /**
+   * Flush/Sync buffered data to DataNodes.
+   * 
+   * @param isSync
+   *          Whether or not to require all replicas to flush data to the disk
+   *          device
+   * @param syncFlags
+   *          Indicate the detailed semantics of the flush/sync. Currently
+   *          mainly used to specify whether or not to update the file length in
+   *          the NameNode
+   * @throws IOException
+   */
+  private void flushOrSync(boolean isSync, EnumSet<SyncFlag> syncFlags)
+      throws IOException {
+    dfsClient.checkOpen();
+    checkClosed();
+    try {
+      long toWaitFor;
+      long lastBlockLength = -1L;
+      boolean updateLength = syncFlags.contains(SyncFlag.UPDATE_LENGTH);
+      boolean endBlock = syncFlags.contains(SyncFlag.END_BLOCK);
+      synchronized (this) {
+        // flush checksum buffer, but keep checksum buffer intact if we do not
+        // need to end the current block
+        int numKept = flushBuffer(!endBlock, true);
+        // bytesCurBlock potentially incremented if there was buffered data
+
+        if (DFSClient.LOG.isDebugEnabled()) {
+          DFSClient.LOG.debug("DFSClient flush(): "
+              + " bytesCurBlock=" + getStreamer().getBytesCurBlock()
+              + " lastFlushOffset=" + lastFlushOffset
+              + " createNewBlock=" + endBlock);
+        }
+        // Flush only if we haven't already flushed till this offset.
+        if (lastFlushOffset != getStreamer().getBytesCurBlock()) {
+          assert getStreamer().getBytesCurBlock() > lastFlushOffset;
+          // record the valid offset of this flush
+          lastFlushOffset = getStreamer().getBytesCurBlock();
+          if (isSync && currentPacket == null && !endBlock) {
+            // Nothing to send right now,
+            // but sync was requested.
+            // Send an empty packet if we do not end the block right now
+            currentPacket = createPacket(packetSize, chunksPerPacket,
+                getStreamer().getBytesCurBlock(), getStreamer()
+                    .getAndIncCurrentSeqno(), false);
+          }
+        } else {
+          if (isSync && getStreamer().getBytesCurBlock() > 0 && !endBlock) {
+            // Nothing to send right now,
+            // and the block was partially written,
+            // and sync was requested.
+            // So send an empty sync packet if we do not end the block right
+            // now
+            currentPacket = createPacket(packetSize, chunksPerPacket,
+                getStreamer().getBytesCurBlock(), getStreamer()
+                    .getAndIncCurrentSeqno(), false);
+          } else if (currentPacket != null) {
+            // just discard the current packet since it has already been sent.
+            currentPacket.releaseBuffer(byteArrayManager);
+            currentPacket = null;
+          }
+        }
+        if (currentPacket != null) {
+          currentPacket.setSyncBlock(isSync);
+          enqueueCurrentPacket();
+        }
+        if (endBlock && getStreamer().getBytesCurBlock() > 0) {
+          // Need to end the current block, thus send an empty packet to
+          // indicate this is the end of the block and reset bytesCurBlock
+          currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
+              getStreamer().getAndIncCurrentSeqno(), true);
+          currentPacket.setSyncBlock(shouldSyncBlock || isSync);
+          enqueueCurrentPacket();
+          getStreamer().setBytesCurBlock(0);
+          lastFlushOffset = 0;
+        } else {
+          // Restore state of stream. Record the last flush offset
+          // of the last full chunk that was flushed.
+          getStreamer().setBytesCurBlock(
+              getStreamer().getBytesCurBlock() - numKept);
+        }
+
+        toWaitFor = getStreamer().getLastQueuedSeqno();
+      } // end synchronized
+
+      getStreamer().waitForAckedSeqno(toWaitFor);
+
+      // update the block length first time irrespective of flag
+      if (updateLength || getStreamer().getPersistBlocks().get()) {
+        synchronized (this) {
+          if (!getStreamer().streamerClosed()
+              && getStreamer().getBlock() != null) {
+            lastBlockLength = getStreamer().getBlock().getNumBytes();
+          }
+        }
+      }
+      // If 1) any new blocks were allocated since the last flush, or 2) the
+      // length needs to be updated in the NN, then persist block locations
+      // on the namenode.
+      if (getStreamer().getPersistBlocks().getAndSet(false) || updateLength) {
+        try {
+          dfsClient.namenode.fsync(src, fileId, dfsClient.clientName,
+              lastBlockLength);
+        } catch (IOException ioe) {
+          DFSClient.LOG.warn("Unable to persist blocks in hflush for " + src, ioe);
+          // If we got an error here, it might be because some other thread called
+          // close before our hflush completed. In that case, we should throw an
+          // exception that the stream is closed.
+          checkClosed();
+          // If we aren't closed but failed to sync, we should expose that to the
+          // caller.
+          throw ioe;
+        }
+      }
+
+      synchronized(this) {
+        if (!getStreamer().streamerClosed()) {
+          getStreamer().setHflush();
+        }
+      }
+    } catch (InterruptedIOException interrupt) {
+      // This kind of error doesn't mean that the stream itself is broken - just the
+      // flushing thread got interrupted. So, we shouldn't close down the writer,
+      // but instead just propagate the error
+      throw interrupt;
+    } catch (IOException e) {
+      DFSClient.LOG.warn("Error while syncing", e);
+      synchronized (this) {
+        if (!isClosed()) {
+          getStreamer().getLastException().set(e);
+          closeThreads(true);
+        }
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * @deprecated use {@link HdfsDataOutputStream#getCurrentBlockReplication()}.
+   */
+  @Deprecated
+  public synchronized int getNumCurrentReplicas() throws IOException {
+    return getCurrentBlockReplication();
+  }
+
+  /**
+   * Note that this is not a public API;
+   * use {@link HdfsDataOutputStream#getCurrentBlockReplication()} instead.
+   * 
+   * @return the number of valid replicas of the current block
+   */
+  public synchronized int getCurrentBlockReplication() throws IOException {
+    dfsClient.checkOpen();
+    checkClosed();
+    if (getStreamer().streamerClosed()) {
+      return blockReplication; // no pipeline, return repl factor of file
+    }
+    DatanodeInfo[] currentNodes = getStreamer().getNodes();
+    if (currentNodes == null) {
+      return blockReplication; // no pipeline, return repl factor of file
+    }
+    return currentNodes.length;
+  }
+  
+  /**
+   * Waits till all existing data is flushed and confirmations 
+   * received from datanodes. 
+   */
+  protected void flushInternal() throws IOException {
+    long toWaitFor;
+    synchronized (this) {
+      dfsClient.checkOpen();
+      checkClosed();
+      //
+      // If there is data in the current buffer, send it across
+      //
+      getStreamer().queuePacket(currentPacket);
+      currentPacket = null;
+      toWaitFor = getStreamer().getLastQueuedSeqno();
+    }
+
+    getStreamer().waitForAckedSeqno(toWaitFor);
+  }
+
+  protected synchronized void start() {
+    getStreamer().start();
+  }
+  
+  /**
+   * Aborts this output stream and releases any system 
+   * resources associated with this stream.
+   */
+  synchronized void abort() throws IOException {
+    if (isClosed()) {
+      return;
+    }
+    getStreamer().getLastException().set(new IOException("Lease timeout of "
+        + (dfsClient.getConf().getHdfsTimeout()/1000) + " seconds expired."));
+    closeThreads(true);
+    dfsClient.endFileLease(fileId);
+  }
+
+  boolean isClosed() {
+    return closed || getStreamer().streamerClosed();
+  }
+
+  void setClosed() {
+    closed = true;
+    getStreamer().release();
+  }
+
+  // shutdown datastreamer and responseprocessor threads.
+  // interrupt datastreamer if force is true
+  protected void closeThreads(boolean force) throws IOException {
+    try {
+      getStreamer().close(force);
+      getStreamer().join();
+      getStreamer().closeSocket();
+    } catch (InterruptedException e) {
+      throw new IOException("Failed to shutdown streamer");
+    } finally {
+      getStreamer().setSocketToNull();
+      setClosed();
+    }
+  }
+  
+  /**
+   * Closes this output stream and releases any system 
+   * resources associated with this stream.
+   */
+  @Override
+  public synchronized void close() throws IOException {
+    TraceScope scope =
+        dfsClient.getPathTraceScope("DFSOutputStream#close", src);
+    try {
+      closeImpl();
+    } finally {
+      scope.close();
+    }
+  }
+
+  protected synchronized void closeImpl() throws IOException {
+    if (isClosed()) {
+      getStreamer().getLastException().check(true);
+      return;
+    }
+
+    try {
+      flushBuffer();       // flush from all upper layers
+
+      if (currentPacket != null) {
+        enqueueCurrentPacket();
+      }
+
+      if (getStreamer().getBytesCurBlock() != 0) {
+        setCurrentPacketToEmpty();
+      }
+
+      flushInternal();             // flush all data to Datanodes
+      // get last block before destroying the streamer
+      ExtendedBlock lastBlock = getStreamer().getBlock();
+      closeThreads(false);
+      TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER);
+      try {
+        completeFile(lastBlock);
+      } finally {
+        scope.close();
+      }
+      dfsClient.endFileLease(fileId);
+    } catch (ClosedChannelException e) {
+    } finally {
+      setClosed();
+    }
+  }
+
+  // should be called holding (this) lock since setTestFilename() may 
+  // be called during unit tests
+  protected void completeFile(ExtendedBlock last) throws IOException {
+    long localstart = Time.monotonicNow();
+    final DfsClientConf conf = dfsClient.getConf();
+    long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs();
+    boolean fileComplete = false;
+    int retries = conf.getNumBlockWriteLocateFollowingRetry();
+    while (!fileComplete) {
+      fileComplete =
+          dfsClient.namenode.complete(src, dfsClient.clientName, last, fileId);
+      if (!fileComplete) {
+        final int hdfsTimeout = conf.getHdfsTimeout();
+        if (!dfsClient.clientRunning
+            || (hdfsTimeout > 0
+                && localstart + hdfsTimeout < Time.monotonicNow())) {
+            String msg = "Unable to close file because dfsclient " +
+                          " was unable to contact the HDFS servers." +
+                          " clientRunning " + dfsClient.clientRunning +
+                          " hdfsTimeout " + hdfsTimeout;
+            DFSClient.LOG.info(msg);
+            throw new IOException(msg);
+        }
+        try {
+          if (retries == 0) {
+            throw new IOException("Unable to close file because the last block"
+                + " does not have enough number of replicas.");
+          }
+          retries--;
+          Thread.sleep(sleeptime);
+          sleeptime *= 2;
+          if (Time.monotonicNow() - localstart > 5000) {
+            DFSClient.LOG.info("Could not complete " + src + " retrying...");
+          }
+        } catch (InterruptedException ie) {
+          DFSClient.LOG.warn("Caught exception ", ie);
+        }
+      }
+    }
+  }
+
+  @VisibleForTesting
+  public void setArtificialSlowdown(long period) {
+    getStreamer().setArtificialSlowdown(period);
+  }
+
+  @VisibleForTesting
+  public synchronized void setChunksPerPacket(int value) {
+    chunksPerPacket = Math.min(chunksPerPacket, value);
+    packetSize = (bytesPerChecksum + getChecksumSize()) * chunksPerPacket;
+  }
+
+  /**
+   * Returns the size of a file as it was when this stream was opened
+   */
+  public long getInitialLen() {
+    return initialFileSize;
+  }
+
+  /**
+   * @return the FileEncryptionInfo for this stream, or null if not encrypted.
+   */
+  public FileEncryptionInfo getFileEncryptionInfo() {
+    return fileEncryptionInfo;
+  }
+
+  /**
+   * Returns the access token currently used by streamer, for testing only
+   */
+  synchronized Token<BlockTokenIdentifier> getBlockToken() {
+    return getStreamer().getBlockToken();
+  }
+
+  @Override
+  public void setDropBehind(Boolean dropBehind) throws IOException {
+    CachingStrategy prevStrategy, nextStrategy;
+    // CachingStrategy is immutable.  So build a new CachingStrategy with the
+    // modifications we want, and compare-and-swap it in.
+    do {
+      prevStrategy = this.cachingStrategy.get();
+      nextStrategy = new CachingStrategy.Builder(prevStrategy).
+                        setDropBehind(dropBehind).build();
+    } while (!this.cachingStrategy.compareAndSet(prevStrategy, nextStrategy));
+  }
+
+  @VisibleForTesting
+  ExtendedBlock getBlock() {
+    return getStreamer().getBlock();
+  }
+
+  @VisibleForTesting
+  public long getFileId() {
+    return fileId;
+  }
+
+  /**
+   * Return the source of stream.
+   */
+  String getSrc() {
+    return src;
+  }
+
+  /**
+   * Returns the data streamer object.
+   */
+  protected DataStreamer getStreamer() {
+    return streamer;
+  }
+}

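The class javadoc above describes the write path: bytes are buffered into ~512-byte checksum chunks, chunks are packed into ~64K packets, and DataStreamer drains the dataQueue into the datanode pipeline. As a rough, hedged sketch of how an application normally reaches this code — the URI hdfs://localhost:9000 and the path /tmp/example.txt are made-up placeholders, and the HdfsDataOutputStream cast only holds when the FileSystem is actually HDFS — a client writes through the public FileSystem API and touches DFSOutputStream only indirectly:

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.EnumSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;

public class DfsWriteSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Placeholder cluster URI and path, for illustration only.
    FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);

    try (FSDataOutputStream out = fs.create(new Path("/tmp/example.txt"))) {
      // These bytes are buffered by the underlying DFSOutputStream into
      // checksum chunks and packets before DataStreamer ships them.
      out.write("hello, hdfs".getBytes(StandardCharsets.UTF_8));

      // hflush(): flushed data becomes visible to new readers, but is not
      // necessarily on disk at the datanodes (see the hflush javadoc above).
      out.hflush();

      // hsync(UPDATE_LENGTH) additionally persists the new file length in
      // the NameNode, as handled by flushOrSync above.
      if (out instanceof HdfsDataOutputStream) {
        ((HdfsDataOutputStream) out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
      }
    }
    fs.close();
  }
}
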
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
new file mode 100755
index 0000000..22055c3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
@@ -0,0 +1,345 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.BufferOverflowException;
+import java.nio.channels.ClosedChannelException;
+import java.util.Arrays;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.util.ByteArrayManager;
+import org.apache.htrace.Span;
+
+/****************************************************************
+ * DFSPacket is used by DataStreamer and DFSOutputStream.
+ * DFSOutputStream generates packets and then asks DataStreamer
+ * to send them to datanodes.
+ ****************************************************************/
+
+@InterfaceAudience.Private
+class DFSPacket {
+  public static final long HEART_BEAT_SEQNO = -1L;
+  private static long[] EMPTY = new long[0];
+  private final long seqno; // sequence number of buffer in block
+  private final long offsetInBlock; // offset in block
+  private boolean syncBlock; // this packet forces the current block to disk
+  private int numChunks; // number of chunks currently in packet
+  private final int maxChunks; // max chunks in packet
+  private byte[] buf;
+  private final boolean lastPacketInBlock; // is this the last packet in block?
+
+  /**
+   * buf is pointed into as follows:
+   *  (C is checksum data, D is payload data)
+   *
+   * [_________CCCCCCCCC________________DDDDDDDDDDDDDDDD___]
+   *           ^        ^               ^               ^
+   *           |        checksumPos     dataStart       dataPos
+   *           checksumStart
+   *
+   * Right before sending, we move the checksum data to immediately precede
+   * the actual data, and then insert the header into the buffer immediately
+   * preceding the checksum data, so we make sure to keep enough space in
+   * front of the checksum data to support the largest conceivable header.
+   */
+  private int checksumStart;
+  private int checksumPos;
+  private final int dataStart;
+  private int dataPos;
+  private long[] traceParents = EMPTY;
+  private int traceParentsUsed;
+  private Span span;
+
+  /**
+   * Create a new packet.
+   *
+   * @param buf the buffer storing data and checksums
+   * @param chunksPerPkt maximum number of chunks per packet.
+   * @param offsetInBlock offset in bytes into the HDFS block.
+   * @param seqno the sequence number of this packet
+   * @param checksumSize the size of checksum
+   * @param lastPacketInBlock if this is the last packet
+   */
+  DFSPacket(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno,
+                   int checksumSize, boolean lastPacketInBlock) {
+    this.lastPacketInBlock = lastPacketInBlock;
+    this.numChunks = 0;
+    this.offsetInBlock = offsetInBlock;
+    this.seqno = seqno;
+
+    this.buf = buf;
+
+    checksumStart = PacketHeader.PKT_MAX_HEADER_LEN;
+    checksumPos = checksumStart;
+    dataStart = checksumStart + (chunksPerPkt * checksumSize);
+    dataPos = dataStart;
+    maxChunks = chunksPerPkt;
+  }
+
+  /**
+   * Write data to this packet.
+   *
+   * @param inarray input array of data
+   * @param off the offset of data to write
+   * @param len the length of data to write
+   * @throws ClosedChannelException
+   */
+  synchronized void writeData(byte[] inarray, int off, int len)
+      throws ClosedChannelException {
+    checkBuffer();
+    if (dataPos + len > buf.length) {
+      throw new BufferOverflowException();
+    }
+    System.arraycopy(inarray, off, buf, dataPos, len);
+    dataPos += len;
+  }
+
+  /**
+   * Write checksums to this packet
+   *
+   * @param inarray input array of checksums
+   * @param off the offset of checksums to write
+   * @param len the length of checksums to write
+   * @throws ClosedChannelException
+   */
+  synchronized void writeChecksum(byte[] inarray, int off, int len)
+      throws ClosedChannelException {
+    checkBuffer();
+    if (len == 0) {
+      return;
+    }
+    if (checksumPos + len > dataStart) {
+      throw new BufferOverflowException();
+    }
+    System.arraycopy(inarray, off, buf, checksumPos, len);
+    checksumPos += len;
+  }
+
+  /**
+   * Write the full packet, including the header, to the given output stream.
+   *
+   * @param stm
+   * @throws IOException
+   */
+  synchronized void writeTo(DataOutputStream stm) throws IOException {
+    checkBuffer();
+
+    final int dataLen = dataPos - dataStart;
+    final int checksumLen = checksumPos - checksumStart;
+    final int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen;
+
+    PacketHeader header = new PacketHeader(
+        pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock);
+
+    if (checksumPos != dataStart) {
+      // Move the checksum to cover the gap. This can happen for the last
+      // packet or during an hflush/hsync call.
+      System.arraycopy(buf, checksumStart, buf,
+          dataStart - checksumLen , checksumLen);
+      checksumPos = dataStart;
+      checksumStart = checksumPos - checksumLen;
+    }
+
+    final int headerStart = checksumStart - header.getSerializedSize();
+    assert checksumStart + 1 >= header.getSerializedSize();
+    assert headerStart >= 0;
+    assert headerStart + header.getSerializedSize() == checksumStart;
+
+    // Copy the header data into the buffer immediately preceding the checksum
+    // data.
+    System.arraycopy(header.getBytes(), 0, buf, headerStart,
+        header.getSerializedSize());
+
+    // corrupt the data for testing.
+    if (DFSClientFaultInjector.get().corruptPacket()) {
+      buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
+    }
+
+    // Write the now contiguous full packet to the output stream.
+    stm.write(buf, headerStart, header.getSerializedSize() + checksumLen + dataLen);
+
+    // undo corruption.
+    if (DFSClientFaultInjector.get().uncorruptPacket()) {
+      buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
+    }
+  }
+
+  private synchronized void checkBuffer() throws ClosedChannelException {
+    if (buf == null) {
+      throw new ClosedChannelException();
+    }
+  }
+
+  /**
+   * Release the buffer in this packet to ByteArrayManager.
+   *
+   * @param bam
+   */
+  synchronized void releaseBuffer(ByteArrayManager bam) {
+    bam.release(buf);
+    buf = null;
+  }
+
+  /**
+   * get the packet's last byte's offset in the block
+   *
+   * @return the packet's last byte's offset in the block
+   */
+  synchronized long getLastByteOffsetBlock() {
+    return offsetInBlock + dataPos - dataStart;
+  }
+
+  /**
+   * Check if this packet is a heartbeat packet
+   *
+   * @return true if the sequence number is HEART_BEAT_SEQNO
+   */
+  boolean isHeartbeatPacket() {
+    return seqno == HEART_BEAT_SEQNO;
+  }
+
+  /**
+   * check if this packet is the last packet in block
+   *
+   * @return true if the packet is the last packet
+   */
+  boolean isLastPacketInBlock(){
+    return lastPacketInBlock;
+  }
+
+  /**
+   * get sequence number of this packet
+   *
+   * @return the sequence number of this packet
+   */
+  long getSeqno(){
+    return seqno;
+  }
+
+  /**
+   * get the number of chunks this packet contains
+   *
+   * @return the number of chunks in this packet
+   */
+  synchronized int getNumChunks(){
+    return numChunks;
+  }
+
+  /**
+   * increase the number of chunks by one
+   */
+  synchronized void incNumChunks(){
+    numChunks++;
+  }
+
+  /**
+   * get the maximum number of chunks this packet can hold
+   *
+   * @return the maximum number of chunks
+   */
+  int getMaxChunks(){
+    return maxChunks;
+  }
+
+  /**
+   * set whether to sync the block
+   *
+   * @param syncBlock whether to sync the block
+   */
+  synchronized void setSyncBlock(boolean syncBlock){
+    this.syncBlock = syncBlock;
+  }
+
+  @Override
+  public String toString() {
+    return "packet seqno: " + this.seqno +
+        " offsetInBlock: " + this.offsetInBlock +
+        " lastPacketInBlock: " + this.lastPacketInBlock +
+        " lastByteOffsetInBlock: " + this.getLastByteOffsetBlock();
+  }
+
+  /**
+   * Add a trace parent span for this packet.<p/>
+   *
+   * Trace parent spans for a packet are the trace spans responsible for
+   * adding data to that packet.  We store them as an array of longs for
+   * efficiency.<p/>
+   *
+   * Protected by the DFSOutputStream dataQueue lock.
+   */
+  public void addTraceParent(Span span) {
+    if (span == null) {
+      return;
+    }
+    addTraceParent(span.getSpanId());
+  }
+
+  public void addTraceParent(long id) {
+    if (traceParentsUsed == traceParents.length) {
+      int newLength = (traceParents.length == 0) ? 8 :
+          traceParents.length * 2;
+      traceParents = Arrays.copyOf(traceParents, newLength);
+    }
+    traceParents[traceParentsUsed] = id;
+    traceParentsUsed++;
+  }
+
+  /**
+   * Get the trace parent spans for this packet.<p/>
+   *
+   * Will always be non-null.<p/>
+   *
+   * Protected by the DFSOutputStream dataQueue lock.
+   */
+  public long[] getTraceParents() {
+    // Remove duplicates from the array.
+    int len = traceParentsUsed;
+    Arrays.sort(traceParents, 0, len);
+    int i = 0, j = 0;
+    long prevVal = 0; // 0 is not a valid span id
+    while (true) {
+      if (i == len) {
+        break;
+      }
+      long val = traceParents[i];
+      if (val != prevVal) {
+        traceParents[j] = val;
+        j++;
+        prevVal = val;
+      }
+      i++;
+    }
+    if (j < traceParents.length) {
+      traceParents = Arrays.copyOf(traceParents, j);
+      traceParentsUsed = traceParents.length;
+    }
+    return traceParents;
+  }
+
+  public void setTraceSpan(Span span) {
+    this.span = span;
+  }
+
+  public Span getTraceSpan() {
+    return span;
+  }
+}

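The buffer-layout comment in DFSPacket above (checksumStart / checksumPos / dataStart / dataPos) together with writeTo() determines how a packet is assembled on the wire. A minimal standalone sketch of that arithmetic — the 33-byte max header, 4-byte checksum, 512-byte chunk and 126 chunks-per-packet figures are illustrative assumptions, not values read from PacketHeader or the client configuration — looks like this:

public class PacketLayoutSketch {
  public static void main(String[] args) {
    // Assumed stand-ins, not actual Hadoop constants.
    int pktMaxHeaderLen = 33;    // plays the role of PacketHeader.PKT_MAX_HEADER_LEN
    int checksumSize = 4;        // e.g. one CRC per chunk
    int bytesPerChecksum = 512;  // chunk size
    int chunksPerPkt = 126;      // keeps data + checksums near 64K

    // Initial layout, as in the DFSPacket constructor.
    int checksumStart = pktMaxHeaderLen;
    int dataStart = checksumStart + chunksPerPkt * checksumSize;

    // Suppose the packet is flushed while only partially full.
    int numChunks = 100;
    int checksumPos = checksumStart + numChunks * checksumSize;
    int dataPos = dataStart + numChunks * bytesPerChecksum;

    int checksumLen = checksumPos - checksumStart;
    int dataLen = dataPos - dataStart;

    // writeTo() slides the checksums right so they end exactly at dataStart
    // and writes the header immediately before them; any gap left at the
    // front of the buffer is simply unused.
    int shiftedChecksumStart = dataStart - checksumLen;

    System.out.println("dataStart = " + dataStart);
    System.out.println("checksums after shift: [" + shiftedChecksumStart + ", " + dataStart + ")");
    System.out.println("payload bytes on the wire (excluding header): " + (checksumLen + dataLen));
  }
}
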
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
index 359886e..e275afb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProviderFactory;
 import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
@@ -590,6 +591,29 @@ public class DFSUtilClient {
     }
   }
 
+  public static int getIoFileBufferSize(Configuration conf) {
+    return conf.getInt(
+        CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
+        CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
+  }
+
+  public static int getSmallBufferSize(Configuration conf) {
+    return Math.min(getIoFileBufferSize(conf) / 2, 512);
+  }
+
+  /**
+   * Probe for HDFS Encryption being enabled; this uses the value of
+   * the option {@link HdfsClientConfigKeys#DFS_ENCRYPTION_KEY_PROVIDER_URI},
+   * returning true if that property contains a non-empty, non-whitespace
+   * string.
+   * @param conf configuration to probe
+   * @return true if encryption is considered enabled.
+   */
+  public static boolean isHDFSEncryptionEnabled(Configuration conf) {
+    return !conf.getTrimmed(
+        HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, "").isEmpty();
+  }
+
   public static InetSocketAddress getNNAddress(String address) {
     return NetUtils.createSocketAddr(address,
         HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT);
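
The DFSUtilClient additions above are small configuration probes. A hedged usage sketch — the KMS URI kms://http@kms-host:9600/kms is a made-up placeholder, and the printed values assume the stock io.file.buffer.size default of 4096 — could look like:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

public class ClientConfigProbe {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Made-up key provider URI; setting it makes isHDFSEncryptionEnabled() true.
    conf.set(HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
        "kms://http@kms-host:9600/kms");

    // With io.file.buffer.size at its usual default of 4096, the "small"
    // buffer is min(4096 / 2, 512) = 512.
    System.out.println("io buffer size:    " + DFSUtilClient.getIoFileBufferSize(conf));
    System.out.println("small buffer size: " + DFSUtilClient.getSmallBufferSize(conf));
    System.out.println("encryption on?     " + DFSUtilClient.isHDFSEncryptionEnabled(conf));
  }
}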