You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2015/09/29 22:30:17 UTC

[15/50] [abbrv] hadoop git commit: HDFS-8053. Move DFSIn/OutputStream and related classes to hadoop-hdfs-client. Contributed by Mingliang Liu.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
deleted file mode 100755
index de1d1ee..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ /dev/null
@@ -1,918 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.nio.channels.ClosedChannelException;
-import java.util.EnumSet;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.hadoop.HadoopIllegalArgumentException;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.crypto.CryptoProtocolVersion;
-import org.apache.hadoop.fs.CanSetDropBehind;
-import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FSOutputSummer;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.FileEncryptionInfo;
-import org.apache.hadoop.fs.ParentNotDirectoryException;
-import org.apache.hadoop.fs.Syncable;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
-import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
-import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
-import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
-import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
-import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
-import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
-import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
-import org.apache.hadoop.hdfs.server.namenode.RetryStartFileException;
-import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
-import org.apache.hadoop.hdfs.util.ByteArrayManager;
-import org.apache.hadoop.io.EnumSetWritable;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.DataChecksum;
-import org.apache.hadoop.util.DataChecksum.Type;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.Time;
-import org.apache.htrace.Sampler;
-import org.apache.htrace.Trace;
-import org.apache.htrace.TraceScope;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-
-/****************************************************************
- * DFSOutputStream creates files from a stream of bytes.
- *
- * The client application writes data that is cached internally by
- * this stream. Data is broken up into packets, each packet is
- * typically 64K in size. A packet comprises of chunks. Each chunk
- * is typically 512 bytes and has an associated checksum with it.
- *
- * When a client application fills up the currentPacket, it is
- * enqueued into the dataQueue of DataStreamer. DataStreamer is a
- * thread that picks up packets from the dataQueue and sends it to
- * the first datanode in the pipeline.
- *
- ****************************************************************/
-@InterfaceAudience.Private
-public class DFSOutputStream extends FSOutputSummer
-    implements Syncable, CanSetDropBehind {
-  static final Logger LOG = LoggerFactory.getLogger(DFSOutputStream.class);
-  /**
-   * Number of times to retry creating a file when there are transient 
-   * errors (typically related to encryption zones and KeyProvider operations).
-   */
-  @VisibleForTesting
-  static final int CREATE_RETRY_COUNT = 10;
-  @VisibleForTesting
-  static CryptoProtocolVersion[] SUPPORTED_CRYPTO_VERSIONS =
-      CryptoProtocolVersion.supported();
-
-  protected final DFSClient dfsClient;
-  protected final ByteArrayManager byteArrayManager;
-  // closed is accessed by different threads under different locks.
-  protected volatile boolean closed = false;
-
-  protected final String src;
-  protected final long fileId;
-  protected final long blockSize;
-  protected final int bytesPerChecksum;
-
-  protected DFSPacket currentPacket = null;
-  private DataStreamer streamer;
-  protected int packetSize = 0; // write packet size, not including the header.
-  protected int chunksPerPacket = 0;
-  protected long lastFlushOffset = 0; // offset when flush was invoked
-  private long initialFileSize = 0; // at time of file open
-  private final short blockReplication; // replication factor of file
-  protected boolean shouldSyncBlock = false; // force blocks to disk upon close
-  protected final AtomicReference<CachingStrategy> cachingStrategy;
-  private FileEncryptionInfo fileEncryptionInfo;
-
-  /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
-  protected DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetInBlock,
-      long seqno, boolean lastPacketInBlock) throws InterruptedIOException {
-    final byte[] buf;
-    final int bufferSize = PacketHeader.PKT_MAX_HEADER_LEN + packetSize;
-
-    try {
-      buf = byteArrayManager.newByteArray(bufferSize);
-    } catch (InterruptedException ie) {
-      final InterruptedIOException iioe = new InterruptedIOException(
-          "seqno=" + seqno);
-      iioe.initCause(ie);
-      throw iioe;
-    }
-
-    return new DFSPacket(buf, chunksPerPkt, offsetInBlock, seqno,
-                         getChecksumSize(), lastPacketInBlock);
-  }
-
-  @Override
-  protected void checkClosed() throws IOException {
-    if (isClosed()) {
-      getStreamer().getLastException().throwException4Close();
-    }
-  }
-
-  //
-  // returns the list of targets, if any, that is being currently used.
-  //
-  @VisibleForTesting
-  public synchronized DatanodeInfo[] getPipeline() {
-    if (getStreamer().streamerClosed()) {
-      return null;
-    }
-    DatanodeInfo[] currentNodes = getStreamer().getNodes();
-    if (currentNodes == null) {
-      return null;
-    }
-    DatanodeInfo[] value = new DatanodeInfo[currentNodes.length];
-    for (int i = 0; i < currentNodes.length; i++) {
-      value[i] = currentNodes[i];
-    }
-    return value;
-  }
-
-  /** 
-   * @return the object for computing checksum.
-   *         The type is NULL if checksum is not computed.
-   */
-  private static DataChecksum getChecksum4Compute(DataChecksum checksum,
-      HdfsFileStatus stat) {
-    if (DataStreamer.isLazyPersist(stat) && stat.getReplication() == 1) {
-      // do not compute checksum for writing to single replica to memory
-      return DataChecksum.newDataChecksum(Type.NULL,
-          checksum.getBytesPerChecksum());
-    }
-    return checksum;
-  }
- 
-  private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress,
-      HdfsFileStatus stat, DataChecksum checksum) throws IOException {
-    super(getChecksum4Compute(checksum, stat));
-    this.dfsClient = dfsClient;
-    this.src = src;
-    this.fileId = stat.getFileId();
-    this.blockSize = stat.getBlockSize();
-    this.blockReplication = stat.getReplication();
-    this.fileEncryptionInfo = stat.getFileEncryptionInfo();
-    this.cachingStrategy = new AtomicReference<CachingStrategy>(
-        dfsClient.getDefaultWriteCachingStrategy());
-    if ((progress != null) && DFSClient.LOG.isDebugEnabled()) {
-      DFSClient.LOG.debug(
-          "Set non-null progress callback on DFSOutputStream " + src);
-    }
-    
-    this.bytesPerChecksum = checksum.getBytesPerChecksum();
-    if (bytesPerChecksum <= 0) {
-      throw new HadoopIllegalArgumentException(
-          "Invalid value: bytesPerChecksum = " + bytesPerChecksum + " <= 0");
-    }
-    if (blockSize % bytesPerChecksum != 0) {
-      throw new HadoopIllegalArgumentException("Invalid values: "
-          + HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (=" + bytesPerChecksum
-          + ") must divide block size (=" + blockSize + ").");
-    }
-    this.byteArrayManager = dfsClient.getClientContext().getByteArrayManager();
-  }
-
-  /** Construct a new output stream for creating a file. */
-  protected DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
-      EnumSet<CreateFlag> flag, Progressable progress,
-      DataChecksum checksum, String[] favoredNodes) throws IOException {
-    this(dfsClient, src, progress, stat, checksum);
-    this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
-
-    computePacketChunkSize(dfsClient.getConf().getWritePacketSize(), bytesPerChecksum);
-
-    streamer = new DataStreamer(stat, null, dfsClient, src, progress, checksum,
-        cachingStrategy, byteArrayManager, favoredNodes);
-  }
-
-  static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
-      FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
-      short replication, long blockSize, Progressable progress, int buffersize,
-      DataChecksum checksum, String[] favoredNodes) throws IOException {
-    TraceScope scope =
-        dfsClient.getPathTraceScope("newStreamForCreate", src);
-    try {
-      HdfsFileStatus stat = null;
-
-      // Retry the create if we get a RetryStartFileException up to a maximum
-      // number of times
-      boolean shouldRetry = true;
-      int retryCount = CREATE_RETRY_COUNT;
-      while (shouldRetry) {
-        shouldRetry = false;
-        try {
-          stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
-              new EnumSetWritable<CreateFlag>(flag), createParent, replication,
-              blockSize, SUPPORTED_CRYPTO_VERSIONS);
-          break;
-        } catch (RemoteException re) {
-          IOException e = re.unwrapRemoteException(
-              AccessControlException.class,
-              DSQuotaExceededException.class,
-              QuotaByStorageTypeExceededException.class,
-              FileAlreadyExistsException.class,
-              FileNotFoundException.class,
-              ParentNotDirectoryException.class,
-              NSQuotaExceededException.class,
-              RetryStartFileException.class,
-              SafeModeException.class,
-              UnresolvedPathException.class,
-              SnapshotAccessControlException.class,
-              UnknownCryptoProtocolVersionException.class);
-          if (e instanceof RetryStartFileException) {
-            if (retryCount > 0) {
-              shouldRetry = true;
-              retryCount--;
-            } else {
-              throw new IOException("Too many retries because of encryption" +
-                  " zone operations", e);
-            }
-          } else {
-            throw e;
-          }
-        }
-      }
-      Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
-      final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
-          flag, progress, checksum, favoredNodes);
-      out.start();
-      return out;
-    } finally {
-      scope.close();
-    }
-  }
-
-  /** Construct a new output stream for append. */
-  private DFSOutputStream(DFSClient dfsClient, String src,
-      EnumSet<CreateFlag> flags, Progressable progress, LocatedBlock lastBlock,
-      HdfsFileStatus stat, DataChecksum checksum, String[] favoredNodes)
-          throws IOException {
-    this(dfsClient, src, progress, stat, checksum);
-    initialFileSize = stat.getLen(); // length of file when opened
-    this.shouldSyncBlock = flags.contains(CreateFlag.SYNC_BLOCK);
-
-    boolean toNewBlock = flags.contains(CreateFlag.NEW_BLOCK);
-
-    this.fileEncryptionInfo = stat.getFileEncryptionInfo();
-
-    // The last partial block of the file has to be filled.
-    if (!toNewBlock && lastBlock != null) {
-      // indicate that we are appending to an existing block
-      streamer = new DataStreamer(lastBlock, stat, dfsClient, src, progress, checksum,
-          cachingStrategy, byteArrayManager);
-      getStreamer().setBytesCurBlock(lastBlock.getBlockSize());
-      adjustPacketChunkSize(stat);
-      getStreamer().setPipelineInConstruction(lastBlock);
-    } else {
-      computePacketChunkSize(dfsClient.getConf().getWritePacketSize(),
-          bytesPerChecksum);
-      streamer = new DataStreamer(stat, lastBlock != null ? lastBlock.getBlock() : null,
-          dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
-          favoredNodes);
-    }
-  }
-
-  private void adjustPacketChunkSize(HdfsFileStatus stat) throws IOException{
-
-    long usedInLastBlock = stat.getLen() % blockSize;
-    int freeInLastBlock = (int)(blockSize - usedInLastBlock);
-
-    // calculate the amount of free space in the pre-existing
-    // last crc chunk
-    int usedInCksum = (int)(stat.getLen() % bytesPerChecksum);
-    int freeInCksum = bytesPerChecksum - usedInCksum;
-
-    // if there is space in the last block, then we have to
-    // append to that block
-    if (freeInLastBlock == blockSize) {
-      throw new IOException("The last block for file " +
-          src + " is full.");
-    }
-
-    if (usedInCksum > 0 && freeInCksum > 0) {
-      // if there is space in the last partial chunk, then
-      // setup in such a way that the next packet will have only
-      // one chunk that fills up the partial chunk.
-      //
-      computePacketChunkSize(0, freeInCksum);
-      setChecksumBufSize(freeInCksum);
-      getStreamer().setAppendChunk(true);
-    } else {
-      // if the remaining space in the block is smaller than
-      // that expected size of of a packet, then create
-      // smaller size packet.
-      //
-      computePacketChunkSize(
-          Math.min(dfsClient.getConf().getWritePacketSize(), freeInLastBlock),
-          bytesPerChecksum);
-    }
-  }
-
-  static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src,
-      EnumSet<CreateFlag> flags, int bufferSize, Progressable progress,
-      LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum,
-      String[] favoredNodes) throws IOException {
-    TraceScope scope =
-        dfsClient.getPathTraceScope("newStreamForAppend", src);
-    try {
-      final DFSOutputStream out = new DFSOutputStream(dfsClient, src, flags,
-          progress, lastBlock, stat, checksum, favoredNodes);
-      out.start();
-      return out;
-    } finally {
-      scope.close();
-    }
-  }
-
-  protected void computePacketChunkSize(int psize, int csize) {
-    final int bodySize = psize - PacketHeader.PKT_MAX_HEADER_LEN;
-    final int chunkSize = csize + getChecksumSize();
-    chunksPerPacket = Math.max(bodySize/chunkSize, 1);
-    packetSize = chunkSize*chunksPerPacket;
-    if (DFSClient.LOG.isDebugEnabled()) {
-      DFSClient.LOG.debug("computePacketChunkSize: src=" + src +
-                ", chunkSize=" + chunkSize +
-                ", chunksPerPacket=" + chunksPerPacket +
-                ", packetSize=" + packetSize);
-    }
-  }
-
-  protected TraceScope createWriteTraceScope() {
-    return dfsClient.getPathTraceScope("DFSOutputStream#write", src);
-  }
-
-  // @see FSOutputSummer#writeChunk()
-  @Override
-  protected synchronized void writeChunk(byte[] b, int offset, int len,
-      byte[] checksum, int ckoff, int cklen) throws IOException {
-    dfsClient.checkOpen();
-    checkClosed();
-
-    if (len > bytesPerChecksum) {
-      throw new IOException("writeChunk() buffer size is " + len +
-                            " is larger than supported  bytesPerChecksum " +
-                            bytesPerChecksum);
-    }
-    if (cklen != 0 && cklen != getChecksumSize()) {
-      throw new IOException("writeChunk() checksum size is supposed to be " +
-                            getChecksumSize() + " but found to be " + cklen);
-    }
-
-    if (currentPacket == null) {
-      currentPacket = createPacket(packetSize, chunksPerPacket, getStreamer()
-          .getBytesCurBlock(), getStreamer().getAndIncCurrentSeqno(), false);
-      if (DFSClient.LOG.isDebugEnabled()) {
-        DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + 
-            currentPacket.getSeqno() +
-            ", src=" + src +
-            ", packetSize=" + packetSize +
-            ", chunksPerPacket=" + chunksPerPacket +
-            ", bytesCurBlock=" + getStreamer().getBytesCurBlock());
-      }
-    }
-
-    currentPacket.writeChecksum(checksum, ckoff, cklen);
-    currentPacket.writeData(b, offset, len);
-    currentPacket.incNumChunks();
-    getStreamer().incBytesCurBlock(len);
-
-    // If packet is full, enqueue it for transmission
-    //
-    if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
-        getStreamer().getBytesCurBlock() == blockSize) {
-      enqueueCurrentPacketFull();
-    }
-  }
-
-  void enqueueCurrentPacket() throws IOException {
-    getStreamer().waitAndQueuePacket(currentPacket);
-    currentPacket = null;
-  }
-
-  void enqueueCurrentPacketFull() throws IOException {
-    LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={},"
-        + " appendChunk={}, {}", currentPacket, src, getStreamer()
-        .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(),
-        getStreamer());
-    enqueueCurrentPacket();
-    adjustChunkBoundary();
-    endBlock();
-  }
-
-  /** create an empty packet to mark the end of the block. */
-  void setCurrentPacketToEmpty() throws InterruptedIOException {
-    currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
-        getStreamer().getAndIncCurrentSeqno(), true);
-    currentPacket.setSyncBlock(shouldSyncBlock);
-  }
-
-  /**
-   * If the reopened file did not end at chunk boundary and the above
-   * write filled up its partial chunk. Tell the summer to generate full
-   * crc chunks from now on.
-   */
-  protected void adjustChunkBoundary() {
-    if (getStreamer().getAppendChunk() &&
-        getStreamer().getBytesCurBlock() % bytesPerChecksum == 0) {
-      getStreamer().setAppendChunk(false);
-      resetChecksumBufSize();
-    }
-
-    if (!getStreamer().getAppendChunk()) {
-      int psize = Math.min((int)(blockSize- getStreamer().getBytesCurBlock()),
-          dfsClient.getConf().getWritePacketSize());
-      computePacketChunkSize(psize, bytesPerChecksum);
-    }
-  }
-
-  /**
-   * if encountering a block boundary, send an empty packet to
-   * indicate the end of block and reset bytesCurBlock.
-   *
-   * @throws IOException
-   */
-  protected void endBlock() throws IOException {
-    if (getStreamer().getBytesCurBlock() == blockSize) {
-      setCurrentPacketToEmpty();
-      enqueueCurrentPacket();
-      getStreamer().setBytesCurBlock(0);
-      lastFlushOffset = 0;
-    }
-  }
-  
-  /**
-   * Flushes out to all replicas of the block. The data is in the buffers
-   * of the DNs but not necessarily in the DN's OS buffers.
-   *
-   * It is a synchronous operation. When it returns,
-   * it guarantees that flushed data become visible to new readers. 
-   * It is not guaranteed that data has been flushed to 
-   * persistent store on the datanode. 
-   * Block allocations are persisted on namenode.
-   */
-  @Override
-  public void hflush() throws IOException {
-    TraceScope scope =
-        dfsClient.getPathTraceScope("hflush", src);
-    try {
-      flushOrSync(false, EnumSet.noneOf(SyncFlag.class));
-    } finally {
-      scope.close();
-    }
-  }
-
-  @Override
-  public void hsync() throws IOException {
-    TraceScope scope =
-        dfsClient.getPathTraceScope("hsync", src);
-    try {
-      flushOrSync(true, EnumSet.noneOf(SyncFlag.class));
-    } finally {
-      scope.close();
-    }
-  }
-  
-  /**
-   * The expected semantics is all data have flushed out to all replicas 
-   * and all replicas have done posix fsync equivalent - ie the OS has 
-   * flushed it to the disk device (but the disk may have it in its cache).
-   * 
-   * Note that only the current block is flushed to the disk device.
-   * To guarantee durable sync across block boundaries the stream should
-   * be created with {@link CreateFlag#SYNC_BLOCK}.
-   * 
-   * @param syncFlags
-   *          Indicate the semantic of the sync. Currently used to specify
-   *          whether or not to update the block length in NameNode.
-   */
-  public void hsync(EnumSet<SyncFlag> syncFlags) throws IOException {
-    TraceScope scope =
-        dfsClient.getPathTraceScope("hsync", src);
-    try {
-      flushOrSync(true, syncFlags);
-    } finally {
-      scope.close();
-    }
-  }
-
-  /**
-   * Flush/Sync buffered data to DataNodes.
-   * 
-   * @param isSync
-   *          Whether or not to require all replicas to flush data to the disk
-   *          device
-   * @param syncFlags
-   *          Indicate extra detailed semantic of the flush/sync. Currently
-   *          mainly used to specify whether or not to update the file length in
-   *          the NameNode
-   * @throws IOException
-   */
-  private void flushOrSync(boolean isSync, EnumSet<SyncFlag> syncFlags)
-      throws IOException {
-    dfsClient.checkOpen();
-    checkClosed();
-    try {
-      long toWaitFor;
-      long lastBlockLength = -1L;
-      boolean updateLength = syncFlags.contains(SyncFlag.UPDATE_LENGTH);
-      boolean endBlock = syncFlags.contains(SyncFlag.END_BLOCK);
-      synchronized (this) {
-        // flush checksum buffer, but keep checksum buffer intact if we do not
-        // need to end the current block
-        int numKept = flushBuffer(!endBlock, true);
-        // bytesCurBlock potentially incremented if there was buffered data
-
-        if (DFSClient.LOG.isDebugEnabled()) {
-          DFSClient.LOG.debug("DFSClient flush(): "
-              + " bytesCurBlock=" + getStreamer().getBytesCurBlock()
-              + " lastFlushOffset=" + lastFlushOffset
-              + " createNewBlock=" + endBlock);
-        }
-        // Flush only if we haven't already flushed till this offset.
-        if (lastFlushOffset != getStreamer().getBytesCurBlock()) {
-          assert getStreamer().getBytesCurBlock() > lastFlushOffset;
-          // record the valid offset of this flush
-          lastFlushOffset = getStreamer().getBytesCurBlock();
-          if (isSync && currentPacket == null && !endBlock) {
-            // Nothing to send right now,
-            // but sync was requested.
-            // Send an empty packet if we do not end the block right now
-            currentPacket = createPacket(packetSize, chunksPerPacket,
-                getStreamer().getBytesCurBlock(), getStreamer()
-                    .getAndIncCurrentSeqno(), false);
-          }
-        } else {
-          if (isSync && getStreamer().getBytesCurBlock() > 0 && !endBlock) {
-            // Nothing to send right now,
-            // and the block was partially written,
-            // and sync was requested.
-            // So send an empty sync packet if we do not end the block right
-            // now
-            currentPacket = createPacket(packetSize, chunksPerPacket,
-                getStreamer().getBytesCurBlock(), getStreamer()
-                    .getAndIncCurrentSeqno(), false);
-          } else if (currentPacket != null) {
-            // just discard the current packet since it is already been sent.
-            currentPacket.releaseBuffer(byteArrayManager);
-            currentPacket = null;
-          }
-        }
-        if (currentPacket != null) {
-          currentPacket.setSyncBlock(isSync);
-          enqueueCurrentPacket();
-        }
-        if (endBlock && getStreamer().getBytesCurBlock() > 0) {
-          // Need to end the current block, thus send an empty packet to
-          // indicate this is the end of the block and reset bytesCurBlock
-          currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
-              getStreamer().getAndIncCurrentSeqno(), true);
-          currentPacket.setSyncBlock(shouldSyncBlock || isSync);
-          enqueueCurrentPacket();
-          getStreamer().setBytesCurBlock(0);
-          lastFlushOffset = 0;
-        } else {
-          // Restore state of stream. Record the last flush offset
-          // of the last full chunk that was flushed.
-          getStreamer().setBytesCurBlock(
-              getStreamer().getBytesCurBlock() - numKept);
-        }
-
-        toWaitFor = getStreamer().getLastQueuedSeqno();
-      } // end synchronized
-
-      getStreamer().waitForAckedSeqno(toWaitFor);
-
-      // update the block length first time irrespective of flag
-      if (updateLength || getStreamer().getPersistBlocks().get()) {
-        synchronized (this) {
-          if (!getStreamer().streamerClosed()
-              && getStreamer().getBlock() != null) {
-            lastBlockLength = getStreamer().getBlock().getNumBytes();
-          }
-        }
-      }
-      // If 1) any new blocks were allocated since the last flush, or 2) to
-      // update length in NN is required, then persist block locations on
-      // namenode.
-      if (getStreamer().getPersistBlocks().getAndSet(false) || updateLength) {
-        try {
-          dfsClient.namenode.fsync(src, fileId, dfsClient.clientName,
-              lastBlockLength);
-        } catch (IOException ioe) {
-          DFSClient.LOG.warn("Unable to persist blocks in hflush for " + src, ioe);
-          // If we got an error here, it might be because some other thread called
-          // close before our hflush completed. In that case, we should throw an
-          // exception that the stream is closed.
-          checkClosed();
-          // If we aren't closed but failed to sync, we should expose that to the
-          // caller.
-          throw ioe;
-        }
-      }
-
-      synchronized(this) {
-        if (!getStreamer().streamerClosed()) {
-          getStreamer().setHflush();
-        }
-      }
-    } catch (InterruptedIOException interrupt) {
-      // This kind of error doesn't mean that the stream itself is broken - just the
-      // flushing thread got interrupted. So, we shouldn't close down the writer,
-      // but instead just propagate the error
-      throw interrupt;
-    } catch (IOException e) {
-      DFSClient.LOG.warn("Error while syncing", e);
-      synchronized (this) {
-        if (!isClosed()) {
-          getStreamer().getLastException().set(e);
-          closeThreads(true);
-        }
-      }
-      throw e;
-    }
-  }
-
-  /**
-   * @deprecated use {@link HdfsDataOutputStream#getCurrentBlockReplication()}.
-   */
-  @Deprecated
-  public synchronized int getNumCurrentReplicas() throws IOException {
-    return getCurrentBlockReplication();
-  }
-
-  /**
-   * Note that this is not a public API;
-   * use {@link HdfsDataOutputStream#getCurrentBlockReplication()} instead.
-   * 
-   * @return the number of valid replicas of the current block
-   */
-  public synchronized int getCurrentBlockReplication() throws IOException {
-    dfsClient.checkOpen();
-    checkClosed();
-    if (getStreamer().streamerClosed()) {
-      return blockReplication; // no pipeline, return repl factor of file
-    }
-    DatanodeInfo[] currentNodes = getStreamer().getNodes();
-    if (currentNodes == null) {
-      return blockReplication; // no pipeline, return repl factor of file
-    }
-    return currentNodes.length;
-  }
-  
-  /**
-   * Waits till all existing data is flushed and confirmations 
-   * received from datanodes. 
-   */
-  protected void flushInternal() throws IOException {
-    long toWaitFor;
-    synchronized (this) {
-      dfsClient.checkOpen();
-      checkClosed();
-      //
-      // If there is data in the current buffer, send it across
-      //
-      getStreamer().queuePacket(currentPacket);
-      currentPacket = null;
-      toWaitFor = getStreamer().getLastQueuedSeqno();
-    }
-
-    getStreamer().waitForAckedSeqno(toWaitFor);
-  }
-
-  protected synchronized void start() {
-    getStreamer().start();
-  }
-  
-  /**
-   * Aborts this output stream and releases any system 
-   * resources associated with this stream.
-   */
-  synchronized void abort() throws IOException {
-    if (isClosed()) {
-      return;
-    }
-    getStreamer().getLastException().set(new IOException("Lease timeout of "
-        + (dfsClient.getConf().getHdfsTimeout()/1000) + " seconds expired."));
-    closeThreads(true);
-    dfsClient.endFileLease(fileId);
-  }
-
-  boolean isClosed() {
-    return closed || getStreamer().streamerClosed();
-  }
-
-  void setClosed() {
-    closed = true;
-    getStreamer().release();
-  }
-
-  // shutdown datastreamer and responseprocessor threads.
-  // interrupt datastreamer if force is true
-  protected void closeThreads(boolean force) throws IOException {
-    try {
-      getStreamer().close(force);
-      getStreamer().join();
-      getStreamer().closeSocket();
-    } catch (InterruptedException e) {
-      throw new IOException("Failed to shutdown streamer");
-    } finally {
-      getStreamer().setSocketToNull();
-      setClosed();
-    }
-  }
-  
-  /**
-   * Closes this output stream and releases any system 
-   * resources associated with this stream.
-   */
-  @Override
-  public synchronized void close() throws IOException {
-    TraceScope scope =
-        dfsClient.getPathTraceScope("DFSOutputStream#close", src);
-    try {
-      closeImpl();
-    } finally {
-      scope.close();
-    }
-  }
-
-  protected synchronized void closeImpl() throws IOException {
-    if (isClosed()) {
-      getStreamer().getLastException().check(true);
-      return;
-    }
-
-    try {
-      flushBuffer();       // flush from all upper layers
-
-      if (currentPacket != null) {
-        enqueueCurrentPacket();
-      }
-
-      if (getStreamer().getBytesCurBlock() != 0) {
-        setCurrentPacketToEmpty();
-      }
-
-      flushInternal();             // flush all data to Datanodes
-      // get last block before destroying the streamer
-      ExtendedBlock lastBlock = getStreamer().getBlock();
-      closeThreads(false);
-      TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER);
-      try {
-        completeFile(lastBlock);
-      } finally {
-        scope.close();
-      }
-      dfsClient.endFileLease(fileId);
-    } catch (ClosedChannelException e) {
-    } finally {
-      setClosed();
-    }
-  }
-
-  // should be called holding (this) lock since setTestFilename() may 
-  // be called during unit tests
-  protected void completeFile(ExtendedBlock last) throws IOException {
-    long localstart = Time.monotonicNow();
-    final DfsClientConf conf = dfsClient.getConf();
-    long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs();
-    boolean fileComplete = false;
-    int retries = conf.getNumBlockWriteLocateFollowingRetry();
-    while (!fileComplete) {
-      fileComplete =
-          dfsClient.namenode.complete(src, dfsClient.clientName, last, fileId);
-      if (!fileComplete) {
-        final int hdfsTimeout = conf.getHdfsTimeout();
-        if (!dfsClient.clientRunning
-            || (hdfsTimeout > 0
-                && localstart + hdfsTimeout < Time.monotonicNow())) {
-            String msg = "Unable to close file because dfsclient " +
-                          " was unable to contact the HDFS servers." +
-                          " clientRunning " + dfsClient.clientRunning +
-                          " hdfsTimeout " + hdfsTimeout;
-            DFSClient.LOG.info(msg);
-            throw new IOException(msg);
-        }
-        try {
-          if (retries == 0) {
-            throw new IOException("Unable to close file because the last block"
-                + " does not have enough number of replicas.");
-          }
-          retries--;
-          Thread.sleep(sleeptime);
-          sleeptime *= 2;
-          if (Time.monotonicNow() - localstart > 5000) {
-            DFSClient.LOG.info("Could not complete " + src + " retrying...");
-          }
-        } catch (InterruptedException ie) {
-          DFSClient.LOG.warn("Caught exception ", ie);
-        }
-      }
-    }
-  }
-
-  @VisibleForTesting
-  public void setArtificialSlowdown(long period) {
-    getStreamer().setArtificialSlowdown(period);
-  }
-
-  @VisibleForTesting
-  public synchronized void setChunksPerPacket(int value) {
-    chunksPerPacket = Math.min(chunksPerPacket, value);
-    packetSize = (bytesPerChecksum + getChecksumSize()) * chunksPerPacket;
-  }
-
-  /**
-   * Returns the size of a file as it was when this stream was opened
-   */
-  public long getInitialLen() {
-    return initialFileSize;
-  }
-
-  /**
-   * @return the FileEncryptionInfo for this stream, or null if not encrypted.
-   */
-  public FileEncryptionInfo getFileEncryptionInfo() {
-    return fileEncryptionInfo;
-  }
-
-  /**
-   * Returns the access token currently used by streamer, for testing only
-   */
-  synchronized Token<BlockTokenIdentifier> getBlockToken() {
-    return getStreamer().getBlockToken();
-  }
-
-  @Override
-  public void setDropBehind(Boolean dropBehind) throws IOException {
-    CachingStrategy prevStrategy, nextStrategy;
-    // CachingStrategy is immutable.  So build a new CachingStrategy with the
-    // modifications we want, and compare-and-swap it in.
-    do {
-      prevStrategy = this.cachingStrategy.get();
-      nextStrategy = new CachingStrategy.Builder(prevStrategy).
-                        setDropBehind(dropBehind).build();
-    } while (!this.cachingStrategy.compareAndSet(prevStrategy, nextStrategy));
-  }
-
-  @VisibleForTesting
-  ExtendedBlock getBlock() {
-    return getStreamer().getBlock();
-  }
-
-  @VisibleForTesting
-  public long getFileId() {
-    return fileId;
-  }
-
-  /**
-   * Return the source of stream.
-   */
-  String getSrc() {
-    return src;
-  }
-
-  /**
-   * Returns the data streamer object.
-   */
-  protected DataStreamer getStreamer() {
-    return streamer;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
deleted file mode 100755
index 22055c3..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
+++ /dev/null
@@ -1,345 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.BufferOverflowException;
-import java.nio.channels.ClosedChannelException;
-import java.util.Arrays;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
-import org.apache.hadoop.hdfs.util.ByteArrayManager;
-import org.apache.htrace.Span;
-
-/****************************************************************
- * DFSPacket is used by DataStreamer and DFSOutputStream.
- * DFSOutputStream generates packets and then ask DatStreamer
- * to send them to datanodes.
- ****************************************************************/
-
-@InterfaceAudience.Private
-class DFSPacket {
-  public static final long HEART_BEAT_SEQNO = -1L;
-  private static long[] EMPTY = new long[0];
-  private final long seqno; // sequence number of buffer in block
-  private final long offsetInBlock; // offset in block
-  private boolean syncBlock; // this packet forces the current block to disk
-  private int numChunks; // number of chunks currently in packet
-  private final int maxChunks; // max chunks in packet
-  private byte[] buf;
-  private final boolean lastPacketInBlock; // is this the last packet in block?
-
-  /**
-   * buf is pointed into like follows:
-   *  (C is checksum data, D is payload data)
-   *
-   * [_________CCCCCCCCC________________DDDDDDDDDDDDDDDD___]
-   *           ^        ^               ^               ^
-   *           |        checksumPos     dataStart       dataPos
-   *           checksumStart
-   *
-   * Right before sending, we move the checksum data to immediately precede
-   * the actual data, and then insert the header into the buffer immediately
-   * preceding the checksum data, so we make sure to keep enough space in
-   * front of the checksum data to support the largest conceivable header.
-   */
-  private int checksumStart;
-  private int checksumPos;
-  private final int dataStart;
-  private int dataPos;
-  private long[] traceParents = EMPTY;
-  private int traceParentsUsed;
-  private Span span;
-
-  /**
-   * Create a new packet.
-   *
-   * @param buf the buffer storing data and checksums
-   * @param chunksPerPkt maximum number of chunks per packet.
-   * @param offsetInBlock offset in bytes into the HDFS block.
-   * @param seqno the sequence number of this packet
-   * @param checksumSize the size of checksum
-   * @param lastPacketInBlock if this is the last packet
-   */
-  DFSPacket(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno,
-                   int checksumSize, boolean lastPacketInBlock) {
-    this.lastPacketInBlock = lastPacketInBlock;
-    this.numChunks = 0;
-    this.offsetInBlock = offsetInBlock;
-    this.seqno = seqno;
-
-    this.buf = buf;
-
-    checksumStart = PacketHeader.PKT_MAX_HEADER_LEN;
-    checksumPos = checksumStart;
-    dataStart = checksumStart + (chunksPerPkt * checksumSize);
-    dataPos = dataStart;
-    maxChunks = chunksPerPkt;
-  }
-
-  /**
-   * Write data to this packet.
-   *
-   * @param inarray input array of data
-   * @param off the offset of data to write
-   * @param len the length of data to write
-   * @throws ClosedChannelException
-   */
-  synchronized void writeData(byte[] inarray, int off, int len)
-      throws ClosedChannelException {
-    checkBuffer();
-    if (dataPos + len > buf.length) {
-      throw new BufferOverflowException();
-    }
-    System.arraycopy(inarray, off, buf, dataPos, len);
-    dataPos += len;
-  }
-
-  /**
-   * Write checksums to this packet
-   *
-   * @param inarray input array of checksums
-   * @param off the offset of checksums to write
-   * @param len the length of checksums to write
-   * @throws ClosedChannelException
-   */
-  synchronized void writeChecksum(byte[] inarray, int off, int len)
-      throws ClosedChannelException {
-    checkBuffer();
-    if (len == 0) {
-      return;
-    }
-    if (checksumPos + len > dataStart) {
-      throw new BufferOverflowException();
-    }
-    System.arraycopy(inarray, off, buf, checksumPos, len);
-    checksumPos += len;
-  }
-
-  /**
-   * Write the full packet, including the header, to the given output stream.
-   *
-   * @param stm
-   * @throws IOException
-   */
-  synchronized void writeTo(DataOutputStream stm) throws IOException {
-    checkBuffer();
-
-    final int dataLen = dataPos - dataStart;
-    final int checksumLen = checksumPos - checksumStart;
-    final int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen;
-
-    PacketHeader header = new PacketHeader(
-        pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock);
-
-    if (checksumPos != dataStart) {
-      // Move the checksum to cover the gap. This can happen for the last
-      // packet or during an hflush/hsync call.
-      System.arraycopy(buf, checksumStart, buf,
-          dataStart - checksumLen , checksumLen);
-      checksumPos = dataStart;
-      checksumStart = checksumPos - checksumLen;
-    }
-
-    final int headerStart = checksumStart - header.getSerializedSize();
-    assert checksumStart + 1 >= header.getSerializedSize();
-    assert headerStart >= 0;
-    assert headerStart + header.getSerializedSize() == checksumStart;
-
-    // Copy the header data into the buffer immediately preceding the checksum
-    // data.
-    System.arraycopy(header.getBytes(), 0, buf, headerStart,
-        header.getSerializedSize());
-
-    // corrupt the data for testing.
-    if (DFSClientFaultInjector.get().corruptPacket()) {
-      buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
-    }
-
-    // Write the now contiguous full packet to the output stream.
-    stm.write(buf, headerStart, header.getSerializedSize() + checksumLen + dataLen);
-
-    // undo corruption.
-    if (DFSClientFaultInjector.get().uncorruptPacket()) {
-      buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
-    }
-  }
-
-  private synchronized void checkBuffer() throws ClosedChannelException {
-    if (buf == null) {
-      throw new ClosedChannelException();
-    }
-  }
-
-  /**
-   * Release the buffer in this packet to ByteArrayManager.
-   *
-   * @param bam
-   */
-  synchronized void releaseBuffer(ByteArrayManager bam) {
-    bam.release(buf);
-    buf = null;
-  }
-
-  /**
-   * get the packet's last byte's offset in the block
-   *
-   * @return the packet's last byte's offset in the block
-   */
-  synchronized long getLastByteOffsetBlock() {
-    return offsetInBlock + dataPos - dataStart;
-  }
-
-  /**
-   * Check if this packet is a heart beat packet
-   *
-   * @return true if the sequence number is HEART_BEAT_SEQNO
-   */
-  boolean isHeartbeatPacket() {
-    return seqno == HEART_BEAT_SEQNO;
-  }
-
-  /**
-   * check if this packet is the last packet in block
-   *
-   * @return true if the packet is the last packet
-   */
-  boolean isLastPacketInBlock(){
-    return lastPacketInBlock;
-  }
-
-  /**
-   * get sequence number of this packet
-   *
-   * @return the sequence number of this packet
-   */
-  long getSeqno(){
-    return seqno;
-  }
-
-  /**
-   * get the number of chunks this packet contains
-   *
-   * @return the number of chunks in this packet
-   */
-  synchronized int getNumChunks(){
-    return numChunks;
-  }
-
-  /**
-   * increase the number of chunks by one
-   */
-  synchronized void incNumChunks(){
-    numChunks++;
-  }
-
-  /**
-   * get the maximum number of packets
-   *
-   * @return the maximum number of packets
-   */
-  int getMaxChunks(){
-    return maxChunks;
-  }
-
-  /**
-   * set if to sync block
-   *
-   * @param syncBlock if to sync block
-   */
-  synchronized void setSyncBlock(boolean syncBlock){
-    this.syncBlock = syncBlock;
-  }
-
-  @Override
-  public String toString() {
-    return "packet seqno: " + this.seqno +
-        " offsetInBlock: " + this.offsetInBlock +
-        " lastPacketInBlock: " + this.lastPacketInBlock +
-        " lastByteOffsetInBlock: " + this.getLastByteOffsetBlock();
-  }
-
-  /**
-   * Add a trace parent span for this packet.<p/>
-   *
-   * Trace parent spans for a packet are the trace spans responsible for
-   * adding data to that packet.  We store them as an array of longs for
-   * efficiency.<p/>
-   *
-   * Protected by the DFSOutputStream dataQueue lock.
-   */
-  public void addTraceParent(Span span) {
-    if (span == null) {
-      return;
-    }
-    addTraceParent(span.getSpanId());
-  }
-
-  public void addTraceParent(long id) {
-    if (traceParentsUsed == traceParents.length) {
-      int newLength = (traceParents.length == 0) ? 8 :
-          traceParents.length * 2;
-      traceParents = Arrays.copyOf(traceParents, newLength);
-    }
-    traceParents[traceParentsUsed] = id;
-    traceParentsUsed++;
-  }
-
-  /**
-   * Get the trace parent spans for this packet.<p/>
-   *
-   * Will always be non-null.<p/>
-   *
-   * Protected by the DFSOutputStream dataQueue lock.
-   */
-  public long[] getTraceParents() {
-    // Remove duplicates from the array.
-    int len = traceParentsUsed;
-    Arrays.sort(traceParents, 0, len);
-    int i = 0, j = 0;
-    long prevVal = 0; // 0 is not a valid span id
-    while (true) {
-      if (i == len) {
-        break;
-      }
-      long val = traceParents[i];
-      if (val != prevVal) {
-        traceParents[j] = val;
-        j++;
-        prevVal = val;
-      }
-      i++;
-    }
-    if (j < traceParents.length) {
-      traceParents = Arrays.copyOf(traceParents, j);
-      traceParentsUsed = traceParents.length;
-    }
-    return traceParents;
-  }
-
-  public void setTraceSpan(Span span) {
-    this.span = span;
-  }
-
-  public Span getTraceSpan() {
-    return span;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index fe9e342..5b11ac2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -67,7 +67,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -1441,27 +1440,4 @@ public class DFSUtil {
     return cryptoProvider;
   }
 
-  public static int getIoFileBufferSize(Configuration conf) {
-    return conf.getInt(
-      CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
-      CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
-  }
-
-  public static int getSmallBufferSize(Configuration conf) {
-    return Math.min(getIoFileBufferSize(conf) / 2, 512);
-  }
-
-  /**
-   * Probe for HDFS Encryption being enabled; this uses the value of
-   * the option {@link DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI},
-   * returning true if that property contains a non-empty, non-whitespace
-   * string.
-   * @param conf configuration to probe
-   * @return true if encryption is considered enabled.
-   */
-  public static boolean isHDFSEncryptionEnabled(Configuration conf) {
-    return !conf.getTrimmed(
-        DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, "").isEmpty();
-  }
-
 }