Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/05/04 19:58:01 UTC

[27/50] hadoop git commit: HDFS-8166. DFSStripedOutputStream should not create empty blocks. Contributed by Jing Zhao.

HDFS-8166. DFSStripedOutputStream should not create empty blocks. Contributed by Jing Zhao.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/567105ef
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/567105ef
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/567105ef

Branch: refs/heads/HDFS-7285
Commit: 567105ef3f58c94aa55d607e4d1d150c772bb167
Parents: 647173e
Author: Jing Zhao <ji...@apache.org>
Authored: Fri Apr 17 17:55:19 2015 -0700
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon May 4 10:13:26 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hdfs/DFSStripedOutputStream.java     | 163 +++++++++++--------
 .../apache/hadoop/hdfs/StripedDataStreamer.java |  72 +++-----
 .../server/blockmanagement/BlockManager.java    |  17 +-
 .../hadoop/hdfs/TestDFSStripedOutputStream.java | 162 +++++++++++-------
 4 files changed, 236 insertions(+), 178 deletions(-)
----------------------------------------------------------------------
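
----------------------------------------------------------------------
[Editorial note] The core of this patch: DFSStripedOutputStream used to
maintain a client-side running total (currentBlockGroupBytes) in write()
and, at close time, ended the current block on every streamer, which
committed zero-length blocks for streamers that never received data.
After the patch, closeImpl() only marks end-of-block for streamers with
bytesCurBlock > 0, and the committed block-group size is derived from the
per-streamer byte counts (see getCommittedBlock() below). A minimal
standalone sketch of that size calculation; the long[] stands in for the
data streamers and is not the HDFS API:

    // Sketch only: the committed block-group length is the sum of what
    // each data streamer wrote to its current block.
    static long committedBlockGroupBytes(long[] bytesCurBlockPerStreamer) {
      long sum = 0;
      for (long bytes : bytesCurBlockPerStreamer) {
        sum += bytes;  // streamers that wrote nothing contribute 0
      }
      return sum;      // e.g. {100, 0, 0, 0, 0, 0} -> 100 for a 100-byte file
    }
----------------------------------------------------------------------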


http://git-wip-us.apache.org/repos/asf/hadoop/blob/567105ef/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index f11a657..7dc0091 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -22,10 +22,14 @@ import java.io.InterruptedIOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -59,12 +63,12 @@ public class DFSStripedOutputStream extends DFSOutputStream {
    */
   private int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
   private ByteBuffer[] cellBuffers;
-  private final short blockGroupBlocks = HdfsConstants.NUM_DATA_BLOCKS
+  private final short numAllBlocks = HdfsConstants.NUM_DATA_BLOCKS
       + HdfsConstants.NUM_PARITY_BLOCKS;
-  private final short blockGroupDataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
+  private final short numDataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
   private int curIdx = 0;
   /* bytes written in current block group */
-  private long currentBlockGroupBytes = 0;
+  //private long currentBlockGroupBytes = 0;
 
   //TODO: Use ErasureCoder interface (HDFS-7781)
   private RawErasureEncoder encoder;
@@ -73,10 +77,6 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     return streamers.get(0);
   }
 
-  private long getBlockGroupSize() {
-    return blockSize * HdfsConstants.NUM_DATA_BLOCKS;
-  }
-
   /** Construct a new output stream for creating a file. */
   DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
                          EnumSet<CreateFlag> flag, Progressable progress,
@@ -84,15 +84,13 @@ public class DFSStripedOutputStream extends DFSOutputStream {
                          throws IOException {
     super(dfsClient, src, stat, flag, progress, checksum, favoredNodes);
     DFSClient.LOG.info("Creating striped output stream");
-    if (blockGroupBlocks <= 1) {
-      throw new IOException("The block group must contain more than one block.");
-    }
+    checkConfiguration();
 
-    cellBuffers = new ByteBuffer[blockGroupBlocks];
+    cellBuffers = new ByteBuffer[numAllBlocks];
     List<BlockingQueue<LocatedBlock>> stripeBlocks = new ArrayList<>();
 
-    for (int i = 0; i < blockGroupBlocks; i++) {
-      stripeBlocks.add(new LinkedBlockingQueue<LocatedBlock>(blockGroupBlocks));
+    for (int i = 0; i < numAllBlocks; i++) {
+      stripeBlocks.add(new LinkedBlockingQueue<LocatedBlock>(numAllBlocks));
       try {
         cellBuffers[i] = ByteBuffer.wrap(byteArrayManager.newByteArray(cellSize));
       } catch (InterruptedException ie) {
@@ -103,29 +101,38 @@ public class DFSStripedOutputStream extends DFSOutputStream {
       }
     }
     encoder = new RSRawEncoder();
-    encoder.initialize(blockGroupDataBlocks,
-        blockGroupBlocks - blockGroupDataBlocks, cellSize);
+    encoder.initialize(numDataBlocks,
+        numAllBlocks - numDataBlocks, cellSize);
 
-    streamers = new ArrayList<>(blockGroupBlocks);
-    for (short i = 0; i < blockGroupBlocks; i++) {
+    List<StripedDataStreamer> s = new ArrayList<>(numAllBlocks);
+    for (short i = 0; i < numAllBlocks; i++) {
       StripedDataStreamer streamer = new StripedDataStreamer(stat, null,
           dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
           i, stripeBlocks);
       if (favoredNodes != null && favoredNodes.length != 0) {
         streamer.setFavoredNodes(favoredNodes);
       }
-      streamers.add(streamer);
+      s.add(streamer);
     }
+    streamers = Collections.unmodifiableList(s);
 
     refreshStreamer();
   }
 
+  private void checkConfiguration() {
+    if (cellSize % bytesPerChecksum != 0) {
+      throw new HadoopIllegalArgumentException("Invalid values: "
+          + DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (=" + bytesPerChecksum
+          + ") must divide cell size (=" + cellSize + ").");
+    }
+  }
+
   private void refreshStreamer() {
     streamer = streamers.get(curIdx);
   }
 
   private void moveToNextStreamer() {
-    curIdx = (curIdx + 1) % blockGroupBlocks;
+    curIdx = (curIdx + 1) % numAllBlocks;
     refreshStreamer();
   }
 
@@ -136,20 +143,21 @@ public class DFSStripedOutputStream extends DFSOutputStream {
    * @param buffers data buffers + parity buffers
    */
   private void encode(ByteBuffer[] buffers) {
-    ByteBuffer[] dataBuffers = new ByteBuffer[blockGroupDataBlocks];
-    ByteBuffer[] parityBuffers = new ByteBuffer[blockGroupBlocks - blockGroupDataBlocks];
-    for (int i = 0; i < blockGroupBlocks; i++) {
-      if (i < blockGroupDataBlocks) {
+    ByteBuffer[] dataBuffers = new ByteBuffer[numDataBlocks];
+    ByteBuffer[] parityBuffers = new ByteBuffer[numAllBlocks - numDataBlocks];
+    for (int i = 0; i < numAllBlocks; i++) {
+      if (i < numDataBlocks) {
         dataBuffers[i] = buffers[i];
       } else {
-        parityBuffers[i - blockGroupDataBlocks] = buffers[i];
+        parityBuffers[i - numDataBlocks] = buffers[i];
       }
     }
     encoder.encode(dataBuffers, parityBuffers);
   }
 
   /**
-   * Generate packets from a given buffer
+   * Generate packets from a given buffer. This is only used for streamers
+   * writing parity blocks.
    *
    * @param byteBuffer the given buffer to generate packets
    * @return packets generated
@@ -185,7 +193,6 @@ public class DFSStripedOutputStream extends DFSOutputStream {
       throw new IOException(msg);
     }
 
-
     // If current packet has not been enqueued for transmission,
     // but the cell buffer is full, we need to enqueue the packet
     if (currentPacket != null && getSizeOfCellnBuffer(curIdx) == cellSize) {
@@ -213,13 +220,13 @@ public class DFSStripedOutputStream extends DFSOutputStream {
       //When all data cells in a stripe are ready, we need to encode
       //them and generate some parity cells. These cells will be
       //converted to packets and put to their DataStreamer's queue.
-      if (curIdx == blockGroupDataBlocks) {
+      if (curIdx == numDataBlocks) {
         //encode the data cells
-        for (int k = 0; k < blockGroupDataBlocks; k++) {
+        for (int k = 0; k < numDataBlocks; k++) {
           cellBuffers[k].flip();
         }
         encode(cellBuffers);
-        for (int i = blockGroupDataBlocks; i < blockGroupBlocks; i++) {
+        for (int i = numDataBlocks; i < numAllBlocks; i++) {
           ByteBuffer parityBuffer = cellBuffers[i];
           List<DFSPacket> packets = generatePackets(parityBuffer);
           for (DFSPacket p : packets) {
@@ -245,13 +252,24 @@ public class DFSStripedOutputStream extends DFSOutputStream {
   }
 
   private void clearCellBuffers() {
-    for (int i = 0; i< blockGroupBlocks; i++) {
+    for (int i = 0; i< numAllBlocks; i++) {
       cellBuffers[i].clear();
+      if (i >= numDataBlocks) {
+        Arrays.fill(cellBuffers[i].array(), (byte) 0);
+      }
     }
   }
 
   private int stripeDataSize() {
-    return blockGroupDataBlocks * cellSize;
+    return numDataBlocks * cellSize;
+  }
+
+  private long getCurrentBlockGroupBytes() {
+    long sum = 0;
+    for (int i = 0; i < numDataBlocks; i++) {
+      sum += streamers.get(i).getBytesCurBlock();
+    }
+    return sum;
   }
 
   private void notSupported(String headMsg)
@@ -270,7 +288,6 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     notSupported("hsync");
   }
 
-
   @Override
   protected synchronized void start() {
     for (StripedDataStreamer streamer : streamers) {
@@ -302,15 +319,11 @@ public class DFSStripedOutputStream extends DFSOutputStream {
   // interrupt datastreamer if force is true
   @Override
   protected void closeThreads(boolean force) throws IOException {
-    StripedDataStreamer leadingStreamer = null;
     for (StripedDataStreamer streamer : streamers) {
       try {
         streamer.close(force);
         streamer.join();
         streamer.closeSocket();
-        if (streamer.isLeadingStreamer()) {
-          leadingStreamer = streamer;
-        }
       } catch (InterruptedException e) {
         throw new IOException("Failed to shutdown streamer");
       } finally {
@@ -318,40 +331,26 @@ public class DFSStripedOutputStream extends DFSOutputStream {
         setClosed();
       }
     }
-    assert leadingStreamer != null : "One streamer should be leader";
-    leadingStreamer.countTailingBlockGroupBytes();
-  }
-
-  @Override
-  public synchronized void write(int b) throws IOException {
-    super.write(b);
-    currentBlockGroupBytes = (currentBlockGroupBytes + 1) % getBlockGroupSize();
-  }
-
-  @Override
-  public synchronized void write(byte b[], int off, int len)
-      throws IOException {
-    super.write(b, off, len);
-    currentBlockGroupBytes = (currentBlockGroupBytes + len) % getBlockGroupSize();
   }
 
-  private void writeParityCellsForLastStripe() throws IOException{
+  private void writeParityCellsForLastStripe() throws IOException {
+    final long currentBlockGroupBytes = getCurrentBlockGroupBytes();
     long parityBlkSize = StripedBlockUtil.getInternalBlockLength(
-        currentBlockGroupBytes, cellSize, blockGroupDataBlocks,
-        blockGroupDataBlocks + 1);
+        currentBlockGroupBytes, cellSize, numDataBlocks,
+        numDataBlocks + 1);
     if (parityBlkSize == 0 || currentBlockGroupBytes % stripeDataSize() == 0) {
       return;
     }
     int parityCellSize = parityBlkSize % cellSize == 0 ? cellSize :
                         (int) (parityBlkSize % cellSize);
 
-    for (int i = 0; i < blockGroupBlocks; i++) {
+    for (int i = 0; i < numAllBlocks; i++) {
       long internalBlkLen = StripedBlockUtil.getInternalBlockLength(
-          currentBlockGroupBytes, cellSize, blockGroupDataBlocks, i);
+          currentBlockGroupBytes, cellSize, numDataBlocks, i);
       // Pad zero bytes to make all cells exactly the size of parityCellSize
       // If internal block is smaller than parity block, pad zero bytes.
       // Also pad zero bytes to all parity cells
-      if (internalBlkLen < parityBlkSize || i >= blockGroupDataBlocks) {
+      if (internalBlkLen < parityBlkSize || i >= numDataBlocks) {
         int position = cellBuffers[i].position();
         assert position <= parityCellSize : "If an internal block is smaller" +
             " than parity block, then its last cell should be small than last" +
@@ -365,9 +364,9 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     encode(cellBuffers);
 
     //write parity cells
-    curIdx = blockGroupDataBlocks;
+    curIdx = numDataBlocks;
     refreshStreamer();
-    for (int i = blockGroupDataBlocks; i < blockGroupBlocks; i++) {
+    for (int i = numDataBlocks; i < numAllBlocks; i++) {
       ByteBuffer parityBuffer = cellBuffers[i];
       List<DFSPacket> packets = generatePackets(parityBuffer);
       for (DFSPacket p : packets) {
@@ -385,7 +384,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
   @Override
   void setClosed() {
     super.setClosed();
-    for (int i = 0; i < blockGroupBlocks; i++) {
+    for (int i = 0; i < numAllBlocks; i++) {
       byteArrayManager.release(cellBuffers[i].array());
       streamers.get(i).release();
     }
@@ -395,10 +394,11 @@ public class DFSStripedOutputStream extends DFSOutputStream {
   protected synchronized void closeImpl() throws IOException {
     if (isClosed()) {
       IOException e = getLeadingStreamer().getLastException().getAndSet(null);
-      if (e == null)
-        return;
-      else
+      if (e != null) {
         throw e;
+      } else {
+        return;
+      }
     }
 
     try {
@@ -408,14 +408,13 @@ public class DFSStripedOutputStream extends DFSOutputStream {
         streamer.waitAndQueuePacket(currentPacket);
         currentPacket = null;
       }
-      //if the last stripe is incomplete, generate and write parity cells
+      // if the last stripe is incomplete, generate and write parity cells
       writeParityCellsForLastStripe();
 
-      for (int i = 0; i < blockGroupBlocks; i++) {
+      for (int i = 0; i < numAllBlocks; i++) {
         curIdx = i;
         refreshStreamer();
-        if (streamer.getBytesCurBlock()!= 0 ||
-            currentBlockGroupBytes < getBlockGroupSize()) {
+        if (streamer.getBytesCurBlock() > 0) {
           // send an empty packet to mark the end of the block
           currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
               streamer.getAndIncCurrentSeqno(), true);
@@ -425,9 +424,8 @@ public class DFSStripedOutputStream extends DFSOutputStream {
         flushInternal();
       }
 
-      // get last block before destroying the streamer
-      ExtendedBlock lastBlock = streamers.get(0).getBlock();
       closeThreads(false);
+      final ExtendedBlock lastBlock = getCommittedBlock();
       TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER);
       try {
         completeFile(lastBlock);
@@ -435,10 +433,35 @@ public class DFSStripedOutputStream extends DFSOutputStream {
         scope.close();
       }
       dfsClient.endFileLease(fileId);
-    } catch (ClosedChannelException e) {
+    } catch (ClosedChannelException ignored) {
     } finally {
       setClosed();
     }
   }
 
+  /**
+   * Generate the block that is reported to and committed by the NameNode.
+   * We go through all the streamers writing data blocks and add up their
+   * bytesCurBlock. Note that at this point all streamers have been closed.
+   * The calculation also covers streamers that hit write failures.
+   *
+   * @return an ExtendedBlock whose numBytes is the size of the whole block group.
+   */
+  ExtendedBlock getCommittedBlock() throws IOException {
+    ExtendedBlock b = getLeadingStreamer().getBlock();
+    if (b == null) {
+      return null;
+    }
+    final ExtendedBlock block = new ExtendedBlock(b);
+    final boolean atBlockGroupBoundary =
+        getLeadingStreamer().getBytesCurBlock() == 0 &&
+            getLeadingStreamer().getBlock() != null &&
+            getLeadingStreamer().getBlock().getNumBytes() > 0;
+    for (int i = 1; i < numDataBlocks; i++) {
+      block.setNumBytes(block.getNumBytes() +
+          (atBlockGroupBoundary ? streamers.get(i).getBlock().getNumBytes() :
+              streamers.get(i).getBytesCurBlock()));
+    }
+    return block;
+  }
 }
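
----------------------------------------------------------------------
[Editorial note] writeParityCellsForLastStripe() above pads short data
cells with zeros so that every cell handed to the encoder holds exactly
parityCellSize bytes (and the updated clearCellBuffers() re-zeros the
parity arrays between stripes for the same reason). A self-contained
sketch of the padding step; the method name is illustrative, not the
HDFS API:

    import java.nio.ByteBuffer;

    class PadSketch {
      // Zero-fill the unused tail of a data cell, then flip it so the
      // erasure encoder reads exactly parityCellSize bytes.
      static void padToParityCellSize(ByteBuffer cell, int parityCellSize) {
        for (int i = cell.position(); i < parityCellSize; i++) {
          cell.put((byte) 0);
        }
        cell.flip();
      }
    }
----------------------------------------------------------------------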

http://git-wip-us.apache.org/repos/asf/hadoop/blob/567105ef/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
index 5614852..19c205e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
@@ -22,7 +22,6 @@ import java.util.List;
 
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
@@ -37,6 +36,10 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_PARITY_BLOCKS;
+
 /****************************************************************************
  * The StripedDataStreamer class is used by {@link DFSStripedOutputStream}.
  * There are two kinds of StripedDataStreamer, leading streamer and ordinary
@@ -47,9 +50,7 @@ import java.util.concurrent.atomic.AtomicReference;
  ****************************************************************************/
 public class StripedDataStreamer extends DataStreamer {
   private final short index;
-  private final  List<BlockingQueue<LocatedBlock>> stripedBlocks;
-  private static short blockGroupSize = HdfsConstants.NUM_DATA_BLOCKS
-      + HdfsConstants.NUM_PARITY_BLOCKS;
+  private final List<BlockingQueue<LocatedBlock>> stripedBlocks;
   private boolean hasCommittedBlock = false;
 
   StripedDataStreamer(HdfsFileStatus stat, ExtendedBlock block,
@@ -88,66 +89,38 @@ public class StripedDataStreamer extends DataStreamer {
   }
 
   private boolean isParityStreamer() {
-    return index >= HdfsConstants.NUM_DATA_BLOCKS;
+    return index >= NUM_DATA_BLOCKS;
   }
 
   @Override
   protected void endBlock() {
     if (!isLeadingStreamer() && !isParityStreamer()) {
-      //before retrieving a new block, transfer the finished block to
-      //leading streamer
+      // before retrieving a new block, transfer the finished block to
+      // leading streamer
       LocatedBlock finishedBlock = new LocatedBlock(
           new ExtendedBlock(block.getBlockPoolId(), block.getBlockId(),
-                       block.getNumBytes(),block.getGenerationStamp()), null);
-      try{
+              block.getNumBytes(), block.getGenerationStamp()), null);
+      try {
         boolean offSuccess = stripedBlocks.get(0).offer(finishedBlock, 30,
             TimeUnit.SECONDS);
-      }catch (InterruptedException ie) {
-      //TODO: Handle InterruptedException (HDFS-7786)
+      } catch (InterruptedException ie) {
+        // TODO: Handle InterruptedException (HDFS-7786)
       }
     }
     super.endBlock();
   }
 
-  /**
-   * This function is called after the streamer is closed.
-   */
-  void countTailingBlockGroupBytes () throws IOException {
-    if (isLeadingStreamer()) {
-      //when committing a block group, leading streamer has to adjust
-      // {@link block} including the size of block group
-      for (int i = 1; i < HdfsConstants.NUM_DATA_BLOCKS; i++) {
-        try {
-          LocatedBlock finishedLocatedBlock = stripedBlocks.get(0).poll(30,
-              TimeUnit.SECONDS);
-          if (finishedLocatedBlock == null) {
-            throw new IOException("Fail to get finished LocatedBlock " +
-                "from streamer, i=" + i);
-          }
-          ExtendedBlock finishedBlock = finishedLocatedBlock.getBlock();
-          long bytes = finishedBlock == null ? 0 : finishedBlock.getNumBytes();
-          if (block != null) {
-            block.setNumBytes(block.getNumBytes() + bytes);
-          }
-        } catch (InterruptedException ie) {
-          DFSClient.LOG.info("InterruptedException received when " +
-              "putting a block to stripeBlocks, ie = " + ie);
-        }
-      }
-    }
-  }
-
   @Override
   protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
       throws IOException {
     LocatedBlock lb = null;
     if (isLeadingStreamer()) {
-      if(hasCommittedBlock) {
+      if (hasCommittedBlock) {
         /**
          * when committing a block group, leading streamer has to adjust
          * {@link block} to include the size of block group
          */
-        for (int i = 1; i < HdfsConstants.NUM_DATA_BLOCKS; i++) {
+        for (int i = 1; i < NUM_DATA_BLOCKS; i++) {
           try {
             LocatedBlock finishedLocatedBlock = stripedBlocks.get(0).poll(30,
                 TimeUnit.SECONDS);
@@ -157,7 +130,7 @@ public class StripedDataStreamer extends DataStreamer {
             }
             ExtendedBlock finishedBlock = finishedLocatedBlock.getBlock();
             long bytes = finishedBlock == null ? 0 : finishedBlock.getNumBytes();
-            if(block != null) {
+            if (block != null) {
               block.setNumBytes(block.getNumBytes() + bytes);
             }
           } catch (InterruptedException ie) {
@@ -171,14 +144,13 @@ public class StripedDataStreamer extends DataStreamer {
       hasCommittedBlock = true;
       assert lb instanceof LocatedStripedBlock;
       DFSClient.LOG.debug("Leading streamer obtained bg " + lb);
-      LocatedBlock[] blocks = StripedBlockUtil.
-          parseStripedBlockGroup((LocatedStripedBlock) lb,
-              HdfsConstants.BLOCK_STRIPED_CELL_SIZE, HdfsConstants.NUM_DATA_BLOCKS,
-              HdfsConstants.NUM_PARITY_BLOCKS
-          );
-      assert blocks.length == blockGroupSize :
+      LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
+          (LocatedStripedBlock) lb, BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS,
+          NUM_PARITY_BLOCKS);
+      assert blocks.length == (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS) :
           "Fail to get block group from namenode: blockGroupSize: " +
-              blockGroupSize + ", blocks.length: " + blocks.length;
+              (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS) + ", blocks.length: " +
+              blocks.length;
       lb = blocks[0];
       for (int i = 1; i < blocks.length; i++) {
         try {
@@ -199,7 +171,7 @@ public class StripedDataStreamer extends DataStreamer {
       }
     } else {
       try {
-        //wait 90 seconds to get a block from the queue
+        // wait 90 seconds to get a block from the queue
         lb = stripedBlocks.get(index).poll(90, TimeUnit.SECONDS);
       } catch (InterruptedException ie) {
         DFSClient.LOG.info("InterruptedException received when retrieving " +

http://git-wip-us.apache.org/repos/asf/hadoop/blob/567105ef/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 01422db..a0f945b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
 import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -597,8 +598,20 @@ public class BlockManager {
   }
 
   public short getMinStorageNum(BlockInfo block) {
-    return block.isStriped() ?
-        ((BlockInfoStriped) block).getDataBlockNum() : minReplication;
+    if (block.isStriped()) {
+      final BlockInfoStriped sblock = (BlockInfoStriped) block;
+      short dataBlockNum = sblock.getDataBlockNum();
+      if (sblock.isComplete() ||
+          sblock.getBlockUCState() == BlockUCState.COMMITTED) {
+        // if the sblock is committed/completed and its length is less than a
+        // full stripe, the minimum storage number needs to be adjusted
+        dataBlockNum = (short) Math.min(dataBlockNum,
+            (sblock.getNumBytes() - 1) / HdfsConstants.BLOCK_STRIPED_CELL_SIZE + 1);
+      }
+      return dataBlockNum;
+    } else {
+      return minReplication;
+    }
   }
 
   public boolean hasMinStorage(BlockInfo block) {
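
----------------------------------------------------------------------
[Editorial note] The getMinStorageNum() change is what lets the NameNode
accept a committed striped block group shorter than a full stripe: a
group of numBytes bytes can occupy at most ceil(numBytes / cellSize) data
blocks, so insisting on getDataBlockNum() storages would stall
completeFile() for small files. A sketch of the adjusted rule with a
worked example (the 6-data-block schema and 64 KB cell size are
illustrative assumptions):

    // Sketch: minimum storages required for a committed/complete
    // striped block group.
    static short minStorageNum(long numBytes, short dataBlockNum, int cellSize) {
      // ceil(numBytes / cellSize), capped at the number of data blocks
      return (short) Math.min(dataBlockNum, (numBytes - 1) / cellSize + 1);
    }
    // e.g. minStorageNum(100, (short) 6, 64 * 1024)
    //      = min(6, 99 / 65536 + 1) = 1: one reported storage is enough
----------------------------------------------------------------------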

http://git-wip-us.apache.org/repos/asf/hadoop/blob/567105ef/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
index 4a09bda..cc20f40 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
@@ -1,5 +1,6 @@
 package org.apache.hadoop.hdfs;
 
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -20,6 +21,8 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
 import org.junit.After;
@@ -42,8 +45,8 @@ public class TestDFSStripedOutputStream {
   private DistributedFileSystem fs;
   private final int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
   private final int stripesPerBlock = 4;
-  int blockSize = cellSize * stripesPerBlock;
-  private int mod = 29;
+  private final int blockSize = cellSize * stripesPerBlock;
+  private final RawErasureEncoder encoder = new RSRawEncoder();
 
   @Before
   public void setup() throws IOException {
@@ -53,6 +56,7 @@ public class TestDFSStripedOutputStream {
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
     cluster.getFileSystem().getClient().createErasureCodingZone("/", null);
     fs = cluster.getFileSystem();
+    encoder.initialize(dataBlocks, parityBlocks, cellSize);
   }
 
   @After
@@ -144,60 +148,27 @@ public class TestDFSStripedOutputStream {
   }
 
   private byte getByte(long pos) {
+    int mod = 29;
     return (byte) (pos % mod + 1);
   }
 
-  private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes)
-      throws IOException {
-    Path TestPath = new Path(src);
-    byte[] bytes = generateBytes(writeBytes);
-    DFSTestUtil.writeFile(fs, TestPath, new String(bytes));
-
-    //check file length
-    FileStatus status = fs.getFileStatus(TestPath);
-    long fileLength = status.getLen();
-    if (fileLength != writeBytes) {
-      Assert.fail("File Length error: expect=" + writeBytes
-          + ", actual=" + fileLength);
-    }
-
-    DFSStripedInputStream dis = new DFSStripedInputStream(
-        fs.getClient(), src, true);
-    byte[] buf = new byte[writeBytes + 100];
-    int readLen = dis.read(0, buf, 0, buf.length);
-    readLen = readLen >= 0 ? readLen : 0;
-    if (readLen != writeBytes) {
-      Assert.fail("The length of file is not correct.");
-    }
-
-    for (int i = 0; i < writeBytes; i++) {
-      if (getByte(i) != buf[i]) {
-        Assert.fail("Byte at i = " + i + " is wrongly written.");
-      }
-    }
-  }
-
   private void testOneFile(String src, int writeBytes)
       throws IOException {
-    Path TestPath = new Path(src);
+    Path testPath = new Path(src);
 
-    int allBlocks = dataBlocks + parityBlocks;
     byte[] bytes = generateBytes(writeBytes);
-    DFSTestUtil.writeFile(fs, TestPath, new String(bytes));
+    DFSTestUtil.writeFile(fs, testPath, new String(bytes));
 
-    //check file length
-    FileStatus status = fs.getFileStatus(TestPath);
+    // check file length
+    FileStatus status = fs.getFileStatus(testPath);
     long fileLength = status.getLen();
-    if (fileLength != writeBytes) {
-      Assert.fail("File Length error: expect=" + writeBytes
-          + ", actual=" + fileLength);
-    }
+    Assert.assertEquals(writeBytes, fileLength);
 
     List<List<LocatedBlock>> blockGroupList = new ArrayList<>();
     LocatedBlocks lbs = fs.getClient().getLocatedBlocks(src, 0L);
 
     for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) {
-      assert firstBlock instanceof LocatedStripedBlock;
+      Assert.assertTrue(firstBlock instanceof LocatedStripedBlock);
       LocatedBlock[] blocks = StripedBlockUtil.
           parseStripedBlockGroup((LocatedStripedBlock) firstBlock,
               cellSize, dataBlocks, parityBlocks);
@@ -205,15 +176,14 @@ public class TestDFSStripedOutputStream {
       blockGroupList.add(oneGroup);
     }
 
-    //test each block group
+    // test each block group
     for (int group = 0; group < blockGroupList.size(); group++) {
       //get the data of this block
       List<LocatedBlock> blockList = blockGroupList.get(group);
       byte[][] dataBlockBytes = new byte[dataBlocks][];
-      byte[][] parityBlockBytes = new byte[allBlocks - dataBlocks][];
+      byte[][] parityBlockBytes = new byte[parityBlocks][];
 
-
-      //for each block, use BlockReader to read data
+      // for each block, use BlockReader to read data
       for (int i = 0; i < blockList.size(); i++) {
         LocatedBlock lblock = blockList.get(i);
         if (lblock == null) {
@@ -269,19 +239,20 @@ public class TestDFSStripedOutputStream {
               }
             }).build();
 
-        blockReader.readAll(blockBytes, 0, (int)block.getNumBytes());
+        blockReader.readAll(blockBytes, 0, (int) block.getNumBytes());
         blockReader.close();
       }
 
-      //check if we write the data correctly
-      for (int blkIdxInGroup = 0; blkIdxInGroup < dataBlockBytes.length; blkIdxInGroup++) {
-        byte[] actualBlkBytes = dataBlockBytes[blkIdxInGroup];
+      // check if we write the data correctly
+      for (int blkIdxInGroup = 0; blkIdxInGroup < dataBlockBytes.length;
+           blkIdxInGroup++) {
+        final byte[] actualBlkBytes = dataBlockBytes[blkIdxInGroup];
         if (actualBlkBytes == null) {
           continue;
         }
         for (int posInBlk = 0; posInBlk < actualBlkBytes.length; posInBlk++) {
           byte expected;
-          //calculate the postion of this byte in the file
+          // calculate the position of this byte in the file
           long posInFile = StripedBlockUtil.offsetInBlkToOffsetInBG(cellSize,
               dataBlocks, posInBlk, blkIdxInGroup) +
               group * blockSize * dataBlocks;
@@ -291,15 +262,94 @@ public class TestDFSStripedOutputStream {
             expected = getByte(posInFile);
           }
 
-          if (expected != actualBlkBytes[posInBlk]) {
-            Assert.fail("Unexpected byte " + actualBlkBytes[posInBlk] + ", expect " + expected
-                + ". Block group index is " + group +
-                ", stripe index is " + posInBlk / cellSize +
-                ", cell index is " + blkIdxInGroup + ", byte index is " + posInBlk % cellSize);
+          String s = "Unexpected byte " + actualBlkBytes[posInBlk]
+              + ", expect " + expected
+              + ". Block group index is " + group
+              + ", stripe index is " + posInBlk / cellSize
+              + ", cell index is " + blkIdxInGroup
+              + ", byte index is " + posInBlk % cellSize;
+          Assert.assertEquals(s, expected, actualBlkBytes[posInBlk]);
+        }
+      }
+
+      // verify the parity blocks
+      final ByteBuffer[] parityBuffers = new ByteBuffer[parityBlocks];
+      final long groupSize = lbs.getLocatedBlocks().get(group).getBlockSize();
+      int parityBlkSize = (int) StripedBlockUtil.getInternalBlockLength(groupSize,
+          cellSize, dataBlocks, dataBlocks);
+      for (int i = 0; i < parityBlocks; i++) {
+        parityBuffers[i] = ByteBuffer.allocate(parityBlkSize);
+      }
+      final int numStripes = (int) (groupSize - 1) / stripeDataSize() + 1;
+      for (int i = 0; i < numStripes; i++) {
+        final int parityCellSize = i < numStripes - 1 || parityBlkSize % cellSize == 0
+            ? cellSize : parityBlkSize % cellSize;
+        ByteBuffer[] stripeBuf = new ByteBuffer[dataBlocks];
+        for (int k = 0; k < stripeBuf.length; k++) {
+          stripeBuf[k] = ByteBuffer.allocate(cellSize);
+        }
+        for (int j = 0; j < dataBlocks; j++) {
+          if (dataBlockBytes[j] != null) {
+            int length = Math.min(cellSize,
+                dataBlockBytes[j].length - cellSize * i);
+            if (length > 0) {
+              stripeBuf[j].put(dataBlockBytes[j], cellSize * i, length);
+            }
+          }
+          final long pos = stripeBuf[j].position();
+          for (int k = 0; k < parityCellSize - pos; k++) {
+            stripeBuf[j].put((byte) 0);
           }
+          stripeBuf[j].flip();
         }
+        ByteBuffer[] parityBuf = new ByteBuffer[parityBlocks];
+        for (int j = 0; j < parityBlocks; j++) {
+          parityBuf[j] = ByteBuffer.allocate(cellSize);
+          for (int k = 0; k < parityCellSize; k++) {
+            parityBuf[j].put((byte) 0);
+          }
+          parityBuf[j].flip();
+        }
+
+        encoder.encode(stripeBuf, parityBuf);
+        for (int j = 0; j < parityBlocks; j++) {
+          parityBuffers[j].put(parityBuf[j]);
+        }
+      }
+
+      for (int i = 0; i < parityBlocks; i++) {
+        Assert.assertArrayEquals(parityBuffers[i].array(), parityBlockBytes[i]);
       }
     }
   }
 
+  private void testReadWriteOneFile(String src, int writeBytes)
+      throws IOException {
+    Path testPath = new Path(src);
+    byte[] bytes = generateBytes(writeBytes);
+    DFSTestUtil.writeFile(fs, testPath, new String(bytes));
+
+    // check file length
+    FileStatus status = fs.getFileStatus(testPath);
+    long fileLength = status.getLen();
+    Assert.assertEquals(writeBytes, fileLength);
+
+    DFSStripedInputStream dis = new DFSStripedInputStream(
+        fs.getClient(), src, true);
+    byte[] buf = new byte[writeBytes + 100];
+    int readLen = dis.read(0, buf, 0, buf.length);
+    readLen = readLen >= 0 ? readLen : 0;
+    Assert.assertEquals("The length of the file is not correct.", writeBytes,
+        readLen);
+
+    for (int i = 0; i < writeBytes; i++) {
+      if (getByte(i) != buf[i]) {
+        Assert.fail("Byte at i = " + i + " is wrongly written.");
+      }
+    }
+  }
 }
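
----------------------------------------------------------------------
[Editorial note] The new parity check in testOneFile() re-encodes the
data cells it read back (zero-padded the same way the writer pads them)
and compares the result with the parity blocks on disk. Two small pieces
of arithmetic from that hunk, extracted as a sketch with illustrative
method names:

    class StripeMathSketch {
      // ceiling division: how many (possibly partial) stripes a group has
      static int numStripes(long groupSize, int stripeDataSize) {
        return (int) ((groupSize - 1) / stripeDataSize) + 1;
      }
      // every parity cell is a full cell except possibly the last one
      static int lastParityCellSize(int parityBlkSize, int cellSize) {
        int rem = parityBlkSize % cellSize;
        return rem == 0 ? cellSize : rem;
      }
    }

For example, assuming a 64 KB cell and 6 data blocks (384 KB of data per
stripe), a 100-byte file yields numStripes(100, 6 * 64 * 1024) == 1 and a
single 100-byte parity cell per parity block.
----------------------------------------------------------------------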