You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2015/09/30 20:22:36 UTC

[02/52] [abbrv] hadoop git commit: Merge remote-tracking branch 'apache/trunk' into HDFS-7285

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
index 6093776,0000000..7b21cbe
mode 100644,000000..100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
@@@ -1,258 -1,0 +1,253 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hdfs.server.blockmanagement;
 +
 +import org.apache.hadoop.hdfs.protocol.Block;
 +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 +import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 +
 +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
 +
 +/**
 + * Subclass of {@link BlockInfo}, presenting a block group in erasure coding.
 + *
 + * We still use triplets to store DatanodeStorageInfo for each block in the
 + * block group, as well as the previous/next block in the corresponding
 + * DatanodeStorageInfo. For a (m+k) block group, the first (m+k) triplet units
 + * are sorted and strictly mapped to the corresponding block.
 + *
 + * Normally each block belonging to group is stored in only one DataNode.
 + * However, it is possible that some block is over-replicated. Thus the triplet
 + * array's size can be larger than (m+k). Thus currently we use an extra byte
 + * array to record the block index for each triplet.
 + */
 +public class BlockInfoStriped extends BlockInfo {
 +  private final ErasureCodingPolicy ecPolicy;
 +  /**
 +   * Always the same size with triplets. Record the block index for each triplet
 +   * TODO: actually this is only necessary for over-replicated block. Thus can
 +   * be further optimized to save memory usage.
 +   */
 +  private byte[] indices;
 +
 +  public BlockInfoStriped(Block blk, ErasureCodingPolicy ecPolicy) {
 +    super(blk, (short) (ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits()));
 +    indices = new byte[ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits()];
 +    initIndices();
 +    this.ecPolicy = ecPolicy;
 +  }
 +
-   BlockInfoStriped(BlockInfoStriped b) {
-     this(b, b.getErasureCodingPolicy());
-     this.setBlockCollection(b.getBlockCollection());
-   }
- 
 +  public short getTotalBlockNum() {
 +    return (short) (ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits());
 +  }
 +
 +  public short getDataBlockNum() {
 +    return (short) ecPolicy.getNumDataUnits();
 +  }
 +
 +  public short getParityBlockNum() {
 +    return (short) ecPolicy.getNumParityUnits();
 +  }
 +
 +  /**
 +   * If the block is committed/completed and its length is less than a full
 +   * stripe, it returns the the number of actual data blocks.
 +   * Otherwise it returns the number of data units specified by erasure coding policy.
 +   */
 +  public short getRealDataBlockNum() {
 +    if (isComplete() || getBlockUCState() == BlockUCState.COMMITTED) {
 +      return (short) Math.min(getDataBlockNum(),
 +          (getNumBytes() - 1) / BLOCK_STRIPED_CELL_SIZE + 1);
 +    } else {
 +      return getDataBlockNum();
 +    }
 +  }
 +
 +  public short getRealTotalBlockNum() {
 +    return (short) (getRealDataBlockNum() + getParityBlockNum());
 +  }
 +
 +  public ErasureCodingPolicy getErasureCodingPolicy() {
 +    return ecPolicy;
 +  }
 +
 +  private void initIndices() {
 +    for (int i = 0; i < indices.length; i++) {
 +      indices[i] = -1;
 +    }
 +  }
 +
 +  private int findSlot() {
 +    int i = getTotalBlockNum();
 +    for (; i < getCapacity(); i++) {
 +      if (getStorageInfo(i) == null) {
 +        return i;
 +      }
 +    }
 +    // need to expand the triplet size
 +    ensureCapacity(i + 1, true);
 +    return i;
 +  }
 +
 +  @Override
 +  boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
 +    int blockIndex = BlockIdManager.getBlockIndex(reportedBlock);
 +    int index = blockIndex;
 +    DatanodeStorageInfo old = getStorageInfo(index);
 +    if (old != null && !old.equals(storage)) { // over replicated
 +      // check if the storage has been stored
 +      int i = findStorageInfo(storage);
 +      if (i == -1) {
 +        index = findSlot();
 +      } else {
 +        return true;
 +      }
 +    }
 +    addStorage(storage, index, blockIndex);
 +    return true;
 +  }
 +
 +  private void addStorage(DatanodeStorageInfo storage, int index,
 +      int blockIndex) {
 +    setStorageInfo(index, storage);
 +    setNext(index, null);
 +    setPrevious(index, null);
 +    indices[index] = (byte) blockIndex;
 +  }
 +
 +  private int findStorageInfoFromEnd(DatanodeStorageInfo storage) {
 +    final int len = getCapacity();
 +    for(int idx = len - 1; idx >= 0; idx--) {
 +      DatanodeStorageInfo cur = getStorageInfo(idx);
 +      if (storage.equals(cur)) {
 +        return idx;
 +      }
 +    }
 +    return -1;
 +  }
 +
 +  int getStorageBlockIndex(DatanodeStorageInfo storage) {
 +    int i = this.findStorageInfo(storage);
 +    return i == -1 ? -1 : indices[i];
 +  }
 +
 +  /**
 +   * Identify the block stored in the given datanode storage. Note that
 +   * the returned block has the same block Id with the one seen/reported by the
 +   * DataNode.
 +   */
 +  Block getBlockOnStorage(DatanodeStorageInfo storage) {
 +    int index = getStorageBlockIndex(storage);
 +    if (index < 0) {
 +      return null;
 +    } else {
 +      Block block = new Block(this);
 +      block.setBlockId(this.getBlockId() + index);
 +      return block;
 +    }
 +  }
 +
 +  @Override
 +  boolean removeStorage(DatanodeStorageInfo storage) {
 +    int dnIndex = findStorageInfoFromEnd(storage);
 +    if (dnIndex < 0) { // the node is not found
 +      return false;
 +    }
 +    assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
 +        "Block is still in the list and must be removed first.";
 +    // set the triplet to null
 +    setStorageInfo(dnIndex, null);
 +    setNext(dnIndex, null);
 +    setPrevious(dnIndex, null);
 +    indices[dnIndex] = -1;
 +    return true;
 +  }
 +
 +  private void ensureCapacity(int totalSize, boolean keepOld) {
 +    if (getCapacity() < totalSize) {
 +      Object[] old = triplets;
 +      byte[] oldIndices = indices;
 +      triplets = new Object[totalSize * 3];
 +      indices = new byte[totalSize];
 +      initIndices();
 +
 +      if (keepOld) {
 +        System.arraycopy(old, 0, triplets, 0, old.length);
 +        System.arraycopy(oldIndices, 0, indices, 0, oldIndices.length);
 +      }
 +    }
 +  }
 +
 +  @Override
 +  void replaceBlock(BlockInfo newBlock) {
 +    assert newBlock instanceof BlockInfoStriped;
 +    BlockInfoStriped newBlockGroup = (BlockInfoStriped) newBlock;
 +    final int size = getCapacity();
 +    newBlockGroup.ensureCapacity(size, false);
 +    for (int i = 0; i < size; i++) {
 +      final DatanodeStorageInfo storage = this.getStorageInfo(i);
 +      if (storage != null) {
 +        final int blockIndex = indices[i];
 +        final boolean removed = storage.removeBlock(this);
 +        assert removed : "currentBlock not found.";
 +
 +        newBlockGroup.addStorage(storage, i, blockIndex);
 +        storage.insertToList(newBlockGroup);
 +      }
 +    }
 +  }
 +
 +  public long spaceConsumed() {
 +    // In case striped blocks, total usage by this striped blocks should
 +    // be the total of data blocks and parity blocks because
 +    // `getNumBytes` is the total of actual data block size.
 +    return StripedBlockUtil.spaceConsumedByStripedBlock(getNumBytes(),
 +        ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits(),
 +        BLOCK_STRIPED_CELL_SIZE);
 +    }
 +
 +  @Override
 +  public final boolean isStriped() {
 +    return true;
 +  }
 +
 +  @Override
 +  public int numNodes() {
 +    assert this.triplets != null : "BlockInfo is not initialized";
 +    assert triplets.length % 3 == 0 : "Malformed BlockInfo";
 +    int num = 0;
 +    for (int idx = getCapacity()-1; idx >= 0; idx--) {
 +      if (getStorageInfo(idx) != null) {
 +        num++;
 +      }
 +    }
 +    return num;
 +  }
 +
 +  @Override
 +  final boolean hasNoStorage() {
 +    final int len = getCapacity();
 +    for(int idx = 0; idx < len; idx++) {
 +      if (getStorageInfo(idx) != null) {
 +        return false;
 +      }
 +    }
 +    return true;
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index ae08825,95933d2..6c6d758
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@@ -89,12 -84,8 +88,13 @@@ import org.apache.hadoop.hdfs.server.pr
  import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
  import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
  import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
+ import org.apache.hadoop.hdfs.util.LightWeightHashSet;
  import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
 +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 +
 +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
 +import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength;
 +
  import org.apache.hadoop.metrics2.util.MBeans;
  import org.apache.hadoop.net.Node;
  import org.apache.hadoop.security.UserGroupInformation;
@@@ -216,8 -202,8 +216,8 @@@ public class BlockManager implements Bl
     * Maps a StorageID to the set of blocks that are "extra" for this
     * DataNode. We'll eventually remove these extras.
     */
 -  public final Map<String, LightWeightLinkedSet<Block>> excessReplicateMap =
 +  public final Map<String, LightWeightLinkedSet<BlockInfo>> excessReplicateMap =
-     new TreeMap<>();
+     new HashMap<>();
  
    /**
     * Store set of Blocks that need to be replicated 1 or more times.
@@@ -689,26 -662,20 +689,25 @@@
     */
    private BlockInfo completeBlock(final BlockCollection bc,
        final int blkIndex, boolean force) throws IOException {
 -    if(blkIndex < 0)
 +    if (blkIndex < 0) {
        return null;
 +    }
      BlockInfo curBlock = bc.getBlocks()[blkIndex];
 -    if(curBlock.isComplete())
 +    if (curBlock.isComplete()) {
        return curBlock;
 +    }
  
      int numNodes = curBlock.numNodes();
 -    if (!force && numNodes < minReplication)
 +    if (!force && !hasMinStorage(curBlock, numNodes)) {
        throw new IOException("Cannot complete block: " +
            "block does not satisfy minimal replication requirement.");
 -    if(!force && curBlock.getBlockUCState() != BlockUCState.COMMITTED)
 +    }
 +    if (!force && curBlock.getBlockUCState() != BlockUCState.COMMITTED) {
        throw new IOException(
            "Cannot complete block: block has not been COMMITTED by the client");
 -    BlockInfo completeBlock = curBlock.convertToCompleteBlock();
 +    }
 +
 +    final BlockInfo completeBlock = curBlock.convertToCompleteBlock();
- 
      // replace penultimate block in file
      bc.setBlock(blkIndex, completeBlock);
      
@@@ -1276,9 -1186,8 +1276,9 @@@
            " corrupt as it does not belong to any file", b);
        addToInvalidates(b.corrupted, node);
        return;
--    } 
 -    short expectedReplicas = b.corrupted.getReplication();
++    }
 +    short expectedReplicas =
-         getExpectedReplicaNum(b.stored.getBlockCollection(), b.stored);
++        getExpectedReplicaNum(b.stored);
  
      // Add replica to the data-node if it is not already there
      if (storageInfo != null) {
@@@ -1446,10 -1350,10 +1446,10 @@@
      namesystem.writeLock();
      try {
        synchronized (neededReplications) {
 -        for (int priority = 0; priority < blocksToReplicate.size(); priority++) {
 -          for (BlockInfo block : blocksToReplicate.get(priority)) {
 +        for (int priority = 0; priority < blocksToRecover.size(); priority++) {
 +          for (BlockInfo block : blocksToRecover.get(priority)) {
              // block should belong to a file
-             bc = blocksMap.getBlockCollection(block);
+             bc = getBlockCollection(block);
              // abandoned block or block reopened for append
              if (bc == null
                  || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
@@@ -1458,20 -1362,17 +1458,20 @@@
                continue;
              }
  
-             requiredReplication = getExpectedReplicaNum(bc, block);
+             requiredReplication = getExpectedReplicaNum(block);
  
              // get a source data-node
 -            containingNodes = new ArrayList<DatanodeDescriptor>();
 -            List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<DatanodeStorageInfo>();
 +            containingNodes = new ArrayList<>();
 +            List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>();
              NumberReplicas numReplicas = new NumberReplicas();
 -            srcNode = chooseSourceDatanode(
 -                block, containingNodes, liveReplicaNodes, numReplicas,
 -                priority);
 -            if(srcNode == null) { // block can not be replicated from any node
 -              LOG.debug("Block " + block + " cannot be repl from any node");
 +            List<Short> liveBlockIndices = new ArrayList<>();
 +            final DatanodeDescriptor[] srcNodes = chooseSourceDatanodes(block,
 +                containingNodes, liveReplicaNodes, numReplicas,
 +                liveBlockIndices, priority);
 +            if(srcNodes == null || srcNodes.length == 0) {
 +              // block can not be replicated from any node
 +              LOG.debug("Block " + block + " cannot be recovered " +
 +                  "from any node");
                continue;
              }
  
@@@ -1588,32 -1474,7 +1588,32 @@@
            }
  
            // Add block to the to be replicated list
 -          rw.srcNode.addBlockToBeReplicated(block, targets);
 +          if (block.isStriped()) {
 +            assert rw instanceof ErasureCodingWork;
 +            assert rw.targets.length > 0;
-             String src = block.getBlockCollection().getName();
++            String src = getBlockCollection(block).getName();
 +            ErasureCodingZone ecZone = null;
 +            try {
 +              ecZone = namesystem.getErasureCodingZoneForPath(src);
 +            } catch (IOException e) {
 +              blockLog
 +                  .warn("Failed to get the EC zone for the file {} ", src);
 +            }
 +            if (ecZone == null) {
 +              blockLog.warn("No erasure coding policy found for the file {}. "
 +                  + "So cannot proceed for recovery", src);
 +              // TODO: we may have to revisit later for what we can do better to
 +              // handle this case.
 +              continue;
 +            }
 +            rw.targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
 +                new ExtendedBlock(namesystem.getBlockPoolId(), block),
 +                rw.srcNodes, rw.targets,
 +                ((ErasureCodingWork) rw).liveBlockIndicies,
 +                ecZone.getErasureCodingPolicy());
 +          } else {
 +            rw.srcNodes[0].addBlockToBeReplicated(block, targets);
 +          }
            scheduledWork++;
            DatanodeStorageInfo.incrementBlocksScheduled(targets);
  
@@@ -2079,8 -1924,8 +2080,8 @@@
    private void removeZombieReplicas(BlockReportContext context,
        DatanodeStorageInfo zombie) {
      LOG.warn("processReport 0x{}: removing zombie storage {}, which no " +
--             "longer exists on the DataNode.",
--              Long.toHexString(context.getReportId()), zombie.getStorageID());
++            "longer exists on the DataNode.",
++        Long.toHexString(context.getReportId()), zombie.getStorageID());
      assert(namesystem.hasWriteLock());
      Iterator<BlockInfo> iter = zombie.getBlockIterator();
      int prevBlocks = zombie.numBlocks();
@@@ -2324,10 -2164,10 +2325,10 @@@
          // OpenFileBlocks only inside snapshots also will be added to safemode
          // threshold. So we need to update such blocks to safemode
          // refer HDFS-5283
-         if (namesystem.isInSnapshot(storedBlock.getBlockCollection())) {
+         if (namesystem.isInSnapshot(storedBlock)) {
            int numOfReplicas = storedBlock.getUnderConstructionFeature()
                .getNumExpectedLocations();
 -          namesystem.incrementSafeBlockCount(numOfReplicas);
 +          namesystem.incrementSafeBlockCount(numOfReplicas, storedBlock);
          }
          //and fall through to next clause
        }      
@@@ -2720,8 -2541,8 +2721,8 @@@
      // Now check for completion of blocks and safe block count
      int numCurrentReplica = countLiveNodes(storedBlock);
      if (storedBlock.getBlockUCState() == BlockUCState.COMMITTED
 -        && numCurrentReplica >= minReplication) {
 +        && hasMinStorage(storedBlock, numCurrentReplica)) {
-       completeBlock(storedBlock.getBlockCollection(), storedBlock, false);
+       completeBlock(getBlockCollection(storedBlock), storedBlock, false);
      } else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
        // check whether safe replication is reached for the block
        // only complete blocks are counted towards that.
@@@ -2760,10 -2580,11 +2761,11 @@@
        // it will happen in next block report otherwise.
        return block;
      }
-     BlockCollection bc = storedBlock.getBlockCollection();
+     BlockCollection bc = getBlockCollection(storedBlock);
+     assert bc != null : "Block must belong to a file";
  
      // add block to the datanode
 -    AddBlockResult result = storageInfo.addBlock(storedBlock);
 +    AddBlockResult result = storageInfo.addBlock(storedBlock, reportedBlock);
  
      int curReplicaDelta;
      if (result == AddBlockResult.ADDED) {
@@@ -3817,9 -3491,9 +3809,9 @@@
     */
    public void checkReplication(BlockCollection bc) {
      for (BlockInfo block : bc.getBlocks()) {
-       short expected = getExpectedReplicaNum(bc, block);
 -      final short expected = block.getReplication();
++      short expected = getExpectedReplicaNum(block);
        final NumberReplicas n = countNodes(block);
-       if (isNeededReplication(block, expected, n.liveReplicas())) { 
+       if (isNeededReplication(block, n.liveReplicas())) {
          neededReplications.add(block, n.liveReplicas(),
              n.decommissionedAndDecommissioning(), expected);
        } else if (n.liveReplicas() > expected) {
@@@ -3978,18 -3605,15 +3968,17 @@@
     * A block needs replication if the number of replicas is less than expected
     * or if it does not have enough racks.
     */
-   boolean isNeededReplication(BlockInfo storedBlock, int expected, int current) {
+   boolean isNeededReplication(BlockInfo storedBlock, int current) {
 -    int expected = storedBlock.getReplication();
 -    return current < expected || !blockHasEnoughRacks(storedBlock);
++    int expected = getExpectedReplicaNum(storedBlock);
 +    return current < expected || !blockHasEnoughRacks(storedBlock, expected);
    }
-   
-   public short getExpectedReplicaNum(BlockCollection bc, BlockInfo block) {
-     if (block.isStriped()) {
-       return ((BlockInfoStriped) block).getRealTotalBlockNum();
-     } else {
-       return bc.getPreferredBlockReplication();
-     }
+ 
+   public short getExpectedReplicaNum(BlockInfo block) {
 -    return block.getReplication();
++    return block.isStriped() ?
++        ((BlockInfoStriped) block).getRealTotalBlockNum() :
++        block.getReplication();
    }
 -  
 +
    public long getMissingBlocksCount() {
      // not locking
      return this.neededReplications.getCorruptBlockSize();
@@@ -4005,22 -3629,13 +3994,22 @@@
      return blocksMap.addBlockCollection(block, bc);
    }
  
 -  public BlockCollection getBlockCollection(BlockInfo b) {
 -    return namesystem.getBlockCollection(b.getBlockCollectionId());
 +  /**
 +   * Do some check when adding a block to blocksmap.
 +   * For HDFS-7994 to check whether then block is a NonEcBlockUsingStripedID.
 +   *
 +   */
 +  public BlockInfo addBlockCollectionWithCheck(
 +      BlockInfo block, BlockCollection bc) {
 +    if (!hasNonEcBlockUsingStripedID && !block.isStriped() &&
 +        BlockIdManager.isStripedBlockID(block.getBlockId())) {
 +      hasNonEcBlockUsingStripedID = true;
 +    }
 +    return addBlockCollection(block, bc);
    }
  
-   public BlockCollection getBlockCollection(Block b) {
-     return blocksMap.getBlockCollection(b);
 -  /** @return an iterator of the datanodes. */
 -  public Iterable<DatanodeStorageInfo> getStorages(final Block block) {
 -    return blocksMap.getStorages(block);
++  public BlockCollection getBlockCollection(BlockInfo b) {
++    return namesystem.getBlockCollection(b.getBlockCollectionId());
    }
  
    public int numCorruptReplicas(Block block) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java
index 58b455e,88cf06d..0e92779
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java
@@@ -55,11 -58,11 +58,11 @@@ public class BlockUnderConstructionFeat
    private Block truncateBlock;
  
    public BlockUnderConstructionFeature(Block blk,
 -      BlockUCState state, DatanodeStorageInfo[] targets) {
 +      BlockUCState state, DatanodeStorageInfo[] targets, boolean isStriped) {
      assert getBlockUCState() != COMPLETE :
-       "BlockUnderConstructionFeature cannot be in COMPLETE state";
+         "BlockUnderConstructionFeature cannot be in COMPLETE state";
      this.blockUCState = state;
 -    setExpectedLocations(blk, targets);
 +    setExpectedLocations(blk, targets, isStriped);
    }
  
    /** Set expected locations */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
index 5bfae42,33c68f3..51d62c1
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
@@@ -129,18 -118,14 +125,18 @@@ class BlocksMap 
      if (blockInfo == null)
        return;
  
-     blockInfo.setBlockCollection(null);
-     final int size = blockInfo instanceof BlockInfoContiguous ?
-         blockInfo.numNodes() : blockInfo.getCapacity();
+     blockInfo.setBlockCollectionId(INodeId.INVALID_INODE_ID);
 -    for(int idx = blockInfo.numNodes()-1; idx >= 0; idx--) {
++    final int size = blockInfo.isStriped() ?
++        blockInfo.getCapacity() : blockInfo.numNodes();
 +    for(int idx = size - 1; idx >= 0; idx--) {
        DatanodeDescriptor dn = blockInfo.getDatanode(idx);
 -      dn.removeBlock(blockInfo); // remove from the list and wipe the location
 +      if (dn != null) {
 +        dn.removeBlock(blockInfo); // remove from the list and wipe the location
 +      }
      }
    }
 -  
 -  /** Returns the block object it it exists in the map. */
 +
 +  /** Returns the block object if it exists in the map. */
    BlockInfo getStoredBlock(Block b) {
      return blocks.get(b);
    }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index a4d5442,7e3c59b..29e541c
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@@ -38,11 -39,10 +38,12 @@@ import org.apache.hadoop.fs.StorageType
  import org.apache.hadoop.hdfs.protocol.Block;
  import org.apache.hadoop.hdfs.protocol.DatanodeID;
  import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 +import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
  import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 +import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
  import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
  import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
  import org.apache.hadoop.hdfs.server.protocol.StorageReport;
  import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
  import org.apache.hadoop.hdfs.util.EnumCounters;
@@@ -223,12 -222,8 +224,11 @@@ public class DatanodeDescriptor extend
    /** A queue of blocks to be replicated by this datanode */
    private final BlockQueue<BlockTargetPair> replicateBlocks =
        new BlockQueue<>();
 +  /** A queue of blocks to be erasure coded by this datanode */
 +  private final BlockQueue<BlockECRecoveryInfo> erasurecodeBlocks =
 +      new BlockQueue<>();
    /** A queue of blocks to be recovered by this datanode */
-   private final BlockQueue<BlockInfo> recoverBlocks =
-       new BlockQueue<>();
+   private final BlockQueue<BlockInfo> recoverBlocks = new BlockQueue<>();
    /** A set of blocks to be invalidated by this datanode */
    private final LightWeightHashSet<Block> invalidateBlocks =
        new LightWeightHashSet<>();
@@@ -696,24 -662,27 +696,34 @@@
      }
    }
  
 +  @VisibleForTesting
 +  public boolean containsInvalidateBlock(Block block) {
 +    synchronized (invalidateBlocks) {
 +      return invalidateBlocks.contains(block);
 +    }
 +  }
 +
    /**
-    * @return Approximate number of blocks currently scheduled to be written 
+    * Return the sum of remaining spaces of the specified type. If the remaining
+    * space of a storage is less than minSize, it won't be counted toward the
+    * sum.
+    *
+    * @param t The storage type. If null, the type is ignored.
+    * @param minSize The minimum free space required.
+    * @return the sum of remaining spaces that are bigger than minSize.
     */
-   public long getRemaining(StorageType t) {
+   public long getRemaining(StorageType t, long minSize) {
      long remaining = 0;
-     for(DatanodeStorageInfo s : getStorageInfos()) {
-       if (s.getStorageType() == t) {
-         remaining += s.getRemaining();
+     for (DatanodeStorageInfo s : getStorageInfos()) {
+       if (s.getState() == State.NORMAL &&
+           (t == null || s.getStorageType() == t)) {
+         long r = s.getRemaining();
+         if (r >= minSize) {
+           remaining += r;
+         }
        }
      }
-     return remaining;    
+     return remaining;
    }
  
    /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
index 5e3cac2,1a20ab0..a80bfd6
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
@@@ -233,16 -234,16 +234,16 @@@ public class DecommissionManager 
    }
  
    /**
 -   * Checks whether a block is sufficiently replicated for decommissioning.
 -   * Full-strength replication is not always necessary, hence "sufficient".
 +   * Checks whether a block is sufficiently replicated/stored for
 +   * decommissioning. For replicated blocks or striped blocks, full-strength
 +   * replication or storage is not always necessary, hence "sufficient".
     * @return true if sufficient, else false.
     */
 -  private boolean isSufficientlyReplicated(BlockInfo block,
 -      BlockCollection bc,
 +  private boolean isSufficient(BlockInfo block, BlockCollection bc,
        NumberReplicas numberReplicas) {
-     final int numExpected = blockManager.getExpectedReplicaNum(bc, block);
 -    final int numExpected = block.getReplication();
++    final int numExpected = blockManager.getExpectedReplicaNum(block);
      final int numLive = numberReplicas.liveReplicas();
-     if (!blockManager.isNeededReplication(block, numExpected, numLive)) {
+     if (!blockManager.isNeededReplication(block, numLive)) {
        // Block doesn't need replication. Skip.
        LOG.trace("Block {} does not need replication.", block);
        return true;
@@@ -274,11 -274,12 +275,12 @@@
      return false;
    }
  
-   private void logBlockReplicationInfo(BlockInfo block, BlockCollection bc,
 -  private static void logBlockReplicationInfo(BlockInfo block,
++  private void logBlockReplicationInfo(BlockInfo block,
+       BlockCollection bc,
        DatanodeDescriptor srcNode, NumberReplicas num,
        Iterable<DatanodeStorageInfo> storages) {
      int curReplicas = num.liveReplicas();
-     int curExpectedReplicas = blockManager.getExpectedReplicaNum(bc, block);
 -    int curExpectedReplicas = block.getReplication();
++    int curExpectedReplicas = blockManager.getExpectedReplicaNum(block);
      StringBuilder nodeList = new StringBuilder();
      for (DatanodeStorageInfo storage : storages) {
        final DatanodeDescriptor node = storage.getDatanodeDescriptor();
@@@ -530,8 -536,10 +533,9 @@@
            continue;
          }
  
+         BlockCollection bc = namesystem.getBlockCollection(bcId);
          final NumberReplicas num = blockManager.countNodes(block);
          final int liveReplicas = num.liveReplicas();
 -        final int curReplicas = liveReplicas;
  
          // Schedule under-replicated blocks for replication if not already
          // pending
@@@ -542,9 -549,9 +545,9 @@@
                namesystem.isPopulatingReplQueues()) {
              // Process these blocks only when active NN is out of safe mode.
              blockManager.neededReplications.add(block,
 -                curReplicas,
 +                liveReplicas,
                  num.decommissionedAndDecommissioning(),
-                 blockManager.getExpectedReplicaNum(bc, block));
 -                block.getReplication());
++                blockManager.getExpectedReplicaNum(block));
            }
          }
  

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockGroupIdGenerator.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockGroupIdGenerator.java
index 479ee4c,0000000..7a52273
mode 100644,000000..100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockGroupIdGenerator.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockGroupIdGenerator.java
@@@ -1,85 -1,0 +1,86 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hdfs.server.blockmanagement;
 +
 +import org.apache.hadoop.classification.InterfaceAudience;
 +import org.apache.hadoop.hdfs.protocol.Block;
 +import org.apache.hadoop.util.SequentialNumber;
 +
 +import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BLOCK_GROUP_INDEX_MASK;
 +import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_BLOCKS_IN_GROUP;
 +
 +/**
 + * Generate the next valid block group ID by incrementing the maximum block
 + * group ID allocated so far, with the first 2^10 block group IDs reserved.
 + * HDFS-EC introduces a hierarchical protocol to name blocks and groups:
 + * Contiguous: {reserved block IDs | flag | block ID}
 + * Striped: {reserved block IDs | flag | block group ID | index in group}
 + *
 + * Following n bits of reserved block IDs, The (n+1)th bit in an ID
 + * distinguishes contiguous (0) and striped (1) blocks. For a striped block,
 + * bits (n+2) to (64-m) represent the ID of its block group, while the last m
 + * bits represent its index of the group. The value m is determined by the
 + * maximum number of blocks in a group (MAX_BLOCKS_IN_GROUP).
 + *
 + * Note that the {@link #nextValue()} methods requires external lock to
 + * guarantee IDs have no conflicts.
 + */
 +@InterfaceAudience.Private
 +public class SequentialBlockGroupIdGenerator extends SequentialNumber {
 +
 +  private final BlockManager blockManager;
 +
 +  SequentialBlockGroupIdGenerator(BlockManager blockManagerRef) {
 +    super(Long.MIN_VALUE);
 +    this.blockManager = blockManagerRef;
 +  }
 +
 +  @Override // NumberGenerator
 +  public long nextValue() {
 +    skipTo((getCurrentValue() & ~BLOCK_GROUP_INDEX_MASK) + MAX_BLOCKS_IN_GROUP);
 +    // Make sure there's no conflict with existing random block IDs
 +    final Block b = new Block(getCurrentValue());
 +    while (hasValidBlockInRange(b)) {
 +      skipTo(getCurrentValue() + MAX_BLOCKS_IN_GROUP);
 +      b.setBlockId(getCurrentValue());
 +    }
 +    if (b.getBlockId() >= 0) {
 +      throw new IllegalStateException("All negative block group IDs are used, "
 +          + "growing into positive IDs, "
 +          + "which might conflict with non-erasure coded blocks.");
 +    }
 +    return getCurrentValue();
 +  }
 +
 +  /**
 +   * @param b A block object whose id is set to the starting point for check
 +   * @return true if any ID in the range
-    *      {id, id+HdfsConstants.MAX_BLOCKS_IN_GROUP} is pointed-to by a file
++   *      {id, id+HdfsConstants.MAX_BLOCKS_IN_GROUP} is pointed-to by a stored
++   *      block.
 +   */
 +  private boolean hasValidBlockInRange(Block b) {
 +    final long id = b.getBlockId();
 +    for (int i = 0; i < MAX_BLOCKS_IN_GROUP; i++) {
 +      b.setBlockId(id + i);
-       if (blockManager.getBlockCollection(b) != null) {
++      if (blockManager.getStoredBlock(b) != null) {
 +        return true;
 +      }
 +    }
 +    return false;
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockIdGenerator.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockIdGenerator.java
index 6074784,f053b7b..631b435
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockIdGenerator.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockIdGenerator.java
@@@ -19,6 -19,8 +19,7 @@@ package org.apache.hadoop.hdfs.server.b
  
  import org.apache.hadoop.classification.InterfaceAudience;
  import org.apache.hadoop.hdfs.protocol.Block;
 -import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+ import org.apache.hadoop.hdfs.server.namenode.INodeId;
  import org.apache.hadoop.util.SequentialNumber;
  
  /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 1b695e3,5bc50b0..82a0f62
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@@ -2158,11 -2103,14 +2128,11 @@@ public class DataNode extends Reconfigu
          //
          // Header info
          //
 -        Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
 -        if (isBlockTokenEnabled) {
 -          accessToken = blockPoolTokenSecretManager.generateToken(b, 
 -              EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
 -        }
 +        Token<BlockTokenIdentifier> accessToken = getBlockAccessToken(b, 
 +            EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
  
          long writeTimeout = dnConf.socketWriteTimeout + 
-                             HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
+                             HdfsConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
          OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
          InputStream unbufIn = NetUtils.getInputStream(sock);
          DataEncryptionKeyFactory keyFactory =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java
index 22d821f,0000000..a0ac033
mode 100644,000000..100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java
@@@ -1,163 -1,0 +1,163 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hdfs.server.namenode;
 +
 +import com.google.common.base.Preconditions;
 +import com.google.common.collect.Lists;
 +
 +import org.apache.hadoop.fs.XAttr;
 +import org.apache.hadoop.fs.XAttrSetFlag;
 +import org.apache.hadoop.hdfs.XAttrHelper;
 +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 +import org.apache.hadoop.hdfs.protocol.ErasureCodingZone;
 +import org.apache.hadoop.io.IOUtils;
 +import org.apache.hadoop.io.WritableUtils;
 +
 +import java.io.ByteArrayInputStream;
 +import java.io.ByteArrayOutputStream;
 +import java.io.DataInputStream;
 +import java.io.DataOutputStream;
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.EnumSet;
 +import java.util.List;
 +
 +import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_ERASURECODING_ZONE;
 +
 +/**
 + * Manages the list of erasure coding zones in the filesystem.
 + * <p/>
 + * The ErasureCodingZoneManager has its own lock, but relies on the FSDirectory
 + * lock being held for many operations. The FSDirectory lock should not be
 + * taken if the manager lock is already held.
 + * TODO: consolidate zone logic w/ encrypt. zones {@link EncryptionZoneManager}
 + */
 +public class ErasureCodingZoneManager {
 +  private final FSDirectory dir;
 +
 +  /**
 +   * Construct a new ErasureCodingZoneManager.
 +   *
 +   * @param dir Enclosing FSDirectory
 +   */
 +  public ErasureCodingZoneManager(FSDirectory dir) {
 +    this.dir = dir;
 +  }
 +
 +  ErasureCodingPolicy getErasureCodingPolicy(INodesInPath iip) throws IOException {
 +    ErasureCodingZone ecZone = getErasureCodingZone(iip);
 +    return ecZone == null ? null : ecZone.getErasureCodingPolicy();
 +  }
 +
 +  ErasureCodingZone getErasureCodingZone(INodesInPath iip) throws IOException {
 +    assert dir.hasReadLock();
 +    Preconditions.checkNotNull(iip, "INodes cannot be null");
 +    List<INode> inodes = iip.getReadOnlyINodes();
 +    for (int i = inodes.size() - 1; i >= 0; i--) {
 +      final INode inode = inodes.get(i);
 +      if (inode == null) {
 +        continue;
 +      }
 +      // We don't allow symlinks in an EC zone, or pointing to a file/dir in
 +      // an EC. Therefore if a symlink is encountered, the dir shouldn't have
 +      // EC
 +      // TODO: properly support symlinks in EC zones
 +      if (inode.isSymlink()) {
 +        return null;
 +      }
 +      final List<XAttr> xAttrs = inode.getXAttrFeature() == null ?
 +          new ArrayList<XAttr>(0)
 +          : inode.getXAttrFeature().getXAttrs();
 +      for (XAttr xAttr : xAttrs) {
-         if (XATTR_ERASURECODING_ZONE.equals(XAttrHelper.getPrefixName(xAttr))) {
++        if (XATTR_ERASURECODING_ZONE.equals(XAttrHelper.getPrefixedName(xAttr))) {
 +          ByteArrayInputStream bIn=new ByteArrayInputStream(xAttr.getValue());
 +          DataInputStream dIn=new DataInputStream(bIn);
 +          String ecPolicyName = WritableUtils.readString(dIn);
 +          ErasureCodingPolicy ecPolicy = dir.getFSNamesystem()
 +              .getErasureCodingPolicyManager().getPolicy(ecPolicyName);
 +          return new ErasureCodingZone(dir.getInode(inode.getId())
 +              .getFullPathName(), ecPolicy);
 +        }
 +      }
 +    }
 +    return null;
 +  }
 +
 +  List<XAttr> createErasureCodingZone(final INodesInPath srcIIP,
 +      ErasureCodingPolicy ecPolicy) throws IOException {
 +    assert dir.hasWriteLock();
 +    Preconditions.checkNotNull(srcIIP, "INodes cannot be null");
 +    String src = srcIIP.getPath();
 +    if (dir.isNonEmptyDirectory(srcIIP)) {
 +      throw new IOException(
 +          "Attempt to create an erasure coding zone for a " +
 +              "non-empty directory " + src);
 +    }
 +    if (srcIIP.getLastINode() != null &&
 +        !srcIIP.getLastINode().isDirectory()) {
 +      throw new IOException("Attempt to create an erasure coding zone " +
 +          "for a file " + src);
 +    }
 +    if (getErasureCodingPolicy(srcIIP) != null) {
 +      throw new IOException("Directory " + src + " is already in an " +
 +          "erasure coding zone.");
 +    }
 +
 +    // System default erasure coding policy will be used since no specified.
 +    if (ecPolicy == null) {
 +      ecPolicy = ErasureCodingPolicyManager.getSystemDefaultPolicy();
 +    }
 +
 +    final XAttr ecXAttr;
 +    DataOutputStream dOut = null;
 +    try {
 +      ByteArrayOutputStream bOut = new ByteArrayOutputStream();
 +      dOut = new DataOutputStream(bOut);
 +      WritableUtils.writeString(dOut, ecPolicy.getName());
 +      ecXAttr = XAttrHelper.buildXAttr(XATTR_ERASURECODING_ZONE,
 +          bOut.toByteArray());
 +    } finally {
 +      IOUtils.closeStream(dOut);
 +    }
 +    final List<XAttr> xattrs = Lists.newArrayListWithCapacity(1);
 +    xattrs.add(ecXAttr);
 +    FSDirXAttrOp.unprotectedSetXAttrs(dir, src, xattrs,
 +        EnumSet.of(XAttrSetFlag.CREATE));
 +    return xattrs;
 +  }
 +
 +  void checkMoveValidity(INodesInPath srcIIP, INodesInPath dstIIP, String src)
 +      throws IOException {
 +    assert dir.hasReadLock();
 +    final ErasureCodingZone srcZone = getErasureCodingZone(srcIIP);
 +    final ErasureCodingZone dstZone = getErasureCodingZone(dstIIP);
 +    if (srcZone != null && srcZone.getDir().equals(src) && dstZone == null) {
 +      return;
 +    }
 +    final ErasureCodingPolicy srcECPolicy =
 +        srcZone != null ? srcZone.getErasureCodingPolicy() : null;
 +    final ErasureCodingPolicy dstECPolicy =
 +        dstZone != null ? dstZone.getErasureCodingPolicy() : null;
 +    if (srcECPolicy != null && !srcECPolicy.equals(dstECPolicy) ||
 +        dstECPolicy != null && !dstECPolicy.equals(srcECPolicy)) {
 +      throw new IOException(
 +          src + " can't be moved because the source and destination have " +
 +              "different erasure coding policies.");
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
index 9b08092,df0bc20..4bed13e
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
@@@ -122,7 -122,7 +122,7 @@@ public class FSDirAttrOp 
                                              " does not exist.");
        }
        boolean changed = unprotectedSetTimes(fsd, inode, mtime, atime, true,
--                                            iip.getLatestSnapshotId());
++          iip.getLatestSnapshotId());
        if (changed) {
          fsd.getEditLog().logTimes(src, mtime, atime);
        }
@@@ -399,30 -397,26 +397,30 @@@
    }
  
    static BlockInfo[] unprotectedSetReplication(
-       FSDirectory fsd, String src, short replication, short[] blockRepls)
+       FSDirectory fsd, String src, short replication)
        throws QuotaExceededException, UnresolvedLinkException,
 -             SnapshotAccessControlException {
 +      SnapshotAccessControlException, UnsupportedActionException {
      assert fsd.hasWriteLock();
  
+     final BlockManager bm = fsd.getBlockManager();
      final INodesInPath iip = fsd.getINodesInPath4Write(src, true);
      final INode inode = iip.getLastINode();
      if (inode == null || !inode.isFile()) {
        return null;
      }
      INodeFile file = inode.asFile();
 +    if (file.isStriped()) {
 +      throw new UnsupportedActionException(
 +          "Cannot set replication to a file with striped blocks");
 +    }
  
-     final short oldBR = file.getPreferredBlockReplication();
+     // Make sure the directory has sufficient quotas
+     short oldBR = file.getPreferredBlockReplication();
  
-     // before setFileReplication, check for increasing block replication.
-     // if replication > oldBR, then newBR == replication.
-     // if replication < oldBR, we don't know newBR yet.
-     if (replication > oldBR) {
-       long dsDelta = file.storagespaceConsumed(null).getStorageSpace() / oldBR;
-       fsd.updateCount(iip, 0L, dsDelta, oldBR, replication, true);
+     // Ensure the quota does not exceed
+     if (oldBR < replication) {
+       long size = file.computeFileSize(true, true);
+       fsd.updateCount(iip, 0L, size, oldBR, replication, true);
      }
  
      file.setFileReplication(replication, iip.getLatestSnapshotId());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
index 68aef76,e9d0806..e480959
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
@@@ -46,7 -44,6 +46,8 @@@ import org.apache.hadoop.hdfs.protocol.
  import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
  import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
  import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
 +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
++
  import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
  import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature;
  import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
@@@ -529,32 -516,15 +530,32 @@@ class FSDirWriteFileOp 
        final INodeFile fileINode = inodesInPath.getLastINode().asFile();
        Preconditions.checkState(fileINode.isUnderConstruction());
  
 -      // check quota limits and updated space consumed
 -      fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
 -          fileINode.getFileReplication(), true);
 -
        // associate new last block for the file
 -      BlockInfo blockInfo = new BlockInfoContiguous(block,
 -          fileINode.getFileReplication());
 -      blockInfo.convertToBlockUnderConstruction(
 -          HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets);
 +      final BlockInfo blockInfo;
 +      if (isStriped) {
 +        ErasureCodingZone ecZone = FSDirErasureCodingOp.getErasureCodingZone(
 +            fsd.getFSNamesystem(), inodesInPath);
 +        ErasureCodingPolicy ecPolicy = ecZone.getErasureCodingPolicy();
 +        short numDataUnits = (short) ecPolicy.getNumDataUnits();
 +        short numParityUnits = (short) ecPolicy.getNumParityUnits();
 +        short numLocations = (short) (numDataUnits + numParityUnits);
 +
 +        // check quota limits and updated space consumed
 +        fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
 +            numLocations, true);
 +        blockInfo = new BlockInfoStriped(block, ecPolicy);
 +        blockInfo.convertToBlockUnderConstruction(
 +            HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets);
 +      } else {
 +        // check quota limits and updated space consumed
 +        fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
-             fileINode.getPreferredBlockReplication(), true);
++            fileINode.getFileReplication(), true);
 +
 +        short numLocations = fileINode.getFileReplication();
 +        blockInfo = new BlockInfoContiguous(block, numLocations);
 +        blockInfo.convertToBlockUnderConstruction(
 +            HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets);
 +      }
        fsd.getBlockManager().addBlockCollection(blockInfo, fileINode);
        fileINode.addBlock(blockInfo);
  

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index a61161f,f22762c..7203316
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@@ -1077,29 -1048,19 +1077,29 @@@ public class FSEditLogLoader 
            // TODO: shouldn't this only be true for the last block?
            // what about an old-version fsync() where fsync isn't called
            // until several blocks in?
 -          newBI = new BlockInfoContiguous(newBlock,
 -              file.getPreferredBlockReplication());
 +          if (isStriped) {
 +            newBI = new BlockInfoStriped(newBlock,
 +                ecZone.getErasureCodingPolicy());
 +          } else {
 +            newBI = new BlockInfoContiguous(newBlock,
 +                file.getPreferredBlockReplication());
 +          }
-           newBI.convertToBlockUnderConstruction(BlockUCState.UNDER_CONSTRUCTION,
-               null);
+           newBI.convertToBlockUnderConstruction(
 -              HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, null);
++              BlockUCState.UNDER_CONSTRUCTION, null);
          } else {
            // OP_CLOSE should add finalized blocks. This code path
            // is only executed when loading edits written by prior
            // versions of Hadoop. Current versions always log
            // OP_ADD operations as each block is allocated.
 -          newBI = new BlockInfoContiguous(newBlock,
 -              file.getFileReplication());
 +          if (isStriped) {
 +            newBI = new BlockInfoStriped(newBlock,
 +                ErasureCodingPolicyManager.getSystemDefaultPolicy());
 +          } else {
 +            newBI = new BlockInfoContiguous(newBlock,
-                 file.getPreferredBlockReplication());
++                file.getFileReplication());
 +          }
          }
 -        fsNamesys.getBlockManager().addBlockCollection(newBI, file);
 +        fsNamesys.getBlockManager().addBlockCollectionWithCheck(newBI, file);
          file.addBlock(newBI);
          fsNamesys.getBlockManager().processQueuedMessagesForBlock(newBlock);
        }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
index ffaf86b,ac88919..a115138
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
@@@ -43,9 -42,9 +43,10 @@@ import org.apache.hadoop.hdfs.protocol.
  import org.apache.hadoop.hdfs.protocol.HdfsConstants;
  import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
  import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+ import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
  import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
  import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
 +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
  import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
  import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
  import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.LoaderContext;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 0d9d427,f4952f7..5f39446
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@@ -7423,11 -7287,30 +7435,35 @@@ public class FSNamesystem implements Na
        logger.addAppender(asyncAppender);        
      }
    }
 -
+   /**
+    * Return total number of Sync Operations on FSEditLog.
+    */
+   @Override
+   @Metric({"TotalSyncCount",
+               "Total number of sync operations performed on edit logs"})
+   public long getTotalSyncCount() {
+     return fsImage.editLog.getTotalSyncCount();
+   }
+ 
+   /**
+    * Return total time spent doing sync operations on FSEditLog.
+    */
+   @Override
+   @Metric({"TotalSyncTimes",
+               "Total time spend in sync operation on various edit logs"})
+   public String getTotalSyncTimes() {
+     JournalSet journalSet = fsImage.editLog.getJournalSet();
+     if (journalSet != null) {
+       return journalSet.getSyncTimes();
+     } else {
+       return "";
+     }
+   }
 +
 +  @Override
 +  public ErasureCodingZone getErasureCodingZoneForPath(String src)
 +      throws IOException {
 +    return FSDirErasureCodingOp.getErasureCodingZone(this, src);
 +  }
  }
  

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
index 4fa457d,d546905..ae9b0d2
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
@@@ -38,7 -37,7 +38,8 @@@ import org.apache.hadoop.hdfs.protocol.
  import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
  import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
  import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
+ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
  import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
  import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
  import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
index 9d43c15,7ebe859..e3363a4
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
@@@ -258,8 -254,7 +258,8 @@@ public class NamenodeFsck implements Da
        NumberReplicas numberReplicas= bm.countNodes(blockInfo);
        out.println("Block Id: " + blockId);
        out.println("Block belongs to: "+iNode.getFullPathName());
 -      out.println("No. of Expected Replica: " + blockInfo.getReplication());
 +      out.println("No. of Expected Replica: " +
-           bm.getExpectedReplicaNum(bc, blockInfo));
++          bm.getExpectedReplicaNum(blockInfo));
        out.println("No. of live Replica: " + numberReplicas.liveReplicas());
        out.println("No. of excess Replica: " + numberReplicas.excessReplicas());
        out.println("No. of stale Replica: " +

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
index bae033b,4a208d8..923a335
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
@@@ -17,11 -17,9 +17,12 @@@
   */
  package org.apache.hadoop.hdfs.server.namenode;
  
 +import java.io.IOException;
 +
  import org.apache.hadoop.classification.InterfaceAudience;
  import org.apache.hadoop.hdfs.protocol.Block;
 +import org.apache.hadoop.hdfs.protocol.ErasureCodingZone;
+ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
  import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
  import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
  import org.apache.hadoop.hdfs.util.RwLock;
@@@ -48,17 -48,7 +51,17 @@@ public interface Namesystem extends RwL
  
    void checkOperation(OperationCategory read) throws StandbyException;
  
-   boolean isInSnapshot(BlockCollection bc);
- 
 +  /**
 +   * Gets the ECZone for path
 +   * @param src
 +   *          - path
 +   * @return {@link ErasureCodingZone}
 +   * @throws IOException
 +   */
 +  ErasureCodingZone getErasureCodingZoneForPath(String src)
 +      throws IOException;
 +
+   boolean isInSnapshot(BlockInfo blockUC);
+ 
    CacheManager getCacheManager();
  }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
index 0162f85,91ebaaf..450d981
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
@@@ -245,11 -245,10 +247,11 @@@ public class FSImageFormatPBSnapshot 
          BlockInfo[] blocks = new BlockInfo[bpl.size()];
          for(int j = 0, e = bpl.size(); j < e; ++j) {
            Block blk = PBHelper.convert(bpl.get(j));
-           BlockInfo storedBlock =  fsn.getBlockManager().getStoredBlock(blk);
+           BlockInfo storedBlock = bm.getStoredBlock(blk);
            if(storedBlock == null) {
 -            storedBlock = bm.addBlockCollection(
 -                new BlockInfoContiguous(blk, copy.getFileReplication()), file);
 +            storedBlock = (BlockInfoContiguous) fsn.getBlockManager()
 +                .addBlockCollectionWithCheck(new BlockInfoContiguous(blk,
 +                    copy.getFileReplication()), file);
            }
            blocks[j] = storedBlock;
          }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index fcf2bc1,96776e4..0db56dd
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@@ -2413,24 -2403,13 +2403,34 @@@
  </property>
  
  <property>
 +  <name>dfs.datanode.stripedread.threshold.millis</name>
 +  <value>5000</value>
 +  <description>datanode striped read threshold in millisecond.
 +  </description>
 +</property>
 +
 +<property>
 +  <name>dfs.datanode.stripedread.threads</name>
 +  <value>20</value>
 +  <description>datanode striped read thread pool size.
 +  </description>
 +</property>
 +
 +<property>
 +  <name>dfs.datanode.stripedread.buffer.size</name>
 +  <value>262144</value>
 +  <description>datanode striped read buffer size.
 +  </description>
 +</property>
 +
++<property>
+   <name>dfs.namenode.quota.init-threads</name>
+   <value>4</value>
+   <description>
+     The number of concurrent threads to be used in quota initialization. The
+     speed of quota initialization also affects the namenode fail-over latency.
+     If the size of name space is big, try increasing this.
+   </description>
+ </property>
+ 
  </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 59daba4,24e0965..4bb5c64
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@@ -119,8 -119,8 +119,9 @@@ import org.apache.hadoop.security.UserG
  import org.apache.hadoop.security.authorize.ProxyUsers;
  import org.apache.hadoop.test.GenericTestUtils;
  import org.apache.hadoop.util.ExitUtil;
+ import org.apache.hadoop.util.ShutdownHookManager;
  import org.apache.hadoop.util.StringUtils;
 +import org.apache.hadoop.util.ShutdownHookManager;
  import org.apache.hadoop.util.ToolRunner;
  
  import com.google.common.base.Joiner;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index ad8f204,c1ed758..eb24fb0
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@@ -955,9 -939,9 +958,9 @@@ public class TestBalancer 
    void testBalancer1Internal(Configuration conf) throws Exception {
      initConf(conf);
      testUnevenDistribution(conf,
--        new long[] {50*CAPACITY/100, 10*CAPACITY/100},
++        new long[]{50 * CAPACITY / 100, 10 * CAPACITY / 100},
          new long[]{CAPACITY, CAPACITY},
--        new String[] {RACK0, RACK1});
++        new String[]{RACK0, RACK1});
    }
    
    @Test(expected=HadoopIllegalArgumentException.class)
@@@ -971,7 -955,7 +974,7 @@@
    public void testBalancerWithNonZeroThreadsForMove() throws Exception {
      Configuration conf = new HdfsConfiguration();
      conf.setInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 8);
--    testBalancer1Internal (conf);
++    testBalancer1Internal(conf);
    }
    
    @Test(timeout=100000)
@@@ -981,8 -965,8 +984,8 @@@
    
    void testBalancer2Internal(Configuration conf) throws Exception {
      initConf(conf);
--    testBalancerDefaultConstructor(conf, new long[] { CAPACITY, CAPACITY },
--        new String[] { RACK0, RACK1 }, CAPACITY, RACK2);
++    testBalancerDefaultConstructor(conf, new long[]{CAPACITY, CAPACITY},
++        new String[]{RACK0, RACK1}, CAPACITY, RACK2);
    }
  
    private void testBalancerDefaultConstructor(Configuration conf,
@@@ -1555,75 -1540,116 +1559,183 @@@
      }
    }
  
+   /** Balancer should not move blocks with size < minBlockSize. */
+   @Test(timeout=60000)
+   public void testMinBlockSizeAndSourceNodes() throws Exception {
+     final Configuration conf = new HdfsConfiguration();
+     initConf(conf);
+  
+     final short replication = 3;
+     final long[] lengths = {10, 10, 10, 10}; 
+     final long[] capacities = new long[replication];
+     final long totalUsed = capacities.length * sum(lengths);
+     Arrays.fill(capacities, 1000);
+ 
+     cluster = new MiniDFSCluster.Builder(conf)
+         .numDataNodes(capacities.length)
+         .simulatedCapacities(capacities)
+         .build();
+     final DistributedFileSystem dfs = cluster.getFileSystem();
+ 
+     try {
+       cluster.waitActive();
+       client = NameNodeProxies.createProxy(conf, dfs.getUri(),
+           ClientProtocol.class).getProxy();
+       
+       // fill up the cluster to be 80% full
+       for(int i = 0; i < lengths.length; i++) {
+         final long size = lengths[i];
+         final Path p = new Path("/file" + i + "_size" + size);
+         try(final OutputStream out = dfs.create(p)) {
+           for(int j = 0; j < size; j++) {
+             out.write(j);
+           }
+         }
+       }
+       
+       // start up an empty node with the same capacity
+       cluster.startDataNodes(conf, capacities.length, true, null, null, capacities);
+       LOG.info("capacities    = " + Arrays.toString(capacities));
+       LOG.info("totalUsedSpace= " + totalUsed);
+       LOG.info("lengths       = " + Arrays.toString(lengths) + ", #=" + lengths.length);
+       waitForHeartBeat(totalUsed, 2*capacities[0]*capacities.length, client, cluster);
+       
+       final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+ 
+       { // run Balancer with min-block-size=50
+         final Parameters p = new Parameters(
+             BalancingPolicy.Node.INSTANCE, 1,
+             NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
+             Collections.<String> emptySet(), Collections.<String> emptySet(),
+             Collections.<String> emptySet(), false);
+ 
+         conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
+         final int r = Balancer.run(namenodes, p, conf);
+         assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
+       }
+       
+       conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
+ 
+       { // run Balancer with empty nodes as source nodes
+         final Set<String> sourceNodes = new HashSet<>();
+         final List<DataNode> datanodes = cluster.getDataNodes();
+         for(int i = capacities.length; i < datanodes.size(); i++) {
+           sourceNodes.add(datanodes.get(i).getDisplayName());
+         }
+         final Parameters p = new Parameters(
+           BalancingPolicy.Node.INSTANCE, 1,
+           NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
+           Collections.<String> emptySet(), Collections.<String> emptySet(),
+           sourceNodes, false);
+ 
+         conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
+         final int r = Balancer.run(namenodes, p, conf);
+         assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r);
+       }
+ 
+       { // run Balancer with a filled node as a source node
+         final Set<String> sourceNodes = new HashSet<>();
+         final List<DataNode> datanodes = cluster.getDataNodes();
+         sourceNodes.add(datanodes.get(0).getDisplayName());
+         final Parameters p = new Parameters(
+           BalancingPolicy.Node.INSTANCE, 1,
+           NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
+           Collections.<String> emptySet(), Collections.<String> emptySet(),
+           sourceNodes, false);
+ 
+         conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
+         final int r = Balancer.run(namenodes, p, conf);
+         assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r);
+       }
+ 
+       { // run Balancer with all filled node as source nodes
+         final Set<String> sourceNodes = new HashSet<>();
+         final List<DataNode> datanodes = cluster.getDataNodes();
+         for(int i = 0; i < capacities.length; i++) {
+           sourceNodes.add(datanodes.get(i).getDisplayName());
+         }
+         final Parameters p = new Parameters(
+           BalancingPolicy.Node.INSTANCE, 1,
+           NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
+           Collections.<String> emptySet(), Collections.<String> emptySet(),
+           sourceNodes, false);
+ 
+         conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
+         final int r = Balancer.run(namenodes, p, conf);
+         assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
+       }
+     } finally {
+       cluster.shutdown();
+     }
+   }
 -  
 +  public void integrationTestWithStripedFile(Configuration conf) throws Exception {
 +    initConfWithStripe(conf);
 +    doTestBalancerWithStripedFile(conf);
 +  }
 +
 +  @Test(timeout = 100000)
 +  public void testBalancerWithStripedFile() throws Exception {
 +    Configuration conf = new Configuration();
 +    initConfWithStripe(conf);
 +    doTestBalancerWithStripedFile(conf);
 +  }
 +
 +  private void doTestBalancerWithStripedFile(Configuration conf) throws Exception {
 +    int numOfDatanodes = dataBlocks + parityBlocks + 2;
 +    int numOfRacks = dataBlocks;
 +    long capacity = 20 * DEFAULT_STRIPE_BLOCK_SIZE;
 +    long[] capacities = new long[numOfDatanodes];
 +    for (int i = 0; i < capacities.length; i++) {
 +      capacities[i] = capacity;
 +    }
 +    String[] racks = new String[numOfDatanodes];
 +    for (int i = 0; i < numOfDatanodes; i++) {
 +      racks[i] = "/rack" + (i % numOfRacks);
 +    }
 +    cluster = new MiniDFSCluster.Builder(conf)
 +        .numDataNodes(numOfDatanodes)
 +        .racks(racks)
 +        .simulatedCapacities(capacities)
 +        .build();
 +
 +    try {
 +      cluster.waitActive();
 +      client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(),
 +          ClientProtocol.class).getProxy();
 +      client.createErasureCodingZone("/", null);
 +
 +      long totalCapacity = sum(capacities);
 +
 +      // fill up the cluster with 30% data. It'll be 45% full plus parity.
 +      long fileLen = totalCapacity * 3 / 10;
 +      long totalUsedSpace = fileLen * (dataBlocks + parityBlocks) / dataBlocks;
 +      FileSystem fs = cluster.getFileSystem(0);
 +      DFSTestUtil.createFile(fs, filePath, fileLen, (short) 3, r.nextLong());
 +
 +      // verify locations of striped blocks
 +      LocatedBlocks locatedBlocks = client.getBlockLocations(fileName, 0, fileLen);
 +      DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
 +
 +      // add one datanode
 +      String newRack = "/rack" + (++numOfRacks);
 +      cluster.startDataNodes(conf, 1, true, null,
 +          new String[]{newRack}, null, new long[]{capacity});
 +      totalCapacity += capacity;
 +      cluster.triggerHeartbeats();
 +
 +      // run balancer and validate results
 +      Balancer.Parameters p = Balancer.Parameters.DEFAULT;
 +      Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
 +      runBalancer(conf, totalUsedSpace, totalCapacity, p, 0);
 +
 +      // verify locations of striped blocks
 +      locatedBlocks = client.getBlockLocations(fileName, 0, fileLen);
 +      DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
- 
 +    } finally {
 +      cluster.shutdown();
 +    }
 +  }
 +
    /**
     * @param args
     */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
index bae4f1d,ceef9f2..d6213ff
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
@@@ -70,18 -71,18 +71,6 @@@ public class TestBlockInfo 
    }
  
    @Test
--  public void testCopyConstructor() {
-     BlockInfo old = new BlockInfoContiguous((short) 3);
 -    BlockInfoContiguous old = new BlockInfoContiguous((short) 3);
--    try {
-       BlockInfo copy = new BlockInfoContiguous((BlockInfoContiguous)old);
-       assertEquals(old.getBlockCollection(), copy.getBlockCollection());
 -      BlockInfoContiguous copy = new BlockInfoContiguous(old);
 -      assertEquals(old.getBlockCollectionId(), copy.getBlockCollectionId());
--      assertEquals(old.getCapacity(), copy.getCapacity());
--    } catch (Exception e) {
--      Assert.fail("Copy constructor throws exception: " + e);
--    }
--  }
--
--  @Test
    public void testReplaceStorage() throws Exception {
  
      // Create two dummy storages.