You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/09/30 17:41:37 UTC
[37/58] [abbrv] hadoop git commit: Merge remote-tracking branch
'apache/trunk' into HDFS-7285
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index 0000000,0000000..69105a0
new file mode 100644
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@@ -1,0 -1,0 +1,972 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hadoop.hdfs;
++
++import com.google.common.base.Preconditions;
++import org.apache.hadoop.fs.ChecksumException;
++import org.apache.hadoop.fs.ReadOption;
++import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
++import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
++import org.apache.hadoop.hdfs.protocol.LocatedBlock;
++import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
++import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
++import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
++import org.apache.hadoop.hdfs.util.StripedBlockUtil;
++import org.apache.hadoop.io.ByteBufferPool;
++
++import static org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
++import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
++import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
++
++import org.apache.hadoop.io.IOUtils;
++import org.apache.hadoop.io.erasurecode.CodecUtil;
++import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
++
++import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
++import org.apache.hadoop.util.DirectBufferPool;
++
++import java.io.EOFException;
++import java.io.IOException;
++import java.io.InterruptedIOException;
++import java.nio.ByteBuffer;
++import java.util.ArrayList;
++import java.util.Arrays;
++import java.util.Collections;
++import java.util.EnumSet;
++import java.util.List;
++import java.util.Set;
++import java.util.Collection;
++import java.util.Map;
++import java.util.HashMap;
++import java.util.concurrent.CompletionService;
++import java.util.concurrent.ConcurrentHashMap;
++import java.util.concurrent.ExecutorCompletionService;
++import java.util.concurrent.Callable;
++import java.util.concurrent.Future;
++
++/**
++ * DFSStripedInputStream reads from striped block groups
++ */
++public class DFSStripedInputStream extends DFSInputStream {
++
++ /**
++  * Retry budget used while creating one block reader: it permits a single
++  * encryption key re-fetch and a single block token re-fetch.
++  */
++ private static class ReaderRetryPolicy {
++   /** Number of encryption key re-fetches still allowed. */
++   private int encryptionKeyRetriesLeft = 1;
++   /** Number of block token re-fetches still allowed. */
++   private int tokenRetriesLeft = 1;
++
++   /** @return true while an encryption key re-fetch is still allowed */
++   boolean shouldRefetchEncryptionKey() {
++     return encryptionKeyRetriesLeft >= 1;
++   }
++
++   /** @return true while a block token re-fetch is still allowed */
++   boolean shouldRefetchToken() {
++     return tokenRetriesLeft >= 1;
++   }
++
++   /** Consume one encryption key re-fetch attempt. */
++   void refetchEncryptionKey() {
++     encryptionKeyRetriesLeft -= 1;
++   }
++
++   /** Consume one block token re-fetch attempt. */
++   void refetchToken() {
++     tokenRetriesLeft -= 1;
++   }
++ }
++
++ /**
++  * Half-open range [offsetInBlock, offsetInBlock + length) describing which
++  * part of the block group is currently held in the stripe buffer.
++  */
++ private static class StripeRange {
++   /** start offset in the block group (inclusive) */
++   final long offsetInBlock;
++   /** number of bytes covered by the range */
++   final long length;
++
++   /**
++    * @param offsetInBlock start offset in the block group, must be >= 0
++    * @param length number of bytes covered, must be >= 0
++    * @throws IllegalArgumentException if either argument is negative
++    */
++   StripeRange(long offsetInBlock, long length) {
++     Preconditions.checkArgument(length >= 0 && offsetInBlock >= 0);
++     this.length = length;
++     this.offsetInBlock = offsetInBlock;
++   }
++
++   /** @return true if pos lies inside this range */
++   boolean include(long pos) {
++     final long end = offsetInBlock + length;
++     return offsetInBlock <= pos && pos < end;
++   }
++ }
++
++ /**
++  * Groups a {@link BlockReader} with the datanode it reads from, the offset
++  * the reader has reached, and a flag telling whether the reader should no
++  * longer be used.
++  */
++ private static class BlockReaderInfo {
++   final BlockReader reader;
++   final DatanodeInfo datanode;
++   /**
++    * Offset tracked for this reader. All readers are initialized at the same
++    * starting offset (the smallest internal block offset among them) because
++    * some internal blocks may have to be read "backwards" for decoding. The
++    * tracked offset lets a later read skip forward when necessary.
++    */
++   long blockReaderOffset;
++   /**
++    * Set to true once this reader has hit a problem, so that it is avoided
++    * for the following stripes.
++    */
++   boolean shouldSkip;
++
++   BlockReaderInfo(BlockReader reader, DatanodeInfo dn, long offset) {
++     this.blockReaderOffset = offset;
++     this.datanode = dn;
++     this.reader = reader;
++   }
++
++   /** Mark this reader as one that must not be used any more. */
++   void skip() {
++     shouldSkip = true;
++   }
++
++   /** Record the offset this reader has reached. */
++   void setOffset(long offset) {
++     blockReaderOffset = offset;
++   }
++ }
++
++ /** Pool that supplies and takes back the stripe and parity buffers. */
++ private static final DirectBufferPool bufferPool = new DirectBufferPool();
++
++ /** Readers used by stateful reads, one slot per internal block index. */
++ private final BlockReaderInfo[] blockReaders;
++ /** Cell size taken from the erasure coding policy. */
++ private final int cellSize;
++ /** Number of data units in the erasure coding policy. */
++ private final short dataBlkNum;
++ /** Number of parity units in the erasure coding policy. */
++ private final short parityBlkNum;
++ /** dataBlkNum + parityBlkNum. */
++ private final int groupSize;
++ /** the buffer for a complete stripe */
++ private ByteBuffer curStripeBuf;
++ /** Lazily allocated buffer of cellSize * parityBlkNum bytes for parity. */
++ private ByteBuffer parityBuf;
++ private final ErasureCodingPolicy ecPolicy;
++ /** Decoder used to reconstruct missing chunks. */
++ private final RawErasureDecoder decoder;
++
++ /**
++ * indicate the start/end offset of the current buffered stripe in the
++ * block group
++ */
++ private StripeRange curStripeRange;
++ private final CompletionService<Void> readingService;
++
++ /**
++ * When warning the user of a lost block in striping mode, we remember the
++ * dead nodes we've logged. All other striping blocks on these nodes can be
++ * considered lost too, and we don't want to log a warning for each of them.
++ * This is to prevent the log from being too verbose. Refer to HDFS-8920.
++ *
++ * To minimize the overhead, we only store the datanodeUuid in this set
++ */
++ private final Set<String> warnedNodes = Collections.newSetFromMap(
++ new ConcurrentHashMap<String, Boolean>());
++
++ DFSStripedInputStream(DFSClient dfsClient, String src,
++ boolean verifyChecksum, ErasureCodingPolicy ecPolicy,
++ LocatedBlocks locatedBlocks) throws IOException {
++ super(dfsClient, src, verifyChecksum, locatedBlocks);
++
++ assert ecPolicy != null;
++ this.ecPolicy = ecPolicy;
++ this.cellSize = ecPolicy.getCellSize();
++ dataBlkNum = (short) ecPolicy.getNumDataUnits();
++ parityBlkNum = (short) ecPolicy.getNumParityUnits();
++ groupSize = dataBlkNum + parityBlkNum;
++ blockReaders = new BlockReaderInfo[groupSize];
++ curStripeRange = new StripeRange(0, 0);
++ readingService =
++ new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
++ decoder = CodecUtil.createRSRawDecoder(dfsClient.getConfiguration(),
++ dataBlkNum, parityBlkNum);
++ if (DFSClient.LOG.isDebugEnabled()) {
++ DFSClient.LOG.debug("Creating an striped input stream for file " + src);
++ }
++ }
++
++ /**
++  * Make the stripe buffer empty (allocating it from the pool on first use)
++  * and mark that no stripe range is currently buffered.
++  */
++ private void resetCurStripeBuffer() {
++   ByteBuffer stripeBuf = curStripeBuf;
++   if (stripeBuf == null) {
++     stripeBuf = bufferPool.getBuffer(cellSize * dataBlkNum);
++     curStripeBuf = stripeBuf;
++   }
++   stripeBuf.clear();
++   curStripeRange = new StripeRange(0, 0);
++ }
++
++ /**
++  * @return the parity buffer with position and limit reset; it is allocated
++  *         from the pool (cellSize * parityBlkNum bytes) on first use
++  */
++ private ByteBuffer getParityBuffer() {
++   ByteBuffer parity = parityBuf;
++   if (parity == null) {
++     parity = bufferPool.getBuffer(cellSize * parityBlkNum);
++     parityBuf = parity;
++   }
++   parity.clear();
++   return parity;
++ }
++
++ /**
++ * When seeking into a new block group, create blockReader for each internal
++ * block in the group.
++ */
++ private synchronized void blockSeekTo(long target) throws IOException {
++ if (target >= getFileLength()) {
++ throw new IOException("Attempted to read past end of file");
++ }
++
++ // Will be getting a new BlockReader.
++ closeCurrentBlockReaders();
++
++ // Compute desired striped block group
++ LocatedStripedBlock targetBlockGroup = getBlockGroupAt(target);
++ // Update current position
++ this.pos = target;
++ this.blockEnd = targetBlockGroup.getStartOffset() +
++ targetBlockGroup.getBlockSize() - 1;
++ currentLocatedBlock = targetBlockGroup;
++ }
++
++ /**
++  * Close the stream and hand the stripe and parity buffers back to the
++  * buffer pool. The buffers are returned even when closing the super class
++  * fails, so that a failed close does not leak pooled buffers.
++  *
++  * @throws IOException if the super class fails to close
++  */
++ @Override
++ public synchronized void close() throws IOException {
++   try {
++     super.close();
++   } finally {
++     if (curStripeBuf != null) {
++       bufferPool.returnBuffer(curStripeBuf);
++       curStripeBuf = null;
++     }
++     if (parityBuf != null) {
++       bufferPool.returnBuffer(parityBuf);
++       parityBuf = null;
++     }
++   }
++ }
++
++ /**
++ * Extend the super method with the logic of switching between cells.
++ * When reaching the end of a cell, proceed to the next cell and read it
++ * with the next blockReader.
++ */
++ @Override
++ protected void closeCurrentBlockReaders() {
++ resetCurStripeBuffer();
++ if (blockReaders == null || blockReaders.length == 0) {
++ return;
++ }
++ for (int i = 0; i < groupSize; i++) {
++ closeReader(blockReaders[i]);
++ blockReaders[i] = null;
++ }
++ blockEnd = -1;
++ }
++
++ /**
++  * Close the block reader held by {@code readerInfo} and flag the info so
++  * that it is not used again. Closing is best effort: a failure to close is
++  * only logged at debug level, and the info is flagged regardless.
++  *
++  * @param readerInfo the reader to release; null is ignored
++  */
++ private void closeReader(BlockReaderInfo readerInfo) {
++   if (readerInfo == null) {
++     return;
++   }
++   if (readerInfo.reader != null) {
++     try {
++       // Without this the underlying reader is never released.
++       readerInfo.reader.close();
++     } catch (IOException e) {
++       if (DFSClient.LOG.isDebugEnabled()) {
++         DFSClient.LOG.debug("Exception while closing block reader for "
++             + readerInfo.datanode, e);
++       }
++     }
++   }
++   readerInfo.skip();
++ }
++
++ private long getOffsetInBlockGroup() {
++ return getOffsetInBlockGroup(pos);
++ }
++
++ private long getOffsetInBlockGroup(long pos) {
++ return pos - currentLocatedBlock.getStartOffset();
++ }
++
++ /**
++ * Read a new stripe covering the current position, and store the data in the
++ * {@link #curStripeBuf}. Only the part of the stripe from the current
++ * position to the end of the stripe (or of the block group) is read.
++ *
++ * @param corruptedBlockMap handed to the stripe readers, which record
++ * replicas that hit checksum errors in it
++ * @throws IOException if reading the stripe fails
++ */
++ private void readOneStripe(
++ Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
++ throws IOException {
++ resetCurStripeBuffer();
++
++ // compute stripe range based on pos
++ final long offsetInBlockGroup = getOffsetInBlockGroup();
++ final long stripeLen = cellSize * dataBlkNum;
++ final int stripeIndex = (int) (offsetInBlockGroup / stripeLen);
++ // position of the current offset inside the stripe buffer
++ final int stripeBufOffset = (int) (offsetInBlockGroup % stripeLen);
++ // number of valid bytes in this stripe: a full stripe, or what is left of
++ // the block group when this is its last stripe
++ final int stripeLimit = (int) Math.min(currentLocatedBlock.getBlockSize()
++ - (stripeIndex * stripeLen), stripeLen);
++ StripeRange stripeRange = new StripeRange(offsetInBlockGroup,
++ stripeLimit - stripeBufOffset);
++
++ LocatedStripedBlock blockGroup = (LocatedStripedBlock) currentLocatedBlock;
++ AlignedStripe[] stripes = StripedBlockUtil.divideOneStripe(ecPolicy, cellSize,
++ blockGroup, offsetInBlockGroup,
++ offsetInBlockGroup + stripeRange.length - 1, curStripeBuf);
++ final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
++ blockGroup, cellSize, dataBlkNum, parityBlkNum);
++ // read the whole stripe
++ for (AlignedStripe stripe : stripes) {
++ // each aligned stripe is read with the stream's shared block readers
++ StripeReader sreader = new StatefulStripeReader(readingService, stripe,
++ blks, blockReaders, corruptedBlockMap);
++ sreader.readStripe();
++ }
++ // expose exactly the bytes from the current offset to the stripe limit
++ curStripeBuf.position(stripeBufOffset);
++ curStripeBuf.limit(stripeLimit);
++ curStripeRange = stripeRange;
++ }
++
++ /**
++  * Build the task that reads one chunk from a single internal block: it
++  * skips the reader forward to {@code targetReaderOffset} if needed and then
++  * fills the buffer of every strategy, in order.
++  *
++  * @param reader reader to use; the task fails with an IOException if null
++  * @param datanode datanode the reader is connected to (used for reporting)
++  * @param currentReaderOffset offset the reader has reached so far
++  * @param targetReaderOffset offset the read has to start at; must not be
++  *                           smaller than currentReaderOffset
++  * @param strategies one strategy per target buffer
++  * @param currentBlock the internal block being read (used for reporting)
++  * @param corruptedBlockMap receives replicas that hit checksum errors
++  * @return the read task; its result is always null
++  */
++ private Callable<Void> readCells(final BlockReader reader,
++     final DatanodeInfo datanode, final long currentReaderOffset,
++     final long targetReaderOffset, final ByteBufferStrategy[] strategies,
++     final ExtendedBlock currentBlock,
++     final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
++   return new Callable<Void>() {
++     @Override
++     public Void call() throws Exception {
++       // reader can be null if getBlockReaderWithRetry failed or
++       // the reader hit exception before
++       if (reader == null) {
++         throw new IOException("The BlockReader is null. " +
++             "The BlockReader creation failed or the reader hit exception.");
++       }
++       Preconditions.checkState(currentReaderOffset <= targetReaderOffset);
++       if (currentReaderOffset < targetReaderOffset) {
++         long skipped = reader.skip(targetReaderOffset - currentReaderOffset);
++         Preconditions.checkState(
++             skipped == targetReaderOffset - currentReaderOffset);
++       }
++       // The byte count is not needed: readToBuffer either fills the whole
++       // buffer or throws.
++       for (ByteBufferStrategy strategy : strategies) {
++         readToBuffer(reader, datanode, strategy, currentBlock,
++             corruptedBlockMap);
++       }
++       return null;
++     }
++   };
++ }
++
++ /**
++  * Read from {@code blockReader} until the buffer of {@code strategy} has
++  * been filled.
++  *
++  * @param blockReader reader to read from
++  * @param currentNode datanode being read from (used for reporting)
++  * @param strategy wraps the target buffer
++  * @param currentBlock block being read (used for reporting)
++  * @param corruptedBlockMap receives the replica on a checksum error
++  * @return the number of bytes read
++  * @throws ChecksumException on a checksum error, after recording the replica
++  * @throws IOException on any other read failure, including a premature end
++  *                     of stream
++  */
++ private int readToBuffer(BlockReader blockReader,
++     DatanodeInfo currentNode, ByteBufferStrategy strategy,
++     ExtendedBlock currentBlock,
++     Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
++     throws IOException {
++   final int wanted = strategy.buf.remaining();
++   try {
++     int received = 0;
++     while (received < wanted) {
++       final int n = strategy.doRead(blockReader, 0, 0);
++       if (n < 0) {
++         throw new IOException("Unexpected EOS from the reader");
++       }
++       received += n;
++     }
++     return received;
++   } catch (ChecksumException ce) {
++     DFSClient.LOG.warn("Found Checksum error for "
++         + currentBlock + " from " + currentNode
++         + " at " + ce.getPos());
++     // remember the replica so that it can be reported later
++     addIntoCorruptedBlockMap(currentBlock, currentNode,
++         corruptedBlockMap);
++     throw ce;
++   } catch (IOException e) {
++     DFSClient.LOG.warn("Exception while reading from "
++         + currentBlock + " of " + src + " from "
++         + currentNode, e);
++     throw e;
++   }
++ }
++
++ /**
++  * Seek to an arbitrary position. If the position falls inside the stripe
++  * that is currently buffered, only the buffer position is moved; otherwise
++  * {@code blockEnd} is invalidated.
++  *
++  * @param targetPos the file offset to seek to
++  * @throws EOFException if targetPos is negative or beyond the file length
++  * @throws IOException if the stream is closed
++  */
++ @Override
++ public synchronized void seek(long targetPos) throws IOException {
++   if (targetPos > getFileLength()) {
++     throw new EOFException("Cannot seek after EOF");
++   }
++   if (targetPos < 0) {
++     throw new EOFException("Cannot seek to negative offset");
++   }
++   if (closed.get()) {
++     throw new IOException("Stream is closed!");
++   }
++
++   boolean insideBufferedStripe = false;
++   if (targetPos <= blockEnd) {
++     final long offsetInGroup = getOffsetInBlockGroup(targetPos);
++     if (curStripeRange.include(offsetInGroup)) {
++       curStripeBuf.position(getStripedBufOffset(offsetInGroup));
++       insideBufferedStripe = true;
++     }
++   }
++   pos = targetPos;
++   if (!insideBufferedStripe) {
++     blockEnd = -1;
++   }
++ }
++
++ /**
++  * @param offsetInBlockGroup an offset relative to the block group start
++  * @return the matching position inside {@link #curStripeBuf}, i.e. the
++  *         offset modulo the stripe length (cellSize * dataBlkNum)
++  */
++ private int getStripedBufOffset(long offsetInBlockGroup) {
++   final int bytesPerStripe = cellSize * dataBlkNum;
++   final long offsetInStripe = offsetInBlockGroup % bytesPerStripe;
++   return (int) offsetInStripe;
++ }
++
++ /**
++ * This implementation never switches to another source.
++ *
++ * @param targetPos ignored
++ * @return always false
++ */
++ @Override
++ public synchronized boolean seekToNewSource(long targetPos)
++ throws IOException {
++ return false;
++ }
++
++ /**
++  * Stateful read: copy up to {@code len} bytes starting at the current
++  * position into the target of {@code strategy}, reading a new stripe into
++  * the stripe buffer whenever the position leaves the buffered range. A
++  * single call never reads across the end of the current block group.
++  *
++  * @param strategy wraps the target buffer
++  * @param off offset in the target buffer
++  * @param len maximum number of bytes to read
++  * @return the number of bytes read, or -1 if the position is at or beyond
++  *         the end of the file
++  * @throws IOException if the stream is closed or the read fails
++  */
++ @Override
++ protected synchronized int readWithStrategy(ReaderStrategy strategy,
++     int off, int len) throws IOException {
++   dfsClient.checkOpen();
++   if (closed.get()) {
++     throw new IOException("Stream closed");
++   }
++   if (pos >= getFileLength()) {
++     return -1;
++   }
++   Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap =
++       new ConcurrentHashMap<>();
++   try {
++     if (pos > blockEnd) {
++       blockSeekTo(pos);
++     }
++     int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
++     synchronized (infoLock) {
++       if (locatedBlocks.isLastBlockComplete()) {
++         realLen = (int) Math.min(realLen,
++             locatedBlocks.getFileLength() - pos);
++       }
++     }
++
++     // Number of bytes already read into buffer
++     int result = 0;
++     while (result < realLen) {
++       if (!curStripeRange.include(getOffsetInBlockGroup())) {
++         readOneStripe(corruptedBlockMap);
++       }
++       int ret = copyToTargetBuf(strategy, off + result, realLen - result);
++       result += ret;
++       pos += ret;
++     }
++     if (dfsClient.stats != null) {
++       dfsClient.stats.incrementBytesRead(result);
++     }
++     return result;
++   } finally {
++     // Report corrupted replicas whether the read succeeded or a
++     // ChecksumException occurred. currentLocatedBlock is still null when
++     // the very first blockSeekTo failed; nothing can have been recorded
++     // then, and dereferencing it would replace the real failure with a
++     // NullPointerException.
++     if (currentLocatedBlock != null) {
++       reportCheckSumFailure(corruptedBlockMap,
++           currentLocatedBlock.getLocations().length);
++     }
++   }
++ }
++
++ /**
++  * Copy buffered data, starting at the current position, from
++  * {@link #curStripeBuf} into the target of the given strategy.
++  *
++  * @param strategy the ReaderStrategy containing the target buffer
++  * @param offset offset in the target buffer. Used only when strategy is
++  *               a ByteArrayStrategy
++  * @param length maximum number of bytes to copy
++  * @return number of bytes copied
++  */
++ private int copyToTargetBuf(ReaderStrategy strategy, int offset, int length) {
++   final int startInBuf = getStripedBufOffset(getOffsetInBlockGroup());
++   curStripeBuf.position(startInBuf);
++   final int available = curStripeBuf.remaining();
++   final int toCopy = Math.min(length, available);
++   return strategy.copyFrom(curStripeBuf, offset, toCopy);
++ }
++
++ /**
++ * The super method {@link DFSInputStream#refreshLocatedBlock} refreshes
++ * cached LocatedBlock by executing {@link DFSInputStream#getBlockAt} again.
++ * This method extends the logic by first remembering the index of the
++ * internal block, and re-parsing the refreshed block group with the same
++ * index.
++ */
++ @Override
++ protected LocatedBlock refreshLocatedBlock(LocatedBlock block)
++ throws IOException {
++ int idx = StripedBlockUtil.getBlockIndex(block.getBlock().getLocalBlock());
++ LocatedBlock lb = getBlockGroupAt(block.getStartOffset());
++ // If indexing information is returned, iterate through the index array
++ // to find the entry for position idx in the group
++ LocatedStripedBlock lsb = (LocatedStripedBlock) lb;
++ int i = 0;
++ for (; i < lsb.getBlockIndices().length; i++) {
++ if (lsb.getBlockIndices()[i] == idx) {
++ break;
++ }
++ }
++ if (DFSClient.LOG.isDebugEnabled()) {
++ DFSClient.LOG.debug("refreshLocatedBlock for striped blocks, offset="
++ + block.getStartOffset() + ". Obtained block " + lb + ", idx=" + idx);
++ }
++ return StripedBlockUtil.constructInternalBlock(
++ lsb, i, cellSize, dataBlkNum, idx);
++ }
++
++ private LocatedStripedBlock getBlockGroupAt(long offset) throws IOException {
++ LocatedBlock lb = super.getBlockAt(offset);
++ assert lb instanceof LocatedStripedBlock : "NameNode" +
++ " should return a LocatedStripedBlock for a striped file";
++ return (LocatedStripedBlock)lb;
++ }
++
++ /**
++  * Real implementation of pread: read the byte range [start, end] of the
++  * block group into {@code buf}, using readers that live only for the
++  * duration of this call.
++  *
++  * @param block a block whose start offset identifies the block group
++  * @param start first offset to read
++  * @param end last offset to read
++  * @param buf target array
++  * @param offset offset in the target array
++  * @param corruptedBlockMap receives replicas that hit checksum errors
++  * @throws IOException if the range cannot be read
++  */
++ @Override
++ protected void fetchBlockByteRange(LocatedBlock block, long start,
++     long end, byte[] buf, int offset,
++     Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
++     throws IOException {
++   // Always work on a freshly looked-up block group.
++   final LocatedStripedBlock group = getBlockGroupAt(block.getStartOffset());
++
++   final AlignedStripe[] alignedStripes =
++       StripedBlockUtil.divideByteRangeIntoStripes(
++           ecPolicy, cellSize, group, start, end, buf, offset);
++   final CompletionService<Void> preadService =
++       new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
++   final LocatedBlock[] internalBlocks =
++       StripedBlockUtil.parseStripedBlockGroup(
++           group, cellSize, dataBlkNum, parityBlkNum);
++   final BlockReaderInfo[] localReaders = new BlockReaderInfo[groupSize];
++   try {
++     for (AlignedStripe alignedStripe : alignedStripes) {
++       new PositionStripeReader(preadService, alignedStripe, internalBlocks,
++           localReaders, corruptedBlockMap).readStripe();
++     }
++   } finally {
++     // The readers created for this call are not kept.
++     for (int i = 0; i < localReaders.length; i++) {
++       closeReader(localReaders[i]);
++     }
++   }
++ }
++
++ @Override
++ protected void reportLostBlock(LocatedBlock lostBlock,
++ Collection<DatanodeInfo> ignoredNodes) {
++ DatanodeInfo[] nodes = lostBlock.getLocations();
++ if (nodes != null && nodes.length > 0) {
++ List<String> dnUUIDs = new ArrayList<>();
++ for (DatanodeInfo node : nodes) {
++ dnUUIDs.add(node.getDatanodeUuid());
++ }
++ if (!warnedNodes.containsAll(dnUUIDs)) {
++ DFSClient.LOG.warn(Arrays.toString(nodes) + " are unavailable and " +
++ "all striping blocks on them are lost. " +
++ "IgnoredNodes = " + ignoredNodes);
++ warnedNodes.addAll(dnUUIDs);
++ }
++ } else {
++ super.reportLostBlock(lostBlock, ignoredNodes);
++ }
++ }
++
++ /**
++ * The reader for reading a complete {@link AlignedStripe}. Note that an
++ * {@link AlignedStripe} may cross multiple stripes with cellSize width.
++ */
++ private abstract class StripeReader {
++ final Map<Future<Void>, Integer> futures = new HashMap<>();
++ final AlignedStripe alignedStripe;
++ final CompletionService<Void> service;
++ final LocatedBlock[] targetBlocks;
++ final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap;
++ final BlockReaderInfo[] readerInfos;
++
++ /**
++  * @param service completion service the read tasks are submitted to
++  * @param alignedStripe the stripe to read
++  * @param targetBlocks internal blocks of the group, indexed by chunk index
++  * @param readerInfos block readers, indexed by chunk index
++  * @param corruptedBlockMap receives replicas that hit checksum errors
++  */
++ StripeReader(CompletionService<Void> service, AlignedStripe alignedStripe,
++     LocatedBlock[] targetBlocks, BlockReaderInfo[] readerInfos,
++     Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
++   this.alignedStripe = alignedStripe;
++   this.service = service;
++   this.readerInfos = readerInfos;
++   this.targetBlocks = targetBlocks;
++   this.corruptedBlockMap = corruptedBlockMap;
++ }
++
++ /** prepare all the data chunks */
++ abstract void prepareDecodeInputs();
++
++ /** prepare the parity chunk and block reader if necessary */
++ abstract boolean prepareParityChunk(int index) throws IOException;
++
++ abstract void decode();
++
++ void updateState4SuccessRead(StripingChunkReadResult result) {
++ Preconditions.checkArgument(
++ result.state == StripingChunkReadResult.SUCCESSFUL);
++ readerInfos[result.index].setOffset(alignedStripe.getOffsetInBlock()
++ + alignedStripe.getSpanInBlock());
++ }
++
++ /**
++  * Fail the read when more chunks are missing than there are parity blocks.
++  * All outstanding read tasks are cancelled before the exception is thrown.
++  *
++  * @throws IOException if missingChunksNum exceeds parityBlkNum
++  */
++ private void checkMissingBlocks() throws IOException {
++   if (alignedStripe.missingChunksNum <= parityBlkNum) {
++     return;
++   }
++   clearFutures(futures.keySet());
++   throw new IOException(alignedStripe.missingChunksNum
++       + " missing blocks, the stripe is: " + alignedStripe);
++ }
++
++ /**
++ * We need decoding. Thus go through all the data chunks and make sure we
++ * submit read requests for all of them.
++ */
++ private void readDataForDecoding() throws IOException {
++ prepareDecodeInputs();
++ for (int i = 0; i < dataBlkNum; i++) {
++ Preconditions.checkNotNull(alignedStripe.chunks[i]);
++ if (alignedStripe.chunks[i].state == StripingChunk.REQUESTED) {
++ if (!readChunk(targetBlocks[i], i)) {
++ alignedStripe.missingChunksNum++;
++ }
++ }
++ }
++ checkMissingBlocks();
++ }
++
++ /**
++ * Submit reads for parity chunks that have not been prepared yet, until
++ * {@code num} reads were submitted or all parity indices were visited.
++ *
++ * @param num number of parity reads to submit
++ * @throws IOException if too many chunks are missing
++ */
++ void readParityChunks(int num) throws IOException {
++ // i walks the parity indices, j counts the reads submitted so far
++ for (int i = dataBlkNum, j = 0; i < dataBlkNum + parityBlkNum && j < num;
++ i++) {
++ // a non-null chunk was already handled by an earlier call
++ if (alignedStripe.chunks[i] == null) {
++ if (prepareParityChunk(i) && readChunk(targetBlocks[i], i)) {
++ j++;
++ } else {
++ // the parity chunk cannot be read; it counts as missing
++ alignedStripe.missingChunksNum++;
++ }
++ }
++ }
++ checkMissingBlocks();
++ }
++
++ /**
++  * Create the block reader for one internal block and store it in
++  * {@code readerInfos[chunkIndex]}. An invalid encryption key and a block
++  * token problem are each retried once after re-fetching; any other failure
++  * marks the datanode as dead and tries again until no datanode is left.
++  *
++  * @param block the internal block to read
++  * @param chunkIndex index of the block inside the block group
++  * @return true if a reader was created, false if no datanode is available
++  * @throws IOException if refreshing the block locations fails
++  */
++ boolean createBlockReader(LocatedBlock block, int chunkIndex)
++     throws IOException {
++   BlockReader reader = null;
++   final ReaderRetryPolicy retry = new ReaderRetryPolicy();
++   DNAddrPair dnInfo = new DNAddrPair(null, null, null);
++
++   while (true) {
++     try {
++       // the cached block location might have been re-fetched, so always
++       // get it from cache.
++       block = refreshLocatedBlock(block);
++       targetBlocks[chunkIndex] = block;
++
++       // internal block has one location, just rule out the deadNodes
++       dnInfo = getBestNodeDNAddrPair(block, null);
++       if (dnInfo == null) {
++         break;
++       }
++       reader = getBlockReader(block, alignedStripe.getOffsetInBlock(),
++           block.getBlockSize() - alignedStripe.getOffsetInBlock(),
++           dnInfo.addr, dnInfo.storageType, dnInfo.info);
++     } catch (IOException e) {
++       if (e instanceof InvalidEncryptionKeyException &&
++           retry.shouldRefetchEncryptionKey()) {
++         DFSClient.LOG.info("Will fetch a new encryption key and retry, "
++             + "encryption key was invalid when connecting to " + dnInfo.addr
++             + " : " + e);
++         dfsClient.clearDataEncryptionKey();
++         retry.refetchEncryptionKey();
++       } else if (retry.shouldRefetchToken() &&
++           tokenRefetchNeeded(e, dnInfo.addr)) {
++         fetchBlockAt(block.getStartOffset());
++         retry.refetchToken();
++       } else {
++         //TODO: handles connection issues
++         // The separating space after "block" was missing, which glued the
++         // word to the block id in the log output.
++         DFSClient.LOG.warn("Failed to connect to " + dnInfo.addr
++             + " for block " + block.getBlock(), e);
++         // re-fetch the block in case the block has been moved
++         fetchBlockAt(block.getStartOffset());
++         addToDeadNodes(dnInfo.info);
++       }
++     }
++     if (reader != null) {
++       readerInfos[chunkIndex] = new BlockReaderInfo(reader, dnInfo.info,
++           alignedStripe.getOffsetInBlock());
++       return true;
++     }
++   }
++   return false;
++ }
++
++ private ByteBufferStrategy[] getReadStrategies(StripingChunk chunk) {
++ if (chunk.byteBuffer != null) {
++ ByteBufferStrategy strategy = new ByteBufferStrategy(chunk.byteBuffer);
++ return new ByteBufferStrategy[]{strategy};
++ } else {
++ ByteBufferStrategy[] strategies =
++ new ByteBufferStrategy[chunk.byteArray.getOffsets().length];
++ for (int i = 0; i < strategies.length; i++) {
++ ByteBuffer buffer = ByteBuffer.wrap(chunk.byteArray.buf(),
++ chunk.byteArray.getOffsets()[i], chunk.byteArray.getLengths()[i]);
++ strategies[i] = new ByteBufferStrategy(buffer);
++ }
++ return strategies;
++ }
++ }
++
++ /**
++ * Submit the read task for one chunk of the aligned stripe, creating the
++ * block reader first if there is none for this index yet.
++ *
++ * @param block the internal block holding the chunk; null marks the chunk
++ * as missing
++ * @param chunkIndex index of the chunk inside the block group
++ * @return true if a read task was submitted, false if the chunk was marked
++ * MISSING instead
++ * @throws IOException if creating the block reader fails
++ */
++ boolean readChunk(final LocatedBlock block, int chunkIndex)
++ throws IOException {
++ final StripingChunk chunk = alignedStripe.chunks[chunkIndex];
++ if (block == null) {
++ chunk.state = StripingChunk.MISSING;
++ return false;
++ }
++ if (readerInfos[chunkIndex] == null) {
++ if (!createBlockReader(block, chunkIndex)) {
++ chunk.state = StripingChunk.MISSING;
++ return false;
++ }
++ } else if (readerInfos[chunkIndex].shouldSkip) {
++ // this reader was flagged as unusable earlier
++ chunk.state = StripingChunk.MISSING;
++ return false;
++ }
++
++ chunk.state = StripingChunk.PENDING;
++ // the task skips from the reader's tracked offset to the stripe offset
++ Callable<Void> readCallable = readCells(readerInfos[chunkIndex].reader,
++ readerInfos[chunkIndex].datanode,
++ readerInfos[chunkIndex].blockReaderOffset,
++ alignedStripe.getOffsetInBlock(), getReadStrategies(chunk),
++ block.getBlock(), corruptedBlockMap);
++
++ Future<Void> request = service.submit(readCallable);
++ // remember which chunk the future belongs to
++ futures.put(request, chunkIndex);
++ return true;
++ }
++
++ /**
++ * Read the whole stripe and decode if necessary. Reads are submitted for
++ * the requested data chunks first; whenever a chunk turns out to be missing,
++ * reads for the remaining data chunks and for parity chunks are added.
++ *
++ * @throws IOException if more chunks are missing than can be recovered
++ * @throws InterruptedIOException if waiting for a read is interrupted
++ */
++ void readStripe() throws IOException {
++ for (int i = 0; i < dataBlkNum; i++) {
++ if (alignedStripe.chunks[i] != null &&
++ alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
++ if (!readChunk(targetBlocks[i], i)) {
++ alignedStripe.missingChunksNum++;
++ }
++ }
++ }
++ // There are missing block locations at this stage. Thus we need to read
++ // the full stripe and one more parity block.
++ if (alignedStripe.missingChunksNum > 0) {
++ checkMissingBlocks();
++ readDataForDecoding();
++ // read parity chunks
++ readParityChunks(alignedStripe.missingChunksNum);
++ }
++ // TODO: for a full stripe we can start reading (dataBlkNum + 1) chunks
++
++ // Wait for the submitted reads; a failed read triggers additional reads
++ // so that the stripe can still be decoded
++ while (!futures.isEmpty()) {
++ try {
++ StripingChunkReadResult r = StripedBlockUtil
++ .getNextCompletedStripedRead(service, futures, 0);
++ if (DFSClient.LOG.isDebugEnabled()) {
++ DFSClient.LOG.debug("Read task returned: " + r + ", for stripe "
++ + alignedStripe);
++ }
++ StripingChunk returnedChunk = alignedStripe.chunks[r.index];
++ Preconditions.checkNotNull(returnedChunk);
++ Preconditions.checkState(returnedChunk.state == StripingChunk.PENDING);
++
++ if (r.state == StripingChunkReadResult.SUCCESSFUL) {
++ returnedChunk.state = StripingChunk.FETCHED;
++ alignedStripe.fetchedChunksNum++;
++ updateState4SuccessRead(r);
++ // dataBlkNum fetched chunks are enough; cancel whatever is left
++ if (alignedStripe.fetchedChunksNum == dataBlkNum) {
++ clearFutures(futures.keySet());
++ break;
++ }
++ } else {
++ returnedChunk.state = StripingChunk.MISSING;
++ // close the corresponding reader
++ closeReader(readerInfos[r.index]);
++
++ // remember the count so that only the newly missing chunks are
++ // compensated with parity reads below
++ final int missing = alignedStripe.missingChunksNum;
++ alignedStripe.missingChunksNum++;
++ checkMissingBlocks();
++
++ readDataForDecoding();
++ readParityChunks(alignedStripe.missingChunksNum - missing);
++ }
++ } catch (InterruptedException ie) {
++ String err = "Read request interrupted";
++ DFSClient.LOG.error(err);
++ clearFutures(futures.keySet());
++ // Don't decode if read interrupted
++ throw new InterruptedIOException(err);
++ }
++ }
++
++ if (alignedStripe.missingChunksNum > 0) {
++ decode();
++ }
++ }
++ }
++
++ /**
++  * Stripe reader used by {@link #fetchBlockByteRange}. Its decode inputs are
++  * byte arrays that are created lazily, the first time decoding is needed.
++  */
++ class PositionStripeReader extends StripeReader {
++   /** Decode inputs, indexed by decode index; null until first needed. */
++   private byte[][] decodeInputs;
++
++   PositionStripeReader(CompletionService<Void> service,
++       AlignedStripe alignedStripe, LocatedBlock[] targetBlocks,
++       BlockReaderInfo[] readerInfos,
++       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
++     super(service, alignedStripe, targetBlocks, readerInfos,
++         corruptedBlockMap);
++   }
++
++   /** Create the decode inputs unless that has been done already. */
++   @Override
++   void prepareDecodeInputs() {
++     if (decodeInputs != null) {
++       return;
++     }
++     decodeInputs = StripedBlockUtil.initDecodeInputs(alignedStripe,
++         dataBlkNum, parityBlkNum);
++   }
++
++   /**
++    * Create the chunk for a parity index on top of its decode input array.
++    *
++    * @param index a parity chunk index whose chunk does not exist yet
++    * @return always true
++    */
++   @Override
++   boolean prepareParityChunk(int index) {
++     Preconditions.checkState(index >= dataBlkNum &&
++         alignedStripe.chunks[index] == null);
++     final int decodeIndex = StripedBlockUtil.convertIndex4Decode(index,
++         dataBlkNum, parityBlkNum);
++     final StripingChunk parityChunk =
++         new StripingChunk(decodeInputs[decodeIndex]);
++     alignedStripe.chunks[index] = parityChunk;
++     final int span = (int) alignedStripe.getSpanInBlock();
++     parityChunk.addByteArraySlice(0, span);
++     return true;
++   }
++
++   /** Finalize the decode inputs, then decode and fill the target buffer. */
++   @Override
++   void decode() {
++     StripedBlockUtil.finalizeDecodeInputs(decodeInputs, dataBlkNum,
++         parityBlkNum, alignedStripe);
++     StripedBlockUtil.decodeAndFillBuffer(decodeInputs, alignedStripe,
++         dataBlkNum, parityBlkNum, decoder);
++   }
++ }
++
++ /**
++ * Stripe reader used by {@link #readOneStripe}. Its decode inputs are
++ * ByteBuffer slices: data inputs are slices of the stripe buffer, parity
++ * inputs are slices of the parity buffer.
++ */
++ class StatefulStripeReader extends StripeReader {
++ /** Decode inputs, indexed by decode index; null until first needed. */
++ ByteBuffer[] decodeInputs;
++
++ StatefulStripeReader(CompletionService<Void> service,
++ AlignedStripe alignedStripe, LocatedBlock[] targetBlocks,
++ BlockReaderInfo[] readerInfos,
++ Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
++ super(service, alignedStripe, targetBlocks, readerInfos,
++ corruptedBlockMap);
++ }
++
++ /**
++ * On first use, create one decode input per data index as a slice of the
++ * stripe buffer, and create the chunk of any data index that has none yet.
++ */
++ @Override
++ void prepareDecodeInputs() {
++ if (decodeInputs == null) {
++ decodeInputs = new ByteBuffer[dataBlkNum + parityBlkNum];
++ final ByteBuffer cur;
++ synchronized (DFSStripedInputStream.this) {
++ // work on a duplicate so that position and limit of curStripeBuf
++ // itself stay untouched
++ cur = curStripeBuf.duplicate();
++ }
++ StripedBlockUtil.VerticalRange range = alignedStripe.range;
++ for (int i = 0; i < dataBlkNum; i++) {
++ // widen the limit first so that the new position is always legal
++ cur.limit(cur.capacity());
++ // start of the range inside the i-th cell of the stripe buffer
++ int pos = (int) (range.offsetInBlock % cellSize + cellSize * i);
++ cur.position(pos);
++ cur.limit((int) (pos + range.spanInBlock));
++ final int decodeIndex = StripedBlockUtil.convertIndex4Decode(i,
++ dataBlkNum, parityBlkNum);
++ decodeInputs[decodeIndex] = cur.slice();
++ if (alignedStripe.chunks[i] == null) {
++ alignedStripe.chunks[i] = new StripingChunk(
++ decodeInputs[decodeIndex]);
++ }
++ }
++ }
++ }
++
++ /**
++ * Create the chunk for a parity index on top of a slice of the parity
++ * buffer.
++ *
++ * @param index a parity chunk index whose chunk does not exist yet
++ * @return false if the stream's reader for this index is flagged to be
++ * skipped (the chunk is then created in MISSING state), true otherwise
++ */
++ @Override
++ boolean prepareParityChunk(int index) throws IOException {
++ Preconditions.checkState(index >= dataBlkNum
++ && alignedStripe.chunks[index] == null);
++ if (blockReaders[index] != null && blockReaders[index].shouldSkip) {
++ alignedStripe.chunks[index] = new StripingChunk(StripingChunk.MISSING);
++ // we have failed the block reader before
++ return false;
++ }
++ final int decodeIndex = StripedBlockUtil.convertIndex4Decode(index,
++ dataBlkNum, parityBlkNum);
++ ByteBuffer buf = getParityBuffer().duplicate();
++ // region of the parity buffer used for this decode index
++ buf.position(cellSize * decodeIndex);
++ buf.limit(cellSize * decodeIndex + (int) alignedStripe.range.spanInBlock);
++ decodeInputs[decodeIndex] = buf.slice();
++ alignedStripe.chunks[index] = new StripingChunk(decodeInputs[decodeIndex]);
++ return true;
++ }
++
++ /**
++ * Reconstruct the missing data chunks from the fetched and all-zero chunks,
++ * writing the result into the decode input slices of the missing chunks.
++ */
++ @Override
++ void decode() {
++ // TODO no copy for data chunks. this depends on HADOOP-12047
++ final int span = (int) alignedStripe.getSpanInBlock();
++ // first pass: make every available input readable over [0, span)
++ for (int i = 0; i < alignedStripe.chunks.length; i++) {
++ final int decodeIndex = StripedBlockUtil.convertIndex4Decode(i,
++ dataBlkNum, parityBlkNum);
++ if (alignedStripe.chunks[i] != null &&
++ alignedStripe.chunks[i].state == StripingChunk.ALLZERO) {
++ // all-zero chunks were not read; fill their input with zeros
++ for (int j = 0; j < span; j++) {
++ decodeInputs[decodeIndex].put((byte) 0);
++ }
++ decodeInputs[decodeIndex].flip();
++ } else if (alignedStripe.chunks[i] != null &&
++ alignedStripe.chunks[i].state == StripingChunk.FETCHED) {
++ decodeInputs[decodeIndex].position(0);
++ decodeInputs[decodeIndex].limit(span);
++ }
++ }
++ // second pass: collect the decode indices of missing data chunks and drop
++ // the inputs of missing parity chunks
++ int[] decodeIndices = new int[parityBlkNum];
++ int pos = 0;
++ for (int i = 0; i < alignedStripe.chunks.length; i++) {
++ if (alignedStripe.chunks[i] != null &&
++ alignedStripe.chunks[i].state == StripingChunk.MISSING) {
++ int decodeIndex = StripedBlockUtil.convertIndex4Decode(i,
++ dataBlkNum, parityBlkNum);
++ if (i < dataBlkNum) {
++ decodeIndices[pos++] = decodeIndex;
++ } else {
++ decodeInputs[decodeIndex] = null;
++ }
++ }
++ }
++ // trim to the number of indices actually collected
++ decodeIndices = Arrays.copyOf(decodeIndices, pos);
++
++ final int decodeChunkNum = decodeIndices.length;
++ ByteBuffer[] outputs = new ByteBuffer[decodeChunkNum];
++ for (int i = 0; i < decodeChunkNum; i++) {
++ // the slice of a missing data chunk becomes the output buffer and is
++ // removed from the inputs
++ outputs[i] = decodeInputs[decodeIndices[i]];
++ outputs[i].position(0);
++ outputs[i].limit((int) alignedStripe.range.spanInBlock);
++ decodeInputs[decodeIndices[i]] = null;
++ }
++
++ decoder.decode(decodeInputs, decodeIndices, outputs);
++ }
++ }
++
++ /**
++  * Zero-copy read is not supported: a striped read may need online
++  * recovery, for which zero-copy does not make sense.
++  *
++  * @throws UnsupportedOperationException always
++  */
++ @Override
++ public synchronized ByteBuffer read(ByteBufferPool bufferPool,
++     int maxLength, EnumSet<ReadOption> opts)
++     throws IOException, UnsupportedOperationException {
++   final String reason = "Not support enhanced byte buffer access.";
++   throw new UnsupportedOperationException(reason);
++ }
++
++ /**
++  * Not supported, as zero-copy read is not supported by this stream.
++  *
++  * @throws UnsupportedOperationException always
++  */
++ @Override
++ public synchronized void releaseBuffer(ByteBuffer buffer) {
++   final String reason = "Not support enhanced byte buffer access.";
++   throw new UnsupportedOperationException(reason);
++ }
++
++ /**
++  * A variation to {@link DFSInputStream#cancelAll}: cancel every future
++  * without interrupting a task that is already running, then empty the
++  * collection.
++  *
++  * @param futures the futures to cancel; the collection is cleared
++  */
++ private void clearFutures(Collection<Future<Void>> futures) {
++   for (Future<Void> pending : futures) {
++     // false: do not interrupt the thread of a running task
++     pending.cancel(false);
++   }
++   futures.clear();
++ }
++}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index 0000000,0000000..bf4e10e
new file mode 100644
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@@ -1,0 -1,0 +1,953 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hadoop.hdfs;
++
++import java.io.IOException;
++import java.io.InterruptedIOException;
++import java.nio.ByteBuffer;
++import java.nio.channels.ClosedChannelException;
++import java.util.ArrayList;
++import java.util.Arrays;
++import java.util.Collections;
++import java.util.EnumSet;
++import java.util.HashMap;
++import java.util.HashSet;
++import java.util.List;
++import java.util.Map;
++import java.util.Set;
++import java.util.concurrent.BlockingQueue;
++import java.util.concurrent.LinkedBlockingQueue;
++import java.util.concurrent.TimeUnit;
++
++import org.apache.hadoop.HadoopIllegalArgumentException;
++import org.apache.hadoop.classification.InterfaceAudience;
++import org.apache.hadoop.fs.CreateFlag;
++import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
++import org.apache.hadoop.hdfs.protocol.ClientProtocol;
++import org.apache.hadoop.hdfs.protocol.DatanodeID;
++import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
++import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
++import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
++import org.apache.hadoop.hdfs.protocol.LocatedBlock;
++import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
++import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
++import org.apache.hadoop.hdfs.util.StripedBlockUtil;
++import org.apache.hadoop.io.MultipleIOException;
++import org.apache.hadoop.io.erasurecode.CodecUtil;
++import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
++import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
++import org.apache.hadoop.util.DataChecksum;
++import org.apache.hadoop.util.Progressable;
++import org.apache.hadoop.util.Time;
++
++import com.google.common.base.Preconditions;
++import org.apache.htrace.core.TraceScope;
++
++
++/**
++ * This class supports writing files in striped layout and erasure coded format.
++ * Each stripe contains a sequence of cells.
++ */
++@InterfaceAudience.Private
++public class DFSStripedOutputStream extends DFSOutputStream {
++ /**
++  * A fixed group of bounded blocking queues addressed by index. Every
++  * operation is forwarded to the queue at the given index.
++  */
++ static class MultipleBlockingQueue<T> {
++   private final List<BlockingQueue<T>> queues;
++
++   /**
++    * @param numQueue number of queues to create
++    * @param queueSize capacity of each individual queue
++    */
++   MultipleBlockingQueue(int numQueue, int queueSize) {
++     final List<BlockingQueue<T>> created = new ArrayList<>(numQueue);
++     for (int remaining = numQueue; remaining > 0; remaining--) {
++       created.add(new LinkedBlockingQueue<T>(queueSize));
++     }
++     queues = Collections.synchronizedList(created);
++   }
++
++   /**
++    * Insert without blocking; a full queue is treated as a broken invariant
++    * and reported through {@link Preconditions#checkState}.
++    */
++   void offer(int i, T object) {
++     final BlockingQueue<T> target = queues.get(i);
++     final boolean accepted = target.offer(object);
++     Preconditions.checkState(accepted, "Failed to offer " + object
++         + " to queue, i=" + i);
++   }
++
++   /** Block until queue {@code i} has an element, then remove and return it. */
++   T take(int i) throws InterruptedIOException {
++     final BlockingQueue<T> source = queues.get(i);
++     try {
++       return source.take();
++     } catch (InterruptedException interrupted) {
++       throw DFSUtilClient.toInterruptedIOException("take interrupted, i=" + i, interrupted);
++     }
++   }
++
++   /**
++    * Wait up to 100 ms for an element of queue {@code i}.
++    *
++    * @return the element, or null if none arrived in time
++    */
++   T takeWithTimeout(int i) throws InterruptedIOException {
++     final BlockingQueue<T> source = queues.get(i);
++     try {
++       return source.poll(100, TimeUnit.MILLISECONDS);
++     } catch (InterruptedException interrupted) {
++       throw DFSUtilClient.toInterruptedIOException("take interrupted, i=" + i, interrupted);
++     }
++   }
++
++   /** Remove and return the head of queue {@code i}, or null if it is empty. */
++   T poll(int i) {
++     final BlockingQueue<T> source = queues.get(i);
++     return source.poll();
++   }
++
++   /** Return the head of queue {@code i} without removing it, or null. */
++   T peek(int i) {
++     final BlockingQueue<T> source = queues.get(i);
++     return source.peek();
++   }
++
++   /** Empty every queue. */
++   void clear() {
++     for (BlockingQueue<T> each : queues) {
++       each.clear();
++     }
++   }
++ }
++
++ /**
++ * Coordinate the communication between the streamers. State is kept in
++ * per-index queues of capacity one (one slot per internal block) plus a
++ * synchronized map of per-streamer update results.
++ */
++ static class Coordinator {
++ /**
++ * The next internal block to write to for each streamer. The
++ * DFSStripedOutputStream makes the {@link ClientProtocol#addBlock} RPC to
++ * get a new block group. The block group is split to internal blocks, which
++ * are then distributed into the queue for streamers to retrieve.
++ */
++ private final MultipleBlockingQueue<LocatedBlock> followingBlocks;
++ /**
++ * Used to sync among all the streamers before allocating a new block. The
++ * DFSStripedOutputStream uses this to make sure every streamer has finished
++ * writing the previous block.
++ */
++ private final MultipleBlockingQueue<ExtendedBlock> endBlocks;
++
++ /**
++ * The following data structures are used for syncing while handling errors.
++ * newBlocks: per-index block carrying the updated generation stamp, offered
++ * by the output stream while recovering from a streamer failure.
++ * updateStreamerMap: streamer -> success flag recorded via updateStreamer.
++ * streamerUpdateResult: per-index overall outcome of one recovery round.
++ */
++ private final MultipleBlockingQueue<LocatedBlock> newBlocks;
++ private final Map<StripedDataStreamer, Boolean> updateStreamerMap;
++ private final MultipleBlockingQueue<Boolean> streamerUpdateResult;
++
++ /**
++ * @param numAllBlocks number of internal blocks (and therefore of queue
++ * slots) in a block group
++ */
++ Coordinator(final int numAllBlocks) {
++ followingBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
++ endBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
++ newBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
++ updateStreamerMap = Collections.synchronizedMap(
++ new HashMap<StripedDataStreamer, Boolean>(numAllBlocks));
++ streamerUpdateResult = new MultipleBlockingQueue<>(numAllBlocks, 1);
++ }
++
++ MultipleBlockingQueue<LocatedBlock> getFollowingBlocks() {
++ return followingBlocks;
++ }
++
++ MultipleBlockingQueue<LocatedBlock> getNewBlocks() {
++ return newBlocks;
++ }
++
++ /** Publish the finished internal block of streamer index i. */
++ void offerEndBlock(int i, ExtendedBlock block) {
++ endBlocks.offer(i, block);
++ }
++
++ /** Publish the overall result of a recovery round for streamer index i. */
++ void offerStreamerUpdateResult(int i, boolean success) {
++ streamerUpdateResult.offer(i, success);
++ }
++
++ /** Block until the recovery result for streamer index i is available. */
++ boolean takeStreamerUpdateResult(int i) throws InterruptedIOException {
++ return streamerUpdateResult.take(i);
++ }
++
++ /**
++ * Record whether the given streamer managed to pick up the updated block.
++ * A streamer is expected to report at most once between two calls of
++ * clearFailureStates (checked by the assert only).
++ */
++ void updateStreamer(StripedDataStreamer streamer,
++ boolean success) {
++ assert !updateStreamerMap.containsKey(streamer);
++ updateStreamerMap.put(streamer, success);
++ }
++
++ /** Drop all state left over from a previous failure-handling round. */
++ void clearFailureStates() {
++ newBlocks.clear();
++ updateStreamerMap.clear();
++ streamerUpdateResult.clear();
++ }
++ }
++
++ /** Buffers for writing the data and parity cells of a stripe. */
++ class CellBuffers {
++ private final ByteBuffer[] buffers;
++ private final byte[][] checksumArrays;
++
++ CellBuffers(int numParityBlocks) throws InterruptedException{
++ if (cellSize % bytesPerChecksum != 0) {
++ throw new HadoopIllegalArgumentException("Invalid values: "
++ + HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (="
++ + bytesPerChecksum + ") must divide cell size (=" + cellSize + ").");
++ }
++
++ checksumArrays = new byte[numParityBlocks][];
++ final int size = getChecksumSize() * (cellSize / bytesPerChecksum);
++ for (int i = 0; i < checksumArrays.length; i++) {
++ checksumArrays[i] = new byte[size];
++ }
++
++ buffers = new ByteBuffer[numAllBlocks];
++ for (int i = 0; i < buffers.length; i++) {
++ buffers[i] = ByteBuffer.wrap(byteArrayManager.newByteArray(cellSize));
++ }
++ }
++
++ private ByteBuffer[] getBuffers() {
++ return buffers;
++ }
++
++ byte[] getChecksumArray(int i) {
++ return checksumArrays[i - numDataBlocks];
++ }
++
++ private int addTo(int i, byte[] b, int off, int len) {
++ final ByteBuffer buf = buffers[i];
++ final int pos = buf.position() + len;
++ Preconditions.checkState(pos <= cellSize);
++ buf.put(b, off, len);
++ return pos;
++ }
++
++ private void clear() {
++ for (int i = 0; i< numAllBlocks; i++) {
++ buffers[i].clear();
++ if (i >= numDataBlocks) {
++ Arrays.fill(buffers[i].array(), (byte) 0);
++ }
++ }
++ }
++
++ private void release() {
++ for (int i = 0; i < numAllBlocks; i++) {
++ byteArrayManager.release(buffers[i].array());
++ }
++ }
++
++ private void flipDataBuffers() {
++ for (int i = 0; i < numDataBlocks; i++) {
++ buffers[i].flip();
++ }
++ }
++ }
++
++ // Shared with every StripedDataStreamer (passed to their constructor).
++ private final Coordinator coordinator;
++ // Data and parity cell buffers of the stripe currently being written.
++ private final CellBuffers cellBuffers;
++ private final RawErasureEncoder encoder;
++ // One streamer per internal block; entries are replaced after failures.
++ private final List<StripedDataStreamer> streamers;
++ private final DFSPacket[] currentPackets; // current Packet of each streamer
++
++ /** Size of each striping cell, must be a multiple of bytesPerChecksum */
++ private final int cellSize;
++ private final int numAllBlocks;
++ private final int numDataBlocks;
++ // Block group being written; null until the first block is allocated.
++ private ExtendedBlock currentBlockGroup;
++ private final String[] favoredNodes;
++ // Streamers already recorded as failed; cleared in allocateNewBlock.
++ private final List<StripedDataStreamer> failedStreamers;
++
++ /**
++  * Construct a new output stream for creating a file. The striping layout
++  * (cell size, data/parity unit counts) is taken from the erasure coding
++  * policy of {@code stat}; one {@link StripedDataStreamer} is created per
++  * internal block and the streamer at index 0 becomes the current one.
++  *
++  * @throws IOException if allocating the cell buffers is interrupted
++  */
++ DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
++     EnumSet<CreateFlag> flag, Progressable progress,
++     DataChecksum checksum, String[] favoredNodes)
++     throws IOException {
++   super(dfsClient, src, stat, flag, progress, checksum, favoredNodes, false);
++   if (LOG.isDebugEnabled()) {
++     LOG.debug("Creating DFSStripedOutputStream for " + src);
++   }
++
++   final ErasureCodingPolicy policy = stat.getErasureCodingPolicy();
++   final int parityUnits = policy.getNumParityUnits();
++   cellSize = policy.getCellSize();
++   numDataBlocks = policy.getNumDataUnits();
++   numAllBlocks = numDataBlocks + parityUnits;
++   this.favoredNodes = favoredNodes;
++   failedStreamers = new ArrayList<>();
++
++   encoder = CodecUtil.createRSRawEncoder(dfsClient.getConfiguration(),
++       numDataBlocks, parityUnits);
++
++   coordinator = new Coordinator(numAllBlocks);
++   try {
++     cellBuffers = new CellBuffers(parityUnits);
++   } catch (InterruptedException interrupted) {
++     throw DFSUtilClient.toInterruptedIOException(
++         "Failed to create cell buffers", interrupted);
++   }
++
++   streamers = new ArrayList<>(numAllBlocks);
++   for (short idx = 0; idx < numAllBlocks; idx++) {
++     final StripedDataStreamer created = new StripedDataStreamer(stat,
++         dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
++         favoredNodes, idx, coordinator);
++     streamers.add(created);
++   }
++   currentPackets = new DFSPacket[streamers.size()];
++   setCurrentStreamer(0);
++ }
++
++ StripedDataStreamer getStripedDataStreamer(int i) {
++ return streamers.get(i);
++ }
++
++ int getCurrentIndex() {
++ return getCurrentStreamer().getIndex();
++ }
++
++ private synchronized StripedDataStreamer getCurrentStreamer() {
++ return (StripedDataStreamer) streamer;
++ }
++
++ private synchronized StripedDataStreamer setCurrentStreamer(int newIdx) {
++ // backup currentPacket for current streamer
++ if (streamer != null) {
++ int oldIdx = streamers.indexOf(getCurrentStreamer());
++ if (oldIdx >= 0) {
++ currentPackets[oldIdx] = currentPacket;
++ }
++ }
++
++ streamer = getStripedDataStreamer(newIdx);
++ currentPacket = currentPackets[newIdx];
++ adjustChunkBoundary();
++
++ return getCurrentStreamer();
++ }
++
++ /**
++  * Encode the buffers, i.e. compute parities.
++  *
++  * @param encoder the raw erasure encoder to run
++  * @param numData number of leading entries of {@code buffers} holding data
++  * @param buffers data buffers + parity buffers
++  */
++ private static void encode(RawErasureEncoder encoder, int numData,
++     ByteBuffer[] buffers) {
++   final ByteBuffer[] inputs = new ByteBuffer[numData];
++   final ByteBuffer[] outputs = new ByteBuffer[buffers.length - numData];
++   // split the combined array: [0, numData) is data, the rest is parity
++   System.arraycopy(buffers, 0, inputs, 0, inputs.length);
++   System.arraycopy(buffers, numData, outputs, 0, outputs.length);
++
++   encoder.encode(inputs, outputs);
++ }
++
++ /**
++  * Check all the existing StripedDataStreamers and find newly failed ones,
++  * i.e. streamers that are unhealthy but not yet recorded in
++  * {@link #failedStreamers}. {@link #failedStreamers} itself is not modified.
++  *
++  * @return The newly failed streamers.
++  * @throws IOException if more streamers have failed than there are parity
++  *           blocks, i.e. less than {@link #numDataBlocks} streamers are
++  *           still healthy.
++  */
++ private Set<StripedDataStreamer> checkStreamers() throws IOException {
++   final Set<StripedDataStreamer> newFailed = new HashSet<>();
++   for (StripedDataStreamer s : streamers) {
++     if (!s.isHealthy() && !failedStreamers.contains(s)) {
++       newFailed.add(s);
++     }
++   }
++
++   final int failCount = failedStreamers.size() + newFailed.size();
++   // the write can only tolerate as many failures as there are parity blocks
++   final int numParityBlocks = numAllBlocks - numDataBlocks;
++   if (LOG.isDebugEnabled()) {
++     LOG.debug("checkStreamers: " + streamers);
++     LOG.debug("healthy streamer count=" + (numAllBlocks - failCount));
++     LOG.debug("original failed streamers: " + failedStreamers);
++     LOG.debug("newly failed streamers: " + newFailed);
++   }
++   if (failCount > numParityBlocks) {
++     throw new IOException("Failed: the number of failed blocks = "
++         + failCount + " > the number of parity blocks = "
++         + numParityBlocks);
++   }
++   return newFailed;
++ }
++
++ private void handleStreamerFailure(String err, Exception e)
++ throws IOException {
++ LOG.warn("Failed: " + err + ", " + this, e);
++ getCurrentStreamer().getErrorState().setInternalError();
++ getCurrentStreamer().close(true);
++ checkStreamers();
++ currentPacket = null;
++ }
++
++ /**
++  * Replace every unhealthy streamer by a freshly started one built from the
++  * old streamer's settings, and forget the packet parked for it.
++  */
++ private void replaceFailedStreamers() {
++   assert streamers.size() == numAllBlocks;
++   for (short idx = 0; idx < numAllBlocks; idx++) {
++     final StripedDataStreamer stale = getStripedDataStreamer(idx);
++     if (stale.isHealthy()) {
++       continue;
++     }
++     final StripedDataStreamer replacement = new StripedDataStreamer(stale.stat,
++         dfsClient, src, stale.progress,
++         stale.checksum4WriteBlock, cachingStrategy, byteArrayManager,
++         favoredNodes, idx, coordinator);
++     streamers.set(idx, replacement);
++     currentPackets[idx] = null;
++     // keep the inherited streamer reference in step when slot 0 is replaced
++     if (idx == 0) {
++       this.streamer = replacement;
++     }
++     replacement.start();
++   }
++ }
++
++ /**
++  * Wait until streamer {@code i} reports the end of its internal block, or
++  * until that streamer turns unhealthy. The reported block is validated
++  * against the current block group.
++  */
++ private void waitEndBlocks(int i) throws IOException {
++   while (getStripedDataStreamer(i).isHealthy()) {
++     final ExtendedBlock ended = coordinator.endBlocks.takeWithTimeout(i);
++     if (ended == null) {
++       // timed out; loop to re-check the streamer's health
++       continue;
++     }
++     StripedBlockUtil.checkBlocks(currentBlockGroup, i, ended);
++     return;
++   }
++ }
++
++ /**
++ * Allocate the next block group and hand its internal blocks to the
++ * streamers. Before asking the namenode, wait for the streamers to finish
++ * the previous block group (if any), forget the recorded failures and
++ * replace unhealthy streamers.
++ *
++ * @throws IOException if the namenode returns fewer locations than there
++ * are data blocks, or if waiting or the RPC fails
++ */
++ private void allocateNewBlock() throws IOException {
++ if (currentBlockGroup != null) {
++ for (int i = 0; i < numAllBlocks; i++) {
++ // sync all the healthy streamers before writing to the new block
++ waitEndBlocks(i);
++ }
++ }
++ failedStreamers.clear();
++ // replace failed streamers
++ replaceFailedStreamers();
++
++ if (LOG.isDebugEnabled()) {
++ LOG.debug("Allocating new block group. The previous block group: "
++ + currentBlockGroup);
++ }
++
++ // TODO collect excludedNodes from all the data streamers
++ final LocatedBlock lb = addBlock(null, dfsClient, src, currentBlockGroup,
++ fileId, favoredNodes);
++ assert lb.isStriped();
++ if (lb.getLocations().length < numDataBlocks) {
++ throw new IOException("Failed to get " + numDataBlocks
++ + " nodes from namenode: blockGroupSize= " + numAllBlocks
++ + ", blocks.length= " + lb.getLocations().length);
++ }
++ // assign the new block to the current block group
++ currentBlockGroup = lb.getBlock();
++
++ // split the block group into one located block per internal block index
++ final LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
++ (LocatedStripedBlock) lb, cellSize, numDataBlocks,
++ numAllBlocks - numDataBlocks);
++ for (int i = 0; i < blocks.length; i++) {
++ StripedDataStreamer si = getStripedDataStreamer(i);
++ if (si.isHealthy()) { // skipping failed data streamer
++ if (blocks[i] == null) {
++ // Set exception and close streamer as there is no block locations
++ // found for the parity block.
++ LOG.warn("Failed to get block location for parity block, index=" + i);
++ si.getLastException().set(
++ new IOException("Failed to get following block, i=" + i));
++ si.getErrorState().setInternalError();
++ si.close(true);
++ } else {
++ coordinator.getFollowingBlocks().offer(i, blocks[i]);
++ }
++ }
++ }
++ }
++
++ /**
++  * @return true if a block group exists and its byte count equals
++  *         {@code blockSize * numDataBlocks}, i.e. the group is full
++  */
++ private boolean shouldEndBlockGroup() {
++   if (currentBlockGroup == null) {
++     return false;
++   }
++   return currentBlockGroup.getNumBytes() == blockSize * numDataBlocks;
++ }
++
++ /**
++ * Write one checksum chunk to the current streamer. The chunk is first
++ * copied into the cell buffer of the current index, a new block group is
++ * allocated when none exists or the current one is full, and the chunk is
++ * passed to the parent implementation only if the current streamer is
++ * healthy. When the cell becomes full the stream advances to the next data
++ * streamer; when the last data cell of a stripe becomes full the parity
++ * cells are written, failures are checked and, at the end of the block
++ * group, every healthy streamer ends its block.
++ */
++ @Override
++ protected synchronized void writeChunk(byte[] bytes, int offset, int len,
++ byte[] checksum, int ckoff, int cklen) throws IOException {
++ final int index = getCurrentIndex();
++ final StripedDataStreamer current = getCurrentStreamer();
++ final int pos = cellBuffers.addTo(index, bytes, offset, len);
++ final boolean cellFull = pos == cellSize;
++
++ if (currentBlockGroup == null || shouldEndBlockGroup()) {
++ // the incoming data should belong to a new block. Allocate a new block.
++ allocateNewBlock();
++ }
++
++ // the bytes are counted even if the current streamer is unhealthy
++ currentBlockGroup.setNumBytes(currentBlockGroup.getNumBytes() + len);
++ if (current.isHealthy()) {
++ try {
++ super.writeChunk(bytes, offset, len, checksum, ckoff, cklen);
++ } catch(Exception e) {
++ handleStreamerFailure("offset=" + offset + ", length=" + len, e);
++ }
++ }
++
++ // Two extra steps are needed when a striping cell is full:
++ // 1. Forward the current index pointer
++ // 2. Generate parity packets if a full stripe of data cells are present
++ if (cellFull) {
++ int next = index + 1;
++ //When all data cells in a stripe are ready, we need to encode
++ //them and generate some parity cells. These cells will be
++ //converted to packets and put to their DataStreamer's queue.
++ if (next == numDataBlocks) {
++ cellBuffers.flipDataBuffers();
++ writeParityCells();
++ next = 0;
++ // check failure state for all the streamers. Bump GS if necessary
++ checkStreamerFailures();
++
++ // if this is the end of the block group, end each internal block
++ if (shouldEndBlockGroup()) {
++ for (int i = 0; i < numAllBlocks; i++) {
++ final StripedDataStreamer s = setCurrentStreamer(i);
++ if (s.isHealthy()) {
++ try {
++ endBlock();
++ } catch (IOException ignored) {}
++ }
++ }
++ }
++ }
++ setCurrentStreamer(next);
++ }
++ }
++
++ /**
++ * Enqueue the current (full) packet and re-adjust the chunk boundary.
++ * Ending the block is deliberately not done here.
++ */
++ @Override
++ void enqueueCurrentPacketFull() throws IOException {
++ LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={},"
++ + " appendChunk={}, {}", currentPacket, src, getStreamer()
++ .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(),
++ getStreamer());
++ enqueueCurrentPacket();
++ adjustChunkBoundary();
++ // no need to end block here
++ }
++
++ private Set<StripedDataStreamer> markExternalErrorOnStreamers() {
++ Set<StripedDataStreamer> healthySet = new HashSet<>();
++ for (StripedDataStreamer streamer : streamers) {
++ if (streamer.isHealthy() &&
++ streamer.getStage() == BlockConstructionStage.DATA_STREAMING) {
++ streamer.setExternalError();
++ healthySet.add(streamer);
++ }
++ }
++ return healthySet;
++ }
++
++ /**
++ * Check and handle data streamer failures. This is called only when we have
++ * written a full stripe (i.e., enqueue all packets for a full stripe), or
++ * when we're closing the outputstream.
++ *
++ * Each round of the loop records the newly failed streamers, requests an
++ * updated block from the namenode, waits for the healthy streamers to pick
++ * it up, and repeats while further streamers fail during that wait.
++ *
++ * @throws IOException if more streamers have failed than can be tolerated
++ */
++ private void checkStreamerFailures() throws IOException {
++ Set<StripedDataStreamer> newFailed = checkStreamers();
++ if (newFailed.size() > 0) {
++ // for healthy streamers, wait till all of them have fetched the new block
++ // and flushed out all the enqueued packets.
++ flushAllInternals();
++ }
++ // get all the current failed streamers after the flush
++ newFailed = checkStreamers();
++ while (newFailed.size() > 0) {
++ failedStreamers.addAll(newFailed);
++ coordinator.clearFailureStates();
++
++ // mark all the healthy streamers as external error
++ Set<StripedDataStreamer> healthySet = markExternalErrorOnStreamers();
++
++ // we have newly failed streamers, update block for pipeline
++ final ExtendedBlock newBG = updateBlockForPipeline(healthySet);
++
++ // wait till all the healthy streamers to
++ // 1) get the updated block info
++ // 2) create new block outputstream
++ newFailed = waitCreatingNewStreams(healthySet);
++ if (newFailed.size() + failedStreamers.size() >
++ numAllBlocks - numDataBlocks) {
++ throw new IOException(
++ "Data streamers failed while creating new block streams: "
++ + newFailed + ". There are not enough healthy streamers.");
++ }
++ for (StripedDataStreamer failedStreamer : newFailed) {
++ assert !failedStreamer.isHealthy();
++ }
++
++ // TODO we can also succeed if all the failed streamers have not taken
++ // the updated block
++ if (newFailed.size() == 0) {
++ // reset external error state of all the streamers
++ for (StripedDataStreamer streamer : healthySet) {
++ assert streamer.isHealthy();
++ streamer.getErrorState().reset();
++ }
++ updatePipeline(newBG);
++ }
++ // publish the outcome of this round for every streamer index
++ for (int i = 0; i < numAllBlocks; i++) {
++ coordinator.offerStreamerUpdateResult(i, newFailed.size() == 0);
++ }
++ }
++ }
++
++ private int checkStreamerUpdates(Set<StripedDataStreamer> failed,
++ Set<StripedDataStreamer> streamers) {
++ for (StripedDataStreamer streamer : streamers) {
++ if (!coordinator.updateStreamerMap.containsKey(streamer)) {
++ if (!streamer.isHealthy() &&
++ coordinator.getNewBlocks().peek(streamer.getIndex()) != null) {
++ // this streamer had internal error before getting updated block
++ failed.add(streamer);
++ }
++ }
++ }
++ return coordinator.updateStreamerMap.size() + failed.size();
++ }
++
++ /**
++  * Wait for the healthy streamers to report the result of picking up the
++  * updated block, for at most half of the socket timeout (unbounded if no
++  * socket timeout is configured).
++  *
++  * @param healthyStreamers the streamers taking part in the failure
++  *          handling; streamers found to have failed are removed from it
++  * @return the streamers that failed: those that reported failure, those
++  *         that broke before taking the updated block, and those that did
++  *         not report in time (these are additionally marked as closed)
++  * @throws IOException if the waiting thread is interrupted
++  */
++ private Set<StripedDataStreamer> waitCreatingNewStreams(
++     Set<StripedDataStreamer> healthyStreamers) throws IOException {
++   Set<StripedDataStreamer> failed = new HashSet<>();
++   final int expectedNum = healthyStreamers.size();
++   final long socketTimeout = dfsClient.getConf().getSocketTimeout();
++   // the total wait time should be less than the socket timeout, otherwise
++   // a slow streamer may cause other streamers to timeout. here we wait for
++   // half of the socket timeout
++   long remainingTime = socketTimeout > 0 ? socketTimeout/2 : Long.MAX_VALUE;
++   final long waitInterval = 1000;
++   synchronized (coordinator) {
++     while (checkStreamerUpdates(failed, healthyStreamers) < expectedNum
++         && remainingTime > 0) {
++       try {
++         long start = Time.monotonicNow();
++         coordinator.wait(waitInterval);
++         remainingTime -= Time.monotonicNow() - start;
++       } catch (InterruptedException e) {
++         throw DFSUtilClient.toInterruptedIOException("Interrupted when waiting" +
++             " for results of updating striped streamers", e);
++       }
++     }
++   }
++   synchronized (coordinator) {
++     for (StripedDataStreamer streamer : healthyStreamers) {
++       if (!coordinator.updateStreamerMap.containsKey(streamer)) {
++         // close the streamer if it is too slow to create new connection
++         streamer.setStreamerAsClosed();
++         failed.add(streamer);
++       }
++     }
++   }
++   // A slow streamer may still insert its result concurrently. Traversing a
++   // view of a Collections.synchronizedMap is only safe while holding the
++   // map's own lock.
++   synchronized (coordinator.updateStreamerMap) {
++     for (Map.Entry<StripedDataStreamer, Boolean> entry :
++         coordinator.updateStreamerMap.entrySet()) {
++       if (!entry.getValue()) {
++         failed.add(entry.getKey());
++       }
++     }
++   }
++   for (StripedDataStreamer failedStreamer : failed) {
++     healthyStreamers.remove(failedStreamer);
++   }
++   return failed;
++ }
++
++ /**
++  * Call {@link ClientProtocol#updateBlockForPipeline} and assign updated block
++  * to healthy streamers. Each healthy streamer gets its own copy of the
++  * block group with the new generation stamp, carrying the block token of
++  * its internal block.
++  *
++  * @param healthyStreamers The healthy data streamers. These streamers join
++  *                         the failure handling.
++  * @return the current block group with the new generation stamp
++  */
++ private ExtendedBlock updateBlockForPipeline(
++     Set<StripedDataStreamer> healthyStreamers) throws IOException {
++   final LocatedBlock updated = dfsClient.namenode.updateBlockForPipeline(
++       currentBlockGroup, dfsClient.clientName);
++   final long bumpedGS = updated.getBlock().getGenerationStamp();
++   final ExtendedBlock bumpedGroup = new ExtendedBlock(currentBlockGroup);
++   bumpedGroup.setGenerationStamp(bumpedGS);
++   final LocatedBlock[] internalBlocks = StripedBlockUtil.parseStripedBlockGroup(
++       (LocatedStripedBlock) updated, cellSize, numDataBlocks,
++       numAllBlocks - numDataBlocks);
++
++   for (int i = 0; i < numAllBlocks; i++) {
++     final StripedDataStreamer si = getStripedDataStreamer(i);
++     if (!healthyStreamers.contains(si)) {
++       continue;
++     }
++     final LocatedBlock handoff = new LocatedBlock(new ExtendedBlock(bumpedGroup),
++         null, null, null, -1, updated.isCorrupt(), null);
++     handoff.setBlockToken(internalBlocks[i].getBlockToken());
++     coordinator.getNewBlocks().offer(i, handoff);
++   }
++   return bumpedGroup;
++ }
++
++ /**
++  * Report the new pipeline of the block group to the namenode and make
++  * {@code newBG} the current block group. Indices whose streamer is
++  * unhealthy or has no nodes/storage IDs are reported with an empty
++  * datanode ID and an empty storage ID.
++  *
++  * @param newBG the block group with the updated generation stamp
++  */
++ private void updatePipeline(ExtendedBlock newBG) throws IOException {
++   final DatanodeInfo[] newNodes = new DatanodeInfo[numAllBlocks];
++   final String[] newStorageIDs = new String[numAllBlocks];
++   for (int i = 0; i < numAllBlocks; i++) {
++     final StripedDataStreamer si = getStripedDataStreamer(i);
++     final DatanodeInfo[] nodes = si.getNodes();
++     final String[] storageIDs = si.getStorageIDs();
++     final boolean usable =
++         si.isHealthy() && nodes != null && storageIDs != null;
++     if (!usable) {
++       newNodes[i] = new DatanodeInfo(DatanodeID.EMPTY_DATANODE_ID);
++       newStorageIDs[i] = "";
++       continue;
++     }
++     newNodes[i] = nodes[0];
++     newStorageIDs[i] = storageIDs[0];
++   }
++   dfsClient.namenode.updatePipeline(dfsClient.clientName, currentBlockGroup,
++       newBG, newNodes, newStorageIDs);
++   currentBlockGroup = newBG;
++ }
++
++ private int stripeDataSize() {
++ return numDataBlocks * cellSize;
++ }
++
++ /**
++ * Not supported by this stream.
++ *
++ * @throws UnsupportedOperationException always
++ */
++ @Override
++ public void hflush() {
++ throw new UnsupportedOperationException();
++ }
++
++ /**
++ * Not supported by this stream.
++ *
++ * @throws UnsupportedOperationException always
++ */
++ @Override
++ public void hsync() {
++ throw new UnsupportedOperationException();
++ }
++
++ /** Start every streamer of the block group. */
++ @Override
++ protected synchronized void start() {
++   for (StripedDataStreamer each : streamers) {
++     each.start();
++   }
++ }
++
++ @Override
++ synchronized void abort() throws IOException {
++ if (isClosed()) {
++ return;
++ }
++ for (StripedDataStreamer streamer : streamers) {
++ streamer.getLastException().set(new IOException("Lease timeout of "
++ + (dfsClient.getConf().getHdfsTimeout()/1000) +
++ " seconds expired."));
++ }
++ closeThreads(true);
++ dfsClient.endFileLease(fileId);
++ }
++
++ /**
++  * @return true if the stream has been flagged closed, or if every
++  *         streamer reports that it is closed
++  */
++ @Override
++ boolean isClosed() {
++   if (!closed) {
++     for (StripedDataStreamer each : streamers) {
++       if (!each.streamerClosed()) {
++         // at least one streamer is still running
++         return false;
++       }
++     }
++   }
++   return true;
++ }
++
++ /**
++ * Close, join and disconnect every streamer, then mark the stream closed.
++ * A failure on one streamer does not stop the remaining ones from being
++ * closed; IOExceptions raised while handling such failures are collected
++ * and thrown together at the end.
++ *
++ * @param force passed through to each streamer's close
++ */
++ @Override
++ protected void closeThreads(boolean force) throws IOException {
++ final MultipleIOException.Builder b = new MultipleIOException.Builder();
++ try {
++ for (StripedDataStreamer streamer : streamers) {
++ try {
++ streamer.close(force);
++ streamer.join();
++ streamer.closeSocket();
++ } catch (Exception e) {
++ // NOTE(review): handleStreamerFailure acts on the current streamer,
++ // which is not necessarily the loop's streamer -- confirm intent.
++ try {
++ handleStreamerFailure("force=" + force, e);
++ } catch (IOException ioe) {
++ b.add(ioe);
++ }
++ } finally {
++ streamer.setSocketToNull();
++ }
++ }
++ } finally {
++ setClosed();
++ }
++ final IOException ioe = b.build();
++ if (ioe != null) {
++ throw ioe;
++ }
++ }
++
++ /**
++  * Prepare the cell buffers of an incomplete last stripe for encoding:
++  * every cell is zero-padded up to the parity cell size and flipped.
++  *
++  * @return false if the block group ends on a stripe boundary (nothing to
++  *         do), true if the buffers were prepared
++  */
++ private boolean generateParityCellsForLastStripe() {
++   final long groupBytes = (currentBlockGroup != null)
++       ? currentBlockGroup.getNumBytes() : 0;
++   final long lastStripeSize = groupBytes % stripeDataSize();
++   if (lastStripeSize == 0) {
++     return false;
++   }
++
++   // the parity cell is as long as the longest data cell of the stripe
++   final long parityCellSize = Math.min(lastStripeSize, cellSize);
++   final ByteBuffer[] cells = cellBuffers.getBuffers();
++
++   for (int i = 0; i < numAllBlocks; i++) {
++     // Pad zero bytes to make all cells exactly the size of parityCellSize
++     // If internal block is smaller than parity block, pad zero bytes.
++     // Also pad zero bytes to all parity cells
++     final ByteBuffer cell = cells[i];
++     final int position = cell.position();
++     assert position <= parityCellSize : "If an internal block is smaller" +
++         " than parity block, then its last cell should be small than last" +
++         " parity cell";
++     long padding = parityCellSize - position;
++     while (padding > 0) {
++       cell.put((byte) 0);
++       padding--;
++     }
++     cell.flip();
++   }
++   return true;
++ }
++
++ /**
++  * Encode the data cells of the current stripe, write each resulting parity
++  * cell through its streamer, then reset the cell buffers.
++  */
++ void writeParityCells() throws IOException {
++   final ByteBuffer[] cells = cellBuffers.getBuffers();
++   // compute the parity cells from the data cells
++   encode(encoder, numDataBlocks, cells);
++   int parityIdx = numDataBlocks;
++   while (parityIdx < numAllBlocks) {
++     writeParity(parityIdx, cells[parityIdx],
++         cellBuffers.getChecksumArray(parityIdx));
++     parityIdx++;
++   }
++   cellBuffers.clear();
++ }
++
++ /**
++  * Write one parity cell through the streamer at {@code index}, which
++  * becomes the current streamer. Nothing is written if that streamer is
++  * unhealthy.
++  *
++  * @param index index of the parity streamer
++  * @param buffer the parity cell; bytes up to its limit are written
++  * @param checksumBuf scratch array receiving the chunk checksums
++  */
++ void writeParity(int index, ByteBuffer buffer, byte[] checksumBuf)
++     throws IOException {
++   final StripedDataStreamer target = setCurrentStreamer(index);
++   final int len = buffer.limit();
++
++   final long oldBytes = target.getBytesCurBlock();
++   if (!target.isHealthy()) {
++     return;
++   }
++   try {
++     final DataChecksum sum = getDataChecksum();
++     sum.calculateChunkedSums(buffer.array(), 0, len, checksumBuf, 0);
++     final int bytesPerSum = sum.getBytesPerChecksum();
++     final int sumSize = getChecksumSize();
++     // hand the cell to the parent writer one checksum chunk at a time
++     for (int off = 0; off < len; off += bytesPerSum) {
++       final int chunkLen = Math.min(bytesPerSum, len - off);
++       final int ckOffset = off / bytesPerSum * sumSize;
++       super.writeChunk(buffer.array(), off, chunkLen, checksumBuf, ckOffset,
++           sumSize);
++     }
++   } catch(Exception e) {
++     handleStreamerFailure("oldBytes=" + oldBytes + ", len=" + len, e);
++   }
++ }
++
++ /**
++  * Mark the stream closed, then release every streamer and the cell
++  * buffers.
++  */
++ @Override
++ void setClosed() {
++   super.setClosed();
++   int idx = 0;
++   while (idx < numAllBlocks) {
++     getStripedDataStreamer(idx).release();
++     idx++;
++   }
++   cellBuffers.release();
++ }
++
++ /**
++ * Close the stream. If it is already closed, only rethrow the exceptions
++ * recorded on the streamers. Otherwise flush the buffered data, write the
++ * parity cells of an incomplete last stripe, flush all streamers, handle
++ * failures, send the last packet on each healthy streamer, stop the
++ * streamer threads, complete the file and end the lease.
++ */
++ @Override
++ protected synchronized void closeImpl() throws IOException {
++ if (isClosed()) {
++ // already closed: surface whatever the streamers recorded
++ final MultipleIOException.Builder b = new MultipleIOException.Builder();
++ for(int i = 0; i < streamers.size(); i++) {
++ final StripedDataStreamer si = getStripedDataStreamer(i);
++ try {
++ si.getLastException().check(true);
++ } catch (IOException e) {
++ b.add(e);
++ }
++ }
++ final IOException ioe = b.build();
++ if (ioe != null) {
++ throw ioe;
++ }
++ return;
++ }
++
++ try {
++ // flush from all upper layers
++ flushBuffer();
++ // if the last stripe is incomplete, generate and write parity cells
++ if (generateParityCellsForLastStripe()) {
++ writeParityCells();
++ }
++ enqueueAllCurrentPackets();
++
++ // flush all the data packets
++ flushAllInternals();
++ // check failures
++ checkStreamerFailures();
++
++ for (int i = 0; i < numAllBlocks; i++) {
++ final StripedDataStreamer s = setCurrentStreamer(i);
++ if (s.isHealthy()) {
++ try {
++ if (s.getBytesCurBlock() > 0) {
++ setCurrentPacketToEmpty();
++ }
++ // flush the last "close" packet to Datanode
++ flushInternal();
++ } catch(Exception e) {
++ // TODO for both close and endBlock, we currently do not handle
++ // failures when sending the last packet. We actually do not need to
++ // bump GS for this kind of failure. Thus counting the total number
++ // of failures may be good enough.
++ }
++ }
++ }
++
++ closeThreads(false);
++ TraceScope scope = dfsClient.getTracer().newScope("completeFile");
++ try {
++ completeFile(currentBlockGroup);
++ } finally {
++ scope.close();
++ }
++ dfsClient.endFileLease(fileId);
++ } catch (ClosedChannelException ignored) {
++ } finally {
++ // runs on every path, including failures
++ setClosed();
++ }
++ }
++
++ /**
++  * Enqueue the pending packet of every healthy streamer, then re-select
++  * the streamer that was current on entry.
++  */
++ private void enqueueAllCurrentPackets() throws IOException {
++   final int savedIdx = streamers.indexOf(getCurrentStreamer());
++   for (int i = 0; i < streamers.size(); i++) {
++     final StripedDataStreamer si = setCurrentStreamer(i);
++     if (!si.isHealthy() || currentPacket == null) {
++       continue;
++     }
++     try {
++       enqueueCurrentPacket();
++     } catch (IOException e) {
++       handleStreamerFailure("enqueueAllCurrentPackets, i=" + i, e);
++     }
++   }
++   setCurrentStreamer(savedIdx);
++ }
++
++ /**
++  * Flush every healthy streamer, treating a failed flush as a failure of
++  * that streamer, then re-select the streamer that was current on entry.
++  */
++ void flushAllInternals() throws IOException {
++   final int savedIdx = getCurrentIndex();
++
++   for (int i = 0; i < numAllBlocks; i++) {
++     final StripedDataStreamer si = setCurrentStreamer(i);
++     if (!si.isHealthy()) {
++       continue;
++     }
++     try {
++       // flush all data to Datanode
++       flushInternal();
++     } catch(Exception e) {
++       handleStreamerFailure("flushInternal " + si, e);
++     }
++   }
++   setCurrentStreamer(savedIdx);
++ }
++
++ static void sleep(long ms, String op) throws InterruptedIOException {
++ try {
++ Thread.sleep(ms);
++ } catch(InterruptedException ie) {
++ throw DFSUtilClient.toInterruptedIOException(
++ "Sleep interrupted during " + op, ie);
++ }
++ }
++
++ /** @return the block group currently being written (may be null). */
++ @Override
++ ExtendedBlock getBlock() {
++ return currentBlockGroup;
++ }
++}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
index 359886e,e275afb..f96ae65
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
@@@ -53,6 -54,6 +54,7 @@@ import org.slf4j.LoggerFactory
import javax.net.SocketFactory;
import java.io.IOException;
++import java.io.InterruptedIOException;
import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@@ -628,4 -652,4 +653,11 @@@ public class DFSUtilClient
return URI.create(HdfsConstants.HDFS_URI_SCHEME + "://"
+ namenode.getHostName() + portString);
}
++
++ /**
++  * Wrap an {@link InterruptedException} into an
++  * {@link InterruptedIOException}.
++  *
++  * @param message message of the new exception
++  * @param e the interruption, installed as the cause
++  * @return the new exception, for the caller to throw
++  */
++ public static InterruptedIOException toInterruptedIOException(String message,
++     InterruptedException e) {
++   final InterruptedIOException wrapped = new InterruptedIOException(message);
++   // InterruptedIOException has no cause-taking constructor
++   wrapped.initCause(e);
++   return wrapped;
++ }
}