You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/09/30 17:41:37 UTC

[37/58] [abbrv] hadoop git commit: Merge remote-tracking branch 'apache/trunk' into HDFS-7285

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index 0000000,0000000..69105a0
new file mode 100644
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@@ -1,0 -1,0 +1,972 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hadoop.hdfs;
++
++import com.google.common.base.Preconditions;
++import org.apache.hadoop.fs.ChecksumException;
++import org.apache.hadoop.fs.ReadOption;
++import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
++import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
++import org.apache.hadoop.hdfs.protocol.LocatedBlock;
++import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
++import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
++import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
++import org.apache.hadoop.hdfs.util.StripedBlockUtil;
++import org.apache.hadoop.io.ByteBufferPool;
++
++import static org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
++import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
++import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
++
++import org.apache.hadoop.io.IOUtils;
++import org.apache.hadoop.io.erasurecode.CodecUtil;
++import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
++
++import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
++import org.apache.hadoop.util.DirectBufferPool;
++
++import java.io.EOFException;
++import java.io.IOException;
++import java.io.InterruptedIOException;
++import java.nio.ByteBuffer;
++import java.util.ArrayList;
++import java.util.Arrays;
++import java.util.Collections;
++import java.util.EnumSet;
++import java.util.List;
++import java.util.Set;
++import java.util.Collection;
++import java.util.Map;
++import java.util.HashMap;
++import java.util.concurrent.CompletionService;
++import java.util.concurrent.ConcurrentHashMap;
++import java.util.concurrent.ExecutorCompletionService;
++import java.util.concurrent.Callable;
++import java.util.concurrent.Future;
++
++/**
++ * DFSStripedInputStream reads from striped block groups
++ */
++public class DFSStripedInputStream extends DFSInputStream {
++
++  /**
++   * Bookkeeping for the retries allowed while creating a block reader: one
++   * refetch of the encryption key and one refetch of the block token.
++   */
++  private static class ReaderRetryPolicy {
++    private int encryptionKeyRefetchesLeft = 1;
++    private int tokenRefetchesLeft = 1;
++
++    /** Use up one encryption key refetch. */
++    void refetchEncryptionKey() {
++      encryptionKeyRefetchesLeft -= 1;
++    }
++
++    /** Use up one block token refetch. */
++    void refetchToken() {
++      tokenRefetchesLeft -= 1;
++    }
++
++    /** @return true while an encryption key refetch is still available */
++    boolean shouldRefetchEncryptionKey() {
++      return encryptionKeyRefetchesLeft >= 1;
++    }
++
++    /** @return true while a block token refetch is still available */
++    boolean shouldRefetchToken() {
++      return tokenRefetchesLeft >= 1;
++    }
++  }
++
++  /** Describes which part of the block group is currently buffered. */
++  private static class StripeRange {
++    /** offset in the block group where the range begins (inclusive) */
++    final long offsetInBlock;
++    /** number of bytes covered by the range */
++    final long length;
++
++    /**
++     * @param offsetInBlock start of the range; must not be negative
++     * @param length size of the range; must not be negative
++     */
++    StripeRange(long offsetInBlock, long length) {
++      final boolean valid = offsetInBlock >= 0 && length >= 0;
++      Preconditions.checkArgument(valid);
++      this.length = length;
++      this.offsetInBlock = offsetInBlock;
++    }
++
++    /**
++     * @return true if pos lies in [offsetInBlock, offsetInBlock + length)
++     */
++    boolean include(long pos) {
++      if (pos < offsetInBlock) {
++        return false;
++      }
++      return pos < offsetInBlock + length;
++    }
++  }
++
++  /** A block reader together with its datanode, offset and usability flag. */
++  private static class BlockReaderInfo {
++    final BlockReader reader;
++    final DatanodeInfo datanode;
++    /**
++     * Offset tracked for this reader. When block readers are initialized,
++     * their starting offsets are all set to the same number: the smallest
++     * internal block offset among the readers, because some internal blocks
++     * may have to be read "backwards" for decoding. Tracking the offset per
++     * reader lets the caller skip data when necessary.
++     */
++    long blockReaderOffset;
++    /**
++     * True once this reader must no longer be used. It is set when the
++     * reader hit an issue, so that the next stripe avoids it.
++     */
++    boolean shouldSkip;
++
++    BlockReaderInfo(BlockReader reader, DatanodeInfo dn, long offset) {
++      this.blockReaderOffset = offset;
++      this.datanode = dn;
++      this.reader = reader;
++    }
++
++    /** Record the offset this reader is now positioned at. */
++    void setOffset(long offset) {
++      blockReaderOffset = offset;
++    }
++
++    /** Mark this reader as one that should not be used again. */
++    void skip() {
++      shouldSkip = true;
++    }
++  }
++
++  private static final DirectBufferPool bufferPool = new DirectBufferPool();
++
++  private final BlockReaderInfo[] blockReaders;
++  private final int cellSize;
++  private final short dataBlkNum;
++  private final short parityBlkNum;
++  private final int groupSize;
++  /** the buffer for a complete stripe */
++  private ByteBuffer curStripeBuf;
++  private ByteBuffer parityBuf;
++  private final ErasureCodingPolicy ecPolicy;
++  private final RawErasureDecoder decoder;
++
++  /**
++   * indicate the start/end offset of the current buffered stripe in the
++   * block group
++   */
++  private StripeRange curStripeRange;
++  private final CompletionService<Void> readingService;
++
++  /**
++   * When warning the user of a lost block in striping mode, we remember the
++   * dead nodes we've logged. All other striping blocks on these nodes can be
++   * considered lost too, and we don't want to log a warning for each of them.
++   * This is to prevent the log from being too verbose. Refer to HDFS-8920.
++   *
++   * To minimize the overhead, we only store the datanodeUuid in this set
++   */
++  private final Set<String> warnedNodes = Collections.newSetFromMap(
++      new ConcurrentHashMap<String, Boolean>());
++
++  /**
++   * Create an input stream over a striped file.
++   *
++   * @param dfsClient client providing the striped-read thread pool and the
++   *                  configuration used to create the decoder
++   * @param src path of the file, used in log messages
++   * @param verifyChecksum passed through to the super constructor
++   * @param ecPolicy erasure coding policy of the file; must not be null
++   * @param locatedBlocks passed through to the super constructor
++   * @throws IOException if initialization fails
++   */
++  DFSStripedInputStream(DFSClient dfsClient, String src,
++      boolean verifyChecksum, ErasureCodingPolicy ecPolicy,
++      LocatedBlocks locatedBlocks) throws IOException {
++    super(dfsClient, src, verifyChecksum, locatedBlocks);
++
++    assert ecPolicy != null;
++    this.ecPolicy = ecPolicy;
++    this.cellSize = ecPolicy.getCellSize();
++    dataBlkNum = (short) ecPolicy.getNumDataUnits();
++    parityBlkNum = (short) ecPolicy.getNumParityUnits();
++    groupSize = dataBlkNum + parityBlkNum;
++    // one slot per internal block (data blocks first, then parity blocks)
++    blockReaders = new BlockReaderInfo[groupSize];
++    // nothing is buffered yet
++    curStripeRange = new StripeRange(0, 0);
++    readingService =
++        new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
++    decoder = CodecUtil.createRSRawDecoder(dfsClient.getConfiguration(),
++        dataBlkNum, parityBlkNum);
++    if (DFSClient.LOG.isDebugEnabled()) {
++      DFSClient.LOG.debug("Creating a striped input stream for file " + src);
++    }
++  }
++
++  /**
++   * Obtain the stripe buffer from the pool on first use, clear it, and mark
++   * the buffered range as empty.
++   */
++  private void resetCurStripeBuffer() {
++    ByteBuffer stripeBuf = curStripeBuf;
++    if (stripeBuf == null) {
++      stripeBuf = bufferPool.getBuffer(cellSize * dataBlkNum);
++      curStripeBuf = stripeBuf;
++    }
++    stripeBuf.clear();
++    curStripeRange = new StripeRange(0, 0);
++  }
++
++  /**
++   * Obtain the parity buffer from the pool on first use and clear it.
++   *
++   * @return the cleared parity buffer
++   */
++  private ByteBuffer getParityBuffer() {
++    ByteBuffer parity = parityBuf;
++    if (parity == null) {
++      parity = bufferPool.getBuffer(cellSize * parityBlkNum);
++      parityBuf = parity;
++    }
++    parity.clear();
++    return parity;
++  }
++
++  /**
++   * When seeking into a new block group, create blockReader for each internal
++   * block in the group.
++   */
++  private synchronized void blockSeekTo(long target) throws IOException {
++    if (target >= getFileLength()) {
++      throw new IOException("Attempted to read past end of file");
++    }
++
++    // Will be getting a new BlockReader.
++    closeCurrentBlockReaders();
++
++    // Compute desired striped block group
++    LocatedStripedBlock targetBlockGroup = getBlockGroupAt(target);
++    // Update current position
++    this.pos = target;
++    this.blockEnd = targetBlockGroup.getStartOffset() +
++        targetBlockGroup.getBlockSize() - 1;
++    currentLocatedBlock = targetBlockGroup;
++  }
++
++  /**
++   * Close the stream and hand the stripe and parity buffers back to the
++   * buffer pool.
++   *
++   * @throws IOException if the super implementation fails to close; the
++   *                     buffers are returned to the pool even in that case
++   */
++  @Override
++  public synchronized void close() throws IOException {
++    try {
++      super.close();
++    } finally {
++      // return the pooled buffers even when super.close() throws, otherwise
++      // they would never make it back to the pool
++      if (curStripeBuf != null) {
++        bufferPool.returnBuffer(curStripeBuf);
++        curStripeBuf = null;
++      }
++      if (parityBuf != null) {
++        bufferPool.returnBuffer(parityBuf);
++        parityBuf = null;
++      }
++    }
++  }
++
++  /**
++   * Extend the super method with the logic of switching between cells.
++   * When reaching the end of a cell, proceed to the next cell and read it
++   * with the next blockReader.
++   */
++  @Override
++  protected void closeCurrentBlockReaders() {
++    resetCurStripeBuffer();
++    if (blockReaders ==  null || blockReaders.length == 0) {
++      return;
++    }
++    for (int i = 0; i < groupSize; i++) {
++      closeReader(blockReaders[i]);
++      blockReaders[i] = null;
++    }
++    blockEnd = -1;
++  }
++
++  /**
++   * Mark the given reader as no longer usable. A null argument is ignored.
++   *
++   * NOTE(review): the underlying BlockReader is not closed here; the call
++   * that did so is disabled below. Confirm whether that is intentional.
++   */
++  private void closeReader(BlockReaderInfo readerInfo) {
++    if (readerInfo == null) {
++      return;
++    }
++//      IOUtils.cleanup(null, readerInfo.reader);
++    readerInfo.skip();
++  }
++
++  private long getOffsetInBlockGroup() {
++    return getOffsetInBlockGroup(pos);
++  }
++
++  private long getOffsetInBlockGroup(long pos) {
++    return pos - currentLocatedBlock.getStartOffset();
++  }
++
++  /**
++   * Fetch the stripe that covers the current position into
++   * {@link #curStripeBuf}, starting at the current position and ending at
++   * the end of that stripe (or of the block group, whichever comes first).
++   *
++   * @param corruptedBlockMap collects the replicas found to be corrupted
++   * @throws IOException if the stripe cannot be read
++   */
++  private void readOneStripe(
++      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
++      throws IOException {
++    resetCurStripeBuffer();
++
++    // locate the stripe that contains the current position
++    final long posInGroup = getOffsetInBlockGroup();
++    final long stripeWidth = cellSize * dataBlkNum;
++    final int stripeNo = (int) (posInGroup / stripeWidth);
++    final int startInBuf = (int) (posInGroup % stripeWidth);
++    // the last stripe of a block group may be shorter than a full stripe
++    final long bytesFromStripeStart =
++        currentLocatedBlock.getBlockSize() - (stripeNo * stripeWidth);
++    final int endInBuf = (int) Math.min(bytesFromStripeStart, stripeWidth);
++    final StripeRange range =
++        new StripeRange(posInGroup, endInBuf - startInBuf);
++
++    final LocatedStripedBlock group = (LocatedStripedBlock) currentLocatedBlock;
++    final long lastByteInGroup = posInGroup + range.length - 1;
++    final AlignedStripe[] alignedStripes = StripedBlockUtil.divideOneStripe(
++        ecPolicy, cellSize, group, posInGroup, lastByteInGroup, curStripeBuf);
++    final LocatedBlock[] internalBlocks =
++        StripedBlockUtil.parseStripedBlockGroup(group, cellSize, dataBlkNum,
++            parityBlkNum);
++    // read every aligned stripe, reusing the stream's block readers
++    for (int s = 0; s < alignedStripes.length; s++) {
++      final StripeReader stripeReader = new StatefulStripeReader(
++          readingService, alignedStripes[s], internalBlocks, blockReaders,
++          corruptedBlockMap);
++      stripeReader.readStripe();
++    }
++    // expose exactly the freshly read range to the consumers of the buffer
++    curStripeBuf.position(startInBuf);
++    curStripeBuf.limit(endInBuf);
++    curStripeRange = range;
++  }
++
++  /**
++   * Build the task that reads one chunk from a single internal block: skip
++   * the reader forward to the target offset if needed, then fill the buffer
++   * of every given strategy in order.
++   *
++   * @param reader the block reader; the task fails with an IOException if
++   *               it is null
++   * @param datanode the datanode the reader is connected to
++   * @param currentReaderOffset offset the reader is currently at
++   * @param targetReaderOffset offset the read must start from; must not be
++   *                           smaller than currentReaderOffset
++   * @param strategies one strategy per destination buffer
++   * @param currentBlock the internal block being read
++   * @param corruptedBlockMap collects the replicas found to be corrupted
++   * @return the read task; it always yields null on success
++   */
++  private Callable<Void> readCells(final BlockReader reader,
++      final DatanodeInfo datanode, final long currentReaderOffset,
++      final long targetReaderOffset, final ByteBufferStrategy[] strategies,
++      final ExtendedBlock currentBlock,
++      final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
++    return new Callable<Void>() {
++      @Override
++      public Void call() throws Exception {
++        // reader can be null if getBlockReaderWithRetry failed or
++        // the reader hit exception before
++        if (reader == null) {
++          throw new IOException("The BlockReader is null. " +
++              "The BlockReader creation failed or the reader hit exception.");
++        }
++        // readers only move forward
++        Preconditions.checkState(currentReaderOffset <= targetReaderOffset);
++        if (currentReaderOffset < targetReaderOffset) {
++          long skipped = reader.skip(targetReaderOffset - currentReaderOffset);
++          Preconditions.checkState(
++              skipped == targetReaderOffset - currentReaderOffset);
++        }
++        for (ByteBufferStrategy strategy : strategies) {
++          readToBuffer(reader, datanode, strategy, currentBlock,
++              corruptedBlockMap);
++        }
++        return null;
++      }
++    };
++  }
++
++  /**
++   * Read from the block reader until the buffer of the strategy is full.
++   *
++   * @param blockReader reader to read from
++   * @param currentNode datanode the reader is connected to, used for logging
++   *                    and for recording corrupted replicas
++   * @param strategy wraps the destination buffer
++   * @param currentBlock the internal block being read
++   * @param corruptedBlockMap receives the replica on a checksum error
++   * @return the number of bytes read
++   * @throws IOException if the reader fails or ends before the buffer is
++   *                     full; a ChecksumException is rethrown after the
++   *                     replica has been recorded
++   */
++  private int readToBuffer(BlockReader blockReader,
++      DatanodeInfo currentNode, ByteBufferStrategy strategy,
++      ExtendedBlock currentBlock,
++      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
++      throws IOException {
++    final int wanted = strategy.buf.remaining();
++    try {
++      int filled = 0;
++      while (filled < wanted) {
++        final int n = strategy.doRead(blockReader, 0, 0);
++        if (n < 0) {
++          throw new IOException("Unexpected EOS from the reader");
++        }
++        filled += n;
++      }
++      return filled;
++    } catch (ChecksumException ce) {
++      DFSClient.LOG.warn("Found Checksum error for "
++          + currentBlock + " from " + currentNode
++          + " at " + ce.getPos());
++      // remember this replica as tried and corrupted
++      addIntoCorruptedBlockMap(currentBlock, currentNode,
++          corruptedBlockMap);
++      throw ce;
++    } catch (IOException ioe) {
++      DFSClient.LOG.warn("Exception while reading from "
++          + currentBlock + " of " + src + " from "
++          + currentNode, ioe);
++      throw ioe;
++    }
++  }
++
++  /**
++   * Move the stream position to an arbitrary offset. If the target is inside
++   * the currently buffered stripe range only the buffer position changes;
++   * otherwise the block end is invalidated so that the next read seeks to
++   * the block group again.
++   *
++   * @param targetPos the new position
++   * @throws EOFException if targetPos is negative or beyond the file length
++   * @throws IOException if the stream is closed
++   */
++  @Override
++  public synchronized void seek(long targetPos) throws IOException {
++    if (targetPos > getFileLength()) {
++      throw new EOFException("Cannot seek after EOF");
++    }
++    if (targetPos < 0) {
++      throw new EOFException("Cannot seek to negative offset");
++    }
++    if (closed.get()) {
++      throw new IOException("Stream is closed!");
++    }
++    final boolean withinCurrentGroup = targetPos <= blockEnd;
++    if (withinCurrentGroup) {
++      final long offsetInGroup = getOffsetInBlockGroup(targetPos);
++      if (curStripeRange.include(offsetInGroup)) {
++        // the data is already buffered, just reposition the buffer
++        curStripeBuf.position(getStripedBufOffset(offsetInGroup));
++        pos = targetPos;
++        return;
++      }
++    }
++    pos = targetPos;
++    blockEnd = -1;
++  }
++
++  /**
++   * @param offsetInBlockGroup an offset relative to the block group start
++   * @return the matching position in {@link #curStripeBuf}, that is the
++   *         offset modulo the length of one stripe of data cells
++   */
++  private int getStripedBufOffset(long offsetInBlockGroup) {
++    return (int) (offsetInBlockGroup % (cellSize * dataBlkNum));
++  }
++
++  /**
++   * This implementation does nothing.
++   *
++   * @param targetPos ignored
++   * @return always false
++   */
++  @Override
++  public synchronized boolean seekToNewSource(long targetPos)
++      throws IOException {
++    return false;
++  }
++
++  @Override
++  protected synchronized int readWithStrategy(ReaderStrategy strategy,
++      int off, int len) throws IOException {
++    dfsClient.checkOpen();
++    if (closed.get()) {
++      throw new IOException("Stream closed");
++    }
++    Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap =
++        new ConcurrentHashMap<>();
++    if (pos < getFileLength()) {
++      try {
++        if (pos > blockEnd) {
++          blockSeekTo(pos);
++        }
++        int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
++        synchronized (infoLock) {
++          if (locatedBlocks.isLastBlockComplete()) {
++            realLen = (int) Math.min(realLen,
++                locatedBlocks.getFileLength() - pos);
++          }
++        }
++
++        /** Number of bytes already read into buffer */
++        int result = 0;
++        while (result < realLen) {
++          if (!curStripeRange.include(getOffsetInBlockGroup())) {
++            readOneStripe(corruptedBlockMap);
++          }
++          int ret = copyToTargetBuf(strategy, off + result, realLen - result);
++          result += ret;
++          pos += ret;
++        }
++        if (dfsClient.stats != null) {
++          dfsClient.stats.incrementBytesRead(result);
++        }
++        return result;
++      } finally {
++        // Check if need to report block replicas corruption either read
++        // was successful or ChecksumException occured.
++        reportCheckSumFailure(corruptedBlockMap,
++            currentLocatedBlock.getLocations().length);
++      }
++    }
++    return -1;
++  }
++
++  /**
++   * Hand buffered data from {@link #curStripeBuf} to the given strategy,
++   * starting at the buffer position that matches the current stream
++   * position.
++   *
++   * @param strategy the ReaderStrategy wrapping the destination
++   * @param offset offset in the destination. Used only when strategy is
++   *               a ByteArrayStrategy
++   * @param length number of bytes wanted; capped by what is left in the
++   *               stripe buffer
++   * @return number of bytes copied
++   */
++  private int copyToTargetBuf(ReaderStrategy strategy, int offset, int length) {
++    final int startInBuf = getStripedBufOffset(getOffsetInBlockGroup());
++    curStripeBuf.position(startInBuf);
++    final int toCopy = Math.min(length, curStripeBuf.remaining());
++    return strategy.copyFrom(curStripeBuf, offset, toCopy);
++  }
++
++  /**
++   * The super method {@link DFSInputStream#refreshLocatedBlock} refreshes
++   * cached LocatedBlock by executing {@link DFSInputStream#getBlockAt} again.
++   * This method extends the logic by first remembering the index of the
++   * internal block, and re-parsing the refreshed block group with the same
++   * index.
++   */
++  @Override
++  protected LocatedBlock refreshLocatedBlock(LocatedBlock block)
++      throws IOException {
++    int idx = StripedBlockUtil.getBlockIndex(block.getBlock().getLocalBlock());
++    LocatedBlock lb = getBlockGroupAt(block.getStartOffset());
++    // If indexing information is returned, iterate through the index array
++    // to find the entry for position idx in the group
++    LocatedStripedBlock lsb = (LocatedStripedBlock) lb;
++    int i = 0;
++    for (; i < lsb.getBlockIndices().length; i++) {
++      if (lsb.getBlockIndices()[i] == idx) {
++        break;
++      }
++    }
++    if (DFSClient.LOG.isDebugEnabled()) {
++      DFSClient.LOG.debug("refreshLocatedBlock for striped blocks, offset="
++          + block.getStartOffset() + ". Obtained block " + lb + ", idx=" + idx);
++    }
++    return StripedBlockUtil.constructInternalBlock(
++        lsb, i, cellSize, dataBlkNum, idx);
++  }
++
++  private LocatedStripedBlock getBlockGroupAt(long offset) throws IOException {
++    LocatedBlock lb = super.getBlockAt(offset);
++    assert lb instanceof LocatedStripedBlock : "NameNode" +
++        " should return a LocatedStripedBlock for a striped file";
++    return (LocatedStripedBlock)lb;
++  }
++
++  /**
++   * Positional read of a byte range of one block group. The range is split
++   * into aligned stripes which are read one after another with readers that
++   * belong to this call only.
++   *
++   * @param block the block group to read from
++   * @param start first offset to read, passed to the stripe division
++   * @param end last offset to read, passed to the stripe division
++   * @param buf destination array
++   * @param offset offset in buf where the data goes
++   * @param corruptedBlockMap collects the replicas found to be corrupted
++   * @throws IOException if a stripe cannot be read
++   */
++  @Override
++  protected void fetchBlockByteRange(LocatedBlock block, long start,
++      long end, byte[] buf, int offset,
++      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
++      throws IOException {
++    // work on a refreshed view of the block group
++    final LocatedStripedBlock group = getBlockGroupAt(block.getStartOffset());
++
++    final AlignedStripe[] alignedStripes =
++        StripedBlockUtil.divideByteRangeIntoStripes(ecPolicy, cellSize, group,
++            start, end, buf, offset);
++    final CompletionService<Void> preadService =
++        new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
++    final LocatedBlock[] internalBlocks =
++        StripedBlockUtil.parseStripedBlockGroup(group, cellSize, dataBlkNum,
++            parityBlkNum);
++    final BlockReaderInfo[] preadReaders = new BlockReaderInfo[groupSize];
++    try {
++      for (int s = 0; s < alignedStripes.length; s++) {
++        final StripeReader stripeReader = new PositionStripeReader(
++            preadService, alignedStripes[s], internalBlocks, preadReaders,
++            corruptedBlockMap);
++        stripeReader.readStripe();
++      }
++    } finally {
++      // the readers were created for this call only
++      for (int r = 0; r < preadReaders.length; r++) {
++        closeReader(preadReaders[r]);
++      }
++    }
++  }
++
++  @Override
++  protected void reportLostBlock(LocatedBlock lostBlock,
++      Collection<DatanodeInfo> ignoredNodes) {
++    DatanodeInfo[] nodes = lostBlock.getLocations();
++    if (nodes != null && nodes.length > 0) {
++      List<String> dnUUIDs = new ArrayList<>();
++      for (DatanodeInfo node : nodes) {
++        dnUUIDs.add(node.getDatanodeUuid());
++      }
++      if (!warnedNodes.containsAll(dnUUIDs)) {
++        DFSClient.LOG.warn(Arrays.toString(nodes) + " are unavailable and " +
++            "all striping blocks on them are lost. " +
++            "IgnoredNodes = " + ignoredNodes);
++        warnedNodes.addAll(dnUUIDs);
++      }
++    } else {
++      super.reportLostBlock(lostBlock, ignoredNodes);
++    }
++  }
++
++  /**
++   * The reader for reading a complete {@link AlignedStripe}. Note that an
++   * {@link AlignedStripe} may cross multiple stripes with cellSize width.
++   */
++  private abstract class StripeReader {
++    final Map<Future<Void>, Integer> futures = new HashMap<>();
++    final AlignedStripe alignedStripe;
++    final CompletionService<Void> service;
++    final LocatedBlock[] targetBlocks;
++    final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap;
++    final BlockReaderInfo[] readerInfos;
++
++    /**
++     * @param service completion service the read tasks are submitted to
++     * @param alignedStripe the stripe to read
++     * @param targetBlocks the internal blocks, indexed by chunk index
++     * @param readerInfos the block reader slots, indexed by chunk index
++     * @param corruptedBlockMap collects the replicas found to be corrupted
++     */
++    StripeReader(CompletionService<Void> service, AlignedStripe alignedStripe,
++        LocatedBlock[] targetBlocks, BlockReaderInfo[] readerInfos,
++        Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
++      this.alignedStripe = alignedStripe;
++      this.service = service;
++      this.readerInfos = readerInfos;
++      this.targetBlocks = targetBlocks;
++      this.corruptedBlockMap = corruptedBlockMap;
++    }
++
++    /**
++     * Prepare the inputs needed for decoding. Called before the data chunks
++     * are read for decoding.
++     */
++    abstract void prepareDecodeInputs();
++
++    /**
++     * Prepare the parity chunk at the given index, and the block reader if
++     * necessary.
++     *
++     * @param index index of the parity chunk in the aligned stripe
++     * @return false if the chunk could not be prepared; the caller then
++     *         counts it as missing
++     */
++    abstract boolean prepareParityChunk(int index) throws IOException;
++
++    /**
++     * Decode the stripe. Called at the end of the stripe read when at least
++     * one chunk is missing.
++     */
++    abstract void decode();
++
++    void updateState4SuccessRead(StripingChunkReadResult result) {
++      Preconditions.checkArgument(
++          result.state == StripingChunkReadResult.SUCCESSFUL);
++      readerInfos[result.index].setOffset(alignedStripe.getOffsetInBlock()
++          + alignedStripe.getSpanInBlock());
++    }
++
++    /**
++     * Fail the read when more chunks are missing than there are parity
++     * blocks. The outstanding read tasks are cleared before failing.
++     *
++     * @throws IOException if too many chunks are missing
++     */
++    private void checkMissingBlocks() throws IOException {
++      if (alignedStripe.missingChunksNum <= parityBlkNum) {
++        return;
++      }
++      clearFutures(futures.keySet());
++      throw new IOException(alignedStripe.missingChunksNum
++          + " missing blocks, the stripe is: " + alignedStripe);
++    }
++
++    /**
++     * We need decoding. Thus go through all the data chunks and make sure we
++     * submit read requests for all of them.
++     */
++    private void readDataForDecoding() throws IOException {
++      prepareDecodeInputs();
++      for (int i = 0; i < dataBlkNum; i++) {
++        Preconditions.checkNotNull(alignedStripe.chunks[i]);
++        if (alignedStripe.chunks[i].state == StripingChunk.REQUESTED) {
++          if (!readChunk(targetBlocks[i], i)) {
++            alignedStripe.missingChunksNum++;
++          }
++        }
++      }
++      checkMissingBlocks();
++    }
++
++    void readParityChunks(int num) throws IOException {
++      for (int i = dataBlkNum, j = 0; i < dataBlkNum + parityBlkNum && j < num;
++           i++) {
++        if (alignedStripe.chunks[i] == null) {
++          if (prepareParityChunk(i) && readChunk(targetBlocks[i], i)) {
++            j++;
++          } else {
++            alignedStripe.missingChunksNum++;
++          }
++        }
++      }
++      checkMissingBlocks();
++    }
++
++    /**
++     * Create a block reader for one internal block and store it in the
++     * reader slot of the chunk. The attempt is repeated after refetching the
++     * encryption key or the block token (each at most once); on any other
++     * failure the datanode is added to the dead nodes and the block is
++     * re-fetched. The loop ends when a reader was created or no datanode is
++     * left to try.
++     *
++     * @param block the internal block to read
++     * @param chunkIndex index of the chunk, used for the target block and
++     *                   reader slots
++     * @return true if a reader was created, false if no datanode was found
++     * @throws IOException if refreshing or re-fetching the block fails
++     */
++    boolean createBlockReader(LocatedBlock block, int chunkIndex)
++        throws IOException {
++      BlockReader reader = null;
++      final ReaderRetryPolicy retry = new ReaderRetryPolicy();
++      // placeholder so that the catch block can dereference dnInfo even when
++      // the failure happens before a datanode was chosen
++      DNAddrPair dnInfo = new DNAddrPair(null, null, null);
++
++      while(true) {
++        try {
++          // the cached block location might have been re-fetched, so always
++          // get it from cache.
++          block = refreshLocatedBlock(block);
++          targetBlocks[chunkIndex] = block;
++
++          // internal block has one location, just rule out the deadNodes
++          dnInfo = getBestNodeDNAddrPair(block, null);
++          if (dnInfo == null) {
++            break;
++          }
++          reader = getBlockReader(block, alignedStripe.getOffsetInBlock(),
++              block.getBlockSize() - alignedStripe.getOffsetInBlock(),
++              dnInfo.addr, dnInfo.storageType, dnInfo.info);
++        } catch (IOException e) {
++          if (e instanceof InvalidEncryptionKeyException &&
++              retry.shouldRefetchEncryptionKey()) {
++            DFSClient.LOG.info("Will fetch a new encryption key and retry, "
++                + "encryption key was invalid when connecting to " + dnInfo.addr
++                + " : " + e);
++            dfsClient.clearDataEncryptionKey();
++            retry.refetchEncryptionKey();
++          } else if (retry.shouldRefetchToken() &&
++              tokenRefetchNeeded(e, dnInfo.addr)) {
++            fetchBlockAt(block.getStartOffset());
++            retry.refetchToken();
++          } else {
++            //TODO: handles connection issues
++            DFSClient.LOG.warn("Failed to connect to " + dnInfo.addr + " for " +
++                "block " + block.getBlock(), e);
++            // re-fetch the block in case the block has been moved
++            fetchBlockAt(block.getStartOffset());
++            addToDeadNodes(dnInfo.info);
++          }
++        }
++        if (reader != null) {
++          readerInfos[chunkIndex] = new BlockReaderInfo(reader, dnInfo.info,
++              alignedStripe.getOffsetInBlock());
++          return true;
++        }
++      }
++      return false;
++    }
++
++    private ByteBufferStrategy[] getReadStrategies(StripingChunk chunk) {
++      if (chunk.byteBuffer != null) {
++        ByteBufferStrategy strategy = new ByteBufferStrategy(chunk.byteBuffer);
++        return new ByteBufferStrategy[]{strategy};
++      } else {
++        ByteBufferStrategy[] strategies =
++            new ByteBufferStrategy[chunk.byteArray.getOffsets().length];
++        for (int i = 0; i < strategies.length; i++) {
++          ByteBuffer buffer = ByteBuffer.wrap(chunk.byteArray.buf(),
++              chunk.byteArray.getOffsets()[i], chunk.byteArray.getLengths()[i]);
++          strategies[i] = new ByteBufferStrategy(buffer);
++        }
++        return strategies;
++      }
++    }
++
++    /**
++     * Submit the read of one chunk. The chunk is marked MISSING when the
++     * block is null, when no reader can be created for it, or when its
++     * existing reader is flagged to be skipped. Otherwise the chunk becomes
++     * PENDING and a read task is submitted.
++     *
++     * @param block the internal block holding the chunk, may be null
++     * @param chunkIndex index of the chunk in the aligned stripe
++     * @return true if a read task was submitted
++     * @throws IOException if creating the block reader fails
++     */
++    boolean readChunk(final LocatedBlock block, int chunkIndex)
++        throws IOException {
++      final StripingChunk chunk = alignedStripe.chunks[chunkIndex];
++      final boolean readable;
++      if (block == null) {
++        readable = false;
++      } else if (readerInfos[chunkIndex] == null) {
++        readable = createBlockReader(block, chunkIndex);
++      } else {
++        readable = !readerInfos[chunkIndex].shouldSkip;
++      }
++      if (!readable) {
++        chunk.state = StripingChunk.MISSING;
++        return false;
++      }
++
++      chunk.state = StripingChunk.PENDING;
++      final BlockReaderInfo info = readerInfos[chunkIndex];
++      final Callable<Void> task = readCells(info.reader, info.datanode,
++          info.blockReaderOffset, alignedStripe.getOffsetInBlock(),
++          getReadStrategies(chunk), block.getBlock(), corruptedBlockMap);
++
++      final Future<Void> pending = service.submit(task);
++      futures.put(pending, chunkIndex);
++      return true;
++    }
++
++    /**
++     * Read the whole stripe and decode if necessary. Reads are first
++     * submitted for the requested data chunks. Whenever a chunk turns out to
++     * be missing, the remaining data chunks and as many parity chunks as
++     * there are newly missing chunks are read as well. The method returns
++     * once dataBlkNum chunks were fetched or no read task is outstanding,
++     * and decodes if any chunk is missing.
++     *
++     * @throws IOException if more chunks are missing than there are parity
++     *                     blocks, or (as InterruptedIOException) if waiting
++     *                     for a read task is interrupted
++     */
++    void readStripe() throws IOException {
++      // submit reads for the data chunks that are part of this stripe and are
++      // not known to be all zero
++      for (int i = 0; i < dataBlkNum; i++) {
++        if (alignedStripe.chunks[i] != null &&
++            alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
++          if (!readChunk(targetBlocks[i], i)) {
++            alignedStripe.missingChunksNum++;
++          }
++        }
++      }
++      // There are missing block locations at this stage. Thus we need to read
++      // the full stripe and one more parity block.
++      if (alignedStripe.missingChunksNum > 0) {
++        checkMissingBlocks();
++        readDataForDecoding();
++        // read parity chunks
++        readParityChunks(alignedStripe.missingChunksNum);
++      }
++      // TODO: for a full stripe we can start reading (dataBlkNum + 1) chunks
++
++      // Collect the results of the submitted reads. A failed read triggers
++      // additional data and parity reads for decoding.
++      while (!futures.isEmpty()) {
++        try {
++          StripingChunkReadResult r = StripedBlockUtil
++              .getNextCompletedStripedRead(service, futures, 0);
++          if (DFSClient.LOG.isDebugEnabled()) {
++            DFSClient.LOG.debug("Read task returned: " + r + ", for stripe "
++                + alignedStripe);
++          }
++          StripingChunk returnedChunk = alignedStripe.chunks[r.index];
++          Preconditions.checkNotNull(returnedChunk);
++          Preconditions.checkState(returnedChunk.state == StripingChunk.PENDING);
++
++          if (r.state == StripingChunkReadResult.SUCCESSFUL) {
++            returnedChunk.state = StripingChunk.FETCHED;
++            alignedStripe.fetchedChunksNum++;
++            updateState4SuccessRead(r);
++            // enough chunks are available, the remaining reads are not needed
++            if (alignedStripe.fetchedChunksNum == dataBlkNum) {
++              clearFutures(futures.keySet());
++              break;
++            }
++          } else {
++            returnedChunk.state = StripingChunk.MISSING;
++            // close the corresponding reader
++            closeReader(readerInfos[r.index]);
++
++            // remember the count before this failure so that only the newly
++            // missing chunks are compensated with parity reads below
++            final int missing = alignedStripe.missingChunksNum;
++            alignedStripe.missingChunksNum++;
++            checkMissingBlocks();
++
++            readDataForDecoding();
++            readParityChunks(alignedStripe.missingChunksNum - missing);
++          }
++        } catch (InterruptedException ie) {
++          // NOTE(review): the interrupt status of the thread is not restored
++          // and ie is not attached as the cause -- confirm this is intended
++          String err = "Read request interrupted";
++          DFSClient.LOG.error(err);
++          clearFutures(futures.keySet());
++          // Don't decode if read interrupted
++          throw new InterruptedIOException(err);
++        }
++      }
++
++      if (alignedStripe.missingChunksNum > 0) {
++        decode();
++      }
++    }
++  }
++
++  /**
++   * Stripe reader used by {@link #fetchBlockByteRange}. Its decode inputs
++   * are byte arrays that are created on first use.
++   */
++  class PositionStripeReader extends StripeReader {
++    private byte[][] decodeInputs;
++
++    PositionStripeReader(CompletionService<Void> service,
++        AlignedStripe alignedStripe, LocatedBlock[] targetBlocks,
++        BlockReaderInfo[] readerInfos,
++        Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
++      super(service, alignedStripe, targetBlocks, readerInfos,
++          corruptedBlockMap);
++    }
++
++    /** Create the decode inputs unless they exist already. */
++    @Override
++    void prepareDecodeInputs() {
++      if (decodeInputs != null) {
++        return;
++      }
++      decodeInputs = StripedBlockUtil.initDecodeInputs(alignedStripe,
++          dataBlkNum, parityBlkNum);
++    }
++
++    /**
++     * Back the parity chunk at the given index with its decode input array
++     * and register one slice spanning the aligned stripe.
++     *
++     * @param index index of a parity chunk that has not been set up yet
++     * @return always true
++     */
++    @Override
++    boolean prepareParityChunk(int index) {
++      Preconditions.checkState(index >= dataBlkNum &&
++          alignedStripe.chunks[index] == null);
++      final int decodeIndex = StripedBlockUtil.convertIndex4Decode(index,
++          dataBlkNum, parityBlkNum);
++      final StripingChunk parityChunk =
++          new StripingChunk(decodeInputs[decodeIndex]);
++      alignedStripe.chunks[index] = parityChunk;
++      parityChunk.addByteArraySlice(0, (int) alignedStripe.getSpanInBlock());
++      return true;
++    }
++
++    /** Finalize the decode inputs, then decode and fill the buffer. */
++    @Override
++    void decode() {
++      StripedBlockUtil.finalizeDecodeInputs(decodeInputs, dataBlkNum,
++          parityBlkNum, alignedStripe);
++      StripedBlockUtil.decodeAndFillBuffer(decodeInputs, alignedStripe,
++          dataBlkNum, parityBlkNum, decoder);
++    }
++  }
++
++  /**
++   * Stripe reader whose decode inputs are {@link ByteBuffer} slices: data
++   * units are sliced out of a duplicate of {@code curStripeBuf}, parity units
++   * out of a duplicate of the parity buffer.
++   */
++  class StatefulStripeReader extends StripeReader {
++    /** One slice per decode slot; null until prepareDecodeInputs() runs. */
++    ByteBuffer[] decodeInputs;
++
++    StatefulStripeReader(CompletionService<Void> service,
++        AlignedStripe alignedStripe, LocatedBlock[] targetBlocks,
++        BlockReaderInfo[] readerInfos,
++        Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
++      super(service, alignedStripe, targetBlocks, readerInfos,
++          corruptedBlockMap);
++    }
++
++    /**
++     * Builds the decode input array once. For every data unit a slice of the
++     * stripe buffer covering the aligned stripe's range is installed, and a
++     * chunk backed by that slice is created if the unit has none yet.
++     */
++    @Override
++    void prepareDecodeInputs() {
++      if (decodeInputs == null) {
++        decodeInputs = new ByteBuffer[dataBlkNum + parityBlkNum];
++        final ByteBuffer cur;
++        // duplicate under the stream lock so position/limit changes below do
++        // not touch curStripeBuf itself
++        synchronized (DFSStripedInputStream.this) {
++          cur = curStripeBuf.duplicate();
++        }
++        StripedBlockUtil.VerticalRange range = alignedStripe.range;
++        for (int i = 0; i < dataBlkNum; i++) {
++          // reset the limit first so the new position is always legal
++          cur.limit(cur.capacity());
++          // offset inside cell i of the stripe buffer
++          int pos = (int) (range.offsetInBlock % cellSize + cellSize * i);
++          cur.position(pos);
++          cur.limit((int) (pos + range.spanInBlock));
++          final int decodeIndex = StripedBlockUtil.convertIndex4Decode(i,
++              dataBlkNum, parityBlkNum);
++          decodeInputs[decodeIndex] = cur.slice();
++          if (alignedStripe.chunks[i] == null) {
++            alignedStripe.chunks[i] = new StripingChunk(
++                decodeInputs[decodeIndex]);
++          }
++        }
++      }
++    }
++
++    /**
++     * Creates the chunk for the parity unit at {@code index}.
++     *
++     * @param index index of a parity unit whose chunk has not been set yet
++     * @return false (with the chunk marked MISSING) when the block reader for
++     *         this index is flagged {@code shouldSkip}; true otherwise
++     */
++    @Override
++    boolean prepareParityChunk(int index) throws IOException {
++      Preconditions.checkState(index >= dataBlkNum
++          && alignedStripe.chunks[index] == null);
++      if (blockReaders[index] != null && blockReaders[index].shouldSkip) {
++        alignedStripe.chunks[index] = new StripingChunk(StripingChunk.MISSING);
++        // we have failed the block reader before
++        return false;
++      }
++      final int decodeIndex = StripedBlockUtil.convertIndex4Decode(index,
++          dataBlkNum, parityBlkNum);
++      // slice a span-sized window at cellSize * decodeIndex of the parity
++      // buffer
++      ByteBuffer buf = getParityBuffer().duplicate();
++      buf.position(cellSize * decodeIndex);
++      buf.limit(cellSize * decodeIndex + (int) alignedStripe.range.spanInBlock);
++      decodeInputs[decodeIndex] = buf.slice();
++      alignedStripe.chunks[index] = new StripingChunk(decodeInputs[decodeIndex]);
++      return true;
++    }
++
++    /**
++     * Decodes the missing data units of the aligned stripe. The buffers of
++     * missing data units are used as decoder outputs; their input slots, and
++     * those of missing parity units, are passed to the decoder as null.
++     */
++    @Override
++    void decode() {
++      // TODO no copy for data chunks. this depends on HADOOP-12047
++      final int span = (int) alignedStripe.getSpanInBlock();
++      // step 1: make every available input readable over [0, span)
++      for (int i = 0; i < alignedStripe.chunks.length; i++) {
++        final int decodeIndex = StripedBlockUtil.convertIndex4Decode(i,
++            dataBlkNum, parityBlkNum);
++        if (alignedStripe.chunks[i] != null &&
++            alignedStripe.chunks[i].state == StripingChunk.ALLZERO) {
++          // all-zero chunk: write span zero bytes, then flip for reading
++          for (int j = 0; j < span; j++) {
++            decodeInputs[decodeIndex].put((byte) 0);
++          }
++          decodeInputs[decodeIndex].flip();
++        } else if (alignedStripe.chunks[i] != null &&
++            alignedStripe.chunks[i].state == StripingChunk.FETCHED) {
++          decodeInputs[decodeIndex].position(0);
++          decodeInputs[decodeIndex].limit(span);
++        }
++      }
++      // step 2: collect decode indices of missing data units; missing parity
++      // units are simply dropped from the inputs
++      int[] decodeIndices = new int[parityBlkNum];
++      int pos = 0;
++      for (int i = 0; i < alignedStripe.chunks.length; i++) {
++        if (alignedStripe.chunks[i] != null &&
++            alignedStripe.chunks[i].state == StripingChunk.MISSING) {
++          int  decodeIndex = StripedBlockUtil.convertIndex4Decode(i,
++              dataBlkNum, parityBlkNum);
++          if (i < dataBlkNum) {
++            decodeIndices[pos++] = decodeIndex;
++          } else {
++            decodeInputs[decodeIndex] = null;
++          }
++        }
++      }
++      // trim to the number of indices actually collected
++      decodeIndices = Arrays.copyOf(decodeIndices, pos);
++
++      // step 3: reuse the buffers of the missing data units as outputs and
++      // clear their input slots
++      final int decodeChunkNum = decodeIndices.length;
++      ByteBuffer[] outputs = new ByteBuffer[decodeChunkNum];
++      for (int i = 0; i < decodeChunkNum; i++) {
++        outputs[i] = decodeInputs[decodeIndices[i]];
++        outputs[i].position(0);
++        outputs[i].limit((int) alignedStripe.range.spanInBlock);
++        decodeInputs[decodeIndices[i]] = null;
++      }
++
++      decoder.decode(decodeInputs, decodeIndices, outputs);
++    }
++  }
++
++  /**
++   * Enhanced byte buffer (zero-copy) reads are not supported: a striped read
++   * may need online read recovery, so zero-copy does not make sense here.
++   *
++   * @throws UnsupportedOperationException always
++   */
++  @Override
++  public synchronized ByteBuffer read(ByteBufferPool bufferPool,
++      int maxLength, EnumSet<ReadOption> opts)
++          throws IOException, UnsupportedOperationException {
++    final String reason = "Not support enhanced byte buffer access.";
++    throw new UnsupportedOperationException(reason);
++  }
++
++  /**
++   * Counterpart of the unsupported enhanced byte buffer read.
++   *
++   * @throws UnsupportedOperationException always
++   */
++  @Override
++  public synchronized void releaseBuffer(ByteBuffer buffer) {
++    final String reason = "Not support enhanced byte buffer access.";
++    throw new UnsupportedOperationException(reason);
++  }
++
++  /** A variation to {@link DFSInputStream#cancelAll} */
++  private void clearFutures(Collection<Future<Void>> futures) {
++    for (Future<Void> future : futures) {
++      future.cancel(false);
++    }
++    futures.clear();
++  }
++}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index 0000000,0000000..bf4e10e
new file mode 100644
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@@ -1,0 -1,0 +1,953 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hadoop.hdfs;
++
++import java.io.IOException;
++import java.io.InterruptedIOException;
++import java.nio.ByteBuffer;
++import java.nio.channels.ClosedChannelException;
++import java.util.ArrayList;
++import java.util.Arrays;
++import java.util.Collections;
++import java.util.EnumSet;
++import java.util.HashMap;
++import java.util.HashSet;
++import java.util.List;
++import java.util.Map;
++import java.util.Set;
++import java.util.concurrent.BlockingQueue;
++import java.util.concurrent.LinkedBlockingQueue;
++import java.util.concurrent.TimeUnit;
++
++import org.apache.hadoop.HadoopIllegalArgumentException;
++import org.apache.hadoop.classification.InterfaceAudience;
++import org.apache.hadoop.fs.CreateFlag;
++import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
++import org.apache.hadoop.hdfs.protocol.ClientProtocol;
++import org.apache.hadoop.hdfs.protocol.DatanodeID;
++import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
++import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
++import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
++import org.apache.hadoop.hdfs.protocol.LocatedBlock;
++import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
++import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
++import org.apache.hadoop.hdfs.util.StripedBlockUtil;
++import org.apache.hadoop.io.MultipleIOException;
++import org.apache.hadoop.io.erasurecode.CodecUtil;
++import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
++import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
++import org.apache.hadoop.util.DataChecksum;
++import org.apache.hadoop.util.Progressable;
++import org.apache.hadoop.util.Time;
++
++import com.google.common.base.Preconditions;
++import org.apache.htrace.core.TraceScope;
++
++
++/**
++ * This class supports writing files in striped layout and erasure coded format.
++ * Each stripe contains a sequence of cells.
++ */
++@InterfaceAudience.Private
++public class DFSStripedOutputStream extends DFSOutputStream {
++  static class MultipleBlockingQueue<T> {
++    private final List<BlockingQueue<T>> queues;
++
++    MultipleBlockingQueue(int numQueue, int queueSize) {
++      List<BlockingQueue<T>> list = new ArrayList<>(numQueue);
++      for (int i = 0; i < numQueue; i++) {
++        list.add(new LinkedBlockingQueue<T>(queueSize));
++      }
++      queues = Collections.synchronizedList(list);
++    }
++
++    void offer(int i, T object) {
++      final boolean b = queues.get(i).offer(object);
++      Preconditions.checkState(b, "Failed to offer " + object
++          + " to queue, i=" + i);
++    }
++
++    T take(int i) throws InterruptedIOException {
++      try {
++        return queues.get(i).take();
++      } catch(InterruptedException ie) {
++        throw DFSUtilClient.toInterruptedIOException("take interrupted, i=" + i, ie);
++      }
++    }
++
++    T takeWithTimeout(int i) throws InterruptedIOException {
++      try {
++        return queues.get(i).poll(100, TimeUnit.MILLISECONDS);
++      } catch (InterruptedException e) {
++        throw DFSUtilClient.toInterruptedIOException("take interrupted, i=" + i, e);
++      }
++    }
++
++    T poll(int i) {
++      return queues.get(i).poll();
++    }
++
++    T peek(int i) {
++      return queues.get(i).peek();
++    }
++
++    void clear() {
++      for (BlockingQueue<T> q : queues) {
++        q.clear();
++      }
++    }
++  }
++
++  /** Coordinates the communication between the streamers. */
++  static class Coordinator {
++    /**
++     * The next internal block for each streamer to write to. The
++     * DFSStripedOutputStream makes the {@link ClientProtocol#addBlock} RPC to
++     * get a new block group; the group is split into internal blocks, which
++     * are placed into these queues for the streamers to retrieve.
++     */
++    private final MultipleBlockingQueue<LocatedBlock> followingBlocks;
++    /**
++     * Used to sync all the streamers before a new block is allocated: the
++     * DFSStripedOutputStream reads from it to make sure every streamer has
++     * finished writing the previous block.
++     */
++    private final MultipleBlockingQueue<ExtendedBlock> endBlocks;
++
++    /** The structures below are used for syncing while handling errors. */
++    private final MultipleBlockingQueue<LocatedBlock> newBlocks;
++    private final Map<StripedDataStreamer, Boolean> updateStreamerMap;
++    private final MultipleBlockingQueue<Boolean> streamerUpdateResult;
++
++    /**
++     * @param numAllBlocks number of queues created in each queue group; each
++     *                     queue holds at most one element
++     */
++    Coordinator(final int numAllBlocks) {
++      final int capacityPerQueue = 1;
++      this.followingBlocks =
++          new MultipleBlockingQueue<>(numAllBlocks, capacityPerQueue);
++      this.endBlocks =
++          new MultipleBlockingQueue<>(numAllBlocks, capacityPerQueue);
++      this.newBlocks =
++          new MultipleBlockingQueue<>(numAllBlocks, capacityPerQueue);
++      this.updateStreamerMap = Collections.synchronizedMap(
++          new HashMap<StripedDataStreamer, Boolean>(numAllBlocks));
++      this.streamerUpdateResult =
++          new MultipleBlockingQueue<>(numAllBlocks, capacityPerQueue);
++    }
++
++    MultipleBlockingQueue<LocatedBlock> getFollowingBlocks() {
++      return this.followingBlocks;
++    }
++
++    MultipleBlockingQueue<LocatedBlock> getNewBlocks() {
++      return this.newBlocks;
++    }
++
++    /** Publishes the finished block of streamer i. */
++    void offerEndBlock(int i, ExtendedBlock block) {
++      this.endBlocks.offer(i, block);
++    }
++
++    /** Publishes the outcome of an update round to streamer i. */
++    void offerStreamerUpdateResult(int i, boolean success) {
++      this.streamerUpdateResult.offer(i, success);
++    }
++
++    /** Blocks until the update outcome for streamer i is available. */
++    boolean takeStreamerUpdateResult(int i) throws InterruptedIOException {
++      final Boolean outcome = this.streamerUpdateResult.take(i);
++      return outcome;
++    }
++
++    /** Records whether the given streamer managed to update itself. */
++    void updateStreamer(StripedDataStreamer streamer,
++        boolean success) {
++      assert !this.updateStreamerMap.containsKey(streamer);
++      this.updateStreamerMap.put(streamer, success);
++    }
++
++    /** Drops everything left over from a previous failure-handling round. */
++    void clearFailureStates() {
++      this.newBlocks.clear();
++      this.updateStreamerMap.clear();
++      this.streamerUpdateResult.clear();
++    }
++  }
++
++  /** Buffers for writing the data and parity cells of a stripe. */
++  class CellBuffers {
++    /** One cell-sized buffer per internal block (data first, then parity). */
++    private final ByteBuffer[] buffers;
++    /** One checksum array per parity block. */
++    private final byte[][] checksumArrays;
++
++    /**
++     * @param numParityBlocks number of checksum arrays to allocate
++     * @throws HadoopIllegalArgumentException if bytesPerChecksum does not
++     *         divide the cell size
++     * @throws InterruptedException propagated from the byte array manager
++     */
++    CellBuffers(int numParityBlocks) throws InterruptedException{
++      final boolean aligned = cellSize % bytesPerChecksum == 0;
++      if (!aligned) {
++        throw new HadoopIllegalArgumentException("Invalid values: "
++            + HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (="
++            + bytesPerChecksum + ") must divide cell size (=" + cellSize + ").");
++      }
++
++      checksumArrays = new byte[numParityBlocks][];
++      // checksum bytes needed to cover one full cell
++      final int checksumBytesPerCell =
++          getChecksumSize() * (cellSize / bytesPerChecksum);
++      for (int p = 0; p < checksumArrays.length; p++) {
++        checksumArrays[p] = new byte[checksumBytesPerCell];
++      }
++
++      buffers = new ByteBuffer[numAllBlocks];
++      for (int b = 0; b < buffers.length; b++) {
++        final byte[] backing = byteArrayManager.newByteArray(cellSize);
++        buffers[b] = ByteBuffer.wrap(backing);
++      }
++    }
++
++    private ByteBuffer[] getBuffers() {
++      return buffers;
++    }
++
++    /** @param i index of a parity block among all internal blocks */
++    byte[] getChecksumArray(int i) {
++      final int parityIndex = i - numDataBlocks;
++      return checksumArrays[parityIndex];
++    }
++
++    /**
++     * Appends len bytes of b, starting at off, to buffer i.
++     *
++     * @return the position of buffer i after the append
++     * @throws IllegalStateException if the append would exceed the cell size
++     */
++    private int addTo(int i, byte[] b, int off, int len) {
++      final ByteBuffer target = buffers[i];
++      final int positionAfter = target.position() + len;
++      Preconditions.checkState(positionAfter <= cellSize);
++      target.put(b, off, len);
++      return positionAfter;
++    }
++
++    /** Resets every buffer and zero-fills the parity buffers. */
++    private void clear() {
++      for (int idx = 0; idx < numAllBlocks; idx++) {
++        final ByteBuffer buf = buffers[idx];
++        buf.clear();
++        final boolean isParity = idx >= numDataBlocks;
++        if (isParity) {
++          Arrays.fill(buf.array(), (byte) 0);
++        }
++      }
++    }
++
++    /** Hands every backing array back to the byte array manager. */
++    private void release() {
++      for (int idx = 0; idx < numAllBlocks; idx++) {
++        final byte[] backing = buffers[idx].array();
++        byteArrayManager.release(backing);
++      }
++    }
++
++    /** Flips the data buffers (not the parity ones) for reading. */
++    private void flipDataBuffers() {
++      for (int idx = 0; idx < numDataBlocks; idx++) {
++        final ByteBuffer dataBuf = buffers[idx];
++        dataBuf.flip();
++      }
++    }
++  }
++
++  private final Coordinator coordinator;
++  private final CellBuffers cellBuffers;
++  private final RawErasureEncoder encoder;
++  private final List<StripedDataStreamer> streamers;
++  private final DFSPacket[] currentPackets; // current Packet of each streamer
++
++  /** Size of each striping cell, must be a multiple of bytesPerChecksum */
++  private final int cellSize;
++  private final int numAllBlocks;
++  private final int numDataBlocks;
++  private ExtendedBlock currentBlockGroup;
++  private final String[] favoredNodes;
++  private final List<StripedDataStreamer> failedStreamers;
++
++  /**
++   * Construct a new output stream for creating a file.
++   *
++   * Reads the cell size and the data/parity unit counts from the erasure
++   * coding policy of {@code stat}, creates the raw encoder, the coordinator
++   * and the cell buffers, builds one StripedDataStreamer per internal block
++   * and selects streamer 0 as the current one. The streamers are only
++   * constructed here, not started.
++   *
++   * @throws IOException if allocating the cell buffers is interrupted, or
++   *         propagated from the superclass constructor
++   */
++  DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
++                         EnumSet<CreateFlag> flag, Progressable progress,
++                         DataChecksum checksum, String[] favoredNodes)
++                         throws IOException {
++    super(dfsClient, src, stat, flag, progress, checksum, favoredNodes, false);
++    if (LOG.isDebugEnabled()) {
++      LOG.debug("Creating DFSStripedOutputStream for " + src);
++    }
++
++    // layout parameters come from the file's erasure coding policy
++    final ErasureCodingPolicy ecPolicy = stat.getErasureCodingPolicy();
++    final int numParityBlocks = ecPolicy.getNumParityUnits();
++    cellSize = ecPolicy.getCellSize();
++    numDataBlocks = ecPolicy.getNumDataUnits();
++    numAllBlocks = numDataBlocks + numParityBlocks;
++    this.favoredNodes = favoredNodes;
++    failedStreamers = new ArrayList<>();
++
++    encoder = CodecUtil.createRSRawEncoder(dfsClient.getConfiguration(),
++        numDataBlocks, numParityBlocks);
++
++    coordinator = new Coordinator(numAllBlocks);
++    // CellBuffers reads cellSize/numAllBlocks, so it must be created after
++    // those fields are assigned
++    try {
++      cellBuffers = new CellBuffers(numParityBlocks);
++    } catch (InterruptedException ie) {
++      throw DFSUtilClient.toInterruptedIOException(
++          "Failed to create cell buffers", ie);
++    }
++
++    // one streamer per internal block, indexed 0..numAllBlocks-1
++    streamers = new ArrayList<>(numAllBlocks);
++    for (short i = 0; i < numAllBlocks; i++) {
++      StripedDataStreamer streamer = new StripedDataStreamer(stat,
++          dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
++          favoredNodes, i, coordinator);
++      streamers.add(streamer);
++    }
++    currentPackets = new DFSPacket[streamers.size()];
++    setCurrentStreamer(0);
++  }
++
++  StripedDataStreamer getStripedDataStreamer(int i) {
++    return streamers.get(i);
++  }
++
++  int getCurrentIndex() {
++    return getCurrentStreamer().getIndex();
++  }
++
++  private synchronized StripedDataStreamer getCurrentStreamer() {
++    return (StripedDataStreamer) streamer;
++  }
++
++  private synchronized StripedDataStreamer setCurrentStreamer(int newIdx) {
++    // backup currentPacket for current streamer
++    if (streamer != null) {
++      int oldIdx = streamers.indexOf(getCurrentStreamer());
++      if (oldIdx >= 0) {
++        currentPackets[oldIdx] = currentPacket;
++      }
++    }
++
++    streamer = getStripedDataStreamer(newIdx);
++    currentPacket = currentPackets[newIdx];
++    adjustChunkBoundary();
++
++    return getCurrentStreamer();
++  }
++
++  /**
++   * Encode the buffers, i.e. compute parities.
++   *
++   * @param encoder the raw encoder that computes the parities
++   * @param numData how many leading entries of buffers are data buffers
++   * @param buffers data buffers + parity buffers
++   */
++  private static void encode(RawErasureEncoder encoder, int numData,
++      ByteBuffer[] buffers) {
++    final ByteBuffer[] data = new ByteBuffer[numData];
++    final int numParity = buffers.length - numData;
++    final ByteBuffer[] parity = new ByteBuffer[numParity];
++    // split the combined array at numData
++    System.arraycopy(buffers, 0, data, 0, numData);
++    System.arraycopy(buffers, numData, parity, 0, numParity);
++
++    encoder.encode(data, parity);
++  }
++
++  /**
++   * Check all the existing StripedDataStreamers and find newly failed ones,
++   * i.e. unhealthy streamers that are not yet recorded in failedStreamers.
++   * This method does not add them to failedStreamers itself.
++   *
++   * @return The newly failed streamers.
++   * @throws IOException if less than {@link #numDataBlocks} streamers are still
++   *                     healthy, i.e. more streamers have failed than there
++   *                     are parity blocks.
++   */
++  private Set<StripedDataStreamer> checkStreamers() throws IOException {
++    Set<StripedDataStreamer> newFailed = new HashSet<>();
++    for(StripedDataStreamer s : streamers) {
++      if (!s.isHealthy() && !failedStreamers.contains(s)) {
++        newFailed.add(s);
++      }
++    }
++
++    final int failCount = failedStreamers.size() + newFailed.size();
++    if (LOG.isDebugEnabled()) {
++      LOG.debug("checkStreamers: " + streamers);
++      LOG.debug("healthy streamer count=" + (numAllBlocks - failCount));
++      LOG.debug("original failed streamers: " + failedStreamers);
++      LOG.debug("newly failed streamers: " + newFailed);
++    }
++    // the write can tolerate at most one failure per parity block
++    final int numParityBlocks = numAllBlocks - numDataBlocks;
++    if (failCount > numParityBlocks) {
++      throw new IOException("Failed: the number of failed blocks = "
++          + failCount + " > the number of parity blocks = "
++          + numParityBlocks);
++    }
++    return newFailed;
++  }
++
++  private void handleStreamerFailure(String err, Exception e)
++      throws IOException {
++    LOG.warn("Failed: " + err + ", " + this, e);
++    getCurrentStreamer().getErrorState().setInternalError();
++    getCurrentStreamer().close(true);
++    checkStreamers();
++    currentPacket = null;
++  }
++
++  /**
++   * Swaps every unhealthy streamer for a freshly constructed and started one
++   * built from the old streamer's stat, progress and checksum, and discards
++   * the packet saved for that index.
++   */
++  private void replaceFailedStreamers() {
++    assert streamers.size() == numAllBlocks;
++    for (short i = 0; i < numAllBlocks; i++) {
++      final StripedDataStreamer stale = getStripedDataStreamer(i);
++      if (stale.isHealthy()) {
++        continue;
++      }
++      final StripedDataStreamer replacement = new StripedDataStreamer(
++          stale.stat, dfsClient, src, stale.progress,
++          stale.checksum4WriteBlock, cachingStrategy, byteArrayManager,
++          favoredNodes, i, coordinator);
++      streamers.set(i, replacement);
++      currentPackets[i] = null;
++      if (i == 0) {
++        this.streamer = replacement;
++      }
++      replacement.start();
++    }
++  }
++
++  /**
++   * Polls the end-block queue of streamer i until it yields a block, which is
++   * then checked against the current block group, or until the streamer is
++   * no longer healthy.
++   */
++  private void waitEndBlocks(int i) throws IOException {
++    while (true) {
++      if (!getStripedDataStreamer(i).isHealthy()) {
++        return;
++      }
++      final ExtendedBlock ended = coordinator.endBlocks.takeWithTimeout(i);
++      if (ended == null) {
++        // timed out; re-check the streamer's health and poll again
++        continue;
++      }
++      StripedBlockUtil.checkBlocks(currentBlockGroup, i, ended);
++      return;
++    }
++  }
++
++  /**
++   * Requests a new block group and distributes its internal blocks to the
++   * healthy streamers. Before the request, all streamers are synced on the
++   * end of the previous group (if any) and failed streamers are replaced.
++   *
++   * @throws IOException if fewer locations than data blocks are returned
++   */
++  private void allocateNewBlock() throws IOException {
++    final boolean hasPreviousGroup = currentBlockGroup != null;
++    if (hasPreviousGroup) {
++      // sync all the healthy streamers before writing to the new block
++      for (int idx = 0; idx < numAllBlocks; idx++) {
++        waitEndBlocks(idx);
++      }
++    }
++    failedStreamers.clear();
++    // replace failed streamers
++    replaceFailedStreamers();
++
++    if (LOG.isDebugEnabled()) {
++      LOG.debug("Allocating new block group. The previous block group: "
++          + currentBlockGroup);
++    }
++
++    // TODO collect excludedNodes from all the data streamers
++    final LocatedBlock lb = addBlock(null, dfsClient, src, currentBlockGroup,
++        fileId, favoredNodes);
++    assert lb.isStriped();
++    final int numLocations = lb.getLocations().length;
++    if (numLocations < numDataBlocks) {
++      throw new IOException("Failed to get " + numDataBlocks
++          + " nodes from namenode: blockGroupSize= " + numAllBlocks
++          + ", blocks.length= " + numLocations);
++    }
++    // assign the new block to the current block group
++    currentBlockGroup = lb.getBlock();
++
++    final int numParityBlocks = numAllBlocks - numDataBlocks;
++    final LocatedBlock[] internalBlocks =
++        StripedBlockUtil.parseStripedBlockGroup((LocatedStripedBlock) lb,
++            cellSize, numDataBlocks, numParityBlocks);
++    for (int idx = 0; idx < internalBlocks.length; idx++) {
++      final StripedDataStreamer target = getStripedDataStreamer(idx);
++      if (!target.isHealthy()) {
++        // skipping failed data streamer
++        continue;
++      }
++      final LocatedBlock internal = internalBlocks[idx];
++      if (internal != null) {
++        coordinator.getFollowingBlocks().offer(idx, internal);
++        continue;
++      }
++      // Set exception and close streamer as there is no block locations
++      // found for the parity block.
++      LOG.warn("Failed to get block location for parity block, index=" + idx);
++      target.getLastException().set(
++          new IOException("Failed to get following block, i=" + idx));
++      target.getErrorState().setInternalError();
++      target.close(true);
++    }
++  }
++
++  private boolean shouldEndBlockGroup() {
++    return currentBlockGroup != null &&
++        currentBlockGroup.getNumBytes() == blockSize * numDataBlocks;
++  }
++
++  /**
++   * Writes one chunk through the current streamer and mirrors it into the
++   * current cell buffer. Allocates a new block group when none exists or the
++   * current one is full. When the cell becomes full the next streamer is
++   * selected; when the last data cell of a stripe becomes full, parity cells
++   * are written, streamer failures are handled and, at the end of a block
++   * group, every healthy streamer's block is ended.
++   */
++  @Override
++  protected synchronized void writeChunk(byte[] bytes, int offset, int len,
++      byte[] checksum, int ckoff, int cklen) throws IOException {
++    final int index = getCurrentIndex();
++    final StripedDataStreamer current = getCurrentStreamer();
++    // copy into the cell buffer even if the streamer is unhealthy: the data
++    // is still needed for computing parity
++    final int pos = cellBuffers.addTo(index, bytes, offset, len);
++    final boolean cellFull = pos == cellSize;
++
++    if (currentBlockGroup == null || shouldEndBlockGroup()) {
++      // the incoming data should belong to a new block. Allocate a new block.
++      allocateNewBlock();
++    }
++
++    currentBlockGroup.setNumBytes(currentBlockGroup.getNumBytes() + len);
++    if (current.isHealthy()) {
++      try {
++        super.writeChunk(bytes, offset, len, checksum, ckoff, cklen);
++      } catch(Exception e) {
++        // a failing streamer is marked and closed; the write continues unless
++        // handleStreamerFailure finds too many failures
++        handleStreamerFailure("offset=" + offset + ", length=" + len, e);
++      }
++    }
++
++    // Two extra steps are needed when a striping cell is full:
++    // 1. Forward the current index pointer
++    // 2. Generate parity packets if a full stripe of data cells are present
++    if (cellFull) {
++      int next = index + 1;
++      //When all data cells in a stripe are ready, we need to encode
++      //them and generate some parity cells. These cells will be
++      //converted to packets and put to their DataStreamer's queue.
++      if (next == numDataBlocks) {
++        cellBuffers.flipDataBuffers();
++        writeParityCells();
++        next = 0;
++        // check failure state for all the streamers. Bump GS if necessary
++        checkStreamerFailures();
++
++        // if this is the end of the block group, end each internal block
++        if (shouldEndBlockGroup()) {
++          for (int i = 0; i < numAllBlocks; i++) {
++            final StripedDataStreamer s = setCurrentStreamer(i);
++            if (s.isHealthy()) {
++              try {
++                endBlock();
++              } catch (IOException ignored) {}
++              // NOTE(review): the IOException from endBlock() is dropped
++              // without logging; presumably the failure is picked up later
++              // through the streamer's health state - confirm
++            }
++          }
++        }
++      }
++      setCurrentStreamer(next);
++    }
++  }
++
++  /**
++   * Logs the state of the current packet and streamer at debug level,
++   * enqueues the current packet and adjusts the chunk boundary. Unlike what
++   * the closing comment implies for other callers, no block is ended here.
++   */
++  @Override
++  void enqueueCurrentPacketFull() throws IOException {
++    LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={},"
++            + " appendChunk={}, {}", currentPacket, src, getStreamer()
++            .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(),
++        getStreamer());
++    enqueueCurrentPacket();
++    adjustChunkBoundary();
++    // no need to end block here
++  }
++
++  private Set<StripedDataStreamer> markExternalErrorOnStreamers() {
++    Set<StripedDataStreamer> healthySet = new HashSet<>();
++    for (StripedDataStreamer streamer : streamers) {
++      if (streamer.isHealthy() &&
++          streamer.getStage() == BlockConstructionStage.DATA_STREAMING) {
++        streamer.setExternalError();
++        healthySet.add(streamer);
++      }
++    }
++    return healthySet;
++  }
++
++  /**
++   * Check and handle data streamer failures. This is called only when we have
++   * written a full stripe (i.e., enqueue all packets for a full stripe), or
++   * when we're closing the outputstream.
++   *
++   * Each round of the loop records the newly failed streamers, marks the
++   * remaining healthy ones with an external error, hands them an updated
++   * block and waits for them to create new block streams. The loop repeats
++   * for as long as streamers fail during a round.
++   *
++   * @throws IOException if the number of failed streamers exceeds
++   *         numAllBlocks - numDataBlocks
++   */
++  private void checkStreamerFailures() throws IOException {
++    Set<StripedDataStreamer> newFailed = checkStreamers();
++    if (newFailed.size() > 0) {
++      // for healthy streamers, wait till all of them have fetched the new block
++      // and flushed out all the enqueued packets.
++      flushAllInternals();
++    }
++    // get all the current failed streamers after the flush
++    newFailed = checkStreamers();
++    while (newFailed.size() > 0) {
++      failedStreamers.addAll(newFailed);
++      // drop leftovers of any previous round before starting a new one
++      coordinator.clearFailureStates();
++
++      // mark all the healthy streamers as external error
++      Set<StripedDataStreamer> healthySet = markExternalErrorOnStreamers();
++
++      // we have newly failed streamers, update block for pipeline
++      final ExtendedBlock newBG = updateBlockForPipeline(healthySet);
++
++      // wait till all the healthy streamers to
++      // 1) get the updated block info
++      // 2) create new block outputstream
++      newFailed = waitCreatingNewStreams(healthySet);
++      if (newFailed.size() + failedStreamers.size() >
++          numAllBlocks - numDataBlocks) {
++        throw new IOException(
++            "Data streamers failed while creating new block streams: "
++                + newFailed + ". There are not enough healthy streamers.");
++      }
++      for (StripedDataStreamer failedStreamer : newFailed) {
++        assert !failedStreamer.isHealthy();
++      }
++
++      // TODO we can also succeed if all the failed streamers have not taken
++      // the updated block
++      if (newFailed.size() == 0) {
++        // reset external error state of all the streamers
++        for (StripedDataStreamer streamer : healthySet) {
++          assert streamer.isHealthy();
++          streamer.getErrorState().reset();
++        }
++        updatePipeline(newBG);
++      }
++      // publish the outcome of this round to every streamer index; false
++      // means another round follows
++      for (int i = 0; i < numAllBlocks; i++) {
++        coordinator.offerStreamerUpdateResult(i, newFailed.size() == 0);
++      }
++    }
++  }
++
++  private int checkStreamerUpdates(Set<StripedDataStreamer> failed,
++      Set<StripedDataStreamer> streamers) {
++    for (StripedDataStreamer streamer : streamers) {
++      if (!coordinator.updateStreamerMap.containsKey(streamer)) {
++        if (!streamer.isHealthy() &&
++            coordinator.getNewBlocks().peek(streamer.getIndex()) != null) {
++          // this streamer had internal error before getting updated block
++          failed.add(streamer);
++        }
++      }
++    }
++    return coordinator.updateStreamerMap.size() + failed.size();
++  }
++
++  /**
++   * Waits for the given streamers to report the result of creating their new
++   * block streams. Streamers that have not reported when the wait ends are
++   * marked as closed and treated as failed, as are streamers that reported a
++   * failure.
++   *
++   * @param healthyStreamers the streamers taking part in the update; every
++   *                         streamer found to have failed is removed from it
++   * @return the streamers that failed during the update
++   * @throws IOException if the wait is interrupted
++   */
++  private Set<StripedDataStreamer> waitCreatingNewStreams(
++      Set<StripedDataStreamer> healthyStreamers) throws IOException {
++    Set<StripedDataStreamer> failed = new HashSet<>();
++    final int expectedNum = healthyStreamers.size();
++    final long socketTimeout = dfsClient.getConf().getSocketTimeout();
++    // the total wait time should be less than the socket timeout, otherwise
++    // a slow streamer may cause other streamers to timeout. here we wait for
++    // half of the socket timeout
++    long remainingTime = socketTimeout > 0 ? socketTimeout/2 : Long.MAX_VALUE;
++    final long waitInterval = 1000;
++    synchronized (coordinator) {
++      while (checkStreamerUpdates(failed, healthyStreamers) < expectedNum
++          && remainingTime > 0) {
++        try {
++          long start = Time.monotonicNow();
++          coordinator.wait(waitInterval);
++          remainingTime -= Time.monotonicNow() - start;
++        } catch (InterruptedException e) {
++          throw DFSUtilClient.toInterruptedIOException("Interrupted when waiting" +
++              " for results of updating striped streamers", e);
++        }
++      }
++    }
++    synchronized (coordinator) {
++      for (StripedDataStreamer streamer : healthyStreamers) {
++        if (!coordinator.updateStreamerMap.containsKey(streamer)) {
++          // close the streamer if it is too slow to create new connection
++          streamer.setStreamerAsClosed();
++          failed.add(streamer);
++        }
++      }
++    }
++    // updateStreamerMap is a Collections.synchronizedMap: iterating over its
++    // views must hold the map's own monitor, otherwise a late report from a
++    // slow streamer can break the iteration
++    synchronized (coordinator.updateStreamerMap) {
++      for (Map.Entry<StripedDataStreamer, Boolean> entry :
++          coordinator.updateStreamerMap.entrySet()) {
++        if (!entry.getValue()) {
++          failed.add(entry.getKey());
++        }
++      }
++    }
++    for (StripedDataStreamer failedStreamer : failed) {
++      healthyStreamers.remove(failedStreamer);
++    }
++    return failed;
++  }
++
++  /**
++   * Call {@link ClientProtocol#updateBlockForPipeline} and assign updated block
++   * to healthy streamers.
++   *
++   * @param healthyStreamers The healthy data streamers. These streamers join
++   *                         the failure handling.
++   * @return a copy of the current block group carrying the generation stamp
++   *         returned by the namenode
++   */
++  private ExtendedBlock updateBlockForPipeline(
++      Set<StripedDataStreamer> healthyStreamers) throws IOException {
++    final LocatedBlock updated = dfsClient.namenode.updateBlockForPipeline(
++        currentBlockGroup, dfsClient.clientName);
++    final long newGS = updated.getBlock().getGenerationStamp();
++    final ExtendedBlock bumped = new ExtendedBlock(currentBlockGroup);
++    bumped.setGenerationStamp(newGS);
++    final int numParityBlocks = numAllBlocks - numDataBlocks;
++    final LocatedBlock[] updatedBlks = StripedBlockUtil.parseStripedBlockGroup(
++        (LocatedStripedBlock) updated, cellSize, numDataBlocks,
++        numParityBlocks);
++
++    for (int idx = 0; idx < numAllBlocks; idx++) {
++      if (!healthyStreamers.contains(getStripedDataStreamer(idx))) {
++        continue;
++      }
++      // each participating streamer gets its own copy of the bumped block,
++      // together with the block token of its internal block
++      final LocatedBlock perStreamer = new LocatedBlock(
++          new ExtendedBlock(bumped), null, null, null, -1,
++          updated.isCorrupt(), null);
++      perStreamer.setBlockToken(updatedBlks[idx].getBlockToken());
++      coordinator.getNewBlocks().offer(idx, perStreamer);
++    }
++    return bumped;
++  }
++
++  /**
++   * Report the pipeline of the block group to the NameNode and adopt
++   * {@code newBG} as the current block group.
++   *
++   * For every internal block index, the first datanode and the first storage
++   * ID of its streamer are sent. A streamer that is not healthy, or that has
++   * no (or an empty) node / storage ID array, is represented by
++   * {@link DatanodeID#EMPTY_DATANODE_ID} and an empty storage ID.
++   *
++   * @param newBG the block group to register with the NameNode
++   * @throws IOException if the NameNode call fails
++   */
++  private void updatePipeline(ExtendedBlock newBG) throws IOException {
++    final DatanodeInfo[] newNodes = new DatanodeInfo[numAllBlocks];
++    final String[] newStorageIDs = new String[numAllBlocks];
++    for (int i = 0; i < numAllBlocks; i++) {
++      final StripedDataStreamer streamer = getStripedDataStreamer(i);
++      final DatanodeInfo[] nodes = streamer.getNodes();
++      final String[] storageIDs = streamer.getStorageIDs();
++      // A zero-length array carries no pipeline either; treating it like a
++      // missing one avoids an ArrayIndexOutOfBoundsException on element 0.
++      final boolean hasPipeline = nodes != null && nodes.length > 0
++          && storageIDs != null && storageIDs.length > 0;
++      if (streamer.isHealthy() && hasPipeline) {
++        newNodes[i] = nodes[0];
++        newStorageIDs[i] = storageIDs[0];
++      } else {
++        newNodes[i] = new DatanodeInfo(DatanodeID.EMPTY_DATANODE_ID);
++        newStorageIDs[i] = "";
++      }
++    }
++    dfsClient.namenode.updatePipeline(dfsClient.clientName, currentBlockGroup,
++        newBG, newNodes, newStorageIDs);
++    currentBlockGroup = newBG;
++  }
++
++  /**
++   * @return the number of data bytes in one full stripe, i.e. one cell for
++   *         each data block
++   */
++  private int stripeDataSize() {
++    return cellSize * numDataBlocks;
++  }
++
++  /**
++   * Not supported by this stream.
++   *
++   * @throws UnsupportedOperationException always
++   */
++  @Override
++  public void hflush() {
++    throw new UnsupportedOperationException();
++  }
++
++  /**
++   * Not supported by this stream.
++   *
++   * @throws UnsupportedOperationException always
++   */
++  @Override
++  public void hsync() {
++    throw new UnsupportedOperationException();
++  }
++
++  /** Start every striped data streamer of this stream. */
++  @Override
++  protected synchronized void start() {
++    for (StripedDataStreamer each : streamers) {
++      each.start();
++    }
++  }
++
++  @Override
++  synchronized void abort() throws IOException {
++    if (isClosed()) {
++      return;
++    }
++    for (StripedDataStreamer streamer : streamers) {
++      streamer.getLastException().set(new IOException("Lease timeout of "
++          + (dfsClient.getConf().getHdfsTimeout()/1000) +
++          " seconds expired."));
++    }
++    closeThreads(true);
++    dfsClient.endFileLease(fileId);
++  }
++
++  /**
++   * @return true if this stream is flagged as closed, or if every streamer
++   *         reports that it is closed
++   */
++  @Override
++  boolean isClosed() {
++    if (!closed) {
++      for (StripedDataStreamer each : streamers) {
++        if (!each.streamerClosed()) {
++          // At least one streamer is still open.
++          return false;
++        }
++      }
++    }
++    return true;
++  }
++
++  /**
++   * Close, join and disconnect every streamer, then mark this stream closed.
++   * A failure on one streamer is passed to {@code handleStreamerFailure};
++   * anything that handler throws is collected so the remaining streamers are
++   * still processed, and the collected failures are thrown at the end.
++   *
++   * @param force passed through to each streamer's {@code close}
++   * @throws IOException the combined failures, if any were collected
++   */
++  @Override
++  protected void closeThreads(boolean force) throws IOException {
++    final MultipleIOException.Builder failures =
++        new MultipleIOException.Builder();
++    try {
++      for (StripedDataStreamer each : streamers) {
++        try {
++          each.close(force);
++          each.join();
++          each.closeSocket();
++        } catch (Exception cause) {
++          try {
++            handleStreamerFailure("force=" + force, cause);
++          } catch (IOException unhandled) {
++            failures.add(unhandled);
++          }
++        } finally {
++          // The socket reference is dropped whether or not closing worked.
++          each.setSocketToNull();
++        }
++      }
++    } finally {
++      setClosed();
++    }
++    final IOException combined = failures.build();
++    if (combined != null) {
++      throw combined;
++    }
++  }
++
++  /**
++   * Prepare the cell buffers for encoding the last, partial stripe of the
++   * current block group: every buffer is zero-padded up to the parity cell
++   * size and then flipped.
++   *
++   * @return false if the block group ends exactly on a stripe boundary (or
++   *         there is no current block group) and nothing was prepared; true
++   *         otherwise
++   */
++  private boolean generateParityCellsForLastStripe() {
++    long groupBytes = 0;
++    if (currentBlockGroup != null) {
++      groupBytes = currentBlockGroup.getNumBytes();
++    }
++    final long lastStripeSize = groupBytes % stripeDataSize();
++    if (lastStripeSize == 0) {
++      return false;
++    }
++
++    // The parity cell is as long as the longest data cell of this stripe,
++    // which is capped at one full cell.
++    final long parityCellSize = Math.min(lastStripeSize, cellSize);
++    final ByteBuffer[] cells = cellBuffers.getBuffers();
++
++    for (int i = 0; i < numAllBlocks; i++) {
++      // Pad zero bytes to make all cells exactly the size of parityCellSize
++      // If internal block is smaller than parity block, pad zero bytes.
++      // Also pad zero bytes to all parity cells
++      final ByteBuffer cell = cells[i];
++      final int position = cell.position();
++      assert position <= parityCellSize : "If an internal block is smaller" +
++          " than parity block, then its last cell should be small than last" +
++          " parity cell";
++      final long padding = parityCellSize - position;
++      for (long written = 0; written < padding; written++) {
++        cell.put((byte) 0);
++      }
++      cell.flip();
++    }
++    return true;
++  }
++
++  /**
++   * Encode the buffered data cells, write each resulting parity cell through
++   * {@link #writeParity}, and clear the cell buffers afterwards.
++   *
++   * @throws IOException if encoding or writing a parity cell fails
++   */
++  void writeParityCells() throws IOException {
++    final ByteBuffer[] cells = cellBuffers.getBuffers();
++    //encode the data cells
++    encode(encoder, numDataBlocks, cells);
++    // Parity cells occupy the indices after the data cells.
++    for (int parityIdx = numDataBlocks; parityIdx < numAllBlocks;
++        parityIdx++) {
++      writeParity(parityIdx, cells[parityIdx],
++          cellBuffers.getChecksumArray(parityIdx));
++    }
++    cellBuffers.clear();
++  }
++
++  /**
++   * Make the streamer at {@code index} current and, if it is healthy, write
++   * the content of {@code buffer} to it chunk by chunk together with freshly
++   * calculated checksums. Any exception raised while writing is passed to
++   * {@code handleStreamerFailure}.
++   *
++   * @param index index of the target streamer
++   * @param buffer the parity cell; bytes 0 to {@code limit()} of its backing
++   *               array are written
++   * @param checksumBuf scratch array receiving the chunk checksums
++   * @throws IOException if the failure handler throws
++   */
++  void writeParity(int index, ByteBuffer buffer, byte[] checksumBuf)
++      throws IOException {
++    final StripedDataStreamer target = setCurrentStreamer(index);
++    final int len = buffer.limit();
++
++    final long oldBytes = target.getBytesCurBlock();
++    if (!target.isHealthy()) {
++      return;
++    }
++    try {
++      final DataChecksum sum = getDataChecksum();
++      sum.calculateChunkedSums(buffer.array(), 0, len, checksumBuf, 0);
++      final int bytesPerChunk = sum.getBytesPerChecksum();
++      for (int offset = 0; offset < len; offset += bytesPerChunk) {
++        // The final chunk may be shorter than a full checksum chunk.
++        final int chunkLen = Math.min(bytesPerChunk, len - offset);
++        final int ckOffset = offset / bytesPerChunk * getChecksumSize();
++        super.writeChunk(buffer.array(), offset, chunkLen, checksumBuf,
++            ckOffset, getChecksumSize());
++      }
++    } catch(Exception e) {
++      handleStreamerFailure("oldBytes=" + oldBytes + ", len=" + len, e);
++    }
++  }
++
++  @Override
++  void setClosed() {
++    super.setClosed();
++    for (int i = 0; i < numAllBlocks; i++) {
++      getStripedDataStreamer(i).release();
++    }
++    cellBuffers.release();
++  }
++
++  /**
++   * Close this striped output stream.
++   *
++   * If the stream is already closed, only the last exception of each streamer
++   * is checked and any that are found are thrown together. Otherwise the
++   * buffered data is flushed, parity cells for an incomplete last stripe are
++   * written, all packets are flushed, streamer failures are checked, the
++   * final packet is sent on each healthy streamer, the streamer threads are
++   * closed, the file is completed and the lease is ended. The stream is
++   * marked closed in every case.
++   *
++   * @throws IOException if a streamer recorded an exception, or if any step
++   *                     of the close sequence fails
++   */
++  @Override
++  protected synchronized void closeImpl() throws IOException {
++    if (isClosed()) {
++      // Already closed: just surface whatever the streamers recorded.
++      final MultipleIOException.Builder b = new MultipleIOException.Builder();
++      for(int i = 0; i < streamers.size(); i++) {
++        final StripedDataStreamer si = getStripedDataStreamer(i);
++        try {
++          // NOTE(review): the 'true' argument presumably resets the stored
++          // exception after checking - confirm against LastException.check.
++          si.getLastException().check(true);
++        } catch (IOException e) {
++          b.add(e);
++        }
++      }
++      final IOException ioe = b.build();
++      if (ioe != null) {
++        throw ioe;
++      }
++      return;
++    }
++
++    try {
++      // flush from all upper layers
++      flushBuffer();
++      // if the last stripe is incomplete, generate and write parity cells
++      if (generateParityCellsForLastStripe()) {
++        writeParityCells();
++      }
++      enqueueAllCurrentPackets();
++
++      // flush all the data packets
++      flushAllInternals();
++      // check failures
++      checkStreamerFailures();
++
++      // Send the final packet on every streamer that is still healthy.
++      for (int i = 0; i < numAllBlocks; i++) {
++        final StripedDataStreamer s = setCurrentStreamer(i);
++        if (s.isHealthy()) {
++          try {
++            if (s.getBytesCurBlock() > 0) {
++              setCurrentPacketToEmpty();
++            }
++            // flush the last "close" packet to Datanode
++            flushInternal();
++          } catch(Exception e) {
++            // Deliberately ignored, see the TODO below.
++            // TODO for both close and endBlock, we currently do not handle
++            // failures when sending the last packet. We actually do not need to
++            // bump GS for this kind of failure. Thus counting the total number
++            // of failures may be good enough.
++          }
++        }
++      }
++
++      closeThreads(false);
++      // The completeFile call is wrapped in its own trace scope.
++      TraceScope scope = dfsClient.getTracer().newScope("completeFile");
++      try {
++        completeFile(currentBlockGroup);
++      } finally {
++        scope.close();
++      }
++      dfsClient.endFileLease(fileId);
++    } catch (ClosedChannelException ignored) {
++      // Intentionally swallowed; the finally block still marks the stream
++      // closed.
++    } finally {
++      setClosed();
++    }
++  }
++
++  /**
++   * Visit every streamer in turn and enqueue its current packet when the
++   * streamer is healthy and a packet is pending. The streamer that was
++   * current on entry is made current again afterwards.
++   *
++   * @throws IOException if the failure handler throws
++   */
++  private void enqueueAllCurrentPackets() throws IOException {
++    final int previous = streamers.indexOf(getCurrentStreamer());
++    for (int pos = 0; pos < streamers.size(); pos++) {
++      // Switching the current streamer also switches currentPacket.
++      final StripedDataStreamer candidate = setCurrentStreamer(pos);
++      if (!candidate.isHealthy() || currentPacket == null) {
++        continue;
++      }
++      try {
++        enqueueCurrentPacket();
++      } catch (IOException e) {
++        handleStreamerFailure("enqueueAllCurrentPackets, i=" + pos, e);
++      }
++    }
++    setCurrentStreamer(previous);
++  }
++
++  /**
++   * Flush the outstanding data of every healthy streamer to its Datanode.
++   * A failure on one streamer is passed to {@code handleStreamerFailure}.
++   * The streamer that was current on entry is made current again afterwards.
++   *
++   * @throws IOException if the failure handler throws
++   */
++  void flushAllInternals() throws IOException {
++    final int previous = getCurrentIndex();
++
++    for (int idx = 0; idx < numAllBlocks; idx++) {
++      final StripedDataStreamer each = setCurrentStreamer(idx);
++      if (!each.isHealthy()) {
++        continue;
++      }
++      try {
++        // flush all data to Datanode
++        flushInternal();
++      } catch(Exception e) {
++        handleStreamerFailure("flushInternal " + each, e);
++      }
++    }
++    setCurrentStreamer(previous);
++  }
++
++  static void sleep(long ms, String op) throws InterruptedIOException {
++    try {
++      Thread.sleep(ms);
++    } catch(InterruptedException ie) {
++      throw DFSUtilClient.toInterruptedIOException(
++          "Sleep interrupted during " + op, ie);
++    }
++  }
++
++  /** @return the block group this stream is currently writing */
++  @Override
++  ExtendedBlock getBlock() {
++    return currentBlockGroup;
++  }
++}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
index 359886e,e275afb..f96ae65
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
@@@ -53,6 -54,6 +54,7 @@@ import org.slf4j.LoggerFactory
  
  import javax.net.SocketFactory;
  import java.io.IOException;
++import java.io.InterruptedIOException;
  import java.io.UnsupportedEncodingException;
  import java.net.InetAddress;
  import java.net.InetSocketAddress;
@@@ -628,4 -652,4 +653,11 @@@ public class DFSUtilClient 
      return URI.create(HdfsConstants.HDFS_URI_SCHEME + "://"
          + namenode.getHostName() + portString);
    }
++
++  public static InterruptedIOException toInterruptedIOException(String message,
++      InterruptedException e) {
++    final InterruptedIOException iioe = new InterruptedIOException(message);
++    iioe.initCause(e);
++    return iioe;
++  }
  }