Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/05/17 01:58:57 UTC

[29/50] hadoop git commit: HDFS-7672. Handle write failure for striping blocks and refactor the existing code in DFSStripedOutputStream and StripedDataStreamer.

HDFS-7672. Handle write failure for striping blocks and refactor the existing code in DFSStripedOutputStream and StripedDataStreamer.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/af74bae8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/af74bae8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/af74bae8

Branch: refs/heads/HDFS-7285
Commit: af74bae80ab40d4cfb2d1466d7e4b932f9690ec6
Parents: 5e77bce
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Tue May 5 16:26:49 2015 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Sat May 16 15:16:05 2015 -0700

----------------------------------------------------------------------
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt        |   3 +
 .../org/apache/hadoop/hdfs/DFSOutputStream.java |  69 +--
 .../hadoop/hdfs/DFSStripedOutputStream.java     | 501 ++++++++++++-------
 .../java/org/apache/hadoop/hdfs/DFSUtil.java    |  10 +-
 .../org/apache/hadoop/hdfs/DataStreamer.java    |  15 +-
 .../apache/hadoop/hdfs/StripedDataStreamer.java | 156 ++----
 .../org/apache/hadoop/hdfs/MiniDFSCluster.java  |   2 -
 .../hadoop/hdfs/TestDFSStripedOutputStream.java |  18 +-
 .../TestDFSStripedOutputStreamWithFailure.java  | 323 ++++++++++++
 9 files changed, 764 insertions(+), 333 deletions(-)
----------------------------------------------------------------------
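
At a glance, the failure handling works by marking a failed StripedDataStreamer rather than aborting the write: the stream keeps going as long as at least numDataBlocks of the numDataBlocks + numParityBlocks streamers are still healthy. A minimal, self-contained sketch of that rule, mirroring the new checkStreamers() further down in this patch; the Streamer class and field names here are illustrative, not the actual HDFS types.

    import java.io.IOException;
    import java.util.List;

    class StripedWriteHealthCheck {
      /** Illustrative stand-in for StripedDataStreamer's failure flag. */
      static class Streamer {
        volatile boolean failed;
      }

      private final List<Streamer> streamers;
      private final int numDataBlocks;

      StripedWriteHealthCheck(List<Streamer> streamers, int numDataBlocks) {
        this.streamers = streamers;
        this.numDataBlocks = numDataBlocks;
      }

      /**
       * Mirrors the patch's checkStreamers(): the write only fails once fewer
       * than numDataBlocks streamers remain usable, i.e. it tolerates up to
       * numParityBlocks failed streamers.
       */
      void check() throws IOException {
        int healthy = 0;
        for (Streamer s : streamers) {
          if (!s.failed) {
            healthy++;
          }
        }
        if (healthy < numDataBlocks) {
          throw new IOException("remaining streamers = " + healthy
              + " < data blocks = " + numDataBlocks);
        }
      }
    }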


http://git-wip-us.apache.org/repos/asf/hadoop/blob/af74bae8/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index a8df3f2..7efaa5a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -172,3 +172,6 @@
 
     HDFS-8324. Add trace info to DFSClient#getErasureCodingZoneInfo(..) (vinayakumarb via 
     umamahesh)
+    
+    HDFS-7672. Handle write failure for stripping blocks and refactor the
+    existing code in DFSStripedOutputStream and StripedDataStreamer.  (szetszwo)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/af74bae8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 0280d71..8580357 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -24,6 +24,8 @@ import java.nio.channels.ClosedChannelException;
 import java.util.EnumSet;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
@@ -86,6 +88,8 @@ import com.google.common.base.Preconditions;
 @InterfaceAudience.Private
 public class DFSOutputStream extends FSOutputSummer
     implements Syncable, CanSetDropBehind {
+  static final Log LOG = LogFactory.getLog(DFSOutputStream.class);
+
   /**
    * Number of times to retry creating a file when there are transient 
    * errors (typically related to encryption zones and KeyProvider operations).
@@ -419,24 +423,35 @@ public class DFSOutputStream extends FSOutputSummer
     streamer.incBytesCurBlock(len);
 
     // If packet is full, enqueue it for transmission
-    //
     if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
         streamer.getBytesCurBlock() == blockSize) {
-      if (DFSClient.LOG.isDebugEnabled()) {
-        DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
-            currentPacket.getSeqno() +
-            ", src=" + src +
-            ", bytesCurBlock=" + streamer.getBytesCurBlock() +
-            ", blockSize=" + blockSize +
-            ", appendChunk=" + streamer.getAppendChunk());
-      }
-      streamer.waitAndQueuePacket(currentPacket);
-      currentPacket = null;
+      enqueueCurrentPacketFull();
+    }
+  }
 
-      adjustChunkBoundary();
+  void enqueueCurrentPacket() throws IOException {
+    streamer.waitAndQueuePacket(currentPacket);
+    currentPacket = null;
+  }
 
-      endBlock();
+  void enqueueCurrentPacketFull() throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("enqueue full " + currentPacket + ", src=" + src
+          + ", bytesCurBlock=" + streamer.getBytesCurBlock()
+          + ", blockSize=" + blockSize
+          + ", appendChunk=" + streamer.getAppendChunk()
+          + ", " + streamer);
     }
+    enqueueCurrentPacket();
+    adjustChunkBoundary();
+    endBlock();
+  }
+
+  /** create an empty packet to mark the end of the block */
+  void setCurrentPacket2Empty() throws InterruptedIOException {
+    currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
+        streamer.getAndIncCurrentSeqno(), true);
+    currentPacket.setSyncBlock(shouldSyncBlock);
   }
 
   /**
@@ -444,7 +459,7 @@ public class DFSOutputStream extends FSOutputSummer
    * write filled up its partial chunk. Tell the summer to generate full
    * crc chunks from now on.
    */
-  protected void adjustChunkBoundary() {
+  private void adjustChunkBoundary() {
     if (streamer.getAppendChunk() &&
         streamer.getBytesCurBlock() % bytesPerChecksum == 0) {
       streamer.setAppendChunk(false);
@@ -466,11 +481,8 @@ public class DFSOutputStream extends FSOutputSummer
    */
   protected void endBlock() throws IOException {
     if (streamer.getBytesCurBlock() == blockSize) {
-      currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
-          streamer.getAndIncCurrentSeqno(), true);
-      currentPacket.setSyncBlock(shouldSyncBlock);
-      streamer.waitAndQueuePacket(currentPacket);
-      currentPacket = null;
+      setCurrentPacket2Empty();
+      enqueueCurrentPacket();
       streamer.setBytesCurBlock(0);
       lastFlushOffset = 0;
     }
@@ -592,8 +604,7 @@ public class DFSOutputStream extends FSOutputSummer
         }
         if (currentPacket != null) {
           currentPacket.setSyncBlock(isSync);
-          streamer.waitAndQueuePacket(currentPacket);
-          currentPacket = null;
+          enqueueCurrentPacket();
         }
         if (endBlock && streamer.getBytesCurBlock() > 0) {
           // Need to end the current block, thus send an empty packet to
@@ -601,8 +612,7 @@ public class DFSOutputStream extends FSOutputSummer
           currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
               streamer.getAndIncCurrentSeqno(), true);
           currentPacket.setSyncBlock(shouldSyncBlock || isSync);
-          streamer.waitAndQueuePacket(currentPacket);
-          currentPacket = null;
+          enqueueCurrentPacket();
           streamer.setBytesCurBlock(0);
           lastFlushOffset = 0;
         } else {
@@ -779,15 +789,11 @@ public class DFSOutputStream extends FSOutputSummer
       flushBuffer();       // flush from all upper layers
 
       if (currentPacket != null) {
-        streamer.waitAndQueuePacket(currentPacket);
-        currentPacket = null;
+        enqueueCurrentPacket();
       }
 
       if (streamer.getBytesCurBlock() != 0) {
-        // send an empty packet to mark the end of the block
-        currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
-            streamer.getAndIncCurrentSeqno(), true);
-        currentPacket.setSyncBlock(shouldSyncBlock);
+        setCurrentPacket2Empty();
       }
 
       flushInternal();             // flush all data to Datanodes
@@ -901,4 +907,9 @@ public class DFSOutputStream extends FSOutputSummer
   public long getFileId() {
     return fileId;
   }
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName() + ":" + streamer;
+  }
 }
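
The duplicated "queue the packet and clear the reference" and "empty end-of-block packet" sequences are now funneled through enqueueCurrentPacket(), enqueueCurrentPacketFull() and setCurrentPacket2Empty(), which is what lets DFSStripedOutputStream reuse them and wrap them in failure handling further down. A rough, self-contained sketch of the resulting call flow; Packet and Streamer are placeholders, not the real DFSPacket and DataStreamer.

    import java.io.IOException;

    class PacketFlowSketch {
      static class Packet {}                      // stand-in for DFSPacket
      static class Streamer {                     // stand-in for DataStreamer
        private long bytesCurBlock;
        void waitAndQueuePacket(Packet p) throws IOException {}
        long getBytesCurBlock() { return bytesCurBlock; }
        void setBytesCurBlock(long n) { bytesCurBlock = n; }
      }

      private final Streamer streamer = new Streamer();
      private final long blockSize = 128L << 20;
      private Packet currentPacket;

      /** The one place that hands the current packet to the streamer. */
      void enqueueCurrentPacket() throws IOException {
        streamer.waitAndQueuePacket(currentPacket);
        currentPacket = null;
      }

      /** Called when a packet (or the block) fills up. */
      void enqueueCurrentPacketFull() throws IOException {
        enqueueCurrentPacket();
        // adjustChunkBoundary() would run here in the real stream
        endBlock();
      }

      /** Create the zero-length packet that marks the end of a block. */
      void setCurrentPacket2Empty() {
        currentPacket = new Packet();             // lastPacketInBlock = true
      }

      void endBlock() throws IOException {
        if (streamer.getBytesCurBlock() == blockSize) {
          setCurrentPacket2Empty();
          enqueueCurrentPacket();
          streamer.setBytesCurBlock(0);
        }
      }
    }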

http://git-wip-us.apache.org/repos/asf/hadoop/blob/af74bae8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index 71cdbb9..bbc8ba0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -28,14 +28,16 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.hdfs.protocol.ECInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.io.MultipleIOException;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
 import org.apache.hadoop.util.DataChecksum;
@@ -44,6 +46,8 @@ import org.apache.htrace.Sampler;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
 
+import com.google.common.base.Preconditions;
+
 
 /****************************************************************
  * The DFSStripedOutputStream class supports writing files in striped
@@ -55,33 +59,154 @@ import org.apache.htrace.TraceScope;
 
 @InterfaceAudience.Private
 public class DFSStripedOutputStream extends DFSOutputStream {
+  /** Coordinate the communication between the streamers. */
+  static class Coordinator {
+    private final List<BlockingQueue<ExtendedBlock>> endBlocks;
+    private final List<BlockingQueue<LocatedBlock>> stripedBlocks;
+    private volatile boolean shouldLocateFollowingBlock = false;
+
+    Coordinator(final int numDataBlocks, final int numAllBlocks) {
+      endBlocks = new ArrayList<>(numDataBlocks);
+      for (int i = 0; i < numDataBlocks; i++) {
+        endBlocks.add(new LinkedBlockingQueue<ExtendedBlock>(1));
+      }
 
-  private final List<StripedDataStreamer> streamers;
-  /**
-   * Size of each striping cell, must be a multiple of bytesPerChecksum
-   */
-  private final ECInfo ecInfo;
-  private final int cellSize;
-  // checksum buffer, we only need to calculate checksum for parity blocks
-  private byte[] checksumBuf;
-  private ByteBuffer[] cellBuffers;
+      stripedBlocks = new ArrayList<>(numAllBlocks);
+      for (int i = 0; i < numAllBlocks; i++) {
+        stripedBlocks.add(new LinkedBlockingQueue<LocatedBlock>(1));
+      }
+    }
 
-  private final short numAllBlocks;
-  private final short numDataBlocks;
+    boolean shouldLocateFollowingBlock() {
+      return shouldLocateFollowingBlock;
+    }
 
-  private int curIdx = 0;
-  /* bytes written in current block group */
-  //private long currentBlockGroupBytes = 0;
+    void putEndBlock(int i, ExtendedBlock block) {
+      shouldLocateFollowingBlock = true;
 
-  //TODO: Use ErasureCoder interface (HDFS-7781)
-  private RawErasureEncoder encoder;
+      final boolean b = endBlocks.get(i).offer(block);
+      Preconditions.checkState(b, "Failed to add " + block
+          + " to endBlocks queue, i=" + i);
+    }
 
-  private StripedDataStreamer getLeadingStreamer() {
-    return streamers.get(0);
+    ExtendedBlock getEndBlock(int i) throws InterruptedIOException {
+      try {
+        return endBlocks.get(i).poll(30, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        throw DFSUtil.toInterruptedIOException(
+            "getEndBlock interrupted, i=" + i, e);
+      }
+    }
+
+    void setBytesEndBlock(int i, long newBytes, ExtendedBlock block) {
+      ExtendedBlock b = endBlocks.get(i).peek();
+      if (b == null) {
+        // streamer just has failed, put end block and continue
+        b = block;
+        putEndBlock(i, b);
+      }
+      b.setNumBytes(newBytes);
+    }
+
+    void putStripedBlock(int i, LocatedBlock block) throws IOException {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("putStripedBlock " + block + ", i=" + i);
+      }
+      final boolean b = stripedBlocks.get(i).offer(block);
+      if (!b) {
+        throw new IOException("Failed: " + block + ", i=" + i);
+      }
+    }
+
+    LocatedBlock getStripedBlock(int i) throws IOException {
+      final LocatedBlock lb;
+      try {
+        lb = stripedBlocks.get(i).poll(90, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        throw DFSUtil.toInterruptedIOException("getStripedBlock interrupted", e);
+      }
+
+      if (lb == null) {
+        throw new IOException("Failed: i=" + i);
+      }
+      return lb;
+    }
   }
 
-  private long getBlockGroupSize() {
-    return blockSize * numDataBlocks;
+  /** Buffers for writing the data and parity cells of a strip. */
+  class CellBuffers {
+    private final ByteBuffer[] buffers;
+    private final byte[][] checksumArrays;
+
+    CellBuffers(int numParityBlocks) throws InterruptedException{
+      if (cellSize % bytesPerChecksum != 0) {
+        throw new HadoopIllegalArgumentException("Invalid values: "
+            + DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (="
+            + bytesPerChecksum + ") must divide cell size (=" + cellSize + ").");
+      }
+
+      checksumArrays = new byte[numParityBlocks][];
+      final int size = getChecksumSize() * (cellSize / bytesPerChecksum);
+      for (int i = 0; i < checksumArrays.length; i++) {
+        checksumArrays[i] = new byte[size];
+      }
+
+      buffers = new ByteBuffer[numAllBlocks];
+      for (int i = 0; i < buffers.length; i++) {
+        buffers[i] = ByteBuffer.wrap(byteArrayManager.newByteArray(cellSize));
+      }
+    }
+
+    private ByteBuffer[] getBuffers() {
+      return buffers;
+    }
+
+    byte[] getChecksumArray(int i) {
+      return checksumArrays[i - numDataBlocks];
+    }
+
+    private int addTo(int i, byte[] b, int off, int len) {
+      final ByteBuffer buf = buffers[i];
+      final int pos = buf.position() + len;
+      Preconditions.checkState(pos <= cellSize);
+      buf.put(b, off, len);
+      return pos;
+    }
+
+    private void clear() {
+      for (int i = 0; i< numAllBlocks; i++) {
+        buffers[i].clear();
+        if (i >= numDataBlocks) {
+          Arrays.fill(buffers[i].array(), (byte) 0);
+        }
+      }
+    }
+
+    private void release() {
+      for (int i = 0; i < numAllBlocks; i++) {
+        byteArrayManager.release(buffers[i].array());
+      }
+    }
+
+    private void flipDataBuffers() {
+      for (int i = 0; i < numDataBlocks; i++) {
+        buffers[i].flip();
+      }
+    }
+  }
+
+  private final Coordinator coordinator;
+  private final CellBuffers cellBuffers;
+  private final RawErasureEncoder encoder;
+  private final List<StripedDataStreamer> streamers;
+
+  /** Size of each striping cell, must be a multiple of bytesPerChecksum */
+  private final int cellSize;
+  private final int numAllBlocks;
+  private final int numDataBlocks;
+
+  private StripedDataStreamer getLeadingStreamer() {
+    return streamers.get(0);
   }
 
   /** Construct a new output stream for creating a file. */
@@ -90,82 +215,94 @@ public class DFSStripedOutputStream extends DFSOutputStream {
                          DataChecksum checksum, String[] favoredNodes)
                          throws IOException {
     super(dfsClient, src, stat, flag, progress, checksum, favoredNodes);
-    DFSClient.LOG.info("Creating striped output stream");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Creating DFSStripedOutputStream for " + src);
+    }
 
     // ECInfo is restored from NN just before writing striped files.
-    ecInfo = dfsClient.getErasureCodingInfo(src);
-    cellSize = ecInfo.getSchema().getChunkSize();
-    numAllBlocks = (short)(ecInfo.getSchema().getNumDataUnits()
-        + ecInfo.getSchema().getNumParityUnits());
-    numDataBlocks = (short)ecInfo.getSchema().getNumDataUnits();
+    //TODO reduce an rpc call HDFS-8289
+    final ECSchema schema = dfsClient.getErasureCodingInfo(src).getSchema();
+    final int numParityBlocks = schema.getNumParityUnits();
+    cellSize = schema.getChunkSize();
+    numDataBlocks = schema.getNumDataUnits();
+    numAllBlocks = numDataBlocks + numParityBlocks;
 
-    checkConfiguration();
-
-    checksumBuf = new byte[getChecksumSize() * (cellSize / bytesPerChecksum)];
-    cellBuffers = new ByteBuffer[numAllBlocks];
-    List<BlockingQueue<LocatedBlock>> stripeBlocks = new ArrayList<>();
+    encoder = new RSRawEncoder();
+    encoder.initialize(numDataBlocks, numParityBlocks, cellSize);
 
-    for (int i = 0; i < numAllBlocks; i++) {
-      stripeBlocks.add(new LinkedBlockingQueue<LocatedBlock>(numAllBlocks));
-      try {
-        cellBuffers[i] = ByteBuffer.wrap(byteArrayManager.newByteArray(cellSize));
-      } catch (InterruptedException ie) {
-        final InterruptedIOException iioe = new InterruptedIOException(
-            "create cell buffers");
-        iioe.initCause(ie);
-        throw iioe;
-      }
+    coordinator = new Coordinator(numDataBlocks, numAllBlocks);
+    try {
+      cellBuffers = new CellBuffers(numParityBlocks);
+    } catch (InterruptedException ie) {
+      throw DFSUtil.toInterruptedIOException("Failed to create cell buffers", ie);
     }
-    encoder = new RSRawEncoder();
-    encoder.initialize(numDataBlocks,
-        numAllBlocks - numDataBlocks, cellSize);
 
     List<StripedDataStreamer> s = new ArrayList<>(numAllBlocks);
     for (short i = 0; i < numAllBlocks; i++) {
-      StripedDataStreamer streamer = new StripedDataStreamer(stat, null,
+      StripedDataStreamer streamer = new StripedDataStreamer(stat,
           dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
-          i, stripeBlocks, favoredNodes);
+          favoredNodes, i, coordinator);
       s.add(streamer);
     }
     streamers = Collections.unmodifiableList(s);
+    setCurrentStreamer(0);
+  }
 
-    refreshStreamer();
+  StripedDataStreamer getStripedDataStreamer(int i) {
+    return streamers.get(i);
   }
 
-  private void checkConfiguration() {
-    if (cellSize % bytesPerChecksum != 0) {
-      throw new HadoopIllegalArgumentException("Invalid values: "
-          + DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (=" + bytesPerChecksum
-          + ") must divide cell size (=" + cellSize + ").");
-    }
+  int getCurrentIndex() {
+    return getCurrentStreamer().getIndex();
   }
 
-  private void refreshStreamer() {
-    streamer = streamers.get(curIdx);
+  StripedDataStreamer getCurrentStreamer() {
+    return (StripedDataStreamer)streamer;
   }
 
-  private void moveToNextStreamer() {
-    curIdx = (curIdx + 1) % numAllBlocks;
-    refreshStreamer();
+  private StripedDataStreamer setCurrentStreamer(int i) {
+    streamer = streamers.get(i);
+    return getCurrentStreamer();
   }
 
   /**
-   * encode the buffers.
-   * After encoding, flip each buffer.
+   * Encode the buffers, i.e. compute parities.
    *
    * @param buffers data buffers + parity buffers
    */
-  private void encode(ByteBuffer[] buffers) {
-    ByteBuffer[] dataBuffers = new ByteBuffer[numDataBlocks];
-    ByteBuffer[] parityBuffers = new ByteBuffer[numAllBlocks - numDataBlocks];
-    for (int i = 0; i < numAllBlocks; i++) {
-      if (i < numDataBlocks) {
-        dataBuffers[i] = buffers[i];
-      } else {
-        parityBuffers[i - numDataBlocks] = buffers[i];
+  private static void encode(RawErasureEncoder encoder, int numData,
+      ByteBuffer[] buffers) {
+    final ByteBuffer[] dataBuffers = new ByteBuffer[numData];
+    final ByteBuffer[] parityBuffers = new ByteBuffer[buffers.length - numData];
+    System.arraycopy(buffers, 0, dataBuffers, 0, dataBuffers.length);
+    System.arraycopy(buffers, numData, parityBuffers, 0, parityBuffers.length);
+
+    encoder.encode(dataBuffers, parityBuffers);
+  }
+
+
+  private void checkStreamers() throws IOException {
+    int count = 0;
+    for(StripedDataStreamer s : streamers) {
+      if (!s.isFailed()) {
+        count++;
       }
     }
-    encoder.encode(dataBuffers, parityBuffers);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("checkStreamers: " + streamers);
+      LOG.debug("count=" + count);
+    }
+    if (count < numDataBlocks) {
+      throw new IOException("Failed: the number of remaining blocks = "
+          + count + " < the number of data blocks = " + numDataBlocks);
+    }
+  }
+
+  private void handleStreamerFailure(String err, Exception e) throws IOException {
+    LOG.warn("Failed: " + err + ", " + this, e);
+    getCurrentStreamer().setIsFailed(true);
+    checkStreamers();
+    currentPacket = null;
   }
 
   /**
@@ -173,11 +310,12 @@ public class DFSStripedOutputStream extends DFSOutputStream {
    * writing parity blocks.
    *
    * @param byteBuffer the given buffer to generate packets
+   * @param checksumBuf the checksum buffer
    * @return packets generated
    * @throws IOException
    */
-  private List<DFSPacket> generatePackets(ByteBuffer byteBuffer)
-      throws IOException{
+  private List<DFSPacket> generatePackets(
+      ByteBuffer byteBuffer, byte[] checksumBuf) throws IOException{
     List<DFSPacket> packets = new ArrayList<>();
     assert byteBuffer.hasArray();
     getDataChecksum().calculateChunkedSums(byteBuffer.array(), 0,
@@ -201,82 +339,47 @@ public class DFSStripedOutputStream extends DFSOutputStream {
   }
 
   @Override
-  protected synchronized void writeChunk(byte[] b, int offset, int len,
+  protected synchronized void writeChunk(byte[] bytes, int offset, int len,
       byte[] checksum, int ckoff, int cklen) throws IOException {
-    super.writeChunk(b, offset, len, checksum, ckoff, cklen);
-
-    if (getSizeOfCellnBuffer(curIdx) <= cellSize) {
-      addToCellBuffer(b, offset, len);
-    } else {
-      String msg = "Writing a chunk should not overflow the cell buffer.";
-      DFSClient.LOG.info(msg);
-      throw new IOException(msg);
-    }
-
-    // If current packet has not been enqueued for transmission,
-    // but the cell buffer is full, we need to enqueue the packet
-    if (currentPacket != null && getSizeOfCellnBuffer(curIdx) == cellSize) {
-      if (DFSClient.LOG.isDebugEnabled()) {
-        DFSClient.LOG.debug("DFSClient writeChunk cell buffer full seqno=" +
-            currentPacket.getSeqno() +
-            ", curIdx=" + curIdx +
-            ", src=" + src +
-            ", bytesCurBlock=" + streamer.getBytesCurBlock() +
-            ", blockSize=" + blockSize +
-            ", appendChunk=" + streamer.getAppendChunk());
+    final int index = getCurrentIndex();
+    final StripedDataStreamer current = getCurrentStreamer();
+    final int pos = cellBuffers.addTo(index, bytes, offset, len);
+    final boolean cellFull = pos == cellSize;
+
+    final long oldBytes = current.getBytesCurBlock();
+    if (!current.isFailed()) {
+      try {
+        super.writeChunk(bytes, offset, len, checksum, ckoff, cklen);
+
+        // cell is full and current packet has not been enqueued,
+        if (cellFull && currentPacket != null) {
+          enqueueCurrentPacketFull();
+        }
+      } catch(Exception e) {
+        handleStreamerFailure("offset=" + offset + ", length=" + len, e);
       }
-      streamer.waitAndQueuePacket(currentPacket);
-      currentPacket = null;
-      adjustChunkBoundary();
-      endBlock();
+    }
+
+    if (current.isFailed()) {
+      final long newBytes = oldBytes + len;
+      coordinator.setBytesEndBlock(index, newBytes, current.getBlock());
+      current.setBytesCurBlock(newBytes);
     }
 
     // Two extra steps are needed when a striping cell is full:
     // 1. Forward the current index pointer
     // 2. Generate parity packets if a full stripe of data cells are present
-    if (getSizeOfCellnBuffer(curIdx) == cellSize) {
-      //move curIdx to next cell
-      moveToNextStreamer();
+    if (cellFull) {
+      int next = index + 1;
       //When all data cells in a stripe are ready, we need to encode
       //them and generate some parity cells. These cells will be
       //converted to packets and put to their DataStreamer's queue.
-      if (curIdx == numDataBlocks) {
-        //encode the data cells
-        for (int k = 0; k < numDataBlocks; k++) {
-          cellBuffers[k].flip();
-        }
-        encode(cellBuffers);
-        for (int i = numDataBlocks; i < numAllBlocks; i++) {
-          ByteBuffer parityBuffer = cellBuffers[i];
-          List<DFSPacket> packets = generatePackets(parityBuffer);
-          for (DFSPacket p : packets) {
-            currentPacket = p;
-            streamer.waitAndQueuePacket(currentPacket);
-            currentPacket = null;
-          }
-          endBlock();
-          moveToNextStreamer();
-        }
-        //read next stripe to cellBuffers
-        clearCellBuffers();
-      }
-    }
-  }
-
-  private void addToCellBuffer(byte[] b, int off, int len) {
-    cellBuffers[curIdx].put(b, off, len);
-  }
-
-  private int getSizeOfCellnBuffer(int cellIndex) {
-    return cellBuffers[cellIndex].position();
-  }
-
-  private void clearCellBuffers() {
-    for (int i = 0; i< numAllBlocks; i++) {
-      cellBuffers[i].clear();
-      if (i >= numDataBlocks) {
-        Arrays.fill(cellBuffers[i].array(), (byte) 0);
+      if (next == numDataBlocks) {
+        cellBuffers.flipDataBuffers();
+        writeParityCells();
+        next = 0;
       }
+      setCurrentStreamer(next);
     }
   }
 
@@ -284,20 +387,14 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     return numDataBlocks * cellSize;
   }
 
-  private void notSupported(String headMsg)
-      throws IOException{
-      throw new IOException(
-          headMsg + " is now not supported for striping layout.");
-  }
-
   @Override
-  public void hflush() throws IOException {
-    notSupported("hflush");
+  public void hflush() {
+    throw new UnsupportedOperationException();
   }
 
   @Override
-  public void hsync() throws IOException {
-    notSupported("hsync");
+  public void hsync() {
+    throw new UnsupportedOperationException();
   }
 
   @Override
@@ -327,29 +424,28 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     return closed || getLeadingStreamer().streamerClosed();
   }
 
-  // shutdown datastreamer and responseprocessor threads.
-  // interrupt datastreamer if force is true
   @Override
   protected void closeThreads(boolean force) throws IOException {
-    int index = 0;
-    boolean exceptionOccurred = false;
+    final MultipleIOException.Builder b = new MultipleIOException.Builder();
     for (StripedDataStreamer streamer : streamers) {
       try {
         streamer.close(force);
         streamer.join();
         streamer.closeSocket();
-      } catch (InterruptedException | IOException e) {
-        DFSClient.LOG.error("Failed to shutdown streamer: name="
-            + streamer.getName() + ", index=" + index + ", file=" + src, e);
-        exceptionOccurred = true;
+      } catch(Exception e) {
+        try {
+          handleStreamerFailure("force=" + force, e);
+        } catch(IOException ioe) {
+          b.add(ioe);
+        }
       } finally {
         streamer.setSocketToNull();
         setClosed();
-        index++;
       }
     }
-    if (exceptionOccurred) {
-      throw new IOException("Failed to shutdown streamer");
+    final IOException ioe = b.build();
+    if (ioe != null) {
+      throw ioe;
     }
   }
 
@@ -370,50 +466,69 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     if (currentBlockGroupBytes % stripeDataSize() == 0) {
       return;
     }
-    long firstCellSize = getLeadingStreamer().getBytesCurBlock() % cellSize;
-    long parityCellSize = firstCellSize > 0 && firstCellSize < cellSize ?
+
+    final int firstCellSize = (int)(getStripedDataStreamer(0).getBytesCurBlock() % cellSize);
+    final int parityCellSize = firstCellSize > 0 && firstCellSize < cellSize?
         firstCellSize : cellSize;
+    final ByteBuffer[] buffers = cellBuffers.getBuffers();
 
     for (int i = 0; i < numAllBlocks; i++) {
       // Pad zero bytes to make all cells exactly the size of parityCellSize
       // If internal block is smaller than parity block, pad zero bytes.
       // Also pad zero bytes to all parity cells
-      int position = cellBuffers[i].position();
+      final int position = buffers[i].position();
       assert position <= parityCellSize : "If an internal block is smaller" +
           " than parity block, then its last cell should be small than last" +
           " parity cell";
       for (int j = 0; j < parityCellSize - position; j++) {
-        cellBuffers[i].put((byte) 0);
+        buffers[i].put((byte) 0);
       }
-      cellBuffers[i].flip();
+      buffers[i].flip();
     }
-    encode(cellBuffers);
 
-    // write parity cells
-    curIdx = numDataBlocks;
-    refreshStreamer();
+    writeParityCells();
+  }
+
+  void writeParityCells() throws IOException {
+    final ByteBuffer[] buffers = cellBuffers.getBuffers();
+    //encode the data cells
+    encode(encoder, numDataBlocks, buffers);
     for (int i = numDataBlocks; i < numAllBlocks; i++) {
-      ByteBuffer parityBuffer = cellBuffers[i];
-      List<DFSPacket> packets = generatePackets(parityBuffer);
-      for (DFSPacket p : packets) {
-        currentPacket = p;
-        streamer.waitAndQueuePacket(currentPacket);
-        currentPacket = null;
+      writeParity(i, buffers[i], cellBuffers.getChecksumArray(i));
+    }
+    cellBuffers.clear();
+  }
+
+  void writeParity(int index, ByteBuffer buffer, byte[] checksumBuf
+      ) throws IOException {
+    final StripedDataStreamer current = setCurrentStreamer(index);
+    final int len = buffer.limit();
+
+    final long oldBytes = current.getBytesCurBlock();
+    if (!current.isFailed()) {
+      try {
+        for (DFSPacket p : generatePackets(buffer, checksumBuf)) {
+          streamer.waitAndQueuePacket(p);
+        }
+        endBlock();
+      } catch(Exception e) {
+        handleStreamerFailure("oldBytes=" + oldBytes + ", len=" + len, e);
       }
-      endBlock();
-      moveToNextStreamer();
     }
 
-    clearCellBuffers();
+    if (current.isFailed()) {
+      final long newBytes = oldBytes + len;
+      current.setBytesCurBlock(newBytes);
+    }
   }
 
   @Override
   void setClosed() {
     super.setClosed();
     for (int i = 0; i < numAllBlocks; i++) {
-      byteArrayManager.release(cellBuffers[i].array());
       streamers.get(i).release();
     }
+    cellBuffers.release();
   }
 
   @Override
@@ -425,25 +540,31 @@ public class DFSStripedOutputStream extends DFSOutputStream {
 
     try {
       // flush from all upper layers
-      flushBuffer();
-      if (currentPacket != null) {
-        streamer.waitAndQueuePacket(currentPacket);
-        currentPacket = null;
+      try {
+        flushBuffer();
+        if (currentPacket != null) {
+          enqueueCurrentPacket();
+        }
+      } catch(Exception e) {
+        handleStreamerFailure("closeImpl", e);
       }
+
       // if the last stripe is incomplete, generate and write parity cells
       writeParityCellsForLastStripe();
 
       for (int i = 0; i < numAllBlocks; i++) {
-        curIdx = i;
-        refreshStreamer();
-        if (streamer.getBytesCurBlock() > 0) {
-          // send an empty packet to mark the end of the block
-          currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
-              streamer.getAndIncCurrentSeqno(), true);
-          currentPacket.setSyncBlock(shouldSyncBlock);
+        final StripedDataStreamer s = setCurrentStreamer(i);
+        if (!s.isFailed()) {
+          try {
+            if (s.getBytesCurBlock() > 0) {
+              setCurrentPacket2Empty();
+            }
+            // flush all data to Datanode
+            flushInternal();
+          } catch(Exception e) {
+            handleStreamerFailure("closeImpl", e);
+          }
         }
-        // flush all data to Datanode
-        flushInternal();
       }
 
       closeThreads(false);
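
The Coordinator above decouples the leading streamer, which is the only one talking to the NameNode, from the other streamers: each handoff (putStripedBlock/getStripedBlock, putEndBlock/getEndBlock) is a single-slot blocking queue per streamer with a non-blocking offer on the producer side and a timed poll on the consumer side. A minimal, generic sketch of that pattern using only java.util.concurrent; the class name and payload type are illustrative, the real code carries LocatedBlock and ExtendedBlock.

    import java.io.IOException;
    import java.io.InterruptedIOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    class HandoffSketch<T> {
      private final List<LinkedBlockingQueue<T>> queues;

      HandoffSketch(int numStreamers) {
        queues = new ArrayList<>(numStreamers);
        for (int i = 0; i < numStreamers; i++) {
          queues.add(new LinkedBlockingQueue<T>(1));   // capacity 1, as in the patch
        }
      }

      void put(int i, T item) throws IOException {
        if (!queues.get(i).offer(item)) {              // slot must be free
          throw new IOException("Failed to hand off, i=" + i);
        }
      }

      T get(int i, long timeout, TimeUnit unit) throws IOException {
        final T item;
        try {
          item = queues.get(i).poll(timeout, unit);
        } catch (InterruptedException e) {
          final InterruptedIOException iioe =
              new InterruptedIOException("get interrupted, i=" + i);
          iioe.initCause(e);
          throw iioe;
        }
        if (item == null) {
          throw new IOException("Timed out, i=" + i);
        }
        return item;
      }
    }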

http://git-wip-us.apache.org/repos/asf/hadoop/blob/af74bae8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index a925a60..d2ce9fe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -36,6 +36,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_PAS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_TRUSTSTORE_PASSWORD_KEY;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.io.PrintStream;
 import java.io.UnsupportedEncodingException;
 import java.net.InetAddress;
@@ -55,7 +56,6 @@ import java.util.Set;
 
 import javax.net.SocketFactory;
 
-import com.google.common.collect.Sets;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Option;
@@ -96,6 +96,7 @@ import com.google.common.base.Charsets;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import com.google.protobuf.BlockingService;
 
 @InterfaceAudience.Private
@@ -1525,4 +1526,11 @@ public class DFSUtil {
   public static int getSmallBufferSize(Configuration conf) {
     return Math.min(getIoFileBufferSize(conf) / 2, 512);
   }
+
+  public static InterruptedIOException toInterruptedIOException(String message,
+      InterruptedException e) {
+    final InterruptedIOException iioe = new InterruptedIOException(message);
+    iioe.initCause(e);
+    return iioe;
+  }
 }
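
The new DFSUtil.toInterruptedIOException() simply wraps an InterruptedException in an InterruptedIOException while preserving it as the cause, so that timed queue polls can surface as IOExceptions to callers. A small usage sketch; the SynchronousQueue here is only an example stand-in for the Coordinator's queues.

    import java.io.InterruptedIOException;
    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.TimeUnit;

    class InterruptedIOExample {
      /** Same conversion as the new DFSUtil#toInterruptedIOException. */
      static InterruptedIOException toInterruptedIOException(String message,
          InterruptedException e) {
        final InterruptedIOException iioe = new InterruptedIOException(message);
        iioe.initCause(e);                 // keep the original interrupt as the cause
        return iioe;
      }

      /** Typical call site: a timed poll that must surface as an IOException. */
      static String pollOrFail(SynchronousQueue<String> queue)
          throws InterruptedIOException {
        try {
          return queue.poll(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
          throw toInterruptedIOException("poll interrupted", e);
        }
      }
    }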

http://git-wip-us.apache.org/repos/asf/hadoop/blob/af74bae8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 631f386..8f07341 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -575,7 +575,7 @@ class DataStreamer extends Daemon {
         // get new block from namenode.
         if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
           if(LOG.isDebugEnabled()) {
-            LOG.debug("Allocating new block");
+            LOG.debug("Allocating new block " + this);
           }
           setPipeline(nextBlockOutputStream());
           initDataStreaming();
@@ -593,10 +593,7 @@ class DataStreamer extends Daemon {
         long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
         if (lastByteOffsetInBlock > stat.getBlockSize()) {
           throw new IOException("BlockSize " + stat.getBlockSize() +
-              " is smaller than data size. " +
-              " Offset of packet in block " +
-              lastByteOffsetInBlock +
-              " Aborting file " + src);
+              " < lastByteOffsetInBlock, " + this + ", " + one);
         }
 
         if (one.isLastPacketInBlock()) {
@@ -1751,7 +1748,7 @@ class DataStreamer extends Daemon {
       dataQueue.addLast(packet);
       lastQueuedSeqno = packet.getSeqno();
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Queued packet " + packet.getSeqno());
+        LOG.debug("Queued " + packet + ", " + this);
       }
       dataQueue.notifyAll();
     }
@@ -1901,4 +1898,10 @@ class DataStreamer extends Daemon {
       s.close();
     }
   }
+
+  @Override
+  public String toString() {
+    return  (block == null? null: block.getLocalBlock())
+        + "@" + Arrays.toString(getNodes());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/af74bae8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
index ef7e2a6..258fc65 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
@@ -18,8 +18,14 @@
 
 package org.apache.hadoop.hdfs;
 
-import java.util.List;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_PARITY_BLOCKS;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.hadoop.hdfs.DFSStripedOutputStream.Coordinator;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -31,15 +37,6 @@ import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
 
-import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
-import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS;
-import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_PARITY_BLOCKS;
-
 /****************************************************************************
  * The StripedDataStreamer class is used by {@link DFSStripedOutputStream}.
  * There are two kinds of StripedDataStreamer, leading streamer and ordinary
@@ -49,40 +46,32 @@ import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_PARITY_BLOCKS;
  *
  ****************************************************************************/
 public class StripedDataStreamer extends DataStreamer {
-  private final short index;
-  private final List<BlockingQueue<LocatedBlock>> stripedBlocks;
-  private boolean hasCommittedBlock = false;
+  private final Coordinator coordinator;
+  private final int index;
+  private volatile boolean isFailed;
 
-  StripedDataStreamer(HdfsFileStatus stat, ExtendedBlock block,
+  StripedDataStreamer(HdfsFileStatus stat,
                       DFSClient dfsClient, String src,
                       Progressable progress, DataChecksum checksum,
                       AtomicReference<CachingStrategy> cachingStrategy,
-                      ByteArrayManager byteArrayManage, short index,
-                      List<BlockingQueue<LocatedBlock>> stripedBlocks,
-                      String[] favoredNodes) {
-    super(stat, block, dfsClient, src, progress, checksum, cachingStrategy,
+                      ByteArrayManager byteArrayManage, String[] favoredNodes,
+                      short index, Coordinator coordinator) {
+    super(stat, null, dfsClient, src, progress, checksum, cachingStrategy,
         byteArrayManage, favoredNodes);
     this.index = index;
-    this.stripedBlocks = stripedBlocks;
+    this.coordinator = coordinator;
   }
 
-  /**
-   * Construct a data streamer for appending to the last partial block
-   * @param lastBlock last block of the file to be appended
-   * @param stat status of the file to be appended
-   * @throws IOException if error occurs
-   */
-  StripedDataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat,
-                      DFSClient dfsClient, String src,
-                      Progressable progress, DataChecksum checksum,
-                      AtomicReference<CachingStrategy> cachingStrategy,
-                      ByteArrayManager byteArrayManage, short index,
-                      List<BlockingQueue<LocatedBlock>> stripedBlocks)
-      throws IOException {
-    super(lastBlock, stat, dfsClient, src, progress, checksum, cachingStrategy,
-        byteArrayManage);
-    this.index = index;
-    this.stripedBlocks = stripedBlocks;
+  int getIndex() {
+    return index;
+  }
+
+  void setIsFailed(boolean isFailed) {
+    this.isFailed = isFailed;
+  }
+
+  boolean isFailed() {
+    return isFailed;
   }
 
   public boolean isLeadingStreamer () {
@@ -95,18 +84,8 @@ public class StripedDataStreamer extends DataStreamer {
 
   @Override
   protected void endBlock() {
-    if (!isLeadingStreamer() && !isParityStreamer()) {
-      // before retrieving a new block, transfer the finished block to
-      // leading streamer
-      LocatedBlock finishedBlock = new LocatedBlock(
-          new ExtendedBlock(block.getBlockPoolId(), block.getBlockId(),
-              block.getNumBytes(), block.getGenerationStamp()), null);
-      try {
-        boolean offSuccess = stripedBlocks.get(0).offer(finishedBlock, 30,
-            TimeUnit.SECONDS);
-      } catch (InterruptedException ie) {
-        // TODO: Handle InterruptedException (HDFS-7786)
-      }
+    if (!isParityStreamer()) {
+      coordinator.putEndBlock(index, block);
     }
     super.endBlock();
   }
@@ -114,71 +93,40 @@ public class StripedDataStreamer extends DataStreamer {
   @Override
   protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
       throws IOException {
-    LocatedBlock lb = null;
     if (isLeadingStreamer()) {
-      if (hasCommittedBlock) {
-        /**
-         * when committing a block group, leading streamer has to adjust
-         * {@link block} to include the size of block group
-         */
-        for (int i = 1; i < NUM_DATA_BLOCKS; i++) {
-          try {
-            LocatedBlock finishedLocatedBlock = stripedBlocks.get(0).poll(30,
-                TimeUnit.SECONDS);
-            if (finishedLocatedBlock == null) {
-              throw new IOException("Fail to get finished LocatedBlock " +
-                  "from streamer, i=" + i);
-            }
-            ExtendedBlock finishedBlock = finishedLocatedBlock.getBlock();
-            long bytes = finishedBlock == null ? 0 : finishedBlock.getNumBytes();
-            if (block != null) {
-              block.setNumBytes(block.getNumBytes() + bytes);
-            }
-          } catch (InterruptedException ie) {
-            DFSClient.LOG.info("InterruptedException received when putting" +
-                " a block to stripeBlocks, ie = " + ie);
-          }
+      if (coordinator.shouldLocateFollowingBlock()) {
+        // set numByte for the previous block group
+        long bytes = 0;
+        for (int i = 0; i < NUM_DATA_BLOCKS; i++) {
+          final ExtendedBlock b = coordinator.getEndBlock(i);
+          bytes += b == null ? 0 : b.getNumBytes();
         }
+        block.setNumBytes(bytes);
       }
 
-      lb = super.locateFollowingBlock(excludedNodes);
-      hasCommittedBlock = true;
-      assert lb instanceof LocatedStripedBlock;
-      DFSClient.LOG.debug("Leading streamer obtained bg " + lb);
-      LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
-          (LocatedStripedBlock) lb, BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS,
-          NUM_PARITY_BLOCKS);
+      final LocatedStripedBlock lsb
+          = (LocatedStripedBlock)super.locateFollowingBlock(excludedNodes);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Obtained block group " + lsb);
+      }
+      LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(lsb,
+          BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
+
       assert blocks.length == (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS) :
           "Fail to get block group from namenode: blockGroupSize: " +
               (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS) + ", blocks.length: " +
               blocks.length;
-      lb = blocks[0];
-      for (int i = 1; i < blocks.length; i++) {
-        try {
-          boolean offSuccess = stripedBlocks.get(i).offer(blocks[i],
-              90, TimeUnit.SECONDS);
-          if(!offSuccess){
-            String msg = "Fail to put block to stripeBlocks. i = " + i;
-            DFSClient.LOG.info(msg);
-            throw new IOException(msg);
-          } else {
-            DFSClient.LOG.info("Allocate a new block to a streamer. i = " + i
-                + ", block: " + blocks[i]);
-          }
-        } catch (InterruptedException ie) {
-          DFSClient.LOG.info("InterruptedException received when putting" +
-              " a block to stripeBlocks, ie = " + ie);
-        }
-      }
-    } else {
-      try {
-        // wait 90 seconds to get a block from the queue
-        lb = stripedBlocks.get(index).poll(90, TimeUnit.SECONDS);
-      } catch (InterruptedException ie) {
-        DFSClient.LOG.info("InterruptedException received when retrieving " +
-            "a block from stripeBlocks, ie = " + ie);
+      for (int i = 0; i < blocks.length; i++) {
+        coordinator.putStripedBlock(i, blocks[i]);
       }
     }
-    return lb;
+
+    return coordinator.getStripedBlock(index);
+  }
+
+  @Override
+  public String toString() {
+    return "#" + index + ": isFailed? " + Boolean.toString(isFailed).charAt(0)
+        + ", " + super.toString();
   }
 }
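
With the Coordinator in place, StripedDataStreamer.locateFollowingBlock() becomes a distribute-then-take pattern: the leading streamer asks the NameNode for a block group, splits it into internal blocks, and puts one per streamer; every streamer, the leader included, then takes its own entry. A simplified sketch of that control flow; BlockGroup, InternalBlock and split() are placeholders, not the HDFS API.

    import java.io.IOException;

    class BlockAllocationSketch {
      static class InternalBlock {}                 // stand-in for LocatedBlock
      static class BlockGroup {                     // stand-in for LocatedStripedBlock
        InternalBlock[] split(int numBlocks) {      // stand-in for parseStripedBlockGroup
          final InternalBlock[] blocks = new InternalBlock[numBlocks];
          for (int i = 0; i < numBlocks; i++) {
            blocks[i] = new InternalBlock();
          }
          return blocks;
        }
      }

      /** Subset of the patch's Coordinator used for block distribution. */
      interface Coordinator {
        void putStripedBlock(int i, InternalBlock b) throws IOException;
        InternalBlock getStripedBlock(int i) throws IOException;
      }

      static InternalBlock locateFollowingBlock(int index, int numAllBlocks,
          Coordinator coordinator, BlockGroup groupFromNameNode) throws IOException {
        if (index == 0) {                           // only the leading streamer
          final InternalBlock[] blocks = groupFromNameNode.split(numAllBlocks);
          for (int i = 0; i < blocks.length; i++) {
            coordinator.putStripedBlock(i, blocks[i]);
          }
        }
        // every streamer, the leader included, takes its own queue entry
        return coordinator.getStripedBlock(index);
      }
    }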

http://git-wip-us.apache.org/repos/asf/hadoop/blob/af74bae8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index fdbacdc..4ec9bf9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -1998,8 +1998,6 @@ public class MiniDFSCluster {
     int node = -1;
     for (int i = 0; i < dataNodes.size(); i++) {
       DataNode dn = dataNodes.get(i).datanode;
-      LOG.info("DN name=" + dnName + " found DN=" + dn +
-          " with name=" + dn.getDisplayName());
       if (dnName.equals(dn.getDatanodeId().getXferAddr())) {
         node = i;
         break;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/af74bae8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
index 5ce94ee..ec98e68 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
@@ -35,6 +35,8 @@ import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -42,6 +44,12 @@ import org.junit.Test;
 
 public class TestDFSStripedOutputStream {
   public static final Log LOG = LogFactory.getLog(TestDFSStripedOutputStream.class);
+
+  static {
+    GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.ALL);
+  }
+
   private int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
   private int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
 
@@ -245,6 +253,11 @@ public class TestDFSStripedOutputStream {
 
   static void verifyParity(final long size, final int cellSize,
       byte[][] dataBytes, byte[][] parityBytes) {
+    verifyParity(size, cellSize, dataBytes, parityBytes, -1);
+  }
+
+  static void verifyParity(final long size, final int cellSize,
+      byte[][] dataBytes, byte[][] parityBytes, int killedDnIndex) {
     // verify the parity blocks
     int parityBlkSize = (int) StripedBlockUtil.getInternalBlockLength(
         size, cellSize, dataBytes.length, dataBytes.length);
@@ -265,7 +278,10 @@ public class TestDFSStripedOutputStream {
     encoder.initialize(dataBytes.length, parityBytes.length, cellSize);
     encoder.encode(dataBytes, expectedParityBytes);
     for (int i = 0; i < parityBytes.length; i++) {
-      Assert.assertArrayEquals(expectedParityBytes[i], parityBytes[i]);
+      if (i != killedDnIndex) {
+        Assert.assertArrayEquals("i=" + i + ", killedDnIndex=" + killedDnIndex,
+            expectedParityBytes[i], parityBytes[i]);
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/af74bae8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
new file mode 100644
index 0000000..c2e588a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestDFSStripedOutputStreamWithFailure {
+  public static final Log LOG = LogFactory.getLog(
+      TestDFSStripedOutputStreamWithFailure.class);
+  static {
+    GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.ALL);
+  }
+
+  private static final int NUM_DATA_BLOCKS = HdfsConstants.NUM_DATA_BLOCKS;
+  private static final int NUM_PARITY_BLOCKS = HdfsConstants.NUM_PARITY_BLOCKS;
+  private static final int CELL_SIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+  private static final int STRIPES_PER_BLOCK = 4;
+  private static final int BLOCK_SIZE = CELL_SIZE * STRIPES_PER_BLOCK;
+  private static final int BLOCK_GROUP_SIZE = BLOCK_SIZE * NUM_DATA_BLOCKS;
+
+  private final HdfsConfiguration conf = new HdfsConfiguration();
+  private MiniDFSCluster cluster;
+  private DistributedFileSystem dfs;
+  private final Path dir = new Path("/"
+      + TestDFSStripedOutputStreamWithFailure.class.getSimpleName());
+
+
+  @Before
+  public void setup() throws IOException {
+    final int numDNs = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS;
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
+    cluster.waitActive();
+    dfs = cluster.getFileSystem();
+    dfs.mkdirs(dir);
+    dfs.createErasureCodingZone(dir, null);
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  private static byte getByte(long pos) {
+    return (byte)pos;
+  }
+
+  @Test(timeout=120000)
+  public void testDatanodeFailure1() {
+    final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
+    final int dn = 1;
+    runTest("file" + dn, length, dn);
+  }
+
+  @Test(timeout=120000)
+  public void testDatanodeFailure2() {
+    final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
+    final int dn = 2;
+    runTest("file" + dn, length, dn);
+  }
+
+  @Test(timeout=120000)
+  public void testDatanodeFailure3() {
+    final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
+    final int dn = 3;
+    runTest("file" + dn, length, dn);
+  }
+
+  @Test(timeout=120000)
+  public void testDatanodeFailure4() {
+    final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
+    final int dn = 4;
+    runTest("file" + dn, length, dn);
+  }
+
+  @Test(timeout=120000)
+  public void testDatanodeFailure5() {
+    final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
+    final int dn = 5;
+    runTest("file" + dn, length, dn);
+  }
+
+  @Test(timeout=120000)
+  public void testDatanodeFailure6() {
+    final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
+    final int dn = 6;
+    runTest("file" + dn, length, dn);
+  }
+
+  @Test(timeout=120000)
+  public void testDatanodeFailure7() {
+    final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
+    final int dn = 7;
+    runTest("file" + dn, length, dn);
+  }
+
+  @Test(timeout=120000)
+  public void testDatanodeFailure8() {
+    final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
+    final int dn = 8;
+    runTest("file" + dn, length, dn);
+  }
+
+  private void runTest(final String src, final int length, final int dnIndex) {
+    try {
+      cluster.startDataNodes(conf, 1, true, null, null);
+      cluster.waitActive();
+
+      runTest(new Path(dir, src), length, dnIndex);
+    } catch(Exception e) {
+      LOG.info("FAILED", e);
+      Assert.fail(StringUtils.stringifyException(e));
+    }
+  }
+
+  private void runTest(final Path p, final int length,
+      final int dnIndex) throws Exception {
+    LOG.info("p=" + p + ", length=" + length + ", dnIndex=" + dnIndex);
+    final String fullPath = p.toString();
+
+    final AtomicInteger pos = new AtomicInteger();
+    final FSDataOutputStream out = dfs.create(p);
+    final AtomicBoolean killed = new AtomicBoolean();
+    final Thread killer = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        killDatanode(cluster, (DFSStripedOutputStream)out.getWrappedStream(),
+            dnIndex, pos);
+        killed.set(true);
+      }
+    });
+    killer.start();
+
+    final int mask = (1 << 16) - 1;
+    for(; pos.get() < length; ) {
+      final int i = pos.getAndIncrement();
+      write(out, i);
+      if ((i & mask) == 0) {
+        final long ms = 100;
+        LOG.info("i=" + i + " sleep " + ms);
+        Thread.sleep(ms);
+      }
+    }
+    killer.join(10000);
+    Assert.assertTrue(killed.get());
+    out.close();
+
+    // check file length
+    final FileStatus status = dfs.getFileStatus(p);
+    Assert.assertEquals(length, status.getLen());
+
+    checkData(dfs, fullPath, length, dnIndex);
+  }
+
+  static void write(FSDataOutputStream out, int i) throws IOException {
+    try {
+      out.write(getByte(i));
+    } catch(IOException ioe) {
+      throw new IOException("Failed at i=" + i, ioe);
+    }
+  }
+
+  static DatanodeInfo getDatanodes(StripedDataStreamer streamer) {
+    for(;;) {
+      final DatanodeInfo[] datanodes = streamer.getNodes();
+      if (datanodes != null) {
+        Assert.assertEquals(1, datanodes.length);
+        Assert.assertNotNull(datanodes[0]);
+        return datanodes[0];
+      }
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException ignored) {
+        return null;
+      }
+    }
+  }
+
+  static void killDatanode(MiniDFSCluster cluster, DFSStripedOutputStream out,
+      final int dnIndex, final AtomicInteger pos) {
+    final StripedDataStreamer s = out.getStripedDataStreamer(dnIndex);
+    final DatanodeInfo datanode = getDatanodes(s);
+    LOG.info("killDatanode " + dnIndex + ": " + datanode + ", pos=" + pos);
+    cluster.stopDataNode(datanode.getXferAddr());
+  }
+
+  static void checkData(DistributedFileSystem dfs, String src, int length,
+      int killedDnIndex) throws IOException {
+    List<List<LocatedBlock>> blockGroupList = new ArrayList<>();
+    LocatedBlocks lbs = dfs.getClient().getLocatedBlocks(src, 0L);
+    final int expectedNumGroup = (length - 1)/BLOCK_GROUP_SIZE + 1;
+    Assert.assertEquals(expectedNumGroup, lbs.getLocatedBlocks().size());
+
+    for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) {
+      Assert.assertTrue(firstBlock instanceof LocatedStripedBlock);
+      LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
+          (LocatedStripedBlock) firstBlock,
+          CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
+      blockGroupList.add(Arrays.asList(blocks));
+    }
+
+    // test each block group
+    for (int group = 0; group < blockGroupList.size(); group++) {
+      final boolean isLastGroup = group == blockGroupList.size() - 1;
+      final int groupSize = !isLastGroup? BLOCK_GROUP_SIZE
+          : length - (blockGroupList.size() - 1)*BLOCK_GROUP_SIZE;
+      final int numCellInGroup = (int)((groupSize - 1)/CELL_SIZE + 1);
+      final int lastCellIndex = (numCellInGroup - 1) % NUM_DATA_BLOCKS;
+      final int lastCellSize = groupSize - (numCellInGroup - 1)*CELL_SIZE;
+
+      //get the data of this block
+      List<LocatedBlock> blockList = blockGroupList.get(group);
+      byte[][] dataBlockBytes = new byte[NUM_DATA_BLOCKS][];
+      byte[][] parityBlockBytes = new byte[NUM_PARITY_BLOCKS][];
+
+      // for each block, use BlockReader to read data
+      for (int i = 0; i < blockList.size(); i++) {
+        final int j = i >= NUM_DATA_BLOCKS? 0: i;
+        final int numCellInBlock = (numCellInGroup - 1)/NUM_DATA_BLOCKS
+            + (j <= lastCellIndex? 1: 0);
+        final int blockSize = numCellInBlock*CELL_SIZE
+            + (isLastGroup && i == lastCellIndex? lastCellSize - CELL_SIZE: 0);
+
+        final byte[] blockBytes = new byte[blockSize];
+        if (i < NUM_DATA_BLOCKS) {
+          dataBlockBytes[i] = blockBytes;
+        } else {
+          parityBlockBytes[i - NUM_DATA_BLOCKS] = blockBytes;
+        }
+
+        final LocatedBlock lb = blockList.get(i);
+        LOG.info("XXX i=" + i + ", lb=" + lb);
+        if (lb == null) {
+          continue;
+        }
+        final ExtendedBlock block = lb.getBlock();
+        Assert.assertEquals(blockSize, block.getNumBytes());
+
+
+        if (block.getNumBytes() == 0) {
+          continue;
+        }
+
+        if (i != killedDnIndex) {
+          final BlockReader blockReader = BlockReaderTestUtil.getBlockReader(
+              dfs, lb, 0, block.getNumBytes());
+          blockReader.readAll(blockBytes, 0, (int) block.getNumBytes());
+          blockReader.close();
+        }
+      }
+
+      // check data
+      final int groupPosInFile = group*BLOCK_GROUP_SIZE;
+      for (int i = 0; i < dataBlockBytes.length; i++) {
+        final byte[] actual = dataBlockBytes[i];
+        for (int posInBlk = 0; posInBlk < actual.length; posInBlk++) {
+          final long posInFile = StripedBlockUtil.offsetInBlkToOffsetInBG(
+              CELL_SIZE, NUM_DATA_BLOCKS, posInBlk, i) + groupPosInFile;
+          Assert.assertTrue(posInFile < length);
+          final byte expected = getByte(posInFile);
+
+          if (i == killedDnIndex) {
+            actual[posInBlk] = expected;
+          } else {
+            String s = "expected=" + expected + " but actual=" + actual[posInBlk]
+                + ", posInFile=" + posInFile + ", posInBlk=" + posInBlk
+                + ". group=" + group + ", i=" + i;
+            Assert.assertEquals(s, expected, actual[posInBlk]);
+          }
+        }
+      }
+
+      // check parity
+      TestDFSStripedOutputStream.verifyParity(
+          lbs.getLocatedBlocks().get(group).getBlockSize(),
+          CELL_SIZE, dataBlockBytes, parityBlockBytes,
+          killedDnIndex - dataBlockBytes.length);
+    }
+  }
+}
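
The test relies on a deterministic pattern, getByte(pos) = (byte) pos, so checkData() can recompute the expected byte at any position purely from the round-robin striping layout instead of remembering what was written. A small sketch of that idea; mapToGroupOffset() is a simplified stand-in for StripedBlockUtil.offsetInBlkToOffsetInBG and covers full cells only.

    class StripePatternSketch {
      /** The pattern the test writes: the byte at file offset pos is (byte) pos. */
      static byte getByte(long pos) {
        return (byte) pos;
      }

      /**
       * Simplified round-robin mapping from (block index, offset in block) back
       * to the offset in the block group, valid for full cells.
       */
      static long mapToGroupOffset(int cellSize, int numDataBlocks,
          long offsetInBlock, int blockIndex) {
        final long cellIndexInBlock = offsetInBlock / cellSize;
        final long offsetInCell = offsetInBlock % cellSize;
        return (cellIndexInBlock * numDataBlocks + blockIndex) * (long) cellSize
            + offsetInCell;
      }

      public static void main(String[] args) {
        final int cellSize = 4, numDataBlocks = 3;
        // Byte 0 of data block 1 is the 5th byte of the group (offset 4),
        // so its expected value is getByte(4) = 4.
        final long posInGroup = mapToGroupOffset(cellSize, numDataBlocks, 0, 1);
        System.out.println(posInGroup + " -> " + getByte(posInGroup));
      }
    }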