Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/03/24 19:07:36 UTC

[1/2] hadoop git commit: HDFS-7854. Separate class DataStreamer out of DFSOutputStream. Contributed by Li Bo.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 570a83ae8 -> a16bfff71


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a16bfff7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
new file mode 100644
index 0000000..6047825
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -0,0 +1,1754 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
+
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
+import org.apache.hadoop.hdfs.util.ByteArrayManager;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.Time;
+import org.apache.htrace.NullScope;
+import org.apache.htrace.Sampler;
+import org.apache.htrace.Span;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceInfo;
+import org.apache.htrace.TraceScope;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+
+/*********************************************************************
+ *
+ * The DataStreamer class is responsible for sending data packets to the
+ * datanodes in the pipeline. It retrieves a new blockid and block locations
+ * from the namenode, and starts streaming packets to the pipeline of
+ * Datanodes. Every packet has a sequence number associated with
+ * it. When all the packets for a block are sent out and acks for each
+ * of them are received, the DataStreamer closes the current block.
+ *
+ * The DataStreamer thread picks up packets from the dataQueue, sends them to
+ * the first datanode in the pipeline, and moves them from the dataQueue to
+ * the ackQueue. The ResponseProcessor receives acks from the datanodes. When
+ * a successful ack for a packet is received from all datanodes, the
+ * ResponseProcessor removes the corresponding packet from the ackQueue.
+ *
+ * In case of error, all outstanding packets are moved from the ackQueue back
+ * to the dataQueue. A new pipeline is set up by eliminating the bad datanode
+ * from the original pipeline. The DataStreamer then resumes sending packets
+ * from the dataQueue. (A rough sketch of this flow appears just below the
+ * class declaration.)
+ *
+ *********************************************************************/
+
+class DataStreamer extends Daemon {
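+  // Rough sketch of the flow described above. The writer side lives in
+  // DFSOutputStream, which owns this streamer after HDFS-7854; the exact
+  // caller-side call sites are assumptions here, only the queue hand-off is
+  // defined by this class.
+  //
+  //   writer thread                        streamer thread (run())
+  //   -------------                        -----------------------
+  //   waitAndQueuePacket(p) --> dataQueue --> send p to first DN, move p
+  //                                           from dataQueue to ackQueue
+  //   waitForAckedSeqno(s)  <-- ackQueue  <-- ResponseProcessor removes p
+  //                                           once every DN has acked it
+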
+  /**
+   * Create a socket for a write pipeline
+   *
+   * @param first the first datanode
+   * @param length the pipeline length
+   * @param client client
+   * @return the socket connected to the first datanode
+   */
+  static Socket createSocketForPipeline(final DatanodeInfo first,
+      final int length, final DFSClient client) throws IOException {
+    final String dnAddr = first.getXferAddr(
+        client.getConf().connectToDnViaHostname);
+    if (DFSClient.LOG.isDebugEnabled()) {
+      DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
+    }
+    final InetSocketAddress isa = NetUtils.createSocketAddr(dnAddr);
+    final Socket sock = client.socketFactory.createSocket();
+    final int timeout = client.getDatanodeReadTimeout(length);
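+    // Note: the connect below uses the plain socketTimeout, while the read
+    // timeout above comes from getDatanodeReadTimeout(length), which is
+    // assumed to grow with the pipeline length since acks travel back through
+    // every downstream datanode.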
+    NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), client.getConf().socketTimeout);
+    sock.setSoTimeout(timeout);
+    sock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
+    if(DFSClient.LOG.isDebugEnabled()) {
+      DFSClient.LOG.debug("Send buf size " + sock.getSendBufferSize());
+    }
+    return sock;
+  }
+
+  /**
+   * if this file is lazy persist
+   *
+   * @param stat the HdfsFileStatus of a file
+   * @return if this file is lazy persist
+   */
+  static boolean isLazyPersist(HdfsFileStatus stat) {
+    final BlockStoragePolicy p = blockStoragePolicySuite.getPolicy(
+        HdfsConstants.MEMORY_STORAGE_POLICY_NAME);
+    return p != null && stat.getStoragePolicy() == p.getId();
+  }
+
+  /**
+   * release a list of packets to ByteArrayManager
+   *
+   * @param packets packets to be released
+   * @param bam ByteArrayManager
+   */
+  private static void releaseBuffer(List<DFSPacket> packets, ByteArrayManager bam) {
+    for(DFSPacket p : packets) {
+      p.releaseBuffer(bam);
+    }
+    packets.clear();
+  }
+
+  private volatile boolean streamerClosed = false;
+  private ExtendedBlock block; // its length is number of bytes acked
+  private Token<BlockTokenIdentifier> accessToken;
+  private DataOutputStream blockStream;
+  private DataInputStream blockReplyStream;
+  private ResponseProcessor response = null;
+  private volatile DatanodeInfo[] nodes = null; // list of targets for current block
+  private volatile StorageType[] storageTypes = null;
+  private volatile String[] storageIDs = null;
+  private String[] favoredNodes;
+  volatile boolean hasError = false;
+  volatile int errorIndex = -1;
+  // Restarting node index
+  AtomicInteger restartingNodeIndex = new AtomicInteger(-1);
+  private long restartDeadline = 0; // Deadline of DN restart
+  private BlockConstructionStage stage;  // block construction stage
+  private long bytesSent = 0; // number of bytes that've been sent
+  private final boolean isLazyPersistFile;
+
+  /** Nodes have been used in the pipeline before and have failed. */
+  private final List<DatanodeInfo> failed = new ArrayList<>();
+  /** The last ack sequence number before pipeline failure. */
+  private long lastAckedSeqnoBeforeFailure = -1;
+  private int pipelineRecoveryCount = 0;
+  /** Has the current block been hflushed? */
+  private boolean isHflushed = false;
+  /** Append on an existing block? */
+  private boolean isAppend;
+
+  private long currentSeqno = 0;
+  private long lastQueuedSeqno = -1;
+  private long lastAckedSeqno = -1;
+  private long bytesCurBlock = 0; // bytes written in current block
+  private final AtomicReference<IOException> lastException = new AtomicReference<>();
+  private Socket s;
+
+  private final DFSClient dfsClient;
+  private final String src;
+  /** Only for DataTransferProtocol.writeBlock(..) */
+  private final DataChecksum checksum4WriteBlock;
+  private final Progressable progress;
+  private final HdfsFileStatus stat;
+  // appending to existing partial block
+  private volatile boolean appendChunk = false;
+  // both dataQueue and ackQueue are protected by dataQueue lock
+  private final LinkedList<DFSPacket> dataQueue = new LinkedList<>();
+  private final LinkedList<DFSPacket> ackQueue = new LinkedList<>();
+  private final AtomicReference<CachingStrategy> cachingStrategy;
+  private final ByteArrayManager byteArrayManager;
+  private static final BlockStoragePolicySuite blockStoragePolicySuite =
+      BlockStoragePolicySuite.createDefaultSuite();
+  //persist blocks on namenode
+  private final AtomicBoolean persistBlocks = new AtomicBoolean(false);
+  private boolean failPacket = false;
+  private final long dfsclientSlowLogThresholdMs;
+  private long artificialSlowdown = 0;
+
+  private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes;
+
+  private DataStreamer(HdfsFileStatus stat, DFSClient dfsClient, String src,
+                       Progressable progress, DataChecksum checksum,
+                       AtomicReference<CachingStrategy> cachingStrategy,
+                       ByteArrayManager byteArrayManage){
+    this.dfsClient = dfsClient;
+    this.src = src;
+    this.progress = progress;
+    this.stat = stat;
+    this.checksum4WriteBlock = checksum;
+    this.cachingStrategy = cachingStrategy;
+    this.byteArrayManager = byteArrayManage;
+    isLazyPersistFile = isLazyPersist(stat);
+    this.dfsclientSlowLogThresholdMs =
+        dfsClient.getConf().dfsclientSlowIoWarningThresholdMs;
+    excludedNodes = initExcludedNodes();
+  }
+
+  /**
+   * construction with tracing info
+   */
+  DataStreamer(HdfsFileStatus stat, ExtendedBlock block, DFSClient dfsClient,
+               String src, Progressable progress, DataChecksum checksum,
+               AtomicReference<CachingStrategy> cachingStrategy,
+               ByteArrayManager byteArrayManage) {
+    this(stat, dfsClient, src, progress, checksum, cachingStrategy,
+        byteArrayManage);
+    isAppend = false;
+    this.block = block;
+    stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
+  }
+
+  /**
+   * Construct a data streamer for appending to the last partial block
+   * @param lastBlock last block of the file to be appended
+   * @param stat status of the file to be appended
+   * @throws IOException if error occurs
+   */
+  DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat, DFSClient dfsClient,
+               String src, Progressable progress, DataChecksum checksum,
+               AtomicReference<CachingStrategy> cachingStrategy,
+               ByteArrayManager byteArrayManage) throws IOException {
+    this(stat, dfsClient, src, progress, checksum, cachingStrategy,
+        byteArrayManage);
+    isAppend = true;
+    stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
+    block = lastBlock.getBlock();
+    bytesSent = block.getNumBytes();
+    accessToken = lastBlock.getBlockToken();
+  }
+
+  /**
+   * Set pipeline in construction
+   *
+   * @param lastBlock the last block of a file
+   * @throws IOException
+   */
+  void setPipelineInConstruction(LocatedBlock lastBlock) throws IOException{
+    // setup pipeline to append to the last block XXX retries??
+    setPipeline(lastBlock);
+    errorIndex = -1;   // no errors yet.
+    if (nodes.length < 1) {
+      throw new IOException("Unable to retrieve blocks locations " +
+          " for last block " + block +
+          "of file " + src);
+    }
+  }
+
+  private void setPipeline(LocatedBlock lb) {
+    setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs());
+  }
+
+  private void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes,
+                           String[] storageIDs) {
+    this.nodes = nodes;
+    this.storageTypes = storageTypes;
+    this.storageIDs = storageIDs;
+  }
+
+  /**
+   * Set favored nodes
+   *
+   * @param favoredNodes favored nodes
+   */
+  void setFavoredNodes(String[] favoredNodes) {
+    this.favoredNodes = favoredNodes;
+  }
+
+  /**
+   * Initialize for data streaming
+   */
+  private void initDataStreaming() {
+    this.setName("DataStreamer for file " + src +
+        " block " + block);
+    response = new ResponseProcessor(nodes);
+    response.start();
+    stage = BlockConstructionStage.DATA_STREAMING;
+  }
+
+  private void endBlock() {
+    if(DFSClient.LOG.isDebugEnabled()) {
+      DFSClient.LOG.debug("Closing old block " + block);
+    }
+    this.setName("DataStreamer for file " + src);
+    closeResponder();
+    closeStream();
+    setPipeline(null, null, null);
+    stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
+  }
+
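+  // Sketch of the stage transitions driven by run(), based on the code below
+  // (names are the BlockConstructionStage values used in this class):
+  //
+  //   PIPELINE_SETUP_CREATE --nextBlockOutputStream()--> DATA_STREAMING
+  //   PIPELINE_SETUP_APPEND --setupPipelineForAppendOrRecovery()--> DATA_STREAMING
+  //   DATA_STREAMING --last packet acked--> PIPELINE_CLOSE --endBlock()-->
+  //       PIPELINE_SETUP_CREATE (for the next block)
+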
+  /*
+   * streamer thread is the only thread that opens streams to datanode,
+   * and closes them. Any error recovery is also done by this thread.
+   */
+  @Override
+  public void run() {
+    long lastPacket = Time.monotonicNow();
+    TraceScope scope = NullScope.INSTANCE;
+    while (!streamerClosed && dfsClient.clientRunning) {
+      // if the Responder encountered an error, shutdown Responder
+      if (hasError && response != null) {
+        try {
+          response.close();
+          response.join();
+          response = null;
+        } catch (InterruptedException  e) {
+          DFSClient.LOG.warn("Caught exception ", e);
+        }
+      }
+
+      DFSPacket one;
+      try {
+        // process datanode IO errors if any
+        boolean doSleep = false;
+        if (hasError && (errorIndex >= 0 || restartingNodeIndex.get() >= 0)) {
+          doSleep = processDatanodeError();
+        }
+
+        synchronized (dataQueue) {
+          // wait for a packet to be sent.
+          long now = Time.monotonicNow();
+          while ((!streamerClosed && !hasError && dfsClient.clientRunning
+              && dataQueue.size() == 0 &&
+              (stage != BlockConstructionStage.DATA_STREAMING ||
+                  stage == BlockConstructionStage.DATA_STREAMING &&
+                      now - lastPacket < dfsClient.getConf().socketTimeout/2)) || doSleep ) {
+            long timeout = dfsClient.getConf().socketTimeout/2 - (now-lastPacket);
+            timeout = timeout <= 0 ? 1000 : timeout;
+            timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
+                timeout : 1000;
+            try {
+              dataQueue.wait(timeout);
+            } catch (InterruptedException  e) {
+              DFSClient.LOG.warn("Caught exception ", e);
+            }
+            doSleep = false;
+            now = Time.monotonicNow();
+          }
+          if (streamerClosed || hasError || !dfsClient.clientRunning) {
+            continue;
+          }
+          // get packet to be sent.
+          if (dataQueue.isEmpty()) {
+            one = createHeartbeatPacket();
+            assert one != null;
+          } else {
+            one = dataQueue.getFirst(); // regular data packet
+            long parents[] = one.getTraceParents();
+            if (parents.length > 0) {
+              scope = Trace.startSpan("dataStreamer", new TraceInfo(0, parents[0]));
+              // TODO: use setParents API once it's available from HTrace 3.2
+              // scope = Trace.startSpan("dataStreamer", Sampler.ALWAYS);
+              // scope.getSpan().setParents(parents);
+            }
+          }
+        }
+
+        // get new block from namenode.
+        if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
+          if(DFSClient.LOG.isDebugEnabled()) {
+            DFSClient.LOG.debug("Allocating new block");
+          }
+          setPipeline(nextBlockOutputStream());
+          initDataStreaming();
+        } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
+          if(DFSClient.LOG.isDebugEnabled()) {
+            DFSClient.LOG.debug("Append to block " + block);
+          }
+          setupPipelineForAppendOrRecovery();
+          initDataStreaming();
+        }
+
+        long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
+        if (lastByteOffsetInBlock > stat.getBlockSize()) {
+          throw new IOException("BlockSize " + stat.getBlockSize() +
+              " is smaller than data size. " +
+              " Offset of packet in block " +
+              lastByteOffsetInBlock +
+              " Aborting file " + src);
+        }
+
+        if (one.isLastPacketInBlock()) {
+          // wait for all data packets to be successfully acked
+          synchronized (dataQueue) {
+            while (!streamerClosed && !hasError &&
+                ackQueue.size() != 0 && dfsClient.clientRunning) {
+              try {
+                // wait for acks to arrive from datanodes
+                dataQueue.wait(1000);
+              } catch (InterruptedException  e) {
+                DFSClient.LOG.warn("Caught exception ", e);
+              }
+            }
+          }
+          if (streamerClosed || hasError || !dfsClient.clientRunning) {
+            continue;
+          }
+          stage = BlockConstructionStage.PIPELINE_CLOSE;
+        }
+
+        // send the packet
+        Span span = null;
+        synchronized (dataQueue) {
+          // move packet from dataQueue to ackQueue
+          if (!one.isHeartbeatPacket()) {
+            span = scope.detach();
+            one.setTraceSpan(span);
+            dataQueue.removeFirst();
+            ackQueue.addLast(one);
+            dataQueue.notifyAll();
+          }
+        }
+
+        if (DFSClient.LOG.isDebugEnabled()) {
+          DFSClient.LOG.debug("DataStreamer block " + block +
+              " sending packet " + one);
+        }
+
+        // write out data to remote datanode
+        TraceScope writeScope = Trace.startSpan("writeTo", span);
+        try {
+          one.writeTo(blockStream);
+          blockStream.flush();
+        } catch (IOException e) {
+          // HDFS-3398: treat the primary DN as down since the client is
+          // unable to write to it. If a failed or restarting node has already
+          // been recorded by the responder, the following call will have no
+          // effect. Pipeline recovery can handle only one node error at a
+          // time. If the primary node fails again during the recovery, it
+          // will be taken out then.
+          tryMarkPrimaryDatanodeFailed();
+          throw e;
+        } finally {
+          writeScope.close();
+        }
+        lastPacket = Time.monotonicNow();
+
+        // update bytesSent
+        long tmpBytesSent = one.getLastByteOffsetBlock();
+        if (bytesSent < tmpBytesSent) {
+          bytesSent = tmpBytesSent;
+        }
+
+        if (streamerClosed || hasError || !dfsClient.clientRunning) {
+          continue;
+        }
+
+        // Is this block full?
+        if (one.isLastPacketInBlock()) {
+          // wait until the close packet has been acked
+          synchronized (dataQueue) {
+            while (!streamerClosed && !hasError &&
+                ackQueue.size() != 0 && dfsClient.clientRunning) {
+              dataQueue.wait(1000);// wait for acks to arrive from datanodes
+            }
+          }
+          if (streamerClosed || hasError || !dfsClient.clientRunning) {
+            continue;
+          }
+
+          endBlock();
+        }
+        if (progress != null) { progress.progress(); }
+
+        // This is used by unit test to trigger race conditions.
+        if (artificialSlowdown != 0 && dfsClient.clientRunning) {
+          Thread.sleep(artificialSlowdown);
+        }
+      } catch (Throwable e) {
+        // Log warning if there was a real error.
+        if (restartingNodeIndex.get() == -1) {
+          // Since their messages are descriptive enough, do not always
+          // log a verbose stack-trace WARN for quota exceptions.
+          if (e instanceof QuotaExceededException) {
+            DFSClient.LOG.debug("DataStreamer Quota Exception", e);
+          } else {
+            DFSClient.LOG.warn("DataStreamer Exception", e);
+          }
+        }
+        if (e instanceof IOException) {
+          setLastException((IOException)e);
+        } else {
+          setLastException(new IOException("DataStreamer Exception: ",e));
+        }
+        hasError = true;
+        if (errorIndex == -1 && restartingNodeIndex.get() == -1) {
+          // Not a datanode issue
+          streamerClosed = true;
+        }
+      } finally {
+        scope.close();
+      }
+    }
+    closeInternal();
+  }
+
+  private void closeInternal() {
+    closeResponder();       // close and join
+    closeStream();
+    streamerClosed = true;
+    release();
+    synchronized (dataQueue) {
+      dataQueue.notifyAll();
+    }
+  }
+
+  /**
+   * release the DFSPackets in the two queues
+   *
+   */
+  void release() {
+    synchronized (dataQueue) {
+      releaseBuffer(dataQueue, byteArrayManager);
+      releaseBuffer(ackQueue, byteArrayManager);
+    }
+  }
+
+  /**
+   * wait for the ack of seqno
+   *
+   * @param seqno the sequence number to be acked
+   * @throws IOException
+   */
+  void waitForAckedSeqno(long seqno) throws IOException {
+    TraceScope scope = Trace.startSpan("waitForAckedSeqno", Sampler.NEVER);
+    try {
+      if (DFSClient.LOG.isDebugEnabled()) {
+        DFSClient.LOG.debug("Waiting for ack for: " + seqno);
+      }
+      long begin = Time.monotonicNow();
+      try {
+        synchronized (dataQueue) {
+          while (!streamerClosed) {
+            checkClosed();
+            if (lastAckedSeqno >= seqno) {
+              break;
+            }
+            try {
+              dataQueue.wait(1000); // when we receive an ack, we notify on
+              // dataQueue
+            } catch (InterruptedException ie) {
+              throw new InterruptedIOException(
+                  "Interrupted while waiting for data to be acknowledged by pipeline");
+            }
+          }
+        }
+        checkClosed();
+      } catch (ClosedChannelException e) {
+      }
+      long duration = Time.monotonicNow() - begin;
+      if (duration > dfsclientSlowLogThresholdMs) {
+        DFSClient.LOG.warn("Slow waitForAckedSeqno took " + duration
+            + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms)");
+      }
+    } finally {
+      scope.close();
+    }
+  }
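+
+  // Typical caller-side pattern (an assumption about DFSOutputStream, which
+  // owns this streamer; the exact flush/close paths are not shown here):
+  //
+  //   streamer.waitAndQueuePacket(packet);           // enqueue the data
+  //   streamer.waitForAckedSeqno(packet.getSeqno()); // block until acked
+  //
+  // hflush()/hsync() are expected to follow this shape for the last packet
+  // they queue.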
+
+  /**
+   * wait for space in the dataQueue and queue the packet
+   *
+   * @param packet  the DFSPacket to be queued
+   * @throws IOException
+   */
+  void waitAndQueuePacket(DFSPacket packet) throws IOException {
+    synchronized (dataQueue) {
+      try {
+        // If queue is full, then wait till we have enough space
+        boolean firstWait = true;
+        try {
+          while (!streamerClosed && dataQueue.size() + ackQueue.size() >
+              dfsClient.getConf().writeMaxPackets) {
+            if (firstWait) {
+              Span span = Trace.currentSpan();
+              if (span != null) {
+                span.addTimelineAnnotation("dataQueue.wait");
+              }
+              firstWait = false;
+            }
+            try {
+              dataQueue.wait();
+            } catch (InterruptedException e) {
+              // If we get interrupted while waiting to queue data, we still need to get rid
+              // of the current packet. This is because we have an invariant that if
+              // currentPacket gets full, it will get queued before the next writeChunk.
+              //
+              // Rather than wait around for space in the queue, we should instead try to
+              // return to the caller as soon as possible, even though we slightly overrun
+              // the MAX_PACKETS length.
+              Thread.currentThread().interrupt();
+              break;
+            }
+          }
+        } finally {
+          Span span = Trace.currentSpan();
+          if ((span != null) && (!firstWait)) {
+            span.addTimelineAnnotation("end.wait");
+          }
+        }
+        checkClosed();
+        queuePacket(packet);
+      } catch (ClosedChannelException e) {
+      }
+    }
+  }
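+
+  // Backpressure sketch: dataQueue.size() + ackQueue.size() is capped at
+  // dfsClient.getConf().writeMaxPackets, so a fast writer blocks in
+  // waitAndQueuePacket() until the streamer and the responder catch up.
+  // An interrupt lets the caller overrun the cap by one packet, as noted in
+  // the comment above.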
+
+  /*
+   * close the streamer, should be called only by an external thread
+   * and only after all data to be sent has been flushed to datanode.
+   *
+   * Interrupt this data streamer if force is true
+   *
+   * @param force if this data stream is forced to be closed
+   */
+  void close(boolean force) {
+    streamerClosed = true;
+    synchronized (dataQueue) {
+      dataQueue.notifyAll();
+    }
+    if (force) {
+      this.interrupt();
+    }
+  }
+
+
+  private void checkClosed() throws IOException {
+    if (streamerClosed) {
+      IOException e = lastException.get();
+      throw e != null ? e : new ClosedChannelException();
+    }
+  }
+
+  private void closeResponder() {
+    if (response != null) {
+      try {
+        response.close();
+        response.join();
+      } catch (InterruptedException  e) {
+        DFSClient.LOG.warn("Caught exception ", e);
+      } finally {
+        response = null;
+      }
+    }
+  }
+
+  private void closeStream() {
+    if (blockStream != null) {
+      try {
+        blockStream.close();
+      } catch (IOException e) {
+        setLastException(e);
+      } finally {
+        blockStream = null;
+      }
+    }
+    if (blockReplyStream != null) {
+      try {
+        blockReplyStream.close();
+      } catch (IOException e) {
+        setLastException(e);
+      } finally {
+        blockReplyStream = null;
+      }
+    }
+    if (null != s) {
+      try {
+        s.close();
+      } catch (IOException e) {
+        setLastException(e);
+      } finally {
+        s = null;
+      }
+    }
+  }
+
+  // The following synchronized methods are used whenever
+  // errorIndex or restartingNodeIndex is set. This is because
+  // check & set needs to be atomic. Simply reading variables
+  // does not require a synchronization. When responder is
+  // not running (e.g. during pipeline recovery), there is no
+  // need to use these methods.
+
+  /** Set the error node index. Called by responder */
+  synchronized void setErrorIndex(int idx) {
+    errorIndex = idx;
+  }
+
+  /** Set the restarting node index. Called by responder */
+  synchronized void setRestartingNodeIndex(int idx) {
+    restartingNodeIndex.set(idx);
+    // If the data streamer has already set the primary node
+    // bad, clear it. It is likely that the write failed due to
+    // the DN shutdown. Even if it was a real failure, the pipeline
+    // recovery will take care of it.
+    errorIndex = -1;
+  }
+
+  /**
+   * This method is used when no explicit error report was received,
+   * but something failed. When the primary node is a suspect or we are
+   * unsure about the cause, the primary node is marked as failed.
+   */
+  synchronized void tryMarkPrimaryDatanodeFailed() {
+    // There should be no existing error and no ongoing restart.
+    if ((errorIndex == -1) && (restartingNodeIndex.get() == -1)) {
+      errorIndex = 0;
+    }
+  }
+
+  /**
+   * Examine whether it is worth waiting for a node to restart.
+   * @param index the node index
+   */
+  boolean shouldWaitForRestart(int index) {
+    // Only one node in the pipeline.
+    if (nodes.length == 1) {
+      return true;
+    }
+
+    // Is it a local node?
+    InetAddress addr = null;
+    try {
+      addr = InetAddress.getByName(nodes[index].getIpAddr());
+    } catch (java.net.UnknownHostException e) {
+      // we are passing an ip address. this should not happen.
+      assert false;
+    }
+
+    if (addr != null && NetUtils.isLocalAddress(addr)) {
+      return true;
+    }
+    return false;
+  }
+
+  //
+  // Processes responses from the datanodes.  A packet is removed
+  // from the ackQueue when its response arrives.
+  //
+  private class ResponseProcessor extends Daemon {
+
+    private volatile boolean responderClosed = false;
+    private DatanodeInfo[] targets = null;
+    private boolean isLastPacketInBlock = false;
+
+    ResponseProcessor (DatanodeInfo[] targets) {
+      this.targets = targets;
+    }
+
+    @Override
+    public void run() {
+
+      setName("ResponseProcessor for block " + block);
+      PipelineAck ack = new PipelineAck();
+
+      TraceScope scope = NullScope.INSTANCE;
+      while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) {
+        // process responses from datanodes.
+        try {
+          // read an ack from the pipeline
+          long begin = Time.monotonicNow();
+          ack.readFields(blockReplyStream);
+          long duration = Time.monotonicNow() - begin;
+          if (duration > dfsclientSlowLogThresholdMs
+              && ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) {
+            DFSClient.LOG
+                .warn("Slow ReadProcessor read fields took " + duration
+                    + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: "
+                    + ack + ", targets: " + Arrays.asList(targets));
+          } else if (DFSClient.LOG.isDebugEnabled()) {
+            DFSClient.LOG.debug("DFSClient " + ack);
+          }
+
+          long seqno = ack.getSeqno();
+          // processes response status from datanodes.
+          for (int i = ack.getNumOfReplies()-1; i >=0  && dfsClient.clientRunning; i--) {
+            final Status reply = PipelineAck.getStatusFromHeader(ack
+                .getReply(i));
+            // Restart will not be treated differently unless it is
+            // the local node or the only one in the pipeline.
+            if (PipelineAck.isRestartOOBStatus(reply) &&
+                shouldWaitForRestart(i)) {
+              restartDeadline = dfsClient.getConf().datanodeRestartTimeout
+                  + Time.monotonicNow();
+              setRestartingNodeIndex(i);
+              String message = "A datanode is restarting: " + targets[i];
+              DFSClient.LOG.info(message);
+              throw new IOException(message);
+            }
+            // node error
+            if (reply != SUCCESS) {
+              setErrorIndex(i); // first bad datanode
+              throw new IOException("Bad response " + reply +
+                  " for block " + block +
+                  " from datanode " +
+                  targets[i]);
+            }
+          }
+
+          assert seqno != PipelineAck.UNKOWN_SEQNO :
+              "Ack for unknown seqno should be a failed ack: " + ack;
+          if (seqno == DFSPacket.HEART_BEAT_SEQNO) {  // a heartbeat ack
+            continue;
+          }
+
+          // a success ack for a data packet
+          DFSPacket one;
+          synchronized (dataQueue) {
+            one = ackQueue.getFirst();
+          }
+          if (one.getSeqno() != seqno) {
+            throw new IOException("ResponseProcessor: Expecting seqno " +
+                " for block " + block +
+                one.getSeqno() + " but received " + seqno);
+          }
+          isLastPacketInBlock = one.isLastPacketInBlock();
+
+          // Fail the packet write for testing in order to force a
+          // pipeline recovery.
+          if (DFSClientFaultInjector.get().failPacket() &&
+              isLastPacketInBlock) {
+            failPacket = true;
+            throw new IOException(
+                "Failing the last packet for testing.");
+          }
+
+          // update bytesAcked
+          block.setNumBytes(one.getLastByteOffsetBlock());
+
+          synchronized (dataQueue) {
+            scope = Trace.continueSpan(one.getTraceSpan());
+            one.setTraceSpan(null);
+            lastAckedSeqno = seqno;
+            ackQueue.removeFirst();
+            dataQueue.notifyAll();
+
+            one.releaseBuffer(byteArrayManager);
+          }
+        } catch (Exception e) {
+          if (!responderClosed) {
+            if (e instanceof IOException) {
+              setLastException((IOException)e);
+            }
+            hasError = true;
+            // If no explicit error report was received, mark the primary
+            // node as failed.
+            tryMarkPrimaryDatanodeFailed();
+            synchronized (dataQueue) {
+              dataQueue.notifyAll();
+            }
+            if (restartingNodeIndex.get() == -1) {
+              DFSClient.LOG.warn("DataStreamer ResponseProcessor exception "
+                  + " for block " + block, e);
+            }
+            responderClosed = true;
+          }
+        } finally {
+          scope.close();
+        }
+      }
+    }
+
+    void close() {
+      responderClosed = true;
+      this.interrupt();
+    }
+  }
+
+  // If this stream has encountered any errors so far, shutdown
+  // threads and mark stream as closed. Returns true if we should
+  // sleep for a while after returning from this call.
+  //
+  private boolean processDatanodeError() throws IOException {
+    if (response != null) {
+      DFSClient.LOG.info("Error Recovery for " + block +
+          " waiting for responder to exit. ");
+      return true;
+    }
+    closeStream();
+
+    // move packets from ack queue to front of the data queue
+    synchronized (dataQueue) {
+      dataQueue.addAll(0, ackQueue);
+      ackQueue.clear();
+    }
+
+    // Record the new pipeline failure recovery.
+    if (lastAckedSeqnoBeforeFailure != lastAckedSeqno) {
+      lastAckedSeqnoBeforeFailure = lastAckedSeqno;
+      pipelineRecoveryCount = 1;
+    } else {
+      // If we had to recover the pipeline five times in a row for the
+      // same packet, this client likely has corrupt data or the data is
+      // being corrupted during transmission.
+      if (++pipelineRecoveryCount > 5) {
+        DFSClient.LOG.warn("Error recovering pipeline for writing " +
+            block + ". Already retried 5 times for the same packet.");
+        lastException.set(new IOException("Failing write. Tried pipeline " +
+            "recovery 5 times without success."));
+        streamerClosed = true;
+        return false;
+      }
+    }
+    boolean doSleep = setupPipelineForAppendOrRecovery();
+
+    if (!streamerClosed && dfsClient.clientRunning) {
+      if (stage == BlockConstructionStage.PIPELINE_CLOSE) {
+
+        // If we had an error while closing the pipeline, we go through a fast-path
+        // where the BlockReceiver does not run. Instead, the DataNode just finalizes
+        // the block immediately during the 'connect ack' process. So, we want to pull
+        // the end-of-block packet from the dataQueue, since we don't actually have
+        // a true pipeline to send it over.
+        //
+        // We also need to set lastAckedSeqno to the end-of-block Packet's seqno, so that
+        // a client waiting on close() will be aware that the flush finished.
+        synchronized (dataQueue) {
+          DFSPacket endOfBlockPacket = dataQueue.remove();  // remove the end of block packet
+          Span span = endOfBlockPacket.getTraceSpan();
+          if (span != null) {
+            // Close any trace span associated with this Packet
+            TraceScope scope = Trace.continueSpan(span);
+            scope.close();
+          }
+          assert endOfBlockPacket.isLastPacketInBlock();
+          assert lastAckedSeqno == endOfBlockPacket.getSeqno() - 1;
+          lastAckedSeqno = endOfBlockPacket.getSeqno();
+          dataQueue.notifyAll();
+        }
+        endBlock();
+      } else {
+        initDataStreaming();
+      }
+    }
+
+    return doSleep;
+  }
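+
+  // Error-handling sketch, per the method above: packets that were sent but
+  // not yet acked are pushed back to the front of the dataQueue, the pipeline
+  // is rebuilt by setupPipelineForAppendOrRecovery(), and the write is failed
+  // outright if the same packet has already forced five recoveries in a row.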
+
+  void setHflush() {
+    isHflushed = true;
+  }
+
+  private int findNewDatanode(final DatanodeInfo[] original
+  ) throws IOException {
+    if (nodes.length != original.length + 1) {
+      throw new IOException(
+          new StringBuilder()
+              .append("Failed to replace a bad datanode on the existing pipeline ")
+              .append("due to no more good datanodes being available to try. ")
+              .append("(Nodes: current=").append(Arrays.asList(nodes))
+              .append(", original=").append(Arrays.asList(original)).append("). ")
+              .append("The current failed datanode replacement policy is ")
+              .append(dfsClient.dtpReplaceDatanodeOnFailure).append(", and ")
+              .append("a client may configure this via '")
+              .append(DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY)
+              .append("' in its configuration.")
+              .toString());
+    }
+    for(int i = 0; i < nodes.length; i++) {
+      int j = 0;
+      for(; j < original.length && !nodes[i].equals(original[j]); j++);
+      if (j == original.length) {
+        return i;
+      }
+    }
+    throw new IOException("Failed: new datanode not found: nodes="
+        + Arrays.asList(nodes) + ", original=" + Arrays.asList(original));
+  }
+
+  private void addDatanode2ExistingPipeline() throws IOException {
+    if (DataTransferProtocol.LOG.isDebugEnabled()) {
+      DataTransferProtocol.LOG.debug("lastAckedSeqno = " + lastAckedSeqno);
+    }
+      /*
+       * Is data transfer necessary?  We have the following cases.
+       *
+       * Case 1: Failure in Pipeline Setup
+       * - Append
+       *    + Transfer the stored replica, which may be an RBW or a finalized one.
+       * - Create
+       *    + If no data, then no transfer is required.
+       *    + If data has been written, transfer the RBW. This case may happen
+       *      when there was a streaming failure earlier in this pipeline.
+       *
+       * Case 2: Failure in Streaming
+       * - Append/Create:
+       *    + transfer RBW
+       *
+       * Case 3: Failure in Close
+       * - Append/Create:
+       *    + no transfer; let the NameNode replicate the block.
+       */
+    if (!isAppend && lastAckedSeqno < 0
+        && stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
+      //no data have been written
+      return;
+    } else if (stage == BlockConstructionStage.PIPELINE_CLOSE
+        || stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
+      //pipeline is closing
+      return;
+    }
+
+    //get a new datanode
+    final DatanodeInfo[] original = nodes;
+    final LocatedBlock lb = dfsClient.namenode.getAdditionalDatanode(
+        src, stat.getFileId(), block, nodes, storageIDs,
+        failed.toArray(new DatanodeInfo[failed.size()]),
+        1, dfsClient.clientName);
+    setPipeline(lb);
+
+    //find the new datanode
+    final int d = findNewDatanode(original);
+
+    //transfer replica
+    final DatanodeInfo src = d == 0? nodes[1]: nodes[d - 1];
+    final DatanodeInfo[] targets = {nodes[d]};
+    final StorageType[] targetStorageTypes = {storageTypes[d]};
+    transfer(src, targets, targetStorageTypes, lb.getBlockToken());
+  }
+
+  private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
+                        final StorageType[] targetStorageTypes,
+                        final Token<BlockTokenIdentifier> blockToken) throws IOException {
+    //transfer replica to the new datanode
+    Socket sock = null;
+    DataOutputStream out = null;
+    DataInputStream in = null;
+    try {
+      sock = createSocketForPipeline(src, 2, dfsClient);
+      final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2);
+
+      OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
+      InputStream unbufIn = NetUtils.getInputStream(sock);
+      IOStreamPair saslStreams = dfsClient.saslClient.socketSend(sock,
+          unbufOut, unbufIn, dfsClient, blockToken, src);
+      unbufOut = saslStreams.out;
+      unbufIn = saslStreams.in;
+      out = new DataOutputStream(new BufferedOutputStream(unbufOut,
+          HdfsConstants.SMALL_BUFFER_SIZE));
+      in = new DataInputStream(unbufIn);
+
+      //send the TRANSFER_BLOCK request
+      new Sender(out).transferBlock(block, blockToken, dfsClient.clientName,
+          targets, targetStorageTypes);
+      out.flush();
+
+      //ack
+      BlockOpResponseProto response =
+          BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
+      if (SUCCESS != response.getStatus()) {
+        throw new IOException("Failed to add a datanode");
+      }
+    } finally {
+      IOUtils.closeStream(in);
+      IOUtils.closeStream(out);
+      IOUtils.closeSocket(sock);
+    }
+  }
+
+  /**
+   * Open a DataStreamer to a DataNode pipeline so that
+   * it can be written to.
+   * This happens when a file is appended or data streaming fails.
+   * It keeps on trying until a pipeline is set up.
+   */
+  private boolean setupPipelineForAppendOrRecovery() throws IOException {
+    // check number of datanodes
+    if (nodes == null || nodes.length == 0) {
+      String msg = "Could not get block locations. " + "Source file \""
+          + src + "\" - Aborting...";
+      DFSClient.LOG.warn(msg);
+      setLastException(new IOException(msg));
+      streamerClosed = true;
+      return false;
+    }
+
+    boolean success = false;
+    long newGS = 0L;
+    while (!success && !streamerClosed && dfsClient.clientRunning) {
+      // Sleep before reconnect if a dn is restarting.
+      // This process will be repeated until the deadline or the datanode
+      // starts back up.
+      if (restartingNodeIndex.get() >= 0) {
+        // 4 seconds or the configured deadline period, whichever is shorter.
+        // This is the retry interval and recovery will be retried in this
+        // interval until timeout or success.
+        long delay = Math.min(dfsClient.getConf().datanodeRestartTimeout,
+            4000L);
+        try {
+          Thread.sleep(delay);
+        } catch (InterruptedException ie) {
+          lastException.set(new IOException("Interrupted while waiting for " +
+              "datanode to restart. " + nodes[restartingNodeIndex.get()]));
+          streamerClosed = true;
+          return false;
+        }
+      }
+      boolean isRecovery = hasError;
+      // remove bad datanode from list of datanodes.
+      // If errorIndex was not set (i.e. appends), then do not remove
+      // any datanodes
+      //
+      if (errorIndex >= 0) {
+        StringBuilder pipelineMsg = new StringBuilder();
+        for (int j = 0; j < nodes.length; j++) {
+          pipelineMsg.append(nodes[j]);
+          if (j < nodes.length - 1) {
+            pipelineMsg.append(", ");
+          }
+        }
+        if (nodes.length <= 1) {
+          lastException.set(new IOException("All datanodes " + pipelineMsg
+              + " are bad. Aborting..."));
+          streamerClosed = true;
+          return false;
+        }
+        DFSClient.LOG.warn("Error Recovery for block " + block +
+            " in pipeline " + pipelineMsg +
+            ": bad datanode " + nodes[errorIndex]);
+        failed.add(nodes[errorIndex]);
+
+        DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
+        arraycopy(nodes, newnodes, errorIndex);
+
+        final StorageType[] newStorageTypes = new StorageType[newnodes.length];
+        arraycopy(storageTypes, newStorageTypes, errorIndex);
+
+        final String[] newStorageIDs = new String[newnodes.length];
+        arraycopy(storageIDs, newStorageIDs, errorIndex);
+
+        setPipeline(newnodes, newStorageTypes, newStorageIDs);
+
+        // Just took care of a node error while waiting for a node restart
+        if (restartingNodeIndex.get() >= 0) {
+          // If the error came from a node further away than the restarting
+          // node, the restart must have been complete.
+          if (errorIndex > restartingNodeIndex.get()) {
+            restartingNodeIndex.set(-1);
+          } else if (errorIndex < restartingNodeIndex.get()) {
+            // the node index has shifted.
+            restartingNodeIndex.decrementAndGet();
+          } else {
+            // this shouldn't happen...
+            assert false;
+          }
+        }
+
+        if (restartingNodeIndex.get() == -1) {
+          hasError = false;
+        }
+        lastException.set(null);
+        errorIndex = -1;
+      }
+
+      // Check if replace-datanode policy is satisfied.
+      if (dfsClient.dtpReplaceDatanodeOnFailure.satisfy(stat.getReplication(),
+          nodes, isAppend, isHflushed)) {
+        try {
+          addDatanode2ExistingPipeline();
+        } catch(IOException ioe) {
+          if (!dfsClient.dtpReplaceDatanodeOnFailure.isBestEffort()) {
+            throw ioe;
+          }
+          DFSClient.LOG.warn("Failed to replace datanode."
+              + " Continue with the remaining datanodes since "
+              + DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_KEY
+              + " is set to true.", ioe);
+        }
+      }
+
+      // get a new generation stamp and an access token
+      LocatedBlock lb = dfsClient.namenode.updateBlockForPipeline(block, dfsClient.clientName);
+      newGS = lb.getBlock().getGenerationStamp();
+      accessToken = lb.getBlockToken();
+
+      // set up the pipeline again with the remaining nodes
+      if (failPacket) { // for testing
+        success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
+        failPacket = false;
+        try {
+          // Give DNs time to send in bad reports. In real situations,
+          // good reports should follow bad ones, if client committed
+          // with those nodes.
+          Thread.sleep(2000);
+        } catch (InterruptedException ie) {}
+      } else {
+        success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
+      }
+
+      if (restartingNodeIndex.get() >= 0) {
+        assert hasError == true;
+        // check errorIndex set above
+        if (errorIndex == restartingNodeIndex.get()) {
+          // ignore, if came from the restarting node
+          errorIndex = -1;
+        }
+        // still within the deadline
+        if (Time.monotonicNow() < restartDeadline) {
+          continue; // within the deadline
+        }
+        // expired. declare the restarting node dead
+        restartDeadline = 0;
+        int expiredNodeIndex = restartingNodeIndex.get();
+        restartingNodeIndex.set(-1);
+        DFSClient.LOG.warn("Datanode did not restart in time: " +
+            nodes[expiredNodeIndex]);
+        // Mark the restarting node as failed. If there is any other failed
+        // node during the last pipeline construction attempt, it will not be
+        // overwritten/dropped. In this case, the restarting node will get
+        // excluded in the following attempt, if it still does not come up.
+        if (errorIndex == -1) {
+          errorIndex = expiredNodeIndex;
+        }
+        // From this point on, normal pipeline recovery applies.
+      }
+    } // while
+
+    if (success) {
+      // update pipeline at the namenode
+      ExtendedBlock newBlock = new ExtendedBlock(
+          block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS);
+      dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock,
+          nodes, storageIDs);
+      // update client side generation stamp
+      block = newBlock;
+    }
+    return false; // do not sleep, continue processing
+  }
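+
+  // Recovery sketch for a failed pipeline [d0, d1, d2] where d1 is bad
+  // (the three-node example is illustrative; the steps mirror the code above):
+  //   1. d1 is dropped and remembered in 'failed', giving [d0, d2];
+  //   2. if dtpReplaceDatanodeOnFailure.satisfy(...) holds, a replacement
+  //      datanode is obtained via addDatanode2ExistingPipeline() and the
+  //      replica is transferred to it;
+  //   3. a new generation stamp and block token are fetched and
+  //      createBlockOutputStream() rebuilds the write pipeline;
+  //   4. on success the namenode is notified through updatePipeline().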
+
+  /**
+   * Open a DataStreamer to a DataNode so that it can be written to.
+   * This happens when a file is created and each time a new block is allocated.
+   * Must get block ID and the IDs of the destinations from the namenode.
+   * Returns the list of target datanodes.
+   */
+  private LocatedBlock nextBlockOutputStream() throws IOException {
+    LocatedBlock lb = null;
+    DatanodeInfo[] nodes = null;
+    StorageType[] storageTypes = null;
+    int count = dfsClient.getConf().nBlockWriteRetry;
+    boolean success = false;
+    ExtendedBlock oldBlock = block;
+    do {
+      hasError = false;
+      lastException.set(null);
+      errorIndex = -1;
+      success = false;
+
+      DatanodeInfo[] excluded =
+          excludedNodes.getAllPresent(excludedNodes.asMap().keySet())
+              .keySet()
+              .toArray(new DatanodeInfo[0]);
+      block = oldBlock;
+      lb = locateFollowingBlock(excluded.length > 0 ? excluded : null);
+      block = lb.getBlock();
+      block.setNumBytes(0);
+      bytesSent = 0;
+      accessToken = lb.getBlockToken();
+      nodes = lb.getLocations();
+      storageTypes = lb.getStorageTypes();
+
+      //
+      // Connect to first DataNode in the list.
+      //
+      success = createBlockOutputStream(nodes, storageTypes, 0L, false);
+
+      if (!success) {
+        DFSClient.LOG.info("Abandoning " + block);
+        dfsClient.namenode.abandonBlock(block, stat.getFileId(), src,
+            dfsClient.clientName);
+        block = null;
+        DFSClient.LOG.info("Excluding datanode " + nodes[errorIndex]);
+        excludedNodes.put(nodes[errorIndex], nodes[errorIndex]);
+      }
+    } while (!success && --count >= 0);
+
+    if (!success) {
+      throw new IOException("Unable to create new block.");
+    }
+    return lb;
+  }
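+
+  // On each failed attempt the block is abandoned on the namenode and the
+  // datanode at errorIndex is cached in excludedNodes, so the next addBlock()
+  // call (up to nBlockWriteRetry attempts) asks the namenode to avoid it. The
+  // cache presumably expires entries after a while; initExcludedNodes() is
+  // defined later in this file.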
+
+  // connects to the first datanode in the pipeline
+  // Returns true on success, false otherwise.
+  //
+  private boolean createBlockOutputStream(DatanodeInfo[] nodes,
+      StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) {
+    if (nodes.length == 0) {
+      DFSClient.LOG.info("nodes are empty for write pipeline of block "
+          + block);
+      return false;
+    }
+    Status pipelineStatus = SUCCESS;
+    String firstBadLink = "";
+    boolean checkRestart = false;
+    if (DFSClient.LOG.isDebugEnabled()) {
+      for (int i = 0; i < nodes.length; i++) {
+        DFSClient.LOG.debug("pipeline = " + nodes[i]);
+      }
+    }
+
+    // persist blocks on namenode on next flush
+    persistBlocks.set(true);
+
+    int refetchEncryptionKey = 1;
+    while (true) {
+      boolean result = false;
+      DataOutputStream out = null;
+      try {
+        assert null == s : "Previous socket unclosed";
+        assert null == blockReplyStream : "Previous blockReplyStream unclosed";
+        s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
+        long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
+
+        OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
+        InputStream unbufIn = NetUtils.getInputStream(s);
+        IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s,
+            unbufOut, unbufIn, dfsClient, accessToken, nodes[0]);
+        unbufOut = saslStreams.out;
+        unbufIn = saslStreams.in;
+        out = new DataOutputStream(new BufferedOutputStream(unbufOut,
+            HdfsConstants.SMALL_BUFFER_SIZE));
+        blockReplyStream = new DataInputStream(unbufIn);
+
+        //
+        // Xmit header info to datanode
+        //
+
+        BlockConstructionStage bcs = recoveryFlag? stage.getRecoveryStage(): stage;
+
+        // We cannot change the block length in 'block' as it counts the number
+        // of bytes ack'ed.
+        ExtendedBlock blockCopy = new ExtendedBlock(block);
+        blockCopy.setNumBytes(stat.getBlockSize());
+
+        boolean[] targetPinnings = getPinnings(nodes, true);
+        // send the request
+        new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
+            dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
+            nodes.length, block.getNumBytes(), bytesSent, newGS,
+            checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
+            (targetPinnings == null ? false : targetPinnings[0]), targetPinnings);
+
+        // receive ack for connect
+        BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
+            PBHelper.vintPrefixed(blockReplyStream));
+        pipelineStatus = resp.getStatus();
+        firstBadLink = resp.getFirstBadLink();
+
+        // Got a restart OOB ack.
+        // If a node is already restarting, this status is not likely from
+        // the same node. If it is from a different node, it is not
+        // from the local datanode. Thus it is safe to treat this as a
+        // regular node error.
+        if (PipelineAck.isRestartOOBStatus(pipelineStatus) &&
+            restartingNodeIndex.get() == -1) {
+          checkRestart = true;
+          throw new IOException("A datanode is restarting.");
+        }
+
+        String logInfo = "ack with firstBadLink as " + firstBadLink;
+        DataTransferProtoUtil.checkBlockOpStatus(resp, logInfo);
+
+        assert null == blockStream : "Previous blockStream unclosed";
+        blockStream = out;
+        result =  true; // success
+        restartingNodeIndex.set(-1);
+        hasError = false;
+      } catch (IOException ie) {
+        if (restartingNodeIndex.get() == -1) {
+          DFSClient.LOG.info("Exception in createBlockOutputStream", ie);
+        }
+        if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
+          DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+              + "encryption key was invalid when connecting to "
+              + nodes[0] + " : " + ie);
+          // The encryption key used is invalid.
+          refetchEncryptionKey--;
+          dfsClient.clearDataEncryptionKey();
+          // Don't close the socket/exclude this node just yet. Try again with
+          // a new encryption key.
+          continue;
+        }
+
+        // find the datanode that matches
+        if (firstBadLink.length() != 0) {
+          for (int i = 0; i < nodes.length; i++) {
+            // NB: Unconditionally using the xfer addr w/o hostname
+            if (firstBadLink.equals(nodes[i].getXferAddr())) {
+              errorIndex = i;
+              break;
+            }
+          }
+        } else {
+          assert checkRestart == false;
+          errorIndex = 0;
+        }
+        // Check whether there is a restart worth waiting for.
+        if (checkRestart && shouldWaitForRestart(errorIndex)) {
+          restartDeadline = dfsClient.getConf().datanodeRestartTimeout
+              + Time.monotonicNow();
+          restartingNodeIndex.set(errorIndex);
+          errorIndex = -1;
+          DFSClient.LOG.info("Waiting for the datanode to be restarted: " +
+              nodes[restartingNodeIndex.get()]);
+        }
+        hasError = true;
+        setLastException(ie);
+        result =  false;  // error
+      } finally {
+        if (!result) {
+          IOUtils.closeSocket(s);
+          s = null;
+          IOUtils.closeStream(out);
+          out = null;
+          IOUtils.closeStream(blockReplyStream);
+          blockReplyStream = null;
+        }
+      }
+      return result;
+    }
+  }
+
+  private boolean[] getPinnings(DatanodeInfo[] nodes, boolean shouldLog) {
+    if (favoredNodes == null) {
+      return null;
+    } else {
+      boolean[] pinnings = new boolean[nodes.length];
+      HashSet<String> favoredSet =
+          new HashSet<String>(Arrays.asList(favoredNodes));
+      for (int i = 0; i < nodes.length; i++) {
+        pinnings[i] = favoredSet.remove(nodes[i].getXferAddrWithHostname());
+        if (DFSClient.LOG.isDebugEnabled()) {
+          DFSClient.LOG.debug(nodes[i].getXferAddrWithHostname() +
+              " was chosen by name node (favored=" + pinnings[i] +
+              ").");
+        }
+      }
+      if (shouldLog && !favoredSet.isEmpty()) {
+        // One or more favored nodes were not allocated.
+        DFSClient.LOG.warn(
+            "These favored nodes were specified but not chosen: " +
+                favoredSet +
+                " Specified favored nodes: " + Arrays.toString(favoredNodes));
+
+      }
+      return pinnings;
+    }
+  }
+
+  private LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
+      throws IOException {
+    int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
+    long sleeptime = dfsClient.getConf().
+        blockWriteLocateFollowingInitialDelayMs;
+    while (true) {
+      long localstart = Time.monotonicNow();
+      while (true) {
+        try {
+          return dfsClient.namenode.addBlock(src, dfsClient.clientName,
+              block, excludedNodes, stat.getFileId(), favoredNodes);
+        } catch (RemoteException e) {
+          IOException ue =
+              e.unwrapRemoteException(FileNotFoundException.class,
+                  AccessControlException.class,
+                  NSQuotaExceededException.class,
+                  DSQuotaExceededException.class,
+                  UnresolvedPathException.class);
+          if (ue != e) {
+            throw ue; // no need to retry these exceptions
+          }
+
+
+          if (NotReplicatedYetException.class.getName().
+              equals(e.getClassName())) {
+            if (retries == 0) {
+              throw e;
+            } else {
+              --retries;
+              DFSClient.LOG.info("Exception while adding a block", e);
+              long elapsed = Time.monotonicNow() - localstart;
+              if (elapsed > 5000) {
+                DFSClient.LOG.info("Waiting for replication for "
+                    + (elapsed / 1000) + " seconds");
+              }
+              try {
+                DFSClient.LOG.warn("NotReplicatedYetException sleeping " + src
+                    + " retries left " + retries);
+                Thread.sleep(sleeptime);
+                sleeptime *= 2;
+              } catch (InterruptedException ie) {
+                DFSClient.LOG.warn("Caught exception ", ie);
+              }
+            }
+          } else {
+            throw e;
+          }
+
+        }
+      }
+    }
+  }
+
+  /**
+   * get the block this streamer is writing to
+   *
+   * @return the block this streamer is writing to
+   */
+  ExtendedBlock getBlock() {
+    return block;
+  }
+
+  /**
+   * return the target datanodes in the pipeline
+   *
+   * @return the target datanodes in the pipeline
+   */
+  DatanodeInfo[] getNodes() {
+    return nodes;
+  }
+
+  /**
+   * return the token of the block
+   *
+   * @return the token of the block
+   */
+  Token<BlockTokenIdentifier> getBlockToken() {
+    return accessToken;
+  }
+
+  /**
+   * set last exception
+   *
+   * @param e an exception
+   */
+  void setLastException(IOException e) {
+    lastException.compareAndSet(null, e);
+  }
+
+  /**
+   * Put a packet into the data queue
+   *
+   * @param packet the packet to be put into the data queue
+   */
+  void queuePacket(DFSPacket packet) {
+    synchronized (dataQueue) {
+      if (packet == null) return;
+      packet.addTraceParent(Trace.currentSpan());
+      dataQueue.addLast(packet);
+      lastQueuedSeqno = packet.getSeqno();
+      if (DFSClient.LOG.isDebugEnabled()) {
+        DFSClient.LOG.debug("Queued packet " + packet.getSeqno());
+      }
+      dataQueue.notifyAll();
+    }
+  }
+
+  /**
+   * For heartbeat packets, create the buffer directly with new byte[]
+   * since heartbeats should not be blocked.
+   */
+  private DFSPacket createHeartbeatPacket() throws InterruptedIOException {
+    final byte[] buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN];
+    return new DFSPacket(buf, 0, 0, DFSPacket.HEART_BEAT_SEQNO, 0, false);
+  }
+
+  private LoadingCache<DatanodeInfo, DatanodeInfo> initExcludedNodes() {
+    return CacheBuilder.newBuilder().expireAfterWrite(
+        dfsClient.getConf().excludedNodesCacheExpiry, TimeUnit.MILLISECONDS)
+        .removalListener(new RemovalListener<DatanodeInfo, DatanodeInfo>() {
+          @Override
+          public void onRemoval(
+              RemovalNotification<DatanodeInfo, DatanodeInfo> notification) {
+            DFSClient.LOG.info("Removing node " + notification.getKey()
+                + " from the excluded nodes list");
+          }
+        }).build(new CacheLoader<DatanodeInfo, DatanodeInfo>() {
+          @Override
+          public DatanodeInfo load(DatanodeInfo key) throws Exception {
+            return key;
+          }
+        });
+  }
+
+  private static <T> void arraycopy(T[] srcs, T[] dsts, int skipIndex) {
+    System.arraycopy(srcs, 0, dsts, 0, skipIndex);
+    System.arraycopy(srcs, skipIndex+1, dsts, skipIndex, dsts.length-skipIndex);
+  }
+
+  /**
+   * get whether to persist blocks on the namenode
+   *
+   * @return whether to persist blocks on the namenode
+   */
+  AtomicBoolean getPersistBlocks(){
+    return persistBlocks;
+  }
+
+  /**
+   * set whether to append a chunk
+   *
+   * @param appendChunk whether to append a chunk
+   */
+  void setAppendChunk(boolean appendChunk){
+    this.appendChunk = appendChunk;
+  }
+
+  /**
+   * get whether to append a chunk
+   *
+   * @return whether to append a chunk
+   */
+  boolean getAppendChunk(){
+    return appendChunk;
+  }
+
+  /**
+   * get the last exception
+   *
+   * @return the last exception
+   */
+  AtomicReference<IOException> getLastException(){
+    return lastException;
+  }
+
+  /**
+   * get the socket connecting to the first datanode in pipeline
+   *
+   * @return socket connecting to the first datanode in pipeline
+   */
+  Socket getSocket() {
+    return s;
+  }
+
+  /**
+   * set socket to null
+   */
+  void setSocketToNull() {
+    this.s = null;
+  }
+
+  /**
+   * return current sequence number and then increase it by 1
+   *
+   * @return current sequence number before increasing
+   */
+  long getAndIncCurrentSeqno() {
+    long old = this.currentSeqno;
+    this.currentSeqno++;
+    return old;
+  }
+
+  /**
+   * get last queued sequence number
+   *
+   * @return last queued sequence number
+   */
+  long getLastQueuedSeqno() {
+    return lastQueuedSeqno;
+  }
+
+  /**
+   * get the number of bytes written to the current block
+   *
+   * @return the number of bytes written to the current block
+   */
+  long getBytesCurBlock() {
+    return bytesCurBlock;
+  }
+
+  /**
+   * set the bytes of current block that have been written
+   *
+   * @param bytesCurBlock bytes of current block that have been written
+   */
+  void setBytesCurBlock(long bytesCurBlock) {
+    this.bytesCurBlock = bytesCurBlock;
+  }
+
+  /**
+   * increase bytes of current block by len.
+   *
+   * @param len how many bytes to add to the current block
+   */
+  void incBytesCurBlock(long len) {
+    this.bytesCurBlock += len;
+  }
+
+  /**
+   * set an artificial slowdown for unit tests
+   *
+   * @param period the slowdown period in milliseconds
+   */
+  void setArtificialSlowdown(long period) {
+    this.artificialSlowdown = period;
+  }
+
+  /**
+   * whether this streamer has been closed
+   *
+   * @return true if this streamer has been closed
+   */
+  boolean streamerClosed(){
+    return streamerClosed;
+  }
+
+  void closeSocket() throws IOException {
+    if (s != null) {
+      s.close();
+    }
+  }
+}
\ No newline at end of file
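
A minimal, self-contained sketch of the retry-with-backoff pattern that locateFollowingBlock() above uses when the NameNode answers addBlock() with NotReplicatedYetException: retry a bounded number of times, doubling the sleep between attempts. The class and method names here are illustrative only, not part of the HDFS API.

import java.util.concurrent.Callable;

public class BackoffRetrySketch {
  // Retry the call up to `retries` times, doubling the sleep between
  // attempts, and rethrow the last failure once retries are exhausted.
  static <T> T callWithBackoff(Callable<T> call, int retries, long initialDelayMs)
      throws Exception {
    long sleep = initialDelayMs;
    while (true) {
      try {
        return call.call();
      } catch (Exception e) {
        if (retries-- == 0) {
          throw e;
        }
        Thread.sleep(sleep);
        sleep *= 2;  // exponential backoff, as in locateFollowingBlock()
      }
    }
  }

  public static void main(String[] args) throws Exception {
    final int[] attempts = {0};
    Callable<String> probe = () -> {
      if (++attempts[0] < 3) {
        throw new Exception("not ready yet");
      }
      return "succeeded after " + attempts[0] + " attempts";
    };
    System.out.println(callWithBackoff(probe, 5, 100L));
  }
}

Unlike this sketch, the production loop only retries NotReplicatedYetException; other exceptions unwrapped from the RemoteException are rethrown immediately.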

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a16bfff7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 493351b..5fc78d1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -905,7 +905,7 @@ public class DFSTestUtil {
   public static BlockOpResponseProto transferRbw(final ExtendedBlock b, 
       final DFSClient dfsClient, final DatanodeInfo... datanodes) throws IOException {
     assertEquals(2, datanodes.length);
-    final Socket s = DFSOutputStream.createSocketForPipeline(datanodes[0],
+    final Socket s = DataStreamer.createSocketForPipeline(datanodes[0],
         datanodes.length, dfsClient);
     final long writeTimeout = dfsClient.getDatanodeWriteTimeout(datanodes.length);
     final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a16bfff7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
index 7269e39..b47e7f1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
@@ -51,8 +51,11 @@ public class TestDFSOutputStream {
     DFSOutputStream dos = (DFSOutputStream) Whitebox.getInternalState(os,
         "wrappedStream");
     @SuppressWarnings("unchecked")
+    DataStreamer streamer = (DataStreamer) Whitebox
+        .getInternalState(dos, "streamer");
+    @SuppressWarnings("unchecked")
     AtomicReference<IOException> ex = (AtomicReference<IOException>) Whitebox
-        .getInternalState(dos, "lastException");
+        .getInternalState(streamer, "lastException");
     Assert.assertEquals(null, ex.get());
 
     dos.close();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a16bfff7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java
index e1c547b..fd916a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java
@@ -43,6 +43,8 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.UnknownHostException;
@@ -603,7 +605,8 @@ public class TestFileCreation {
    * Test that file leases are persisted across namenode restarts.
    */
   @Test
-  public void testFileCreationNamenodeRestart() throws IOException {
+  public void testFileCreationNamenodeRestart()
+      throws IOException, NoSuchFieldException, IllegalAccessException {
     Configuration conf = new HdfsConfiguration();
     final int MAX_IDLE_TIME = 2000; // 2s
     conf.setInt("ipc.client.connection.maxidletime", MAX_IDLE_TIME);
@@ -702,11 +705,18 @@ public class TestFileCreation {
       // new blocks for files that were renamed.
       DFSOutputStream dfstream = (DFSOutputStream)
                                                  (stm.getWrappedStream());
-      dfstream.setTestFilename(file1.toString());
+
+      Field f = DFSOutputStream.class.getDeclaredField("src");
+      Field modifiersField = Field.class.getDeclaredField("modifiers");
+      modifiersField.setAccessible(true);
+      modifiersField.setInt(f, f.getModifiers() & ~Modifier.FINAL);
+      f.setAccessible(true);
+
+      f.set(dfstream, file1.toString());
       dfstream = (DFSOutputStream) (stm3.getWrappedStream());
-      dfstream.setTestFilename(file3new.toString());
+      f.set(dfstream, file3new.toString());
       dfstream = (DFSOutputStream) (stm4.getWrappedStream());
-      dfstream.setTestFilename(file4new.toString());
+      f.set(dfstream, file4new.toString());
 
       // write 1 byte to file.  This should succeed because the 
       // namenode should have persisted leases.
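
With DFSOutputStream#src now final, the test above can no longer call a setter and instead rewrites the field reflectively. Below is a minimal sketch of that pattern with a hypothetical Target class standing in for DFSOutputStream. Note that clearing the FINAL modifier through Field's private "modifiers" field is JDK-internal behavior; it works on the Java 7/8 runtimes this code targeted but is rejected by recent JDKs.

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class FinalFieldOverwriteSketch {
  // Hypothetical stand-in for a class with a private final field,
  // such as DFSOutputStream#src.
  static class Target {
    private final String src;
    Target(String src) { this.src = src; }
    String getSrc() { return src; }
  }

  public static void main(String[] args) throws Exception {
    Target t = new Target("/user/foo/file1");

    // Open up the private field and strip its FINAL modifier,
    // mirroring the updated TestFileCreation code.
    Field f = Target.class.getDeclaredField("src");
    f.setAccessible(true);
    Field modifiers = Field.class.getDeclaredField("modifiers");
    modifiers.setAccessible(true);
    modifiers.setInt(f, f.getModifiers() & ~Modifier.FINAL);

    f.set(t, "/user/foo/file1-renamed");
    System.out.println(t.getSrc());  // prints the overwritten value
  }
}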


[2/2] hadoop git commit: HDFS-7854. Separate class DataStreamer out of DFSOutputStream. Contributed by Li Bo.

Posted by ji...@apache.org.
HDFS-7854. Separate class DataStreamer out of DFSOutputStream. Contributed by Li Bo.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a16bfff7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a16bfff7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a16bfff7

Branch: refs/heads/trunk
Commit: a16bfff71bd7f00e06e1f59bfe5445a154bb8c66
Parents: 570a83a
Author: Jing Zhao <ji...@apache.org>
Authored: Tue Mar 24 11:06:13 2015 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Tue Mar 24 11:06:13 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |    3 +
 .../dev-support/findbugsExcludeFile.xml         |    2 +-
 .../org/apache/hadoop/hdfs/DFSOutputStream.java | 1694 ++---------------
 .../org/apache/hadoop/hdfs/DataStreamer.java    | 1754 ++++++++++++++++++
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |    2 +-
 .../apache/hadoop/hdfs/TestDFSOutputStream.java |    5 +-
 .../apache/hadoop/hdfs/TestFileCreation.java    |   18 +-
 7 files changed, 1893 insertions(+), 1585 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a16bfff7/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 5dae029..4ec0891 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -332,6 +332,9 @@ Release 2.8.0 - UNRELEASED
 
     HDFS-7829. Code clean up for LocatedBlock. (Takanobu Asanuma via jing9)
 
+    HDFS-7854. Separate class DataStreamer out of DFSOutputStream. (Li Bo via
+    jing9)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a16bfff7/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
index dedeece..224d2fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
@@ -77,7 +77,7 @@
       ResponseProccessor is thread that is designed to catch RuntimeException.
      -->
      <Match>
-       <Class name="org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor" />
+       <Class name="org.apache.hadoop.hdfs.DataStreamer$ResponseProcessor" />
        <Method name="run" />
        <Bug pattern="REC_CATCH_EXCEPTION" />
      </Match>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a16bfff7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index d7d59af..ee3e6f6 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -17,29 +17,12 @@
  */
 package org.apache.hadoop.hdfs;
 
-import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
-
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.InterruptedIOException;
-import java.io.OutputStream;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.nio.channels.ClosedChannelException;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.EnumSet;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -52,64 +35,37 @@ import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.Syncable;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
-import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
-import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
-import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
-import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
-import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
 import org.apache.hadoop.hdfs.server.namenode.RetryStartFileException;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.hdfs.util.ByteArrayManager;
 import org.apache.hadoop.io.EnumSetWritable;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DataChecksum.Type;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Time;
-import org.apache.htrace.NullScope;
 import org.apache.htrace.Sampler;
-import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
-import org.apache.htrace.TraceInfo;
 import org.apache.htrace.TraceScope;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
 
 
 /****************************************************************
@@ -121,19 +77,11 @@ import com.google.common.cache.RemovalNotification;
  * is typically 512 bytes and has an associated checksum with it.
  *
  * When a client application fills up the currentPacket, it is
- * enqueued into dataQueue.  The DataStreamer thread picks up
- * packets from the dataQueue, sends it to the first datanode in
- * the pipeline and moves it from the dataQueue to the ackQueue.
- * The ResponseProcessor receives acks from the datanodes. When an
- * successful ack for a packet is received from all datanodes, the
- * ResponseProcessor removes the corresponding packet from the
- * ackQueue.
+ * enqueued into the dataQueue of DataStreamer. DataStreamer is a
+ * thread that picks up packets from the dataQueue and sends them to
+ * the first datanode in the pipeline.
  *
- * In case of error, all outstanding packets and moved from
- * ackQueue. A new pipeline is setup by eliminating the bad
- * datanode from the original pipeline. The DataStreamer now
- * starts sending packets from the dataQueue.
-****************************************************************/
+ ****************************************************************/
 @InterfaceAudience.Private
 public class DFSOutputStream extends FSOutputSummer
     implements Syncable, CanSetDropBehind {
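
The rewritten class comment above describes the hand-off this refactoring formalizes: the writer fills packets and enqueues them into the DataStreamer's dataQueue, and the streamer thread drains that queue toward the first datanode. A stripped-down sketch of that producer/consumer pattern follows; Packet and the class name are illustrative, and ack tracking, pipeline setup, and error handling are omitted.

import java.util.LinkedList;

public class QueueHandOffSketch {
  static class Packet {
    final long seqno;
    Packet(long seqno) { this.seqno = seqno; }
  }

  private final LinkedList<Packet> dataQueue = new LinkedList<>();
  private volatile boolean closed = false;

  // Producer side: the writer enqueues a packet and wakes the streamer.
  void queuePacket(Packet p) {
    synchronized (dataQueue) {
      dataQueue.addLast(p);
      dataQueue.notifyAll();
    }
  }

  // Consumer side: a streamer thread waits for packets and "sends" them.
  Thread startStreamer() {
    Thread streamer = new Thread(() -> {
      while (!closed) {
        Packet one;
        synchronized (dataQueue) {
          while (!closed && dataQueue.isEmpty()) {
            try {
              dataQueue.wait(1000);
            } catch (InterruptedException e) {
              return;
            }
          }
          if (closed) {
            return;
          }
          one = dataQueue.removeFirst();
        }
        // Stand-in for writing the packet to the pipeline.
        System.out.println("sent packet " + one.seqno);
      }
    }, "sketch-streamer");
    streamer.start();
    return streamer;
  }

  public static void main(String[] args) throws InterruptedException {
    QueueHandOffSketch sketch = new QueueHandOffSketch();
    Thread streamer = sketch.startStreamer();
    for (long i = 0; i < 3; i++) {
      sketch.queuePacket(new Packet(i));
    }
    Thread.sleep(500);
    sketch.closed = true;
    synchronized (sketch.dataQueue) {
      sketch.dataQueue.notifyAll();  // wake the streamer so it sees closed and exits
    }
    streamer.join();
  }
}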
@@ -148,45 +96,25 @@ public class DFSOutputStream extends FSOutputSummer
       CryptoProtocolVersion.supported();
 
   private final DFSClient dfsClient;
-  private final long dfsclientSlowLogThresholdMs;
   private final ByteArrayManager byteArrayManager;
-  private Socket s;
   // closed is accessed by different threads under different locks.
   private volatile boolean closed = false;
 
-  private String src;
+  private final String src;
   private final long fileId;
   private final long blockSize;
-  /** Only for DataTransferProtocol.writeBlock(..) */
-  private final DataChecksum checksum4WriteBlock;
-  private final int bytesPerChecksum; 
+  private final int bytesPerChecksum;
 
-  // both dataQueue and ackQueue are protected by dataQueue lock
-  private final LinkedList<DFSPacket> dataQueue = new LinkedList<DFSPacket>();
-  private final LinkedList<DFSPacket> ackQueue = new LinkedList<DFSPacket>();
   private DFSPacket currentPacket = null;
   private DataStreamer streamer;
-  private long currentSeqno = 0;
-  private long lastQueuedSeqno = -1;
-  private long lastAckedSeqno = -1;
-  private long bytesCurBlock = 0; // bytes written in current block
   private int packetSize = 0; // write packet size, not including the header.
   private int chunksPerPacket = 0;
-  private final AtomicReference<IOException> lastException = new AtomicReference<IOException>();
-  private long artificialSlowdown = 0;
   private long lastFlushOffset = 0; // offset when flush was invoked
-  //persist blocks on namenode
-  private final AtomicBoolean persistBlocks = new AtomicBoolean(false);
-  private volatile boolean appendChunk = false;   // appending to existing partial block
   private long initialFileSize = 0; // at time of file open
-  private final Progressable progress;
   private final short blockReplication; // replication factor of file
   private boolean shouldSyncBlock = false; // force blocks to disk upon close
   private final AtomicReference<CachingStrategy> cachingStrategy;
-  private boolean failPacket = false;
   private FileEncryptionInfo fileEncryptionInfo;
-  private static final BlockStoragePolicySuite blockStoragePolicySuite =
-      BlockStoragePolicySuite.createDefaultSuite();
 
   /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
   private DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetInBlock,
@@ -207,1326 +135,10 @@ public class DFSOutputStream extends FSOutputSummer
                          getChecksumSize(), lastPacketInBlock);
   }
 
-  /**
-   * For heartbeat packets, create buffer directly by new byte[]
-   * since heartbeats should not be blocked.
-   */
-  private DFSPacket createHeartbeatPacket() throws InterruptedIOException {
-    final byte[] buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN];
-    return new DFSPacket(buf, 0, 0, DFSPacket.HEART_BEAT_SEQNO,
-                         getChecksumSize(), false);
-  }
-
-
-  //
-  // The DataStreamer class is responsible for sending data packets to the
-  // datanodes in the pipeline. It retrieves a new blockid and block locations
-  // from the namenode, and starts streaming packets to the pipeline of
-  // Datanodes. Every packet has a sequence number associated with
-  // it. When all the packets for a block are sent out and acks for each
-  // if them are received, the DataStreamer closes the current block.
-  //
-  class DataStreamer extends Daemon {
-    private volatile boolean streamerClosed = false;
-    private ExtendedBlock block; // its length is number of bytes acked
-    private Token<BlockTokenIdentifier> accessToken;
-    private DataOutputStream blockStream;
-    private DataInputStream blockReplyStream;
-    private ResponseProcessor response = null;
-    private volatile DatanodeInfo[] nodes = null; // list of targets for current block
-    private volatile StorageType[] storageTypes = null;
-    private volatile String[] storageIDs = null;
-    private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes =
-        CacheBuilder.newBuilder()
-        .expireAfterWrite(
-            dfsClient.getConf().excludedNodesCacheExpiry,
-            TimeUnit.MILLISECONDS)
-        .removalListener(new RemovalListener<DatanodeInfo, DatanodeInfo>() {
-          @Override
-          public void onRemoval(
-              RemovalNotification<DatanodeInfo, DatanodeInfo> notification) {
-            DFSClient.LOG.info("Removing node " +
-                notification.getKey() + " from the excluded nodes list");
-          }
-        })
-        .build(new CacheLoader<DatanodeInfo, DatanodeInfo>() {
-          @Override
-          public DatanodeInfo load(DatanodeInfo key) throws Exception {
-            return key;
-          }
-        });
-    private String[] favoredNodes;
-    volatile boolean hasError = false;
-    volatile int errorIndex = -1;
-    // Restarting node index
-    AtomicInteger restartingNodeIndex = new AtomicInteger(-1);
-    private long restartDeadline = 0; // Deadline of DN restart
-    private BlockConstructionStage stage;  // block construction stage
-    private long bytesSent = 0; // number of bytes that've been sent
-    private final boolean isLazyPersistFile;
-
-    /** Nodes have been used in the pipeline before and have failed. */
-    private final List<DatanodeInfo> failed = new ArrayList<DatanodeInfo>();
-    /** The last ack sequence number before pipeline failure. */
-    private long lastAckedSeqnoBeforeFailure = -1;
-    private int pipelineRecoveryCount = 0;
-    /** Has the current block been hflushed? */
-    private boolean isHflushed = false;
-    /** Append on an existing block? */
-    private final boolean isAppend;
-
-    private DataStreamer(HdfsFileStatus stat, ExtendedBlock block) {
-      isAppend = false;
-      isLazyPersistFile = isLazyPersist(stat);
-      this.block = block;
-      stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
-    }
-    
-    /**
-     * Construct a data streamer for appending to the last partial block
-     * @param lastBlock last block of the file to be appended
-     * @param stat status of the file to be appended
-     * @param bytesPerChecksum number of bytes per checksum
-     * @throws IOException if error occurs
-     */
-    private DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat,
-        int bytesPerChecksum) throws IOException {
-      isAppend = true;
-      stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
-      block = lastBlock.getBlock();
-      bytesSent = block.getNumBytes();
-      accessToken = lastBlock.getBlockToken();
-      isLazyPersistFile = isLazyPersist(stat);
-      long usedInLastBlock = stat.getLen() % blockSize;
-      int freeInLastBlock = (int)(blockSize - usedInLastBlock);
-
-      // calculate the amount of free space in the pre-existing 
-      // last crc chunk
-      int usedInCksum = (int)(stat.getLen() % bytesPerChecksum);
-      int freeInCksum = bytesPerChecksum - usedInCksum;
-
-      // if there is space in the last block, then we have to 
-      // append to that block
-      if (freeInLastBlock == blockSize) {
-        throw new IOException("The last block for file " + 
-            src + " is full.");
-      }
-
-      if (usedInCksum > 0 && freeInCksum > 0) {
-        // if there is space in the last partial chunk, then 
-        // setup in such a way that the next packet will have only 
-        // one chunk that fills up the partial chunk.
-        //
-        computePacketChunkSize(0, freeInCksum);
-        setChecksumBufSize(freeInCksum);
-        appendChunk = true;
-      } else {
-        // if the remaining space in the block is smaller than 
-        // that expected size of of a packet, then create 
-        // smaller size packet.
-        //
-        computePacketChunkSize(Math.min(dfsClient.getConf().writePacketSize, freeInLastBlock), 
-            bytesPerChecksum);
-      }
-
-      // setup pipeline to append to the last block XXX retries??
-      setPipeline(lastBlock);
-      errorIndex = -1;   // no errors yet.
-      if (nodes.length < 1) {
-        throw new IOException("Unable to retrieve blocks locations " +
-            " for last block " + block +
-            "of file " + src);
-
-      }
-    }
-
-    private void setPipeline(LocatedBlock lb) {
-      setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs());
-    }
-    private void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes,
-        String[] storageIDs) {
-      this.nodes = nodes;
-      this.storageTypes = storageTypes;
-      this.storageIDs = storageIDs;
-    }
-
-    private void setFavoredNodes(String[] favoredNodes) {
-      this.favoredNodes = favoredNodes;
-    }
-
-    /**
-     * Initialize for data streaming
-     */
-    private void initDataStreaming() {
-      this.setName("DataStreamer for file " + src +
-          " block " + block);
-      response = new ResponseProcessor(nodes);
-      response.start();
-      stage = BlockConstructionStage.DATA_STREAMING;
-    }
-    
-    private void endBlock() {
-      if(DFSClient.LOG.isDebugEnabled()) {
-        DFSClient.LOG.debug("Closing old block " + block);
-      }
-      this.setName("DataStreamer for file " + src);
-      closeResponder();
-      closeStream();
-      setPipeline(null, null, null);
-      stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
-    }
-    
-    /*
-     * streamer thread is the only thread that opens streams to datanode, 
-     * and closes them. Any error recovery is also done by this thread.
-     */
-    @Override
-    public void run() {
-      long lastPacket = Time.monotonicNow();
-      TraceScope scope = NullScope.INSTANCE;
-      while (!streamerClosed && dfsClient.clientRunning) {
-        // if the Responder encountered an error, shutdown Responder
-        if (hasError && response != null) {
-          try {
-            response.close();
-            response.join();
-            response = null;
-          } catch (InterruptedException  e) {
-            DFSClient.LOG.warn("Caught exception ", e);
-          }
-        }
-
-        DFSPacket one;
-        try {
-          // process datanode IO errors if any
-          boolean doSleep = false;
-          if (hasError && (errorIndex >= 0 || restartingNodeIndex.get() >= 0)) {
-            doSleep = processDatanodeError();
-          }
-
-          synchronized (dataQueue) {
-            // wait for a packet to be sent.
-            long now = Time.monotonicNow();
-            while ((!streamerClosed && !hasError && dfsClient.clientRunning 
-                && dataQueue.size() == 0 && 
-                (stage != BlockConstructionStage.DATA_STREAMING || 
-                 stage == BlockConstructionStage.DATA_STREAMING && 
-                 now - lastPacket < dfsClient.getConf().socketTimeout/2)) || doSleep ) {
-              long timeout = dfsClient.getConf().socketTimeout/2 - (now-lastPacket);
-              timeout = timeout <= 0 ? 1000 : timeout;
-              timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
-                 timeout : 1000;
-              try {
-                dataQueue.wait(timeout);
-              } catch (InterruptedException  e) {
-                DFSClient.LOG.warn("Caught exception ", e);
-              }
-              doSleep = false;
-              now = Time.monotonicNow();
-            }
-            if (streamerClosed || hasError || !dfsClient.clientRunning) {
-              continue;
-            }
-            // get packet to be sent.
-            if (dataQueue.isEmpty()) {
-              one = createHeartbeatPacket();
-              assert one != null;
-            } else {
-              one = dataQueue.getFirst(); // regular data packet
-              long parents[] = one.getTraceParents();
-              if (parents.length > 0) {
-                scope = Trace.startSpan("dataStreamer", new TraceInfo(0, parents[0]));
-                // TODO: use setParents API once it's available from HTrace 3.2
-//                scope = Trace.startSpan("dataStreamer", Sampler.ALWAYS);
-//                scope.getSpan().setParents(parents);
-              }
-            }
-          }
-
-          // get new block from namenode.
-          if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
-            if(DFSClient.LOG.isDebugEnabled()) {
-              DFSClient.LOG.debug("Allocating new block");
-            }
-            setPipeline(nextBlockOutputStream());
-            initDataStreaming();
-          } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
-            if(DFSClient.LOG.isDebugEnabled()) {
-              DFSClient.LOG.debug("Append to block " + block);
-            }
-            setupPipelineForAppendOrRecovery();
-            initDataStreaming();
-          }
-
-          long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
-          if (lastByteOffsetInBlock > blockSize) {
-            throw new IOException("BlockSize " + blockSize +
-                " is smaller than data size. " +
-                " Offset of packet in block " + 
-                lastByteOffsetInBlock +
-                " Aborting file " + src);
-          }
-
-          if (one.isLastPacketInBlock()) {
-            // wait for all data packets have been successfully acked
-            synchronized (dataQueue) {
-              while (!streamerClosed && !hasError && 
-                  ackQueue.size() != 0 && dfsClient.clientRunning) {
-                try {
-                  // wait for acks to arrive from datanodes
-                  dataQueue.wait(1000);
-                } catch (InterruptedException  e) {
-                  DFSClient.LOG.warn("Caught exception ", e);
-                }
-              }
-            }
-            if (streamerClosed || hasError || !dfsClient.clientRunning) {
-              continue;
-            }
-            stage = BlockConstructionStage.PIPELINE_CLOSE;
-          }
-          
-          // send the packet
-          Span span = null;
-          synchronized (dataQueue) {
-            // move packet from dataQueue to ackQueue
-            if (!one.isHeartbeatPacket()) {
-              span = scope.detach();
-              one.setTraceSpan(span);
-              dataQueue.removeFirst();
-              ackQueue.addLast(one);
-              dataQueue.notifyAll();
-            }
-          }
-
-          if (DFSClient.LOG.isDebugEnabled()) {
-            DFSClient.LOG.debug("DataStreamer block " + block +
-                " sending packet " + one);
-          }
-
-          // write out data to remote datanode
-          TraceScope writeScope = Trace.startSpan("writeTo", span);
-          try {
-            one.writeTo(blockStream);
-            blockStream.flush();   
-          } catch (IOException e) {
-            // HDFS-3398 treat primary DN is down since client is unable to 
-            // write to primary DN. If a failed or restarting node has already
-            // been recorded by the responder, the following call will have no 
-            // effect. Pipeline recovery can handle only one node error at a
-            // time. If the primary node fails again during the recovery, it
-            // will be taken out then.
-            tryMarkPrimaryDatanodeFailed();
-            throw e;
-          } finally {
-            writeScope.close();
-          }
-          lastPacket = Time.monotonicNow();
-          
-          // update bytesSent
-          long tmpBytesSent = one.getLastByteOffsetBlock();
-          if (bytesSent < tmpBytesSent) {
-            bytesSent = tmpBytesSent;
-          }
-
-          if (streamerClosed || hasError || !dfsClient.clientRunning) {
-            continue;
-          }
-
-          // Is this block full?
-          if (one.isLastPacketInBlock()) {
-            // wait for the close packet has been acked
-            synchronized (dataQueue) {
-              while (!streamerClosed && !hasError && 
-                  ackQueue.size() != 0 && dfsClient.clientRunning) {
-                dataQueue.wait(1000);// wait for acks to arrive from datanodes
-              }
-            }
-            if (streamerClosed || hasError || !dfsClient.clientRunning) {
-              continue;
-            }
-
-            endBlock();
-          }
-          if (progress != null) { progress.progress(); }
-
-          // This is used by unit test to trigger race conditions.
-          if (artificialSlowdown != 0 && dfsClient.clientRunning) {
-            Thread.sleep(artificialSlowdown); 
-          }
-        } catch (Throwable e) {
-          // Log warning if there was a real error.
-          if (restartingNodeIndex.get() == -1) {
-            // Since their messages are descriptive enough, do not always
-            // log a verbose stack-trace WARN for quota exceptions.
-            if (e instanceof QuotaExceededException) {
-              DFSClient.LOG.debug("DataStreamer Quota Exception", e);
-            } else {
-              DFSClient.LOG.warn("DataStreamer Exception", e);
-            }
-          }
-          if (e instanceof IOException) {
-            setLastException((IOException)e);
-          } else {
-            setLastException(new IOException("DataStreamer Exception: ",e));
-          }
-          hasError = true;
-          if (errorIndex == -1 && restartingNodeIndex.get() == -1) {
-            // Not a datanode issue
-            streamerClosed = true;
-          }
-        } finally {
-          scope.close();
-        }
-      }
-      closeInternal();
-    }
-
-    private void closeInternal() {
-      closeResponder();       // close and join
-      closeStream();
-      streamerClosed = true;
-      setClosed();
-      synchronized (dataQueue) {
-        dataQueue.notifyAll();
-      }
-    }
-
-    /*
-     * close both streamer and DFSOutputStream, should be called only 
-     * by an external thread and only after all data to be sent has 
-     * been flushed to datanode.
-     * 
-     * Interrupt this data streamer if force is true
-     * 
-     * @param force if this data stream is forced to be closed 
-     */
-    void close(boolean force) {
-      streamerClosed = true;
-      synchronized (dataQueue) {
-        dataQueue.notifyAll();
-      }
-      if (force) {
-        this.interrupt();
-      }
-    }
-
-    private void closeResponder() {
-      if (response != null) {
-        try {
-          response.close();
-          response.join();
-        } catch (InterruptedException  e) {
-          DFSClient.LOG.warn("Caught exception ", e);
-        } finally {
-          response = null;
-        }
-      }
-    }
-
-    private void closeStream() {
-      if (blockStream != null) {
-        try {
-          blockStream.close();
-        } catch (IOException e) {
-          setLastException(e);
-        } finally {
-          blockStream = null;
-        }
-      }
-      if (blockReplyStream != null) {
-        try {
-          blockReplyStream.close();
-        } catch (IOException e) {
-          setLastException(e);
-        } finally {
-          blockReplyStream = null;
-        }
-      }
-      if (null != s) {
-        try {
-          s.close();
-        } catch (IOException e) {
-          setLastException(e);
-        } finally {
-          s = null;
-        }
-      }
-    }
-
-    // The following synchronized methods are used whenever 
-    // errorIndex or restartingNodeIndex is set. This is because
-    // check & set needs to be atomic. Simply reading variables
-    // does not require a synchronization. When responder is
-    // not running (e.g. during pipeline recovery), there is no
-    // need to use these methods.
-
-    /** Set the error node index. Called by responder */
-    synchronized void setErrorIndex(int idx) {
-      errorIndex = idx;
-    }
-
-    /** Set the restarting node index. Called by responder */
-    synchronized void setRestartingNodeIndex(int idx) {
-      restartingNodeIndex.set(idx);
-      // If the data streamer has already set the primary node
-      // bad, clear it. It is likely that the write failed due to
-      // the DN shutdown. Even if it was a real failure, the pipeline
-      // recovery will take care of it.
-      errorIndex = -1;      
-    }
-
-    /**
-     * This method is used when no explicit error report was received,
-     * but something failed. When the primary node is a suspect or
-     * unsure about the cause, the primary node is marked as failed.
-     */
-    synchronized void tryMarkPrimaryDatanodeFailed() {
-      // There should be no existing error and no ongoing restart.
-      if ((errorIndex == -1) && (restartingNodeIndex.get() == -1)) {
-        errorIndex = 0;
-      }
-    }
-
-    /**
-     * Examine whether it is worth waiting for a node to restart.
-     * @param index the node index
-     */
-    boolean shouldWaitForRestart(int index) {
-      // Only one node in the pipeline.
-      if (nodes.length == 1) {
-        return true;
-      }
-
-      // Is it a local node?
-      InetAddress addr = null;
-      try {
-        addr = InetAddress.getByName(nodes[index].getIpAddr());
-      } catch (java.net.UnknownHostException e) {
-        // we are passing an ip address. this should not happen.
-        assert false;
-      }
-
-      if (addr != null && NetUtils.isLocalAddress(addr)) {
-        return true;
-      }
-      return false;
-    }
-
-    //
-    // Processes responses from the datanodes.  A packet is removed
-    // from the ackQueue when its response arrives.
-    //
-    private class ResponseProcessor extends Daemon {
-
-      private volatile boolean responderClosed = false;
-      private DatanodeInfo[] targets = null;
-      private boolean isLastPacketInBlock = false;
-
-      ResponseProcessor (DatanodeInfo[] targets) {
-        this.targets = targets;
-      }
-
-      @Override
-      public void run() {
-
-        setName("ResponseProcessor for block " + block);
-        PipelineAck ack = new PipelineAck();
-
-        TraceScope scope = NullScope.INSTANCE;
-        while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) {
-          // process responses from datanodes.
-          try {
-            // read an ack from the pipeline
-            long begin = Time.monotonicNow();
-            ack.readFields(blockReplyStream);
-            long duration = Time.monotonicNow() - begin;
-            if (duration > dfsclientSlowLogThresholdMs
-                && ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) {
-              DFSClient.LOG
-                  .warn("Slow ReadProcessor read fields took " + duration
-                      + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: "
-                      + ack + ", targets: " + Arrays.asList(targets));
-            } else if (DFSClient.LOG.isDebugEnabled()) {
-              DFSClient.LOG.debug("DFSClient " + ack);
-            }
-
-            long seqno = ack.getSeqno();
-            // processes response status from datanodes.
-            for (int i = ack.getNumOfReplies()-1; i >=0  && dfsClient.clientRunning; i--) {
-              final Status reply = PipelineAck.getStatusFromHeader(ack
-                .getReply(i));
-              // Restart will not be treated differently unless it is
-              // the local node or the only one in the pipeline.
-              if (PipelineAck.isRestartOOBStatus(reply) &&
-                  shouldWaitForRestart(i)) {
-                restartDeadline = dfsClient.getConf().datanodeRestartTimeout
-                    + Time.monotonicNow();
-                setRestartingNodeIndex(i);
-                String message = "A datanode is restarting: " + targets[i];
-                DFSClient.LOG.info(message);
-               throw new IOException(message);
-              }
-              // node error
-              if (reply != SUCCESS) {
-                setErrorIndex(i); // first bad datanode
-                throw new IOException("Bad response " + reply +
-                    " for block " + block +
-                    " from datanode " + 
-                    targets[i]);
-              }
-            }
-            
-            assert seqno != PipelineAck.UNKOWN_SEQNO : 
-              "Ack for unknown seqno should be a failed ack: " + ack;
-            if (seqno == DFSPacket.HEART_BEAT_SEQNO) {  // a heartbeat ack
-              continue;
-            }
-
-            // a success ack for a data packet
-            DFSPacket one;
-            synchronized (dataQueue) {
-              one = ackQueue.getFirst();
-            }
-            if (one.getSeqno() != seqno) {
-              throw new IOException("ResponseProcessor: Expecting seqno " +
-                                    " for block " + block +
-                                    one.getSeqno() + " but received " + seqno);
-            }
-            isLastPacketInBlock = one.isLastPacketInBlock();
-
-            // Fail the packet write for testing in order to force a
-            // pipeline recovery.
-            if (DFSClientFaultInjector.get().failPacket() &&
-                isLastPacketInBlock) {
-              failPacket = true;
-              throw new IOException(
-                    "Failing the last packet for testing.");
-            }
-              
-            // update bytesAcked
-            block.setNumBytes(one.getLastByteOffsetBlock());
-
-            synchronized (dataQueue) {
-              scope = Trace.continueSpan(one.getTraceSpan());
-              one.setTraceSpan(null);
-              lastAckedSeqno = seqno;
-              ackQueue.removeFirst();
-              dataQueue.notifyAll();
-
-              one.releaseBuffer(byteArrayManager);
-            }
-          } catch (Exception e) {
-            if (!responderClosed) {
-              if (e instanceof IOException) {
-                setLastException((IOException)e);
-              }
-              hasError = true;
-              // If no explicit error report was received, mark the primary
-              // node as failed.
-              tryMarkPrimaryDatanodeFailed();
-              synchronized (dataQueue) {
-                dataQueue.notifyAll();
-              }
-              if (restartingNodeIndex.get() == -1) {
-                DFSClient.LOG.warn("DFSOutputStream ResponseProcessor exception "
-                     + " for block " + block, e);
-              }
-              responderClosed = true;
-            }
-          } finally {
-            scope.close();
-          }
-        }
-      }
-
-      void close() {
-        responderClosed = true;
-        this.interrupt();
-      }
-    }
-
-    // If this stream has encountered any errors so far, shutdown 
-    // threads and mark stream as closed. Returns true if we should
-    // sleep for a while after returning from this call.
-    //
-    private boolean processDatanodeError() throws IOException {
-      if (response != null) {
-        DFSClient.LOG.info("Error Recovery for " + block +
-        " waiting for responder to exit. ");
-        return true;
-      }
-      closeStream();
-
-      // move packets from ack queue to front of the data queue
-      synchronized (dataQueue) {
-        dataQueue.addAll(0, ackQueue);
-        ackQueue.clear();
-      }
-
-      // Record the new pipeline failure recovery.
-      if (lastAckedSeqnoBeforeFailure != lastAckedSeqno) {
-         lastAckedSeqnoBeforeFailure = lastAckedSeqno;
-         pipelineRecoveryCount = 1;
-      } else {
-        // If we had to recover the pipeline five times in a row for the
-        // same packet, this client likely has corrupt data or corrupting
-        // during transmission.
-        if (++pipelineRecoveryCount > 5) {
-          DFSClient.LOG.warn("Error recovering pipeline for writing " +
-              block + ". Already retried 5 times for the same packet.");
-          lastException.set(new IOException("Failing write. Tried pipeline " +
-              "recovery 5 times without success."));
-          streamerClosed = true;
-          return false;
-        }
-      }
-      boolean doSleep = setupPipelineForAppendOrRecovery();
-      
-      if (!streamerClosed && dfsClient.clientRunning) {
-        if (stage == BlockConstructionStage.PIPELINE_CLOSE) {
-
-          // If we had an error while closing the pipeline, we go through a fast-path
-          // where the BlockReceiver does not run. Instead, the DataNode just finalizes
-          // the block immediately during the 'connect ack' process. So, we want to pull
-          // the end-of-block packet from the dataQueue, since we don't actually have
-          // a true pipeline to send it over.
-          //
-          // We also need to set lastAckedSeqno to the end-of-block Packet's seqno, so that
-          // a client waiting on close() will be aware that the flush finished.
-          synchronized (dataQueue) {
-            DFSPacket endOfBlockPacket = dataQueue.remove();  // remove the end of block packet
-            Span span = endOfBlockPacket.getTraceSpan();
-            if (span != null) {
-              // Close any trace span associated with this Packet
-              TraceScope scope = Trace.continueSpan(span);
-              scope.close();
-            }
-            assert endOfBlockPacket.isLastPacketInBlock();
-            assert lastAckedSeqno == endOfBlockPacket.getSeqno() - 1;
-            lastAckedSeqno = endOfBlockPacket.getSeqno();
-            dataQueue.notifyAll();
-          }
-          endBlock();
-        } else {
-          initDataStreaming();
-        }
-      }
-      
-      return doSleep;
-    }
-
-    private void setHflush() {
-      isHflushed = true;
-    }
-
-    private int findNewDatanode(final DatanodeInfo[] original
-        ) throws IOException {
-      if (nodes.length != original.length + 1) {
-        throw new IOException(
-            new StringBuilder()
-            .append("Failed to replace a bad datanode on the existing pipeline ")
-            .append("due to no more good datanodes being available to try. ")
-            .append("(Nodes: current=").append(Arrays.asList(nodes))
-            .append(", original=").append(Arrays.asList(original)).append("). ")
-            .append("The current failed datanode replacement policy is ")
-            .append(dfsClient.dtpReplaceDatanodeOnFailure).append(", and ")
-            .append("a client may configure this via '")
-            .append(DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY)
-            .append("' in its configuration.")
-            .toString());
-      }
-      for(int i = 0; i < nodes.length; i++) {
-        int j = 0;
-        for(; j < original.length && !nodes[i].equals(original[j]); j++);
-        if (j == original.length) {
-          return i;
-        }
-      }
-      throw new IOException("Failed: new datanode not found: nodes="
-          + Arrays.asList(nodes) + ", original=" + Arrays.asList(original));
-    }
-
-    private void addDatanode2ExistingPipeline() throws IOException {
-      if (DataTransferProtocol.LOG.isDebugEnabled()) {
-        DataTransferProtocol.LOG.debug("lastAckedSeqno = " + lastAckedSeqno);
-      }
-      /*
-       * Is data transfer necessary?  We have the following cases.
-       * 
-       * Case 1: Failure in Pipeline Setup
-       * - Append
-       *    + Transfer the stored replica, which may be a RBW or a finalized.
-       * - Create
-       *    + If no data, then no transfer is required.
-       *    + If there are data written, transfer RBW. This case may happens 
-       *      when there are streaming failure earlier in this pipeline.
-       *
-       * Case 2: Failure in Streaming
-       * - Append/Create:
-       *    + transfer RBW
-       * 
-       * Case 3: Failure in Close
-       * - Append/Create:
-       *    + no transfer, let NameNode replicates the block.
-       */
-      if (!isAppend && lastAckedSeqno < 0
-          && stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
-        //no data have been written
-        return;
-      } else if (stage == BlockConstructionStage.PIPELINE_CLOSE
-          || stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
-        //pipeline is closing
-        return;
-      }
-
-      //get a new datanode
-      final DatanodeInfo[] original = nodes;
-      final LocatedBlock lb = dfsClient.namenode.getAdditionalDatanode(
-          src, fileId, block, nodes, storageIDs,
-          failed.toArray(new DatanodeInfo[failed.size()]),
-          1, dfsClient.clientName);
-      setPipeline(lb);
-
-      //find the new datanode
-      final int d = findNewDatanode(original);
-
-      //transfer replica
-      final DatanodeInfo src = d == 0? nodes[1]: nodes[d - 1];
-      final DatanodeInfo[] targets = {nodes[d]};
-      final StorageType[] targetStorageTypes = {storageTypes[d]};
-      transfer(src, targets, targetStorageTypes, lb.getBlockToken());
-    }
-
-    private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
-        final StorageType[] targetStorageTypes,
-        final Token<BlockTokenIdentifier> blockToken) throws IOException {
-      //transfer replica to the new datanode
-      Socket sock = null;
-      DataOutputStream out = null;
-      DataInputStream in = null;
-      try {
-        sock = createSocketForPipeline(src, 2, dfsClient);
-        final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2);
-        
-        OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
-        InputStream unbufIn = NetUtils.getInputStream(sock);
-        IOStreamPair saslStreams = dfsClient.saslClient.socketSend(sock,
-          unbufOut, unbufIn, dfsClient, blockToken, src);
-        unbufOut = saslStreams.out;
-        unbufIn = saslStreams.in;
-        out = new DataOutputStream(new BufferedOutputStream(unbufOut,
-            HdfsConstants.SMALL_BUFFER_SIZE));
-        in = new DataInputStream(unbufIn);
-
-        //send the TRANSFER_BLOCK request
-        new Sender(out).transferBlock(block, blockToken, dfsClient.clientName,
-            targets, targetStorageTypes);
-        out.flush();
-
-        //ack
-        BlockOpResponseProto response =
-          BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
-        if (SUCCESS != response.getStatus()) {
-          throw new IOException("Failed to add a datanode");
-        }
-      } finally {
-        IOUtils.closeStream(in);
-        IOUtils.closeStream(out);
-        IOUtils.closeSocket(sock);
-      }
-    }
-
-    /**
-     * Open a DataOutputStream to a DataNode pipeline so that 
-     * it can be written to.
-     * This happens when a file is appended or data streaming fails.
-     * It keeps trying until a pipeline is set up.
-     */
-    private boolean setupPipelineForAppendOrRecovery() throws IOException {
-      // check number of datanodes
-      if (nodes == null || nodes.length == 0) {
-        String msg = "Could not get block locations. " + "Source file \""
-            + src + "\" - Aborting...";
-        DFSClient.LOG.warn(msg);
-        setLastException(new IOException(msg));
-        streamerClosed = true;
-        return false;
-      }
-      
-      boolean success = false;
-      long newGS = 0L;
-      while (!success && !streamerClosed && dfsClient.clientRunning) {
-        // Sleep before reconnect if a dn is restarting.
-        // This process will be repeated until the deadline or the datanode
-        // starts back up.
-        if (restartingNodeIndex.get() >= 0) {
-          // 4 seconds or the configured deadline period, whichever is shorter.
-          // This is the retry interval and recovery will be retried in this
-          // interval until timeout or success.
-          long delay = Math.min(dfsClient.getConf().datanodeRestartTimeout,
-              4000L);
-          try {
-            Thread.sleep(delay);
-          } catch (InterruptedException ie) {
-            lastException.set(new IOException("Interrupted while waiting for " +
-                "datanode to restart. " + nodes[restartingNodeIndex.get()]));
-            streamerClosed = true;
-            return false;
-          }
-        }
-        boolean isRecovery = hasError;
-        // remove bad datanode from list of datanodes.
-        // If errorIndex was not set (i.e. appends), then do not remove 
-        // any datanodes
-        // 
-        if (errorIndex >= 0) {
-          StringBuilder pipelineMsg = new StringBuilder();
-          for (int j = 0; j < nodes.length; j++) {
-            pipelineMsg.append(nodes[j]);
-            if (j < nodes.length - 1) {
-              pipelineMsg.append(", ");
-            }
-          }
-          if (nodes.length <= 1) {
-            lastException.set(new IOException("All datanodes " + pipelineMsg
-                + " are bad. Aborting..."));
-            streamerClosed = true;
-            return false;
-          }
-          DFSClient.LOG.warn("Error Recovery for block " + block +
-              " in pipeline " + pipelineMsg + 
-              ": bad datanode " + nodes[errorIndex]);
-          failed.add(nodes[errorIndex]);
-
-          DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
-          arraycopy(nodes, newnodes, errorIndex);
-
-          final StorageType[] newStorageTypes = new StorageType[newnodes.length];
-          arraycopy(storageTypes, newStorageTypes, errorIndex);
-
-          final String[] newStorageIDs = new String[newnodes.length];
-          arraycopy(storageIDs, newStorageIDs, errorIndex);
-          
-          setPipeline(newnodes, newStorageTypes, newStorageIDs);
-
-          // Just took care of a node error while waiting for a node restart
-          if (restartingNodeIndex.get() >= 0) {
-            // If the error came from a node further away than the restarting
-            // node, the restart must have been complete.
-            if (errorIndex > restartingNodeIndex.get()) {
-              restartingNodeIndex.set(-1);
-            } else if (errorIndex < restartingNodeIndex.get()) {
-              // the node index has shifted.
-              restartingNodeIndex.decrementAndGet();
-            } else {
-              // this shouldn't happen...
-              assert false;
-            }
-          }
-
-          if (restartingNodeIndex.get() == -1) {
-            hasError = false;
-          }
-          lastException.set(null);
-          errorIndex = -1;
-        }
-
-        // Check if replace-datanode policy is satisfied.
-        if (dfsClient.dtpReplaceDatanodeOnFailure.satisfy(blockReplication,
-            nodes, isAppend, isHflushed)) {
-          try {
-            addDatanode2ExistingPipeline();
-          } catch(IOException ioe) {
-            if (!dfsClient.dtpReplaceDatanodeOnFailure.isBestEffort()) {
-              throw ioe;
-            }
-            DFSClient.LOG.warn("Failed to replace datanode."
-                + " Continue with the remaining datanodes since "
-                + DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_KEY
-                + " is set to true.", ioe);
-          }
-        }
-
-        // get a new generation stamp and an access token
-        LocatedBlock lb = dfsClient.namenode.updateBlockForPipeline(block, dfsClient.clientName);
-        newGS = lb.getBlock().getGenerationStamp();
-        accessToken = lb.getBlockToken();
-        
-        // set up the pipeline again with the remaining nodes
-        if (failPacket) { // for testing
-          success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
-          failPacket = false;
-          try {
-            // Give DNs time to send in bad reports. In real situations,
-            // good reports should follow bad ones, if client committed
-            // with those nodes.
-            Thread.sleep(2000);
-          } catch (InterruptedException ie) {}
-        } else {
-          success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
-        }
-
-        if (restartingNodeIndex.get() >= 0) {
-          assert hasError == true;
-          // check errorIndex set above
-          if (errorIndex == restartingNodeIndex.get()) {
-            // ignore, if came from the restarting node
-            errorIndex = -1;
-          }
-          // still within the deadline
-          if (Time.monotonicNow() < restartDeadline) {
-            continue; // within the deadline
-          }
-          // expired. declare the restarting node dead
-          restartDeadline = 0;
-          int expiredNodeIndex = restartingNodeIndex.get();
-          restartingNodeIndex.set(-1);
-          DFSClient.LOG.warn("Datanode did not restart in time: " +
-              nodes[expiredNodeIndex]);
-          // Mark the restarting node as failed. If there is any other failed
-          // node during the last pipeline construction attempt, it will not be
-          // overwritten/dropped. In this case, the restarting node will get
-          // excluded in the following attempt, if it still does not come up.
-          if (errorIndex == -1) {
-            errorIndex = expiredNodeIndex;
-          }
-          // From this point on, normal pipeline recovery applies.
-        }
-      } // while
-
-      if (success) {
-        // update pipeline at the namenode
-        ExtendedBlock newBlock = new ExtendedBlock(
-            block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS);
-        dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock,
-            nodes, storageIDs);
-        // update client side generation stamp
-        block = newBlock;
-      }
-      return false; // do not sleep, continue processing
-    }
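
When errorIndex is set, the failed datanode is spliced out of the nodes,
storageTypes, and storageIDs arrays in lock-step using the arraycopy() helper
defined at the bottom of this file. A small standalone sketch of that splice on
plain strings:

// Sketch of how the failed node is removed from the pipeline arrays, mirroring
// the arraycopy(T[], T[], int) helper used by setupPipelineForAppendOrRecovery().
import java.util.Arrays;

public class PipelineSplice {
  static <T> void splice(T[] srcs, T[] dsts, int skipIndex) {
    System.arraycopy(srcs, 0, dsts, 0, skipIndex);
    System.arraycopy(srcs, skipIndex + 1, dsts, skipIndex, dsts.length - skipIndex);
  }

  public static void main(String[] args) {
    String[] nodes = {"dn0", "dn1", "dn2"};
    String[] remaining = new String[nodes.length - 1];
    splice(nodes, remaining, 1);                      // drop the bad node at errorIndex 1
    System.out.println(Arrays.toString(remaining));   // [dn0, dn2]
  }
}
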
-
-    /**
-     * Open a DataOutputStream to a DataNode so that it can be written to.
-     * This happens when a file is created and each time a new block is allocated.
-     * Must get block ID and the IDs of the destinations from the namenode.
-     * Returns the list of target datanodes.
-     */
-    private LocatedBlock nextBlockOutputStream() throws IOException {
-      LocatedBlock lb = null;
-      DatanodeInfo[] nodes = null;
-      StorageType[] storageTypes = null;
-      int count = dfsClient.getConf().nBlockWriteRetry;
-      boolean success = false;
-      ExtendedBlock oldBlock = block;
-      do {
-        hasError = false;
-        lastException.set(null);
-        errorIndex = -1;
-        success = false;
-
-        DatanodeInfo[] excluded =
-            excludedNodes.getAllPresent(excludedNodes.asMap().keySet())
-            .keySet()
-            .toArray(new DatanodeInfo[0]);
-        block = oldBlock;
-        lb = locateFollowingBlock(excluded.length > 0 ? excluded : null);
-        block = lb.getBlock();
-        block.setNumBytes(0);
-        bytesSent = 0;
-        accessToken = lb.getBlockToken();
-        nodes = lb.getLocations();
-        storageTypes = lb.getStorageTypes();
-
-        //
-        // Connect to first DataNode in the list.
-        //
-        success = createBlockOutputStream(nodes, storageTypes, 0L, false);
-
-        if (!success) {
-          DFSClient.LOG.info("Abandoning " + block);
-          dfsClient.namenode.abandonBlock(block, fileId, src,
-              dfsClient.clientName);
-          block = null;
-          DFSClient.LOG.info("Excluding datanode " + nodes[errorIndex]);
-          excludedNodes.put(nodes[errorIndex], nodes[errorIndex]);
-        }
-      } while (!success && --count >= 0);
-
-      if (!success) {
-        throw new IOException("Unable to create new block.");
-      }
-      return lb;
-    }
-
-    // connects to the first datanode in the pipeline
-    // Returns true on success, false otherwise.
-    //
-    private boolean createBlockOutputStream(DatanodeInfo[] nodes,
-        StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) {
-      if (nodes.length == 0) {
-        DFSClient.LOG.info("nodes are empty for write pipeline of block "
-            + block);
-        return false;
-      }
-      Status pipelineStatus = SUCCESS;
-      String firstBadLink = "";
-      boolean checkRestart = false;
-      if (DFSClient.LOG.isDebugEnabled()) {
-        for (int i = 0; i < nodes.length; i++) {
-          DFSClient.LOG.debug("pipeline = " + nodes[i]);
-        }
-      }
-
-      // persist blocks on namenode on next flush
-      persistBlocks.set(true);
-
-      int refetchEncryptionKey = 1;
-      while (true) {
-        boolean result = false;
-        DataOutputStream out = null;
-        try {
-          assert null == s : "Previous socket unclosed";
-          assert null == blockReplyStream : "Previous blockReplyStream unclosed";
-          s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
-          long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
-          
-          OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
-          InputStream unbufIn = NetUtils.getInputStream(s);
-          IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s,
-            unbufOut, unbufIn, dfsClient, accessToken, nodes[0]);
-          unbufOut = saslStreams.out;
-          unbufIn = saslStreams.in;
-          out = new DataOutputStream(new BufferedOutputStream(unbufOut,
-              HdfsConstants.SMALL_BUFFER_SIZE));
-          blockReplyStream = new DataInputStream(unbufIn);
-  
-          //
-          // Xmit header info to datanode
-          //
-  
-          BlockConstructionStage bcs = recoveryFlag? stage.getRecoveryStage(): stage;
-
-          // We cannot change the block length in 'block' as it counts the number
-          // of bytes ack'ed.
-          ExtendedBlock blockCopy = new ExtendedBlock(block);
-          blockCopy.setNumBytes(blockSize);
-
-          boolean[] targetPinnings = getPinnings(nodes, true);
-          // send the request
-          new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
-              dfsClient.clientName, nodes, nodeStorageTypes, null, bcs, 
-              nodes.length, block.getNumBytes(), bytesSent, newGS,
-              checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
-            (targetPinnings == null ? false : targetPinnings[0]), targetPinnings);
-  
-          // receive ack for connect
-          BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
-              PBHelper.vintPrefixed(blockReplyStream));
-          pipelineStatus = resp.getStatus();
-          firstBadLink = resp.getFirstBadLink();
-          
-          // Got a restart OOB ack.
-          // If a node is already restarting, this status is not likely from
-          // the same node. If it is from a different node, it is not
-          // from the local datanode. Thus it is safe to treat this as a
-          // regular node error.
-          if (PipelineAck.isRestartOOBStatus(pipelineStatus) &&
-            restartingNodeIndex.get() == -1) {
-            checkRestart = true;
-            throw new IOException("A datanode is restarting.");
-          }
-
-          String logInfo = "ack with firstBadLink as " + firstBadLink;
-          DataTransferProtoUtil.checkBlockOpStatus(resp, logInfo);
-
-          assert null == blockStream : "Previous blockStream unclosed";
-          blockStream = out;
-          result =  true; // success
-          restartingNodeIndex.set(-1);
-          hasError = false;
-        } catch (IOException ie) {
-          if (restartingNodeIndex.get() == -1) {
-            DFSClient.LOG.info("Exception in createBlockOutputStream", ie);
-          }
-          if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
-            DFSClient.LOG.info("Will fetch a new encryption key and retry, " 
-                + "encryption key was invalid when connecting to "
-                + nodes[0] + " : " + ie);
-            // The encryption key used is invalid.
-            refetchEncryptionKey--;
-            dfsClient.clearDataEncryptionKey();
-            // Don't close the socket/exclude this node just yet. Try again with
-            // a new encryption key.
-            continue;
-          }
-  
-          // find the datanode that matches
-          if (firstBadLink.length() != 0) {
-            for (int i = 0; i < nodes.length; i++) {
-              // NB: Unconditionally using the xfer addr w/o hostname
-              if (firstBadLink.equals(nodes[i].getXferAddr())) {
-                errorIndex = i;
-                break;
-              }
-            }
-          } else {
-            assert checkRestart == false;
-            errorIndex = 0;
-          }
-          // Check whether there is a restart worth waiting for.
-          if (checkRestart && shouldWaitForRestart(errorIndex)) {
-            restartDeadline = dfsClient.getConf().datanodeRestartTimeout +
-                Time.monotonicNow();
-            restartingNodeIndex.set(errorIndex);
-            errorIndex = -1;
-            DFSClient.LOG.info("Waiting for the datanode to be restarted: " +
-                nodes[restartingNodeIndex.get()]);
-          }
-          hasError = true;
-          setLastException(ie);
-          result =  false;  // error
-        } finally {
-          if (!result) {
-            IOUtils.closeSocket(s);
-            s = null;
-            IOUtils.closeStream(out);
-            out = null;
-            IOUtils.closeStream(blockReplyStream);
-            blockReplyStream = null;
-          }
-        }
-        return result;
-      }
-    }
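
On a failed connect, the ack's firstBadLink (a datanode transfer address) is
mapped back to an index in the pipeline so that node can be excluded; an empty
firstBadLink blames the first datanode. A sketch of that mapping, with made-up
addresses standing in for DatanodeInfo.getXferAddr():

// Sketch of how createBlockOutputStream() derives errorIndex from firstBadLink.
public class FirstBadLink {
  static int errorIndex(String firstBadLink, String[] xferAddrs) {
    if (firstBadLink.isEmpty()) {
      return 0;                                  // no specific link reported: blame node 0
    }
    for (int i = 0; i < xferAddrs.length; i++) {
      if (firstBadLink.equals(xferAddrs[i])) {
        return i;                                // the datanode whose link failed
      }
    }
    return -1;                                   // not found (should not happen)
  }

  public static void main(String[] args) {
    String[] pipeline = {"10.0.0.1:50010", "10.0.0.2:50010", "10.0.0.3:50010"};
    System.out.println(errorIndex("10.0.0.2:50010", pipeline)); // 1
    System.out.println(errorIndex("", pipeline));               // 0
  }
}
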
-
-    private boolean[] getPinnings(DatanodeInfo[] nodes, boolean shouldLog) {
-      if (favoredNodes == null) {
-        return null;
-      } else {
-        boolean[] pinnings = new boolean[nodes.length];
-        HashSet<String> favoredSet =
-            new HashSet<String>(Arrays.asList(favoredNodes));
-        for (int i = 0; i < nodes.length; i++) {
-          pinnings[i] = favoredSet.remove(nodes[i].getXferAddrWithHostname());
-          if (DFSClient.LOG.isDebugEnabled()) {
-            DFSClient.LOG.debug(nodes[i].getXferAddrWithHostname() +
-                " was chosen by name node (favored=" + pinnings[i] +
-                ").");
-          }
-        }
-        if (shouldLog && !favoredSet.isEmpty()) {
-          // There are one or more favored nodes that were not allocated.
-          DFSClient.LOG.warn(
-              "These favored nodes were specified but not chosen: " +
-              favoredSet +
-              " Specified favored nodes: " + Arrays.toString(favoredNodes));
-
-        }
-        return pinnings;
-      }
-    }
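
getPinnings() turns the caller's favored nodes into per-datanode pinning flags
by set membership, and anything left in the set afterwards is a favored node
the NameNode did not choose. A tiny illustration with hypothetical host:port
strings in place of getXferAddrWithHostname():

// Illustration of the favored-node pinning computation (made-up addresses).
import java.util.Arrays;
import java.util.HashSet;

public class Pinnings {
  public static void main(String[] args) {
    String[] favoredNodes = {"host1:50010", "host3:50010"};
    String[] pipeline = {"host1:50010", "host2:50010"};

    HashSet<String> favoredSet = new HashSet<>(Arrays.asList(favoredNodes));
    boolean[] pinnings = new boolean[pipeline.length];
    for (int i = 0; i < pipeline.length; i++) {
      pinnings[i] = favoredSet.remove(pipeline[i]);   // pinned iff it was favored
    }
    System.out.println(Arrays.toString(pinnings));    // [true, false]
    System.out.println(favoredSet);                   // [host3:50010] was not chosen
  }
}
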
-
-    private LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)  throws IOException {
-      int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
-      long sleeptime = dfsClient.getConf().
-          blockWriteLocateFollowingInitialDelayMs;
-      while (true) {
-        long localstart = Time.monotonicNow();
-        while (true) {
-          try {
-            return dfsClient.namenode.addBlock(src, dfsClient.clientName,
-                block, excludedNodes, fileId, favoredNodes);
-          } catch (RemoteException e) {
-            IOException ue = 
-              e.unwrapRemoteException(FileNotFoundException.class,
-                                      AccessControlException.class,
-                                      NSQuotaExceededException.class,
-                                      DSQuotaExceededException.class,
-                                      UnresolvedPathException.class);
-            if (ue != e) { 
-              throw ue; // no need to retry these exceptions
-            }
-            
-            
-            if (NotReplicatedYetException.class.getName().
-                equals(e.getClassName())) {
-              if (retries == 0) { 
-                throw e;
-              } else {
-                --retries;
-                DFSClient.LOG.info("Exception while adding a block", e);
-                long elapsed = Time.monotonicNow() - localstart;
-                if (elapsed > 5000) {
-                  DFSClient.LOG.info("Waiting for replication for "
-                      + (elapsed / 1000) + " seconds");
-                }
-                try {
-                  DFSClient.LOG.warn("NotReplicatedYetException sleeping " + src
-                      + " retries left " + retries);
-                  Thread.sleep(sleeptime);
-                  sleeptime *= 2;
-                } catch (InterruptedException ie) {
-                  DFSClient.LOG.warn("Caught exception ", ie);
-                }
-              }
-            } else {
-              throw e;
-            }
-
-          }
-        }
-      } 
-    }
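
locateFollowingBlock() retries addBlock() a bounded number of times when the
previous block is not yet replicated, doubling the sleep between attempts. A
minimal sketch of that backoff loop, with a stand-in flag where the real code
inspects the RemoteException's class name:

// Sketch of the retry/backoff pattern used for NotReplicatedYetException.
public class AddBlockBackoff {
  public static void main(String[] args) throws InterruptedException {
    int retries = 5;                 // stands in for nBlockWriteLocateFollowingRetry
    long sleeptime = 400L;           // stands in for the configured initial delay (ms)
    while (true) {
      boolean notReplicatedYet = retries > 2;   // stand-in for the RPC outcome
      if (!notReplicatedYet) {
        System.out.println("block allocated");
        break;
      }
      if (retries-- == 0) {
        throw new IllegalStateException("giving up");
      }
      System.out.println("previous block not yet replicated, sleeping " + sleeptime + "ms");
      Thread.sleep(sleeptime);
      sleeptime *= 2;                // exponential backoff
    }
  }
}
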
-
-    ExtendedBlock getBlock() {
-      return block;
-    }
-
-    DatanodeInfo[] getNodes() {
-      return nodes;
-    }
-
-    Token<BlockTokenIdentifier> getBlockToken() {
-      return accessToken;
-    }
-
-    private void setLastException(IOException e) {
-      lastException.compareAndSet(null, e);
-    }
-  }
-
-  /**
-   * Create a socket for a write pipeline
-   * @param first the first datanode 
-   * @param length the pipeline length
-   * @param client client
-   * @return the socket connected to the first datanode
-   */
-  static Socket createSocketForPipeline(final DatanodeInfo first,
-      final int length, final DFSClient client) throws IOException {
-    final String dnAddr = first.getXferAddr(
-        client.getConf().connectToDnViaHostname);
-    if (DFSClient.LOG.isDebugEnabled()) {
-      DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
-    }
-    final InetSocketAddress isa = NetUtils.createSocketAddr(dnAddr);
-    final Socket sock = client.socketFactory.createSocket();
-    final int timeout = client.getDatanodeReadTimeout(length);
-    NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), client.getConf().socketTimeout);
-    sock.setSoTimeout(timeout);
-    sock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
-    if(DFSClient.LOG.isDebugEnabled()) {
-      DFSClient.LOG.debug("Send buf size " + sock.getSendBufferSize());
-    }
-    return sock;
-  }
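
createSocketForPipeline() connects using the client's connect timeout, then
installs a read timeout that grows with the pipeline length and a large send
buffer. A rough sketch with made-up constants; the real values come from the
client configuration and HdfsConstants:

// Rough sketch of pipeline socket setup (made-up timeout and buffer constants).
import java.net.InetSocketAddress;
import java.net.Socket;

public class PipelineSocket {
  static Socket connect(String host, int port, int pipelineLength) throws Exception {
    int connectTimeout = 60_000;                        // connect timeout (assumed)
    int readTimeout = 60_000 + 5_000 * pipelineLength;  // grows per extra datanode (assumed)
    Socket sock = new Socket();
    sock.connect(new InetSocketAddress(host, port), connectTimeout);
    sock.setSoTimeout(readTimeout);
    sock.setSendBufferSize(128 * 1024);                 // large data socket buffer (assumed)
    return sock;
  }
}
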
-
   @Override
   protected void checkClosed() throws IOException {
     if (isClosed()) {
-      IOException e = lastException.get();
+      IOException e = streamer.getLastException().get();
       throw e != null ? e : new ClosedChannelException();
     }
   }
@@ -1536,7 +148,7 @@ public class DFSOutputStream extends FSOutputSummer
   //
   @VisibleForTesting
   public synchronized DatanodeInfo[] getPipeline() {
-    if (streamer == null) {
+    if (streamer.streamerClosed()) {
       return null;
     }
     DatanodeInfo[] currentNodes = streamer.getNodes();
@@ -1556,7 +168,7 @@ public class DFSOutputStream extends FSOutputSummer
    */
   private static DataChecksum getChecksum4Compute(DataChecksum checksum,
       HdfsFileStatus stat) {
-    if (isLazyPersist(stat) && stat.getReplication() == 1) {
+    if (DataStreamer.isLazyPersist(stat) && stat.getReplication() == 1) {
       // do not compute checksum for writing to single replica to memory
       return DataChecksum.newDataChecksum(Type.NULL,
           checksum.getBytesPerChecksum());
@@ -1573,7 +185,6 @@ public class DFSOutputStream extends FSOutputSummer
     this.blockSize = stat.getBlockSize();
     this.blockReplication = stat.getReplication();
     this.fileEncryptionInfo = stat.getFileEncryptionInfo();
-    this.progress = progress;
     this.cachingStrategy = new AtomicReference<CachingStrategy>(
         dfsClient.getDefaultWriteCachingStrategy());
     if ((progress != null) && DFSClient.LOG.isDebugEnabled()) {
@@ -1591,10 +202,6 @@ public class DFSOutputStream extends FSOutputSummer
           + DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (=" + bytesPerChecksum
           + ") must divide block size (=" + blockSize + ").");
     }
-    this.checksum4WriteBlock = checksum;
-
-    this.dfsclientSlowLogThresholdMs =
-      dfsClient.getConf().dfsclientSlowIoWarningThresholdMs;
     this.byteArrayManager = dfsClient.getClientContext().getByteArrayManager();
   }
 
@@ -1607,7 +214,8 @@ public class DFSOutputStream extends FSOutputSummer
 
     computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
 
-    streamer = new DataStreamer(stat, null);
+    streamer = new DataStreamer(stat, null, dfsClient, src, progress, checksum,
+        cachingStrategy, byteArrayManager);
     if (favoredNodes != null && favoredNodes.length != 0) {
       streamer.setFavoredNodes(favoredNodes);
     }
@@ -1676,18 +284,57 @@ public class DFSOutputStream extends FSOutputSummer
     this(dfsClient, src, progress, stat, checksum);
     initialFileSize = stat.getLen(); // length of file when opened
 
+    this.fileEncryptionInfo = stat.getFileEncryptionInfo();
+
     // The last partial block of the file has to be filled.
     if (!toNewBlock && lastBlock != null) {
       // indicate that we are appending to an existing block
-      bytesCurBlock = lastBlock.getBlockSize();
-      streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum);
+      streamer = new DataStreamer(lastBlock, stat, dfsClient, src, progress, checksum,
+          cachingStrategy, byteArrayManager);
+      streamer.setBytesCurBlock(lastBlock.getBlockSize());
+      adjustPacketChunkSize(stat);
+      streamer.setPipelineInConstruction(lastBlock);
     } else {
       computePacketChunkSize(dfsClient.getConf().writePacketSize,
           bytesPerChecksum);
-      streamer = new DataStreamer(stat,
-          lastBlock != null ? lastBlock.getBlock() : null);
+      streamer = new DataStreamer(stat, lastBlock != null ? lastBlock.getBlock() : null,
+          dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager);
+    }
+  }
+
+  private void adjustPacketChunkSize(HdfsFileStatus stat) throws IOException{
+
+    long usedInLastBlock = stat.getLen() % blockSize;
+    int freeInLastBlock = (int)(blockSize - usedInLastBlock);
+
+    // calculate the amount of free space in the pre-existing
+    // last crc chunk
+    int usedInCksum = (int)(stat.getLen() % bytesPerChecksum);
+    int freeInCksum = bytesPerChecksum - usedInCksum;
+
+    // if there is space in the last block, then we have to
+    // append to that block
+    if (freeInLastBlock == blockSize) {
+      throw new IOException("The last block for file " +
+          src + " is full.");
+    }
+
+    if (usedInCksum > 0 && freeInCksum > 0) {
+      // if there is space in the last partial chunk, then
+      // setup in such a way that the next packet will have only
+      // one chunk that fills up the partial chunk.
+      //
+      computePacketChunkSize(0, freeInCksum);
+      setChecksumBufSize(freeInCksum);
+      streamer.setAppendChunk(true);
+    } else {
+      // if the remaining space in the block is smaller than
+      // the expected size of a packet, then create a
+      // smaller packet.
+      //
+      computePacketChunkSize(Math.min(dfsClient.getConf().writePacketSize, freeInLastBlock),
+          bytesPerChecksum);
     }
-    this.fileEncryptionInfo = stat.getFileEncryptionInfo();
   }
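
The arithmetic in adjustPacketChunkSize() is easier to follow with concrete
numbers. A worked example with made-up file length, block size, and
bytesPerChecksum:

// Worked example of the adjustPacketChunkSize() arithmetic (made-up values).
public class AppendChunkMath {
  public static void main(String[] args) {
    long blockSize = 128L * 1024 * 1024;   // 128 MB block
    int bytesPerChecksum = 512;            // assumed checksum chunk size
    long fileLen = 1000;                   // file being reopened for append

    long usedInLastBlock = fileLen % blockSize;            // 1000
    int freeInLastBlock = (int) (blockSize - usedInLastBlock);

    int usedInCksum = (int) (fileLen % bytesPerChecksum);  // 1000 % 512 = 488
    int freeInCksum = bytesPerChecksum - usedInCksum;      // 24

    // usedInCksum > 0 && freeInCksum > 0: the next packet carries exactly one
    // 24-byte chunk to fill up the partial crc chunk, and appendChunk is set.
    System.out.println("freeInLastBlock=" + freeInLastBlock
        + " usedInCksum=" + usedInCksum + " freeInCksum=" + freeInCksum);
  }
}
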
 
   static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src,
@@ -1708,12 +355,6 @@ public class DFSOutputStream extends FSOutputSummer
       scope.close();
     }
   }
-  
-  private static boolean isLazyPersist(HdfsFileStatus stat) {
-    final BlockStoragePolicy p = blockStoragePolicySuite.getPolicy(
-        HdfsConstants.MEMORY_STORAGE_POLICY_NAME);
-    return p != null && stat.getStoragePolicy() == p.getId();
-  }
 
   private void computePacketChunkSize(int psize, int csize) {
     final int bodySize = psize - PacketHeader.PKT_MAX_HEADER_LEN;
@@ -1728,62 +369,6 @@ public class DFSOutputStream extends FSOutputSummer
     }
   }
 
-  private void queueCurrentPacket() {
-    synchronized (dataQueue) {
-      if (currentPacket == null) return;
-      currentPacket.addTraceParent(Trace.currentSpan());
-      dataQueue.addLast(currentPacket);
-      lastQueuedSeqno = currentPacket.getSeqno();
-      if (DFSClient.LOG.isDebugEnabled()) {
-        DFSClient.LOG.debug("Queued packet " + currentPacket.getSeqno());
-      }
-      currentPacket = null;
-      dataQueue.notifyAll();
-    }
-  }
-
-  private void waitAndQueueCurrentPacket() throws IOException {
-    synchronized (dataQueue) {
-      try {
-      // If queue is full, then wait till we have enough space
-        boolean firstWait = true;
-        try {
-          while (!isClosed() && dataQueue.size() + ackQueue.size() >
-              dfsClient.getConf().writeMaxPackets) {
-            if (firstWait) {
-              Span span = Trace.currentSpan();
-              if (span != null) {
-                span.addTimelineAnnotation("dataQueue.wait");
-              }
-              firstWait = false;
-            }
-            try {
-              dataQueue.wait();
-            } catch (InterruptedException e) {
-              // If we get interrupted while waiting to queue data, we still need to get rid
-              // of the current packet. This is because we have an invariant that if
-              // currentPacket gets full, it will get queued before the next writeChunk.
-              //
-              // Rather than wait around for space in the queue, we should instead try to
-              // return to the caller as soon as possible, even though we slightly overrun
-              // the MAX_PACKETS length.
-              Thread.currentThread().interrupt();
-              break;
-            }
-          }
-        } finally {
-          Span span = Trace.currentSpan();
-          if ((span != null) && (!firstWait)) {
-            span.addTimelineAnnotation("end.wait");
-          }
-        }
-        checkClosed();
-        queueCurrentPacket();
-      } catch (ClosedChannelException e) {
-      }
-    }
-  }
-
   // @see FSOutputSummer#writeChunk()
   @Override
   protected synchronized void writeChunk(byte[] b, int offset, int len,
@@ -1814,57 +399,62 @@ public class DFSOutputStream extends FSOutputSummer
 
     if (currentPacket == null) {
       currentPacket = createPacket(packetSize, chunksPerPacket, 
-          bytesCurBlock, currentSeqno++, false);
+          streamer.getBytesCurBlock(), streamer.getAndIncCurrentSeqno(), false);
       if (DFSClient.LOG.isDebugEnabled()) {
         DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + 
             currentPacket.getSeqno() +
             ", src=" + src +
             ", packetSize=" + packetSize +
             ", chunksPerPacket=" + chunksPerPacket +
-            ", bytesCurBlock=" + bytesCurBlock);
+            ", bytesCurBlock=" + streamer.getBytesCurBlock());
       }
     }
 
     currentPacket.writeChecksum(checksum, ckoff, cklen);
     currentPacket.writeData(b, offset, len);
     currentPacket.incNumChunks();
-    bytesCurBlock += len;
+    streamer.incBytesCurBlock(len);
 
     // If packet is full, enqueue it for transmission
     //
     if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
-        bytesCurBlock == blockSize) {
+        streamer.getBytesCurBlock() == blockSize) {
       if (DFSClient.LOG.isDebugEnabled()) {
         DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
             currentPacket.getSeqno() +
             ", src=" + src +
-            ", bytesCurBlock=" + bytesCurBlock +
+            ", bytesCurBlock=" + streamer.getBytesCurBlock() +
             ", blockSize=" + blockSize +
-            ", appendChunk=" + appendChunk);
+            ", appendChunk=" + streamer.getAppendChunk());
       }
-      waitAndQueueCurrentPacket();
+      streamer.waitAndQueuePacket(currentPacket);
+      currentPacket = null;
 
       // If the reopened file did not end at a chunk boundary and the above
       // write filled up its partial chunk, tell the summer to generate full
       // crc chunks from now on.
-      if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) {
-        appendChunk = false;
+      if (streamer.getAppendChunk() &&
+          streamer.getBytesCurBlock() % bytesPerChecksum == 0) {
+        streamer.setAppendChunk(false);
         resetChecksumBufSize();
       }
 
-      if (!appendChunk) {
-        int psize = Math.min((int)(blockSize-bytesCurBlock), dfsClient.getConf().writePacketSize);
+      if (!streamer.getAppendChunk()) {
+        int psize = Math.min((int)(blockSize-streamer.getBytesCurBlock()),
+            dfsClient.getConf().writePacketSize);
         computePacketChunkSize(psize, bytesPerChecksum);
       }
       //
       // if encountering a block boundary, send an empty packet to 
       // indicate the end of block and reset bytesCurBlock.
       //
-      if (bytesCurBlock == blockSize) {
-        currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++, true);
+      if (streamer.getBytesCurBlock() == blockSize) {
+        currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
+            streamer.getAndIncCurrentSeqno(), true);
         currentPacket.setSyncBlock(shouldSyncBlock);
-        waitAndQueueCurrentPacket();
-        bytesCurBlock = 0;
+        streamer.waitAndQueuePacket(currentPacket);
+        currentPacket = null;
+        streamer.setBytesCurBlock(0);
         lastFlushOffset = 0;
       }
     }
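
The refactored writeChunk() path above keeps the same enqueue rule: hand the
packet to the streamer when it is full or the block boundary is reached, and
queue an additional empty packet to mark the end of a block. A compact sketch
of that rule (hypothetical helper, not in the patch):

// Sketch of the writeChunk() enqueue decision.
public class WriteChunkDecision {
  static boolean shouldQueue(int numChunks, int maxChunks,
                             long bytesCurBlock, long blockSize) {
    return numChunks == maxChunks || bytesCurBlock == blockSize;
  }

  static boolean needsEndOfBlockPacket(long bytesCurBlock, long blockSize) {
    return bytesCurBlock == blockSize;   // queue an empty "last packet in block"
  }

  public static void main(String[] args) {
    System.out.println(shouldQueue(127, 127, 1 << 20, 128L << 20));     // true: packet full
    System.out.println(needsEndOfBlockPacket(128L << 20, 128L << 20));  // true: block boundary
  }
}
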
@@ -1954,30 +544,30 @@ public class DFSOutputStream extends FSOutputSummer
 
         if (DFSClient.LOG.isDebugEnabled()) {
           DFSClient.LOG.debug("DFSClient flush(): "
-              + " bytesCurBlock=" + bytesCurBlock
+              + " bytesCurBlock=" + streamer.getBytesCurBlock()
               + " lastFlushOffset=" + lastFlushOffset
               + " createNewBlock=" + endBlock);
         }
         // Flush only if we haven't already flushed till this offset.
-        if (lastFlushOffset != bytesCurBlock) {
-          assert bytesCurBlock > lastFlushOffset;
+        if (lastFlushOffset != streamer.getBytesCurBlock()) {
+          assert streamer.getBytesCurBlock() > lastFlushOffset;
           // record the valid offset of this flush
-          lastFlushOffset = bytesCurBlock;
+          lastFlushOffset = streamer.getBytesCurBlock();
           if (isSync && currentPacket == null && !endBlock) {
             // Nothing to send right now,
             // but sync was requested.
             // Send an empty packet if we do not end the block right now
             currentPacket = createPacket(packetSize, chunksPerPacket,
-                bytesCurBlock, currentSeqno++, false);
+                streamer.getBytesCurBlock(), streamer.getAndIncCurrentSeqno(), false);
           }
         } else {
-          if (isSync && bytesCurBlock > 0 && !endBlock) {
+          if (isSync && streamer.getBytesCurBlock() > 0 && !endBlock) {
             // Nothing to send right now,
             // and the block was partially written,
             // and sync was requested.
             // So send an empty sync packet if we do not end the block right now
             currentPacket = createPacket(packetSize, chunksPerPacket,
-                bytesCurBlock, currentSeqno++, false);
+                streamer.getBytesCurBlock(), streamer.getAndIncCurrentSeqno(), false);
           } else if (currentPacket != null) {
             // just discard the current packet since it is already been sent.
             currentPacket.releaseBuffer(byteArrayManager);
@@ -1986,39 +576,42 @@ public class DFSOutputStream extends FSOutputSummer
         }
         if (currentPacket != null) {
           currentPacket.setSyncBlock(isSync);
-          waitAndQueueCurrentPacket();          
+          streamer.waitAndQueuePacket(currentPacket);
+          currentPacket = null;
         }
-        if (endBlock && bytesCurBlock > 0) {
+        if (endBlock && streamer.getBytesCurBlock() > 0) {
           // Need to end the current block, thus send an empty packet to
           // indicate this is the end of the block and reset bytesCurBlock
-          currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++, true);
+          currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
+              streamer.getAndIncCurrentSeqno(), true);
           currentPacket.setSyncBlock(shouldSyncBlock || isSync);
-          waitAndQueueCurrentPacket();
-          bytesCurBlock = 0;
+          streamer.waitAndQueuePacket(currentPacket);
+          currentPacket = null;
+          streamer.setBytesCurBlock(0);
           lastFlushOffset = 0;
         } else {
           // Restore state of stream. Record the last flush offset
           // of the last full chunk that was flushed.
-          bytesCurBlock -= numKept;
+          streamer.setBytesCurBlock(streamer.getBytesCurBlock() - numKept);
         }
 
-        toWaitFor = lastQueuedSeqno;
+        toWaitFor = streamer.getLastQueuedSeqno();
       } // end synchronized
 
-      waitForAckedSeqno(toWaitFor);
+      streamer.waitForAckedSeqno(toWaitFor);
 
       // update the block length first time irrespective of flag
-      if (updateLength || persistBlocks.get()) {
+      if (updateLength || streamer.getPersistBlocks().get()) {
         synchronized (this) {
-          if (streamer != null && streamer.block != null) {
-            lastBlockLength = streamer.block.getNumBytes();
+          if (!streamer.streamerClosed() && streamer.getBlock() != null) {
+            lastBlockLength = streamer.getBlock().getNumBytes();
           }
         }
       }
       // If 1) any new blocks were allocated since the last flush, or 2) to
       // update length in NN is required, then persist block locations on
       // namenode.
-      if (persistBlocks.getAndSet(false) || updateLength) {
+      if (streamer.getPersistBlocks().getAndSet(false) || updateLength) {
         try {
           dfsClient.namenode.fsync(src, fileId, dfsClient.clientName,
               lastBlockLength);
@@ -2035,7 +628,7 @@ public class DFSOutputStream extends FSOutputSummer
       }
 
       synchronized(this) {
-        if (streamer != null) {
+        if (!streamer.streamerClosed()) {
           streamer.setHflush();
         }
       }
@@ -2048,7 +641,7 @@ public class DFSOutputStream extends FSOutputSummer
       DFSClient.LOG.warn("Error while syncing", e);
       synchronized (this) {
         if (!isClosed()) {
-          lastException.set(new IOException("IOException flush: " + e));
+          streamer.getLastException().set(new IOException("IOException flush: " + e));
           closeThreads(true);
         }
       }
@@ -2073,7 +666,7 @@ public class DFSOutputStream extends FSOutputSummer
   public synchronized int getCurrentBlockReplication() throws IOException {
     dfsClient.checkOpen();
     checkClosed();
-    if (streamer == null) {
+    if (streamer.streamerClosed()) {
       return blockReplication; // no pipeline, return repl factor of file
     }
     DatanodeInfo[] currentNodes = streamer.getNodes();
@@ -2095,47 +688,12 @@ public class DFSOutputStream extends FSOutputSummer
       //
       // If there is data in the current buffer, send it across
       //
-      queueCurrentPacket();
-      toWaitFor = lastQueuedSeqno;
+      streamer.queuePacket(currentPacket);
+      currentPacket = null;
+      toWaitFor = streamer.getLastQueuedSeqno();
     }
 
-    waitForAckedSeqno(toWaitFor);
-  }
-
-  private void waitForAckedSeqno(long seqno) throws IOException {
-    TraceScope scope = Trace.startSpan("waitForAckedSeqno", Sampler.NEVER);
-    try {
-      if (DFSClient.LOG.isDebugEnabled()) {
-        DFSClient.LOG.debug("Waiting for ack for: " + seqno);
-      }
-      long begin = Time.monotonicNow();
-      try {
-        synchronized (dataQueue) {
-          while (!isClosed()) {
-            checkClosed();
-            if (lastAckedSeqno >= seqno) {
-              break;
-            }
-            try {
-              dataQueue.wait(1000); // when we receive an ack, we notify on
-              // dataQueue
-            } catch (InterruptedException ie) {
-              throw new InterruptedIOException(
-                  "Interrupted while waiting for data to be acknowledged by pipeline");
-            }
-          }
-        }
-        checkClosed();
-      } catch (ClosedChannelException e) {
-      }
-      long duration = Time.monotonicNow() - begin;
-      if (duration > dfsclientSlowLogThresholdMs) {
-        DFSClient.LOG.warn("Slow waitForAckedSeqno took " + duration
-            + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms)");
-      }
-    } finally {
-      scope.close();
-    }
+    streamer.waitForAckedSeqno(toWaitFor);
   }
 
   private synchronized void start() {
@@ -2157,22 +715,12 @@ public class DFSOutputStream extends FSOutputSummer
   }
 
   boolean isClosed() {
-    return closed;
+    return closed || streamer.streamerClosed();
   }
 
   void setClosed() {
     closed = true;
-    synchronized (dataQueue) {
-      releaseBuffer(dataQueue, byteArrayManager);
-      releaseBuffer(ackQueue, byteArrayManager);
-    }
-  }
-  
-  private static void releaseBuffer(List<DFSPacket> packets, ByteArrayManager bam) {
-    for (DFSPacket p : packets) {
-      p.releaseBuffer(bam);
-    }
-    packets.clear();
+    streamer.release();
   }
 
   // shutdown datastreamer and responseprocessor threads.
@@ -2181,14 +729,11 @@ public class DFSOutputStream extends FSOutputSummer
     try {
       streamer.close(force);
       streamer.join();
-      if (s != null) {
-        s.close();
-      }
+      streamer.closeSocket();
     } catch (InterruptedException e) {
       throw new IOException("Failed to shutdown streamer");
     } finally {
-      streamer = null;
-      s = null;
+      streamer.setSocketToNull();
       setClosed();
     }
   }
@@ -2210,7 +755,7 @@ public class DFSOutputStream extends FSOutputSummer
 
   private synchronized void closeImpl() throws IOException {
     if (isClosed()) {
-      IOException e = lastException.getAndSet(null);
+      IOException e = streamer.getLastException().getAndSet(null);
       if (e == null)
         return;
       else
@@ -2221,12 +766,14 @@ public class DFSOutputStream extends FSOutputSummer
       flushBuffer();       // flush from all upper layers
 
       if (currentPacket != null) { 
-        waitAndQueueCurrentPacket();
+        streamer.waitAndQueuePacket(currentPacket);
+        currentPacket = null;
       }
 
-      if (bytesCurBlock != 0) {
+      if (streamer.getBytesCurBlock() != 0) {
         // send an empty packet to mark the end of the block
-        currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++, true);
+        currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
+            streamer.getAndIncCurrentSeqno(), true);
         currentPacket.setSyncBlock(shouldSyncBlock);
       }
 
@@ -2261,7 +808,7 @@ public class DFSOutputStream extends FSOutputSummer
       if (!fileComplete) {
         final int hdfsTimeout = dfsClient.getHdfsTimeout();
         if (!dfsClient.clientRunning
-            || (hdfsTimeout > 0 
+            || (hdfsTimeout > 0
                 && localstart + hdfsTimeout < Time.monotonicNow())) {
             String msg = "Unable to close file because dfsclient " +
                           " was unable to contact the HDFS servers." +
@@ -2290,7 +837,7 @@ public class DFSOutputStream extends FSOutputSummer
 
   @VisibleForTesting
   public void setArtificialSlowdown(long period) {
-    artificialSlowdown = period;
+    streamer.setArtificialSlowdown(period);
   }
 
   @VisibleForTesting
@@ -2299,10 +846,6 @@ public class DFSOutputStream extends FSOutputSummer
     packetSize = (bytesPerChecksum + getChecksumSize()) * chunksPerPacket;
   }
 
-  synchronized void setTestFilename(String newname) {
-    src = newname;
-  }
-
   /**
    * Returns the size of a file as it was when this stream was opened
    */
@@ -2345,9 +888,4 @@ public class DFSOutputStream extends FSOutputSummer
   public long getFileId() {
     return fileId;
   }
-
-  private static <T> void arraycopy(T[] srcs, T[] dsts, int skipIndex) {
-    System.arraycopy(srcs, 0, dsts, 0, skipIndex);
-    System.arraycopy(srcs, skipIndex+1, dsts, skipIndex, dsts.length-skipIndex);
-  }
 }