Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/05/04 19:57:51 UTC

[17/50] hadoop git commit: HDFS-7889 Subclass DFSOutputStream to support writing striping layout files. Contributed by Li Bo

HDFS-7889 Subclass DFSOutputStream to support writing striping layout files. Contributed by Li Bo


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/16d6f9ac
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/16d6f9ac
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/16d6f9ac

Branch: refs/heads/HDFS-7285
Commit: 16d6f9ac9dfc3c6292421ca45f3e9bc796b57299
Parents: 41128e9
Author: Kai Zheng <ka...@intel.com>
Authored: Sat Apr 11 01:03:37 2015 +0800
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon May 4 10:13:23 2015 -0700

----------------------------------------------------------------------
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt        |   4 +-
 .../org/apache/hadoop/hdfs/DFSOutputStream.java |  13 +-
 .../java/org/apache/hadoop/hdfs/DFSPacket.java  |  26 +-
 .../hadoop/hdfs/DFSStripedOutputStream.java     | 439 +++++++++++++++++++
 .../org/apache/hadoop/hdfs/DataStreamer.java    |  11 +-
 .../apache/hadoop/hdfs/StripedDataStreamer.java | 241 ++++++++++
 .../hadoop/hdfs/TestDFSStripedOutputStream.java | 311 +++++++++++++
 7 files changed, 1031 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
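
For readers skimming the patch, the layout that the new classes implement can be
summarized briefly: file data is split into fixed-size cells that are placed
round-robin across the data blocks of a block group, and every full stripe of
data cells is encoded into parity cells for the parity blocks. The small,
self-contained sketch below shows the resulting offset arithmetic; the 6+3
schema and the 64 KB cell size are illustrative assumptions only, the real
values come from HdfsConstants in this branch.

// Illustrative only: maps a file offset to the data block (streamer) index and
// to the offset inside that block under a round-robin striped layout.
public class StripeMath {
  static final int DATA_BLOCKS = 6;        // assumption, see HdfsConstants.NUM_DATA_BLOCKS
  static final int CELL_SIZE = 64 * 1024;  // assumption, see HdfsConstants.BLOCK_STRIPED_CELL_SIZE

  /** Index of the data block (and streamer) holding the byte at fileOffset. */
  static int blockIndex(long fileOffset) {
    return (int) ((fileOffset / CELL_SIZE) % DATA_BLOCKS);
  }

  /** Offset of that byte inside its data block. */
  static long offsetInBlock(long fileOffset) {
    long stripeIndex = fileOffset / ((long) CELL_SIZE * DATA_BLOCKS);
    return stripeIndex * CELL_SIZE + fileOffset % CELL_SIZE;
  }

  public static void main(String[] args) {
    long pos = 3L * CELL_SIZE + 123;  // 124th byte of the fourth cell in the first stripe
    // Prints "block=3, offsetInBlock=123"
    System.out.println("block=" + blockIndex(pos) + ", offsetInBlock=" + offsetInBlock(pos));
  }
}

The same mapping appears, in the reverse direction, in the verification loop of
TestDFSStripedOutputStream at the end of this patch.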


http://git-wip-us.apache.org/repos/asf/hadoop/blob/16d6f9ac/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 1e695c4..753795a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -56,4 +56,6 @@
     
     HDFS-8074. Define a system-wide default EC schema. (Kai Zheng)
 
-    HDFS-8104. Make hard-coded values consistent with the system default schema first before remove them. (Kai Zheng)
\ No newline at end of file
+    HDFS-8104. Make hard-coded values consistent with the system default schema first before remove them. (Kai Zheng)
+
+    HDFS-7889. Subclass DFSOutputStream to support writing striping layout files. (Li Bo via Kai Zheng)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/16d6f9ac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index ae5d3eb..0280d71 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -266,8 +266,14 @@ public class DFSOutputStream extends FSOutputSummer
         }
       }
       Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
-      final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
-          flag, progress, checksum, favoredNodes);
+      final DFSOutputStream out;
+      if(stat.getReplication() == 0) {
+        out = new DFSStripedOutputStream(dfsClient, src, stat,
+            flag, progress, checksum, favoredNodes);
+      } else {
+        out = new DFSOutputStream(dfsClient, src, stat,
+            flag, progress, checksum, favoredNodes);
+      }
       out.start();
       return out;
     } finally {
@@ -347,6 +353,9 @@ public class DFSOutputStream extends FSOutputSummer
       String[] favoredNodes) throws IOException {
     TraceScope scope =
         dfsClient.getPathTraceScope("newStreamForAppend", src);
+    if (stat.getReplication() == 0) {
+      throw new IOException("Appending to a striped layout file is not supported yet.");
+    }
     try {
       final DFSOutputStream out = new DFSOutputStream(dfsClient, src, flags,
           progress, lastBlock, stat, checksum, favoredNodes);
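
The dispatch above keys on HdfsFileStatus#getReplication() returning 0, which in
this branch marks files living inside an erasure coding zone. A minimal usage
sketch, mirroring the setup used by TestDFSStripedOutputStream further down
(the datanode count is an assumption sized for a 6+3 schema):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;

public class StripedCreateExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(9).build();
    try {
      DistributedFileSystem fs = cluster.getFileSystem();
      // Files created under an EC zone report replication == 0, which is the
      // marker newStreamForCreate() uses to pick DFSStripedOutputStream.
      fs.getClient().createErasureCodingZone("/");
      try (FSDataOutputStream out = fs.create(new Path("/striped-file"))) {
        out.write(new byte[1024]);  // internally handled by DFSStripedOutputStream
      }
    } finally {
      cluster.shutdown();
    }
  }
}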

http://git-wip-us.apache.org/repos/asf/hadoop/blob/16d6f9ac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
index 22055c3..9cd1ec1 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.util.Arrays;
 
@@ -113,6 +114,19 @@ class DFSPacket {
     dataPos += len;
   }
 
+  synchronized void writeData(ByteBuffer inBuffer, int len)
+      throws ClosedChannelException {
+    checkBuffer();
+    len =  len > inBuffer.remaining() ? inBuffer.remaining() : len;
+    if (dataPos + len > buf.length) {
+      throw new BufferOverflowException();
+    }
+    for (int i = 0; i < len; i++) {
+      buf[dataPos + i] = inBuffer.get();
+    }
+    dataPos += len;
+  }
+
   /**
    * Write checksums to this packet
    *
@@ -222,7 +236,7 @@ class DFSPacket {
    *
    * @return true if the packet is the last packet
    */
-  boolean isLastPacketInBlock(){
+  boolean isLastPacketInBlock() {
     return lastPacketInBlock;
   }
 
@@ -231,7 +245,7 @@ class DFSPacket {
    *
    * @return the sequence number of this packet
    */
-  long getSeqno(){
+  long getSeqno() {
     return seqno;
   }
 
@@ -240,14 +254,14 @@ class DFSPacket {
    *
    * @return the number of chunks in this packet
    */
-  synchronized int getNumChunks(){
+  synchronized int getNumChunks() {
     return numChunks;
   }
 
   /**
    * increase the number of chunks by one
    */
-  synchronized void incNumChunks(){
+  synchronized void incNumChunks() {
     numChunks++;
   }
 
@@ -256,7 +270,7 @@ class DFSPacket {
    *
    * @return the maximum number of packets
    */
-  int getMaxChunks(){
+  int getMaxChunks() {
     return maxChunks;
   }
 
@@ -265,7 +279,7 @@ class DFSPacket {
    *
    * @param syncBlock if to sync block
    */
-  synchronized void setSyncBlock(boolean syncBlock){
+  synchronized void setSyncBlock(boolean syncBlock) {
     this.syncBlock = syncBlock;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/16d6f9ac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
new file mode 100644
index 0000000..aded4fe
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -0,0 +1,439 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.Progressable;
+import org.apache.htrace.Sampler;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+
+
+/****************************************************************
+ * The DFSStripedOutputStream class supports writing files in striped
+ * layout. Each stripe contains a sequence of cells and multiple
+ * {@link StripedDataStreamer}s in DFSStripedOutputStream are responsible
+ * for writing the cells to different datanodes.
+ *
+ ****************************************************************/
+
+@InterfaceAudience.Private
+public class DFSStripedOutputStream extends DFSOutputStream {
+
+  private final List<StripedDataStreamer> streamers;
+  /**
+   * Size of each striping cell, must be a multiple of bytesPerChecksum
+   */
+  private int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+  private ByteBuffer[] cellBuffers;
+  private final short blockGroupBlocks = HdfsConstants.NUM_DATA_BLOCKS
+      + HdfsConstants.NUM_PARITY_BLOCKS;
+  private final short blockGroupDataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
+  private int curIdx = 0;
+  /* bytes written in current block group */
+  private long currentBlockGroupBytes = 0;
+
+  //TODO: Use ErasureCoder interface (HDFS-7781)
+  private RawErasureEncoder encoder;
+
+  private StripedDataStreamer getLeadingStreamer() {
+    return streamers.get(0);
+  }
+
+  private long getBlockGroupSize() {
+    return blockSize * HdfsConstants.NUM_DATA_BLOCKS;
+  }
+
+  /** Construct a new output stream for creating a file. */
+  DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
+                         EnumSet<CreateFlag> flag, Progressable progress,
+                         DataChecksum checksum, String[] favoredNodes)
+                         throws IOException {
+    super(dfsClient, src, stat, flag, progress, checksum, favoredNodes);
+    DFSClient.LOG.info("Creating striped output stream");
+    if (blockGroupBlocks <= 1) {
+      throw new IOException("The block group must contain more than one block.");
+    }
+
+    cellBuffers = new ByteBuffer[blockGroupBlocks];
+    List<BlockingQueue<LocatedBlock>> stripeBlocks = new ArrayList<>();
+
+    for (int i = 0; i < blockGroupBlocks; i++) {
+      stripeBlocks.add(new LinkedBlockingQueue<LocatedBlock>(blockGroupBlocks));
+      try {
+        cellBuffers[i] = ByteBuffer.wrap(byteArrayManager.newByteArray(cellSize));
+      } catch (InterruptedException ie) {
+        final InterruptedIOException iioe = new InterruptedIOException(
+            "create cell buffers");
+        iioe.initCause(ie);
+        throw iioe;
+      }
+    }
+    encoder = new RSRawEncoder();
+    encoder.initialize(blockGroupDataBlocks,
+        blockGroupBlocks - blockGroupDataBlocks, cellSize);
+
+    streamers = new ArrayList<>(blockGroupBlocks);
+    for (short i = 0; i < blockGroupBlocks; i++) {
+      StripedDataStreamer streamer = new StripedDataStreamer(stat, null,
+          dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
+          i, stripeBlocks);
+      if (favoredNodes != null && favoredNodes.length != 0) {
+        streamer.setFavoredNodes(favoredNodes);
+      }
+      streamers.add(streamer);
+    }
+
+    refreshStreamer();
+  }
+
+  private void refreshStreamer() {
+    streamer = streamers.get(curIdx);
+  }
+
+  private void moveToNextStreamer() {
+    curIdx = (curIdx + 1) % blockGroupBlocks;
+    refreshStreamer();
+  }
+
+  /**
+   * encode the buffers.
+   * After encoding, flip each buffer.
+   *
+   * @param buffers data buffers + parity buffers
+   */
+  private void encode(ByteBuffer[] buffers) {
+    ByteBuffer[] dataBuffers = new ByteBuffer[blockGroupDataBlocks];
+    ByteBuffer[] parityBuffers = new ByteBuffer[blockGroupBlocks - blockGroupDataBlocks];
+    for (int i = 0; i < blockGroupBlocks; i++) {
+      if (i < blockGroupDataBlocks) {
+        dataBuffers[i] = buffers[i];
+      } else {
+        parityBuffers[i - blockGroupDataBlocks] = buffers[i];
+      }
+    }
+    encoder.encode(dataBuffers, parityBuffers);
+  }
+
+  /**
+   * Generate packets from a given buffer
+   *
+   * @param byteBuffer the given buffer to generate packets
+   * @return packets generated
+   * @throws IOException
+   */
+  private List<DFSPacket> generatePackets(ByteBuffer byteBuffer)
+      throws IOException{
+    List<DFSPacket> packets = new ArrayList<>();
+    while (byteBuffer.remaining() > 0) {
+      DFSPacket p = createPacket(packetSize, chunksPerPacket,
+          streamer.getBytesCurBlock(),
+          streamer.getAndIncCurrentSeqno(), false);
+      int maxBytesToPacket = p.getMaxChunks() * bytesPerChecksum;
+      int toWrite = byteBuffer.remaining() > maxBytesToPacket ?
+          maxBytesToPacket: byteBuffer.remaining();
+      p.writeData(byteBuffer, toWrite);
+      streamer.incBytesCurBlock(toWrite);
+      packets.add(p);
+    }
+    return packets;
+  }
+
+  @Override
+  protected synchronized void writeChunk(byte[] b, int offset, int len,
+      byte[] checksum, int ckoff, int cklen) throws IOException {
+    super.writeChunk(b, offset, len, checksum, ckoff, cklen);
+
+    if (getSizeOfCellnBuffer(curIdx) <= cellSize) {
+      addToCellBuffer(b, offset, len);
+    } else {
+      String msg = "Writing a chunk should not overflow the cell buffer.";
+      DFSClient.LOG.info(msg);
+      throw new IOException(msg);
+    }
+
+
+    // If current packet has not been enqueued for transmission,
+    // but the cell buffer is full, we need to enqueue the packet
+    if (currentPacket != null && getSizeOfCellnBuffer(curIdx) == cellSize) {
+      if (DFSClient.LOG.isDebugEnabled()) {
+        DFSClient.LOG.debug("DFSClient writeChunk cell buffer full seqno=" +
+            currentPacket.getSeqno() +
+            ", curIdx=" + curIdx +
+            ", src=" + src +
+            ", bytesCurBlock=" + streamer.getBytesCurBlock() +
+            ", blockSize=" + blockSize +
+            ", appendChunk=" + streamer.getAppendChunk());
+      }
+      streamer.waitAndQueuePacket(currentPacket);
+      currentPacket = null;
+      adjustChunkBoundary();
+      endBlock();
+    }
+
+    // Two extra steps are needed when a striping cell is full:
+    // 1. Forward the current index pointer
+    // 2. Generate parity packets if a full stripe of data cells are present
+    if (getSizeOfCellnBuffer(curIdx) == cellSize) {
+      //move curIdx to next cell
+      moveToNextStreamer();
+      //When all data cells in a stripe are ready, we need to encode
+      //them and generate some parity cells. These cells will be
+      //converted to packets and put to their DataStreamer's queue.
+      if (curIdx == blockGroupDataBlocks) {
+        //encode the data cells
+        for (int k = 0; k < blockGroupDataBlocks; k++) {
+          cellBuffers[k].flip();
+        }
+        encode(cellBuffers);
+        for (int i = blockGroupDataBlocks; i < blockGroupBlocks; i++) {
+          ByteBuffer parityBuffer = cellBuffers[i];
+          List<DFSPacket> packets = generatePackets(parityBuffer);
+          for (DFSPacket p : packets) {
+            currentPacket = p;
+            streamer.waitAndQueuePacket(currentPacket);
+            currentPacket = null;
+          }
+          endBlock();
+          moveToNextStreamer();
+        }
+        //read next stripe to cellBuffers
+        clearCellBuffers();
+      }
+    }
+  }
+
+  private void addToCellBuffer(byte[] b, int off, int len) {
+    cellBuffers[curIdx].put(b, off, len);
+  }
+
+  private int getSizeOfCellnBuffer(int cellIndex) {
+    return cellBuffers[cellIndex].position();
+  }
+
+  private void clearCellBuffers() {
+    for (int i = 0; i< blockGroupBlocks; i++) {
+      cellBuffers[i].clear();
+    }
+  }
+
+  private int stripeDataSize() {
+    return blockGroupDataBlocks * cellSize;
+  }
+
+  private void notSupported(String headMsg)
+      throws IOException{
+      throw new IOException(
+          headMsg + " is not supported for a striped layout yet.");
+  }
+
+  @Override
+  public void hflush() throws IOException {
+    notSupported("hflush");
+  }
+
+  @Override
+  public void hsync() throws IOException {
+    notSupported("hsync");
+  }
+
+
+  @Override
+  protected synchronized void start() {
+    for (StripedDataStreamer streamer : streamers) {
+      streamer.start();
+    }
+  }
+
+  @Override
+  synchronized void abort() throws IOException {
+    if (isClosed()) {
+      return;
+    }
+    for (StripedDataStreamer streamer : streamers) {
+      streamer.setLastException(new IOException("Lease timeout of "
+          + (dfsClient.getHdfsTimeout()/1000) + " seconds expired."));
+    }
+    closeThreads(true);
+    dfsClient.endFileLease(fileId);
+  }
+
+  //TODO: Handle slow writers (HDFS-7786)
+  //Currently only checks whether the leading streamer is terminated
+  boolean isClosed() {
+    return closed || getLeadingStreamer().streamerClosed();
+  }
+
+  // shutdown datastreamer and responseprocessor threads.
+  // interrupt datastreamer if force is true
+  @Override
+  protected void closeThreads(boolean force) throws IOException {
+    StripedDataStreamer leadingStreamer = null;
+    for (StripedDataStreamer streamer : streamers) {
+      try {
+        streamer.close(force);
+        streamer.join();
+        streamer.closeSocket();
+        if (streamer.isLeadingStreamer()) {
+          leadingStreamer = streamer;
+        } else {
+          streamer.countTailingBlockGroupBytes();
+        }
+
+      } catch (InterruptedException e) {
+        throw new IOException("Failed to shutdown streamer");
+      } finally {
+        streamer.setSocketToNull();
+        setClosed();
+      }
+    }
+    leadingStreamer.countTailingBlockGroupBytes();
+  }
+
+  @Override
+  public synchronized void write(int b) throws IOException {
+    super.write(b);
+    currentBlockGroupBytes = (currentBlockGroupBytes + 1) % getBlockGroupSize();
+  }
+
+  @Override
+  public synchronized void write(byte b[], int off, int len)
+      throws IOException {
+    super.write(b, off, len);
+    currentBlockGroupBytes = (currentBlockGroupBytes + len) % getBlockGroupSize();
+  }
+
+  private void writeParityCellsForLastStripe() throws IOException{
+    if(currentBlockGroupBytes == 0 ||
+        currentBlockGroupBytes % stripeDataSize() == 0)
+      return;
+    int lastStripeLen =(int)(currentBlockGroupBytes % stripeDataSize());
+    // Size of parity cells should equal the size of the first cell, if it
+    // is not full.
+    int parityCellSize = cellSize;
+    int index = lastStripeLen / cellSize;
+    if (lastStripeLen < cellSize) {
+      parityCellSize = lastStripeLen;
+      index++;
+    }
+    for (int i = 0; i < blockGroupBlocks; i++) {
+      if (i >= index) {
+        int position = cellBuffers[i].position();
+        for (int j = 0; j < parityCellSize - position; j++) {
+          cellBuffers[i].put((byte)0);
+        }
+      }
+      cellBuffers[i].flip();
+    }
+    encode(cellBuffers);
+
+    //write parity cells
+    curIdx = blockGroupDataBlocks;
+    refreshStreamer();
+    for (int i = blockGroupDataBlocks; i < blockGroupBlocks; i++) {
+      ByteBuffer parityBuffer = cellBuffers[i];
+      List<DFSPacket> packets = generatePackets(parityBuffer);
+      for (DFSPacket p : packets) {
+        currentPacket = p;
+        streamer.waitAndQueuePacket(currentPacket);
+        currentPacket = null;
+      }
+      endBlock();
+      moveToNextStreamer();
+    }
+
+    clearCellBuffers();
+  }
+
+  @Override
+  void setClosed() {
+    super.setClosed();
+    for (int i = 0; i < blockGroupBlocks; i++) {
+      byteArrayManager.release(cellBuffers[i].array());
+      streamers.get(i).release();
+    }
+  }
+
+  @Override
+  protected synchronized void closeImpl() throws IOException {
+    if (isClosed()) {
+      IOException e = getLeadingStreamer().getLastException().getAndSet(null);
+      if (e == null)
+        return;
+      else
+        throw e;
+    }
+
+    try {
+      // flush from all upper layers
+      flushBuffer();
+      if (currentPacket != null) {
+        streamer.waitAndQueuePacket(currentPacket);
+        currentPacket = null;
+      }
+      //if the last stripe is incomplete, generate and write parity cells
+      writeParityCellsForLastStripe();
+
+      for (int i = 0; i < blockGroupBlocks; i++) {
+        curIdx = i;
+        refreshStreamer();
+        if (streamer.getBytesCurBlock()!= 0 ||
+            currentBlockGroupBytes < getBlockGroupSize()) {
+          // send an empty packet to mark the end of the block
+          currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
+              streamer.getAndIncCurrentSeqno(), true);
+          currentPacket.setSyncBlock(shouldSyncBlock);
+        }
+        // flush all data to Datanode
+        flushInternal();
+      }
+
+      // get last block before destroying the streamer
+      ExtendedBlock lastBlock = streamers.get(0).getBlock();
+      closeThreads(false);
+      TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER);
+      try {
+        completeFile(lastBlock);
+      } finally {
+        scope.close();
+      }
+      dfsClient.endFileLease(fileId);
+    } catch (ClosedChannelException e) {
+    } finally {
+      setClosed();
+    }
+  }
+
+}
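
The writeChunk() path above is easiest to read as a cycle: fill the current
data cell, move to the next streamer when the cell is full, and once a whole
stripe of data cells is buffered, encode the parity cells and queue everything
as packets. The toy model below reproduces only that control flow; it uses
tiny buffer sizes and a single XOR parity cell in place of the RSRawEncoder,
so it is an illustration, not the real coder.

import java.nio.ByteBuffer;

public class StripeCycleModel {
  static final int DATA = 3, PARITY = 1, CELL = 4;  // toy sizes, not HDFS defaults
  static final ByteBuffer[] cells = new ByteBuffer[DATA + PARITY];
  static int curIdx = 0;                            // index of the current data cell

  public static void main(String[] args) {
    for (int i = 0; i < cells.length; i++) {
      cells[i] = ByteBuffer.allocate(CELL);
    }
    byte[] data = new byte[2 * DATA * CELL];        // exactly two full stripes
    for (int i = 0; i < data.length; i++) {
      data[i] = (byte) i;
    }
    for (byte b : data) {
      write(b);
    }
  }

  static void write(byte b) {
    cells[curIdx].put(b);                 // buffer into the current data cell
    if (!cells[curIdx].hasRemaining()) {
      curIdx++;                           // cell full: advance to the next streamer
      if (curIdx == DATA) {               // a full stripe of data cells is buffered
        encodeParity();
        flushStripe();
        curIdx = 0;
      }
    }
  }

  /** XOR parity over the data cells; the real class uses RSRawEncoder here. */
  static void encodeParity() {
    ByteBuffer parity = cells[DATA];
    for (int pos = 0; pos < CELL; pos++) {
      byte p = 0;
      for (int i = 0; i < DATA; i++) {
        p ^= cells[i].get(pos);
      }
      parity.put(p);
    }
  }

  /** Stands in for turning each cell into packets for its streamer. */
  static void flushStripe() {
    for (int i = 0; i < cells.length; i++) {
      System.out.println("streamer " + i + " gets " + cells[i].position() + " bytes");
      cells[i].clear();
    }
  }
}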

http://git-wip-us.apache.org/repos/asf/hadoop/blob/16d6f9ac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 43787ab..0f5c54e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -210,7 +210,7 @@ class DataStreamer extends Daemon {
   }
 
   private volatile boolean streamerClosed = false;
-  private ExtendedBlock block; // its length is number of bytes acked
+  protected ExtendedBlock block; // its length is number of bytes acked
   private Token<BlockTokenIdentifier> accessToken;
   private DataOutputStream blockStream;
   private DataInputStream blockReplyStream;
@@ -218,6 +218,7 @@ class DataStreamer extends Daemon {
   private volatile DatanodeInfo[] nodes = null; // list of targets for current block
   private volatile StorageType[] storageTypes = null;
   private volatile String[] storageIDs = null;
+  protected String[] favoredNodes;
   volatile boolean hasError = false;
   volatile int errorIndex = -1;
   // Restarting node index
@@ -244,12 +245,12 @@ class DataStreamer extends Daemon {
   private final LastExceptionInStreamer lastException = new LastExceptionInStreamer();
   private Socket s;
 
-  private final DFSClient dfsClient;
-  private final String src;
+  protected final DFSClient dfsClient;
+  protected final String src;
   /** Only for DataTransferProtocol.writeBlock(..) */
   private final DataChecksum checksum4WriteBlock;
   private final Progressable progress;
-  private final HdfsFileStatus stat;
+  protected final HdfsFileStatus stat;
   // appending to existing partial block
   private volatile boolean appendChunk = false;
   // both dataQueue and ackQueue are protected by dataQueue lock
@@ -365,7 +366,7 @@ class DataStreamer extends Daemon {
     stage = BlockConstructionStage.DATA_STREAMING;
   }
 
-  private void endBlock() {
+  protected void endBlock() {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Closing old block " + block);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/16d6f9ac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
new file mode 100644
index 0000000..710d92d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.util.List;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.util.ByteArrayManager;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/****************************************************************************
+ * The StripedDataStreamer class is used by {@link DFSStripedOutputStream}.
+ * There are two kinds of StripedDataStreamer: the leading streamer and the
+ * ordinary streamers. The leading streamer requests a block group from the
+ * NameNode, unwraps it into located blocks and transfers each located block
+ * to its corresponding ordinary streamer via a blocking queue.
+ *
+ ****************************************************************************/
+public class StripedDataStreamer extends DataStreamer {
+  private final short index;
+  private final  List<BlockingQueue<LocatedBlock>> stripedBlocks;
+  private static short blockGroupSize = HdfsConstants.NUM_DATA_BLOCKS
+      + HdfsConstants.NUM_PARITY_BLOCKS;
+  private boolean hasCommittedBlock = false;
+
+  StripedDataStreamer(HdfsFileStatus stat, ExtendedBlock block,
+                      DFSClient dfsClient, String src,
+                      Progressable progress, DataChecksum checksum,
+                      AtomicReference<CachingStrategy> cachingStrategy,
+                      ByteArrayManager byteArrayManage, short index,
+                      List<BlockingQueue<LocatedBlock>> stripedBlocks) {
+    super(stat,block, dfsClient, src, progress, checksum, cachingStrategy,
+        byteArrayManage);
+    this.index = index;
+    this.stripedBlocks = stripedBlocks;
+  }
+
+  /**
+   * Construct a data streamer for appending to the last partial block
+   * @param lastBlock last block of the file to be appended
+   * @param stat status of the file to be appended
+   * @throws IOException if error occurs
+   */
+  StripedDataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat,
+                      DFSClient dfsClient, String src,
+                      Progressable progress, DataChecksum checksum,
+                      AtomicReference<CachingStrategy> cachingStrategy,
+                      ByteArrayManager byteArrayManage, short index,
+                      List<BlockingQueue<LocatedBlock>> stripedBlocks)
+      throws IOException {
+    super(lastBlock, stat, dfsClient, src, progress, checksum, cachingStrategy,
+        byteArrayManage);
+    this.index = index;
+    this.stripedBlocks = stripedBlocks;
+  }
+
+  public boolean isLeadingStreamer () {
+    return index == 0;
+  }
+
+  private boolean isParityStreamer() {
+    return index >= HdfsConstants.NUM_DATA_BLOCKS;
+  }
+
+  @Override
+  protected void endBlock() {
+    if (!isLeadingStreamer() && !isParityStreamer()) {
+      //before retrieving a new block, transfer the finished block to
+      //leading streamer
+      LocatedBlock finishedBlock = new LocatedBlock(
+          new ExtendedBlock(block.getBlockPoolId(), block.getBlockId(),
+                       block.getNumBytes(),block.getGenerationStamp()), null);
+      try{
+        boolean offSuccess = stripedBlocks.get(0).offer(finishedBlock, 30,
+            TimeUnit.SECONDS);
+      }catch (InterruptedException ie) {
+      //TODO: Handle InterruptedException (HDFS-7786)
+      }
+    }
+    super.endBlock();
+  }
+
+  /**
+   * This function is called after the streamer is closed.
+   */
+  void countTailingBlockGroupBytes () throws IOException {
+    if (isLeadingStreamer()) {
+      //when committing a block group, the leading streamer has to adjust
+      // {@link block} to include the size of the whole block group
+      for (int i = 1; i < HdfsConstants.NUM_DATA_BLOCKS; i++) {
+        try {
+          LocatedBlock finishedLocatedBlock = stripedBlocks.get(0).poll(30,
+              TimeUnit.SECONDS);
+          if (finishedLocatedBlock == null) {
+            throw new IOException("Fail to get finished LocatedBlock " +
+                "from streamer, i=" + i);
+          }
+          ExtendedBlock finishedBlock = finishedLocatedBlock.getBlock();
+          long bytes = finishedBlock == null ? 0 : finishedBlock.getNumBytes();
+          if (block != null) {
+            block.setNumBytes(block.getNumBytes() + bytes);
+          }
+        } catch (InterruptedException ie) {
+          DFSClient.LOG.info("InterruptedException received when " +
+              "polling a block from stripeBlocks, ie = " + ie);
+        }
+      }
+    } else if (!isParityStreamer()) {
+      if (block == null || block.getNumBytes() == 0) {
+        LocatedBlock finishedBlock = new LocatedBlock(null, null);
+        try {
+          boolean offSuccess = stripedBlocks.get(0).offer(finishedBlock, 30,
+              TimeUnit.SECONDS);
+        } catch (InterruptedException ie) {
+          //TODO: Handle InterruptedException (HDFS-7786)
+          ie.printStackTrace();
+        }
+      }
+    }
+
+  }
+
+  @Override
+  protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
+      throws IOException {
+    LocatedBlock lb = null;
+    if (isLeadingStreamer()) {
+      if(hasCommittedBlock) {
+        //when committing a block group, the leading streamer has to adjust
+        // {@link block} to include the size of the whole block group
+        for (int i = 1; i < HdfsConstants.NUM_DATA_BLOCKS; i++) {
+          try {
+            LocatedBlock finishedLocatedBlock = stripedBlocks.get(0).poll(30,
+                TimeUnit.SECONDS);
+            if (finishedLocatedBlock == null) {
+              throw new IOException("Fail to get finished LocatedBlock " +
+                  "from streamer, i=" + i);
+            }
+            ExtendedBlock finishedBlock = finishedLocatedBlock.getBlock();
+            long bytes = finishedBlock == null ? 0 : finishedBlock.getNumBytes();
+            if(block != null) {
+              block.setNumBytes(block.getNumBytes() + bytes);
+            }
+          } catch (InterruptedException ie) {
+            DFSClient.LOG.info("InterruptedException received when polling" +
+                " a block from stripeBlocks, ie = " + ie);
+          }
+        }
+      }
+
+      lb = super.locateFollowingBlock(excludedNodes);
+      hasCommittedBlock = true;
+      LocatedBlock[] blocks = unwrapBlockGroup(lb);
+      assert blocks.length == blockGroupSize :
+          "Fail to get block group from namenode: blockGroupSize: " +
+              blockGroupSize + ", blocks.length: " + blocks.length;
+      lb = blocks[0];
+      for (int i = 1; i < blocks.length; i++) {
+        try {
+          boolean offSuccess = stripedBlocks.get(i).offer(blocks[i],
+              90, TimeUnit.SECONDS);
+          if(!offSuccess){
+            String msg = "Fail to put block to stripeBlocks. i = " + i;
+            DFSClient.LOG.info(msg);
+            throw new IOException(msg);
+          } else {
+            DFSClient.LOG.info("Allocate a new block to a streamer. i = " + i
+                + ", block: " + blocks[i]);
+          }
+        } catch (InterruptedException ie) {
+          DFSClient.LOG.info("InterruptedException received when putting" +
+              " a block to stripeBlocks, ie = " + ie);
+        }
+      }
+    } else {
+      try {
+        //wait 90 seconds to get a block from the queue
+        lb = stripedBlocks.get(index).poll(90, TimeUnit.SECONDS);
+      } catch (InterruptedException ie) {
+        DFSClient.LOG.info("InterruptedException received when retrieving " +
+            "a block from stripeBlocks, ie = " + ie);
+      }
+    }
+    return lb;
+  }
+
+  /**
+   * Generate other blocks in a block group according to the first one.
+   *
+   * @param firstBlockInGroup the first block in a block group
+   * @return  other blocks in this group
+   */
+  public static LocatedBlock[] unwrapBlockGroup(
+      final LocatedBlock firstBlockInGroup) {
+    ExtendedBlock eb = firstBlockInGroup.getBlock();
+    DatanodeInfo[] locs = firstBlockInGroup.getLocations();
+    String[] storageIDs = firstBlockInGroup.getStorageIDs();
+    StorageType[] storageTypes = firstBlockInGroup.getStorageTypes();
+    Token<BlockTokenIdentifier> blockToken = firstBlockInGroup.getBlockToken();
+    LocatedBlock[] blocksInGroup = new LocatedBlock[locs.length];
+    for (int i = 0; i < blocksInGroup.length; i++) {
+      //each block in a group has the same number of bytes and timestamp
+      ExtendedBlock extendedBlock = new ExtendedBlock(eb.getBlockPoolId(),
+          eb.getBlockId() + i, eb.getNumBytes(), eb.getGenerationStamp());
+      blocksInGroup[i] = new LocatedBlock(extendedBlock,
+          new DatanodeInfo[] {locs[i]}, new String[]{storageIDs[i]},
+          new StorageType[] {storageTypes[i]});
+      blocksInGroup[i].setBlockToken(blockToken);
+    }
+    return blocksInGroup;
+  }
+}
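
The coordination above boils down to a producer/consumer hand-off: the leading
streamer (index 0) asks the NameNode for one block group, unwraps it with
unwrapBlockGroup(), and offers one located block to each follower's queue,
while the followers block inside locateFollowingBlock() until their entry
arrives. A stripped-down model of that hand-off, with plain strings standing in
for LocatedBlock:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockGroupHandoffModel {
  public static void main(String[] args) throws InterruptedException {
    final int streamers = 4;                        // toy size, not the HDFS schema
    final List<BlockingQueue<String>> queues = new ArrayList<>();
    for (int i = 0; i < streamers; i++) {
      queues.add(new LinkedBlockingQueue<String>());
    }

    // Followers: each waits for its own block, as locateFollowingBlock() does.
    for (int i = 1; i < streamers; i++) {
      final int idx = i;
      new Thread(new Runnable() {
        @Override
        public void run() {
          try {
            String block = queues.get(idx).poll(90, TimeUnit.SECONDS);
            System.out.println("streamer " + idx + " got " + block);
          } catch (InterruptedException ignored) {
          }
        }
      }).start();
    }

    // Leader: pretend the NameNode returned a block group, then distribute it.
    String[] group = new String[streamers];
    for (int i = 0; i < streamers; i++) {
      group[i] = "block-" + i;
    }
    for (int i = 1; i < streamers; i++) {
      queues.get(i).offer(group[i], 90, TimeUnit.SECONDS);
    }
    System.out.println("leading streamer keeps " + group[0]);
  }
}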

http://git-wip-us.apache.org/repos/asf/hadoop/blob/16d6f9ac/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
new file mode 100644
index 0000000..f5a37f3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
@@ -0,0 +1,311 @@
+package org.apache.hadoop.hdfs;
+
+import java.util.Arrays;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.net.TcpPeerServer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestDFSStripedOutputStream {
+  private int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
+  private int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
+
+  private MiniDFSCluster cluster;
+  private Configuration conf = new Configuration();
+  private DistributedFileSystem fs;
+  int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+  int blockSize = 8 * 1024 * 1024;
+  int cellsInBlock = blockSize / cellSize;
+  private int mod = 29;
+
+  @Before
+  public void setup() throws IOException {
+    int numDNs = dataBlocks + parityBlocks + 2;
+    Configuration conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, cellsInBlock * cellSize);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
+    cluster.getFileSystem().getClient().createErasureCodingZone("/");
+    fs = cluster.getFileSystem();
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void TestFileEmpty() throws IOException {
+    testOneFile("/EmptyFile", 0);
+  }
+
+  @Test
+  public void TestFileSmallerThanOneCell1() throws IOException {
+    testOneFile("/SmallerThanOneCell", 1);
+  }
+
+  @Test
+  public void TestFileSmallerThanOneCell2() throws IOException {
+    testOneFile("/SmallerThanOneCell", cellSize - 1);
+  }
+
+  @Test
+  public void TestFileEqualsWithOneCell() throws IOException {
+    testOneFile("/EqualsWithOneCell", cellSize);
+  }
+
+  @Test
+  public void TestFileSmallerThanOneStripe1() throws IOException {
+    testOneFile("/SmallerThanOneStripe", cellSize * dataBlocks - 1);
+  }
+
+  @Test
+  public void TestFileSmallerThanOneStripe2() throws IOException {
+    testOneFile("/SmallerThanOneStripe", cellSize + 123);
+  }
+
+  @Test
+  public void TestFileEqualsWithOneStripe() throws IOException {
+    testOneFile("/EqualsWithOneStripe", cellSize * dataBlocks);
+  }
+
+  @Test
+  public void TestFileMoreThanOneStripe1() throws IOException {
+    testOneFile("/MoreThanOneStripe1", cellSize * dataBlocks + 123);
+  }
+
+  @Test
+  public void TestFileMoreThanOneStripe2() throws IOException {
+    testOneFile("/MoreThanOneStripe2",
+        cellSize * dataBlocks * (cellsInBlock >= 2 ? cellsInBlock / 2 : 1)
+            + cellSize * dataBlocks + 123);
+  }
+
+  @Test
+  public void TestFileFullBlockGroup() throws IOException {
+    testOneFile("/FullBlockGroup", blockSize * dataBlocks);
+  }
+
+  //TODO: The following tests will pass after HDFS-8121 is fixed
+//  @Test
+  public void TestFileMoreThanABlockGroup1() throws IOException {
+    testOneFile("/MoreThanABlockGroup1", blockSize * dataBlocks + 123);
+  }
+
+  //  @Test
+  public void TestFileMoreThanABlockGroup2() throws IOException {
+    testOneFile("/MoreThanABlockGroup2",
+        blockSize * dataBlocks * 3
+            + (cellsInBlock >= 2 ? cellsInBlock / 2 : 1) * cellSize * dataBlocks
+            + 123);
+  }
+
+  private int stripeDataSize() {
+    return cellSize * dataBlocks;
+  }
+
+  private byte[] generateBytes(int cnt) {
+    byte[] bytes = new byte[cnt];
+    for (int i = 0; i < cnt; i++) {
+      bytes[i] = getByte(i);
+    }
+    return bytes;
+  }
+
+  private byte getByte(long pos) {
+    return (byte) (pos % mod + 1);
+  }
+
+  private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes)
+      throws IOException {
+    Path TestPath = new Path(src);
+    byte[] bytes = generateBytes(writeBytes);
+    DFSTestUtil.writeFile(fs, TestPath, new String(bytes));
+
+    //check file length
+    FileStatus status = fs.getFileStatus(TestPath);
+    long fileLength = status.getLen();
+    if (fileLength != writeBytes) {
+      Assert.fail("File Length error: expect=" + writeBytes
+          + ", actual=" + fileLength);
+    }
+
+    DFSStripedInputStream dis = new DFSStripedInputStream(
+        fs.getClient(), src, true);
+    byte[] buf = new byte[writeBytes + 100];
+    int readLen = dis.read(0, buf, 0, buf.length);
+    readLen = readLen >= 0 ? readLen : 0;
+    if (readLen != writeBytes) {
+      Assert.fail("The length of file is not correct.");
+    }
+
+    for (int i = 0; i < writeBytes; i++) {
+      if (getByte(i) != buf[i]) {
+        Assert.fail("Byte at i = " + i + " is wrongly written.");
+      }
+    }
+  }
+
+  private void testOneFile(String src, int writeBytes)
+      throws IOException {
+    Path TestPath = new Path(src);
+
+    int allBlocks = dataBlocks + parityBlocks;
+    byte[] bytes = generateBytes(writeBytes);
+    DFSTestUtil.writeFile(fs, TestPath, new String(bytes));
+
+    //check file length
+    FileStatus status = fs.getFileStatus(TestPath);
+    long fileLength = status.getLen();
+    if (fileLength != writeBytes) {
+      Assert.fail("File Length error: expect=" + writeBytes
+          + ", actual=" + fileLength);
+    }
+
+    List<List<LocatedBlock>> blockGroupList = new ArrayList<>();
+    LocatedBlocks lbs = fs.getClient().getLocatedBlocks(src, 0L);
+
+    for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) {
+      LocatedBlock[] blocks = StripedDataStreamer.unwrapBlockGroup(firstBlock);
+      List<LocatedBlock> oneGroup = Arrays.asList(blocks);
+      blockGroupList.add(oneGroup);
+    }
+
+    //test each block group
+    for (int group = 0; group < blockGroupList.size(); group++) {
+      //get the data of this block
+      List<LocatedBlock> blockList = blockGroupList.get(group);
+      byte[][] dataBlockBytes = new byte[dataBlocks][];
+      byte[][] parityBlockBytes = new byte[allBlocks - dataBlocks][];
+
+      //calculate the size of this block group
+      int lenOfBlockGroup = group < blockGroupList.size() - 1 ?
+          blockSize * dataBlocks :
+          writeBytes - blockSize * (blockGroupList.size() - 1) * dataBlocks;
+      int intactStripes = lenOfBlockGroup / stripeDataSize();
+      int lastStripeLen = lenOfBlockGroup % stripeDataSize();
+
+      //for each block, use BlockReader to read data
+      for (int i = 0; i < blockList.size(); i++) {
+        LocatedBlock lblock = blockList.get(i);
+        if (lblock == null) {
+          continue;
+        }
+        DatanodeInfo[] nodes = lblock.getLocations();
+        ExtendedBlock block = lblock.getBlock();
+        InetSocketAddress targetAddr = NetUtils.createSocketAddr(
+            nodes[0].getXferAddr());
+
+        int lenOfCell = cellSize;
+        if (i == lastStripeLen / cellSize) {
+          lenOfCell = lastStripeLen % cellSize;
+        } else if (i > lastStripeLen / cellSize) {
+          lenOfCell = 0;
+        }
+        int lenOfBlock = cellSize * intactStripes + lenOfCell;
+        byte[] blockBytes = new byte[lenOfBlock];
+        if (i < dataBlocks) {
+          dataBlockBytes[i] = blockBytes;
+        } else {
+          parityBlockBytes[i - dataBlocks] = blockBytes;
+        }
+
+        if (lenOfBlock == 0) {
+          continue;
+        }
+
+        block.setNumBytes(lenOfBlock);
+        BlockReader blockReader = new BlockReaderFactory(new DFSClient.Conf(conf)).
+            setFileName(src).
+            setBlock(block).
+            setBlockToken(lblock.getBlockToken()).
+            setInetSocketAddress(targetAddr).
+            setStartOffset(0).
+            setLength(block.getNumBytes()).
+            setVerifyChecksum(true).
+            setClientName("TestStripeLayoutWrite").
+            setDatanodeInfo(nodes[0]).
+            setCachingStrategy(CachingStrategy.newDefaultStrategy()).
+            setClientCacheContext(ClientContext.getFromConf(conf)).
+            setConfiguration(conf).
+            setRemotePeerFactory(new RemotePeerFactory() {
+              @Override
+              public Peer newConnectedPeer(InetSocketAddress addr,
+                                           Token<BlockTokenIdentifier> blockToken,
+                                           DatanodeID datanodeId)
+                  throws IOException {
+                Peer peer = null;
+                Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
+                try {
+                  sock.connect(addr, HdfsServerConstants.READ_TIMEOUT);
+                  sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
+                  peer = TcpPeerServer.peerFromSocket(sock);
+                } finally {
+                  if (peer == null) {
+                    IOUtils.closeSocket(sock);
+                  }
+                }
+                return peer;
+              }
+            }).build();
+
+        blockReader.readAll(blockBytes, 0, lenOfBlock);
+        blockReader.close();
+      }
+
+      //check if we write the data correctly
+      for (int i = 0; i < dataBlockBytes.length; i++) {
+        byte[] cells = dataBlockBytes[i];
+        if (cells == null) {
+          continue;
+        }
+        for (int j = 0; j < cells.length; j++) {
+          byte expected;
+          //calculate the position of this byte in the file
+          long pos = group * dataBlocks * blockSize
+              + (i * cellSize + j / cellSize * cellSize * dataBlocks)
+              + j % cellSize;
+          if (pos >= writeBytes) {
+            expected = 0;
+          } else {
+            expected = getByte(pos);
+          }
+
+          if (expected != cells[j]) {
+            Assert.fail("Unexpected byte " + cells[j] + ", expect " + expected
+                + ". Block group index is " + group +
+                ", stripe index is " + j / cellSize +
+                ", cell index is " + i + ", byte index is " + j % cellSize);
+          }
+        }
+      }
+    }
+  }
+
+}
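
The verification loop above reads the parity blocks into parityBlockBytes but,
in this revision, only asserts on the data cells. If the parity were to be
checked as well, the same raw coder could recompute it; the following is only a
sketch and assumes, for simplicity, that every byte[] holds exactly one cell.

import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;

public class ParityCheckSketch {
  /** Recomputes the parity cells from the data cells and compares them. */
  static boolean parityMatches(byte[][] dataCells, byte[][] actualParityCells) {
    int cellSize = dataCells[0].length;
    RSRawEncoder encoder = new RSRawEncoder();
    encoder.initialize(HdfsConstants.NUM_DATA_BLOCKS,
        HdfsConstants.NUM_PARITY_BLOCKS, cellSize);

    ByteBuffer[] in = new ByteBuffer[dataCells.length];
    for (int i = 0; i < in.length; i++) {
      in[i] = ByteBuffer.wrap(dataCells[i]);
    }
    ByteBuffer[] out = new ByteBuffer[actualParityCells.length];
    for (int i = 0; i < out.length; i++) {
      out[i] = ByteBuffer.allocate(cellSize);
    }

    encoder.encode(in, out);  // recompute the expected parity

    for (int i = 0; i < out.length; i++) {
      if (!Arrays.equals(out[i].array(), actualParityCells[i])) {
        return false;
      }
    }
    return true;
  }
}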