You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2015/09/30 20:22:56 UTC

[22/52] [abbrv] hadoop git commit: Merge remote-tracking branch 'apache/trunk' into HDFS-7285

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
index 7a7cd24,0000000..dabae2c
mode 100644,000000..100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
@@@ -1,1013 -1,0 +1,1014 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hdfs.server.datanode.erasurecode;
 +
 +import java.io.BufferedOutputStream;
 +import java.io.DataInputStream;
 +import java.io.DataOutputStream;
 +import java.io.IOException;
 +import java.io.InputStream;
 +import java.io.OutputStream;
 +import java.net.InetSocketAddress;
 +import java.net.Socket;
 +import java.nio.ByteBuffer;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.BitSet;
 +import java.util.Collection;
 +import java.util.EnumSet;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.concurrent.Callable;
 +import java.util.concurrent.CompletionService;
 +import java.util.concurrent.ExecutorCompletionService;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.LinkedBlockingQueue;
 +import java.util.concurrent.SynchronousQueue;
 +import java.util.concurrent.ThreadPoolExecutor;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicInteger;
 +
 +import org.apache.commons.logging.Log;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.StorageType;
 +import org.apache.hadoop.hdfs.BlockReader;
 +import org.apache.hadoop.hdfs.DFSConfigKeys;
 +import org.apache.hadoop.hdfs.DFSPacket;
 +import org.apache.hadoop.hdfs.DFSUtil;
++import org.apache.hadoop.hdfs.DFSUtilClient;
 +import org.apache.hadoop.hdfs.RemoteBlockReader2;
 +import org.apache.hadoop.hdfs.net.Peer;
 +import org.apache.hadoop.hdfs.net.TcpPeerServer;
 +import org.apache.hadoop.hdfs.protocol.DatanodeID;
 +import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 +import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 +import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
 +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 +import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 +import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
 +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 +import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 +import org.apache.hadoop.hdfs.server.datanode.DataNode;
 +import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
 +import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 +import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
 +import org.apache.hadoop.io.IOUtils;
 +import org.apache.hadoop.io.erasurecode.CodecUtil;
 +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 +import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
 +import org.apache.hadoop.net.NetUtils;
 +import org.apache.hadoop.security.token.Token;
 +import org.apache.hadoop.util.Daemon;
 +import org.apache.hadoop.util.DataChecksum;
 +
 +import com.google.common.base.Preconditions;
 +
 +import static org.apache.hadoop.hdfs.util.StripedBlockUtil.convertIndex4Decode;
 +
 +/**
 + * ErasureCodingWorker handles the erasure coding recovery work commands. These
 + * commands would be issued from Namenode as part of Datanode's heart beat
 + * response. BPOfferService delegates the work to this class for handling EC
 + * commands.
 + */
 +public final class ErasureCodingWorker {
 +  // Shares the DataNode's logger rather than declaring its own.
 +  private static final Log LOG = DataNode.LOG;
 +  
 +  // DataNode this worker acts for; used below for xmit accounting, block
 +  // access tokens and the connect-via-hostname setting.
 +  private final DataNode datanode; 
 +  // Configuration the settings are read from and the decoder is created with.
 +  private final Configuration conf;
 +
 +  // Runs the ReconstructAndTransferBlock tasks submitted by
 +  // processErasureCodingTasks.
 +  private ThreadPoolExecutor STRIPED_BLK_RECOVERY_THREAD_POOL;
 +  // Backs the per-task CompletionService that reads from the source DNs.
 +  private ThreadPoolExecutor STRIPED_READ_THREAD_POOL;
 +  // Timeout handed to StripedBlockUtil.getNextCompletedStripedRead.
 +  private final int STRIPED_READ_TIMEOUT_MILLIS;
 +  // Configured read buffer size; each task aligns it to bytesPerChecksum in
 +  // initChecksumAndBufferSizeIfNeeded before using it.
 +  private final int STRIPED_READ_BUFFER_SIZE;
 +
 +  /**
 +   * Creates the worker, reading the striped-read timeout, the striped-read
 +   * buffer size and both pool sizes from the configuration and building the
 +   * striped-read and striped-block-recovery thread pools.
 +   *
 +   * @param conf configuration supplying the striped read/recovery settings
 +   * @param datanode the DataNode this worker acts for
 +   */
 +  public ErasureCodingWorker(Configuration conf, DataNode datanode) {
 +    this.datanode = datanode;
 +    this.conf = conf;
 +
 +    final int readTimeoutMillis = conf.getInt(
 +        DFSConfigKeys.DFS_DATANODE_STRIPED_READ_TIMEOUT_MILLIS_KEY,
 +        DFSConfigKeys.DFS_DATANODE_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT);
 +    STRIPED_READ_TIMEOUT_MILLIS = readTimeoutMillis;
 +
 +    final int readThreads = conf.getInt(
 +        DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THREADS_KEY,
 +        DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THREADS_DEFAULT);
 +    initializeStripedReadThreadPool(readThreads);
 +
 +    final int readBufferSize = conf.getInt(
 +        DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY,
 +        DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT);
 +    STRIPED_READ_BUFFER_SIZE = readBufferSize;
 +
 +    final int recoveryThreads = conf.getInt(
 +        DFSConfigKeys.DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_KEY,
 +        DFSConfigKeys.DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_DEFAULT);
 +    initializeStripedBlkRecoveryThreadPool(recoveryThreads);
 +  }
 +  
 +  /**
 +   * Builds a raw decoder via CodecUtil.createRSRawDecoder using this worker's
 +   * configuration.
 +   *
 +   * @param numDataUnits number of data units of the erasure coding policy
 +   * @param numParityUnits number of parity units of the erasure coding policy
 +   * @return the decoder returned by CodecUtil
 +   */
 +  private RawErasureDecoder newDecoder(int numDataUnits, int numParityUnits) {
 +    final RawErasureDecoder rsDecoder =
 +        CodecUtil.createRSRawDecoder(conf, numDataUnits, numParityUnits);
 +    return rsDecoder;
 +  }
 +
 +  /**
 +   * Builds the pool used for reading from source DNs: 1 core thread, at most
 +   * <code>num</code> threads, 60 seconds keep-alive (core threads included)
 +   * and a SynchronousQueue, so tasks are handed straight to a thread. When
 +   * the pool rejects a task it is logged and run in the submitting thread.
 +   *
 +   * @param num maximum number of striped-read threads
 +   */
 +  private void initializeStripedReadThreadPool(int num) {
 +    if (LOG.isDebugEnabled()) {
 +      LOG.debug("Using striped reads; pool threads=" + num);
 +    }
 +
 +    // Threads come from Daemon.DaemonFactory and are renamed stripedRead-<n>.
 +    final Daemon.DaemonFactory readThreadFactory = new Daemon.DaemonFactory() {
 +      private final AtomicInteger nextId = new AtomicInteger(0);
 +
 +      @Override
 +      public Thread newThread(Runnable task) {
 +        final Thread worker = super.newThread(task);
 +        worker.setName("stripedRead-" + nextId.getAndIncrement());
 +        return worker;
 +      }
 +    };
 +
 +    // Same as CallerRunsPolicy, plus a log line for every rejection.
 +    final ThreadPoolExecutor.CallerRunsPolicy loggingCallerRuns =
 +        new ThreadPoolExecutor.CallerRunsPolicy() {
 +      @Override
 +      public void rejectedExecution(Runnable task, ThreadPoolExecutor pool) {
 +        LOG.info("Execution for striped reading rejected, "
 +            + "Executing in current thread");
 +        // will run in the current thread
 +        super.rejectedExecution(task, pool);
 +      }
 +    };
 +
 +    final ThreadPoolExecutor readPool = new ThreadPoolExecutor(1, num, 60,
 +        TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
 +        readThreadFactory, loggingCallerRuns);
 +    readPool.allowCoreThreadTimeOut(true);
 +    STRIPED_READ_THREAD_POOL = readPool;
 +  }
 +
 +  /**
 +   * Builds the pool that runs the block recovery tasks. Threads are renamed
 +   * stripedBlockRecovery-&lt;n&gt; and idle threads (core threads included)
 +   * time out after 60 seconds.
 +   *
 +   * The core size is capped at <code>num</code>: ThreadPoolExecutor throws
 +   * IllegalArgumentException when corePoolSize exceeds maximumPoolSize, so a
 +   * fixed core size of 2 made a configured pool size of 1 fail here. A
 +   * non-positive <code>num</code> is still rejected by ThreadPoolExecutor.
 +   *
 +   * @param num maximum number of striped block recovery threads
 +   */
 +  private void initializeStripedBlkRecoveryThreadPool(int num) {
 +    if (LOG.isDebugEnabled()) {
 +      LOG.debug("Using striped block recovery; pool threads=" + num);
 +    }
 +    final int corePoolSize = Math.min(2, num);
 +    // NOTE(review): with an unbounded LinkedBlockingQueue the executor only
 +    // grows past corePoolSize when the queue is full, which never happens,
 +    // so at most corePoolSize threads run. Confirm this is intended.
 +    STRIPED_BLK_RECOVERY_THREAD_POOL = new ThreadPoolExecutor(corePoolSize,
 +        num, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
 +        new Daemon.DaemonFactory() {
 +          private final AtomicInteger threadIdx = new AtomicInteger(0);
 +
 +          @Override
 +          public Thread newThread(Runnable r) {
 +            Thread t = super.newThread(r);
 +            t.setName("stripedBlockRecovery-" + threadIdx.getAndIncrement());
 +            return t;
 +          }
 +        });
 +    STRIPED_BLK_RECOVERY_THREAD_POOL.allowCoreThreadTimeOut(true);
 +  }
 +
 +  /**
 +   * Submits one ReconstructAndTransferBlock task per recovery command to the
 +   * striped block recovery pool. A failure to create or submit a task is
 +   * logged and does not stop the remaining commands from being submitted.
 +   *
 +   * @param ecTasks
 +   *          the BlockECRecoveryInfo commands to handle
 +   */
 +  public void processErasureCodingTasks(Collection<BlockECRecoveryInfo> ecTasks) {
 +    for (final BlockECRecoveryInfo task : ecTasks) {
 +      try {
 +        final Runnable recovery = new ReconstructAndTransferBlock(task);
 +        STRIPED_BLK_RECOVERY_THREAD_POOL.submit(recovery);
 +      } catch (Throwable t) {
 +        LOG.warn("Failed to recover striped block "
 +            + task.getExtendedBlock().getLocalBlock(), t);
 +      }
 +    }
 +  }
 +
 +  /**
 +   * ReconstructAndTransferBlock recovers one or more missing striped blocks in
 +   * the striped block group; the number of live striped blocks must be no less
 +   * than the number of data blocks.
 +   * 
 +   * | <- Striped Block Group -> |
 +   *  blk_0      blk_1       blk_2(*)   blk_3   ...   <- A striped block group
 +   *    |          |           |          |  
 +   *    v          v           v          v 
 +   * +------+   +------+   +------+   +------+
 +   * |cell_0|   |cell_1|   |cell_2|   |cell_3|  ...    
 +   * +------+   +------+   +------+   +------+     
 +   * |cell_4|   |cell_5|   |cell_6|   |cell_7|  ...
 +   * +------+   +------+   +------+   +------+
 +   * |cell_8|   |cell_9|   |cell10|   |cell11|  ...
 +   * +------+   +------+   +------+   +------+
 +   *  ...         ...       ...         ...
 +   *  
 +   * 
 +   * We use following steps to recover striped block group, in each round, we
 +   * recover <code>bufferSize</code> data until finish, the 
 +   * <code>bufferSize</code> is configurable and may be less or larger than 
 +   * cell size:
 +   * step1: read <code>bufferSize</code> data from minimum number of sources 
 +   *        required by recovery.
 +   * step2: decode data for targets.
 +   * step3: transfer data to targets.
 +   * 
 +   * In step1, try to read <code>bufferSize</code> data from the minimum number
 +   * of sources; if there are corrupt or stale sources, a read from a new source
 +   * will be scheduled. The best sources are remembered for the next round and
 +   * may be updated in each round.
 +   * 
 +   * In step2, typically if source blocks we read are all data blocks, we 
 +   * need to call encode, and if there is one parity block, we need to call
 +   * decode. Notice we only read once and recover all missed striped block 
 +   * if they are more than one.
 +   * 
 +   * In step3, send the recovered data to the targets by constructing packets
 +   * and sending them directly. As with continuous block replication, we
 +   * don't check the packet acks. Since the datanode doing the recovery work
 +   * is one of the source datanodes, the recovered data is sent
 +   * remotely.
 +   * 
 +   * There are some points we can do further improvements in next phase:
 +   * 1. we can read the block file directly on the local datanode, 
 +   *    currently we use remote block reader. (Notice short-circuit is not
 +   *    a good choice, see inline comments).
 +   * 2. We need to check the packet ack for EC recovery? Since EC recovery
 +   *    is more expensive than continuous block replication, it needs to 
 +   *    read from several other datanodes, should we make sure the 
 +   *    recovered result received by targets? 
 +   */
 +  private class ReconstructAndTransferBlock implements Runnable {
 +    // Data unit count, parity unit count and cell size of the erasure coding
 +    // policy carried by the recovery command.
 +    private final int dataBlkNum;
 +    private final int parityBlkNum;
 +    private final int cellSize;
 +    
 +    // Created lazily by initDecoderIfNecessary.
 +    private RawErasureDecoder decoder;
 +
 +    // Striped read buffer size
 +    // (0 until initChecksumAndBufferSizeIfNeeded has seen a block reader)
 +    private int bufferSize;
 +
 +    private final ExtendedBlock blockGroup;
 +    // min(number of cells in the block group, dataBlkNum)
 +    private final int minRequiredSources;
 +    // position in striped internal block
 +    private long positionInBlock;
 +
 +    // sources
 +    // liveIndices[i] is the internal block index served by sources[i]; the
 +    // constructor checks that both arrays have the same length.
 +    private final short[] liveIndices;
 +    private final DatanodeInfo[] sources;
 +
 +    // One reader per source tried so far, in the same order as sources.
 +    private final List<StripedReader> stripedReaders;
 +
 +    // The buffers and indices for striped blocks whose length is 0
 +    // (both stay null unless minRequiredSources < dataBlkNum)
 +    private ByteBuffer[] zeroStripeBuffers;
 +    private short[] zeroStripeIndices;
 +
 +    // targets
 +    private final DatanodeInfo[] targets;
 +    private final StorageType[] targetStorageTypes;
 +
 +    // targetIndices[i] is the internal block index rebuilt for targets[i]
 +    // (filled by getTargetIndices); targetBuffers[i] receives its decoded data.
 +    private final short[] targetIndices;
 +    private final ByteBuffer[] targetBuffers;
 +
 +    // Per-target connection state; all of it is closed in run()'s finally.
 +    private final Socket[] targetSockets;
 +    private final DataOutputStream[] targetOutputStreams;
 +    private final DataInputStream[] targetInputStreams;
 +
 +    // Per-target counters, all starting at 0.
 +    // NOTE(review): presumably the bytes already sent and the next packet
 +    // sequence number; the code using them is outside this view.
 +    private final long[] blockOffset4Targets;
 +    private final long[] seqNo4Targets;
 +
 +    // Upper bound from which run() derives maxChunksPerPacket.
 +    private final static int WRITE_PACKET_SIZE = 64 * 1024;
 +    // Checksum settings taken from the first block reader that opens.
 +    private DataChecksum checksum;
 +    private int maxChunksPerPacket;
 +    private byte[] packetBuf;
 +    private byte[] checksumBuf;
 +    private int bytesPerChecksum;
 +    private int checksumSize;
 +
 +    private final CachingStrategy cachingStrategy;
 +
 +    // In-flight reads, mapped to the index of their source in
 +    // sources/stripedReaders.
 +    private final Map<Future<Void>, Integer> futures = new HashMap<>();
 +    // Completion service that runs the reads on the shared striped read pool.
 +    private final CompletionService<Void> readService =
 +        new ExecutorCompletionService<>(STRIPED_READ_THREAD_POOL);
 +
 +    /**
 +     * Captures the policy, block group, sources and targets of one recovery
 +     * command and allocates the per-target bookkeeping arrays. No connection
 +     * is opened here.
 +     *
 +     * @param recoveryInfo the recovery command to execute
 +     * @throws IllegalArgumentException (from Preconditions) if there are fewer
 +     *         live blocks than required, if the live indices and sources
 +     *         differ in length, or if there are more targets than parity
 +     *         blocks
 +     */
 +    ReconstructAndTransferBlock(BlockECRecoveryInfo recoveryInfo) {
 +      final ErasureCodingPolicy policy = recoveryInfo.getErasureCodingPolicy();
 +      dataBlkNum = policy.getNumDataUnits();
 +      parityBlkNum = policy.getNumParityUnits();
 +      cellSize = policy.getCellSize();
 +
 +      blockGroup = recoveryInfo.getExtendedBlock();
 +      // Cells in the block group, rounded up for positive sizes; a group with
 +      // fewer cells than data blocks needs correspondingly fewer sources.
 +      final long groupBytes = blockGroup.getNumBytes();
 +      final int cellCount = (int) ((groupBytes - 1) / cellSize + 1);
 +      minRequiredSources = Math.min(cellCount, dataBlkNum);
 +
 +      liveIndices = recoveryInfo.getLiveBlockIndices();
 +      sources = recoveryInfo.getSourceDnInfos();
 +      stripedReaders = new ArrayList<>(sources.length);
 +
 +      Preconditions.checkArgument(minRequiredSources <= liveIndices.length,
 +          "No enough live striped blocks.");
 +      Preconditions.checkArgument(sources.length == liveIndices.length,
 +          "liveBlockIndices and source dns should match");
 +
 +      // Data blocks beyond the last cell are empty and get zero-filled input.
 +      final int zeroStripeCount = dataBlkNum - minRequiredSources;
 +      if (zeroStripeCount > 0) {
 +        zeroStripeBuffers = new ByteBuffer[zeroStripeCount];
 +        zeroStripeIndices = new short[zeroStripeCount];
 +      }
 +
 +      targets = recoveryInfo.getTargetDnInfos();
 +      targetStorageTypes = recoveryInfo.getTargetStorageTypes();
 +      final int targetCount = targets.length;
 +      targetIndices = new short[targetCount];
 +      targetBuffers = new ByteBuffer[targetCount];
 +
 +      Preconditions.checkArgument(targetIndices.length <= parityBlkNum,
 +          "Too much missed striped blocks.");
 +
 +      targetSockets = new Socket[targetCount];
 +      targetOutputStreams = new DataOutputStream[targetCount];
 +      targetInputStreams = new DataInputStream[targetCount];
 +
 +      blockOffset4Targets = new long[targetCount];
 +      seqNo4Targets = new long[targetCount];
 +      Arrays.fill(blockOffset4Targets, 0);
 +      Arrays.fill(seqNo4Targets, 0);
 +
 +      getTargetIndices();
 +      cachingStrategy = CachingStrategy.newDefaultStrategy();
 +    }
 +
 +    /**
 +     * Allocates a buffer of the given capacity with ByteBuffer.allocate.
 +     *
 +     * @param length capacity in bytes
 +     * @return the new buffer
 +     */
 +    private ByteBuffer allocateBuffer(int length) {
 +      final ByteBuffer allocated = ByteBuffer.allocate(length);
 +      return allocated;
 +    }
 +
 +    /**
 +     * Builds the internal block with index <code>i</code> of the given block
 +     * group via StripedBlockUtil.constructInternalBlock, using this task's
 +     * cell size and data block count.
 +     *
 +     * @param blockGroup the striped block group
 +     * @param i index of the internal block within the group
 +     * @return the internal block
 +     */
 +    private ExtendedBlock getBlock(ExtendedBlock blockGroup, int i) {
 +      final ExtendedBlock internalBlock =
 +          StripedBlockUtil.constructInternalBlock(
 +              blockGroup, cellSize, dataBlkNum, i);
 +      return internalBlock;
 +    }
 +
 +    /**
 +     * Returns the length of internal block <code>i</code> as computed by
 +     * StripedBlockUtil.getInternalBlockLength from the group's byte count.
 +     *
 +     * @param blockGroup the striped block group
 +     * @param i index of the internal block within the group
 +     * @return length of that internal block
 +     */
 +    private long getBlockLen(ExtendedBlock blockGroup, int i) {
 +      final long groupBytes = blockGroup.getNumBytes();
 +      return StripedBlockUtil.getInternalBlockLength(
 +          groupBytes, cellSize, dataBlkNum, i);
 +    }
 +
 +    /**
 +     * Creates the StripedReader for source <code>i</code> and appends it to
 +     * stripedReaders, so readers keep the same order as sources. A reader
 +     * holds a block reader, a read buffer and the striped block index.
 +     * Typically only minRequiredSources readers are created up front; more
 +     * are added when an existing source turns out to be invalid or slow.
 +     * If no block reader can be opened, the reader is still added but its
 +     * blockReader stays null. The first block reader that opens also
 +     * initializes the checksum and buffer size.
 +     *
 +     * @param i the array index of sources
 +     * @param offsetInBlock offset for the internal block
 +     * @return StripedReader
 +     */
 +    private StripedReader addStripedReader(int i, long offsetInBlock) {
 +      final short blockIndex = liveIndices[i];
 +      final StripedReader stripedReader = new StripedReader(blockIndex);
 +      stripedReaders.add(stripedReader);
 +
 +      final ExtendedBlock internalBlock = getBlock(blockGroup, blockIndex);
 +      final BlockReader opened =
 +          newBlockReader(internalBlock, offsetInBlock, sources[i]);
 +      if (opened != null) {
 +        initChecksumAndBufferSizeIfNeeded(opened);
 +        stripedReader.blockReader = opened;
 +      }
 +      stripedReader.buffer = allocateBuffer(bufferSize);
 +      return stripedReader;
 +    }
 +
 +    /**
 +     * Recovers the missing internal blocks of the block group and streams
 +     * them to the targets. Opens readers until minRequiredSources sources are
 +     * available, sizes the buffers, opens the target streams, then repeats
 +     * read, decode and transfer until the length of internal block 0 has been
 +     * processed. Any Throwable is logged instead of propagated. The xmits
 +     * counter is decremented and all readers, streams and sockets are closed
 +     * in every case.
 +     */
 +    @Override
 +    public void run() {
 +      datanode.incrementXmitsInProgress();
 +      try {
 +        // Store the array indices of source DNs we have read successfully.
 +        // In each iteration of read, the success list may be updated if
 +        // some source DN is corrupted or slow. And use the updated success
 +        // list of DNs for next iteration read.
 +        int[] success = new int[minRequiredSources];
 +
 +        // Try sources in order until enough block readers have been opened.
 +        int nsuccess = 0;
 +        for (int i = 0; 
 +            i < sources.length && nsuccess < minRequiredSources; i++) {
 +          StripedReader reader = addStripedReader(i, 0);
 +          if (reader.blockReader != null) {
 +            success[nsuccess++] = i;
 +          }
 +        }
 +
 +        if (nsuccess < minRequiredSources) {
 +          String error = "Can't find minimum sources required by "
 +              + "recovery, block id: " + blockGroup.getBlockId();
 +          throw new IOException(error);
 +        }
 +
 +        // bufferSize, checksum and bytesPerChecksum were set when the first
 +        // block reader opened (see initChecksumAndBufferSizeIfNeeded).
 +        if (zeroStripeBuffers != null) {
 +          for (int i = 0; i < zeroStripeBuffers.length; i++) {
 +            zeroStripeBuffers[i] = allocateBuffer(bufferSize);
 +          }
 +        }
 +
 +        for (int i = 0; i < targets.length; i++) {
 +          targetBuffers[i] = allocateBuffer(bufferSize);
 +        }
 +
 +        // Size the packet buffer: as many data+checksum chunks as fit in
 +        // WRITE_PACKET_SIZE after the header, but at least one chunk.
 +        checksumSize = checksum.getChecksumSize();
 +        int chunkSize = bytesPerChecksum + checksumSize;
 +        maxChunksPerPacket = Math.max(
 +            (WRITE_PACKET_SIZE - PacketHeader.PKT_MAX_HEADER_LEN)/chunkSize, 1);
 +        int maxPacketSize = chunkSize * maxChunksPerPacket 
 +            + PacketHeader.PKT_MAX_HEADER_LEN;
 +
 +        packetBuf = new byte[maxPacketSize];
 +        // One checksum per bytesPerChecksum chunk of a full read buffer.
 +        checksumBuf = new byte[checksumSize * (bufferSize / bytesPerChecksum)];
 +
 +        // targetsStatus store whether some target is success, it will record
 +        // any failed target once, if some target failed (invalid DN or transfer
 +        // failed), will not transfer data to it any more.
 +        boolean[] targetsStatus = new boolean[targets.length];
 +        if (initTargetStreams(targetsStatus) == 0) {
 +          String error = "All targets are failed.";
 +          throw new IOException(error);
 +        }
 +
 +        // Internal block 0 bounds the loop; recoverTargets trims each target
 +        // buffer to its own block length.
 +        long firstStripedBlockLength = getBlockLen(blockGroup, 0);
 +        while (positionInBlock < firstStripedBlockLength) {
 +          int toRead = Math.min(
 +              bufferSize, (int)(firstStripedBlockLength - positionInBlock));
 +          // step1: read from minimum source DNs required for reconstruction.
 +          //   The returned success list is the source DNs we do real read from
 +          success = readMinimumStripedData4Recovery(success);
 +
 +          // step2: decode to reconstruct targets
 +          long remaining = firstStripedBlockLength - positionInBlock;
 +          int toRecoverLen = remaining < bufferSize ? 
 +              (int)remaining : bufferSize;
 +          recoverTargets(success, targetsStatus, toRecoverLen);
 +
 +          // step3: transfer data
 +          if (transferData2Targets(targetsStatus) == 0) {
 +            String error = "Transfer failed for all targets.";
 +            throw new IOException(error);
 +          }
 +
 +          clearBuffers();
 +          positionInBlock += toRead;
 +        }
 +
 +        endTargetBlocks(targetsStatus);
 +
 +        // Currently we don't check the acks for packets, this is similar as
 +        // block replication.
 +      } catch (Throwable e) {
 +        // Runs on a pool thread: log the failure instead of propagating it.
 +        LOG.warn("Failed to recover striped block: " + blockGroup, e);
 +      } finally {
 +        datanode.decrementXmitsInProgress();
 +        // close block readers
 +        for (StripedReader stripedReader : stripedReaders) {
 +          closeBlockReader(stripedReader.blockReader);
 +        }
 +        // Entries for targets that were never connected are still null here.
 +        for (int i = 0; i < targets.length; i++) {
 +          IOUtils.closeStream(targetOutputStreams[i]);
 +          IOUtils.closeStream(targetInputStreams[i]);
 +          IOUtils.closeStream(targetSockets[i]);
 +        }
 +      }
 +    }
 +
 +    /**
 +     * Takes the checksum settings from the first block reader seen and derives
 +     * the read buffer size from them: the configured size rounded down to a
 +     * multiple of bytesPerChecksum, or bytesPerChecksum itself when the
 +     * configured size is smaller. Later readers are only checked (by
 +     * assertion) to use an equal checksum.
 +     *
 +     * @param blockReader a successfully opened block reader
 +     */
 +    private void initChecksumAndBufferSizeIfNeeded(BlockReader blockReader) {
 +      if (checksum != null) {
 +        assert blockReader.getDataChecksum().equals(checksum);
 +        return;
 +      }
 +
 +      checksum = blockReader.getDataChecksum();
 +      bytesPerChecksum = checksum.getBytesPerChecksum();
 +
 +      final int configuredSize = STRIPED_READ_BUFFER_SIZE;
 +      if (configuredSize < bytesPerChecksum) {
 +        bufferSize = bytesPerChecksum;
 +      } else {
 +        // Drop the remainder so the buffer holds whole checksum chunks.
 +        bufferSize = configuredSize - configuredSize % bytesPerChecksum;
 +      }
 +    }
 +
 +    /**
 +     * Sorts every internal block index that is not live into one of two
 +     * arrays: indices of non-empty blocks go to targetIndices (at most
 +     * targets.length of them), indices of zero-length blocks go to
 +     * zeroStripeIndices.
 +     */
 +    private void getTargetIndices() {
 +      final int totalBlkNum = dataBlkNum + parityBlkNum;
 +      final BitSet live = new BitSet(totalBlkNum);
 +      for (int s = 0; s < sources.length; s++) {
 +        live.set(liveIndices[s]);
 +      }
 +
 +      int targetCount = 0;
 +      int zeroCount = 0;
 +      for (int blk = 0; blk < totalBlkNum; blk++) {
 +        if (live.get(blk)) {
 +          continue;
 +        }
 +        if (getBlockLen(blockGroup, blk) <= 0) {
 +          zeroStripeIndices[zeroCount++] = (short) blk;
 +        } else if (targetCount < targets.length) {
 +          targetIndices[targetCount++] = (short) blk;
 +        }
 +      }
 +    }
 +
 +    /**
 +     * Returns how many bytes of internal block <code>index</code> remain from
 +     * the current position, capped at bufferSize. The result is zero or
 +     * negative once the position has reached the end of that block.
 +     *
 +     * @param index internal block index within the block group
 +     * @return number of bytes to read in this round, possibly &lt;= 0
 +     */
 +    private long getReadLength(int index) {
 +      final long left = getBlockLen(blockGroup, index) - positionInBlock;
 +      return Math.min(left, (long) bufferSize);
 +    }
 +
 +    /**
 +     * Read from minimum source DNs required for reconstruction in the iteration.
 +     * First try the success list, which holds the DNs we think are best.
 +     * If a source DN is corrupt or slow, try to read from some other source
 +     * DN and update the success list.
 +     *
 +     * Remember the updated success list and return it for following
 +     * operations and next iteration read.
 +     *
 +     * If the waiting thread is interrupted, all outstanding reads are
 +     * cancelled and dropped from the futures map, the interrupt status is
 +     * restored, and the method fails with an IOException because the minimum
 +     * number of sources was not reached.
 +     *
 +     * @param success the initial success list of source DNs we think best
 +     * @return updated success list of source DNs we do real read
 +     * @throws IOException if data could not be read from minRequiredSources
 +     *         sources
 +     */
 +    private int[] readMinimumStripedData4Recovery(final int[] success)
 +        throws IOException {
 +      int nsuccess = 0;
 +      int[] newSuccess = new int[minRequiredSources];
 +      BitSet used = new BitSet(sources.length);
 +      /*
 +       * Read from minimum source DNs required, the success list contains
 +       * source DNs which we think best.
 +       */
 +      for (int i = 0; i < minRequiredSources; i++) {
 +        StripedReader reader = stripedReaders.get(success[i]);
 +        if (getReadLength(liveIndices[success[i]]) > 0) {
 +          Callable<Void> readCallable = readFromBlock(
 +              reader.blockReader, reader.buffer);
 +          Future<Void> f = readService.submit(readCallable);
 +          futures.put(f, success[i]);
 +        } else {
 +          // If the read length is 0, we don't need to do real read
 +          reader.buffer.position(0);
 +          newSuccess[nsuccess++] = success[i];
 +        }
 +        used.set(success[i]);
 +      }
 +
 +      while (!futures.isEmpty()) {
 +        try {
 +          StripingChunkReadResult result =
 +              StripedBlockUtil.getNextCompletedStripedRead(
 +                  readService, futures, STRIPED_READ_TIMEOUT_MILLIS);
 +          int resultIndex = -1;
 +          if (result.state == StripingChunkReadResult.SUCCESSFUL) {
 +            resultIndex = result.index;
 +          } else if (result.state == StripingChunkReadResult.FAILED) {
 +            // If read failed for some source DN, we should not use it anymore
 +            // and schedule read from another source DN.
 +            StripedReader failedReader = stripedReaders.get(result.index);
 +            closeBlockReader(failedReader.blockReader);
 +            failedReader.blockReader = null;
 +            resultIndex = scheduleNewRead(used);
 +          } else if (result.state == StripingChunkReadResult.TIMEOUT) {
 +            // If timeout, we also schedule a new read.
 +            resultIndex = scheduleNewRead(used);
 +          }
 +          if (resultIndex >= 0) {
 +            newSuccess[nsuccess++] = resultIndex;
 +            if (nsuccess >= minRequiredSources) {
 +              // cancel remaining reads if we read successfully from minimum
 +              // number of source DNs required by reconstruction.
 +              cancelReads(futures.keySet());
 +              futures.clear();
 +              break;
 +            }
 +          }
 +        } catch (InterruptedException e) {
 +          LOG.info("Read data interrupted.", e);
 +          // Do not leave reads running against readers and buffers that the
 +          // caller is about to close, and do not keep their stale futures.
 +          cancelReads(futures.keySet());
 +          futures.clear();
 +          // Restore the interrupt status for code further up the stack.
 +          Thread.currentThread().interrupt();
 +          break;
 +        }
 +      }
 +
 +      if (nsuccess < minRequiredSources) {
 +        String error = "Can't read data from minimum number of sources "
 +            + "required by reconstruction, block id: " + blockGroup.getBlockId();
 +        throw new IOException(error);
 +      }
 +
 +      return newSuccess;
 +    }
 +    
 +    /**
 +     * Appends zero bytes to the buffer, starting at its current position,
 +     * until the position reaches <code>len</code>. Does nothing when the
 +     * position is already at or beyond <code>len</code>.
 +     *
 +     * @param buffer buffer to pad
 +     * @param len position the buffer should reach
 +     */
 +    private void paddingBufferToLen(ByteBuffer buffer, int len) {
 +      int missing = len - buffer.position();
 +      while (missing > 0) {
 +        buffer.put((byte) 0);
 +        missing--;
 +      }
 +    }
 +    
 +    /**
 +     * Creates the decoder on first use; later calls keep the existing one.
 +     */
 +    private void initDecoderIfNecessary() {
 +      if (decoder != null) {
 +        return;
 +      }
 +      decoder = newDecoder(dataBlkNum, parityBlkNum);
 +    }
 +
 +    /**
 +     * Collects the decoder indices of all targets whose status flag is still
 +     * true, in target order.
 +     *
 +     * @param targetsStatus per-target flag; only targets flagged true are
 +     *        included
 +     * @return indices converted with convertIndex4Decode, one per included
 +     *         target
 +     */
 +    private int[] getErasedIndices(boolean[] targetsStatus) {
 +      final int[] erased = new int[targets.length];
 +      int count = 0;
 +      for (int t = 0; t < targets.length; t++) {
 +        if (!targetsStatus[t]) {
 +          continue;
 +        }
 +        erased[count] = convertIndex4Decode(targetIndices[t],
 +            dataBlkNum, parityBlkNum);
 +        count++;
 +      }
 +      // Trim to the number of targets actually included.
 +      return Arrays.copyOf(erased, count);
 +    }
 +
 +    /**
 +     * Decodes one round of data for all targets that are still alive.
 +     * The buffers of the successfully read sources (and of the zero-length
 +     * blocks, if any) are zero-padded to <code>toRecoverLen</code>, flipped
 +     * and placed at their decoder positions. The decoder then fills the
 +     * buffers of the live targets. Finally each target buffer's limit is cut
 +     * back to what is actually left of that target's own internal block.
 +     *
 +     * @param success array indices of the sources read in this round
 +     * @param targetsStatus per-target flag; targets flagged false are skipped
 +     * @param toRecoverLen number of bytes to decode in this round
 +     */
 +    private void recoverTargets(int[] success, boolean[] targetsStatus,
 +        int toRecoverLen) {
 +      initDecoderIfNecessary();
 +      ByteBuffer[] inputs = new ByteBuffer[dataBlkNum + parityBlkNum];
 +      for (int i = 0; i < success.length; i++) {
 +        StripedReader reader = stripedReaders.get(success[i]);
 +        ByteBuffer buffer = reader.buffer;
 +        paddingBufferToLen(buffer, toRecoverLen);
 +        inputs[convertIndex4Decode(reader.index, dataBlkNum, parityBlkNum)] =
 +            (ByteBuffer)buffer.flip();
 +      }
 +      if (success.length < dataBlkNum) {
 +        for (int i = 0; i < zeroStripeBuffers.length; i++) {
 +          ByteBuffer buffer = zeroStripeBuffers[i];
 +          paddingBufferToLen(buffer, toRecoverLen);
 +          int index = convertIndex4Decode(zeroStripeIndices[i], dataBlkNum,
 +              parityBlkNum);
 +          inputs[index] = (ByteBuffer)buffer.flip();
 +        }
 +      }
 +      int[] erasedIndices = getErasedIndices(targetsStatus);
 +      ByteBuffer[] outputs = new ByteBuffer[erasedIndices.length];
 +      int m = 0;
 +      for (int i = 0; i < targetBuffers.length; i++) {
 +        if (targetsStatus[i]) {
 +          // Set the limit through targetBuffers[i]: outputs is compacted, so
 +          // its positions differ from i once an earlier target has failed.
 +          targetBuffers[i].limit(toRecoverLen);
 +          outputs[m++] = targetBuffers[i];
 +        }
 +      }
 +      decoder.decode(inputs, erasedIndices, outputs);
 +
 +      // A target block may be shorter than internal block 0; expose only the
 +      // bytes that belong to it.
 +      for (int i = 0; i < targets.length; i++) {
 +        if (targetsStatus[i]) {
 +          long blockLen = getBlockLen(blockGroup, targetIndices[i]);
 +          long remaining = blockLen - positionInBlock;
 +          if (remaining < 0) {
 +            targetBuffers[i].limit(0);
 +          } else if (remaining < toRecoverLen) {
 +            targetBuffers[i].limit((int)remaining);
 +          }
 +        }
 +      }
 +    }
 +
 +    /**
 +     * Schedule a read from some new source DN if some DN is corrupted
 +     * or slow, this is called from the read iteration.
 +     * Initially we may only have <code>minRequiredSources</code> number of
 +     * StripedReader.
 +     * If the chosen source has nothing left to read at the current position,
 +     * no real read is needed and the array index of that source DN is
 +     * returned; otherwise a read is submitted (if a usable source was found)
 +     * and -1 is returned.
 +     *
 +     * A revisited reader gets a fresh buffer if its existing one is smaller
 +     * than bufferSize: a reader created before the first block reader opened
 +     * was given a zero-capacity buffer, which cannot be read into or padded.
 +     *
 +     * @param used the used source DNs in this iteration.
 +     * @return the array index of source DN if don't need to do real read,
 +     *         otherwise -1.
 +     */
 +    private int scheduleNewRead(BitSet used) {
 +      StripedReader reader = null;
 +      // step1: initially we may only have <code>minRequiredSources</code>
 +      // number of StripedReader, and there may be some source DNs we never
 +      // read before, so will try to create StripedReader for one new source DN
 +      // and try to read from it. If found, go to step 3.
 +      int m = stripedReaders.size();
 +      while (reader == null && m < sources.length) {
 +        reader = addStripedReader(m, positionInBlock);
 +        if (getReadLength(liveIndices[m]) > 0) {
 +          if (reader.blockReader == null) {
 +            reader = null;
 +            m++;
 +          }
 +        } else {
 +          used.set(m);
 +          return m;
 +        }
 +      }
 +
 +      // step2: if there is no new source DN we can use, try to find a source
 +      // DN we ever read from but because some reason, e.g., slow, it
 +      // is not in the success DN list at the begin of this iteration, so
 +      // we have not tried it in this iteration. Now we have a chance to
 +      // revisit it again.
 +      for (int i = 0; reader == null && i < stripedReaders.size(); i++) {
 +        if (!used.get(i)) {
 +          StripedReader r = stripedReaders.get(i);
 +          // The buffer is used both for the real read and for zero padding
 +          // in recoverTargets, so it must hold bufferSize bytes either way.
 +          if (r.buffer.capacity() < bufferSize) {
 +            r.buffer = allocateBuffer(bufferSize);
 +          }
 +          if (getReadLength(liveIndices[i]) > 0) {
 +            closeBlockReader(r.blockReader);
 +            r.blockReader = newBlockReader(
 +                getBlock(blockGroup, liveIndices[i]), positionInBlock,
 +                sources[i]);
 +            if (r.blockReader != null) {
 +              m = i;
 +              reader = r;
 +            }
 +          } else {
 +            used.set(i);
 +            r.buffer.position(0);
 +            return i;
 +          }
 +        }
 +      }
 +
 +      // step3: schedule if find a correct source DN and need to do real read.
 +      if (reader != null) {
 +        Callable<Void> readCallable = readFromBlock(
 +            reader.blockReader, reader.buffer);
 +        Future<Void> f = readService.submit(readCallable);
 +        futures.put(f, m);
 +        used.set(m);
 +      }
 +
 +      return -1;
 +    }
 +
 +    /**
 +     * Cancels every given read, interrupting those that are already running.
 +     *
 +     * @param futures the reads to cancel
 +     */
 +    private void cancelReads(Collection<Future<Void>> futures) {
 +      final boolean mayInterruptIfRunning = true;
 +      for (final Future<Void> pendingRead : futures) {
 +        pendingRead.cancel(mayInterruptIfRunning);
 +      }
 +    }
 +
 +    /**
 +     * Wraps a read from <code>reader</code> into <code>buf</code> in a
 +     * Callable for the read pool. An IOException from the read has its
 +     * message logged and is then rethrown to the caller of the task.
 +     *
 +     * @param reader block reader to read from
 +     * @param buf buffer to fill
 +     * @return the read task; its result is always null
 +     */
 +    private Callable<Void> readFromBlock(final BlockReader reader,
 +        final ByteBuffer buf) {
 +      final Callable<Void> readTask = new Callable<Void>() {
 +        @Override
 +        public Void call() throws Exception {
 +          try {
 +            actualReadFromBlock(reader, buf);
 +          } catch (IOException ioe) {
 +            LOG.info(ioe.getMessage());
 +            throw ioe;
 +          }
 +          return null;
 +        }
 +      };
 +      return readTask;
 +    }
 +
 +    /**
 +     * Read bytes from block
 +     */
 +    private void actualReadFromBlock(BlockReader reader, ByteBuffer buf)
 +        throws IOException {
 +      int len = buf.remaining();
 +      int n = 0;
 +      while (n < len) {
 +        int nread = reader.read(buf);
 +        if (nread <= 0) {
 +          break;
 +        }
 +        n += nread;
 +      }
 +    }
 +
 +    /**
 +     * Close the given block reader if it is not null. An IOException
 +     * raised while closing is deliberately ignored.
 +     */
 +    private void closeBlockReader(BlockReader blockReader) {
 +      if (blockReader == null) {
 +        return;
 +      }
 +      try {
 +        blockReader.close();
 +      } catch (IOException e) {
 +        // ignore
 +      }
 +    }
 +
 +    private InetSocketAddress getSocketAddress4Transfer(DatanodeInfo dnInfo) {
 +      return NetUtils.createSocketAddr(dnInfo.getXferAddr(
 +          datanode.getDnConf().getConnectToDnViaHostname()));
 +    }
 +
 +    private BlockReader newBlockReader(final ExtendedBlock block, 
 +        long offsetInBlock, DatanodeInfo dnInfo) {
 +      if (offsetInBlock >= block.getNumBytes()) {
 +        return null;
 +      }
 +      try {
 +        InetSocketAddress dnAddr = getSocketAddress4Transfer(dnInfo);
 +        Token<BlockTokenIdentifier> blockToken = datanode.getBlockAccessToken(
 +            block, EnumSet.of(BlockTokenIdentifier.AccessMode.READ));
 +        /*
 +         * This can be further improved if the replica is local, then we can
 +         * read directly from DN and need to check the replica is FINALIZED
 +         * state, notice we should not use short-circuit local read which
 +         * requires config for domain-socket in UNIX or legacy config in Windows.
 +         */
 +        return RemoteBlockReader2.newBlockReader(
 +            "dummy", block, blockToken, offsetInBlock, 
 +            block.getNumBytes() - offsetInBlock, true,
 +            "", newConnectedPeer(block, dnAddr, blockToken, dnInfo), dnInfo,
 +            null, cachingStrategy);
 +      } catch (IOException e) {
 +        return null;
 +      }
 +    }
 +
 +    /**
 +     * Open a socket to addr and wrap it in a Peer for reading block b.
 +     *
 +     * The datanode's socket timeout is used both for connecting and as the
 +     * peer's read timeout. If anything fails before the peer is returned,
 +     * the peer and the socket are closed.
 +     *
 +     * @throws IOException if connecting or creating the peer fails
 +     */
 +    private Peer newConnectedPeer(ExtendedBlock b, InetSocketAddress addr,
 +        Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
 +        throws IOException {
 +      Peer peer = null;
 +      boolean success = false;
 +      Socket sock = null;
 +      final int socketTimeout = datanode.getDnConf().getSocketTimeout(); 
 +      try {
 +        sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
 +        NetUtils.connect(sock, addr, socketTimeout);
-         peer = TcpPeerServer.peerFromSocketAndKey(datanode.getSaslClient(), 
++        peer = DFSUtilClient.peerFromSocketAndKey(datanode.getSaslClient(),
 +            sock, datanode.getDataEncryptionKeyFactoryForBlock(b),
 +            blockToken, datanodeId);
 +        peer.setReadTimeout(socketTimeout);
 +        success = true;
 +        return peer;
 +      } finally {
 +        // release the connection only on the failure path
 +        if (!success) {
 +          IOUtils.cleanup(LOG, peer);
 +          IOUtils.closeSocket(sock);
 +        }
 +      }
 +    }
 +
 +    /**
 +     * Send data to targets
 +     */
 +    private int transferData2Targets(boolean[] targetsStatus) {
 +      int nsuccess = 0;
 +      for (int i = 0; i < targets.length; i++) {
 +        if (targetsStatus[i]) {
 +          boolean success = false;
 +          try {
 +            ByteBuffer buffer = targetBuffers[i];
 +            
 +            if (buffer.remaining() == 0) {
 +              continue;
 +            }
 +
 +            checksum.calculateChunkedSums(
 +                buffer.array(), 0, buffer.remaining(), checksumBuf, 0);
 +
 +            int ckOff = 0;
 +            while (buffer.remaining() > 0) {
 +              DFSPacket packet = new DFSPacket(packetBuf, maxChunksPerPacket,
 +                  blockOffset4Targets[i], seqNo4Targets[i]++, checksumSize, false);
 +              int maxBytesToPacket = maxChunksPerPacket * bytesPerChecksum;
 +              int toWrite = buffer.remaining() > maxBytesToPacket ?
 +                  maxBytesToPacket : buffer.remaining();
 +              int ckLen = ((toWrite - 1) / bytesPerChecksum + 1) * checksumSize;
 +              packet.writeChecksum(checksumBuf, ckOff, ckLen);
 +              ckOff += ckLen;
 +              packet.writeData(buffer, toWrite);
 +
 +              // Send packet
 +              packet.writeTo(targetOutputStreams[i]);
 +
 +              blockOffset4Targets[i] += toWrite;
 +              nsuccess++;
 +              success = true;
 +            }
 +          } catch (IOException e) {
 +            LOG.warn(e.getMessage());
 +          }
 +          targetsStatus[i] = success;
 +        }
 +      }
 +      return nsuccess;
 +    }
 +
 +    /**
 +     * clear all buffers
 +     */
 +    private void clearBuffers() {
 +      for (StripedReader stripedReader : stripedReaders) {
 +        if (stripedReader.buffer != null) {
 +          stripedReader.buffer.clear();
 +        }
 +      }
 +
 +      if (zeroStripeBuffers != null) {
 +        for (int i = 0; i < zeroStripeBuffers.length; i++) {
 +          zeroStripeBuffers[i].clear();
 +        }
 +      }
 +
 +      for (int i = 0; i < targetBuffers.length; i++) {
 +        if (targetBuffers[i] != null) {
 +          cleanBuffer(targetBuffers[i]);
 +        }
 +      }
 +    }
 +    
 +    private ByteBuffer cleanBuffer(ByteBuffer buffer) {
 +      Arrays.fill(buffer.array(), (byte) 0);
 +      return (ByteBuffer)buffer.clear();
 +    }
 +
 +    /**
 +     * Send an empty packet to each healthy target to mark the end of the
 +     * block, then flush that target's output stream. An IOException on a
 +     * target is logged as a warning and does not stop the other targets.
 +     */
 +    private void endTargetBlocks(boolean[] targetsStatus) {
 +      for (int i = 0; i < targets.length; i++) {
 +        if (!targetsStatus[i]) {
 +          continue;
 +        }
 +        try {
 +          DFSPacket endPacket = new DFSPacket(packetBuf, 0,
 +              blockOffset4Targets[i], seqNo4Targets[i]++, checksumSize, true);
 +          endPacket.writeTo(targetOutputStreams[i]);
 +          targetOutputStreams[i].flush();
 +        } catch (IOException e) {
 +          LOG.warn(e.getMessage());
 +        }
 +      }
 +    }
 +
 +    /**
 +     * Initialize output/input streams for transferring data to each target
 +     * and send the create block request to it.
 +     *
 +     * Targets are set up independently: a failure on one target is logged,
 +     * its partially opened resources are closed, and the remaining targets
 +     * are still tried.
 +     *
 +     * @param targetsStatus output array; entry i is set to true if target i
 +     *          was set up successfully and to false otherwise
 +     * @return the number of targets that were set up successfully
 +     */
 +    private int initTargetStreams(boolean[] targetsStatus) {
 +      int nsuccess = 0;
 +      for (int i = 0; i < targets.length; i++) {
 +        Socket socket = null;
 +        DataOutputStream out = null;
 +        DataInputStream in = null;
 +        boolean success = false;
 +        try {
 +          // connect to the target, using the DN socket timeout for both
 +          // the connect and subsequent reads
 +          InetSocketAddress targetAddr = 
 +              getSocketAddress4Transfer(targets[i]);
 +          socket = datanode.newSocket();
 +          NetUtils.connect(socket, targetAddr, 
 +              datanode.getDnConf().getSocketTimeout());
 +          socket.setSoTimeout(datanode.getDnConf().getSocketTimeout());
 +
 +          // the internal block this target will hold, and a WRITE token for it
 +          ExtendedBlock block = getBlock(blockGroup, targetIndices[i]);
 +          Token<BlockTokenIdentifier> blockToken = 
 +              datanode.getBlockAccessToken(block,
 +                  EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
 +
 +          long writeTimeout = datanode.getDnConf().getSocketWriteTimeout();
 +          OutputStream unbufOut = NetUtils.getOutputStream(socket, writeTimeout);
 +          InputStream unbufIn = NetUtils.getInputStream(socket);
 +          DataEncryptionKeyFactory keyFactory =
 +            datanode.getDataEncryptionKeyFactoryForBlock(block);
 +          IOStreamPair saslStreams = datanode.getSaslClient().socketSend(
 +              socket, unbufOut, unbufIn, keyFactory, blockToken, targets[i]);
 +
 +          // from here on use the streams returned by the SASL client
 +          unbufOut = saslStreams.out;
 +          unbufIn = saslStreams.in;
 +
 +          out = new DataOutputStream(new BufferedOutputStream(unbufOut,
 +              DFSUtil.getSmallBufferSize(conf)));
 +          in = new DataInputStream(unbufIn);
 +
 +          // send the write-block request with this datanode as the source
 +          DatanodeInfo source = new DatanodeInfo(datanode.getDatanodeId());
 +          new Sender(out).writeBlock(block, targetStorageTypes[i], 
 +              blockToken, "", new DatanodeInfo[]{targets[i]}, 
 +              new StorageType[]{targetStorageTypes[i]}, source, 
 +              BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0, 0, 0, 
 +              checksum, cachingStrategy, false, false, null);
 +
 +          // publish the connection only once everything has succeeded
 +          targetSockets[i] = socket;
 +          targetOutputStreams[i] = out;
 +          targetInputStreams[i] = in;
 +          nsuccess++;
 +          success = true;
 +        } catch (Throwable e) {
 +          // any failure, checked or unchecked, only disables this target
 +          LOG.warn(e.getMessage());
 +        } finally {
 +          if (!success) {
 +            IOUtils.closeStream(out);
 +            IOUtils.closeStream(in);
 +            IOUtils.closeStream(socket);
 +          }
 +        }
 +        targetsStatus[i] = success;
 +      }
 +      return nsuccess;
 +    }
 +  }
 +
 +  /**
 +   * Holder for the state of one source of a striped read: the internal
 +   * block index, the block reader used to read it, and the buffer the
 +   * data is read into. The reader and buffer are assigned after
 +   * construction.
 +   */
 +  private static class StripedReader {
 +    private final short index; // internal block index
 +    private BlockReader blockReader; // may be null when no reader is open
 +    private ByteBuffer buffer; // may be null until a buffer is assigned
 +
 +    private StripedReader(short index) {
 +      this.index = index;
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
index a115138,0ae739c..34b28e4
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
@@@ -39,10 -39,8 +39,9 @@@ import org.apache.hadoop.fs.permission.
  import org.apache.hadoop.fs.StorageType;
  import org.apache.hadoop.fs.XAttr;
  import org.apache.hadoop.hdfs.protocol.Block;
 +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
  import org.apache.hadoop.hdfs.protocol.HdfsConstants;
  import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
- import org.apache.hadoop.hdfs.protocolPB.PBHelper;
  import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
  import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
  import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
@@@ -334,21 -331,13 +333,21 @@@ public final class FSImageFormatPBINod
        INodeSection.INodeFile f = n.getFile();
        List<BlockProto> bp = f.getBlocksList();
        short replication = (short) f.getReplication();
 +      boolean isStriped = f.getIsStriped();
        LoaderContext state = parent.getLoaderContext();
 +      ErasureCodingPolicy ecPolicy = ErasureCodingPolicyManager.getSystemDefaultPolicy();
  
        BlockInfo[] blocks = new BlockInfo[bp.size()];
 -      for (int i = 0, e = bp.size(); i < e; ++i) {
 -        blocks[i] =
 -            new BlockInfoContiguous(PBHelperClient.convert(bp.get(i)), replication);
 +      for (int i = 0; i < bp.size(); ++i) {
 +        BlockProto b = bp.get(i);
 +        if (isStriped) {
-           blocks[i] = new BlockInfoStriped(PBHelper.convert(b), ecPolicy);
++          blocks[i] = new BlockInfoStriped(PBHelperClient.convert(b), ecPolicy);
 +        } else {
-           blocks[i] = new BlockInfoContiguous(PBHelper.convert(b),
++          blocks[i] = new BlockInfoContiguous(PBHelperClient.convert(b),
 +              replication);
 +        }
        }
 +
        final PermissionStatus permissions = loadPermission(f.getPermission(),
            parent.getLoaderContext().getStringTable());
  
@@@ -654,11 -632,10 +653,11 @@@
      private void save(OutputStream out, INodeFile n) throws IOException {
        INodeSection.INodeFile.Builder b = buildINodeFile(n,
            parent.getSaverContext());
 +      BlockInfo[] blocks = n.getBlocks();
  
 -      if (n.getBlocks() != null) {
 +      if (blocks != null) {
          for (Block block : n.getBlocks()) {
-           b.addBlocks(PBHelper.convert(block));
+           b.addBlocks(PBHelperClient.convert(block));
          }
        }
  

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index ed52ca4,75b6be9..b6b151c
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@@ -4714,26 -4654,8 +4713,8 @@@ public class FSNamesystem implements Na
        && safeMode.isOn();
    }
  
-   /**
-    * Check if replication queues are to be populated
-    * @return true when node is HAState.Active and not in the very first safemode
-    */
-   @Override
-   public boolean isPopulatingReplQueues() {
-     if (!shouldPopulateReplQueues()) {
-       return false;
-     }
-     return initializedReplQueues;
-   }
- 
-   private boolean shouldPopulateReplQueues() {
-     if(haContext == null || haContext.getState() == null)
-       return false;
-     return haContext.getState().shouldPopulateReplQueues();
-   }
- 
    @Override
 -  public void incrementSafeBlockCount(int replication) {
 +  public void incrementSafeBlockCount(int storageNum, BlockInfo storedBlock) {
      // safeMode is volatile, and may be set to null at any time
      SafeModeInfo safeMode = this.safeMode;
      if (safeMode == null)
@@@ -6233,11 -6150,11 +6222,16 @@@
      return cacheManager;
    }
  
 +  /** @return the ErasureCodingPolicyManager. */
 +  public ErasureCodingPolicyManager getErasureCodingPolicyManager() {
 +    return ecPolicyManager;
 +  }
 +
+   @Override
+   public HAContext getHAContext() {
+     return haContext;
+   }
+ 
    @Override  // NameNodeMXBean
    public String getCorruptFiles() {
      List<String> list = new ArrayList<String>();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
index 6f7b702,8565522..c765edc
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
@@@ -664,37 -608,12 +664,24 @@@ public class INodeFile extends INodeWit
      return counts;
    }
  
 +  /**
 +   * Compute the quota usage of a striped file: one namespace unit plus the
 +   * storage space consumed under the given storage policy. Note that
 +   * currently EC files do not support append/hflush/hsync, thus the file
 +   * length recorded in snapshots should be the same as the current file
 +   * length.
 +   */
 +  public final QuotaCounts computeQuotaUsageWithStriped(
 +      BlockStoragePolicy bsp, QuotaCounts counts) {
 +    counts.addNameSpace(1);
 +    final QuotaCounts consumed = storagespaceConsumed(bsp);
 +    counts.add(consumed);
 +    return counts;
 +  }
 +
    @Override
    public final ContentSummaryComputationContext computeContentSummary(
-       final ContentSummaryComputationContext summary) {
+       int snapshotId, final ContentSummaryComputationContext summary) {
      final ContentCounts counts = summary.getCounts();
-     FileWithSnapshotFeature sf = getFileWithSnapshotFeature();
-     final long fileLen;
-     if (sf == null) {
-       fileLen = computeFileSize();
-       counts.addContent(Content.FILE, 1);
-     } else {
-       final FileDiffList diffs = sf.getDiffs();
-       final int n = diffs.asList().size();
-       counts.addContent(Content.FILE, n);
-       if (n > 0 && sf.isCurrentFileDeleted()) {
-         fileLen =  diffs.getLast().getFileSize();
-       } else {
-         fileLen = computeFileSize();
-       }
-     }
+     counts.addContent(Content.FILE, 1);
+     final long fileLen = computeFileSize(snapshotId);
      counts.addContent(Content.LENGTH, fileLen);
      counts.addContent(Content.DISKSPACE, storagespaceConsumed(null)
          .getStorageSpace());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
index e1702d9,5bc4033..b1012c2
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
@@@ -64,4 -52,5 +65,6 @@@ public interface Namesystem extends RwL
    boolean isInSnapshot(BlockInfo blockUC);
  
    CacheManager getCacheManager();
++
+   HAContext getHAContext();
 -}
 +}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java
index 252844c,06a8219..98deed2
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java
@@@ -39,15 -39,11 +39,12 @@@ public interface SafeMode 
     */
    public boolean isInStartupSafeMode();
  
-   /** Check whether replication queues are being populated. */
-   public boolean isPopulatingReplQueues();
-     
    /**
     * Increment number of blocks that reached minimal replication.
 -   * @param replication current replication 
 +   * @param replication current replication
 +   * @param storedBlock current stored Block
     */
 -  public void incrementSafeBlockCount(int replication);
 +  public void incrementSafeBlockCount(int replication, BlockInfo storedBlock);
  
    /** Decrement number of blocks that reached minimal replication. */
    public void decrementSafeBlockCount(BlockInfo b);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
index 450d981,cf21411..ae23783
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
@@@ -243,15 -242,13 +243,15 @@@ public class FSImageFormatPBSnapshot 
          FileDiff diff = new FileDiff(pbf.getSnapshotId(), copy, null,
              pbf.getFileSize());
          List<BlockProto> bpl = pbf.getBlocksList();
 +        // in file diff there can only be contiguous blocks
          BlockInfo[] blocks = new BlockInfo[bpl.size()];
          for(int j = 0, e = bpl.size(); j < e; ++j) {
-           Block blk = PBHelper.convert(bpl.get(j));
+           Block blk = PBHelperClient.convert(bpl.get(j));
            BlockInfo storedBlock = bm.getStoredBlock(blk);
            if(storedBlock == null) {
 -            storedBlock = bm.addBlockCollection(
 -                new BlockInfoContiguous(blk, copy.getFileReplication()), file);
 +            storedBlock = (BlockInfoContiguous) fsn.getBlockManager()
 +                .addBlockCollectionWithCheck(new BlockInfoContiguous(blk,
 +                    copy.getFileReplication()), file);
            }
            blocks[j] = storedBlock;
          }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/CLITestCmdErasureCoding.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/CLITestCmdErasureCoding.java
index 6c06a8d,0000000..0499a2b
mode 100644,000000..100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/CLITestCmdErasureCoding.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/CLITestCmdErasureCoding.java
@@@ -1,38 -1,0 +1,39 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + * <p/>
 + * http://www.apache.org/licenses/LICENSE-2.0
 + * <p/>
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.cli;
 +
 +import org.apache.hadoop.cli.util.CLICommandErasureCodingCli;
 +import org.apache.hadoop.cli.util.CLICommandTypes;
 +import org.apache.hadoop.cli.util.CLITestCmd;
 +import org.apache.hadoop.cli.util.CommandExecutor;
 +import org.apache.hadoop.cli.util.ErasureCodingCliCmdExecutor;
++import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.hdfs.tools.erasurecode.ECCli;
 +
 +/**
 + * CLI test command for erasure coding tests. Commands whose type is
 + * CLICommandErasureCodingCli are run through an ErasureCodingCliCmdExecutor
 + * backed by ECCli; every other command type is delegated to the superclass.
 + */
 +public class CLITestCmdErasureCoding extends CLITestCmd {
 +  public CLITestCmdErasureCoding(String str, CLICommandTypes type) {
 +    super(str, type);
 +  }
 +
 +  @Override
-   public CommandExecutor getExecutor(String tag) throws IllegalArgumentException {
++  public CommandExecutor getExecutor(String tag, Configuration conf) throws IllegalArgumentException {
 +    // erasure coding commands get their own executor
 +    if (getType() instanceof CLICommandErasureCodingCli)
 +      return new ErasureCodingCliCmdExecutor(tag, new ECCli());
-     return super.getExecutor(tag);
++    return super.getExecutor(tag, conf);
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/TestErasureCodingCLI.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/TestErasureCodingCLI.java
index dfefb66,0000000..29ec98e
mode 100644,000000..100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/TestErasureCodingCLI.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/TestErasureCodingCLI.java
@@@ -1,115 -1,0 +1,115 @@@
 +
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.hadoop.cli;
 +
 +import org.apache.hadoop.cli.util.CLICommand;
 +import org.apache.hadoop.cli.util.CLICommandErasureCodingCli;
 +import org.apache.hadoop.cli.util.CommandExecutor.Result;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.hdfs.DFSConfigKeys;
 +import org.apache.hadoop.hdfs.MiniDFSCluster;
 +import org.junit.After;
 +import org.junit.Before;
 +import org.junit.Test;
 +import org.xml.sax.SAXException;
 +
 +/**
 + * Runs the erasure coding CLI test cases described in
 + * testErasureCodingConf.xml against a MiniDFSCluster.
 + */
 +public class TestErasureCodingCLI extends CLITestHelper {
 +  private final int NUM_OF_DATANODES = 3;
 +  private MiniDFSCluster dfsCluster = null;
 +  private FileSystem fs = null;
 +  // default file system name, substituted for NAMENODE in test commands
 +  private String namenode = null;
 +
 +  /** Starts the mini cluster and records the default file system name. */
 +  @Before
 +  @Override
 +  public void setUp() throws Exception {
 +    super.setUp();
 +
 +    dfsCluster = new MiniDFSCluster.Builder(conf)
 +        .numDataNodes(NUM_OF_DATANODES).build();
 +    dfsCluster.waitClusterUp();
 +    namenode = conf.get(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "file:///");
 +
 +    username = System.getProperty("user.name");
 +
 +    fs = dfsCluster.getFileSystem();
 +  }
 +
 +  /** @return the name of the XML file holding the test cases */
 +  @Override
 +  protected String getTestFile() {
 +    return "testErasureCodingConf.xml";
 +  }
 +
 +  /** Closes the file system and shuts the mini cluster down. */
 +  @After
 +  @Override
 +  public void tearDown() throws Exception {
 +    if (fs != null) {
 +      fs.close();
 +    }
 +    if (dfsCluster != null) {
 +      dfsCluster.shutdown();
 +    }
 +    Thread.sleep(2000);
 +    super.tearDown();
 +  }
 +
 +  /**
 +   * Replaces the NAMENODE and #LF# placeholders in the command before
 +   * applying the superclass expansion.
 +   */
 +  @Override
 +  protected String expandCommand(final String cmd) {
 +    String expCmd = cmd;
 +    expCmd = expCmd.replaceAll("NAMENODE", namenode);
 +    expCmd = expCmd.replaceAll("#LF#", System.getProperty("line.separator"));
 +    expCmd = super.expandCommand(expCmd);
 +    return expCmd;
 +  }
 +
 +  @Override
 +  protected TestConfigFileParser getConfigParser() {
 +    return new TestErasureCodingAdmin();
 +  }
 +
 +  /**
 +   * Config file parser that additionally understands the ec-admin-command
 +   * element; all other elements are handled by the superclass.
 +   */
 +  private class TestErasureCodingAdmin extends
 +      CLITestHelper.TestConfigFileParser {
 +    @Override
 +    public void endElement(String uri, String localName, String qName)
 +        throws SAXException {
 +      if (qName.equals("ec-admin-command")) {
 +        // add to the test commands if that list exists, otherwise to the
 +        // cleanup commands
 +        if (testCommands != null) {
 +          testCommands.add(new CLITestCmdErasureCoding(charString,
 +              new CLICommandErasureCodingCli()));
 +        } else if (cleanupCommands != null) {
 +          cleanupCommands.add(new CLITestCmdErasureCoding(charString,
 +              new CLICommandErasureCodingCli()));
 +        }
 +      } else {
 +        super.endElement(uri, localName, qName);
 +      }
 +    }
 +  }
 +
 +  /** Runs the command with the executor chosen for its type. */
 +  @Override
 +  protected Result execute(CLICommand cmd) throws Exception {
-     return cmd.getExecutor(namenode).executeCommand(cmd.getCmd());
++    return cmd.getExecutor(namenode, conf).executeCommand(cmd.getCmd());
 +  }
 +
 +  @Test
 +  @Override
 +  public void testAll() {
 +    super.testAll();
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 3551055,a7e80ca..12d4811
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@@ -66,14 -66,9 +66,15 @@@ import java.util.Set
  import java.util.UUID;
  import java.util.concurrent.TimeoutException;
  import java.util.concurrent.atomic.AtomicBoolean;
 +import com.google.common.base.Charsets;
 +import com.google.common.base.Joiner;
 +import com.google.common.base.Preconditions;
 +import com.google.common.base.Supplier;
 +import com.google.common.collect.Lists;
 +import com.google.common.collect.Maps;
  
  import org.apache.commons.io.FileUtils;
+ import org.apache.commons.lang.UnhandledException;
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.apache.hadoop.conf.Configuration;
@@@ -141,10 -133,8 +142,11 @@@ import org.apache.hadoop.hdfs.server.na
  import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
  import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
  import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 +import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 +import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
 +import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
  import org.apache.hadoop.hdfs.tools.DFSAdmin;
+ import org.apache.hadoop.hdfs.tools.JMXGet;
  import org.apache.hadoop.io.IOUtils;
  import org.apache.hadoop.io.nativeio.NativeIO;
  import org.apache.hadoop.net.NetUtils;
@@@ -1870,150 -1858,21 +1872,168 @@@ public class DFSTestUtil 
      }
    }
  
 +  public static StorageReceivedDeletedBlocks[] makeReportForReceivedBlock(
 +      Block block, BlockStatus blockStatus, DatanodeStorage storage) {
 +    ReceivedDeletedBlockInfo[] receivedBlocks = new ReceivedDeletedBlockInfo[1];
 +    receivedBlocks[0] = new ReceivedDeletedBlockInfo(block, blockStatus, null);
 +    StorageReceivedDeletedBlocks[] reports = new StorageReceivedDeletedBlocks[1];
 +    reports[0] = new StorageReceivedDeletedBlocks(storage, receivedBlocks);
 +    return reports;
 +  }
 +
 +  /**
 +   * Creates the metadata of a file in striped layout, touching only NameNode
 +   * state (no data is injected into DataNodes). Disable periodic heartbeats
 +   * before using this method.
 +   * @param cluster Cluster whose file system and namesystem are used
 +   * @param file Path of the file to create
 +   * @param dir Parent path of the file; only used when toMkdir is true
 +   * @param numBlocks Number of striped block groups to add to the file
 +   * @param numStripesPerBlk Number of striped cells in each block
 +   * @param toMkdir Whether to create dir and set an EC policy on it first
 +   */
 +  public static void createStripedFile(MiniDFSCluster cluster, Path file, Path dir,
 +      int numBlocks, int numStripesPerBlk, boolean toMkdir) throws Exception {
 +    DistributedFileSystem dfs = cluster.getFileSystem();
 +    // If the outer test already set an EC policy, dir should be left as null
 +    if (toMkdir) {
 +      assert dir != null;
 +      dfs.mkdirs(dir);
 +      try {
 +        dfs.getClient().setErasureCodingPolicy(dir.toString(), null);
 +      } catch (IOException e) { // only a "non-empty directory" failure is tolerated
 +        if (!e.getMessage().contains("non-empty directory")) {
 +          throw e;
 +        }
 +      }
 +    }
 +
 +    FSDataOutputStream out = null;
 +    try {
 +      out = dfs.create(file, (short) 1); // create an empty file
 +
 +      FSNamesystem ns = cluster.getNamesystem();
 +      FSDirectory fsdir = ns.getFSDirectory();
 +      INodeFile fileNode = fsdir.getINode4Write(file.toString()).asFile();
 +
 +      ExtendedBlock previous = null;
 +      for (int i = 0; i < numBlocks; i++) {
 +        Block newBlock = addStripedBlockToFile(cluster.getDataNodes(), dfs, ns,
 +            file.toString(), fileNode, dfs.getClient().getClientName(),
 +            previous, numStripesPerBlk);
 +        previous = new ExtendedBlock(ns.getBlockPoolId(), newBlock);
 +      }
 +
 +      dfs.getClient().namenode.complete(file.toString(),
 +          dfs.getClient().getClientName(), previous, fileNode.getId());
 +    } finally {
 +      IOUtils.cleanup(null, out);
 +    }
 +  }
 +
 +  /**
 +   * Adds a striped block group to a file. This method only manipulates NameNode
 +   * states of the file and the block without injecting data into DataNodes.
 +   * It does mimic incremental block reports (RECEIVING_BLOCK, then
 +   * RECEIVED_BLOCK). Disable periodic heartbeats before using this method.
 +   * @param dataNodes DataNodes to host the striped block group
 +   * @param previous Previous block in the file
 +   * @param numStripes Number of stripes in each block group
 +   * @return The added block group
 +   */
 +  public static Block addStripedBlockToFile(List<DataNode> dataNodes,
 +      DistributedFileSystem fs, FSNamesystem ns, String file, INodeFile fileNode,
 +      String clientName, ExtendedBlock previous, int numStripes)
 +      throws Exception {
 +    fs.getClient().namenode.addBlock(file, clientName, previous, null,
 +        fileNode.getId(), null);
 +
 +    final BlockInfo lastBlock = fileNode.getLastBlock();
 +    final int groupSize = fileNode.getPreferredBlockReplication();
 +    assert dataNodes.size() >= groupSize;
 +    // 1. RECEIVING_BLOCK IBR: one zero-length internal block per DataNode
 +    for (int i = 0; i < groupSize; i++) {
 +      DataNode dn = dataNodes.get(i);
 +      final Block block = new Block(lastBlock.getBlockId() + i, 0,
 +          lastBlock.getGenerationStamp());
 +      DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
 +      StorageReceivedDeletedBlocks[] reports = DFSTestUtil
 +          .makeReportForReceivedBlock(block,
 +              ReceivedDeletedBlockInfo.BlockStatus.RECEIVING_BLOCK, storage);
 +      for (StorageReceivedDeletedBlocks report : reports) {
 +        ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
 +      }
 +    }
 +
 +    // 2. RECEIVED_BLOCK IBR: the same block ids, now numStripes cells long
 +    for (int i = 0; i < groupSize; i++) {
 +      DataNode dn = dataNodes.get(i);
 +      final Block block = new Block(lastBlock.getBlockId() + i,
 +          numStripes * BLOCK_STRIPED_CELL_SIZE, lastBlock.getGenerationStamp());
 +      DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
 +      StorageReceivedDeletedBlocks[] reports = DFSTestUtil
 +          .makeReportForReceivedBlock(block,
 +              ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
 +      for (StorageReceivedDeletedBlocks report : reports) {
 +        ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
 +      }
 +    }
 +
 +    lastBlock.setNumBytes(numStripes * BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS);
 +    return lastBlock;
 +  }
 +
 +  /**
 +   * Because currently DFSStripedOutputStream does not support hflush/hsync,
 +   * tests can use this method to flush all the buffered data to DataNodes.
 +   */
 +  public static ExtendedBlock flushInternal(DFSStripedOutputStream out)
 +      throws IOException {
 +    out.flushInternal();
 +    return out.getBlock();
 +  }
 +
 +  /**
 +   * Verify that the blocks of each striped block group are on different
 +   * nodes, and that every internal block exists.
 +   */
 +  public static void verifyLocatedStripedBlocks(LocatedBlocks lbs,
 +       int groupSize) {
 +    for (LocatedBlock lb : lbs.getLocatedBlocks()) {
 +      assert lb instanceof LocatedStripedBlock;
 +      HashSet<DatanodeInfo> locs = new HashSet<>();
 +      for (DatanodeInfo datanodeInfo : lb.getLocations()) {
 +        locs.add(datanodeInfo);
 +      }
 +      assertEquals(groupSize, lb.getLocations().length);
 +      assertEquals(groupSize, locs.size());
 +
 +      // verify that every internal block exists (groupSize distinct indices)
 +      int[] blockIndices = ((LocatedStripedBlock) lb).getBlockIndices();
 +      assertEquals(groupSize, blockIndices.length);
 +      HashSet<Integer> found = new HashSet<>();
 +      for (int index : blockIndices) {
 +        assert index >=0;
 +        found.add(index);
 +      }
 +      assertEquals(groupSize, found.size());
 +    }
 +  }
++
+   public static void waitForMetric(final JMXGet jmx, final String metricName, final int expectedValue)
+       throws TimeoutException, InterruptedException {
+     GenericTestUtils.waitFor(new Supplier<Boolean>() {
+       @Override
+       public Boolean get() {
+         try {
+           final int currentValue = Integer.parseInt(jmx.getValue(metricName));
+           LOG.info("Waiting for " + metricName +
+                        " to reach value " + expectedValue +
+                        ", current value = " + currentValue);
+           return currentValue == expectedValue;
+         } catch (Exception e) { // read/parse failures are rethrown wrapped, not retried here
+           throw new UnhandledException("Test failed due to unexpected exception", e);
+         }
+       }
+     }, 1000, Integer.MAX_VALUE); // NOTE(review): presumably (poll interval, timeout) in ms — confirm
+   }
  }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java
index 50f98a3,0000000..c28bff8
mode 100644,000000..100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java
@@@ -1,160 -1,0 +1,163 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hdfs;
 +
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FSDataOutputStream;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.junit.Assert;
++import org.junit.Ignore;
 +import org.junit.Test;
 +
 +import java.io.IOException;
 +import java.util.concurrent.atomic.AtomicInteger;
 +
 +import static org.apache.hadoop.hdfs.StripedFileTestUtil.blockSize;
 +import static org.apache.hadoop.hdfs.StripedFileTestUtil.dataBlocks;
 +import static org.apache.hadoop.hdfs.StripedFileTestUtil.numDNs;
 +import static org.apache.hadoop.hdfs.StripedFileTestUtil.parityBlocks;
 +
 +public class TestWriteStripedFileWithFailure {
 +  public static final Log LOG = LogFactory
 +      .getLog(TestWriteStripedFileWithFailure.class);
 +  private static MiniDFSCluster cluster;
 +  private static FileSystem fs;
 +  private static Configuration conf = new HdfsConfiguration();
 +  private final int smallFileLength = blockSize * dataBlocks - 123;
 +  private final int largeFileLength = blockSize * dataBlocks + 123;
 +  private final int[] fileLengths = {smallFileLength, largeFileLength};
 +
 +  public void setup() throws IOException { // called by the test per case, not by JUnit
 +    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
 +    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
 +    cluster.getFileSystem().getClient().setErasureCodingPolicy("/", null);
 +    fs = cluster.getFileSystem();
 +  }
 +
 +  public void tearDown() throws IOException { // called by the test per case, not by JUnit
 +    if (cluster != null) {
 +      cluster.shutdown();
 +    }
 +  }
 +
 +  // Test writing files while some DataNodes fail
++  // TODO: enable this test after HDFS-8704 and HDFS-9040
++  @Ignore
 +  @Test(timeout = 300000)
 +  public void testWriteStripedFileWithDNFailure() throws IOException {
 +    for (int fileLength : fileLengths) {
 +      for (int dataDelNum = 1; dataDelNum < 4; dataDelNum++) {
 +        for (int parityDelNum = 0; (dataDelNum+parityDelNum) < 4; parityDelNum++) {
 +          try {
 +            // set up a new cluster with no dead DataNodes
 +            setup();
 +            writeFileWithDNFailure(fileLength, dataDelNum, parityDelNum);
 +          } catch (IOException ioe) {
 +            String fileType = fileLength < (blockSize * dataBlocks) ?
 +                "smallFile" : "largeFile";
 +            LOG.error("Failed to write file with DN failure:"
 +                + " fileType = "+ fileType
 +                + ", dataDelNum = " + dataDelNum
 +                + ", parityDelNum = " + parityDelNum);
 +            throw ioe;
 +          } finally {
 +            // tear down the cluster
 +            tearDown();
 +          }
 +        }
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Writes a file while shutting down some DNs (data DNs, parity DNs, or both).
 +   * @param fileLength file length
 +   * @param dataDNFailureNum the number of data DNs to shut down
 +   * @param parityDNFailureNum the number of parity DNs to shut down
 +   * @throws IOException on any failure while writing or reading back the file
 +   */
 +  private void writeFileWithDNFailure(int fileLength,
 +      int dataDNFailureNum, int parityDNFailureNum) throws IOException {
 +    String fileType = fileLength < (blockSize * dataBlocks) ?
 +        "smallFile" : "largeFile";
 +    String src = "/dnFailure_" + dataDNFailureNum + "_" + parityDNFailureNum
 +        + "_" + fileType;
 +    LOG.info("writeFileWithDNFailure: file = " + src
 +        + ", fileType = " + fileType
 +        + ", dataDNFailureNum = " + dataDNFailureNum
 +        + ", parityDNFailureNum = " + parityDNFailureNum);
 +
 +    Path srcPath = new Path(src);
 +    final AtomicInteger pos = new AtomicInteger();
 +    final FSDataOutputStream out = fs.create(srcPath);
 +    final DFSStripedOutputStream stripedOut
 +        = (DFSStripedOutputStream)out.getWrappedStream();
 +
 +    int[] dataDNFailureIndices = StripedFileTestUtil.randomArray(0, dataBlocks,
 +        dataDNFailureNum);
 +    Assert.assertNotNull(dataDNFailureIndices);
 +    int[] parityDNFailureIndices = StripedFileTestUtil.randomArray(dataBlocks,
 +        dataBlocks + parityBlocks, parityDNFailureNum);
 +    Assert.assertNotNull(parityDNFailureIndices);
 +
 +    int[] failedDataNodes = new int[dataDNFailureNum + parityDNFailureNum];
 +    System.arraycopy(dataDNFailureIndices, 0, failedDataNodes,
 +        0, dataDNFailureIndices.length);
 +    System.arraycopy(parityDNFailureIndices, 0, failedDataNodes,
 +        dataDNFailureIndices.length, parityDNFailureIndices.length);
 +
 +    final int killPos = fileLength/2;
 +    for (; pos.get() < fileLength; ) {
 +      final int i = pos.getAndIncrement();
 +      if (i == killPos) {
 +        for(int failedDn : failedDataNodes) {
 +          StripedFileTestUtil.killDatanode(cluster, stripedOut, failedDn, pos);
 +        }
 +      }
 +      write(out, i);
 +    }
 +    out.close();
 +
 +    // make sure the expected number of DataNodes have been killed
 +    int dnFailureNum = dataDNFailureNum + parityDNFailureNum;
 +    Assert.assertEquals(cluster.getDataNodes().size(), numDNs - dnFailureNum);
 +
 +    byte[] smallBuf = new byte[1024];
 +    byte[] largeBuf = new byte[fileLength + 100];
 +    final byte[] expected = StripedFileTestUtil.generateBytes(fileLength);
 +    StripedFileTestUtil.verifyLength(fs, srcPath, fileLength);
 +    StripedFileTestUtil.verifySeek(fs, srcPath, fileLength);
 +    StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected,
 +        smallBuf);
 +    StripedFileTestUtil.verifyPread(fs, srcPath, fileLength, expected, largeBuf);
 +
 +    // delete the file
 +    fs.delete(srcPath, true);
 +  }
 +
 +  void write(FSDataOutputStream out, int i) throws IOException {
 +    try {
 +      out.write(StripedFileTestUtil.getByte(i));
 +    } catch (IOException e) {
 +      throw new IOException("Failed at i=" + i, e);
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1080c373/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
index 0a27614,851e5b9..00a4575
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
@@@ -516,16 -489,16 +516,16 @@@ public class TestPBHelper 
    @Test
    public void testConvertLocatedBlock() {
      LocatedBlock lb = createLocatedBlock();
-     LocatedBlockProto lbProto = PBHelper.convertLocatedBlock(lb);
-     LocatedBlock lb2 = PBHelper.convertLocatedBlockProto(lbProto);
 -    LocatedBlockProto lbProto = PBHelperClient.convert(lb);
 -    LocatedBlock lb2 = PBHelperClient.convert(lbProto);
++    LocatedBlockProto lbProto = PBHelperClient.convertLocatedBlock(lb);
++    LocatedBlock lb2 = PBHelperClient.convertLocatedBlockProto(lbProto);
      compare(lb,lb2);
    }
  
    @Test
    public void testConvertLocatedBlockNoStorageMedia() {
      LocatedBlock lb = createLocatedBlockNoStorageMedia();
-     LocatedBlockProto lbProto = PBHelper.convertLocatedBlock(lb);
-     LocatedBlock lb2 = PBHelper.convertLocatedBlockProto(lbProto);
 -    LocatedBlockProto lbProto = PBHelperClient.convert(lb);
 -    LocatedBlock lb2 = PBHelperClient.convert(lbProto);
++    LocatedBlockProto lbProto = PBHelperClient.convertLocatedBlock(lb);
++    LocatedBlock lb2 = PBHelperClient.convertLocatedBlockProto(lbProto);
      compare(lb,lb2);
    }
  
@@@ -535,8 -508,8 +535,8 @@@
      for (int i=0;i<3;i++) {
        lbl.add(createLocatedBlock());
      }
-     List<LocatedBlockProto> lbpl = PBHelper.convertLocatedBlocks2(lbl);
-     List<LocatedBlock> lbl2 = PBHelper.convertLocatedBlocks(lbpl);
 -    List<LocatedBlockProto> lbpl = PBHelperClient.convertLocatedBlock2(lbl);
 -    List<LocatedBlock> lbl2 = PBHelperClient.convertLocatedBlock(lbpl);
++    List<LocatedBlockProto> lbpl = PBHelperClient.convertLocatedBlocks2(lbl);
++    List<LocatedBlock> lbl2 = PBHelperClient.convertLocatedBlocks(lbpl);
      assertEquals(lbl.size(), lbl2.size());
      for (int i=0;i<lbl.size();i++) {
        compare(lbl.get(i), lbl2.get(2));
@@@ -549,8 -522,8 +549,8 @@@
      for (int i=0;i<3;i++) {
        lbl[i] = createLocatedBlock();
      }
-     LocatedBlockProto [] lbpl = PBHelper.convertLocatedBlocks(lbl);
-     LocatedBlock [] lbl2 = PBHelper.convertLocatedBlocks(lbpl);
 -    LocatedBlockProto [] lbpl = PBHelperClient.convertLocatedBlock(lbl);
 -    LocatedBlock [] lbl2 = PBHelperClient.convertLocatedBlock(lbpl);
++    LocatedBlockProto [] lbpl = PBHelperClient.convertLocatedBlocks(lbl);
++    LocatedBlock [] lbl2 = PBHelperClient.convertLocatedBlocks(lbpl);
      assertEquals(lbl.length, lbl2.length);
      for (int i=0;i<lbl.length;i++) {
        compare(lbl[i], lbl2[i]);
@@@ -664,99 -637,6 +664,99 @@@
          .setType(AclEntryType.OTHER).build();
      AclStatus s = new AclStatus.Builder().owner("foo").group("bar").addEntry(e)
          .build();
-     Assert.assertEquals(s, PBHelper.convert(PBHelper.convert(s)));
+     Assert.assertEquals(s, PBHelperClient.convert(PBHelperClient.convert(s)));
    }
 +  
 +  @Test
 +  public void testBlockECRecoveryCommand() {
 +    DatanodeInfo[] dnInfos0 = new DatanodeInfo[] {
 +        DFSTestUtil.getLocalDatanodeInfo(), DFSTestUtil.getLocalDatanodeInfo() };
 +    DatanodeStorageInfo targetDnInfos_0 = BlockManagerTestUtil
 +        .newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(),
 +            new DatanodeStorage("s00"));
 +    DatanodeStorageInfo targetDnInfos_1 = BlockManagerTestUtil
 +        .newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(),
 +            new DatanodeStorage("s01"));
 +    DatanodeStorageInfo[] targetDnInfos0 = new DatanodeStorageInfo[] {
 +        targetDnInfos_0, targetDnInfos_1 };
 +    short[] liveBlkIndices0 = new short[2];
 +    BlockECRecoveryInfo blkECRecoveryInfo0 = new BlockECRecoveryInfo(
 +        new ExtendedBlock("bp1", 1234), dnInfos0, targetDnInfos0,
 +        liveBlkIndices0, ErasureCodingPolicyManager.getSystemDefaultPolicy());
 +    DatanodeInfo[] dnInfos1 = new DatanodeInfo[] {
 +        DFSTestUtil.getLocalDatanodeInfo(), DFSTestUtil.getLocalDatanodeInfo() };
 +    DatanodeStorageInfo targetDnInfos_2 = BlockManagerTestUtil
 +        .newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(),
 +            new DatanodeStorage("s02"));
 +    DatanodeStorageInfo targetDnInfos_3 = BlockManagerTestUtil
 +        .newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(),
 +            new DatanodeStorage("s03"));
 +    DatanodeStorageInfo[] targetDnInfos1 = new DatanodeStorageInfo[] {
 +        targetDnInfos_2, targetDnInfos_3 };
 +    short[] liveBlkIndices1 = new short[2];
 +    BlockECRecoveryInfo blkECRecoveryInfo1 = new BlockECRecoveryInfo(
 +        new ExtendedBlock("bp2", 3256), dnInfos1, targetDnInfos1,
 +        liveBlkIndices1, ErasureCodingPolicyManager.getSystemDefaultPolicy());
 +    List<BlockECRecoveryInfo> blkRecoveryInfosList = new ArrayList<BlockECRecoveryInfo>();
 +    blkRecoveryInfosList.add(blkECRecoveryInfo0);
 +    blkRecoveryInfosList.add(blkECRecoveryInfo1);
 +    BlockECRecoveryCommand blkECRecoveryCmd = new BlockECRecoveryCommand(
 +        DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY, blkRecoveryInfosList);
 +    BlockECRecoveryCommandProto blkECRecoveryCmdProto = PBHelper
 +        .convert(blkECRecoveryCmd);
 +    blkECRecoveryCmd = PBHelper.convert(blkECRecoveryCmdProto);
 +    Iterator<BlockECRecoveryInfo> iterator = blkECRecoveryCmd.getECTasks()
 +        .iterator();
 +    assertBlockECRecoveryInfoEquals(blkECRecoveryInfo0, iterator.next());
 +    assertBlockECRecoveryInfoEquals(blkECRecoveryInfo1, iterator.next());
 +  }
 +
 +  private void assertBlockECRecoveryInfoEquals(
 +      BlockECRecoveryInfo blkECRecoveryInfo1,
 +      BlockECRecoveryInfo blkECRecoveryInfo2) {
 +    assertEquals(blkECRecoveryInfo1.getExtendedBlock(),
 +        blkECRecoveryInfo2.getExtendedBlock());
 +
 +    DatanodeInfo[] sourceDnInfos1 = blkECRecoveryInfo1.getSourceDnInfos();
 +    DatanodeInfo[] sourceDnInfos2 = blkECRecoveryInfo2.getSourceDnInfos();
 +    assertDnInfosEqual(sourceDnInfos1, sourceDnInfos2);
 +
 +    DatanodeInfo[] targetDnInfos1 = blkECRecoveryInfo1.getTargetDnInfos();
 +    DatanodeInfo[] targetDnInfos2 = blkECRecoveryInfo2.getTargetDnInfos();
 +    assertDnInfosEqual(targetDnInfos1, targetDnInfos2);
 +
 +    String[] targetStorageIDs1 = blkECRecoveryInfo1.getTargetStorageIDs();
 +    String[] targetStorageIDs2 = blkECRecoveryInfo2.getTargetStorageIDs();
 +    assertEquals(targetStorageIDs1.length, targetStorageIDs2.length);
 +    for (int i = 0; i < targetStorageIDs1.length; i++) {
 +      assertEquals(targetStorageIDs1[i], targetStorageIDs2[i]);
 +    }
 +
 +    short[] liveBlockIndices1 = blkECRecoveryInfo1.getLiveBlockIndices();
 +    short[] liveBlockIndices2 = blkECRecoveryInfo2.getLiveBlockIndices();
 +    for (int i = 0; i < liveBlockIndices1.length; i++) { // NOTE(review): lengths are not compared first
 +      assertEquals(liveBlockIndices1[i], liveBlockIndices2[i]);
 +    }
 +    
 +    ErasureCodingPolicy ecPolicy1 = blkECRecoveryInfo1.getErasureCodingPolicy();
 +    ErasureCodingPolicy ecPolicy2 = blkECRecoveryInfo2.getErasureCodingPolicy();
 +    // Both policies must match the system default ECPolicy, since that is
 +    // the policy this test uses
 +    compareECPolicies(ErasureCodingPolicyManager.getSystemDefaultPolicy(), ecPolicy1);
 +    compareECPolicies(ErasureCodingPolicyManager.getSystemDefaultPolicy(), ecPolicy2);
 +  }
 +
 +  private void compareECPolicies(ErasureCodingPolicy ecPolicy1, ErasureCodingPolicy ecPolicy2) {
 +    assertEquals(ecPolicy1.getName(), ecPolicy2.getName());
 +    assertEquals(ecPolicy1.getNumDataUnits(), ecPolicy2.getNumDataUnits());
 +    assertEquals(ecPolicy1.getNumParityUnits(), ecPolicy2.getNumParityUnits());
 +  }
 +
 +  private void assertDnInfosEqual(DatanodeInfo[] dnInfos1,
 +      DatanodeInfo[] dnInfos2) {
 +    assertEquals(dnInfos1.length, dnInfos2.length);
 +    for (int i = 0; i < dnInfos1.length; i++) {
 +      compare(dnInfos1[i], dnInfos2[i]);
 +    }
 +  }
  }