Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/09/30 17:41:02 UTC

[02/58] [abbrv] hadoop git commit: HDFS-8053. Move DFSIn/OutputStream and related classes to hadoop-hdfs-client. Contributed by Mingliang Liu.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
deleted file mode 100644
index 4a016bd..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ /dev/null
@@ -1,1903 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
-
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InterruptedIOException;
-import java.io.OutputStream;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.nio.channels.ClosedChannelException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
-import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
-import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
-import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
-import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
-import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
-import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
-import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
-import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
-import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
-import org.apache.hadoop.hdfs.util.ByteArrayManager;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.MultipleIOException;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.util.DataChecksum;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.Time;
-import org.apache.htrace.NullScope;
-import org.apache.htrace.Sampler;
-import org.apache.htrace.Span;
-import org.apache.htrace.Trace;
-import org.apache.htrace.TraceInfo;
-import org.apache.htrace.TraceScope;
-
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-
-/*********************************************************************
- *
- * The DataStreamer class is responsible for sending data packets to the
- * datanodes in the pipeline. It retrieves a new blockid and block locations
- * from the namenode, and starts streaming packets to the pipeline of
- * Datanodes. Every packet has a sequence number associated with
- * it. When all the packets for a block are sent out and acks for each
- * of them are received, the DataStreamer closes the current block.
- *
- * The DataStreamer thread picks up packets from the dataQueue, sends them to
- * the first datanode in the pipeline and moves them from the dataQueue to the
- * ackQueue. The ResponseProcessor receives acks from the datanodes. When a
- * successful ack for a packet is received from all datanodes, the
- * ResponseProcessor removes the corresponding packet from the ackQueue.
- *
- * In case of error, all outstanding packets are moved from the ackQueue back
- * to the dataQueue. A new pipeline is set up by eliminating the bad datanode
- * from the original pipeline. The DataStreamer then resumes sending packets
- * from the dataQueue.
- *
- *********************************************************************/
-
-@InterfaceAudience.Private
-class DataStreamer extends Daemon {
-  static final Log LOG = LogFactory.getLog(DataStreamer.class);
-
-  /**
-   * Create a socket for a write pipeline
-   *
-   * @param first the first datanode
-   * @param length the pipeline length
-   * @param client the DFS client
-   * @return the socket connected to the first datanode
-   */
-  static Socket createSocketForPipeline(final DatanodeInfo first,
-      final int length, final DFSClient client) throws IOException {
-    final DfsClientConf conf = client.getConf();
-    final String dnAddr = first.getXferAddr(conf.isConnectToDnViaHostname());
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Connecting to datanode " + dnAddr);
-    }
-    final InetSocketAddress isa = NetUtils.createSocketAddr(dnAddr);
-    final Socket sock = client.socketFactory.createSocket();
-    final int timeout = client.getDatanodeReadTimeout(length);
-    NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), conf.getSocketTimeout());
-    sock.setSoTimeout(timeout);
-    sock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Send buf size " + sock.getSendBufferSize());
-    }
-    return sock;
-  }
-
-  /**
-   * Check whether this file is stored with the lazy persist (memory) policy.
-   *
-   * @param stat the HdfsFileStatus of a file
-   * @return true if this file is lazy persist
-   */
-  static boolean isLazyPersist(HdfsFileStatus stat) {
-    return stat.getStoragePolicy() == HdfsConstants.MEMORY_STORAGE_POLICY_ID;
-  }
-
-  /**
-   * Release a list of packets to the ByteArrayManager.
-   *
-   * @param packets packets to be released
-   * @param bam the ByteArrayManager
-   */
-  private static void releaseBuffer(List<DFSPacket> packets, ByteArrayManager bam) {
-    for(DFSPacket p : packets) {
-      p.releaseBuffer(bam);
-    }
-    packets.clear();
-  }
-  
-  static class LastExceptionInStreamer {
-    private IOException thrown;
-
-    synchronized void set(Throwable t) {
-      assert t != null;
-      this.thrown = t instanceof IOException ?
-          (IOException) t : new IOException(t);
-    }
-
-    synchronized void clear() {
-      thrown = null;
-    }
-
-    /** Check if there already is an exception. */
-    synchronized void check(boolean resetToNull) throws IOException {
-      if (thrown != null) {
-        if (LOG.isTraceEnabled()) {
-          // wrap and print the exception to know when the check is called
-          LOG.trace("Got Exception while checking", new Throwable(thrown));
-        }
-        final IOException e = thrown;
-        if (resetToNull) {
-          thrown = null;
-        }
-        throw e;
-      }
-    }
-
-    synchronized void throwException4Close() throws IOException {
-      check(false);
-      throw new ClosedChannelException();
-    }
-  }
-
-  static class ErrorState {
-    private boolean error = false;
-    private int badNodeIndex = -1;
-    private int restartingNodeIndex = -1;
-    private long restartingNodeDeadline = 0;
-    private final long datanodeRestartTimeout;
-
-    ErrorState(long datanodeRestartTimeout) {
-      this.datanodeRestartTimeout = datanodeRestartTimeout;
-    }
-
-    synchronized void reset() {
-      error = false;
-      badNodeIndex = -1;
-      restartingNodeIndex = -1;
-      restartingNodeDeadline = 0;
-    }
-
-    synchronized boolean hasError() {
-      return error;
-    }
-
-    synchronized boolean hasDatanodeError() {
-      return error && isNodeMarked();
-    }
-
-    synchronized void setError(boolean err) {
-      this.error = err;
-    }
-
-    synchronized void setBadNodeIndex(int index) {
-      this.badNodeIndex = index;
-    }
-
-    synchronized int getBadNodeIndex() {
-      return badNodeIndex;
-    }
-
-    synchronized int getRestartingNodeIndex() {
-      return restartingNodeIndex;
-    }
-
-    synchronized void initRestartingNode(int i, String message) {
-      restartingNodeIndex = i;
-      restartingNodeDeadline =  Time.monotonicNow() + datanodeRestartTimeout;
-      // If the data streamer has already set the primary node
-      // bad, clear it. It is likely that the write failed due to
-      // the DN shutdown. Even if it was a real failure, the pipeline
-      // recovery will take care of it.
-      badNodeIndex = -1;
-      LOG.info(message);
-    }
-
-    synchronized boolean isRestartingNode() {
-      return restartingNodeIndex >= 0;
-    }
-
-    synchronized boolean isNodeMarked() {
-      return badNodeIndex >= 0 || isRestartingNode();
-    }
-
-    /**
-     * This method is used when no explicit error report was received, but
-     * something failed. The first node is the suspect (or the cause is
-     * unknown), so it is marked as failed.
-     */
-    synchronized void markFirstNodeIfNotMarked() {
-      // There should be no existing error and no ongoing restart.
-      if (!isNodeMarked()) {
-        badNodeIndex = 0;
-      }
-    }
-
-    synchronized void adjustState4RestartingNode() {
-      // Just took care of a node error while waiting for a node restart
-      if (restartingNodeIndex >= 0) {
-        // If the error came from a node further away than the restarting
-        // node, the restart must have been complete.
-        if (badNodeIndex > restartingNodeIndex) {
-          restartingNodeIndex = -1;
-        } else if (badNodeIndex < restartingNodeIndex) {
-          // the node index has shifted.
-          restartingNodeIndex--;
-        } else {
-          throw new IllegalStateException("badNodeIndex = " + badNodeIndex
-              + " = restartingNodeIndex = " + restartingNodeIndex);
-        }
-      }
-
-      if (!isRestartingNode()) {
-        error = false;
-      }
-      badNodeIndex = -1;
-    }
-
-    synchronized void checkRestartingNodeDeadline(DatanodeInfo[] nodes) {
-      if (restartingNodeIndex >= 0) {
-        if (!error) {
-          throw new IllegalStateException("error=false while checking" +
-              " restarting node deadline");
-        }
-
-        // check badNodeIndex
-        if (badNodeIndex == restartingNodeIndex) {
-          // ignore, if came from the restarting node
-          badNodeIndex = -1;
-        }
-        // not within the deadline
-        if (Time.monotonicNow() >= restartingNodeDeadline) {
-          // expired. declare the restarting node dead
-          restartingNodeDeadline = 0;
-          final int i = restartingNodeIndex;
-          restartingNodeIndex = -1;
-          LOG.warn("Datanode " + i + " did not restart within "
-              + datanodeRestartTimeout + "ms: " + nodes[i]);
-          // Mark the restarting node as failed. If there is any other failed
-          // node during the last pipeline construction attempt, it will not be
-          // overwritten/dropped. In this case, the restarting node will get
-          // excluded in the following attempt, if it still does not come up.
-          if (badNodeIndex == -1) {
-            badNodeIndex = i;
-          }
-        }
-      }
-    }
-  }
-
-  private volatile boolean streamerClosed = false;
-  private ExtendedBlock block; // its length is number of bytes acked
-  private Token<BlockTokenIdentifier> accessToken;
-  private DataOutputStream blockStream;
-  private DataInputStream blockReplyStream;
-  private ResponseProcessor response = null;
-  private volatile DatanodeInfo[] nodes = null; // list of targets for current block
-  private volatile StorageType[] storageTypes = null;
-  private volatile String[] storageIDs = null;
-  private final ErrorState errorState;
-
-  private BlockConstructionStage stage;  // block construction stage
-  private long bytesSent = 0; // number of bytes that've been sent
-  private final boolean isLazyPersistFile;
-
-  /** Nodes have been used in the pipeline before and have failed. */
-  private final List<DatanodeInfo> failed = new ArrayList<>();
-  /** The last ack sequence number before pipeline failure. */
-  private long lastAckedSeqnoBeforeFailure = -1;
-  private int pipelineRecoveryCount = 0;
-  /** Has the current block been hflushed? */
-  private boolean isHflushed = false;
-  /** Append on an existing block? */
-  private final boolean isAppend;
-
-  private long currentSeqno = 0;
-  private long lastQueuedSeqno = -1;
-  private long lastAckedSeqno = -1;
-  private long bytesCurBlock = 0; // bytes written in current block
-  private final LastExceptionInStreamer lastException = new LastExceptionInStreamer();
-  private Socket s;
-
-  private final DFSClient dfsClient;
-  private final String src;
-  /** Only for DataTransferProtocol.writeBlock(..) */
-  private final DataChecksum checksum4WriteBlock;
-  private final Progressable progress;
-  private final HdfsFileStatus stat;
-  // appending to existing partial block
-  private volatile boolean appendChunk = false;
-  // both dataQueue and ackQueue are protected by dataQueue lock
-  private final LinkedList<DFSPacket> dataQueue = new LinkedList<>();
-  private final LinkedList<DFSPacket> ackQueue = new LinkedList<>();
-  private final AtomicReference<CachingStrategy> cachingStrategy;
-  private final ByteArrayManager byteArrayManager;
-  //persist blocks on namenode
-  private final AtomicBoolean persistBlocks = new AtomicBoolean(false);
-  private boolean failPacket = false;
-  private final long dfsclientSlowLogThresholdMs;
-  private long artificialSlowdown = 0;
-  // List of congested data nodes. The stream will back off if the DataNodes
-  // are congested
-  private final List<DatanodeInfo> congestedNodes = new ArrayList<>();
-  private static final int CONGESTION_BACKOFF_MEAN_TIME_IN_MS = 5000;
-  private static final int CONGESTION_BACK_OFF_MAX_TIME_IN_MS =
-      CONGESTION_BACKOFF_MEAN_TIME_IN_MS * 10;
-  private int lastCongestionBackoffTime;
-
-  private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes;
-  private final String[] favoredNodes;
-
-  private DataStreamer(HdfsFileStatus stat, DFSClient dfsClient, String src,
-                       Progressable progress, DataChecksum checksum,
-                       AtomicReference<CachingStrategy> cachingStrategy,
-                       ByteArrayManager byteArrayManage,
-                       boolean isAppend, String[] favoredNodes) {
-    this.dfsClient = dfsClient;
-    this.src = src;
-    this.progress = progress;
-    this.stat = stat;
-    this.checksum4WriteBlock = checksum;
-    this.cachingStrategy = cachingStrategy;
-    this.byteArrayManager = byteArrayManage;
-    this.isLazyPersistFile = isLazyPersist(stat);
-    this.isAppend = isAppend;
-    this.favoredNodes = favoredNodes;
-
-    final DfsClientConf conf = dfsClient.getConf();
-    this.dfsclientSlowLogThresholdMs = conf.getSlowIoWarningThresholdMs();
-    this.excludedNodes = initExcludedNodes(conf.getExcludedNodesCacheExpiry());
-    this.errorState = new ErrorState(conf.getDatanodeRestartTimeout());
-  }
-
-  /**
-   * Construct a data streamer for writing a new block (stage PIPELINE_SETUP_CREATE).
-   */
-  DataStreamer(HdfsFileStatus stat, ExtendedBlock block, DFSClient dfsClient,
-               String src, Progressable progress, DataChecksum checksum,
-               AtomicReference<CachingStrategy> cachingStrategy,
-               ByteArrayManager byteArrayManage, String[] favoredNodes) {
-    this(stat, dfsClient, src, progress, checksum, cachingStrategy,
-        byteArrayManage, false, favoredNodes);
-    this.block = block;
-    stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
-  }
-
-  /**
-   * Construct a data streamer for appending to the last partial block
-   * @param lastBlock last block of the file to be appended
-   * @param stat status of the file to be appended
-   * @throws IOException if error occurs
-   */
-  DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat, DFSClient dfsClient,
-               String src, Progressable progress, DataChecksum checksum,
-               AtomicReference<CachingStrategy> cachingStrategy,
-               ByteArrayManager byteArrayManage) throws IOException {
-    this(stat, dfsClient, src, progress, checksum, cachingStrategy,
-        byteArrayManage, true, null);
-    stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
-    block = lastBlock.getBlock();
-    bytesSent = block.getNumBytes();
-    accessToken = lastBlock.getBlockToken();
-  }
-
-  /**
-   * Set pipeline in construction
-   *
-   * @param lastBlock the last block of a file
-   * @throws IOException
-   */
-  void setPipelineInConstruction(LocatedBlock lastBlock) throws IOException{
-    // setup pipeline to append to the last block XXX retries??
-    setPipeline(lastBlock);
-    if (nodes.length < 1) {
-      throw new IOException("Unable to retrieve blocks locations " +
-          " for last block " + block +
-          "of file " + src);
-    }
-  }
-
-  private void setPipeline(LocatedBlock lb) {
-    setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs());
-  }
-
-  private void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes,
-                           String[] storageIDs) {
-    this.nodes = nodes;
-    this.storageTypes = storageTypes;
-    this.storageIDs = storageIDs;
-  }
-
-  /**
-   * Initialize for data streaming
-   */
-  private void initDataStreaming() {
-    this.setName("DataStreamer for file " + src +
-        " block " + block);
-    response = new ResponseProcessor(nodes);
-    response.start();
-    stage = BlockConstructionStage.DATA_STREAMING;
-  }
-
-  private void endBlock() {
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Closing old block " + block);
-    }
-    this.setName("DataStreamer for file " + src);
-    closeResponder();
-    closeStream();
-    setPipeline(null, null, null);
-    stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
-  }
-
-  private boolean shouldStop() {
-    return streamerClosed || errorState.hasError() || !dfsClient.clientRunning;
-  }
-
-  /*
-   * The streamer thread is the only thread that opens streams to datanodes
-   * and closes them. Any error recovery is also done by this thread.
-   */
-  @Override
-  public void run() {
-    long lastPacket = Time.monotonicNow();
-    TraceScope scope = NullScope.INSTANCE;
-    while (!streamerClosed && dfsClient.clientRunning) {
-      // if the Responder encountered an error, shutdown Responder
-      if (errorState.hasError() && response != null) {
-        try {
-          response.close();
-          response.join();
-          response = null;
-        } catch (InterruptedException  e) {
-          LOG.warn("Caught exception", e);
-        }
-      }
-
-      DFSPacket one;
-      try {
-        // process datanode IO errors if any
-        boolean doSleep = processDatanodeError();
-
-        final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2; 
-        synchronized (dataQueue) {
-          // wait for a packet to be sent.
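-          // Block while there is nothing to send; in the DATA_STREAMING stage,
-          // wake up after half the socket timeout so that a heartbeat packet
-          // can be sent to keep the pipeline alive.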
-          long now = Time.monotonicNow();
-          while ((!shouldStop() && dataQueue.size() == 0 &&
-              (stage != BlockConstructionStage.DATA_STREAMING ||
-                  stage == BlockConstructionStage.DATA_STREAMING &&
-                      now - lastPacket < halfSocketTimeout)) || doSleep ) {
-            long timeout = halfSocketTimeout - (now-lastPacket);
-            timeout = timeout <= 0 ? 1000 : timeout;
-            timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
-                timeout : 1000;
-            try {
-              dataQueue.wait(timeout);
-            } catch (InterruptedException  e) {
-              LOG.warn("Caught exception", e);
-            }
-            doSleep = false;
-            now = Time.monotonicNow();
-          }
-          if (shouldStop()) {
-            continue;
-          }
-          // get packet to be sent.
-          if (dataQueue.isEmpty()) {
-            one = createHeartbeatPacket();
-          } else {
-            try {
-              backOffIfNecessary();
-            } catch (InterruptedException e) {
-              LOG.warn("Caught exception", e);
-            }
-            one = dataQueue.getFirst(); // regular data packet
-            long parents[] = one.getTraceParents();
-            if (parents.length > 0) {
-              scope = Trace.startSpan("dataStreamer", new TraceInfo(0, parents[0]));
-              // TODO: use setParents API once it's available from HTrace 3.2
-              // scope = Trace.startSpan("dataStreamer", Sampler.ALWAYS);
-              // scope.getSpan().setParents(parents);
-            }
-          }
-        }
-
-        // get new block from namenode.
-        if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
-          if(LOG.isDebugEnabled()) {
-            LOG.debug("Allocating new block");
-          }
-          setPipeline(nextBlockOutputStream());
-          initDataStreaming();
-        } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
-          if(LOG.isDebugEnabled()) {
-            LOG.debug("Append to block " + block);
-          }
-          setupPipelineForAppendOrRecovery();
-          if (streamerClosed) {
-            continue;
-          }
-          initDataStreaming();
-        }
-
-        long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
-        if (lastByteOffsetInBlock > stat.getBlockSize()) {
-          throw new IOException("BlockSize " + stat.getBlockSize() +
-              " is smaller than data size. " +
-              " Offset of packet in block " +
-              lastByteOffsetInBlock +
-              " Aborting file " + src);
-        }
-
-        if (one.isLastPacketInBlock()) {
-          // wait for all data packets to be successfully acked
-          synchronized (dataQueue) {
-            while (!shouldStop() && ackQueue.size() != 0) {
-              try {
-                // wait for acks to arrive from datanodes
-                dataQueue.wait(1000);
-              } catch (InterruptedException  e) {
-                LOG.warn("Caught exception", e);
-              }
-            }
-          }
-          if (shouldStop()) {
-            continue;
-          }
-          stage = BlockConstructionStage.PIPELINE_CLOSE;
-        }
-
-        // send the packet
-        Span span = null;
-        synchronized (dataQueue) {
-          // move packet from dataQueue to ackQueue
-          if (!one.isHeartbeatPacket()) {
-            span = scope.detach();
-            one.setTraceSpan(span);
-            dataQueue.removeFirst();
-            ackQueue.addLast(one);
-            dataQueue.notifyAll();
-          }
-        }
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("DataStreamer block " + block +
-              " sending packet " + one);
-        }
-
-        // write out data to remote datanode
-        TraceScope writeScope = Trace.startSpan("writeTo", span);
-        try {
-          one.writeTo(blockStream);
-          blockStream.flush();
-        } catch (IOException e) {
-          // HDFS-3398 treat primary DN is down since client is unable to
-          // write to primary DN. If a failed or restarting node has already
-          // been recorded by the responder, the following call will have no
-          // effect. Pipeline recovery can handle only one node error at a
-          // time. If the primary node fails again during the recovery, it
-          // will be taken out then.
-          errorState.markFirstNodeIfNotMarked();
-          throw e;
-        } finally {
-          writeScope.close();
-        }
-        lastPacket = Time.monotonicNow();
-
-        // update bytesSent
-        long tmpBytesSent = one.getLastByteOffsetBlock();
-        if (bytesSent < tmpBytesSent) {
-          bytesSent = tmpBytesSent;
-        }
-
-        if (shouldStop()) {
-          continue;
-        }
-
-        // Is this block full?
-        if (one.isLastPacketInBlock()) {
-          // wait for the close packet to be acked
-          synchronized (dataQueue) {
-            while (!shouldStop() && ackQueue.size() != 0) {
-              dataQueue.wait(1000);// wait for acks to arrive from datanodes
-            }
-          }
-          if (shouldStop()) {
-            continue;
-          }
-
-          endBlock();
-        }
-        if (progress != null) { progress.progress(); }
-
-        // This is used by unit test to trigger race conditions.
-        if (artificialSlowdown != 0 && dfsClient.clientRunning) {
-          Thread.sleep(artificialSlowdown);
-        }
-      } catch (Throwable e) {
-        // Log warning if there was a real error.
-        if (!errorState.isRestartingNode()) {
-          // Since their messages are descriptive enough, do not always
-          // log a verbose stack-trace WARN for quota exceptions.
-          if (e instanceof QuotaExceededException) {
-            LOG.debug("DataStreamer Quota Exception", e);
-          } else {
-            LOG.warn("DataStreamer Exception", e);
-          }
-        }
-        lastException.set(e);
-        assert !(e instanceof NullPointerException);
-        errorState.setError(true);
-        if (!errorState.isNodeMarked()) {
-          // Not a datanode issue
-          streamerClosed = true;
-        }
-      } finally {
-        scope.close();
-      }
-    }
-    closeInternal();
-  }
-
-  private void closeInternal() {
-    closeResponder();       // close and join
-    closeStream();
-    streamerClosed = true;
-    release();
-    synchronized (dataQueue) {
-      dataQueue.notifyAll();
-    }
-  }
-
-  /**
-   * Release the DFSPackets in the two queues.
-   */
-  void release() {
-    synchronized (dataQueue) {
-      releaseBuffer(dataQueue, byteArrayManager);
-      releaseBuffer(ackQueue, byteArrayManager);
-    }
-  }
-
-  /**
-   * wait for the ack of seqno
-   *
-   * @param seqno the sequence number to be acked
-   * @throws IOException
-   */
-  void waitForAckedSeqno(long seqno) throws IOException {
-    TraceScope scope = Trace.startSpan("waitForAckedSeqno", Sampler.NEVER);
-    try {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Waiting for ack for: " + seqno);
-      }
-      long begin = Time.monotonicNow();
-      try {
-        synchronized (dataQueue) {
-          while (!streamerClosed) {
-            checkClosed();
-            if (lastAckedSeqno >= seqno) {
-              break;
-            }
-            try {
-              dataQueue.wait(1000); // when we receive an ack, we notify on
-              // dataQueue
-            } catch (InterruptedException ie) {
-              throw new InterruptedIOException(
-                  "Interrupted while waiting for data to be acknowledged by pipeline");
-            }
-          }
-        }
-        checkClosed();
-      } catch (ClosedChannelException e) {
-      }
-      long duration = Time.monotonicNow() - begin;
-      if (duration > dfsclientSlowLogThresholdMs) {
-        LOG.warn("Slow waitForAckedSeqno took " + duration
-            + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms)");
-      }
-    } finally {
-      scope.close();
-    }
-  }
-
-  /**
-   * Wait for space in the dataQueue and queue the packet.
-   *
-   * @param packet  the DFSPacket to be queued
-   * @throws IOException
-   */
-  void waitAndQueuePacket(DFSPacket packet) throws IOException {
-    synchronized (dataQueue) {
-      try {
-        // If queue is full, then wait till we have enough space
-        boolean firstWait = true;
-        try {
-          while (!streamerClosed && dataQueue.size() + ackQueue.size() >
-              dfsClient.getConf().getWriteMaxPackets()) {
-            if (firstWait) {
-              Span span = Trace.currentSpan();
-              if (span != null) {
-                span.addTimelineAnnotation("dataQueue.wait");
-              }
-              firstWait = false;
-            }
-            try {
-              dataQueue.wait();
-            } catch (InterruptedException e) {
-              // If we get interrupted while waiting to queue data, we still need to get rid
-              // of the current packet. This is because we have an invariant that if
-              // currentPacket gets full, it will get queued before the next writeChunk.
-              //
-              // Rather than wait around for space in the queue, we should instead try to
-              // return to the caller as soon as possible, even though we slightly overrun
-              // the MAX_PACKETS length.
-              Thread.currentThread().interrupt();
-              break;
-            }
-          }
-        } finally {
-          Span span = Trace.currentSpan();
-          if ((span != null) && (!firstWait)) {
-            span.addTimelineAnnotation("end.wait");
-          }
-        }
-        checkClosed();
-        queuePacket(packet);
-      } catch (ClosedChannelException e) {
-      }
-    }
-  }
-
-  /*
-   * Close the streamer. This should be called only by an external thread,
-   * and only after all data to be sent has been flushed to the datanodes.
-   *
-   * Interrupt this data streamer if force is true.
-   *
-   * @param force whether this data streamer is forced to close
-   */
-  void close(boolean force) {
-    streamerClosed = true;
-    synchronized (dataQueue) {
-      dataQueue.notifyAll();
-    }
-    if (force) {
-      this.interrupt();
-    }
-  }
-
-
-  private void checkClosed() throws IOException {
-    if (streamerClosed) {
-      lastException.throwException4Close();
-    }
-  }
-
-  private void closeResponder() {
-    if (response != null) {
-      try {
-        response.close();
-        response.join();
-      } catch (InterruptedException  e) {
-        LOG.warn("Caught exception", e);
-      } finally {
-        response = null;
-      }
-    }
-  }
-
-  private void closeStream() {
-    final MultipleIOException.Builder b = new MultipleIOException.Builder();
-
-    if (blockStream != null) {
-      try {
-        blockStream.close();
-      } catch (IOException e) {
-        b.add(e);
-      } finally {
-        blockStream = null;
-      }
-    }
-    if (blockReplyStream != null) {
-      try {
-        blockReplyStream.close();
-      } catch (IOException e) {
-        b.add(e);
-      } finally {
-        blockReplyStream = null;
-      }
-    }
-    if (null != s) {
-      try {
-        s.close();
-      } catch (IOException e) {
-        b.add(e);
-      } finally {
-        s = null;
-      }
-    }
-
-    final IOException ioe = b.build();
-    if (ioe != null) {
-      lastException.set(ioe);
-    }
-  }
-
-  /**
-   * Examine whether it is worth waiting for a node to restart.
-   * @param index the node index
-   */
-  boolean shouldWaitForRestart(int index) {
-    // Only one node in the pipeline.
-    if (nodes.length == 1) {
-      return true;
-    }
-
-    // Is it a local node?
-    InetAddress addr = null;
-    try {
-      addr = InetAddress.getByName(nodes[index].getIpAddr());
-    } catch (java.net.UnknownHostException e) {
-      // we are passing an ip address. this should not happen.
-      assert false;
-    }
-
-    if (addr != null && NetUtils.isLocalAddress(addr)) {
-      return true;
-    }
-    return false;
-  }
-
-  //
-  // Processes responses from the datanodes.  A packet is removed
-  // from the ackQueue when its response arrives.
-  //
-  private class ResponseProcessor extends Daemon {
-
-    private volatile boolean responderClosed = false;
-    private DatanodeInfo[] targets = null;
-    private boolean isLastPacketInBlock = false;
-
-    ResponseProcessor (DatanodeInfo[] targets) {
-      this.targets = targets;
-    }
-
-    @Override
-    public void run() {
-
-      setName("ResponseProcessor for block " + block);
-      PipelineAck ack = new PipelineAck();
-
-      TraceScope scope = NullScope.INSTANCE;
-      while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) {
-        // process responses from datanodes.
-        try {
-          // read an ack from the pipeline
-          long begin = Time.monotonicNow();
-          ack.readFields(blockReplyStream);
-          long duration = Time.monotonicNow() - begin;
-          if (duration > dfsclientSlowLogThresholdMs
-              && ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) {
-            LOG.warn("Slow ReadProcessor read fields took " + duration
-                + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: "
-                + ack + ", targets: " + Arrays.asList(targets));
-          } else if (LOG.isDebugEnabled()) {
-            LOG.debug("DFSClient " + ack);
-          }
-
-          long seqno = ack.getSeqno();
-          // processes response status from datanodes.
-          ArrayList<DatanodeInfo> congestedNodesFromAck = new ArrayList<>();
-          for (int i = ack.getNumOfReplies()-1; i >=0  && dfsClient.clientRunning; i--) {
-            final Status reply = PipelineAck.getStatusFromHeader(ack
-                .getHeaderFlag(i));
-            if (PipelineAck.getECNFromHeader(ack.getHeaderFlag(i)) ==
-                PipelineAck.ECN.CONGESTED) {
-              congestedNodesFromAck.add(targets[i]);
-            }
-            // Restart will not be treated differently unless it is
-            // the local node or the only one in the pipeline.
-            if (PipelineAck.isRestartOOBStatus(reply) &&
-                shouldWaitForRestart(i)) {
-              final String message = "Datanode " + i + " is restarting: "
-                  + targets[i];
-              errorState.initRestartingNode(i, message);
-              throw new IOException(message);
-            }
-            // node error
-            if (reply != SUCCESS) {
-              errorState.setBadNodeIndex(i); // mark bad datanode
-              throw new IOException("Bad response " + reply +
-                  " for " + block + " from datanode " + targets[i]);
-            }
-          }
-
-          if (!congestedNodesFromAck.isEmpty()) {
-            synchronized (congestedNodes) {
-              congestedNodes.clear();
-              congestedNodes.addAll(congestedNodesFromAck);
-            }
-          } else {
-            synchronized (congestedNodes) {
-              congestedNodes.clear();
-              lastCongestionBackoffTime = 0;
-            }
-          }
-
-          assert seqno != PipelineAck.UNKOWN_SEQNO :
-              "Ack for unknown seqno should be a failed ack: " + ack;
-          if (seqno == DFSPacket.HEART_BEAT_SEQNO) {  // a heartbeat ack
-            continue;
-          }
-
-          // a success ack for a data packet
-          DFSPacket one;
-          synchronized (dataQueue) {
-            one = ackQueue.getFirst();
-          }
-          if (one.getSeqno() != seqno) {
-            throw new IOException("ResponseProcessor: Expecting seqno " +
-                one.getSeqno() + " for block " + block +
-                " but received " + seqno);
-          }
-          isLastPacketInBlock = one.isLastPacketInBlock();
-
-          // Fail the packet write for testing in order to force a
-          // pipeline recovery.
-          if (DFSClientFaultInjector.get().failPacket() &&
-              isLastPacketInBlock) {
-            failPacket = true;
-            throw new IOException(
-                "Failing the last packet for testing.");
-          }
-
-          // update bytesAcked
-          block.setNumBytes(one.getLastByteOffsetBlock());
-
-          synchronized (dataQueue) {
-            scope = Trace.continueSpan(one.getTraceSpan());
-            one.setTraceSpan(null);
-            lastAckedSeqno = seqno;
-            ackQueue.removeFirst();
-            dataQueue.notifyAll();
-
-            one.releaseBuffer(byteArrayManager);
-          }
-        } catch (Exception e) {
-          if (!responderClosed) {
-            lastException.set(e);
-            errorState.setError(true);
-            errorState.markFirstNodeIfNotMarked();
-            synchronized (dataQueue) {
-              dataQueue.notifyAll();
-            }
-            if (!errorState.isRestartingNode()) {
-              LOG.warn("Exception for " + block, e);
-            }
-            responderClosed = true;
-          }
-        } finally {
-            scope.close();
-        }
-      }
-    }
-
-    void close() {
-      responderClosed = true;
-      this.interrupt();
-    }
-  }
-
-  /**
-   * If this stream has encountered any errors, shut down its threads
-   * and mark the stream as closed.
-   *
-   * @return true if it should sleep for a while after returning.
-   */
-  private boolean processDatanodeError() throws IOException {
-    if (!errorState.hasDatanodeError()) {
-      return false;
-    }
-    if (response != null) {
-      LOG.info("Error Recovery for " + block +
-          " waiting for responder to exit. ");
-      return true;
-    }
-    closeStream();
-
-    // move packets from ack queue to front of the data queue
-    synchronized (dataQueue) {
-      dataQueue.addAll(0, ackQueue);
-      ackQueue.clear();
-    }
-
-    // Record the new pipeline failure recovery.
-    if (lastAckedSeqnoBeforeFailure != lastAckedSeqno) {
-      lastAckedSeqnoBeforeFailure = lastAckedSeqno;
-      pipelineRecoveryCount = 1;
-    } else {
-      // If we had to recover the pipeline five times in a row for the
-      // same packet, this client likely has corrupt data or is corrupting
-      // it during transmission.
-      if (++pipelineRecoveryCount > 5) {
-        LOG.warn("Error recovering pipeline for writing " +
-            block + ". Already retried 5 times for the same packet.");
-        lastException.set(new IOException("Failing write. Tried pipeline " +
-            "recovery 5 times without success."));
-        streamerClosed = true;
-        return false;
-      }
-    }
-    boolean doSleep = setupPipelineForAppendOrRecovery();
-
-    if (!streamerClosed && dfsClient.clientRunning) {
-      if (stage == BlockConstructionStage.PIPELINE_CLOSE) {
-
-        // If we had an error while closing the pipeline, we go through a fast-path
-        // where the BlockReceiver does not run. Instead, the DataNode just finalizes
-        // the block immediately during the 'connect ack' process. So, we want to pull
-        // the end-of-block packet from the dataQueue, since we don't actually have
-        // a true pipeline to send it over.
-        //
-        // We also need to set lastAckedSeqno to the end-of-block Packet's seqno, so that
-        // a client waiting on close() will be aware that the flush finished.
-        synchronized (dataQueue) {
-          DFSPacket endOfBlockPacket = dataQueue.remove();  // remove the end of block packet
-          Span span = endOfBlockPacket.getTraceSpan();
-          if (span != null) {
-            // Close any trace span associated with this Packet
-            TraceScope scope = Trace.continueSpan(span);
-            scope.close();
-          }
-          assert endOfBlockPacket.isLastPacketInBlock();
-          assert lastAckedSeqno == endOfBlockPacket.getSeqno() - 1;
-          lastAckedSeqno = endOfBlockPacket.getSeqno();
-          dataQueue.notifyAll();
-        }
-        endBlock();
-      } else {
-        initDataStreaming();
-      }
-    }
-
-    return doSleep;
-  }
-
-  void setHflush() {
-    isHflushed = true;
-  }
-
-  private int findNewDatanode(final DatanodeInfo[] original
-  ) throws IOException {
-    if (nodes.length != original.length + 1) {
-      throw new IOException(
-          new StringBuilder()
-              .append("Failed to replace a bad datanode on the existing pipeline ")
-              .append("due to no more good datanodes being available to try. ")
-              .append("(Nodes: current=").append(Arrays.asList(nodes))
-              .append(", original=").append(Arrays.asList(original)).append("). ")
-              .append("The current failed datanode replacement policy is ")
-              .append(dfsClient.dtpReplaceDatanodeOnFailure).append(", and ")
-              .append("a client may configure this via '")
-              .append(BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY)
-              .append("' in its configuration.")
-              .toString());
-    }
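-    // The new datanode is the one entry in nodes that does not appear in
-    // the original pipeline.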
-    for(int i = 0; i < nodes.length; i++) {
-      int j = 0;
-      for(; j < original.length && !nodes[i].equals(original[j]); j++);
-      if (j == original.length) {
-        return i;
-      }
-    }
-    throw new IOException("Failed: new datanode not found: nodes="
-        + Arrays.asList(nodes) + ", original=" + Arrays.asList(original));
-  }
-
-  private void addDatanode2ExistingPipeline() throws IOException {
-    if (DataTransferProtocol.LOG.isDebugEnabled()) {
-      DataTransferProtocol.LOG.debug("lastAckedSeqno = " + lastAckedSeqno);
-    }
-      /*
-       * Is data transfer necessary?  We have the following cases.
-       *
-       * Case 1: Failure in Pipeline Setup
-       * - Append
-       *    + Transfer the stored replica, which may be an RBW or a finalized replica.
-       * - Create
-       *    + If no data, then no transfer is required.
-       *    + If data has been written, transfer the RBW replica. This case
-       *      may happen when there was a streaming failure earlier in this
-       *      pipeline.
-       *
-       * Case 2: Failure in Streaming
-       * - Append/Create:
-       *    + transfer RBW
-       *
-       * Case 3: Failure in Close
-       * - Append/Create:
-       *    + no transfer; let the NameNode replicate the block.
-       */
-    if (!isAppend && lastAckedSeqno < 0
-        && stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
-      //no data have been written
-      return;
-    } else if (stage == BlockConstructionStage.PIPELINE_CLOSE
-        || stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
-      //pipeline is closing
-      return;
-    }
-
-    //get a new datanode
-    final DatanodeInfo[] original = nodes;
-    final LocatedBlock lb = dfsClient.namenode.getAdditionalDatanode(
-        src, stat.getFileId(), block, nodes, storageIDs,
-        failed.toArray(new DatanodeInfo[failed.size()]),
-        1, dfsClient.clientName);
-    setPipeline(lb);
-
-    //find the new datanode
-    final int d = findNewDatanode(original);
-
-    //transfer replica
-    final DatanodeInfo src = d == 0? nodes[1]: nodes[d - 1];
-    final DatanodeInfo[] targets = {nodes[d]};
-    final StorageType[] targetStorageTypes = {storageTypes[d]};
-    transfer(src, targets, targetStorageTypes, lb.getBlockToken());
-  }
-
-  private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
-                        final StorageType[] targetStorageTypes,
-                        final Token<BlockTokenIdentifier> blockToken) throws IOException {
-    //transfer replica to the new datanode
-    Socket sock = null;
-    DataOutputStream out = null;
-    DataInputStream in = null;
-    try {
-      sock = createSocketForPipeline(src, 2, dfsClient);
-      final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2);
-      final long readTimeout = dfsClient.getDatanodeReadTimeout(2);
-
-      OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
-      InputStream unbufIn = NetUtils.getInputStream(sock, readTimeout);
-      IOStreamPair saslStreams = dfsClient.saslClient.socketSend(sock,
-          unbufOut, unbufIn, dfsClient, blockToken, src);
-      unbufOut = saslStreams.out;
-      unbufIn = saslStreams.in;
-      out = new DataOutputStream(new BufferedOutputStream(unbufOut,
-          DFSUtil.getSmallBufferSize(dfsClient.getConfiguration())));
-      in = new DataInputStream(unbufIn);
-
-      //send the TRANSFER_BLOCK request
-      new Sender(out).transferBlock(block, blockToken, dfsClient.clientName,
-          targets, targetStorageTypes);
-      out.flush();
-
-      //ack
-      BlockOpResponseProto response =
-          BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in));
-      if (SUCCESS != response.getStatus()) {
-        throw new IOException("Failed to add a datanode");
-      }
-    } finally {
-      IOUtils.closeStream(in);
-      IOUtils.closeStream(out);
-      IOUtils.closeSocket(sock);
-    }
-  }
-
-  /**
-   * Open a DataStreamer to a DataNode pipeline so that
-   * it can be written to.
-   * This happens when a file is appended or data streaming fails.
-   * It keeps on trying until a pipeline is set up.
-   */
-  private boolean setupPipelineForAppendOrRecovery() throws IOException {
-    // check number of datanodes
-    if (nodes == null || nodes.length == 0) {
-      String msg = "Could not get block locations. " + "Source file \""
-          + src + "\" - Aborting...";
-      LOG.warn(msg);
-      lastException.set(new IOException(msg));
-      streamerClosed = true;
-      return false;
-    }
-
-    boolean success = false;
-    long newGS = 0L;
-    while (!success && !streamerClosed && dfsClient.clientRunning) {
-      if (!handleRestartingDatanode()) {
-        return false;
-      }
-
-      final boolean isRecovery = errorState.hasError();
-      if (!handleBadDatanode()) {
-        return false;
-      }
-
-      handleDatanodeReplacement();
-
-      // get a new generation stamp and an access token
-      final LocatedBlock lb = updateBlockForPipeline();
-      newGS = lb.getBlock().getGenerationStamp();
-      accessToken = lb.getBlockToken();
-
-      // set up the pipeline again with the remaining nodes
-      success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
-
-      failPacket4Testing();
-
-      errorState.checkRestartingNodeDeadline(nodes);
-    } // while
-
-    if (success) {
-      block = updatePipeline(newGS);
-    }
-    return false; // do not sleep, continue processing
-  }
-
-  /**
-   * Sleep if a node is restarting.
-   * This process is repeated until the deadline or the node starts back up.
-   * @return true if it should continue.
-   */
-  private boolean handleRestartingDatanode() {
-    if (errorState.isRestartingNode()) {
-      // 4 seconds or the configured deadline period, whichever is shorter.
-      // This is the retry interval and recovery will be retried in this
-      // interval until timeout or success.
-      final long delay = Math.min(errorState.datanodeRestartTimeout, 4000L);
-      try {
-        Thread.sleep(delay);
-      } catch (InterruptedException ie) {
-        lastException.set(new IOException(
-            "Interrupted while waiting for restarting "
-            + nodes[errorState.getRestartingNodeIndex()]));
-        streamerClosed = true;
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Remove bad node from list of nodes if badNodeIndex was set.
-   * @return true if it should continue.
-   */
-  private boolean handleBadDatanode() {
-    final int badNodeIndex = errorState.getBadNodeIndex();
-    if (badNodeIndex >= 0) {
-      if (nodes.length <= 1) {
-        lastException.set(new IOException("All datanodes "
-            + Arrays.toString(nodes) + " are bad. Aborting..."));
-        streamerClosed = true;
-        return false;
-      }
-
-      LOG.warn("Error Recovery for " + block + " in pipeline "
-          + Arrays.toString(nodes) + ": datanode " + badNodeIndex
-          + "("+ nodes[badNodeIndex] + ") is bad.");
-      failed.add(nodes[badNodeIndex]);
-
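-      // Rebuild the pipeline arrays, skipping the entry at badNodeIndex.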
-      DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
-      arraycopy(nodes, newnodes, badNodeIndex);
-
-      final StorageType[] newStorageTypes = new StorageType[newnodes.length];
-      arraycopy(storageTypes, newStorageTypes, badNodeIndex);
-
-      final String[] newStorageIDs = new String[newnodes.length];
-      arraycopy(storageIDs, newStorageIDs, badNodeIndex);
-
-      setPipeline(newnodes, newStorageTypes, newStorageIDs);
-
-      errorState.adjustState4RestartingNode();
-      lastException.clear();
-    }
-    return true;
-  }
-
-  /** Add a datanode if replace-datanode policy is satisfied. */
-  private void handleDatanodeReplacement() throws IOException {
-    if (dfsClient.dtpReplaceDatanodeOnFailure.satisfy(stat.getReplication(),
-        nodes, isAppend, isHflushed)) {
-      try {
-        addDatanode2ExistingPipeline();
-      } catch(IOException ioe) {
-        if (!dfsClient.dtpReplaceDatanodeOnFailure.isBestEffort()) {
-          throw ioe;
-        }
-        LOG.warn("Failed to replace datanode."
-            + " Continue with the remaining datanodes since "
-            + BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY
-            + " is set to true.", ioe);
-      }
-    }
-  }
-
-  private void failPacket4Testing() {
-    if (failPacket) { // for testing
-      failPacket = false;
-      try {
-        // Give DNs time to send in bad reports. In real situations,
-        // good reports should follow bad ones, if client committed
-        // with those nodes.
-        Thread.sleep(2000);
-      } catch (InterruptedException ie) {}
-    }
-  }
-
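-  /**
-   * Ask the namenode for a new generation stamp and access token for the
-   * current block.
-   */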
-  LocatedBlock updateBlockForPipeline() throws IOException {
-    return dfsClient.namenode.updateBlockForPipeline(
-        block, dfsClient.clientName);
-  }
-
-  /** update pipeline at the namenode */
-  ExtendedBlock updatePipeline(long newGS) throws IOException {
-    final ExtendedBlock newBlock = new ExtendedBlock(
-        block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS);
-    dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock,
-        nodes, storageIDs);
-    return newBlock;
-  }
-
-  /**
-   * Open a DataStreamer to a DataNode so that it can be written to.
-   * This happens when a file is created and each time a new block is allocated.
-   * Must get block ID and the IDs of the destinations from the namenode.
-   * Returns the list of target datanodes.
-   */
-  private LocatedBlock nextBlockOutputStream() throws IOException {
-    LocatedBlock lb = null;
-    DatanodeInfo[] nodes = null;
-    StorageType[] storageTypes = null;
-    int count = dfsClient.getConf().getNumBlockWriteRetry();
-    boolean success = false;
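-    // Remember the current block so it can be restored before each retry.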
-    ExtendedBlock oldBlock = block;
-    do {
-      errorState.reset();
-      lastException.clear();
-      success = false;
-
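-      // Snapshot the datanodes currently in the exclude cache; entries whose
-      // expiry time has passed are not returned.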
-      DatanodeInfo[] excluded =
-          excludedNodes.getAllPresent(excludedNodes.asMap().keySet())
-              .keySet()
-              .toArray(new DatanodeInfo[0]);
-      block = oldBlock;
-      lb = locateFollowingBlock(excluded.length > 0 ? excluded : null);
-      block = lb.getBlock();
-      block.setNumBytes(0);
-      bytesSent = 0;
-      accessToken = lb.getBlockToken();
-      nodes = lb.getLocations();
-      storageTypes = lb.getStorageTypes();
-
-      //
-      // Connect to first DataNode in the list.
-      //
-      success = createBlockOutputStream(nodes, storageTypes, 0L, false);
-
-      if (!success) {
-        LOG.info("Abandoning " + block);
-        dfsClient.namenode.abandonBlock(block, stat.getFileId(), src,
-            dfsClient.clientName);
-        block = null;
-        final DatanodeInfo badNode = nodes[errorState.getBadNodeIndex()];
-        LOG.info("Excluding datanode " + badNode);
-        excludedNodes.put(badNode, badNode);
-      }
-    } while (!success && --count >= 0);
-
-    if (!success) {
-      throw new IOException("Unable to create new block.");
-    }
-    return lb;
-  }
-
-  // connects to the first datanode in the pipeline
-  // Returns true on success, false otherwise.
-  //
-  private boolean createBlockOutputStream(DatanodeInfo[] nodes,
-      StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) {
-    if (nodes.length == 0) {
-      LOG.info("nodes are empty for write pipeline of " + block);
-      return false;
-    }
-    Status pipelineStatus = SUCCESS;
-    String firstBadLink = "";
-    boolean checkRestart = false;
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("pipeline = " + Arrays.asList(nodes));
-    }
-
-    // persist blocks on namenode on next flush
-    persistBlocks.set(true);
-
-    int refetchEncryptionKey = 1;
-    while (true) {
-      boolean result = false;
-      DataOutputStream out = null;
-      try {
-        assert null == s : "Previous socket unclosed";
-        assert null == blockReplyStream : "Previous blockReplyStream unclosed";
-        s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
-        long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
-        long readTimeout = dfsClient.getDatanodeReadTimeout(nodes.length);
-
-        OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
-        InputStream unbufIn = NetUtils.getInputStream(s, readTimeout);
-        IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s,
-            unbufOut, unbufIn, dfsClient, accessToken, nodes[0]);
-        unbufOut = saslStreams.out;
-        unbufIn = saslStreams.in;
-        out = new DataOutputStream(new BufferedOutputStream(unbufOut,
-            DFSUtil.getSmallBufferSize(dfsClient.getConfiguration())));
-        blockReplyStream = new DataInputStream(unbufIn);
-
-        //
-        // Xmit header info to datanode
-        //
-
-        BlockConstructionStage bcs = recoveryFlag? stage.getRecoveryStage(): stage;
-
-        // We cannot change the block length in 'block' as it counts the number
-        // of bytes ack'ed.
-        ExtendedBlock blockCopy = new ExtendedBlock(block);
-        blockCopy.setNumBytes(stat.getBlockSize());
-
-        boolean[] targetPinnings = getPinnings(nodes, true);
-        // send the request
-        new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
-            dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
-            nodes.length, block.getNumBytes(), bytesSent, newGS,
-            checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
-            (targetPinnings == null ? false : targetPinnings[0]), targetPinnings);
-
-        // receive ack for connect
-        BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
-            PBHelperClient.vintPrefixed(blockReplyStream));
-        pipelineStatus = resp.getStatus();
-        firstBadLink = resp.getFirstBadLink();
-
-        // Got a restart OOB ack.
-        // If a node is already restarting, this status is not likely from
-        // the same node. If it is from a different node, it is not
-        // from the local datanode. Thus it is safe to treat this as a
-        // regular node error.
-        if (PipelineAck.isRestartOOBStatus(pipelineStatus) &&
-            !errorState.isRestartingNode()) {
-          checkRestart = true;
-          throw new IOException("A datanode is restarting.");
-        }
-
-        String logInfo = "ack with firstBadLink as " + firstBadLink;
-        DataTransferProtoUtil.checkBlockOpStatus(resp, logInfo);
-
-        assert null == blockStream : "Previous blockStream unclosed";
-        blockStream = out;
-        result =  true; // success
-        errorState.reset();
-      } catch (IOException ie) {
-        if (!errorState.isRestartingNode()) {
-          LOG.info("Exception in createBlockOutputStream", ie);
-        }
-        if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
-          LOG.info("Will fetch a new encryption key and retry, "
-              + "encryption key was invalid when connecting to "
-              + nodes[0] + " : " + ie);
-          // The encryption key used is invalid.
-          refetchEncryptionKey--;
-          dfsClient.clearDataEncryptionKey();
-          // Don't close the socket/exclude this node just yet. Try again with
-          // a new encryption key.
-          continue;
-        }
-
-        // find the datanode that matches
-        if (firstBadLink.length() != 0) {
-          for (int i = 0; i < nodes.length; i++) {
-            // NB: Unconditionally using the xfer addr w/o hostname
-            if (firstBadLink.equals(nodes[i].getXferAddr())) {
-              errorState.setBadNodeIndex(i);
-              break;
-            }
-          }
-        } else {
-          assert checkRestart == false;
-          errorState.setBadNodeIndex(0);
-        }
-
-        final int i = errorState.getBadNodeIndex();
-        // Check whether there is a restart worth waiting for.
-        if (checkRestart && shouldWaitForRestart(i)) {
-          errorState.initRestartingNode(i, "Datanode " + i + " is restarting: " + nodes[i]);
-        }
-        errorState.setError(true);
-        lastException.set(ie);
-        result =  false;  // error
-      } finally {
-        if (!result) {
-          IOUtils.closeSocket(s);
-          s = null;
-          IOUtils.closeStream(out);
-          out = null;
-          IOUtils.closeStream(blockReplyStream);
-          blockReplyStream = null;
-        }
-      }
-      return result;
-    }
-  }
-
-  private boolean[] getPinnings(DatanodeInfo[] nodes, boolean shouldLog) {
-    if (favoredNodes == null) {
-      return null;
-    } else {
-      boolean[] pinnings = new boolean[nodes.length];
-      HashSet<String> favoredSet =
-          new HashSet<String>(Arrays.asList(favoredNodes));
-      for (int i = 0; i < nodes.length; i++) {
-        pinnings[i] = favoredSet.remove(nodes[i].getXferAddrWithHostname());
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(nodes[i].getXferAddrWithHostname() +
-              " was chosen by name node (favored=" + pinnings[i] + ").");
-        }
-      }
-      if (shouldLog && !favoredSet.isEmpty()) {
-        // There are one or more favored nodes that were not allocated.
-        LOG.warn("These favored nodes were specified but not chosen: "
-            + favoredSet + " Specified favored nodes: "
-            + Arrays.toString(favoredNodes));
-
-      }
-      return pinnings;
-    }
-  }
-
-  protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
-      throws IOException {
-    final DfsClientConf conf = dfsClient.getConf(); 
-    int retries = conf.getNumBlockWriteLocateFollowingRetry();
-    long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs();
-    while (true) {
-      long localstart = Time.monotonicNow();
-      while (true) {
-        try {
-          return dfsClient.namenode.addBlock(src, dfsClient.clientName,
-              block, excludedNodes, stat.getFileId(), favoredNodes);
-        } catch (RemoteException e) {
-          IOException ue =
-              e.unwrapRemoteException(FileNotFoundException.class,
-                  AccessControlException.class,
-                  NSQuotaExceededException.class,
-                  DSQuotaExceededException.class,
-                  QuotaByStorageTypeExceededException.class,
-                  UnresolvedPathException.class);
-          if (ue != e) {
-            throw ue; // no need to retry these exceptions
-          }
-
-
-          if (NotReplicatedYetException.class.getName().
-              equals(e.getClassName())) {
-            if (retries == 0) {
-              throw e;
-            } else {
-              --retries;
-              LOG.info("Exception while adding a block", e);
-              long elapsed = Time.monotonicNow() - localstart;
-              if (elapsed > 5000) {
-                LOG.info("Waiting for replication for "
-                    + (elapsed / 1000) + " seconds");
-              }
-              try {
-                LOG.warn("NotReplicatedYetException sleeping " + src
-                    + " retries left " + retries);
-                Thread.sleep(sleeptime);
-                sleeptime *= 2;
-              } catch (InterruptedException ie) {
-                LOG.warn("Caught exception", ie);
-              }
-            }
-          } else {
-            throw e;
-          }
-
-        }
-      }
-    }
-  }
-
-  /**
-   * This function sleeps for a certain amount of time when the write
-   * pipeline is congested. The sleep time is calculated with decorrelated
-   * jitter (randomized exponential backoff).
-   *
-   * @see
-   * <a href="http://www.awsarchitectureblog.com/2015/03/backoff.html">
-   *   http://www.awsarchitectureblog.com/2015/03/backoff.html</a>
-   */
-  private void backOffIfNecessary() throws InterruptedException {
-    int t = 0;
-    synchronized (congestedNodes) {
-      if (!congestedNodes.isEmpty()) {
-        StringBuilder sb = new StringBuilder("DataNode");
-        for (DatanodeInfo i : congestedNodes) {
-          sb.append(' ').append(i);
-        }
-        int range = Math.abs(lastCongestionBackoffTime * 3 -
-                                CONGESTION_BACKOFF_MEAN_TIME_IN_MS);
-        int base = Math.min(lastCongestionBackoffTime * 3,
-                            CONGESTION_BACKOFF_MEAN_TIME_IN_MS);
-        t = Math.min(CONGESTION_BACK_OFF_MAX_TIME_IN_MS,
-                     (int)(base + Math.random() * range));
-        lastCongestionBackoffTime = t;
-        sb.append(" are congested. Backing off for ").append(t).append(" ms");
-        LOG.info(sb.toString());
-        congestedNodes.clear();
-      }
-    }
-    if (t != 0) {
-      Thread.sleep(t);
-    }
-  }
-
-  /**
-   * get the block this streamer is writing to
-   *
-   * @return the block this streamer is writing to
-   */
-  ExtendedBlock getBlock() {
-    return block;
-  }
-
-  /**
-   * return the target datanodes in the pipeline
-   *
-   * @return the target datanodes in the pipeline
-   */
-  DatanodeInfo[] getNodes() {
-    return nodes;
-  }
-
-  /**
-   * return the token of the block
-   *
-   * @return the token of the block
-   */
-  Token<BlockTokenIdentifier> getBlockToken() {
-    return accessToken;
-  }
-
-  /**
-   * Put a packet into the data queue.
-   *
-   * @param packet the packet to be put into the data queue
-   */
-  void queuePacket(DFSPacket packet) {
-    synchronized (dataQueue) {
-      if (packet == null) return;
-      packet.addTraceParent(Trace.currentSpan());
-      dataQueue.addLast(packet);
-      lastQueuedSeqno = packet.getSeqno();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Queued packet " + packet.getSeqno());
-      }
-      dataQueue.notifyAll();
-    }
-  }
-
-  /**
-   * For heartbeat packets, create the buffer directly with new byte[]
-   * since heartbeats should not be blocked.
-   */
-  private DFSPacket createHeartbeatPacket() throws InterruptedIOException {
-    final byte[] buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN];
-    return new DFSPacket(buf, 0, 0, DFSPacket.HEART_BEAT_SEQNO, 0, false);
-  }
-
-  private static LoadingCache<DatanodeInfo, DatanodeInfo> initExcludedNodes(
-      long excludedNodesCacheExpiry) {
-    return CacheBuilder.newBuilder()
-        .expireAfterWrite(excludedNodesCacheExpiry, TimeUnit.MILLISECONDS)
-        .removalListener(new RemovalListener<DatanodeInfo, DatanodeInfo>() {
-          @Override
-          public void onRemoval(
-              RemovalNotification<DatanodeInfo, DatanodeInfo> notification) {
-            LOG.info("Removing node " + notification.getKey()
-                + " from the excluded nodes list");
-          }
-        }).build(new CacheLoader<DatanodeInfo, DatanodeInfo>() {
-          @Override
-          public DatanodeInfo load(DatanodeInfo key) throws Exception {
-            return key;
-          }
-        });
-  }
-
-  private static <T> void arraycopy(T[] srcs, T[] dsts, int skipIndex) {
-    System.arraycopy(srcs, 0, dsts, 0, skipIndex);
-    System.arraycopy(srcs, skipIndex+1, dsts, skipIndex, dsts.length-skipIndex);
-  }
-
-  /**
-   * check whether to persist blocks on the namenode
-   *
-   * @return whether to persist blocks on the namenode
-   */
-  AtomicBoolean getPersistBlocks(){
-    return persistBlocks;
-  }
-
-  /**
-   * set whether to append a chunk
-   *
-   * @param appendChunk whether to append a chunk
-   */
-  void setAppendChunk(boolean appendChunk){
-    this.appendChunk = appendChunk;
-  }
-
-  /**
-   * get whether to append a chunk
-   *
-   * @return whether to append a chunk
-   */
-  boolean getAppendChunk(){
-    return appendChunk;
-  }
-
-  /**
-   * @return the last exception
-   */
-  LastExceptionInStreamer getLastException(){
-    return lastException;
-  }
-
-  /**
-   * set socket to null
-   */
-  void setSocketToNull() {
-    this.s = null;
-  }
-
-  /**
-   * return current sequence number and then increase it by 1
-   *
-   * @return current sequence number before increasing
-   */
-  long getAndIncCurrentSeqno() {
-    long old = this.currentSeqno;
-    this.currentSeqno++;
-    return old;
-  }
-
-  /**
-   * get last queued sequence number
-   *
-   * @return last queued sequence number
-   */
-  long getLastQueuedSeqno() {
-    return lastQueuedSeqno;
-  }
-
-  /**
-   * get the number of bytes of current block
-   *
-   * @return the number of bytes of current block
-   */
-  long getBytesCurBlock() {
-    return bytesCurBlock;
-  }
-
-  /**
-   * set the bytes of current block that have been written
-   *
-   * @param bytesCurBlock bytes of current block that have been written
-   */
-  void setBytesCurBlock(long bytesCurBlock) {
-    this.bytesCurBlock = bytesCurBlock;
-  }
-
-  /**
-   * increase the byte count of the current block by len.
-   *
-   * @param len how many bytes to add to the current block
-   */
-  void incBytesCurBlock(long len) {
-    this.bytesCurBlock += len;
-  }
-
-  /**
-   * set an artificial slowdown for unit tests
-   *
-   * @param period artificial slowdown period
-   */
-  void setArtificialSlowdown(long period) {
-    this.artificialSlowdown = period;
-  }
-
-  /**
-   * whether this streamer is to terminate
-   *
-   * @return whether this streamer is to terminate
-   */
-  boolean streamerClosed(){
-    return streamerClosed;
-  }
-
-  void closeSocket() throws IOException {
-    if (s != null) {
-      s.close();
-    }
-  }
-
-  @Override
-  public String toString() {
-    return  (block == null? null: block.getLocalBlock())
-        + "@" + Arrays.toString(getNodes());
-  }
-}
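
The congestion handling in backOffIfNecessary() above follows the decorrelated-jitter scheme from the linked AWS post. A minimal standalone sketch of the same arithmetic is shown below; the mean and maximum backoff constants here are illustrative values, not the ones DataStreamer actually uses.

    import java.util.concurrent.ThreadLocalRandom;

    public class DecorrelatedJitterBackoff {
      // Illustrative constants; DataStreamer reads its own values.
      private static final int MEAN_BACKOFF_MS = 5_000;
      private static final int MAX_BACKOFF_MS = 50_000;

      private int lastBackoffMs = 0;

      /** Next sleep time in milliseconds, capped at MAX_BACKOFF_MS. */
      public int nextBackoffMs() {
        int range = Math.abs(lastBackoffMs * 3 - MEAN_BACKOFF_MS);
        int base = Math.min(lastBackoffMs * 3, MEAN_BACKOFF_MS);
        int next = Math.min(MAX_BACKOFF_MS,
            base + ThreadLocalRandom.current().nextInt(range + 1));
        lastBackoffMs = next;
        return next;
      }
    }

Each call draws uniformly between min(3 * previous, mean) and max(3 * previous, mean) and remembers the result, so repeated congestion grows the sleep roughly threefold per round until the cap is reached, mirroring the calculation in backOffIfNecessary().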
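
The excluded-nodes list built by initExcludedNodes() above is a Guava LoadingCache whose key and value are the same DatanodeInfo, so a node excluded after a failure ages out on its own once the expiry interval passes. A small sketch of that pattern in isolation, using a String key instead of DatanodeInfo and an expiry argument supplied by the caller:

    import java.util.Set;
    import java.util.concurrent.TimeUnit;

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;

    public class ExpiringExclusionList {
      private final LoadingCache<String, String> excluded;

      public ExpiringExclusionList(long expiryMillis) {
        // Entries silently drop out expiryMillis after they were written.
        excluded = CacheBuilder.newBuilder()
            .expireAfterWrite(expiryMillis, TimeUnit.MILLISECONDS)
            .build(new CacheLoader<String, String>() {
              @Override
              public String load(String key) {
                return key;  // identity mapping, like the DatanodeInfo cache
              }
            });
      }

      public void exclude(String node) {
        excluded.put(node, node);
      }

      public Set<String> currentlyExcluded() {
        return excluded.asMap().keySet();
      }
    }

Because entries expire, the excludedNodes.put(badNode, badNode) call near the top of this diff only removes a datanode from consideration temporarily.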

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java
index ef9f27a..b6bf6cd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java
@@ -31,9 +31,7 @@ public class HdfsConfiguration extends Configuration {
     addDeprecatedKeys();
 
     // adds the default resources
-    Configuration.addDefaultResource("hdfs-default.xml");
-    Configuration.addDefaultResource("hdfs-site.xml");
-
+    HdfsConfigurationLoader.init();
   }
 
   public HdfsConfiguration() {
@@ -52,9 +50,10 @@ public class HdfsConfiguration extends Configuration {
    * This method is here so that when invoked, HdfsConfiguration is class-loaded if
    * it hasn't already been loaded.  Upon loading the class, the static
    * initializer block above will be executed to add the deprecated keys and to add
-   * the default resources.   It is safe for this method to be called multiple times 
-   * as the static initializer block will only get invoked once.
-   * 
+   * the default resources via {@link HdfsConfigurationLoader#init()}. It is
+   * safe for this method to be called multiple times as the static initializer
+   * block will only get invoked once.
+   *
    * This replaces the previously dangerous practice of other classes calling
    * Configuration.addDefaultResource("hdfs-default.xml") directly without loading 
    * HdfsConfiguration class first, thereby skipping the key deprecation
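
The pattern the javadoc above describes boils down to two pieces: default resources are registered in a static initializer, and callers are given a no-op method whose only job is to force the class to load. A minimal sketch of that pattern with made-up resource names (the real registration now lives in HdfsConfigurationLoader):

    import org.apache.hadoop.conf.Configuration;

    public class ExampleConfiguration extends Configuration {
      static {
        // Runs exactly once, the first time this class is loaded.
        Configuration.addDefaultResource("example-default.xml");
        Configuration.addDefaultResource("example-site.xml");
      }

      /**
       * No-op whose only purpose is to force class loading, which in turn runs
       * the static initializer above. Safe to call any number of times.
       */
      public static void init() {
      }
    }

Callers that need the defaults registered invoke ExampleConfiguration.init() (or construct an instance) before reading configuration keys, instead of calling addDefaultResource themselves.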

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java
deleted file mode 100644
index f03e179..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.security.token.Token;
-
-@InterfaceAudience.Private
-public interface RemotePeerFactory {
-  /**
-   * @param addr          The address to connect to.
-   * @param blockToken    Token used during optional SASL negotiation
-   * @param datanodeId    ID of destination DataNode
-   * @return              A new Peer connected to the address.
-   *
-   * @throws IOException  If there was an error connecting or creating 
-   *                      the remote socket, encrypted stream, etc.
-   */
-  Peer newConnectedPeer(InetSocketAddress addr,
-      Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
-      throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/UnknownCipherSuiteException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/UnknownCipherSuiteException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/UnknownCipherSuiteException.java
deleted file mode 100644
index ec17bb8..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/UnknownCipherSuiteException.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * Thrown when an unknown cipher suite is encountered.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Stable
-public class UnknownCipherSuiteException extends IOException {
-  public UnknownCipherSuiteException(String msg) {
-    super(msg);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/UnknownCryptoProtocolVersionException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/UnknownCryptoProtocolVersionException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/UnknownCryptoProtocolVersionException.java
deleted file mode 100644
index 0aac8c8..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/UnknownCryptoProtocolVersionException.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class UnknownCryptoProtocolVersionException extends IOException {
-  private static final long serialVersionUID = 8957192l;
-
-  public UnknownCryptoProtocolVersionException() {
-    super();
-  }
-
-  public UnknownCryptoProtocolVersionException(String msg) {
-    super(msg);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/XAttrHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/XAttrHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/XAttrHelper.java
deleted file mode 100644
index 2655c40..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/XAttrHelper.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.HadoopIllegalArgumentException;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.XAttr;
-import org.apache.hadoop.fs.XAttr.NameSpace;
-import org.apache.hadoop.util.StringUtils;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-@InterfaceAudience.Private
-public class XAttrHelper {
-  
-  /**
-   * Build <code>XAttr</code> from xattr name with prefix.
-   */
-  public static XAttr buildXAttr(String name) {
-    return buildXAttr(name, null);
-  }
-  
-  /**
-   * Build <code>XAttr</code> from name with prefix and value.
-   * Name cannot be null. Value can be null. The name and prefix
-   * are validated.
-   * The name is case sensitive; the namespace prefix is matched
-   * case-insensitively.
-   */
-  public static XAttr buildXAttr(String name, byte[] value) {
-    Preconditions.checkNotNull(name, "XAttr name cannot be null.");
-    
-    final int prefixIndex = name.indexOf(".");
-    if (prefixIndex < 3) {// Prefix length is at least 3.
-      throw new HadoopIllegalArgumentException("An XAttr name must be " +
-          "prefixed with user/trusted/security/system/raw, followed by a '.'");
-    } else if (prefixIndex == name.length() - 1) {
-      throw new HadoopIllegalArgumentException("XAttr name cannot be empty.");
-    }
-    
-    NameSpace ns;
-    final String prefix = name.substring(0, prefixIndex);
-    if (StringUtils.equalsIgnoreCase(prefix, NameSpace.USER.toString())) {
-      ns = NameSpace.USER;
-    } else if (
-        StringUtils.equalsIgnoreCase(prefix, NameSpace.TRUSTED.toString())) {
-      ns = NameSpace.TRUSTED;
-    } else if (
-        StringUtils.equalsIgnoreCase(prefix, NameSpace.SYSTEM.toString())) {
-      ns = NameSpace.SYSTEM;
-    } else if (
-        StringUtils.equalsIgnoreCase(prefix, NameSpace.SECURITY.toString())) {
-      ns = NameSpace.SECURITY;
-    } else if (
-        StringUtils.equalsIgnoreCase(prefix, NameSpace.RAW.toString())) {
-      ns = NameSpace.RAW;
-    } else {
-      throw new HadoopIllegalArgumentException("An XAttr name must be " +
-          "prefixed with user/trusted/security/system/raw, followed by a '.'");
-    }
-    XAttr xAttr = (new XAttr.Builder()).setNameSpace(ns).setName(name.
-        substring(prefixIndex + 1)).setValue(value).build();
-    
-    return xAttr;
-  }
-  
-  /**
-   * Build a single-element <code>XAttr</code> list from a prefixed xattr name.
-   */
-  public static List<XAttr> buildXAttrAsList(String name) {
-    XAttr xAttr = buildXAttr(name);
-    List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
-    xAttrs.add(xAttr);
-    
-    return xAttrs;
-  }
-  
-  /**
-   * Get value of first xattr from <code>XAttr</code> list
-   */
-  public static byte[] getFirstXAttrValue(List<XAttr> xAttrs) {
-    byte[] value = null;
-    XAttr xAttr = getFirstXAttr(xAttrs);
-    if (xAttr != null) {
-      value = xAttr.getValue();
-      if (value == null) {
-        value = new byte[0]; // xattr exists, but no value.
-      }
-    }
-    return value;
-  }
-  
-  /**
-   * Get first xattr from <code>XAttr</code> list
-   */
-  public static XAttr getFirstXAttr(List<XAttr> xAttrs) {
-    if (xAttrs != null && !xAttrs.isEmpty()) {
-      return xAttrs.get(0);
-    }
-    
-    return null;
-  }
-  
-  /**
-   * Build an xattr map from an <code>XAttr</code> list; the key is the
-   * prefixed xattr name and the value is the xattr value.
-   */
-  public static Map<String, byte[]> buildXAttrMap(List<XAttr> xAttrs) {
-    if (xAttrs == null) {
-      return null;
-    }
-    Map<String, byte[]> xAttrMap = Maps.newHashMap();
-    for (XAttr xAttr : xAttrs) {
-      String name = getPrefixedName(xAttr);
-      byte[] value = xAttr.getValue();
-      if (value == null) {
-        value = new byte[0];
-      }
-      xAttrMap.put(name, value);
-    }
-    
-    return xAttrMap;
-  }
-  
-  /**
-   * Get name with prefix from <code>XAttr</code>
-   */
-  public static String getPrefixedName(XAttr xAttr) {
-    if (xAttr == null) {
-      return null;
-    }
-
-    return getPrefixedName(xAttr.getNameSpace(), xAttr.getName());
-  }
-
-  public static String getPrefixedName(XAttr.NameSpace ns, String name) {
-    return StringUtils.toLowerCase(ns.toString()) + "." + name;
-  }
-
-  /**
-   * Build <code>XAttr</code> list from xattr name list.
-   */
-  public static List<XAttr> buildXAttrs(List<String> names) {
-    if (names == null || names.isEmpty()) {
-      throw new HadoopIllegalArgumentException("XAttr names can not be " +
-          "null or empty.");
-    }
-    
-    List<XAttr> xAttrs = Lists.newArrayListWithCapacity(names.size());
-    for (String name : names) {
-      xAttrs.add(buildXAttr(name, null));
-    }
-    return xAttrs;
-  } 
-}
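
XAttrHelper parses the namespace out of a prefixed attribute name and maps attributes back to their prefixed form. A short usage sketch against the methods shown in this diff, with an arbitrary attribute name chosen for illustration (the class is @InterfaceAudience.Private, so this is illustrative rather than a supported API):

    import java.nio.charset.StandardCharsets;
    import java.util.Collections;
    import java.util.Map;

    import org.apache.hadoop.HadoopIllegalArgumentException;
    import org.apache.hadoop.fs.XAttr;
    import org.apache.hadoop.hdfs.XAttrHelper;

    public class XAttrHelperExample {
      public static void main(String[] args) {
        // "user." selects NameSpace.USER; the remainder becomes the bare name.
        XAttr attr = XAttrHelper.buildXAttr(
            "user.checksum", "abc123".getBytes(StandardCharsets.UTF_8));

        // buildXAttrMap keys each entry by its lower-cased prefixed name.
        Map<String, byte[]> byName =
            XAttrHelper.buildXAttrMap(Collections.singletonList(attr));
        System.out.println(byName.containsKey("user.checksum"));   // true

        // A name without a recognized prefix is rejected.
        try {
          XAttrHelper.buildXAttr("checksum");
        } catch (HadoopIllegalArgumentException e) {
          System.out.println("rejected: " + e.getMessage());
        }
      }
    }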

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataInputStream.java
deleted file mode 100644
index e8ac686..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataInputStream.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.client;
-
-import java.io.InputStream;
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.crypto.CryptoInputStream;
-import org.apache.hadoop.hdfs.DFSInputStream;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-
-import com.google.common.base.Preconditions;
-
-/**
- * The Hdfs implementation of {@link FSDataInputStream}.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class HdfsDataInputStream extends FSDataInputStream {
-  public HdfsDataInputStream(DFSInputStream in) throws IOException {
-    super(in);
-  }
-
-  public HdfsDataInputStream(CryptoInputStream in) throws IOException {
-    super(in);
-    Preconditions.checkArgument(in.getWrappedStream() instanceof DFSInputStream,
-        "CryptoInputStream should wrap a DFSInputStream");
-  }
-
-  private DFSInputStream getDFSInputStream() {
-    if (in instanceof CryptoInputStream) {
-      return (DFSInputStream) ((CryptoInputStream) in).getWrappedStream();
-    }
-    return (DFSInputStream) in;
-  }
-
-  /**
-   * Get a reference to the wrapped input stream. We always want to return the
-   * actual underlying InputStream, even when we're using a CryptoStream. e.g.
-   * in the delegated methods below.
-   *
-   * @return the underlying input stream
-   */
-  public InputStream getWrappedStream() {
-    return in;
-  }
-
-  /**
-   * Get the datanode from which the stream is currently reading.
-   */
-  public DatanodeInfo getCurrentDatanode() {
-    return getDFSInputStream().getCurrentDatanode();
-  }
-
-  /**
-   * Get the block containing the target position.
-   */
-  public ExtendedBlock getCurrentBlock() {
-    return getDFSInputStream().getCurrentBlock();
-  }
-
-  /**
-   * Get the collection of blocks that has already been located.
-   */
-  public List<LocatedBlock> getAllBlocks() throws IOException {
-    return getDFSInputStream().getAllBlocks();
-  }
-
-  /**
-   * Get the visible length of the file. It will include the length of the last
-   * block even if that is in UnderConstruction state.
-   * 
-   * @return The visible length of the file.
-   */
-  public long getVisibleLength() throws IOException {
-    return getDFSInputStream().getFileLength();
-  }
-
-  /**
-   * Get statistics about the reads which this DFSInputStream has done.
-   * Note that because HdfsDataInputStream is buffered, these stats may
-   * be higher than you would expect just by adding up the number of
-   * bytes read through HdfsDataInputStream.
-   */
-  public DFSInputStream.ReadStatistics getReadStatistics() {
-    return getDFSInputStream().getReadStatistics();
-  }
-
-  public void clearReadStatistics() {
-    getDFSInputStream().clearReadStatistics();
-  }
-}
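
HdfsDataInputStream is the stream a DistributedFileSystem typically hands back from open(), so the HDFS-specific read-side details are reached by downcasting. A usage sketch, assuming a reachable HDFS and an existing file at a made-up path:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.client.HdfsDataInputStream;

    public class ReadDetailsExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();   // assumes fs.defaultFS points at HDFS
        FileSystem fs = FileSystem.get(conf);

        try (FSDataInputStream in = fs.open(new Path("/tmp/example.txt"))) {
          byte[] buf = new byte[4096];
          while (in.read(buf) != -1) {
            // drain the stream
          }
          // Only HDFS returns an HdfsDataInputStream, so guard the downcast.
          if (in instanceof HdfsDataInputStream) {
            HdfsDataInputStream hin = (HdfsDataInputStream) in;
            System.out.println("visible length: " + hin.getVisibleLength());
            System.out.println("last datanode:  " + hin.getCurrentDatanode());
          }
        }
      }
    }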