Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/09/28 23:41:22 UTC

[1/2] hadoop git commit: HDFS-9040. Erasure coding: coordinate data streamers in DFSStripedOutputStream. Contributed by Jing Zhao and Walter Su.

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7285 c09dc258a -> 6419900ac


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
index 8d4a0cf..12453fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
@@ -20,23 +20,35 @@ package org.apache.hadoop.hdfs;
 import com.google.common.base.Joiner;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.hdfs.web.ByteRangeInputStream;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
 import org.junit.Assert;
 
 import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Random;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.junit.Assert.assertEquals;
+
 public class StripedFileTestUtil {
   public static final Log LOG = LogFactory.getLog(StripedFileTestUtil.class);
   /*
@@ -50,8 +62,8 @@ public class StripedFileTestUtil {
   static final int stripesPerBlock = 4;
   static final int blockSize = BLOCK_STRIPED_CELL_SIZE * stripesPerBlock;
   static final int numDNs = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS + 2;
+  static final int BLOCK_GROUP_SIZE = blockSize * NUM_DATA_BLOCKS;
 
-  static final Random random = new Random();
 
   static byte[] generateBytes(int cnt) {
     byte[] bytes = new byte[cnt];
@@ -61,6 +73,11 @@ public class StripedFileTestUtil {
     return bytes;
   }
 
+  static byte getByte(long pos) {
+    final int mod = 29;
+    return (byte) (pos % mod + 1);
+  }
+
   static int readAll(FSDataInputStream in, byte[] buf) throws IOException {
     int readLen = 0;
     int ret;
@@ -71,15 +88,10 @@ public class StripedFileTestUtil {
     return readLen;
   }
 
-  static byte getByte(long pos) {
-    final int mod = 29;
-    return (byte) (pos % mod + 1);
-  }
-
   static void verifyLength(FileSystem fs, Path srcPath, int fileLength)
       throws IOException {
     FileStatus status = fs.getFileStatus(srcPath);
-    Assert.assertEquals("File length should be the same", fileLength, status.getLen());
+    assertEquals("File length should be the same", fileLength, status.getLen());
   }
 
   static void verifyPread(FileSystem fs, Path srcPath,  int fileLength,
@@ -101,9 +113,7 @@ public class StripedFileTestUtil {
           offset += target;
         }
         for (int i = 0; i < fileLength - startOffset; i++) {
-          Assert.assertEquals("Byte at " + (startOffset + i) + " is different, "
-                  + "the startOffset is " + startOffset,
-              expected[startOffset + i], result[i]);
+          assertEquals("Byte at " + (startOffset + i) + " is different, " + "the startOffset is " + startOffset, expected[startOffset + i], result[i]);
         }
       }
     }
@@ -119,8 +129,7 @@ public class StripedFileTestUtil {
         System.arraycopy(buf, 0, result, readLen, ret);
         readLen += ret;
       }
-      Assert.assertEquals("The length of file should be the same to write size",
-          fileLength, readLen);
+      assertEquals("The length of file should be the same to write size", fileLength, readLen);
       Assert.assertArrayEquals(expected, result);
     }
   }
@@ -137,8 +146,7 @@ public class StripedFileTestUtil {
         result.put(buf);
         buf.clear();
       }
-      Assert.assertEquals("The length of file should be the same to write size",
-          fileLength, readLen);
+      assertEquals("The length of file should be the same to write size", fileLength, readLen);
       Assert.assertArrayEquals(expected, result.array());
     }
   }
@@ -199,10 +207,9 @@ public class StripedFileTestUtil {
     fsdis.seek(pos);
     byte[] buf = new byte[writeBytes];
     int readLen = StripedFileTestUtil.readAll(fsdis, buf);
-    Assert.assertEquals(readLen, writeBytes - pos);
+    assertEquals(readLen, writeBytes - pos);
     for (int i = 0; i < readLen; i++) {
-      Assert.assertEquals("Byte at " + i + " should be the same",
-          StripedFileTestUtil.getByte(pos + i), buf[i]);
+      assertEquals("Byte at " + i + " should be the same", StripedFileTestUtil.getByte(pos + i), buf[i]);
     }
   }
 
@@ -210,6 +217,7 @@ public class StripedFileTestUtil {
       final int dnIndex, final AtomicInteger pos) {
     final StripedDataStreamer s = out.getStripedDataStreamer(dnIndex);
     final DatanodeInfo datanode = getDatanodes(s);
+    assert datanode != null;
     LOG.info("killDatanode " + dnIndex + ": " + datanode + ", pos=" + pos);
     cluster.stopDataNode(datanode.getXferAddr());
   }
@@ -218,7 +226,7 @@ public class StripedFileTestUtil {
     for(;;) {
       final DatanodeInfo[] datanodes = streamer.getNodes();
       if (datanodes != null) {
-        Assert.assertEquals(1, datanodes.length);
+        assertEquals(1, datanodes.length);
         Assert.assertNotNull(datanodes[0]);
         return datanodes[0];
       }
@@ -287,7 +295,6 @@ public class StripedFileTestUtil {
    * @param min minimum of the range
    * @param max maximum of the range
    * @param n number to be generated
-   * @return
    */
   public static int[] randomArray(int min, int max, int n){
     if (n > (max - min + 1) || max < min || min < 0 || max < 0) {
@@ -315,4 +322,170 @@ public class StripedFileTestUtil {
     }
     return result;
   }
+
+  /**
+   * Verify that blocks in a striped block group are on different nodes, and
+   * that every internal block exists.
+   */
+  public static void verifyLocatedStripedBlocks(LocatedBlocks lbs, int groupSize) {
+    for (LocatedBlock lb : lbs.getLocatedBlocks()) {
+      assert lb instanceof LocatedStripedBlock;
+      HashSet<DatanodeInfo> locs = new HashSet<>();
+      Collections.addAll(locs, lb.getLocations());
+      assertEquals(groupSize, lb.getLocations().length);
+      assertEquals(groupSize, locs.size());
+
+      // verify that every internal block exists
+      int[] blockIndices = ((LocatedStripedBlock) lb).getBlockIndices();
+      assertEquals(groupSize, blockIndices.length);
+      HashSet<Integer> found = new HashSet<>();
+      for (int index : blockIndices) {
+        assert index >= 0;
+        found.add(index);
+      }
+      assertEquals(groupSize, found.size());
+    }
+  }
+
+  static void checkData(DistributedFileSystem dfs, Path srcPath, int length,
+      int[] killedDnIndex, long oldGS) throws IOException {
+
+    StripedFileTestUtil.verifyLength(dfs, srcPath, length);
+    Arrays.sort(killedDnIndex);
+    List<List<LocatedBlock>> blockGroupList = new ArrayList<>();
+    LocatedBlocks lbs = dfs.getClient().getLocatedBlocks(srcPath.toString(), 0L,
+        Long.MAX_VALUE);
+    int expectedNumGroup = 0;
+    if (length > 0) {
+      expectedNumGroup = (length - 1) / BLOCK_GROUP_SIZE + 1;
+    }
+    assertEquals(expectedNumGroup, lbs.getLocatedBlocks().size());
+
+    for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) {
+      Assert.assertTrue(firstBlock instanceof LocatedStripedBlock);
+
+      final long gs = firstBlock.getBlock().getGenerationStamp();
+      final String s = "gs=" + gs + ", oldGS=" + oldGS;
+      LOG.info(s);
+      Assert.assertTrue(s, gs >= oldGS);
+
+      LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
+          (LocatedStripedBlock) firstBlock, BLOCK_STRIPED_CELL_SIZE,
+          NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
+      blockGroupList.add(Arrays.asList(blocks));
+    }
+
+    // test each block group
+    for (int group = 0; group < blockGroupList.size(); group++) {
+      final boolean isLastGroup = group == blockGroupList.size() - 1;
+      final int groupSize = !isLastGroup? BLOCK_GROUP_SIZE
+          : length - (blockGroupList.size() - 1)*BLOCK_GROUP_SIZE;
+      final int numCellInGroup = (groupSize - 1)/BLOCK_STRIPED_CELL_SIZE + 1;
+      final int lastCellIndex = (numCellInGroup - 1) % NUM_DATA_BLOCKS;
+      final int lastCellSize = groupSize - (numCellInGroup - 1)*BLOCK_STRIPED_CELL_SIZE;
+
+      //get the data of this block
+      List<LocatedBlock> blockList = blockGroupList.get(group);
+      byte[][] dataBlockBytes = new byte[NUM_DATA_BLOCKS][];
+      byte[][] parityBlockBytes = new byte[NUM_PARITY_BLOCKS][];
+
+      // for each block, use BlockReader to read data
+      for (int i = 0; i < blockList.size(); i++) {
+        final int j = i >= NUM_DATA_BLOCKS? 0: i;
+        final int numCellInBlock = (numCellInGroup - 1)/NUM_DATA_BLOCKS
+            + (j <= lastCellIndex? 1: 0);
+        final int blockSize = numCellInBlock*BLOCK_STRIPED_CELL_SIZE
+            + (isLastGroup && j == lastCellIndex? lastCellSize - BLOCK_STRIPED_CELL_SIZE: 0);
+
+        final byte[] blockBytes = new byte[blockSize];
+        if (i < NUM_DATA_BLOCKS) {
+          dataBlockBytes[i] = blockBytes;
+        } else {
+          parityBlockBytes[i - NUM_DATA_BLOCKS] = blockBytes;
+        }
+
+        final LocatedBlock lb = blockList.get(i);
+        LOG.info("i,j=" + i + ", " + j + ", numCellInBlock=" + numCellInBlock
+            + ", blockSize=" + blockSize + ", lb=" + lb);
+        if (lb == null) {
+          continue;
+        }
+        final ExtendedBlock block = lb.getBlock();
+        assertEquals(blockSize, block.getNumBytes());
+
+        if (block.getNumBytes() == 0) {
+          continue;
+        }
+
+        if (Arrays.binarySearch(killedDnIndex, i) < 0) {
+          final BlockReader blockReader = BlockReaderTestUtil.getBlockReader(
+              dfs, lb, 0, block.getNumBytes());
+          blockReader.readAll(blockBytes, 0, (int) block.getNumBytes());
+          blockReader.close();
+        }
+      }
+
+      // check data
+      final int groupPosInFile = group*BLOCK_GROUP_SIZE;
+      for (int i = 0; i < dataBlockBytes.length; i++) {
+        boolean killed = false;
+        if (Arrays.binarySearch(killedDnIndex, i) >= 0) {
+          killed = true;
+        }
+        final byte[] actual = dataBlockBytes[i];
+        for (int posInBlk = 0; posInBlk < actual.length; posInBlk++) {
+          final long posInFile = StripedBlockUtil.offsetInBlkToOffsetInBG(
+              BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, posInBlk, i) + groupPosInFile;
+          Assert.assertTrue(posInFile < length);
+          final byte expected = getByte(posInFile);
+
+          if (killed) {
+            actual[posInBlk] = expected;
+          } else {
+            if (expected != actual[posInBlk]) {
+              String s = "expected=" + expected + " but actual=" + actual[posInBlk]
+                  + ", posInFile=" + posInFile + ", posInBlk=" + posInBlk
+                  + ". group=" + group + ", i=" + i;
+              Assert.fail(s);
+            }
+          }
+        }
+      }
+
+      // check parity
+      verifyParityBlocks(dfs.getConf(), lbs.getLocatedBlocks().get(group)
+              .getBlockSize(),
+          BLOCK_STRIPED_CELL_SIZE, dataBlockBytes, parityBlockBytes, killedDnIndex);
+    }
+  }
+
+  static void verifyParityBlocks(Configuration conf, final long size, final int cellSize,
+      byte[][] dataBytes, byte[][] parityBytes, int[] killedDnIndex) {
+    Arrays.sort(killedDnIndex);
+    // verify the parity blocks
+    int parityBlkSize = (int) StripedBlockUtil.getInternalBlockLength(
+        size, cellSize, dataBytes.length, dataBytes.length);
+    final byte[][] expectedParityBytes = new byte[parityBytes.length][];
+    for (int i = 0; i < parityBytes.length; i++) {
+      expectedParityBytes[i] = new byte[parityBlkSize];
+    }
+    for (int i = 0; i < dataBytes.length; i++) {
+      if (dataBytes[i] == null) {
+        dataBytes[i] = new byte[dataBytes[0].length];
+      } else if (dataBytes[i].length < dataBytes[0].length) {
+        final byte[] tmp = dataBytes[i];
+        dataBytes[i] = new byte[dataBytes[0].length];
+        System.arraycopy(tmp, 0, dataBytes[i], 0, tmp.length);
+      }
+    }
+    final RawErasureEncoder encoder =
+        CodecUtil.createRSRawEncoder(conf, dataBytes.length, parityBytes.length);
+    encoder.encode(dataBytes, expectedParityBytes);
+    for (int i = 0; i < parityBytes.length; i++) {
+      if (Arrays.binarySearch(killedDnIndex, dataBytes.length + i) < 0) {
+        Assert.assertArrayEquals("i=" + i + ", killedDnIndex=" + Arrays.toString(killedDnIndex),
+            expectedParityBytes[i], parityBytes[i]);
+      }
+    }
+  }
 }
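
For reference, checkData() above verifies file contents by mapping each
position inside an internal data block back to its offset in the file.
Since getByte(pos) makes the contents a pure function of the offset
((pos % 29) + 1), one formula is all the check needs. A minimal sketch of
that mapping, assuming StripedBlockUtil.offsetInBlkToOffsetInBG implements
plain round-robin striping:

    // posInBlk: offset within one internal data block;
    // idxInGroup: index of that block within the group (0..dataBlkNum - 1).
    static long offsetInBlkToOffsetInBG(int cellSize, int dataBlkNum,
        long posInBlk, int idxInGroup) {
      long stripe = posInBlk / cellSize;      // full stripes before this cell
      return stripe * cellSize * dataBlkNum   // bytes covered by those stripes
          + (long) idxInGroup * cellSize      // earlier cells in this stripe
          + posInBlk % cellSize;              // offset within the cell
    }

checkData() adds the block group's start offset (groupPosInFile) to this
value and compares each actual[posInBlk] against getByte(posInFile).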

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
index 0641e8e..d78e88b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
@@ -18,26 +18,14 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
-import org.apache.hadoop.hdfs.util.StripedBlockUtil;
-import org.apache.hadoop.io.erasurecode.CodecUtil;
-import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -154,141 +142,15 @@ public class TestDFSStripedOutputStream {
         + cellSize + 123);
   }
 
-  private byte[] generateBytes(int cnt) {
-    byte[] bytes = new byte[cnt];
-    for (int i = 0; i < cnt; i++) {
-      bytes[i] = getByte(i);
-    }
-    return bytes;
-  }
-
-  private byte getByte(long pos) {
-    int mod = 29;
-    return (byte) (pos % mod + 1);
-  }
-
   private void testOneFile(String src, int writeBytes) throws Exception {
     src += "_" + writeBytes;
     Path testPath = new Path(src);
 
-    byte[] bytes = generateBytes(writeBytes);
+    byte[] bytes = StripedFileTestUtil.generateBytes(writeBytes);
     DFSTestUtil.writeFile(fs, testPath, new String(bytes));
     StripedFileTestUtil.waitBlockGroupsReported(fs, src);
 
-    // check file length
-    FileStatus status = fs.getFileStatus(testPath);
-    Assert.assertEquals(writeBytes, status.getLen());
-
-    checkData(src, writeBytes);
-  }
-
-  void checkData(String src, int writeBytes) throws IOException {
-    List<List<LocatedBlock>> blockGroupList = new ArrayList<>();
-    LocatedBlocks lbs = fs.getClient().getLocatedBlocks(src, 0L);
-
-    for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) {
-      Assert.assertTrue(firstBlock instanceof LocatedStripedBlock);
-      LocatedBlock[] blocks = StripedBlockUtil.
-          parseStripedBlockGroup((LocatedStripedBlock) firstBlock,
-              cellSize, dataBlocks, parityBlocks);
-      List<LocatedBlock> oneGroup = Arrays.asList(blocks);
-      blockGroupList.add(oneGroup);
-    }
-
-    // test each block group
-    for (int group = 0; group < blockGroupList.size(); group++) {
-      //get the data of this block
-      List<LocatedBlock> blockList = blockGroupList.get(group);
-      byte[][] dataBlockBytes = new byte[dataBlocks][];
-      byte[][] parityBlockBytes = new byte[parityBlocks][];
-
-      // for each block, use BlockReader to read data
-      for (int i = 0; i < blockList.size(); i++) {
-        LocatedBlock lblock = blockList.get(i);
-        if (lblock == null) {
-          continue;
-        }
-        ExtendedBlock block = lblock.getBlock();
-        byte[] blockBytes = new byte[(int)block.getNumBytes()];
-        if (i < dataBlocks) {
-          dataBlockBytes[i] = blockBytes;
-        } else {
-          parityBlockBytes[i - dataBlocks] = blockBytes;
-        }
-
-        if (block.getNumBytes() == 0) {
-          continue;
-        }
-
-        final BlockReader blockReader = BlockReaderTestUtil.getBlockReader(
-            fs, lblock, 0, block.getNumBytes());
-        blockReader.readAll(blockBytes, 0, (int) block.getNumBytes());
-        blockReader.close();
-      }
-
-      // check if we write the data correctly
-      for (int blkIdxInGroup = 0; blkIdxInGroup < dataBlockBytes.length;
-           blkIdxInGroup++) {
-        final byte[] actualBlkBytes = dataBlockBytes[blkIdxInGroup];
-        if (actualBlkBytes == null) {
-          continue;
-        }
-        for (int posInBlk = 0; posInBlk < actualBlkBytes.length; posInBlk++) {
-          // calculate the position of this byte in the file
-          long posInFile = StripedBlockUtil.offsetInBlkToOffsetInBG(cellSize,
-              dataBlocks, posInBlk, blkIdxInGroup) +
-              group * blockSize * dataBlocks;
-          Assert.assertTrue(posInFile < writeBytes);
-          final byte expected = getByte(posInFile);
-
-          String s = "Unexpected byte " + actualBlkBytes[posInBlk]
-              + ", expect " + expected
-              + ". Block group index is " + group
-              + ", stripe index is " + posInBlk / cellSize
-              + ", cell index is " + blkIdxInGroup
-              + ", byte index is " + posInBlk % cellSize;
-          Assert.assertEquals(s, expected, actualBlkBytes[posInBlk]);
-        }
-      }
-
-      verifyParity(lbs.getLocatedBlocks().get(group).getBlockSize(),
-          cellSize, dataBlockBytes, parityBlockBytes);
-    }
-  }
-
-  void verifyParity(final long size, final int cellSize,
-      byte[][] dataBytes, byte[][] parityBytes) {
-    verifyParity(conf, size, cellSize, dataBytes, parityBytes, -1);
-  }
-
-  static void verifyParity(Configuration conf, final long size,
-                           final int cellSize, byte[][] dataBytes,
-                           byte[][] parityBytes, int killedDnIndex) {
-    // verify the parity blocks
-    int parityBlkSize = (int) StripedBlockUtil.getInternalBlockLength(
-        size, cellSize, dataBytes.length, dataBytes.length);
-    final byte[][] expectedParityBytes = new byte[parityBytes.length][];
-    for (int i = 0; i < parityBytes.length; i++) {
-      expectedParityBytes[i] = new byte[parityBlkSize];
-    }
-    for (int i = 0; i < dataBytes.length; i++) {
-      if (dataBytes[i] == null) {
-        dataBytes[i] = new byte[dataBytes[0].length];
-      } else if (dataBytes[i].length < dataBytes[0].length) {
-        final byte[] tmp = dataBytes[i];
-        dataBytes[i] = new byte[dataBytes[0].length];
-        System.arraycopy(tmp, 0, dataBytes[i], 0, tmp.length);
-      }
-    }
-    final RawErasureEncoder encoder =
-            CodecUtil.createRSRawEncoder(conf,
-                dataBytes.length, parityBytes.length);
-    encoder.encode(dataBytes, expectedParityBytes);
-    for (int i = 0; i < parityBytes.length; i++) {
-      if (i != killedDnIndex) {
-        Assert.assertArrayEquals("i=" + i + ", killedDnIndex=" + killedDnIndex,
-            expectedParityBytes[i], parityBytes[i]);
-      }
-    }
+    StripedFileTestUtil.checkData(fs, testPath, writeBytes,
+        new int[]{}, 0);
   }
 }
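
With the verification consolidated into StripedFileTestUtil, the no-failure
test above passes an empty kill list and oldGS = 0: every internal block must
then be readable and byte-identical, and the generation-stamp check passes
trivially. A hypothetical failure-case call to the same helper would look
like this (the kill indices and GS variable are illustrative, not part of
this commit):

    // Illustrative only: verify a striped file after internal blocks 1 and 6
    // were killed, using the generation stamp captured before close().
    StripedFileTestUtil.checkData(fs, testPath, writeBytes,
        new int[]{1, 6}, gsBeforeClose);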

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
index 44a29e6..f6c2566 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -30,23 +31,18 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.SecurityTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.StringUtils;
@@ -74,6 +70,7 @@ public class TestDFSStripedOutputStreamWithFailure {
 
   private static final int FLUSH_POS
       = 9*DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT + 1;
+
   static {
     System.out.println("NUM_DATA_BLOCKS  = " + NUM_DATA_BLOCKS);
     System.out.println("NUM_PARITY_BLOCKS= " + NUM_PARITY_BLOCKS);
@@ -101,6 +98,32 @@ public class TestDFSStripedOutputStreamWithFailure {
     return lengths;
   }
 
+  private static final int[][] dnIndexSuite = {
+      {0, 1},
+      {0, 5},
+      {0, 6},
+      {0, 8},
+      {1, 5},
+      {1, 6},
+      {6, 8},
+      {0, 1, 2},
+      {3, 4, 5},
+      {0, 1, 6},
+      {0, 5, 6},
+      {0, 5, 8},
+      {0, 6, 7},
+      {5, 6, 7},
+      {6, 7, 8},
+  };
+
+  private int[] getKillPositions(int fileLen, int num) {
+    int[] positions = new int[num];
+    for (int i = 0; i < num; i++) {
+      positions[i] = fileLen * (i + 1) / (num + 1);
+    }
+    return positions;
+  }
+
   private static final List<Integer> LENGTHS = newLengths();
 
   static int getLength(int i) {
@@ -127,42 +150,26 @@ public class TestDFSStripedOutputStreamWithFailure {
     }
   }
 
-  private static byte getByte(long pos) {
-    return (byte)pos;
-  }
-
   private HdfsConfiguration newHdfsConfiguration() {
     final HdfsConfiguration conf = new HdfsConfiguration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
-    conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 6000L);
+    conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 6000L);
     conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
     return conf;
   }
 
-  void runTest(final int length) {
-    final HdfsConfiguration conf = newHdfsConfiguration();
-    for (int dn = 0; dn < 9; dn++) {
-      try {
-        setup(conf);
-        runTest(length, dn, false, conf);
-      } catch (Exception e) {
-        final String err = "failed, dn=" + dn + ", length=" + length
-            + StringUtils.stringifyException(e);
-        LOG.error(err);
-        Assert.fail(err);
-      } finally {
-        tearDown();
-      }
-    }
-  }
-
   @Test(timeout=240000)
   public void testDatanodeFailure56() throws Exception {
     runTest(getLength(56));
   }
 
   @Test(timeout=240000)
+  public void testMultipleDatanodeFailure56() throws Exception {
+    runTestWithMultipleFailure(getLength(56));
+  }
+
+  @Test(timeout=240000)
   public void testBlockTokenExpired() throws Exception {
     final int length = NUM_DATA_BLOCKS * (BLOCK_SIZE - CELL_SIZE);
     final HdfsConfiguration conf = newHdfsConfiguration();
@@ -174,7 +181,7 @@ public class TestDFSStripedOutputStreamWithFailure {
     for (int dn = 0; dn < 9; dn += 2) {
       try {
         setup(conf);
-        runTest(length, dn, true, conf);
+        runTest(length, new int[]{length/2}, new int[]{dn}, true);
       } catch (Exception e) {
         LOG.error("failed, dn=" + dn + ", length=" + length);
         throw e;
@@ -214,22 +221,8 @@ public class TestDFSStripedOutputStreamWithFailure {
         Assert.fail("Failed to validate available dns against blkGroupSize");
       } catch (IOException ioe) {
         // expected
-        GenericTestUtils.assertExceptionContains("Failed: the number of "
-            + "remaining blocks = 5 < the number of data blocks = 6", ioe);
-        DFSStripedOutputStream dfsout = (DFSStripedOutputStream) out
-            .getWrappedStream();
-
-        // get leading streamer and verify the last exception
-        StripedDataStreamer datastreamer = dfsout.getStripedDataStreamer(0);
-        try {
-          datastreamer.getLastException().check(true);
-          Assert.fail("Failed to validate available dns against blkGroupSize");
-        } catch (IOException le) {
-          GenericTestUtils.assertExceptionContains(
-              "Failed to get datablocks number of nodes from"
-                  + " namenode: blockGroupSize= 9, blocks.length= "
-                  + numDatanodes, le);
-        }
+        GenericTestUtils.assertExceptionContains("Failed to get 6 nodes from" +
+            " namenode: blockGroupSize= 9, blocks.length= 5", ioe);
       }
     } finally {
       tearDown();
@@ -258,42 +251,73 @@ public class TestDFSStripedOutputStreamWithFailure {
       int fileLength = StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE - 1000;
       final byte[] expected = StripedFileTestUtil.generateBytes(fileLength);
       DFSTestUtil.writeFile(dfs, srcPath, new String(expected));
+      LOG.info("writing finished. Seek and read the file to verify.");
       StripedFileTestUtil.verifySeek(dfs, srcPath, fileLength);
     } finally {
       tearDown();
     }
   }
 
-  private void runTest(final int length, final int dnIndex,
-      final boolean tokenExpire, final HdfsConfiguration conf) {
-    try {
-      runTest(length, length/2, dnIndex, tokenExpire, conf);
-    } catch(Exception e) {
-      LOG.info("FAILED", e);
-      Assert.fail(StringUtils.stringifyException(e));
+  void runTest(final int length) {
+    final HdfsConfiguration conf = newHdfsConfiguration();
+    for (int dn = 0; dn < 9; dn++) {
+      try {
+        setup(conf);
+        runTest(length, new int[]{length/2}, new int[]{dn}, false);
+      } catch (Throwable e) {
+        final String err = "failed, dn=" + dn + ", length=" + length
+            + StringUtils.stringifyException(e);
+        LOG.error(err);
+        Assert.fail(err);
+      } finally {
+        tearDown();
+      }
     }
   }
 
-  private void runTest(final int length, final int killPos,
-      final int dnIndex, final boolean tokenExpire,
-      final HdfsConfiguration conf) throws Exception {
-    if (killPos <= FLUSH_POS) {
-      LOG.warn("killPos=" + killPos + " <= FLUSH_POS=" + FLUSH_POS
-          + ", length=" + length + ", dnIndex=" + dnIndex);
-      return; //skip test
+  void runTestWithMultipleFailure(final int length) throws Exception {
+    final HdfsConfiguration conf = newHdfsConfiguration();
+    for (int i = 0; i < dnIndexSuite.length; i++) {
+      int[] dnIndex = dnIndexSuite[i];
+      int[] killPos = getKillPositions(length, dnIndex.length);
+      try {
+        setup(conf);
+        runTest(length, killPos, dnIndex, false);
+      } catch (Throwable e) {
+        final String err = "failed, killPos=" + Arrays.toString(killPos)
+            + ", dnIndex=" + Arrays.toString(dnIndex) + ", length=" + length;
+        LOG.error(err);
+        throw e;
+      } finally {
+        tearDown();
+      }
     }
-    Preconditions.checkArgument(length > killPos,
-        "length=%s <= killPos=%s", length, killPos);
+  }
 
-    // start a datanode now, will kill one later
-    cluster.startDataNodes(conf, 1, true, null, null);
-    cluster.waitActive();
+  /**
+   * runTest implementation
+   * @param length file length
+   * @param killPos killing positions in ascending order
+   * @param dnIndex DN indices to kill when the corresponding kill
+   *                positions are reached
+   * @param tokenExpire whether to wait for the block token to expire
+   *                    before killing a DN
+   * @throws Exception
+   */
+  private void runTest(final int length, final int[] killPos,
+      final int[] dnIndex, final boolean tokenExpire) throws Exception {
+    if (killPos[0] <= FLUSH_POS) {
+      LOG.warn("killPos=" + Arrays.toString(killPos) + " <= FLUSH_POS=" + FLUSH_POS
+          + ", length=" + length + ", dnIndex=" + Arrays.toString(dnIndex));
+      return; //skip test
+    }
+    Preconditions.checkArgument(length > killPos[0], "length=%s <= killPos=%s",
+        length, killPos);
+    Preconditions.checkArgument(killPos.length == dnIndex.length);
 
-    final Path p = new Path(dir, "dn" + dnIndex + "len" + length + "kill" +  killPos);
+    final Path p = new Path(dir, "dn" + Arrays.toString(dnIndex)
+        + "len" + length + "kill" +  Arrays.toString(killPos));
     final String fullPath = p.toString();
     LOG.info("fullPath=" + fullPath);
 
-
     if (tokenExpire) {
       final NameNode nn = cluster.getNameNode();
       final BlockManager bm = nn.getNamesystem().getBlockManager();
@@ -308,50 +332,56 @@ public class TestDFSStripedOutputStreamWithFailure {
     final DFSStripedOutputStream stripedOut
         = (DFSStripedOutputStream)out.getWrappedStream();
 
-    long oldGS = -1;
-    boolean killed = false;
+    long firstGS = -1;  // the first GS of this block group, before any block recovery
+    long oldGS = -1; // the old GS before bumping
+    int numKilled = 0;
     for(; pos.get() < length; ) {
       final int i = pos.getAndIncrement();
-      if (i == killPos) {
+      if (numKilled < killPos.length && i == killPos[numKilled]) {
+        assertTrue(firstGS != -1);
         final long gs = getGenerationStamp(stripedOut);
-        Assert.assertTrue(oldGS != -1);
-        Assert.assertEquals(oldGS, gs);
+        if (numKilled == 0) {
+          assertEquals(firstGS, gs);
+        } else {
+          //TODO: implement hflush/hsync and verify that gs is strictly greater than oldGS
+          assertTrue(gs >= oldGS);
+        }
+        oldGS = gs;
 
         if (tokenExpire) {
           DFSTestUtil.flushInternal(stripedOut);
           waitTokenExpires(out);
         }
 
-        killDatanode(cluster, stripedOut, dnIndex, pos);
-        killed = true;
+        killDatanode(cluster, stripedOut, dnIndex[numKilled], pos);
+        numKilled++;
       }
 
       write(out, i);
 
-      if (i == FLUSH_POS) {
-        oldGS = getGenerationStamp(stripedOut);
+      if (i % BLOCK_GROUP_SIZE == FLUSH_POS) {
+        firstGS = getGenerationStamp(stripedOut);
+        oldGS = firstGS;
       }
     }
     out.close();
+    assertEquals(dnIndex.length, numKilled);
 
     short expectedReported = StripedFileTestUtil.getRealTotalBlockNum(length);
-    if (length > dnIndex * CELL_SIZE || dnIndex >= NUM_DATA_BLOCKS) {
-      expectedReported--;
+    for (int idx : dnIndex) {
+      if (length > idx * CELL_SIZE || idx >= NUM_DATA_BLOCKS) {
+        expectedReported--;
+      }
     }
     DFSTestUtil.waitReplication(dfs, p, expectedReported);
 
-    Assert.assertTrue(killed);
-
-    // check file length
-    final FileStatus status = dfs.getFileStatus(p);
-    Assert.assertEquals(length, status.getLen());
-
-    checkData(dfs, fullPath, length, dnIndex, oldGS);
+    cluster.triggerBlockReports();
+    StripedFileTestUtil.checkData(dfs, p, length, dnIndex, oldGS);
   }
 
   static void write(FSDataOutputStream out, int i) throws IOException {
     try {
-      out.write(getByte(i));
+      out.write(StripedFileTestUtil.getByte(i));
     } catch(IOException ioe) {
       throw new IOException("Failed at i=" + i, ioe);
     }
@@ -359,10 +389,10 @@ public class TestDFSStripedOutputStreamWithFailure {
 
   static long getGenerationStamp(DFSStripedOutputStream out)
       throws IOException {
+    DFSTestUtil.flushBuffer(out);
     final long gs = DFSTestUtil.flushInternal(out).getGenerationStamp();
     LOG.info("getGenerationStamp returns " + gs);
     return gs;
-
   }
 
   static DatanodeInfo getDatanodes(StripedDataStreamer streamer) {
@@ -399,106 +429,6 @@ public class TestDFSStripedOutputStreamWithFailure {
     cluster.stopDataNode(datanode.getXferAddr());
   }
 
-  static void checkData(DistributedFileSystem dfs, String src, int length,
-      int killedDnIndex, long oldGS) throws IOException {
-    List<List<LocatedBlock>> blockGroupList = new ArrayList<>();
-    LocatedBlocks lbs = dfs.getClient().getLocatedBlocks(src, 0L);
-    final int expectedNumGroup = (length - 1)/BLOCK_GROUP_SIZE + 1;
-    Assert.assertEquals(expectedNumGroup, lbs.getLocatedBlocks().size());
-
-    for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) {
-      Assert.assertTrue(firstBlock instanceof LocatedStripedBlock);
-
-      final long gs = firstBlock.getBlock().getGenerationStamp();
-      final String s = "gs=" + gs + ", oldGS=" + oldGS;
-      LOG.info(s);
-      Assert.assertTrue(s, gs >= oldGS);
-
-      LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
-          (LocatedStripedBlock) firstBlock,
-          CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
-      blockGroupList.add(Arrays.asList(blocks));
-    }
-
-    // test each block group
-    for (int group = 0; group < blockGroupList.size(); group++) {
-      final boolean isLastGroup = group == blockGroupList.size() - 1;
-      final int groupSize = !isLastGroup? BLOCK_GROUP_SIZE
-          : length - (blockGroupList.size() - 1)*BLOCK_GROUP_SIZE;
-      final int numCellInGroup = (groupSize - 1)/CELL_SIZE + 1;
-      final int lastCellIndex = (numCellInGroup - 1) % NUM_DATA_BLOCKS;
-      final int lastCellSize = groupSize - (numCellInGroup - 1)*CELL_SIZE;
-
-      //get the data of this block
-      List<LocatedBlock> blockList = blockGroupList.get(group);
-      byte[][] dataBlockBytes = new byte[NUM_DATA_BLOCKS][];
-      byte[][] parityBlockBytes = new byte[NUM_PARITY_BLOCKS][];
-
-      // for each block, use BlockReader to read data
-      for (int i = 0; i < blockList.size(); i++) {
-        final int j = i >= NUM_DATA_BLOCKS? 0: i;
-        final int numCellInBlock = (numCellInGroup - 1)/NUM_DATA_BLOCKS
-            + (j <= lastCellIndex? 1: 0);
-        final int blockSize = numCellInBlock*CELL_SIZE
-            + (isLastGroup && j == lastCellIndex? lastCellSize - CELL_SIZE: 0);
-
-        final byte[] blockBytes = new byte[blockSize];
-        if (i < NUM_DATA_BLOCKS) {
-          dataBlockBytes[i] = blockBytes;
-        } else {
-          parityBlockBytes[i - NUM_DATA_BLOCKS] = blockBytes;
-        }
-
-        final LocatedBlock lb = blockList.get(i);
-        LOG.info("i,j=" + i + ", " + j + ", numCellInBlock=" + numCellInBlock
-            + ", blockSize=" + blockSize + ", lb=" + lb);
-        if (lb == null) {
-          continue;
-        }
-        final ExtendedBlock block = lb.getBlock();
-        Assert.assertEquals(blockSize, block.getNumBytes());
-
-
-        if (block.getNumBytes() == 0) {
-          continue;
-        }
-
-        if (i != killedDnIndex) {
-          final BlockReader blockReader = BlockReaderTestUtil.getBlockReader(
-              dfs, lb, 0, block.getNumBytes());
-          blockReader.readAll(blockBytes, 0, (int) block.getNumBytes());
-          blockReader.close();
-        }
-      }
-
-      // check data
-      final int groupPosInFile = group*BLOCK_GROUP_SIZE;
-      for (int i = 0; i < dataBlockBytes.length; i++) {
-        final byte[] actual = dataBlockBytes[i];
-        for (int posInBlk = 0; posInBlk < actual.length; posInBlk++) {
-          final long posInFile = StripedBlockUtil.offsetInBlkToOffsetInBG(
-              CELL_SIZE, NUM_DATA_BLOCKS, posInBlk, i) + groupPosInFile;
-          Assert.assertTrue(posInFile < length);
-          final byte expected = getByte(posInFile);
-
-          if (i == killedDnIndex) {
-            actual[posInBlk] = expected;
-          } else {
-            String s = "expected=" + expected + " but actual=" + actual[posInBlk]
-                + ", posInFile=" + posInFile + ", posInBlk=" + posInBlk
-                + ". group=" + group + ", i=" + i;
-            Assert.assertEquals(s, expected, actual[posInBlk]);
-          }
-        }
-      }
-
-      // check parity
-      TestDFSStripedOutputStream.verifyParity(dfs.getConf(),
-          lbs.getLocatedBlocks().get(group).getBlockSize(),
-          CELL_SIZE, dataBlockBytes, parityBlockBytes,
-          killedDnIndex - dataBlockBytes.length);
-    }
-  }
 
   private void waitTokenExpires(FSDataOutputStream out) throws IOException {
     Token<BlockTokenIdentifier> token = DFSTestUtil.getBlockToken(out);
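
For reference, getKillPositions() above spreads the kill points evenly:
position i is fileLen * (i + 1) / (num + 1), so num failures split the write
into num + 1 roughly equal segments. A quick worked example:

    // For a 9000-byte file and a two-entry row of dnIndexSuite, e.g. {0, 5}:
    //   positions[0] = 9000 * 1 / 3 = 3000  -> kill dnIndex[0] at byte 3000
    //   positions[1] = 9000 * 2 / 3 = 6000  -> kill dnIndex[1] at byte 6000
    int[] positions = getKillPositions(9000, 2);  // {3000, 6000}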

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java
index c0dca4e..764527d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -39,6 +41,12 @@ public class TestWriteStripedFileWithFailure {
   private static MiniDFSCluster cluster;
   private static FileSystem fs;
   private static Configuration conf = new HdfsConfiguration();
+
+  static {
+    GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.ALL);
+  }
+
   private final short dataBlocks = StripedFileTestUtil.NUM_DATA_BLOCKS;
   private final short parityBlocks = StripedFileTestUtil.NUM_PARITY_BLOCKS;
   private final int smallFileLength = blockSize * dataBlocks - 123;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 124bf80..ef31527 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -1745,7 +1745,7 @@ public class TestBalancer {
 
       // verify locations of striped blocks
       LocatedBlocks locatedBlocks = client.getBlockLocations(fileName, 0, fileLen);
-      DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
+      StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
 
       // add one datanode
       String newRack = "/rack" + (++numOfRacks);
@@ -1761,7 +1761,7 @@ public class TestBalancer {
 
       // verify locations of striped blocks
       locatedBlocks = client.getBlockLocations(fileName, 0, fileLen);
-      DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
+      StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
     } finally {
       cluster.shutdown();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
index 3a9748f..7cf5656 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
@@ -488,7 +488,7 @@ public class TestMover {
           Assert.assertEquals(StorageType.DISK, type);
         }
       }
-      DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
+      StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
           dataBlocks + parityBlocks);
 
       // start 5 more datanodes
@@ -523,7 +523,7 @@ public class TestMover {
           Assert.assertEquals(StorageType.ARCHIVE, type);
         }
       }
-      DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
+      StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
           dataBlocks + parityBlocks);
 
     }finally{

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java
index 64d33a4..abcdbc1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.StripedFileTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
@@ -42,7 +41,6 @@ import java.util.Arrays;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 public class TestAddOverReplicatedStripedBlocks {
 
@@ -64,6 +62,7 @@ public class TestAddOverReplicatedStripedBlocks {
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
     // disable block recovery
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
     SimulatedFSDataset.setFactory(conf);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
     cluster.waitActive();
@@ -118,7 +117,7 @@ public class TestAddOverReplicatedStripedBlocks {
     // verify that all internal blocks exists
     lbs = cluster.getNameNodeRpc().getBlockLocations(
         filePath.toString(), 0, fileLen);
-    DFSTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE);
+    StripedFileTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE);
   }
 
   @Test
@@ -162,7 +161,7 @@ public class TestAddOverReplicatedStripedBlocks {
     // verify that all internal blocks exists
     lbs = cluster.getNameNodeRpc().getBlockLocations(
         filePath.toString(), 0, fileLen);
-    DFSTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE - 1);
+    StripedFileTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE - 1);
   }
 
   @Test
@@ -216,7 +215,7 @@ public class TestAddOverReplicatedStripedBlocks {
     // verify that all internal blocks exists
     lbs = cluster.getNameNodeRpc().getBlockLocations(
         filePath.toString(), 0, fileLen);
-    DFSTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE);
+    StripedFileTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE);
   }
 
   @Test
@@ -248,6 +247,7 @@ public class TestAddOverReplicatedStripedBlocks {
 
     // update blocksMap
     cluster.triggerBlockReports();
+    Thread.sleep(2000);
     // add to invalidates
     cluster.triggerHeartbeats();
     // datanode delete block
@@ -259,7 +259,7 @@ public class TestAddOverReplicatedStripedBlocks {
     // we are left GROUP_SIZE - 1 blocks.
     lbs = cluster.getNameNodeRpc().getBlockLocations(
         filePath.toString(), 0, fileLen);
-    DFSTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE - 1);
+    StripedFileTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE - 1);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
index c27ead5..735f84d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
@@ -73,6 +73,7 @@ import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
@@ -736,7 +737,13 @@ public class TestRetryCacheWithHA {
       DatanodeInfo[] newNodes = new DatanodeInfo[2];
       newNodes[0] = nodes[0];
       newNodes[1] = nodes[1];
-      String[] storageIDs = {"s0", "s1"};
+      final DatanodeManager dm = cluster.getNamesystem(0).getBlockManager()
+          .getDatanodeManager();
+      final String storageID1 = dm.getDatanode(newNodes[0]).getStorageInfos()[0]
+          .getStorageID();
+      final String storageID2 = dm.getDatanode(newNodes[1]).getStorageInfos()[0]
+          .getStorageID();
+      String[] storageIDs = {storageID1, storageID2};
       
       client.getNamenode().updatePipeline(client.getClientName(), oldBlock,
           newBlock, newNodes, storageIDs);


[2/2] hadoop git commit: HDFS-9040. Erasure coding: coordinate data streamers in DFSStripedOutputStream. Contributed by Jing Zhao and Walter Su.

Posted by ji...@apache.org.
HDFS-9040. Erasure coding: coordinate data streamers in DFSStripedOutputStream. Contributed by Jing Zhao and Walter Su.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6419900a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6419900a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6419900a

Branch: refs/heads/HDFS-7285
Commit: 6419900ac24a5493827abf9b5d90373bc1043e0b
Parents: c09dc25
Author: Jing Zhao <ji...@apache.org>
Authored: Mon Sep 28 14:40:27 2015 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Mon Sep 28 14:40:27 2015 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hdfs/protocol/DatanodeID.java |   2 +
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt        |   3 +
 .../org/apache/hadoop/hdfs/DFSOutputStream.java |  62 +-
 .../hadoop/hdfs/DFSStripedOutputStream.java     | 603 ++++++++++++++-----
 .../org/apache/hadoop/hdfs/DataStreamer.java    | 212 +++----
 .../apache/hadoop/hdfs/StripedDataStreamer.java | 342 +++--------
 .../BlockUnderConstructionFeature.java          |  30 +-
 .../server/blockmanagement/DatanodeManager.java |   4 +
 .../hadoop/hdfs/util/StripedBlockUtil.java      |  23 +-
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |  31 +-
 .../apache/hadoop/hdfs/StripedFileTestUtil.java | 213 ++++++-
 .../hadoop/hdfs/TestDFSStripedOutputStream.java | 144 +----
 .../TestDFSStripedOutputStreamWithFailure.java  | 300 ++++-----
 .../hdfs/TestWriteStripedFileWithFailure.java   |   8 +
 .../hdfs/server/balancer/TestBalancer.java      |   4 +-
 .../hadoop/hdfs/server/mover/TestMover.java     |   4 +-
 .../TestAddOverReplicatedStripedBlocks.java     |  12 +-
 .../namenode/ha/TestRetryCacheWithHA.java       |   9 +-
 18 files changed, 1068 insertions(+), 938 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
index 6d72285..c709cbd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
@@ -38,6 +38,8 @@ import com.google.common.annotations.VisibleForTesting;
 @InterfaceStability.Evolving
 public class DatanodeID implements Comparable<DatanodeID> {
   public static final DatanodeID[] EMPTY_ARRAY = {};
+  public static final DatanodeID EMPTY_DATANODE_ID = new DatanodeID("null",
+      "null", "null", 0, 0, 0, 0);
 
   private String ipAddr;     // IP address
   private String hostName;   // hostname claimed by datanode

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index d62dbac..6a01d61 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -450,3 +450,6 @@
 
     HDFS-8882. Erasure Coding: Use datablocks, parityblocks and cell size from
     ErasureCodingPolicy (Vinayakumar B via zhz)
+
+    HDFS-9040. Erasure coding: coordinate data streamers in
+    DFSStripedOutputStream. (jing9 and Walter Su)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 4923c86..e77a00a 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
 import org.apache.hadoop.hdfs.server.namenode.RetryStartFileException;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.hdfs.util.ByteArrayManager;
@@ -212,14 +213,17 @@ public class DFSOutputStream extends FSOutputSummer
   /** Construct a new output stream for creating a file. */
   protected DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
       EnumSet<CreateFlag> flag, Progressable progress,
-      DataChecksum checksum, String[] favoredNodes) throws IOException {
+      DataChecksum checksum, String[] favoredNodes, boolean createStreamer)
+      throws IOException {
     this(dfsClient, src, progress, stat, checksum);
     this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
 
     computePacketChunkSize(dfsClient.getConf().getWritePacketSize(), bytesPerChecksum);
 
-    streamer = new DataStreamer(stat, null, dfsClient, src, progress, checksum,
-        cachingStrategy, byteArrayManager, favoredNodes);
+    if (createStreamer) {
+      streamer = new DataStreamer(stat, null, dfsClient, src, progress,
+          checksum, cachingStrategy, byteArrayManager, favoredNodes);
+    }
   }
 
   static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
@@ -276,7 +280,7 @@ public class DFSOutputStream extends FSOutputSummer
             flag, progress, checksum, favoredNodes);
       } else {
         out = new DFSOutputStream(dfsClient, src, stat,
-            flag, progress, checksum, favoredNodes);
+            flag, progress, checksum, favoredNodes, true);
       }
       out.start();
       return out;
@@ -476,7 +480,7 @@ public class DFSOutputStream extends FSOutputSummer
    *
    * @throws IOException
    */
-  protected void endBlock() throws IOException {
+  void endBlock() throws IOException {
     if (getStreamer().getBytesCurBlock() == blockSize) {
       setCurrentPacketToEmpty();
       enqueueCurrentPacket();
@@ -921,4 +925,52 @@ public class DFSOutputStream extends FSOutputSummer
   public String toString() {
     return getClass().getSimpleName() + ":" + streamer;
   }
+
+  static LocatedBlock addBlock(DatanodeInfo[] excludedNodes, DFSClient dfsClient,
+      String src, ExtendedBlock prevBlock, long fileId, String[] favoredNodes)
+      throws IOException {
+    final DfsClientConf conf = dfsClient.getConf();
+    int retries = conf.getNumBlockWriteLocateFollowingRetry();
+    long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs();
+    long localstart = Time.monotonicNow();
+    while (true) {
+      try {
+        return dfsClient.namenode.addBlock(src, dfsClient.clientName, prevBlock,
+            excludedNodes, fileId, favoredNodes);
+      } catch (RemoteException e) {
+        IOException ue = e.unwrapRemoteException(FileNotFoundException.class,
+            AccessControlException.class,
+            NSQuotaExceededException.class,
+            DSQuotaExceededException.class,
+            QuotaByStorageTypeExceededException.class,
+            UnresolvedPathException.class);
+        if (ue != e) {
+          throw ue; // no need to retry these exceptions
+        }
+        if (NotReplicatedYetException.class.getName().equals(e.getClassName())) {
+          if (retries == 0) {
+            throw e;
+          } else {
+            --retries;
+            LOG.info("Exception while adding a block", e);
+            long elapsed = Time.monotonicNow() - localstart;
+            if (elapsed > 5000) {
+              LOG.info("Waiting for replication for " + (elapsed / 1000)
+                  + " seconds");
+            }
+            try {
+              LOG.warn("NotReplicatedYetException sleeping " + src
+                  + " retries left " + retries);
+              Thread.sleep(sleeptime);
+              sleeptime *= 2;
+            } catch (InterruptedException ie) {
+              LOG.warn("Caught exception", ie);
+            }
+          }
+        } else {
+          throw e;
+        }
+      }
+    }
+  }
 }
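
The addBlock() helper above hoists the retry loop out of DataStreamer#locateFollowingBlock (the matching deletion appears further down in this patch), so DFSStripedOutputStream can call it directly when allocating block groups. The backoff policy can be sketched in isolation as follows; RetriableCall is an illustrative stand-in for the namenode RPC, not an HDFS type:

    import java.io.IOException;

    class BackoffRetry {
      // Hypothetical stand-in for the namenode RPC being retried.
      interface RetriableCall<T> {
        T invoke() throws IOException;
      }

      // Retry with exponential backoff, mirroring the
      // NotReplicatedYetException handling in addBlock(): give up after
      // 'retries' failed attempts, doubling the sleep between attempts.
      static <T> T run(RetriableCall<T> call, int retries, long sleepMs)
          throws IOException, InterruptedException {
        while (true) {
          try {
            return call.invoke();
          } catch (IOException e) {
            if (retries-- == 0) {
              throw e; // retries exhausted; surface the last failure
            }
            Thread.sleep(sleepMs);
            sleepMs *= 2;
          }
        }
      }
    }

Doubling the sleep keeps retry load off a namenode that is still waiting for the previous block to reach minimal replication.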

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index d3a054a..c145a2a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -25,23 +25,34 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.io.erasurecode.CodecUtil;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.Time;
 import org.apache.htrace.Sampler;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
@@ -59,23 +70,11 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     private final List<BlockingQueue<T>> queues;
 
     MultipleBlockingQueue(int numQueue, int queueSize) {
-      queues = new ArrayList<>(numQueue);
+      List<BlockingQueue<T>> list = new ArrayList<>(numQueue);
       for (int i = 0; i < numQueue; i++) {
-        queues.add(new LinkedBlockingQueue<T>(queueSize));
+        list.add(new LinkedBlockingQueue<T>(queueSize));
       }
-    }
-
-    boolean isEmpty() {
-      for(int i = 0; i < queues.size(); i++) {
-        if (!queues.get(i).isEmpty()) {
-          return false;
-        }
-      }
-      return true;
-    }
-
-    int numQueues() {
-      return queues.size();
+      queues = Collections.synchronizedList(list);
     }
 
     void offer(int i, T object) {
@@ -92,6 +91,14 @@ public class DFSStripedOutputStream extends DFSOutputStream {
       }
     }
 
+    T takeWithTimeout(int i) throws InterruptedIOException {
+      try {
+        return queues.get(i).poll(100, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        throw DFSUtil.toInterruptedIOException("take interrupted, i=" + i, e);
+      }
+    }
+
     T poll(int i) {
       return queues.get(i).poll();
     }
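
MultipleBlockingQueue keeps one bounded queue per streamer index, so the coordinator can hand each streamer exactly one pending item. The new takeWithTimeout() polls for 100 ms rather than blocking forever, letting an unhealthy streamer notice its state instead of hanging on a queue that will never be filled. A simplified, self-contained analogue of the structure (not the patch's class):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    // Simplified analogue of MultipleBlockingQueue: one bounded queue per
    // streamer index, so each streamer is handed its own item.
    class IndexedQueues<T> {
      private final List<BlockingQueue<T>> queues = new ArrayList<>();

      IndexedQueues(int numQueues, int capacity) {
        for (int i = 0; i < numQueues; i++) {
          queues.add(new LinkedBlockingQueue<T>(capacity));
        }
      }

      void offer(int i, T item) {    // non-blocking hand-off to streamer i
        queues.get(i).offer(item);
      }

      // Bounded wait, like takeWithTimeout(): a null return means "nothing
      // yet; re-check health and try again" rather than blocking forever.
      T pollWithTimeout(int i) throws InterruptedException {
        return queues.get(i).poll(100, TimeUnit.MILLISECONDS);
      }
    }
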
@@ -99,23 +106,44 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     T peek(int i) {
       return queues.get(i).peek();
     }
+
+    void clear() {
+      for (BlockingQueue<T> q : queues) {
+        q.clear();
+      }
+    }
   }
 
   /** Coordinate the communication between the streamers. */
-  class Coordinator {
+  static class Coordinator {
+    /**
+     * The next internal block to write to for each streamer. The
+     * DFSStripedOutputStream makes the {@link ClientProtocol#addBlock} RPC to
+     * get a new block group. The block group is split into internal blocks,
+     * which are then distributed into the queues for the streamers to retrieve.
+     */
     private final MultipleBlockingQueue<LocatedBlock> followingBlocks;
+    /**
+     * Used to sync among all the streamers before allocating a new block. The
+     * DFSStripedOutputStream uses this to make sure every streamer has finished
+     * writing the previous block.
+     */
     private final MultipleBlockingQueue<ExtendedBlock> endBlocks;
 
+    /**
+     * The following data structures are used for syncing while handling errors.
+     */
     private final MultipleBlockingQueue<LocatedBlock> newBlocks;
-    private final MultipleBlockingQueue<ExtendedBlock> updateBlocks;
+    private final Map<StripedDataStreamer, Boolean> updateStreamerMap;
+    private final MultipleBlockingQueue<Boolean> streamerUpdateResult;
 
-    Coordinator(final DfsClientConf conf, final int numDataBlocks,
-        final int numAllBlocks) {
+    Coordinator(final int numAllBlocks) {
       followingBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
-      endBlocks = new MultipleBlockingQueue<>(numDataBlocks, 1);
-
+      endBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
       newBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
-      updateBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
+      updateStreamerMap = Collections.synchronizedMap(
+          new HashMap<StripedDataStreamer, Boolean>(numAllBlocks));
+      streamerUpdateResult = new MultipleBlockingQueue<>(numAllBlocks, 1);
     }
 
     MultipleBlockingQueue<LocatedBlock> getFollowingBlocks() {
@@ -126,68 +154,28 @@ public class DFSStripedOutputStream extends DFSOutputStream {
       return newBlocks;
     }
 
-    MultipleBlockingQueue<ExtendedBlock> getUpdateBlocks() {
-      return updateBlocks;
-    }
-
-    StripedDataStreamer getStripedDataStreamer(int i) {
-      return DFSStripedOutputStream.this.getStripedDataStreamer(i);
-    }
-
     void offerEndBlock(int i, ExtendedBlock block) {
       endBlocks.offer(i, block);
     }
 
-    ExtendedBlock takeEndBlock(int i) throws InterruptedIOException {
-      return endBlocks.take(i);
+    void offerStreamerUpdateResult(int i, boolean success) {
+      streamerUpdateResult.offer(i, success);
     }
 
-    boolean hasAllEndBlocks() {
-      for(int i = 0; i < endBlocks.numQueues(); i++) {
-        if (endBlocks.peek(i) == null) {
-          return false;
-        }
-      }
-      return true;
+    boolean takeStreamerUpdateResult(int i) throws InterruptedIOException {
+      return streamerUpdateResult.take(i);
     }
 
-    void setBytesEndBlock(int i, long newBytes, ExtendedBlock block) {
-      ExtendedBlock b = endBlocks.peek(i);
-      if (b == null) {
-        // streamer just has failed, put end block and continue
-        b = block;
-        offerEndBlock(i, b);
-      }
-      b.setNumBytes(newBytes);
+    void updateStreamer(StripedDataStreamer streamer,
+        boolean success) {
+      assert !updateStreamerMap.containsKey(streamer);
+      updateStreamerMap.put(streamer, success);
     }
 
-    /** @return a block representing the entire block group. */
-    ExtendedBlock getBlockGroup() {
-      final StripedDataStreamer s0 = getStripedDataStreamer(0);
-      final ExtendedBlock b0 = s0.getBlock();
-      if (b0 == null) {
-        return null;
-      }
-
-      final boolean atBlockGroupBoundary = s0.getBytesCurBlock() == 0 && b0.getNumBytes() > 0;
-
-      final ExtendedBlock block = new ExtendedBlock(b0);
-      long numBytes = atBlockGroupBoundary? b0.getNumBytes(): s0.getBytesCurBlock();
-      for (int i = 1; i < numAllBlocks; i++) {
-        final StripedDataStreamer si = getStripedDataStreamer(i);
-        final ExtendedBlock bi = si.getBlock();
-        if (bi != null && bi.getGenerationStamp() > block.getGenerationStamp()) {
-          block.setGenerationStamp(bi.getGenerationStamp());
-        }
-        if (i < numDataBlocks) {
-          numBytes += atBlockGroupBoundary? bi.getNumBytes(): si.getBytesCurBlock();
-        }
-      }
-      block.setNumBytes(numBytes);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("getBlockGroup: " + block + ", numBytes=" + block.getNumBytes());
-      }
-      return block;
+    void clearFailureStates() {
+      newBlocks.clear();
+      updateStreamerMap.clear();
+      streamerUpdateResult.clear();
     }
   }
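
Compared to the old Coordinator, the error-handling state is now a synchronized map of per-streamer results plus a streamerUpdateResult queue used as a one-shot broadcast: the output stream publishes one boolean per streamer, and each streamer blocks on its own slot. A toy model of that broadcast (illustrative names, not the patch's classes):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    // Toy model of the failure-handling handshake: the coordinator
    // publishes one boolean per streamer and each streamer blocks until
    // its result arrives. Mirrors offerStreamerUpdateResult and
    // takeStreamerUpdateResult above.
    class UpdateResultBroadcast {
      private final List<BlockingQueue<Boolean>> results = new ArrayList<>();

      UpdateResultBroadcast(int numStreamers) {
        for (int i = 0; i < numStreamers; i++) {
          results.add(new LinkedBlockingQueue<Boolean>(1));
        }
      }

      void publish(boolean success) {              // output stream side
        for (BlockingQueue<Boolean> q : results) {
          q.offer(success);
        }
      }

      boolean await(int i) throws InterruptedException {  // streamer side
        return results.get(i).take();
      }
    }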
 
@@ -263,18 +251,16 @@ public class DFSStripedOutputStream extends DFSOutputStream {
   private final int cellSize;
   private final int numAllBlocks;
   private final int numDataBlocks;
-
-  @Override
-  ExtendedBlock getBlock() {
-    return coordinator.getBlockGroup();
-  }
+  private ExtendedBlock currentBlockGroup;
+  private final String[] favoredNodes;
+  private final List<StripedDataStreamer> failedStreamers;
 
   /** Construct a new output stream for creating a file. */
   DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
                          EnumSet<CreateFlag> flag, Progressable progress,
                          DataChecksum checksum, String[] favoredNodes)
                          throws IOException {
-    super(dfsClient, src, stat, flag, progress, checksum, favoredNodes);
+    super(dfsClient, src, stat, flag, progress, checksum, favoredNodes, false);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Creating DFSStripedOutputStream for " + src);
     }
@@ -284,12 +270,13 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     cellSize = ecPolicy.getCellSize();
     numDataBlocks = ecPolicy.getNumDataUnits();
     numAllBlocks = numDataBlocks + numParityBlocks;
+    this.favoredNodes = favoredNodes;
+    failedStreamers = new ArrayList<>();
 
     encoder = CodecUtil.createRSRawEncoder(dfsClient.getConfiguration(),
         numDataBlocks, numParityBlocks);
 
-    coordinator = new Coordinator(dfsClient.getConf(),
-        numDataBlocks, numAllBlocks);
+    coordinator = new Coordinator(numAllBlocks);
     try {
       cellBuffers = new CellBuffers(numParityBlocks);
     } catch (InterruptedException ie) {
@@ -297,14 +284,13 @@ public class DFSStripedOutputStream extends DFSOutputStream {
           "Failed to create cell buffers", ie);
     }
 
-    List<StripedDataStreamer> s = new ArrayList<>(numAllBlocks);
+    streamers = new ArrayList<>(numAllBlocks);
     for (short i = 0; i < numAllBlocks; i++) {
       StripedDataStreamer streamer = new StripedDataStreamer(stat,
           dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
           favoredNodes, i, coordinator);
-      s.add(streamer);
+      streamers.add(streamer);
     }
-    streamers = Collections.unmodifiableList(s);
     currentPackets = new DFSPacket[streamers.size()];
     setCurrentStreamer(0);
   }
@@ -318,17 +304,19 @@ public class DFSStripedOutputStream extends DFSOutputStream {
   }
 
   private synchronized StripedDataStreamer getCurrentStreamer() {
-    return (StripedDataStreamer)streamer;
+    return (StripedDataStreamer) streamer;
   }
 
   private synchronized StripedDataStreamer setCurrentStreamer(int newIdx) {
     // backup currentPacket for current streamer
-    int oldIdx = streamers.indexOf(streamer);
-    if (oldIdx >= 0) {
-      currentPackets[oldIdx] = currentPacket;
+    if (streamer != null) {
+      int oldIdx = streamers.indexOf(getCurrentStreamer());
+      if (oldIdx >= 0) {
+        currentPackets[oldIdx] = currentPacket;
+      }
     }
 
-    streamer = streamers.get(newIdx);
+    streamer = getStripedDataStreamer(newIdx);
     currentPacket = currentPackets[newIdx];
     adjustChunkBoundary();
 
@@ -350,40 +338,127 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     encoder.encode(dataBuffers, parityBuffers);
   }
 
-
-  private void checkStreamers(boolean setExternalError) throws IOException {
-    int count = 0;
+  /**
+   * Check all the existing StripedDataStreamers and find newly failed
+   * streamers.
+   * @return The newly failed streamers.
+   * @throws IOException if fewer than {@link #numDataBlocks} streamers are
+   *                     still healthy.
+   */
+  private Set<StripedDataStreamer> checkStreamers() throws IOException {
+    Set<StripedDataStreamer> newFailed = new HashSet<>();
     for(StripedDataStreamer s : streamers) {
-      if (!s.isFailed()) {
-        if (setExternalError && s.getBlock() != null) {
-          s.getErrorState().initExternalError();
-        }
-        count++;
+      if (!s.isHealthy() && !failedStreamers.contains(s)) {
+        newFailed.add(s);
       }
     }
+
+    final int failCount = failedStreamers.size() + newFailed.size();
     if (LOG.isDebugEnabled()) {
       LOG.debug("checkStreamers: " + streamers);
-      LOG.debug("count=" + count);
+      LOG.debug("healthy streamer count=" + (numAllBlocks - failCount));
+      LOG.debug("original failed streamers: " + failedStreamers);
+      LOG.debug("newly failed streamers: " + newFailed);
     }
-    if (count < numDataBlocks) {
-      throw new IOException("Failed: the number of remaining blocks = "
-          + count + " < the number of data blocks = " + numDataBlocks);
+    if (failCount > (numAllBlocks - numDataBlocks)) {
+      throw new IOException("Failed: the number of failed blocks = "
+          + failCount + " > the number of data blocks = "
+          + (numAllBlocks - numDataBlocks));
     }
+    return newFailed;
   }
 
   private void handleStreamerFailure(String err, Exception e)
       throws IOException {
-    handleStreamerFailure(err, e, true);
-  }
-
-  private void handleStreamerFailure(String err, Exception e,
-      boolean setExternalError) throws IOException {
     LOG.warn("Failed: " + err + ", " + this, e);
-    getCurrentStreamer().setFailed(true);
-    checkStreamers(setExternalError);
+    getCurrentStreamer().getErrorState().setInternalError();
+    getCurrentStreamer().close(true);
+    checkStreamers();
     currentPacket = null;
   }
 
+  private void replaceFailedStreamers() {
+    assert streamers.size() == numAllBlocks;
+    for (short i = 0; i < numAllBlocks; i++) {
+      final StripedDataStreamer oldStreamer = getStripedDataStreamer(i);
+      if (!oldStreamer.isHealthy()) {
+        StripedDataStreamer streamer = new StripedDataStreamer(oldStreamer.stat,
+            dfsClient, src, oldStreamer.progress,
+            oldStreamer.checksum4WriteBlock, cachingStrategy, byteArrayManager,
+            favoredNodes, i, coordinator);
+        streamers.set(i, streamer);
+        currentPackets[i] = null;
+        if (i == 0) {
+          this.streamer = streamer;
+        }
+        streamer.start();
+      }
+    }
+  }
+
+  private void waitEndBlocks(int i) throws IOException {
+    while (getStripedDataStreamer(i).isHealthy()) {
+      final ExtendedBlock b = coordinator.endBlocks.takeWithTimeout(i);
+      if (b != null) {
+        StripedBlockUtil.checkBlocks(currentBlockGroup, i, b);
+        return;
+      }
+    }
+  }
+
+  private void allocateNewBlock() throws IOException {
+    if (currentBlockGroup != null) {
+      for (int i = 0; i < numAllBlocks; i++) {
+        // sync all the healthy streamers before writing to the new block
+        waitEndBlocks(i);
+      }
+    }
+    failedStreamers.clear();
+    // replace failed streamers
+    replaceFailedStreamers();
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Allocating new block group. The previous block group: "
+          + currentBlockGroup);
+    }
+
+    // TODO collect excludedNodes from all the data streamers
+    final LocatedBlock lb = addBlock(null, dfsClient, src, currentBlockGroup,
+        fileId, favoredNodes);
+    assert lb.isStriped();
+    if (lb.getLocations().length < numDataBlocks) {
+      throw new IOException("Failed to get " + numDataBlocks
+          + " nodes from namenode: blockGroupSize= " + numAllBlocks
+          + ", blocks.length= " + lb.getLocations().length);
+    }
+    // assign the new block to the current block group
+    currentBlockGroup = lb.getBlock();
+
+    final LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
+        (LocatedStripedBlock) lb, cellSize, numDataBlocks,
+        numAllBlocks - numDataBlocks);
+    for (int i = 0; i < blocks.length; i++) {
+      StripedDataStreamer si = getStripedDataStreamer(i);
+      if (si.isHealthy()) { // skipping failed data streamer
+        if (blocks[i] == null) {
+          // Set exception and close the streamer as there are no block
+          // locations found for the parity block.
+          LOG.warn("Failed to get block location for parity block, index=" + i);
+          si.getLastException().set(
+              new IOException("Failed to get following block, i=" + i));
+          si.getErrorState().setInternalError();
+          si.close(true);
+        } else {
+          coordinator.getFollowingBlocks().offer(i, blocks[i]);
+        }
+      }
+    }
+  }
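
allocateNewBlock() gets one LocatedStripedBlock for the whole group from the namenode and relies on StripedBlockUtil.parseStripedBlockGroup to carve it into per-streamer internal blocks. As a rough sketch of the ID convention only (the real parsing, tokens, and locations live in StripedBlockUtil), internal block i is the group ID offset by the streamer index:

    class BlockGroupMath {
      // Sketch only: striped block IDs reserve their low bits for the
      // position inside the group, so internal block i can be derived from
      // the group ID by adding the streamer index. The real parsing is in
      // StripedBlockUtil#parseStripedBlockGroup.
      static long internalBlockId(long blockGroupId, int index) {
        return blockGroupId + index;
      }
    }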
+
+  private boolean shouldEndBlockGroup() {
+    return currentBlockGroup != null &&
+        currentBlockGroup.getNumBytes() == blockSize * numDataBlocks;
+  }
+
   @Override
   protected synchronized void writeChunk(byte[] bytes, int offset, int len,
       byte[] checksum, int ckoff, int cklen) throws IOException {
@@ -392,8 +467,13 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     final int pos = cellBuffers.addTo(index, bytes, offset, len);
     final boolean cellFull = pos == cellSize;
 
-    final long oldBytes = current.getBytesCurBlock();
-    if (!current.isFailed()) {
+    if (currentBlockGroup == null || shouldEndBlockGroup()) {
+      // the incoming data should belong to a new block. Allocate a new block.
+      allocateNewBlock();
+    }
+
+    currentBlockGroup.setNumBytes(currentBlockGroup.getNumBytes() + len);
+    if (current.isHealthy()) {
       try {
         super.writeChunk(bytes, offset, len, checksum, ckoff, cklen);
       } catch(Exception e) {
@@ -401,12 +481,6 @@ public class DFSStripedOutputStream extends DFSOutputStream {
       }
     }
 
-    if (current.isFailed()) {
-      final long newBytes = oldBytes + len;
-      coordinator.setBytesEndBlock(index, newBytes, current.getBlock());
-      current.setBytesCurBlock(newBytes);
-    }
-
     // Two extra steps are needed when a striping cell is full:
     // 1. Forward the current index pointer
     // 2. Generate parity packets if a full stripe of data cells are present
@@ -419,11 +493,209 @@ public class DFSStripedOutputStream extends DFSOutputStream {
         cellBuffers.flipDataBuffers();
         writeParityCells();
         next = 0;
+        // check failure state for all the streamers. Bump GS if necessary
+        checkStreamerFailures();
+
+        // if this is the end of the block group, end each internal block
+        if (shouldEndBlockGroup()) {
+          for (int i = 0; i < numAllBlocks; i++) {
+            final StripedDataStreamer s = setCurrentStreamer(i);
+            if (s.isHealthy()) {
+              try {
+                endBlock();
+              } catch (IOException ignored) {}
+            }
+          }
+        }
       }
       setCurrentStreamer(next);
     }
   }
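
writeChunk() advances round-robin over the data streamers, cell by cell, generating parity once a full stripe of data cells is buffered. A small illustrative helper (not in the patch) showing how a logical offset inside a block group maps to the data streamer that owns the enclosing cell, with assumed cell size and streamer count:

    class CellMapping {
      // Illustrative helper (not in the patch): which data streamer owns
      // the cell containing a given logical offset within the block group.
      static int streamerIndex(long offsetInGroup, int cellSize,
          int numDataBlocks) {
        long cellIndex = offsetInGroup / cellSize; // cells go round-robin
        return (int) (cellIndex % numDataBlocks);  // across data streamers
      }

      public static void main(String[] args) {
        // Assumed values: 64 KB cells, 6 data streamers. Byte 200000 falls
        // in cell 3, so it belongs to data streamer 3.
        System.out.println(streamerIndex(200000L, 64 * 1024, 6));
      }
    }

Parity cells are written afterwards through writeParityCells(), which is why the rotation only needs to cover the data streamers here.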
 
+  @Override
+  void enqueueCurrentPacketFull() throws IOException {
+    LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={},"
+            + " appendChunk={}, {}", currentPacket, src, getStreamer()
+            .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(),
+        getStreamer());
+    enqueueCurrentPacket();
+    adjustChunkBoundary();
+    // no need to end block here
+  }
+
+  private Set<StripedDataStreamer> markExternalErrorOnStreamers() {
+    Set<StripedDataStreamer> healthySet = new HashSet<>();
+    for (StripedDataStreamer streamer : streamers) {
+      if (streamer.isHealthy() &&
+          streamer.getStage() == BlockConstructionStage.DATA_STREAMING) {
+        streamer.setExternalError();
+        healthySet.add(streamer);
+      }
+    }
+    return healthySet;
+  }
+
+  /**
+   * Check and handle data streamer failures. This is called only when we have
+   * written a full stripe (i.e., enqueued all packets for a full stripe), or
+   * when we're closing the output stream.
+   */
+  private void checkStreamerFailures() throws IOException {
+    Set<StripedDataStreamer> newFailed = checkStreamers();
+    if (newFailed.size() > 0) {
+      // for healthy streamers, wait till all of them have fetched the new block
+      // and flushed out all the enqueued packets.
+      flushAllInternals();
+    }
+    // get all the current failed streamers after the flush
+    newFailed = checkStreamers();
+    while (newFailed.size() > 0) {
+      failedStreamers.addAll(newFailed);
+      coordinator.clearFailureStates();
+
+      // mark all the healthy streamers as external error
+      Set<StripedDataStreamer> healthySet = markExternalErrorOnStreamers();
+
+      // we have newly failed streamers, update block for pipeline
+      final ExtendedBlock newBG = updateBlockForPipeline(healthySet);
+
+      // wait for all the healthy streamers to
+      // 1) get the updated block info
+      // 2) create new block output streams
+      newFailed = waitCreatingNewStreams(healthySet);
+      if (newFailed.size() + failedStreamers.size() >
+          numAllBlocks - numDataBlocks) {
+        throw new IOException(
+            "Data streamers failed while creating new block streams: "
+                + newFailed + ". There are not enough healthy streamers.");
+      }
+      for (StripedDataStreamer failedStreamer : newFailed) {
+        assert !failedStreamer.isHealthy();
+      }
+
+      // TODO we can also succeed if all the failed streamers have not taken
+      // the updated block
+      if (newFailed.size() == 0) {
+        // reset external error state of all the streamers
+        for (StripedDataStreamer streamer : healthySet) {
+          assert streamer.isHealthy();
+          streamer.getErrorState().reset();
+        }
+        updatePipeline(newBG);
+      }
+      for (int i = 0; i < numAllBlocks; i++) {
+        coordinator.offerStreamerUpdateResult(i, newFailed.size() == 0);
+      }
+    }
+  }
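
checkStreamerFailures() repeats a coordinated recovery round (mark external error, bump the generation stamp, rebuild block streams, commit the new pipeline) until a round completes with no new failures, and gives up once more streamers are lost than the parity blocks can cover. Its control-flow skeleton, with a Function standing in for one round (a sketch, not the patch's API):

    import java.util.Set;
    import java.util.function.Function;

    class RecoveryLoop {
      // Control-flow skeleton of checkStreamerFailures() (sketch): run
      // coordinated recovery rounds until one finishes with no new
      // failures; abort once the parity blocks can no longer cover the
      // failed streamers.
      static <S> void recoverUntilStable(Set<S> newFailed, Set<S> failed,
          Set<S> healthy, Function<Set<S>, Set<S>> runRound,
          int maxTolerated) {
        while (!newFailed.isEmpty()) {
          failed.addAll(newFailed);
          if (failed.size() > maxTolerated) {
            throw new IllegalStateException(
                "too many failed streamers: " + failed);
          }
          newFailed = runRound.apply(healthy); // mark, bump GS, rebuild
          healthy.removeAll(newFailed);
        }
      }
    }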
+
+  private int checkStreamerUpdates(Set<StripedDataStreamer> failed,
+      Set<StripedDataStreamer> streamers) {
+    for (StripedDataStreamer streamer : streamers) {
+      if (!coordinator.updateStreamerMap.containsKey(streamer)) {
+        if (!streamer.isHealthy() &&
+            coordinator.getNewBlocks().peek(streamer.getIndex()) != null) {
+          // this streamer had internal error before getting updated block
+          failed.add(streamer);
+        }
+      }
+    }
+    return coordinator.updateStreamerMap.size() + failed.size();
+  }
+
+  private Set<StripedDataStreamer> waitCreatingNewStreams(
+      Set<StripedDataStreamer> healthyStreamers) throws IOException {
+    Set<StripedDataStreamer> failed = new HashSet<>();
+    final int expectedNum = healthyStreamers.size();
+    final long socketTimeout = dfsClient.getConf().getSocketTimeout();
+    // the total wait time should be less than the socket timeout; otherwise
+    // a slow streamer may cause other streamers to time out. Here we wait
+    // for half of the socket timeout.
+    long remainingTime = socketTimeout > 0 ? socketTimeout / 2 : Long.MAX_VALUE;
+    final long waitInterval = 1000;
+    synchronized (coordinator) {
+      while (checkStreamerUpdates(failed, healthyStreamers) < expectedNum
+          && remainingTime > 0) {
+        try {
+          long start = Time.monotonicNow();
+          coordinator.wait(waitInterval);
+          remainingTime -= Time.monotonicNow() - start;
+        } catch (InterruptedException e) {
+          throw DFSUtil.toInterruptedIOException("Interrupted when waiting" +
+              " for results of updating striped streamers", e);
+        }
+      }
+    }
+    synchronized (coordinator) {
+      for (StripedDataStreamer streamer : healthyStreamers) {
+        if (!coordinator.updateStreamerMap.containsKey(streamer)) {
+          // close the streamer if it is too slow to create a new connection
+          streamer.setStreamerAsClosed();
+          failed.add(streamer);
+        }
+      }
+    }
+    for (Map.Entry<StripedDataStreamer, Boolean> entry :
+        coordinator.updateStreamerMap.entrySet()) {
+      if (!entry.getValue()) {
+        failed.add(entry.getKey());
+      }
+    }
+    for (StripedDataStreamer failedStreamer : failed) {
+      healthyStreamers.remove(failedStreamer);
+    }
+    return failed;
+  }
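
The deadline logic in waitCreatingNewStreams() is a standard bounded-wait pattern: wait on a monitor in short intervals and subtract the elapsed time from a budget, so one stuck streamer cannot hold the writer past half the socket timeout. The pattern in generic form:

    import java.util.function.BooleanSupplier;

    class BoundedWait {
      // Wait until 'done' holds or the time budget runs out; whoever makes
      // progress notifies the monitor. Generic form of the pattern used in
      // waitCreatingNewStreams().
      static boolean waitUntil(Object monitor, BooleanSupplier done,
          long budgetMs, long intervalMs) throws InterruptedException {
        synchronized (monitor) {
          while (!done.getAsBoolean() && budgetMs > 0) {
            long start = System.nanoTime();
            monitor.wait(intervalMs);
            budgetMs -= (System.nanoTime() - start) / 1_000_000;
          }
          return done.getAsBoolean();
        }
      }
    }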
+
+  /**
+   * Call {@link ClientProtocol#updateBlockForPipeline} and assign the updated
+   * block to the healthy streamers.
+   * @param healthyStreamers The healthy data streamers. These streamers join
+   *                         the failure handling.
+   */
+  private ExtendedBlock updateBlockForPipeline(
+      Set<StripedDataStreamer> healthyStreamers) throws IOException {
+    final LocatedBlock updated = dfsClient.namenode.updateBlockForPipeline(
+        currentBlockGroup, dfsClient.clientName);
+    final long newGS = updated.getBlock().getGenerationStamp();
+    ExtendedBlock newBlock = new ExtendedBlock(currentBlockGroup);
+    newBlock.setGenerationStamp(newGS);
+    final LocatedBlock[] updatedBlks = StripedBlockUtil.parseStripedBlockGroup(
+        (LocatedStripedBlock) updated, cellSize, numDataBlocks,
+        numAllBlocks - numDataBlocks);
+
+    for (int i = 0; i < numAllBlocks; i++) {
+      StripedDataStreamer si = getStripedDataStreamer(i);
+      if (healthyStreamers.contains(si)) {
+        final LocatedBlock lb = new LocatedBlock(new ExtendedBlock(newBlock),
+            null, null, null, -1, updated.isCorrupt(), null);
+        lb.setBlockToken(updatedBlks[i].getBlockToken());
+        coordinator.getNewBlocks().offer(i, lb);
+      }
+    }
+    return newBlock;
+  }
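
updateBlockForPipeline() asks the namenode for a fresh generation stamp and rewraps the current block group with it; the ID and length are unchanged, and the new GS is what marks replicas still carrying the old stamp as stale. A minimal stand-in for ExtendedBlock (illustrative, not the HDFS class) showing what the bump changes:

    class BlockStamp {
      final long blockId;
      final long numBytes;
      final long generationStamp;

      BlockStamp(long blockId, long numBytes, long generationStamp) {
        this.blockId = blockId;
        this.numBytes = numBytes;
        this.generationStamp = generationStamp;
      }

      // Same ID and length, new generation stamp; this is the essence of
      // the newBlock construction in updateBlockForPipeline() above.
      BlockStamp bumpTo(long newGS) {
        return new BlockStamp(blockId, numBytes, newGS);
      }
    }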
+
+  private void updatePipeline(ExtendedBlock newBG) throws IOException {
+    final DatanodeInfo[] newNodes = new DatanodeInfo[numAllBlocks];
+    final String[] newStorageIDs = new String[numAllBlocks];
+    for (int i = 0; i < numAllBlocks; i++) {
+      final StripedDataStreamer streamer = getStripedDataStreamer(i);
+      final DatanodeInfo[] nodes = streamer.getNodes();
+      final String[] storageIDs = streamer.getStorageIDs();
+      if (streamer.isHealthy() && nodes != null && storageIDs != null) {
+        newNodes[i] = nodes[0];
+        newStorageIDs[i] = storageIDs[0];
+      } else {
+        newNodes[i] = new DatanodeInfo(DatanodeID.EMPTY_DATANODE_ID);
+        newStorageIDs[i] = "";
+      }
+    }
+    dfsClient.namenode.updatePipeline(dfsClient.clientName, currentBlockGroup,
+        newBG, newNodes, newStorageIDs);
+    currentBlockGroup = newBG;
+  }
+
   private int stripeDataSize() {
     return numDataBlocks * cellSize;
   }
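
For concreteness, stripeDataSize() is the amount of user data in one full stripe. With an RS-6-3 layout and 64 KB cells (assumed numbers for illustration, not taken from this patch) that works out as:

    class StripeMath {
      public static void main(String[] args) {
        final int numDataBlocks = 6;    // assumed RS-6-3 layout
        final int cellSize = 64 * 1024; // assumed 64 KB cells
        // One full stripe of data cells: 6 * 65536 = 393216 bytes (384 KB).
        System.out.println(numDataBlocks * cellSize);
      }
    }
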
@@ -500,28 +772,16 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     }
   }
 
-  /**
-   * Simply add bytesCurBlock together. Note that this result is not accurately
-   * the size of the block group.
-   */
-  private long getCurrentSumBytes() {
-    long sum = 0;
-    for (int i = 0; i < numDataBlocks; i++) {
-      sum += streamers.get(i).getBytesCurBlock();
-    }
-    return sum;
-  }
-
   private boolean generateParityCellsForLastStripe() {
-    final long currentBlockGroupBytes = getCurrentSumBytes();
-    if (currentBlockGroupBytes % stripeDataSize() == 0) {
+    final long currentBlockGroupBytes = currentBlockGroup == null ?
+        0 : currentBlockGroup.getNumBytes();
+    final long lastStripeSize = currentBlockGroupBytes % stripeDataSize();
+    if (lastStripeSize == 0) {
       return false;
     }
 
-    final int firstCellSize =
-        (int)(getStripedDataStreamer(0).getBytesCurBlock() % cellSize);
-    final int parityCellSize = firstCellSize > 0 && firstCellSize < cellSize?
-        firstCellSize : cellSize;
+    final long parityCellSize = lastStripeSize < cellSize?
+        lastStripeSize : cellSize;
     final ByteBuffer[] buffers = cellBuffers.getBuffers();
 
     for (int i = 0; i < numAllBlocks; i++) {
@@ -550,13 +810,13 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     cellBuffers.clear();
   }
 
-  void writeParity(int index, ByteBuffer buffer, byte[] checksumBuf
-      ) throws IOException {
+  void writeParity(int index, ByteBuffer buffer, byte[] checksumBuf)
+      throws IOException {
     final StripedDataStreamer current = setCurrentStreamer(index);
     final int len = buffer.limit();
 
     final long oldBytes = current.getBytesCurBlock();
-    if (!current.isFailed()) {
+    if (current.isHealthy()) {
       try {
         DataChecksum sum = getDataChecksum();
         sum.calculateChunkedSums(buffer.array(), 0, len, checksumBuf, 0);
@@ -570,18 +830,13 @@ public class DFSStripedOutputStream extends DFSOutputStream {
         handleStreamerFailure("oldBytes=" + oldBytes + ", len=" + len, e);
       }
     }
-
-    if (current.isFailed()) {
-      final long newBytes = oldBytes + len;
-      current.setBytesCurBlock(newBytes);
-    }
   }
 
   @Override
   void setClosed() {
     super.setClosed();
     for (int i = 0; i < numAllBlocks; i++) {
-      streamers.get(i).release();
+      getStripedDataStreamer(i).release();
     }
     cellBuffers.release();
   }
@@ -607,37 +862,40 @@ public class DFSStripedOutputStream extends DFSOutputStream {
 
     try {
       // flush from all upper layers
-      try {
-        flushBuffer();
-      } catch(Exception e) {
-        handleStreamerFailure("flushBuffer " + getCurrentStreamer(), e);
-      }
+      flushBuffer();
       // if the last stripe is incomplete, generate and write parity cells
       if (generateParityCellsForLastStripe()) {
         writeParityCells();
       }
       enqueueAllCurrentPackets();
 
+      // flush all the data packets
+      flushAllInternals();
+      // check failures
+      checkStreamerFailures();
+
       for (int i = 0; i < numAllBlocks; i++) {
         final StripedDataStreamer s = setCurrentStreamer(i);
-        if (!s.isFailed()) {
+        if (s.isHealthy()) {
           try {
             if (s.getBytesCurBlock() > 0) {
               setCurrentPacketToEmpty();
             }
-            // flush all data to Datanode
+            // flush the last "close" packet to Datanode
             flushInternal();
           } catch(Exception e) {
-            handleStreamerFailure("flushInternal " + s, e, false);
+            // TODO for both close and endBlock, we currently do not handle
+            // failures when sending the last packet. We actually do not need to
+            // bump GS for this kind of failure. Thus counting the total number
+            // of failures may be good enough.
           }
         }
       }
 
       closeThreads(false);
-      final ExtendedBlock lastBlock = coordinator.getBlockGroup();
       TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER);
       try {
-        completeFile(lastBlock);
+        completeFile(currentBlockGroup);
       } finally {
         scope.close();
       }
@@ -652,14 +910,45 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     int idx = streamers.indexOf(getCurrentStreamer());
     for(int i = 0; i < streamers.size(); i++) {
       final StripedDataStreamer si = setCurrentStreamer(i);
-      if (!si.isFailed() && currentPacket != null) {
+      if (si.isHealthy() && currentPacket != null) {
         try {
           enqueueCurrentPacket();
         } catch (IOException e) {
-          handleStreamerFailure("enqueueAllCurrentPackets, i=" + i, e, false);
+          handleStreamerFailure("enqueueAllCurrentPackets, i=" + i, e);
         }
       }
     }
     setCurrentStreamer(idx);
   }
+
+  void flushAllInternals() throws IOException {
+    int current = getCurrentIndex();
+
+    for (int i = 0; i < numAllBlocks; i++) {
+      final StripedDataStreamer s = setCurrentStreamer(i);
+      if (s.isHealthy()) {
+        try {
+          // flush all data to Datanode
+          flushInternal();
+        } catch(Exception e) {
+          handleStreamerFailure("flushInternal " + s, e);
+        }
+      }
+    }
+    setCurrentStreamer(current);
+  }
+
+  static void sleep(long ms, String op) throws InterruptedIOException {
+    try {
+      Thread.sleep(ms);
+    } catch(InterruptedException ie) {
+      throw DFSUtil.toInterruptedIOException(
+          "Sleep interrupted during " + op, ie);
+    }
+  }
+
+  @Override
+  ExtendedBlock getBlock() {
+    return currentBlockGroup;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index c478f1c..a6eb01f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -22,7 +22,6 @@ import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SU
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InterruptedIOException;
@@ -46,16 +45,12 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
 import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
-import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
-import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
-import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
@@ -69,13 +64,10 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
-import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
 import org.apache.hadoop.hdfs.util.ByteArrayManager;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MultipleIOException;
-import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
@@ -204,9 +196,12 @@ class DataStreamer extends Daemon {
     }
   }
 
+  enum ErrorType {
+    NONE, INTERNAL, EXTERNAL
+  }
+
   static class ErrorState {
-    private boolean error = false;
-    private boolean externalError = false;
+    ErrorType error = ErrorType.NONE;
     private int badNodeIndex = -1;
     private int restartingNodeIndex = -1;
     private long restartingNodeDeadline = 0;
@@ -216,35 +211,47 @@ class DataStreamer extends Daemon {
       this.datanodeRestartTimeout = datanodeRestartTimeout;
     }
 
+    synchronized void resetInternalError() {
+      if (hasInternalError()) {
+        error = ErrorType.NONE;
+      }
+      badNodeIndex = -1;
+      restartingNodeIndex = -1;
+      restartingNodeDeadline = 0;
+    }
+
     synchronized void reset() {
-      error = false;
-      externalError = false;
+      error = ErrorType.NONE;
       badNodeIndex = -1;
       restartingNodeIndex = -1;
       restartingNodeDeadline = 0;
     }
 
-    synchronized boolean hasError() {
-      return error;
+    synchronized boolean hasInternalError() {
+      return error == ErrorType.INTERNAL;
     }
 
-    synchronized boolean hasExternalErrorOnly() {
-      return error && externalError && !isNodeMarked();
+    synchronized boolean hasExternalError() {
+      return error == ErrorType.EXTERNAL;
     }
 
-    synchronized boolean hasDatanodeError() {
-      return error && (isNodeMarked() || externalError);
+    synchronized boolean hasError() {
+      return error != ErrorType.NONE;
     }
 
-    synchronized void setError(boolean err) {
-      this.error = err;
+    synchronized boolean hasDatanodeError() {
+      return error == ErrorType.INTERNAL && isNodeMarked();
     }
 
-    synchronized void initExternalError() {
-      setError(true);
-      this.externalError = true;
+    synchronized void setInternalError() {
+      this.error = ErrorType.INTERNAL;
     }
 
+    synchronized void setExternalError() {
+      if (!hasInternalError()) {
+        this.error = ErrorType.EXTERNAL;
+      }
+    }
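
The refactored ErrorState makes the precedence explicit: INTERNAL is sticky, and setExternalError() never downgrades it, so a streamer's own failure cannot be masked by a coordinator-initiated recovery. The latch behavior in isolation (a sketch mirroring the two setters):

    class ErrorLatch {
      enum ErrorType { NONE, INTERNAL, EXTERNAL }

      private ErrorType error = ErrorType.NONE;

      synchronized void setInternal() {
        error = ErrorType.INTERNAL;           // always wins
      }

      synchronized void setExternal() {
        if (error != ErrorType.INTERNAL) {    // never mask an internal error
          error = ErrorType.EXTERNAL;
        }
      }

      synchronized boolean hasError() {
        return error != ErrorType.NONE;
      }
    }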
 
     synchronized void setBadNodeIndex(int index) {
       this.badNodeIndex = index;
@@ -306,14 +313,14 @@ class DataStreamer extends Daemon {
       }
 
       if (!isRestartingNode()) {
-        error = false;
+        error = ErrorType.NONE;
       }
       badNodeIndex = -1;
     }
 
     synchronized void checkRestartingNodeDeadline(DatanodeInfo[] nodes) {
       if (restartingNodeIndex >= 0) {
-        if (!error) {
+        if (error == ErrorType.NONE) {
           throw new IllegalStateException("error=false while checking" +
               " restarting node deadline");
         }
@@ -345,7 +352,7 @@ class DataStreamer extends Daemon {
 
   private volatile boolean streamerClosed = false;
   protected ExtendedBlock block; // its length is number of bytes acked
-  private Token<BlockTokenIdentifier> accessToken;
+  protected Token<BlockTokenIdentifier> accessToken;
   private DataOutputStream blockStream;
   private DataInputStream blockReplyStream;
   private ResponseProcessor response = null;
@@ -355,7 +362,7 @@ class DataStreamer extends Daemon {
   private final ErrorState errorState;
 
   private BlockConstructionStage stage;  // block construction stage
-  private long bytesSent = 0; // number of bytes that've been sent
+  protected long bytesSent = 0; // number of bytes that've been sent
   private final boolean isLazyPersistFile;
 
   /** Nodes have been used in the pipeline before and have failed. */
@@ -378,13 +385,13 @@ class DataStreamer extends Daemon {
   protected final DFSClient dfsClient;
   protected final String src;
   /** Only for DataTransferProtocol.writeBlock(..) */
-  private final DataChecksum checksum4WriteBlock;
-  private final Progressable progress;
+  final DataChecksum checksum4WriteBlock;
+  final Progressable progress;
   protected final HdfsFileStatus stat;
   // appending to existing partial block
   private volatile boolean appendChunk = false;
   // both dataQueue and ackQueue are protected by dataQueue lock
-  private final LinkedList<DFSPacket> dataQueue = new LinkedList<>();
+  protected final LinkedList<DFSPacket> dataQueue = new LinkedList<>();
   private final LinkedList<DFSPacket> ackQueue = new LinkedList<>();
   private final AtomicReference<CachingStrategy> cachingStrategy;
   private final ByteArrayManager byteArrayManager;
@@ -401,7 +408,7 @@ class DataStreamer extends Daemon {
       CONGESTION_BACKOFF_MEAN_TIME_IN_MS * 10;
   private int lastCongestionBackoffTime;
 
-  private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes;
+  protected final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes;
   private final String[] favoredNodes;
 
   private DataStreamer(HdfsFileStatus stat, ExtendedBlock block,
@@ -473,6 +480,10 @@ class DataStreamer extends Daemon {
     }
   }
 
+  void setAccessToken(Token<BlockTokenIdentifier> t) {
+    this.accessToken = t;
+  }
+
   private void setPipeline(LocatedBlock lb) {
     setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs());
   }
@@ -533,7 +544,7 @@ class DataStreamer extends Daemon {
       DFSPacket one;
       try {
         // process datanode IO errors if any
-        boolean doSleep = processDatanodeError();
+        boolean doSleep = processDatanodeOrExternalError();
 
         final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2; 
         synchronized (dataQueue) {
@@ -696,7 +707,7 @@ class DataStreamer extends Daemon {
         }
         lastException.set(e);
         assert !(e instanceof NullPointerException);
-        errorState.setError(true);
+        errorState.setInternalError();
         if (!errorState.isNodeMarked()) {
           // Not a datanode issue
           streamerClosed = true;
@@ -837,6 +848,9 @@ class DataStreamer extends Daemon {
     }
   }
 
+  void setStreamerAsClosed() {
+    streamerClosed = true;
+  }
 
   private void checkClosed() throws IOException {
     if (streamerClosed) {
@@ -857,7 +871,7 @@ class DataStreamer extends Daemon {
     }
   }
 
-  private void closeStream() {
+  void closeStream() {
     final MultipleIOException.Builder b = new MultipleIOException.Builder();
 
     if (blockStream != null) {
@@ -1037,7 +1051,7 @@ class DataStreamer extends Daemon {
         } catch (Exception e) {
           if (!responderClosed) {
             lastException.set(e);
-            errorState.setError(true);
+            errorState.setInternalError();
             errorState.markFirstNodeIfNotMarked();
             synchronized (dataQueue) {
               dataQueue.notifyAll();
@@ -1059,18 +1073,18 @@ class DataStreamer extends Daemon {
     }
   }
 
+  private boolean shouldHandleExternalError() {
+    return errorState.hasExternalError() && blockStream != null;
+  }
+
   /**
    * If this stream has encountered any errors, shutdown threads
    * and mark the stream as closed.
    *
    * @return true if it should sleep for a while after returning.
    */
-  private boolean processDatanodeError() throws IOException {
-    if (!errorState.hasDatanodeError()) {
-      return false;
-    }
-    if (errorState.hasExternalErrorOnly() && block == null) {
-      // block is not yet initialized, handle external error later.
+  private boolean processDatanodeOrExternalError() throws IOException {
+    if (!errorState.hasDatanodeError() && !shouldHandleExternalError()) {
       return false;
     }
     if (response != null) {
@@ -1103,7 +1117,8 @@ class DataStreamer extends Daemon {
         return false;
       }
     }
-    boolean doSleep = setupPipelineForAppendOrRecovery();
+
+    setupPipelineForAppendOrRecovery();
 
     if (!streamerClosed && dfsClient.clientRunning) {
       if (stage == BlockConstructionStage.PIPELINE_CLOSE) {
@@ -1135,7 +1150,7 @@ class DataStreamer extends Daemon {
       }
     }
 
-    return doSleep;
+    return false;
   }
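
With the rename to processDatanodeOrExternalError(), the streamer's main loop keeps its shape: handle any error state first, then block with a bounded wait for the next packet so that errors are re-checked periodically. A stripped-down sketch of that consumer loop (illustrative, not the DataStreamer class):

    import java.util.LinkedList;

    // Stripped-down consumer loop in the shape of DataStreamer.run():
    // callers re-check error state between bounded waits.
    class PacketLoop<T> {
      private final LinkedList<T> dataQueue = new LinkedList<>();
      private volatile boolean closed = false;

      void enqueue(T packet) {
        synchronized (dataQueue) {
          dataQueue.add(packet);
          dataQueue.notifyAll();
        }
      }

      void close() {
        synchronized (dataQueue) {
          closed = true;
          dataQueue.notifyAll();
        }
      }

      // Returns the next packet, or null if woken with nothing to do;
      // the bounded wait is what lets run() process errors promptly.
      T nextPacket(long waitMs) throws InterruptedException {
        synchronized (dataQueue) {
          if (dataQueue.isEmpty() && !closed) {
            dataQueue.wait(waitMs);
          }
          return dataQueue.poll();
        }
      }
    }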
 
   void setHflush() {
@@ -1266,7 +1281,7 @@ class DataStreamer extends Daemon {
    * This happens when a file is appended or data streaming fails
    * It keeps on trying until a pipeline is setup
    */
-  private boolean setupPipelineForAppendOrRecovery() throws IOException {
+  private void setupPipelineForAppendOrRecovery() throws IOException {
     // check number of datanodes
     if (nodes == null || nodes.length == 0) {
       String msg = "Could not get block locations. " + "Source file \""
@@ -1274,19 +1289,23 @@ class DataStreamer extends Daemon {
       LOG.warn(msg);
       lastException.set(new IOException(msg));
       streamerClosed = true;
-      return false;
+      return;
     }
+    setupPipelineInternal(nodes, storageTypes);
+  }
 
+  protected void setupPipelineInternal(DatanodeInfo[] datanodes,
+      StorageType[] nodeStorageTypes) throws IOException {
     boolean success = false;
     long newGS = 0L;
     while (!success && !streamerClosed && dfsClient.clientRunning) {
       if (!handleRestartingDatanode()) {
-        return false;
+        return;
       }
 
-      final boolean isRecovery = errorState.hasError();
+      final boolean isRecovery = errorState.hasInternalError();
       if (!handleBadDatanode()) {
-        return false;
+        return;
       }
 
       handleDatanodeReplacement();
@@ -1307,7 +1326,6 @@ class DataStreamer extends Daemon {
     if (success) {
       block = updatePipeline(newGS);
     }
-    return false; // do not sleep, continue processing
   }
 
   /**
@@ -1315,7 +1333,7 @@ class DataStreamer extends Daemon {
    * This process is repeated until the deadline or the node starts back up.
    * @return true if it should continue.
    */
-  private boolean handleRestartingDatanode() {
+  boolean handleRestartingDatanode() {
     if (errorState.isRestartingNode()) {
       // 4 seconds or the configured deadline period, whichever is shorter.
       // This is the retry interval and recovery will be retried in this
@@ -1338,7 +1356,7 @@ class DataStreamer extends Daemon {
    * Remove bad node from list of nodes if badNodeIndex was set.
    * @return true if it should continue.
    */
-  private boolean handleBadDatanode() {
+  boolean handleBadDatanode() {
     final int badNodeIndex = errorState.getBadNodeIndex();
     if (badNodeIndex >= 0) {
       if (nodes.length <= 1) {
@@ -1388,7 +1406,7 @@ class DataStreamer extends Daemon {
     }
   }
 
-  private void failPacket4Testing() {
+  void failPacket4Testing() {
     if (failPacket) { // for testing
       failPacket = false;
       try {
@@ -1400,13 +1418,8 @@ class DataStreamer extends Daemon {
     }
   }
 
-  LocatedBlock updateBlockForPipeline() throws IOException {
-    return callUpdateBlockForPipeline(block);
-  }
-
-  LocatedBlock callUpdateBlockForPipeline(ExtendedBlock newBlock) throws IOException {
-    return dfsClient.namenode.updateBlockForPipeline(
-        newBlock, dfsClient.clientName);
+  private LocatedBlock updateBlockForPipeline() throws IOException {
+    return dfsClient.namenode.updateBlockForPipeline(block, dfsClient.clientName);
   }
 
   static ExtendedBlock newBlock(ExtendedBlock b, final long newGS) {
@@ -1417,18 +1430,12 @@ class DataStreamer extends Daemon {
   /** update pipeline at the namenode */
   ExtendedBlock updatePipeline(long newGS) throws IOException {
     final ExtendedBlock newBlock = newBlock(block, newGS);
-    return callUpdatePipeline(block, newBlock, nodes, storageIDs);
-  }
-
-  ExtendedBlock callUpdatePipeline(ExtendedBlock oldBlock, ExtendedBlock newBlock,
-      DatanodeInfo[] newNodes, String[] newStorageIDs)
-      throws IOException {
-    dfsClient.namenode.updatePipeline(dfsClient.clientName, oldBlock, newBlock,
-        newNodes, newStorageIDs);
+    dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock,
+        nodes, storageIDs);
     return newBlock;
   }
 
-  int getNumBlockWriteRetry() {
+  private int getNumBlockWriteRetry() {
     return dfsClient.getConf().getNumBlockWriteRetry();
   }
 
@@ -1438,7 +1445,7 @@ class DataStreamer extends Daemon {
    * Must get block ID and the IDs of the destinations from the namenode.
    * Returns the list of target datanodes.
    */
-  private LocatedBlock nextBlockOutputStream() throws IOException {
+  protected LocatedBlock nextBlockOutputStream() throws IOException {
     LocatedBlock lb = null;
     DatanodeInfo[] nodes = null;
     StorageType[] storageTypes = null;
@@ -1446,9 +1453,8 @@ class DataStreamer extends Daemon {
     boolean success = false;
     ExtendedBlock oldBlock = block;
     do {
-      errorState.reset();
+      errorState.resetInternalError();
       lastException.clear();
-      success = false;
 
       DatanodeInfo[] excluded =
           excludedNodes.getAllPresent(excludedNodes.asMap().keySet())
@@ -1488,7 +1494,7 @@ class DataStreamer extends Daemon {
   // connects to the first datanode in the pipeline
   // Returns true if success, otherwise return failure.
   //
-  private boolean createBlockOutputStream(DatanodeInfo[] nodes,
+  boolean createBlockOutputStream(DatanodeInfo[] nodes,
       StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) {
     if (nodes.length == 0) {
       LOG.info("nodes are empty for write pipeline of " + block);
@@ -1567,7 +1573,7 @@ class DataStreamer extends Daemon {
         assert null == blockStream : "Previous blockStream unclosed";
         blockStream = out;
         result =  true; // success
-        errorState.reset();
+        errorState.resetInternalError();
       } catch (IOException ie) {
         if (!errorState.isRestartingNode()) {
           LOG.info("Exception in createBlockOutputStream " + this, ie);
@@ -1603,7 +1609,7 @@ class DataStreamer extends Daemon {
         if (checkRestart && shouldWaitForRestart(i)) {
           errorState.initRestartingNode(i, "Datanode " + i + " is restarting: " + nodes[i]);
         }
-        errorState.setError(true);
+        errorState.setInternalError();
         lastException.set(ie);
         result =  false;  // error
       } finally {
@@ -1645,58 +1651,10 @@ class DataStreamer extends Daemon {
     }
   }
 
-  LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
+  private LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
       throws IOException {
-    final DfsClientConf conf = dfsClient.getConf(); 
-    int retries = conf.getNumBlockWriteLocateFollowingRetry();
-    long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs();
-    while (true) {
-      long localstart = Time.monotonicNow();
-      while (true) {
-        try {
-          return dfsClient.namenode.addBlock(src, dfsClient.clientName,
-              block, excludedNodes, stat.getFileId(), favoredNodes);
-        } catch (RemoteException e) {
-          IOException ue =
-              e.unwrapRemoteException(FileNotFoundException.class,
-                  AccessControlException.class,
-                  NSQuotaExceededException.class,
-                  DSQuotaExceededException.class,
-                  QuotaByStorageTypeExceededException.class,
-                  UnresolvedPathException.class);
-          if (ue != e) {
-            throw ue; // no need to retry these exceptions
-          }
-
-
-          if (NotReplicatedYetException.class.getName().
-              equals(e.getClassName())) {
-            if (retries == 0) {
-              throw e;
-            } else {
-              --retries;
-              LOG.info("Exception while adding a block", e);
-              long elapsed = Time.monotonicNow() - localstart;
-              if (elapsed > 5000) {
-                LOG.info("Waiting for replication for "
-                    + (elapsed / 1000) + " seconds");
-              }
-              try {
-                LOG.warn("NotReplicatedYetException sleeping " + src
-                    + " retries left " + retries);
-                Thread.sleep(sleeptime);
-                sleeptime *= 2;
-              } catch (InterruptedException ie) {
-                LOG.warn("Caught exception", ie);
-              }
-            }
-          } else {
-            throw e;
-          }
-
-        }
-      }
-    }
+    return DFSOutputStream.addBlock(excludedNodes, dfsClient, src, block,
+        stat.getFileId(), favoredNodes);
   }
 
   /**
@@ -1755,6 +1713,10 @@ class DataStreamer extends Daemon {
     return storageIDs;
   }
 
+  BlockConstructionStage getStage() {
+    return stage;
+  }
+
   /**
    * return the token of the block
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
index 2f83f7c..a313ecb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
@@ -19,18 +19,15 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSStripedOutputStream.Coordinator;
-import org.apache.hadoop.hdfs.DFSStripedOutputStream.MultipleBlockingQueue;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.util.ByteArrayManager;
-import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
@@ -46,66 +43,8 @@ import com.google.common.annotations.VisibleForTesting;
  * other streamers.
  */
 public class StripedDataStreamer extends DataStreamer {
-  /**
-   * This class is designed for multiple threads to share a
-   * {@link MultipleBlockingQueue}. Initially, the queue is empty. The earliest
-   * thread calling poll populates entries to the queue and the other threads
-   * will wait for it. Once the entries are populated, all the threads can poll
-   * their entries.
-   *
-   * @param <T> the queue entry type.
-   */
-  static abstract class ConcurrentPoll<T> {
-    final MultipleBlockingQueue<T> queue;
-
-    ConcurrentPoll(MultipleBlockingQueue<T> queue) {
-      this.queue = queue;
-    }
-
-    T poll(final int i) throws IOException {
-      for(;;) {
-        synchronized(queue) {
-          final T polled = queue.poll(i);
-          if (polled != null) { // already populated; return polled item.
-            return polled;
-          }
-          if (isReady2Populate()) {
-            try {
-              populate();
-              return queue.poll(i);
-            } catch(IOException ioe) {
-              LOG.warn("Failed to populate, " + this, ioe);
-              throw ioe;
-            }
-          }
-        }
-
-        // sleep and then retry.
-        sleep(100, "poll");
-      }
-    }
-
-    boolean isReady2Populate() {
-      return queue.isEmpty();
-    }
-
-    abstract void populate() throws IOException;
-  }
-
-  private static void sleep(long ms, String op) throws InterruptedIOException {
-    try {
-      Thread.sleep(ms);
-    } catch(InterruptedException ie) {
-      throw DFSUtil.toInterruptedIOException(
-          "Sleep interrupted during " + op, ie);
-    }
-  }
-
   private final Coordinator coordinator;
   private final int index;
-  private volatile boolean failed;
-  private final ECSchema schema;
-  private final int cellSize;
 
   StripedDataStreamer(HdfsFileStatus stat,
                       DFSClient dfsClient, String src,
@@ -117,102 +56,59 @@ public class StripedDataStreamer extends DataStreamer {
         byteArrayManage, favoredNodes);
     this.index = index;
     this.coordinator = coordinator;
-    this.schema = stat.getErasureCodingPolicy().getSchema();
-    this.cellSize = stat.getErasureCodingPolicy().getCellSize();
   }
 
   int getIndex() {
     return index;
   }
 
-  void setFailed(boolean failed) {
-    this.failed = failed;
-  }
-
-  boolean isFailed() {
-    return failed;
-  }
-
-  private boolean isParityStreamer() {
-    return index >= schema.getNumDataUnits();
+  boolean isHealthy() {
+    return !streamerClosed() && !getErrorState().hasInternalError();
   }
 
   @Override
   protected void endBlock() {
-    if (!isParityStreamer()) {
-      coordinator.offerEndBlock(index, block);
-    }
+    coordinator.offerEndBlock(index, block);
     super.endBlock();
   }
 
-  @Override
-  int getNumBlockWriteRetry() {
-    return 0;
+  /**
+   * The upper-level DFSStripedOutputStream allocates the new block group.
+   * Each striped data streamer only needs to fetch its block from the
+   * queue, which should already be populated.
+   */
+  private LocatedBlock getFollowingBlock() throws IOException {
+    if (!this.isHealthy()) {
+      // No internal block for this streamer, maybe no enough healthy DN.
+      // Throw the exception which has been set by the StripedOutputStream.
+      this.getLastException().check(false);
+    }
+    return coordinator.getFollowingBlocks().poll(index);
   }
 
   @Override
-  LocatedBlock locateFollowingBlock(final DatanodeInfo[] excludedNodes)
-      throws IOException {
-    return new ConcurrentPoll<LocatedBlock>(coordinator.getFollowingBlocks()) {
-      @Override
-      boolean isReady2Populate() {
-        return super.isReady2Populate()
-            && (block == null || coordinator.hasAllEndBlocks());
-      }
-
-      @Override
-      void populate() throws IOException {
-        getLastException().check(false);
-
-        if (block != null) {
-          // set numByte for the previous block group
-          long bytes = 0;
-          for (int i = 0; i < schema.getNumDataUnits(); i++) {
-            final ExtendedBlock b = coordinator.takeEndBlock(i);
-            StripedBlockUtil.checkBlocks(index, block, i, b);
-            bytes += b.getNumBytes();
-          }
-          block.setNumBytes(bytes);
-          block.setBlockId(block.getBlockId() - index);
-        }
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("locateFollowingBlock: index=" + index + ", block=" + block);
-        }
-
-        final LocatedBlock lb = StripedDataStreamer.super.locateFollowingBlock(
-            excludedNodes);
-        if (lb.getLocations().length < schema.getNumDataUnits()) {
-          throw new IOException(
-              "Failed to get datablocks number of nodes from namenode: blockGroupSize= "
-                  + (schema.getNumDataUnits() + schema.getNumParityUnits())
-                  + ", blocks.length= " + lb.getLocations().length);
-        }
-        final LocatedBlock[] blocks =
-            StripedBlockUtil.parseStripedBlockGroup((LocatedStripedBlock) lb,
-                cellSize, schema.getNumDataUnits(), schema.getNumParityUnits());
-
-        for (int i = 0; i < blocks.length; i++) {
-          StripedDataStreamer si = coordinator.getStripedDataStreamer(i);
-          if (si.isFailed()) {
-            continue; // skipping failed data streamer
-          }
-          if (blocks[i] == null) {
-            // Set exception and close streamer as there is no block locations
-            // found for the parity block.
-            LOG.warn("Failed to get block location for parity block, index="
-                + i);
-            si.getLastException().set(
-                new IOException("Failed to get following block, i=" + i));
-            si.setFailed(true);
-            si.endBlock();
-            si.close(true);
-          } else {
-            queue.offer(i, blocks[i]);
-          }
-        }
-      }
-    }.poll(index);
+  protected LocatedBlock nextBlockOutputStream() throws IOException {
+    boolean success;
+    LocatedBlock lb = getFollowingBlock();
+    block = lb.getBlock();
+    block.setNumBytes(0);
+    bytesSent = 0;
+    accessToken = lb.getBlockToken();
+
+    DatanodeInfo[] nodes = lb.getLocations();
+    StorageType[] storageTypes = lb.getStorageTypes();
+
+    // Connect to the DataNode. On failure, the internal error state is set.
+    success = createBlockOutputStream(nodes, storageTypes, 0L, false);
+
+    if (!success) {
+      block = null;
+      final DatanodeInfo badNode = nodes[getErrorState().getBadNodeIndex()];
+      LOG.info("Excluding datanode " + badNode);
+      excludedNodes.put(badNode, badNode);
+      throw new IOException("Unable to create new block.");
+    }
+    return lb;
   }
 
   @VisibleForTesting
@@ -221,119 +117,71 @@ public class StripedDataStreamer extends DataStreamer {
   }
 
   @Override
-  LocatedBlock updateBlockForPipeline() throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("updateBlockForPipeline(), " + this);
-    }
-    return new ConcurrentPoll<LocatedBlock>(coordinator.getNewBlocks()) {
-      @Override
-      void populate() throws IOException {
-        final ExtendedBlock bg = coordinator.getBlockGroup();
-        final LocatedBlock updated = callUpdateBlockForPipeline(bg);
-        final long newGS = updated.getBlock().getGenerationStamp();
-        final LocatedBlock[] updatedBlks = StripedBlockUtil
-            .parseStripedBlockGroup((LocatedStripedBlock) updated, cellSize,
-                schema.getNumDataUnits(), schema.getNumParityUnits());
-        for (int i = 0; i < schema.getNumDataUnits()
-            + schema.getNumParityUnits(); i++) {
-          StripedDataStreamer si = coordinator.getStripedDataStreamer(i);
-          if (si.isFailed()) {
-            continue; // skipping failed data streamer
-          }
-          final ExtendedBlock bi = si.getBlock();
-          if (bi != null) {
-            final LocatedBlock lb = new LocatedBlock(newBlock(bi, newGS),
-                null, null, null, -1, updated.isCorrupt(), null);
-            lb.setBlockToken(updatedBlks[i].getBlockToken());
-            queue.offer(i, lb);
-          } else {
-            final MultipleBlockingQueue<LocatedBlock> followingBlocks
-                = coordinator.getFollowingBlocks();
-            synchronized(followingBlocks) {
-              final LocatedBlock lb = followingBlocks.peek(i);
-              if (lb != null) {
-                lb.getBlock().setGenerationStamp(newGS);
-                si.getErrorState().reset();
-                continue;
-              }
-            }
-
-            //streamer i just have polled the block, sleep and retry.
-            sleep(100, "updateBlockForPipeline, " + this);
-            i--;
-          }
-        }
+  protected void setupPipelineInternal(DatanodeInfo[] nodes,
+      StorageType[] nodeStorageTypes) throws IOException {
+    boolean success = false;
+    while (!success && !streamerClosed() && dfsClient.clientRunning) {
+      if (!handleRestartingDatanode()) {
+        return;
+      }
+      if (!handleBadDatanode()) {
+        // For a striped streamer, a DataNode error means closing the stream
+        // and returning; there is no need to replace the DataNode.
+        return;
       }
-    }.poll(index);
-  }
-
-  @Override
-  ExtendedBlock updatePipeline(final long newGS) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("updatePipeline(newGS=" + newGS + "), " + this);
-    }
-    return new ConcurrentPoll<ExtendedBlock>(coordinator.getUpdateBlocks()) {
-      @Override
-      void populate() throws IOException {
-        final MultipleBlockingQueue<LocatedBlock> followingBlocks
-            = coordinator.getFollowingBlocks();
-        final ExtendedBlock bg = coordinator.getBlockGroup();
-        final ExtendedBlock newBG = newBlock(bg, newGS);
 
-        final int n = schema.getNumDataUnits() + schema.getNumParityUnits();
-        final DatanodeInfo[] newNodes = new DatanodeInfo[n];
-        final String[] newStorageIDs = new String[n];
-        for (int i = 0; i < n; i++) {
-          final StripedDataStreamer si = coordinator.getStripedDataStreamer(i);
-          DatanodeInfo[] nodes = si.getNodes();
-          String[] storageIDs = si.getStorageIDs();
-          if (nodes == null || storageIDs == null) {
-            synchronized(followingBlocks) {
-              final LocatedBlock lb = followingBlocks.peek(i);
-              if (lb != null) {
-                nodes = lb.getLocations();
-                storageIDs = lb.getStorageIDs();
-              }
-            }
-          }
-          if (nodes != null && storageIDs != null) {
-            newNodes[i] = nodes[0];
-            newStorageIDs[i] = storageIDs[0];
-          } else {
-            //streamer i just have polled the block, sleep and retry.
-            sleep(100, "updatePipeline, " + this);
-            i--;
-          }
+      // get a new generation stamp and an access token
+      final LocatedBlock lb = coordinator.getNewBlocks().take(index);
+      long newGS = lb.getBlock().getGenerationStamp();
+      setAccessToken(lb.getBlockToken());
+
+      // Set up the pipeline again with the remaining nodes. When a striped
+      // data streamer reaches this point, it must be in external error state.
+      assert getErrorState().hasExternalError();
+      success = createBlockOutputStream(nodes, nodeStorageTypes, newGS, true);
+
+      failPacket4Testing();
+      getErrorState().checkRestartingNodeDeadline(nodes);
+
+      // Notify the coordinator of the result of createBlockOutputStream.
+      synchronized (coordinator) {
+        if (!streamerClosed()) {
+          coordinator.updateStreamer(this, success);
+          coordinator.notify();
+        } else {
+          success = false;
         }
-        final ExtendedBlock updated = callUpdatePipeline(bg, newBG, newNodes,
-            newStorageIDs);
-
-        for (int i = 0; i < n; i++) {
-          final StripedDataStreamer si = coordinator.getStripedDataStreamer(i);
-          final ExtendedBlock bi = si.getBlock();
-          if (bi != null) {
-            queue.offer(i, newBlock(bi, updated.getGenerationStamp()));
-          } else if (!si.isFailed()) {
-            synchronized(followingBlocks) {
-              final LocatedBlock lb = followingBlocks.peek(i);
-              if (lb != null) {
-                lb.getBlock().setGenerationStamp(newGS);
-                si.getErrorState().reset();
-                continue;
-              }
-            }
+      }
 
-            //streamer i just have polled the block, sleep and retry.
-            sleep(100, "updatePipeline, " + this);
-            i--;
-          }
+      if (success) {
+        // wait for results of other streamers
+        success = coordinator.takeStreamerUpdateResult(index);
+        if (success) {
+          // If all streamers succeeded, update the block with the new GS.
+          block = newBlock(block, newGS);
+        } else {
+          // otherwise close the block stream and restart the recovery process
+          closeStream();
         }
+      } else {
+        // On failure, close the stream. The internal error state and last
+        // exception have already been set in createBlockOutputStream.
+        // TODO: wait for restarting DataNodes during RollingUpgrade
+        closeStream();
+        setStreamerAsClosed();
       }
-    }.poll(index);
+    } // while
+  }
+
+  void setExternalError() {
+    getErrorState().setExternalError();
+    synchronized (dataQueue) {
+      dataQueue.notifyAll();
+    }
   }
 
   @Override
   public String toString() {
-    return "#" + index + ": " + (failed? "failed, ": "") + super.toString();
+    return "#" + index + ": " + (!isHealthy() ? "failed, ": "") + super.toString();
   }
 }
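
The streamer-side calls above (coordinator.getFollowingBlocks().poll(index),
coordinator.getNewBlocks().take(index)) assume the Coordinator keeps one
blocking queue per streamer index: the upper-level output stream populates
entry i and streamer i consumes it. Below is a minimal sketch of that
hand-off with hypothetical names; the real MultipleBlockingQueue lives in
DFSStripedOutputStream and may differ in detail.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.LinkedBlockingQueue;

    /** Sketch of a per-streamer hand-off queue; illustrative only. */
    class PerStreamerQueue<T> {
      private final List<LinkedBlockingQueue<T>> queues;

      PerStreamerQueue(int numStreamers) {
        queues = new ArrayList<>(numStreamers);
        for (int i = 0; i < numStreamers; i++) {
          queues.add(new LinkedBlockingQueue<T>());
        }
      }

      /** Called by the writer thread to publish an item for streamer i. */
      void offer(int i, T item) {
        queues.get(i).offer(item);
      }

      /** Called by streamer i; blocks until its item is published. */
      T take(int i) throws InterruptedException {
        return queues.get(i).take();
      }

      /** Non-blocking variant for items that must already be present. */
      T poll(int i) {
        return queues.get(i).poll();
      }
    }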

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java
index 0e92779..1d4cff3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java
@@ -68,16 +68,28 @@ public class BlockUnderConstructionFeature {
   /** Set expected locations */
   public void setExpectedLocations(Block block, DatanodeStorageInfo[] targets,
       boolean isStriped) {
-    int numLocations = targets == null ? 0 : targets.length;
+    if (targets == null) {
+      return;
+    }
+    int numLocations = 0;
+    for (DatanodeStorageInfo target : targets) {
+      if (target != null) {
+        numLocations++;
+      }
+    }
+
     this.replicas = new ReplicaUnderConstruction[numLocations];
-    for(int i = 0; i < numLocations; i++) {
-      // when creating a new striped block we simply sequentially assign block
-      // index to each storage
-      Block replicaBlock = isStriped ?
-          new Block(block.getBlockId() + i, 0, block.getGenerationStamp()) :
-          block;
-      replicas[i] = new ReplicaUnderConstruction(replicaBlock, targets[i],
-          ReplicaState.RBW);
+    int offset = 0;
+    for(int i = 0; i < targets.length; i++) {
+      if (targets[i] != null) {
+        // when creating a new striped block, we sequentially assign a
+        // block index to each storage
+        Block replicaBlock = isStriped ?
+            new Block(block.getBlockId() + i, 0, block.getGenerationStamp()) :
+            block;
+        replicas[offset++] = new ReplicaUnderConstruction(replicaBlock,
+            targets[i], ReplicaState.RBW);
+      }
     }
   }
 

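The rewritten setExpectedLocations above tolerates null entries in targets:
it sizes the replicas array by the non-null count, but derives each striped
replica's block ID from the original array index i rather than the compacted
offset, so internal-block numbering stays aligned with storage positions
even when some slots are missing. The same pattern in isolation, as a
simplified, hypothetical sketch:

    /**
     * Sketch: compact non-null targets while deriving per-replica IDs from
     * the original index i, not the compacted offset. Illustrative only.
     */
    static long[] assignStripedIds(long groupId, Object[] targets) {
      int numLocations = 0;
      for (Object t : targets) {
        if (t != null) {
          numLocations++;
        }
      }
      final long[] ids = new long[numLocations];
      int offset = 0;
      for (int i = 0; i < targets.length; i++) {
        if (targets[i] != null) {
          ids[offset++] = groupId + i; // ID follows the original slot i
        }
      }
      return ids;
    }
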
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index b5b3b97..61c6386 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -513,6 +513,10 @@ public class DatanodeManager {
     }
     final DatanodeStorageInfo[] storages = new DatanodeStorageInfo[datanodeID.length];
     for(int i = 0; i < datanodeID.length; i++) {
+      if (datanodeID[i].equals(DatanodeID.EMPTY_DATANODE_ID)) {
+        storages[i] = null;
+        continue;
+      }
       final DatanodeDescriptor dd = getDatanode(datanodeID[i]);
       storages[i] = dd.getStorageInfo(storageIDs[i]);
     }

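This pairs with the setExpectedLocations change above:
DatanodeID.EMPTY_DATANODE_ID acts as a sentinel for a missing internal
block, and its storage slot is left null instead of triggering a DataNode
lookup that would fail. The sentinel-skip pattern in isolation, as a
hypothetical, simplified sketch:

    import java.util.Map;

    /** Sketch of the sentinel-skip pattern; illustrative only. */
    class SentinelSkip {
      static String[] resolve(String[] ids, String sentinel,
          Map<String, String> registry) {
        final String[] storages = new String[ids.length];
        for (int i = 0; i < ids.length; i++) {
          // a sentinel entry marks a missing internal block; keep it null
          storages[i] = sentinel.equals(ids[i])
              ? null : registry.get(ids[i]);
        }
        return storages;
      }
    }
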
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index 5af3585..d49d39b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -925,22 +925,21 @@ public class StripedBlockUtil {
 
   /**
    * Check if the information such as IDs and generation stamps in block-i
-   * match block-j, where block-i and block-j are in the same group.
+   * match the block group.
    */
-  public static void checkBlocks(int j, ExtendedBlock blockj,
+  public static void checkBlocks(ExtendedBlock blockGroup,
       int i, ExtendedBlock blocki) throws IOException {
-
-    if (!blocki.getBlockPoolId().equals(blockj.getBlockPoolId())) {
-      throw new IOException("Block pool IDs mismatched: block" + j + "="
-          + blockj + ", block" + i + "=" + blocki);
+    if (!blocki.getBlockPoolId().equals(blockGroup.getBlockPoolId())) {
+      throw new IOException("Block pool IDs mismatched: block" + i + "="
+          + blocki + ", expected block group=" + blockGroup);
     }
-    if (blocki.getBlockId() - i != blockj.getBlockId() - j) {
-      throw new IOException("Block IDs mismatched: block" + j + "="
-          + blockj + ", block" + i + "=" + blocki);
+    if (blocki.getBlockId() - i != blockGroup.getBlockId()) {
+      throw new IOException("Block IDs mismatched: block" + i + "="
+          + blocki + ", expected block group=" + blockGroup);
     }
-    if (blocki.getGenerationStamp() != blockj.getGenerationStamp()) {
-      throw new IOException("Generation stamps mismatched: block" + j + "="
-          + blockj + ", block" + i + "=" + blocki);
+    if (blocki.getGenerationStamp() != blockGroup.getGenerationStamp()) {
+      throw new IOException("Generation stamps mismatched: block" + i + "="
+          + blocki + ", expected block group=" + blockGroup);
     }
   }
 

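The ID check above relies on the striped numbering convention also used in
setExpectedLocations: the i-th internal block carries ID groupId + i, so
blocki.getBlockId() - i must recover the group's own ID. A small worked
illustration with made-up values:

    /** Illustrates the ID arithmetic behind checkBlocks; values are made up. */
    public class StripedIdCheckDemo {
      public static void main(String[] args) {
        final long groupId = -1000L;   // striped group IDs are negative in HDFS
        for (int i = 0; i < 9; i++) {  // e.g. 6 data + 3 parity internal blocks
          final long internalId = groupId + i;
          // the check in checkBlocks: subtracting i must give the group ID back
          if (internalId - i != groupId) {
            throw new AssertionError("mismatched block " + i);
          }
        }
        System.out.println("all internal blocks map back to group " + groupId);
      }
    }
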
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6419900a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 274d319..e621f26 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -1988,35 +1988,14 @@ public class DFSTestUtil {
    */
   public static ExtendedBlock flushInternal(DFSStripedOutputStream out)
       throws IOException {
-    out.flushInternal();
+    out.flushAllInternals();
     return out.getBlock();
   }
 
-  /**
-   * Verify that blocks in striped block group are on different nodes, and every
-   * internal blocks exists.
-   */
-  public static void verifyLocatedStripedBlocks(LocatedBlocks lbs,
-       int groupSize) {
-    for (LocatedBlock lb : lbs.getLocatedBlocks()) {
-      assert lb instanceof LocatedStripedBlock;
-      HashSet<DatanodeInfo> locs = new HashSet<>();
-      for (DatanodeInfo datanodeInfo : lb.getLocations()) {
-        locs.add(datanodeInfo);
-      }
-      assertEquals(groupSize, lb.getLocations().length);
-      assertEquals(groupSize, locs.size());
-
-      // verify that every internal blocks exists
-      int[] blockIndices = ((LocatedStripedBlock) lb).getBlockIndices();
-      assertEquals(groupSize, blockIndices.length);
-      HashSet<Integer> found = new HashSet<>();
-      for (int index : blockIndices) {
-        assert index >=0;
-        found.add(index);
-      }
-      assertEquals(groupSize, found.size());
-    }
+  public static ExtendedBlock flushBuffer(DFSStripedOutputStream out)
+      throws IOException {
+    out.flush();
+    return out.getBlock();
   }
 
   public static void waitForMetric(final JMXGet jmx, final String metricName, final int expectedValue)