Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/07/31 21:16:35 UTC

hadoop git commit: HDFS-8202. Improve end-to-end striped file test to add erasure recovery test. Contributed by Xinwei Qin.

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7285 4a72be6e0 -> ba90c0285


HDFS-8202. Improve end-to-end striped file test to add erasure recovery test. Contributed by Xinwei Qin.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ba90c028
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ba90c028
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ba90c028

Branch: refs/heads/HDFS-7285
Commit: ba90c02853f85f7ee11d6f37a0c6d0e8a2101ec8
Parents: 4a72be6
Author: Zhe Zhang <zh...@cloudera.com>
Authored: Fri Jul 31 12:16:15 2015 -0700
Committer: Zhe Zhang <zh...@cloudera.com>
Committed: Fri Jul 31 12:16:15 2015 -0700

----------------------------------------------------------------------
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt        |   3 +
 .../apache/hadoop/hdfs/StripedFileTestUtil.java |  65 ++++
 .../TestDFSStripedOutputStreamWithFailure.java  |  26 +-
 .../hdfs/TestReadStripedFileWithDecoding.java   | 320 ++++++++++---------
 .../hdfs/TestWriteStripedFileWithFailure.java   | 162 ++++++++++
 5 files changed, 398 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba90c028/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 1ded203..673fbab 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -379,3 +379,6 @@
 
     HDFS-8769. Erasure coding: unit test for SequentialBlockGroupIdGenerator.
     (Rakesh R via waltersu4549)
+
+    HDFS-8202. Improve end-to-end striped file test to add erasure recovery
+    test. (Xinwei Qin via zhz)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba90c028/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
index 2866a0e..ca4b2aa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
@@ -17,10 +17,13 @@
  */
 package org.apache.hadoop.hdfs;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.web.ByteRangeInputStream;
 import org.junit.Assert;
@@ -29,8 +32,11 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class StripedFileTestUtil {
+  public static final Log LOG = LogFactory.getLog(StripedFileTestUtil.class);
+
   static int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
   static int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
 
@@ -193,4 +199,63 @@ public class StripedFileTestUtil {
           StripedFileTestUtil.getByte(pos + i), buf[i]);
     }
   }
+
+  static void killDatanode(MiniDFSCluster cluster, DFSStripedOutputStream out,
+      final int dnIndex, final AtomicInteger pos) {
+    final StripedDataStreamer s = out.getStripedDataStreamer(dnIndex);
+    final DatanodeInfo datanode = getDatanodes(s);
+    LOG.info("killDatanode " + dnIndex + ": " + datanode + ", pos=" + pos);
+    cluster.stopDataNode(datanode.getXferAddr());
+  }
+
+  static DatanodeInfo getDatanodes(StripedDataStreamer streamer) {
+    for(;;) {
+      final DatanodeInfo[] datanodes = streamer.getNodes();
+      if (datanodes != null) {
+        Assert.assertEquals(1, datanodes.length);
+        Assert.assertNotNull(datanodes[0]);
+        return datanodes[0];
+      }
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException ignored) {
+        return null;
+      }
+    }
+  }
+
+  /**
+   * Generate n distinct random numbers within the
+   * specified non-negative integer range.
+   * @param min minimum of the range
+   * @param max maximum of the range
+   * @param n how many numbers to generate
+   * @return an array of n distinct numbers, or null if the arguments are invalid
+   */
+  public static int[] randomArray(int min, int max, int n){
+    if (n > (max - min + 1) || max < min || min < 0 || max < 0) {
+      return null;
+    }
+    int[] result = new int[n];
+    for (int i = 0; i < n; i++) {
+      result[i] = -1;
+    }
+
+    int count = 0;
+    while(count < n) {
+      int num = (int) (Math.random() * (max - min)) + min;
+      boolean flag = true;
+      for (int j = 0; j < n; j++) {
+        if(num == result[j]){
+          flag = false;
+          break;
+        }
+      }
+      if(flag){
+        result[count] = num;
+        count++;
+      }
+    }
+    return result;
+  }
 }
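
A note on StripedFileTestUtil.randomArray above: it picks n distinct block indices by
retrying random draws until no duplicates remain. As a rough, illustrative sketch only
(the class and method names below are made up, not part of this patch), the same
contract can be met by shuffling the candidate range, which avoids the retry loop:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    class DistinctIndexSketch {
      // Pick n distinct indices in [min, max), matching how the tests call
      // randomArray with (0, dataBlocks) and (dataBlocks, dataBlocks + parityBlocks).
      static int[] pickDistinct(int min, int max, int n) {
        if (min < 0 || max <= min || n > (max - min)) {
          return null;  // same contract as randomArray: null on invalid arguments
        }
        List<Integer> candidates = new ArrayList<>();
        for (int i = min; i < max; i++) {
          candidates.add(i);
        }
        Collections.shuffle(candidates);  // random permutation of the candidates
        int[] result = new int[n];
        for (int i = 0; i < n; i++) {
          result[i] = candidates.get(i);  // take the first n shuffled values
        }
        return result;
      }
    }

Either way, the callers in the corruption tests rely only on getting n distinct
indices back, or null when the arguments are out of range.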

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba90c028/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
index 54fcdf8..6594ae1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
@@ -181,7 +181,7 @@ public class TestDFSStripedOutputStreamWithFailure {
           waitTokenExpires(out);
         }
 
-        killDatanode(cluster, stripedOut, dnIndex, pos);
+        StripedFileTestUtil.killDatanode(cluster, stripedOut, dnIndex, pos);
         killed = true;
       }
 
@@ -217,30 +217,6 @@ public class TestDFSStripedOutputStreamWithFailure {
 
   }
 
-  static DatanodeInfo getDatanodes(StripedDataStreamer streamer) {
-    for(;;) {
-      final DatanodeInfo[] datanodes = streamer.getNodes();
-      if (datanodes != null) {
-        Assert.assertEquals(1, datanodes.length);
-        Assert.assertNotNull(datanodes[0]);
-        return datanodes[0];
-      }
-      try {
-        Thread.sleep(100);
-      } catch (InterruptedException ignored) {
-        return null;
-      }
-    }
-  }
-
-  static void killDatanode(MiniDFSCluster cluster, DFSStripedOutputStream out,
-      final int dnIndex, final AtomicInteger pos) {
-    final StripedDataStreamer s = out.getStripedDataStreamer(dnIndex);
-    final DatanodeInfo datanode = getDatanodes(s);
-    LOG.info("killDatanode " + dnIndex + ": " + datanode + ", pos=" + pos);
-    cluster.stopDataNode(datanode.getXferAddr());
-  }
-
   static void checkData(DistributedFileSystem dfs, String src, int length,
       int killedDnIndex, long oldGS) throws IOException {
     List<List<LocatedBlock>> blockGroupList = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba90c028/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
index 8afea19..1719d3f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
@@ -20,10 +20,11 @@ package org.apache.hadoop.hdfs;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
@@ -43,13 +44,21 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-import static org.apache.hadoop.hdfs.StripedFileTestUtil.*;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.blockSize;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.cellSize;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.dataBlocks;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.numDNs;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.parityBlocks;
 
 public class TestReadStripedFileWithDecoding {
   static final Log LOG = LogFactory.getLog(TestReadStripedFileWithDecoding.class);
 
   private MiniDFSCluster cluster;
   private DistributedFileSystem fs;
+  private final int smallFileLength = blockSize * dataBlocks - 123;
+  private final int largeFileLength = blockSize * dataBlocks + 123;
+  private final int[] fileLengths = {smallFileLength, largeFileLength};
+  private final int[] dnFailureNums = {1, 2, 3};
 
   @Before
   public void setup() throws IOException {
@@ -67,82 +76,64 @@ public class TestReadStripedFileWithDecoding {
     }
   }
 
-  @Test
-  public void testReadWithDNFailure1() throws IOException {
-    testReadWithDNFailure("/foo", cellSize * (dataBlocks + 2), 0);
-  }
-
-  @Test
-  public void testReadWithDNFailure2() throws IOException {
-    testReadWithDNFailure("/foo", cellSize * (dataBlocks + 2), cellSize * 5);
-  }
-
-  @Test
-  public void testReadWithDNFailure3() throws IOException {
-    testReadWithDNFailure("/foo", cellSize * dataBlocks, 0);
+  /**
+   * Shut down a tolerable number of datanodes before reading.
+   * Verify the decoding works correctly.
+   */
+  @Test(timeout=300000)
+  public void testReadWithDNFailure() throws IOException {
+    for (int fileLength : fileLengths) {
+      for (int dnFailureNum : dnFailureNums) {
+        try {
+          // setup a new cluster with no dead datanode
+          setup();
+          testReadWithDNFailure(fileLength, dnFailureNum);
+        } catch (IOException ioe) {
+          String fileType = fileLength < (blockSize * dataBlocks) ?
+              "smallFile" : "largeFile";
+          LOG.error("Failed to read file with DN failure:"
+              + " fileType = "+ fileType
+              + ", dnFailureNum = " + dnFailureNum);
+        } finally {
+          // tear down the cluster
+          tearDown();
+        }
+      }
+    }
   }
 
   /**
-   * Delete a data block before reading. Verify the decoding works correctly.
+   * Corrupt a tolerable number of blocks before reading.
+   * Verify the decoding works correctly.
    */
-  @Test
+  @Test(timeout=300000)
   public void testReadCorruptedData() throws IOException {
-    // create file
-    final Path file = new Path("/partially_deleted");
-    final int length = cellSize * dataBlocks * 2;
-    final byte[] bytes = StripedFileTestUtil.generateBytes(length);
-    DFSTestUtil.writeFile(fs, file, bytes);
-
-    // corrupt the first data block
-    // find the corresponding data node
-    int dnIndex = findFirstDataNode(file, cellSize * dataBlocks);
-    Assert.assertNotEquals(-1, dnIndex);
-    // find the target block
-    LocatedStripedBlock slb = (LocatedStripedBlock)fs.getClient()
-        .getLocatedBlocks(file.toString(), 0, cellSize * dataBlocks).get(0);
-    final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb,
-        cellSize, dataBlocks, parityBlocks);
-    // find the target block file
-    File storageDir = cluster.getInstanceStorageDir(dnIndex, 0);
-    File blkFile = MiniDFSCluster.getBlockFile(storageDir, blks[0].getBlock());
-    Assert.assertTrue("Block file does not exist", blkFile.exists());
-    // delete the block file
-    LOG.info("Deliberately removing file " + blkFile.getName());
-    Assert.assertTrue("Cannot remove file", blkFile.delete());
-    verifyRead(file, length, bytes);
+    for (int fileLength : fileLengths) {
+      for (int dataDelNum = 1; dataDelNum < 4; dataDelNum++) {
+        for (int parityDelNum = 0; (dataDelNum+parityDelNum) < 4; parityDelNum++) {
+          String src = "/corrupted_" + dataDelNum + "_" + parityDelNum;
+          testReadWithBlockCorrupted(src, fileLength,
+              dataDelNum, parityDelNum, false);
+        }
+      }
+    }
   }
 
   /**
-   * Corrupt the content of the data block before reading.
+   * Delete a tolerable number of blocks before reading.
+   * Verify the decoding works correctly.
    */
-  @Test
-  public void testReadCorruptedData2() throws IOException {
-    // create file
-    final Path file = new Path("/partially_corrupted");
-    final int length = cellSize * dataBlocks * 2;
-    final byte[] bytes = StripedFileTestUtil.generateBytes(length);
-    DFSTestUtil.writeFile(fs, file, bytes);
-
-    // corrupt the first data block
-    // find the first data node
-    int dnIndex = findFirstDataNode(file, cellSize * dataBlocks);
-    Assert.assertNotEquals(-1, dnIndex);
-    // find the first data block
-    LocatedStripedBlock slb = (LocatedStripedBlock)fs.getClient()
-        .getLocatedBlocks(file.toString(), 0, cellSize * dataBlocks).get(0);
-    final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb,
-        cellSize, dataBlocks, parityBlocks);
-    // find the first block file
-    File storageDir = cluster.getInstanceStorageDir(dnIndex, 0);
-    File blkFile = MiniDFSCluster.getBlockFile(storageDir, blks[0].getBlock());
-    Assert.assertTrue("Block file does not exist", blkFile.exists());
-    // corrupt the block file
-    LOG.info("Deliberately corrupting file " + blkFile.getName());
-    try (FileOutputStream out = new FileOutputStream(blkFile)) {
-      out.write("corruption".getBytes());
+  @Test(timeout=300000)
+  public void testReadCorruptedDataByDeleting() throws IOException {
+    for (int fileLength : fileLengths) {
+      for (int dataDelNum = 1; dataDelNum < 4; dataDelNum++) {
+        for (int parityDelNum = 0; (dataDelNum+parityDelNum) < 4; parityDelNum++) {
+          String src = "/deleted_" + dataDelNum + "_" + parityDelNum;
+          testReadWithBlockCorrupted(src, fileLength,
+              dataDelNum, parityDelNum, true);
+        }
+      }
     }
-
-    verifyRead(file, length, bytes);
   }
 
   private int findFirstDataNode(Path file, long length) throws IOException {
@@ -159,87 +150,45 @@ public class TestReadStripedFileWithDecoding {
     return -1;
   }
 
-  private void verifyRead(Path file, int length, byte[] expected)
+  private void verifyRead(Path testPath, int length, byte[] expected)
       throws IOException {
-    // pread
-    try (FSDataInputStream fsdis = fs.open(file)) {
-      byte[] buf = new byte[length];
-      int readLen = fsdis.read(0, buf, 0, buf.length);
-      Assert.assertEquals("The fileSize of file should be the same to write size",
-          length, readLen);
-      Assert.assertArrayEquals(expected, buf);
-    }
-
-    // stateful read
-    ByteBuffer result = ByteBuffer.allocate(length);
-    ByteBuffer buf = ByteBuffer.allocate(1024);
-    int readLen = 0;
-    int ret;
-    try (FSDataInputStream in = fs.open(file)) {
-      while ((ret = in.read(buf)) >= 0) {
-        readLen += ret;
-        buf.flip();
-        result.put(buf);
-        buf.clear();
-      }
-    }
-    Assert.assertEquals("The length of file should be the same to write size",
-        length, readLen);
-    Assert.assertArrayEquals(expected, result.array());
+    byte[] buffer = new byte[length + 100];
+    StripedFileTestUtil.verifyLength(fs, testPath, length);
+    StripedFileTestUtil.verifyPread(fs, testPath, length, expected, buffer);
+    StripedFileTestUtil.verifyStatefulRead(fs, testPath, length, expected, buffer);
+    StripedFileTestUtil.verifyStatefulRead(fs, testPath, length, expected,
+        ByteBuffer.allocate(length + 100));
+    StripedFileTestUtil.verifySeek(fs, testPath, length);
   }
 
-  private void testReadWithDNFailure(String file, int fileSize,
-      int startOffsetInFile) throws IOException {
-    final int failedDNIdx = 2;
-    Path testPath = new Path(file);
-    final byte[] bytes = StripedFileTestUtil.generateBytes(fileSize);
+  private void testReadWithDNFailure(int fileLength, int dnFailureNum)
+      throws IOException {
+    String fileType = fileLength < (blockSize * dataBlocks) ?
+        "smallFile" : "largeFile";
+    String src = "/dnFailure_" + dnFailureNum + "_" + fileType;
+    LOG.info("testReadWithDNFailure: file = " + src
+        + ", fileSize = " + fileLength
+        + ", dnFailureNum = " + dnFailureNum);
+
+    Path testPath = new Path(src);
+    final byte[] bytes = StripedFileTestUtil.generateBytes(fileLength);
     DFSTestUtil.writeFile(fs, testPath, bytes);
 
     // shut down the DN that holds an internal data block
     BlockLocation[] locs = fs.getFileBlockLocations(testPath, cellSize * 5,
         cellSize);
-    String name = (locs[0].getNames())[failedDNIdx];
-    for (DataNode dn : cluster.getDataNodes()) {
-      int port = dn.getXferPort();
-      if (name.contains(Integer.toString(port))) {
-        dn.shutdown();
-        break;
-      }
-    }
-
-    // pread
-    try (FSDataInputStream fsdis = fs.open(testPath)) {
-      byte[] buf = new byte[fileSize];
-      int readLen = fsdis.read(startOffsetInFile, buf, 0, buf.length);
-      Assert.assertEquals("The fileSize of file should be the same to write size",
-          fileSize - startOffsetInFile, readLen);
-
-      byte[] expected = new byte[readLen];
-      System.arraycopy(bytes, startOffsetInFile, expected, 0,
-          fileSize - startOffsetInFile);
-
-      for (int i = startOffsetInFile; i < fileSize; i++) {
-        Assert.assertEquals("Byte at " + i + " should be the same",
-            expected[i - startOffsetInFile], buf[i - startOffsetInFile]);
+    for (int failedDnIdx = 0; failedDnIdx < dnFailureNum; failedDnIdx++) {
+      String name = (locs[0].getNames())[failedDnIdx];
+      for (DataNode dn : cluster.getDataNodes()) {
+        int port = dn.getXferPort();
+        if (name.contains(Integer.toString(port))) {
+          dn.shutdown();
+        }
       }
     }
 
-    // stateful read
-    ByteBuffer result = ByteBuffer.allocate(fileSize);
-    ByteBuffer buf = ByteBuffer.allocate(1024);
-    int readLen = 0;
-    int ret;
-    try (FSDataInputStream in = fs.open(testPath)) {
-      while ((ret = in.read(buf)) >= 0) {
-        readLen += ret;
-        buf.flip();
-        result.put(buf);
-        buf.clear();
-      }
-    }
-    Assert.assertEquals("The length of file should be the same to write size",
-        fileSize, readLen);
-    Assert.assertArrayEquals(bytes, result.array());
+    // check file length, pread, stateful read and seek
+    verifyRead(testPath, fileLength, bytes);
   }
 
   /**
@@ -279,21 +228,8 @@ public class TestReadStripedFileWithDecoding {
 
     try {
       // do stateful read
-      ByteBuffer result = ByteBuffer.allocate(length);
-      ByteBuffer buf = ByteBuffer.allocate(1024);
-      int readLen = 0;
-      int ret;
-      try (FSDataInputStream in = fs.open(file)) {
-        while ((ret = in.read(buf)) >= 0) {
-          readLen += ret;
-          buf.flip();
-          result.put(buf);
-          buf.clear();
-        }
-      }
-      Assert.assertEquals("The length of file should be the same to write size",
-          length, readLen);
-      Assert.assertArrayEquals(bytes, result.array());
+      StripedFileTestUtil.verifyStatefulRead(fs, file, length, bytes,
+          ByteBuffer.allocate(1024));
 
       // check whether the corruption has been reported to the NameNode
       final FSNamesystem ns = cluster.getNamesystem();
@@ -341,4 +277,82 @@ public class TestReadStripedFileWithDecoding {
       DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
     }
   }
+
+  /**
+   * Test reading a file with some blocks (data blocks, parity blocks, or both)
+   * deleted or corrupted.
+   * @param src file path
+   * @param fileLength file length
+   * @param dataBlkDelNum the number of data blocks to delete or corrupt.
+   * @param parityBlkDelNum the number of parity blocks to delete or corrupt.
+   * @param deleteBlockFile whether to delete or corrupt the block file:
+   *                        true deletes the block file,
+   *                        false corrupts the content of the block file.
+   * @throws IOException
+   */
+  private void testReadWithBlockCorrupted(String src, int fileLength,
+      int dataBlkDelNum, int parityBlkDelNum, boolean deleteBlockFile)
+      throws IOException {
+    LOG.info("testReadWithBlockCorrupted: file = " + src
+        + ", dataBlkDelNum = " + dataBlkDelNum
+        + ", parityBlkDelNum = " + parityBlkDelNum
+        + ", deleteBlockFile? " + deleteBlockFile);
+    int recoverBlkNum = dataBlkDelNum + parityBlkDelNum;
+    Assert.assertTrue("dataBlkDelNum and parityBlkDelNum should be positive",
+        dataBlkDelNum >= 0 && parityBlkDelNum >= 0);
+    Assert.assertTrue("The sum of dataBlkDelNum and parityBlkDelNum " +
+        "should be between 1 ~ " + parityBlocks, recoverBlkNum <= parityBlocks);
+
+    // write a file with the length of writeLen
+    Path srcPath = new Path(src);
+    final byte[] bytes = StripedFileTestUtil.generateBytes(fileLength);
+    DFSTestUtil.writeFile(fs, srcPath, bytes);
+
+    // delete or corrupt some blocks
+    corruptBlocks(srcPath, dataBlkDelNum, parityBlkDelNum, deleteBlockFile);
+
+    // check the file can be read after some blocks were deleted
+    verifyRead(srcPath, fileLength, bytes);
+  }
+
+  private void corruptBlocks(Path srcPath, int dataBlkDelNum,
+      int parityBlkDelNum, boolean deleteBlockFile) throws IOException {
+    int recoverBlkNum = dataBlkDelNum + parityBlkDelNum;
+
+    LocatedBlocks locatedBlocks = getLocatedBlocks(srcPath);
+    LocatedStripedBlock lastBlock =
+        (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
+
+    int[] delDataBlkIndices = StripedFileTestUtil.randomArray(0, dataBlocks,
+        dataBlkDelNum);
+    Assert.assertNotNull(delDataBlkIndices);
+    int[] delParityBlkIndices = StripedFileTestUtil.randomArray(dataBlocks,
+        dataBlocks + parityBlocks, parityBlkDelNum);
+    Assert.assertNotNull(delParityBlkIndices);
+
+    int[] delBlkIndices = new int[recoverBlkNum];
+    System.arraycopy(delDataBlkIndices, 0,
+        delBlkIndices, 0, delDataBlkIndices.length);
+    System.arraycopy(delParityBlkIndices, 0,
+        delBlkIndices, delDataBlkIndices.length, delParityBlkIndices.length);
+
+    ExtendedBlock[] delBlocks = new ExtendedBlock[recoverBlkNum];
+    for (int i = 0; i < recoverBlkNum; i++) {
+      delBlocks[i] = StripedBlockUtil
+          .constructInternalBlock(lastBlock.getBlock(),
+              cellSize, dataBlocks, delBlkIndices[i]);
+      if (deleteBlockFile) {
+        // delete the block file
+        cluster.corruptBlockOnDataNodesByDeletingBlockFile(delBlocks[i]);
+      } else {
+        // corrupt the block file
+        cluster.corruptBlockOnDataNodes(delBlocks[i]);
+      }
+    }
+  }
+
+  private LocatedBlocks getLocatedBlocks(Path filePath) throws IOException {
+    return fs.getClient().getLocatedBlocks(filePath.toString(),
+        0, Long.MAX_VALUE);
+  }
 }
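
For reference, testReadCorruptedData and testReadCorruptedDataByDeleting sweep
dataDelNum from 1 to 3 and let parityDelNum grow while the sum stays under 4, so the
damage never exceeds what the parity blocks can recover. Assuming the RS(6,3) schema
that the HDFS-7285 branch uses by default (parityBlocks = 3), this works out to six
combinations per file length; a small, hypothetical standalone sketch that prints them:

    public class CorruptionCombos {
      public static void main(String[] args) {
        int parityBlocks = 3;  // assumption: the branch's default RS(6,3) schema
        for (int dataDelNum = 1; dataDelNum < 4; dataDelNum++) {
          for (int parityDelNum = 0; (dataDelNum + parityDelNum) < 4; parityDelNum++) {
            // every pair keeps dataDelNum + parityDelNum <= parityBlocks,
            // which the assert inside testReadWithBlockCorrupted enforces
            System.out.println("dataDelNum=" + dataDelNum
                + " parityDelNum=" + parityDelNum
                + " total=" + (dataDelNum + parityDelNum));
          }
        }
        // Prints (1,0), (1,1), (1,2), (2,0), (2,1), (3,0).
      }
    }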

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba90c028/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java
new file mode 100644
index 0000000..5448773
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.blockSize;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.cellSize;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.dataBlocks;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.numDNs;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.parityBlocks;
+
+public class TestWriteStripedFileWithFailure {
+  public static final Log LOG = LogFactory
+      .getLog(TestReadStripedFileWithMissingBlocks.class);
+  private static MiniDFSCluster cluster;
+  private static FileSystem fs;
+  private static Configuration conf = new HdfsConfiguration();
+  private final int smallFileLength = blockSize * dataBlocks - 123;
+  private final int largeFileLength = blockSize * dataBlocks + 123;
+  private final int[] fileLengths = {smallFileLength, largeFileLength};
+
+  public void setup() throws IOException {
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
+    cluster.getFileSystem().getClient().createErasureCodingZone("/",
+        null, cellSize);
+    fs = cluster.getFileSystem();
+  }
+
+  public void tearDown() throws IOException {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  // Test writing a file with some datanode failures
+  @Test(timeout = 300000)
+  public void testWriteStripedFileWithDNFailure() throws IOException {
+    for (int fileLength : fileLengths) {
+      for (int dataDelNum = 1; dataDelNum < 4; dataDelNum++) {
+        for (int parityDelNum = 0; (dataDelNum+parityDelNum) < 4; parityDelNum++) {
+          try {
+            // setup a new cluster with no dead datanode
+            setup();
+            writeFileWithDNFailure(fileLength, dataDelNum, parityDelNum);
+          } catch (IOException ioe) {
+            String fileType = fileLength < (blockSize * dataBlocks) ?
+                "smallFile" : "largeFile";
+            LOG.error("Failed to write file with DN failure:"
+                + " fileType = "+ fileType
+                + ", dataDelNum = " + dataDelNum
+                + ", parityDelNum = " + parityDelNum);
+            throw ioe;
+          } finally {
+            // tear down the cluster
+            tearDown();
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Test writing a file while shutting down some DNs (data DNs, parity DNs, or both).
+   * @param fileLength file length
+   * @param dataDNFailureNum the number of data DNs to shut down
+   * @param parityDNFailureNum the number of parity DNs to shut down
+   * @throws IOException
+   */
+  private void writeFileWithDNFailure(int fileLength,
+      int dataDNFailureNum, int parityDNFailureNum) throws IOException {
+    String fileType = fileLength < (blockSize * dataBlocks) ?
+        "smallFile" : "largeFile";
+    String src = "/dnFailure_" + dataDNFailureNum + "_" + parityDNFailureNum
+        + "_" + fileType;
+    LOG.info("writeFileWithDNFailure: file = " + src
+        + ", fileType = " + fileType
+        + ", dataDNFailureNum = " + dataDNFailureNum
+        + ", parityDNFailureNum = " + parityDNFailureNum);
+
+    Path srcPath = new Path(src);
+    final AtomicInteger pos = new AtomicInteger();
+    final FSDataOutputStream out = fs.create(srcPath);
+    final DFSStripedOutputStream stripedOut
+        = (DFSStripedOutputStream)out.getWrappedStream();
+
+    int[] dataDNFailureIndices = StripedFileTestUtil.randomArray(0, dataBlocks,
+        dataDNFailureNum);
+    Assert.assertNotNull(dataDNFailureIndices);
+    int[] parityDNFailureIndices = StripedFileTestUtil.randomArray(dataBlocks,
+        dataBlocks + parityBlocks, parityDNFailureNum);
+    Assert.assertNotNull(parityDNFailureIndices);
+
+    int[] failedDataNodes = new int[dataDNFailureNum + parityDNFailureNum];
+    System.arraycopy(dataDNFailureIndices, 0, failedDataNodes,
+        0, dataDNFailureIndices.length);
+    System.arraycopy(parityDNFailureIndices, 0, failedDataNodes,
+        dataDNFailureIndices.length, parityDNFailureIndices.length);
+
+    final int killPos = fileLength/2;
+    for (; pos.get() < fileLength; ) {
+      final int i = pos.getAndIncrement();
+      if (i == killPos) {
+        for(int failedDn : failedDataNodes) {
+          StripedFileTestUtil.killDatanode(cluster, stripedOut, failedDn, pos);
+        }
+      }
+      write(out, i);
+    }
+    out.close();
+
+    // make sure the expected number of Datanode have been killed
+    int dnFailureNum = dataDNFailureNum + parityDNFailureNum;
+    Assert.assertEquals(cluster.getDataNodes().size(), numDNs - dnFailureNum);
+
+    byte[] smallBuf = new byte[1024];
+    byte[] largeBuf = new byte[fileLength + 100];
+    final byte[] expected = StripedFileTestUtil.generateBytes(fileLength);
+    StripedFileTestUtil.verifyLength(fs, srcPath, fileLength);
+    StripedFileTestUtil.verifySeek(fs, srcPath, fileLength);
+    StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected,
+        smallBuf);
+    StripedFileTestUtil.verifyPread(fs, srcPath, fileLength, expected, largeBuf);
+
+    // delete the file
+    fs.delete(srcPath, true);
+  }
+
+  void write(FSDataOutputStream out, int i) throws IOException {
+    try {
+      out.write(StripedFileTestUtil.getByte(i));
+    } catch (IOException e) {
+      throw new IOException("Failed at i=" + i, e);
+    }
+  }
+}
\ No newline at end of file
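
The new write test follows the same budget: it only kills as many streamers as the
codec can tolerate, then asserts that exactly numDNs - dnFailureNum datanodes remain.
A minimal sketch of that arithmetic, again assuming the branch's default RS(6,3)
schema (dataBlocks = 6, parityBlocks = 3) and using a hypothetical class name:

    public class WriteFailureBudget {
      public static void main(String[] args) {
        // Assumption: RS(6,3), the default erasure coding schema on HDFS-7285.
        int dataBlocks = 6;
        int parityBlocks = 3;
        int totalStreamers = dataBlocks + parityBlocks;  // 9 internal blocks per group
        for (int killed = 1; killed <= parityBlocks + 1; killed++) {
          int survivors = totalStreamers - killed;
          boolean recoverable = survivors >= dataBlocks; // need dataBlocks survivors
          System.out.println("killed=" + killed + ", survivors=" + survivors
              + ", still recoverable=" + recoverable);
        }
        // killed = 1..3 stays recoverable; killed = 4 would exceed the parity budget,
        // which is why the loops cap dataDelNum + parityDelNum at 3.
      }
    }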