Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2015/09/30 20:22:36 UTC
[02/52] [abbrv] hadoop git commit: Merge remote-tracking branch 'apache/trunk' into HDFS-7285
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
index 6093776,0000000..7b21cbe
mode 100644,000000..100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
@@@ -1,258 -1,0 +1,253 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+
+/**
+ * Subclass of {@link BlockInfo}, presenting a block group in erasure coding.
+ *
+ * We still use triplets to store DatanodeStorageInfo for each block in the
+ * block group, as well as the previous/next block in the corresponding
+ * DatanodeStorageInfo. For a (m+k) block group, the first (m+k) triplet units
+ * are sorted and strictly mapped to the corresponding block.
+ *
+ * Normally each block belonging to the group is stored in only one DataNode.
+ * However, it is possible that some block is over-replicated, so the triplet
+ * array's size can be larger than (m+k). Thus we currently use an extra byte
+ * array to record the block index for each triplet.
+ */
+public class BlockInfoStriped extends BlockInfo {
+ private final ErasureCodingPolicy ecPolicy;
+ /**
+ * Always the same size as triplets. Records the block index for each triplet.
+ * TODO: actually this is only necessary for over-replicated blocks, so it can
+ * be further optimized to save memory usage.
+ */
+ private byte[] indices;
+
+ public BlockInfoStriped(Block blk, ErasureCodingPolicy ecPolicy) {
+ super(blk, (short) (ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits()));
+ indices = new byte[ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits()];
+ initIndices();
+ this.ecPolicy = ecPolicy;
+ }
+
- BlockInfoStriped(BlockInfoStriped b) {
- this(b, b.getErasureCodingPolicy());
- this.setBlockCollection(b.getBlockCollection());
- }
-
+ public short getTotalBlockNum() {
+ return (short) (ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits());
+ }
+
+ public short getDataBlockNum() {
+ return (short) ecPolicy.getNumDataUnits();
+ }
+
+ public short getParityBlockNum() {
+ return (short) ecPolicy.getNumParityUnits();
+ }
+
+ /**
+ * If the block is committed/completed and its length is less than a full
+ * stripe, it returns the number of actual data blocks.
+ * Otherwise it returns the number of data units specified by the erasure coding policy.
+ */
+ public short getRealDataBlockNum() {
+ if (isComplete() || getBlockUCState() == BlockUCState.COMMITTED) {
+ return (short) Math.min(getDataBlockNum(),
+ (getNumBytes() - 1) / BLOCK_STRIPED_CELL_SIZE + 1);
+ } else {
+ return getDataBlockNum();
+ }
+ }
+
+ public short getRealTotalBlockNum() {
+ return (short) (getRealDataBlockNum() + getParityBlockNum());
+ }
+
+ public ErasureCodingPolicy getErasureCodingPolicy() {
+ return ecPolicy;
+ }
+
+ private void initIndices() {
+ for (int i = 0; i < indices.length; i++) {
+ indices[i] = -1;
+ }
+ }
+
+ private int findSlot() {
+ int i = getTotalBlockNum();
+ for (; i < getCapacity(); i++) {
+ if (getStorageInfo(i) == null) {
+ return i;
+ }
+ }
+ // need to expand the triplet size
+ ensureCapacity(i + 1, true);
+ return i;
+ }
+
+ @Override
+ boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
+ int blockIndex = BlockIdManager.getBlockIndex(reportedBlock);
+ int index = blockIndex;
+ DatanodeStorageInfo old = getStorageInfo(index);
+ if (old != null && !old.equals(storage)) { // over replicated
+ // check if the storage has been stored
+ int i = findStorageInfo(storage);
+ if (i == -1) {
+ index = findSlot();
+ } else {
+ return true;
+ }
+ }
+ addStorage(storage, index, blockIndex);
+ return true;
+ }
+
+ private void addStorage(DatanodeStorageInfo storage, int index,
+ int blockIndex) {
+ setStorageInfo(index, storage);
+ setNext(index, null);
+ setPrevious(index, null);
+ indices[index] = (byte) blockIndex;
+ }
+
+ private int findStorageInfoFromEnd(DatanodeStorageInfo storage) {
+ final int len = getCapacity();
+ for(int idx = len - 1; idx >= 0; idx--) {
+ DatanodeStorageInfo cur = getStorageInfo(idx);
+ if (storage.equals(cur)) {
+ return idx;
+ }
+ }
+ return -1;
+ }
+
+ int getStorageBlockIndex(DatanodeStorageInfo storage) {
+ int i = this.findStorageInfo(storage);
+ return i == -1 ? -1 : indices[i];
+ }
+
+ /**
+ * Identify the block stored in the given datanode storage. Note that
+ * the returned block has the same block Id as the one seen/reported by the
+ * DataNode.
+ */
+ Block getBlockOnStorage(DatanodeStorageInfo storage) {
+ int index = getStorageBlockIndex(storage);
+ if (index < 0) {
+ return null;
+ } else {
+ Block block = new Block(this);
+ block.setBlockId(this.getBlockId() + index);
+ return block;
+ }
+ }
+
+ @Override
+ boolean removeStorage(DatanodeStorageInfo storage) {
+ int dnIndex = findStorageInfoFromEnd(storage);
+ if (dnIndex < 0) { // the node is not found
+ return false;
+ }
+ assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
+ "Block is still in the list and must be removed first.";
+ // set the triplet to null
+ setStorageInfo(dnIndex, null);
+ setNext(dnIndex, null);
+ setPrevious(dnIndex, null);
+ indices[dnIndex] = -1;
+ return true;
+ }
+
+ private void ensureCapacity(int totalSize, boolean keepOld) {
+ if (getCapacity() < totalSize) {
+ Object[] old = triplets;
+ byte[] oldIndices = indices;
+ triplets = new Object[totalSize * 3];
+ indices = new byte[totalSize];
+ initIndices();
+
+ if (keepOld) {
+ System.arraycopy(old, 0, triplets, 0, old.length);
+ System.arraycopy(oldIndices, 0, indices, 0, oldIndices.length);
+ }
+ }
+ }
+
+ @Override
+ void replaceBlock(BlockInfo newBlock) {
+ assert newBlock instanceof BlockInfoStriped;
+ BlockInfoStriped newBlockGroup = (BlockInfoStriped) newBlock;
+ final int size = getCapacity();
+ newBlockGroup.ensureCapacity(size, false);
+ for (int i = 0; i < size; i++) {
+ final DatanodeStorageInfo storage = this.getStorageInfo(i);
+ if (storage != null) {
+ final int blockIndex = indices[i];
+ final boolean removed = storage.removeBlock(this);
+ assert removed : "currentBlock not found.";
+
+ newBlockGroup.addStorage(storage, i, blockIndex);
+ storage.insertToList(newBlockGroup);
+ }
+ }
+ }
+
+ public long spaceConsumed() {
+ // For striped blocks, the total usage of this block group should be the
+ // sum of the data blocks and parity blocks, because `getNumBytes` only
+ // covers the total size of the data blocks.
+ return StripedBlockUtil.spaceConsumedByStripedBlock(getNumBytes(),
+ ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits(),
+ BLOCK_STRIPED_CELL_SIZE);
+ }
+
+ @Override
+ public final boolean isStriped() {
+ return true;
+ }
+
+ @Override
+ public int numNodes() {
+ assert this.triplets != null : "BlockInfo is not initialized";
+ assert triplets.length % 3 == 0 : "Malformed BlockInfo";
+ int num = 0;
+ for (int idx = getCapacity()-1; idx >= 0; idx--) {
+ if (getStorageInfo(idx) != null) {
+ num++;
+ }
+ }
+ return num;
+ }
+
+ @Override
+ final boolean hasNoStorage() {
+ final int len = getCapacity();
+ for(int idx = 0; idx < len; idx++) {
+ if (getStorageInfo(idx) != null) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
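
The bookkeeping above is easier to see with numbers plugged in. The following is a rough standalone sketch, assuming the RS-6-3 layout (6 data + 3 parity units) and a 64 KB cell size as this branch's defaults; the class and helper names are illustrative and not part of the patch, and the space calculation only mirrors the intent of spaceConsumed() rather than StripedBlockUtil's exact code.

    // Illustrative sketch only: striped block-group arithmetic under assumed
    // RS-6-3 / 64 KB-cell defaults.
    public final class StripedGroupSketch {
      static final int DATA_UNITS = 6;            // assumed m
      static final int PARITY_UNITS = 3;          // assumed k
      static final long CELL_SIZE = 64 * 1024;    // assumed BLOCK_STRIPED_CELL_SIZE

      // How many of the m data blocks actually hold bytes for a group of this
      // length (compare getRealDataBlockNum() for committed/complete groups).
      static int realDataBlockNum(long numBytes) {
        return (int) Math.min(DATA_UNITS, (numBytes - 1) / CELL_SIZE + 1);
      }

      // Raw storage consumed by the whole group: the data bytes plus one
      // parity copy of the largest internal block per parity unit.
      static long spaceConsumed(long numBytes) {
        long fullStripeBytes = CELL_SIZE * DATA_UNITS;
        long fullStripes = numBytes / fullStripeBytes;
        long remainder = numBytes - fullStripes * fullStripeBytes;
        long parityBlockLen = fullStripes * CELL_SIZE
            + Math.min(remainder, CELL_SIZE);
        return numBytes + parityBlockLen * PARITY_UNITS;
      }

      public static void main(String[] args) {
        long len = 100L * 1024;                       // a 100 KB block group
        System.out.println(realDataBlockNum(len));    // 2 data blocks are real
        System.out.println(spaceConsumed(len));       // 102400 + 3 * 65536 = 299008
      }
    }
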
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index ae08825,95933d2..6c6d758
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@@ -89,12 -84,8 +88,13 @@@ import org.apache.hadoop.hdfs.server.pr
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
+ import org.apache.hadoop.hdfs.util.LightWeightHashSet;
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength;
+
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation;
@@@ -216,8 -202,8 +216,8 @@@ public class BlockManager implements Bl
* Maps a StorageID to the set of blocks that are "extra" for this
* DataNode. We'll eventually remove these extras.
*/
- public final Map<String, LightWeightLinkedSet<Block>> excessReplicateMap =
+ public final Map<String, LightWeightLinkedSet<BlockInfo>> excessReplicateMap =
- new TreeMap<>();
+ new HashMap<>();
/**
* Store set of Blocks that need to be replicated 1 or more times.
@@@ -689,26 -662,20 +689,25 @@@
*/
private BlockInfo completeBlock(final BlockCollection bc,
final int blkIndex, boolean force) throws IOException {
- if(blkIndex < 0)
+ if (blkIndex < 0) {
return null;
+ }
BlockInfo curBlock = bc.getBlocks()[blkIndex];
- if(curBlock.isComplete())
+ if (curBlock.isComplete()) {
return curBlock;
+ }
int numNodes = curBlock.numNodes();
- if (!force && numNodes < minReplication)
+ if (!force && !hasMinStorage(curBlock, numNodes)) {
throw new IOException("Cannot complete block: " +
"block does not satisfy minimal replication requirement.");
- if(!force && curBlock.getBlockUCState() != BlockUCState.COMMITTED)
+ }
+ if (!force && curBlock.getBlockUCState() != BlockUCState.COMMITTED) {
throw new IOException(
"Cannot complete block: block has not been COMMITTED by the client");
- BlockInfo completeBlock = curBlock.convertToCompleteBlock();
+ }
+
+ final BlockInfo completeBlock = curBlock.convertToCompleteBlock();
-
// replace penultimate block in file
bc.setBlock(blkIndex, completeBlock);
@@@ -1276,9 -1186,8 +1276,9 @@@
" corrupt as it does not belong to any file", b);
addToInvalidates(b.corrupted, node);
return;
-- }
- short expectedReplicas = b.corrupted.getReplication();
++ }
+ short expectedReplicas =
- getExpectedReplicaNum(b.stored.getBlockCollection(), b.stored);
++ getExpectedReplicaNum(b.stored);
// Add replica to the data-node if it is not already there
if (storageInfo != null) {
@@@ -1446,10 -1350,10 +1446,10 @@@
namesystem.writeLock();
try {
synchronized (neededReplications) {
- for (int priority = 0; priority < blocksToReplicate.size(); priority++) {
- for (BlockInfo block : blocksToReplicate.get(priority)) {
+ for (int priority = 0; priority < blocksToRecover.size(); priority++) {
+ for (BlockInfo block : blocksToRecover.get(priority)) {
// block should belong to a file
- bc = blocksMap.getBlockCollection(block);
+ bc = getBlockCollection(block);
// abandoned block or block reopened for append
if (bc == null
|| (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
@@@ -1458,20 -1362,17 +1458,20 @@@
continue;
}
- requiredReplication = getExpectedReplicaNum(bc, block);
+ requiredReplication = getExpectedReplicaNum(block);
// get a source data-node
- containingNodes = new ArrayList<DatanodeDescriptor>();
- List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<DatanodeStorageInfo>();
+ containingNodes = new ArrayList<>();
+ List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>();
NumberReplicas numReplicas = new NumberReplicas();
- srcNode = chooseSourceDatanode(
- block, containingNodes, liveReplicaNodes, numReplicas,
- priority);
- if(srcNode == null) { // block can not be replicated from any node
- LOG.debug("Block " + block + " cannot be repl from any node");
+ List<Short> liveBlockIndices = new ArrayList<>();
+ final DatanodeDescriptor[] srcNodes = chooseSourceDatanodes(block,
+ containingNodes, liveReplicaNodes, numReplicas,
+ liveBlockIndices, priority);
+ if(srcNodes == null || srcNodes.length == 0) {
+ // block can not be replicated from any node
+ LOG.debug("Block " + block + " cannot be recovered " +
+ "from any node");
continue;
}
@@@ -1588,32 -1474,7 +1588,32 @@@
}
// Add block to the to be replicated list
- rw.srcNode.addBlockToBeReplicated(block, targets);
+ if (block.isStriped()) {
+ assert rw instanceof ErasureCodingWork;
+ assert rw.targets.length > 0;
- String src = block.getBlockCollection().getName();
++ String src = getBlockCollection(block).getName();
+ ErasureCodingZone ecZone = null;
+ try {
+ ecZone = namesystem.getErasureCodingZoneForPath(src);
+ } catch (IOException e) {
+ blockLog
+ .warn("Failed to get the EC zone for the file {} ", src);
+ }
+ if (ecZone == null) {
+ blockLog.warn("No erasure coding policy found for the file {}. "
+ + "So cannot proceed for recovery", src);
+ // TODO: we may have to revisit later for what we can do better to
+ // handle this case.
+ continue;
+ }
+ rw.targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
+ new ExtendedBlock(namesystem.getBlockPoolId(), block),
+ rw.srcNodes, rw.targets,
+ ((ErasureCodingWork) rw).liveBlockIndicies,
+ ecZone.getErasureCodingPolicy());
+ } else {
+ rw.srcNodes[0].addBlockToBeReplicated(block, targets);
+ }
scheduledWork++;
DatanodeStorageInfo.incrementBlocksScheduled(targets);
@@@ -2079,8 -1924,8 +2080,8 @@@
private void removeZombieReplicas(BlockReportContext context,
DatanodeStorageInfo zombie) {
LOG.warn("processReport 0x{}: removing zombie storage {}, which no " +
-- "longer exists on the DataNode.",
-- Long.toHexString(context.getReportId()), zombie.getStorageID());
++ "longer exists on the DataNode.",
++ Long.toHexString(context.getReportId()), zombie.getStorageID());
assert(namesystem.hasWriteLock());
Iterator<BlockInfo> iter = zombie.getBlockIterator();
int prevBlocks = zombie.numBlocks();
@@@ -2324,10 -2164,10 +2325,10 @@@
// OpenFileBlocks only inside snapshots also will be added to safemode
// threshold. So we need to update such blocks to safemode
// refer HDFS-5283
- if (namesystem.isInSnapshot(storedBlock.getBlockCollection())) {
+ if (namesystem.isInSnapshot(storedBlock)) {
int numOfReplicas = storedBlock.getUnderConstructionFeature()
.getNumExpectedLocations();
- namesystem.incrementSafeBlockCount(numOfReplicas);
+ namesystem.incrementSafeBlockCount(numOfReplicas, storedBlock);
}
//and fall through to next clause
}
@@@ -2720,8 -2541,8 +2721,8 @@@
// Now check for completion of blocks and safe block count
int numCurrentReplica = countLiveNodes(storedBlock);
if (storedBlock.getBlockUCState() == BlockUCState.COMMITTED
- && numCurrentReplica >= minReplication) {
+ && hasMinStorage(storedBlock, numCurrentReplica)) {
- completeBlock(storedBlock.getBlockCollection(), storedBlock, false);
+ completeBlock(getBlockCollection(storedBlock), storedBlock, false);
} else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
// check whether safe replication is reached for the block
// only complete blocks are counted towards that.
@@@ -2760,10 -2580,11 +2761,11 @@@
// it will happen in next block report otherwise.
return block;
}
- BlockCollection bc = storedBlock.getBlockCollection();
+ BlockCollection bc = getBlockCollection(storedBlock);
+ assert bc != null : "Block must belong to a file";
// add block to the datanode
- AddBlockResult result = storageInfo.addBlock(storedBlock);
+ AddBlockResult result = storageInfo.addBlock(storedBlock, reportedBlock);
int curReplicaDelta;
if (result == AddBlockResult.ADDED) {
@@@ -3817,9 -3491,9 +3809,9 @@@
*/
public void checkReplication(BlockCollection bc) {
for (BlockInfo block : bc.getBlocks()) {
- short expected = getExpectedReplicaNum(bc, block);
- final short expected = block.getReplication();
++ short expected = getExpectedReplicaNum(block);
final NumberReplicas n = countNodes(block);
- if (isNeededReplication(block, expected, n.liveReplicas())) {
+ if (isNeededReplication(block, n.liveReplicas())) {
neededReplications.add(block, n.liveReplicas(),
n.decommissionedAndDecommissioning(), expected);
} else if (n.liveReplicas() > expected) {
@@@ -3978,18 -3605,15 +3968,17 @@@
* A block needs replication if the number of replicas is less than expected
* or if it does not have enough racks.
*/
- boolean isNeededReplication(BlockInfo storedBlock, int expected, int current) {
+ boolean isNeededReplication(BlockInfo storedBlock, int current) {
- int expected = storedBlock.getReplication();
- return current < expected || !blockHasEnoughRacks(storedBlock);
++ int expected = getExpectedReplicaNum(storedBlock);
+ return current < expected || !blockHasEnoughRacks(storedBlock, expected);
}
-
- public short getExpectedReplicaNum(BlockCollection bc, BlockInfo block) {
- if (block.isStriped()) {
- return ((BlockInfoStriped) block).getRealTotalBlockNum();
- } else {
- return bc.getPreferredBlockReplication();
- }
+
+ public short getExpectedReplicaNum(BlockInfo block) {
- return block.getReplication();
++ return block.isStriped() ?
++ ((BlockInfoStriped) block).getRealTotalBlockNum() :
++ block.getReplication();
}
-
+
public long getMissingBlocksCount() {
// not locking
return this.neededReplications.getCorruptBlockSize();
@@@ -4005,22 -3629,13 +3994,22 @@@
return blocksMap.addBlockCollection(block, bc);
}
- public BlockCollection getBlockCollection(BlockInfo b) {
- return namesystem.getBlockCollection(b.getBlockCollectionId());
+ /**
+ * Do some checking when adding a block to the blocksMap.
+ * For HDFS-7994: check whether the block is a NonEcBlockUsingStripedID.
+ *
+ */
+ public BlockInfo addBlockCollectionWithCheck(
+ BlockInfo block, BlockCollection bc) {
+ if (!hasNonEcBlockUsingStripedID && !block.isStriped() &&
+ BlockIdManager.isStripedBlockID(block.getBlockId())) {
+ hasNonEcBlockUsingStripedID = true;
+ }
+ return addBlockCollection(block, bc);
}
- public BlockCollection getBlockCollection(Block b) {
- return blocksMap.getBlockCollection(b);
- /** @return an iterator of the datanodes. */
- public Iterable<DatanodeStorageInfo> getStorages(final Block block) {
- return blocksMap.getStorages(block);
++ public BlockCollection getBlockCollection(BlockInfo b) {
++ return namesystem.getBlockCollection(b.getBlockCollectionId());
}
public int numCorruptReplicas(Block block) {
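
A thread running through this BlockManager hunk is that the expected "replication" of a block can no longer be read off the owning file: a striped group expects one storage per real data block plus one per parity block, while a contiguous block still uses its replication factor. A minimal sketch of that per-block decision, with illustrative names:

    // Sketch only: the per-block expectation that getExpectedReplicaNum() now
    // computes, written out as plain arithmetic.
    public final class ExpectedReplicaSketch {
      static short expectedReplicaNum(boolean striped, short blockReplication,
          int realDataBlocks, int parityBlocks) {
        return striped
            ? (short) (realDataBlocks + parityBlocks)  // getRealTotalBlockNum()
            : blockReplication;                        // block.getReplication()
      }

      public static void main(String[] args) {
        System.out.println(expectedReplicaNum(false, (short) 3, 0, 0)); // 3
        System.out.println(expectedReplicaNum(true, (short) 0, 2, 3));  // 5 for a short group
      }
    }
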
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java
index 58b455e,88cf06d..0e92779
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java
@@@ -55,11 -58,11 +58,11 @@@ public class BlockUnderConstructionFeat
private Block truncateBlock;
public BlockUnderConstructionFeature(Block blk,
- BlockUCState state, DatanodeStorageInfo[] targets) {
+ BlockUCState state, DatanodeStorageInfo[] targets, boolean isStriped) {
assert getBlockUCState() != COMPLETE :
- "BlockUnderConstructionFeature cannot be in COMPLETE state";
+ "BlockUnderConstructionFeature cannot be in COMPLETE state";
this.blockUCState = state;
- setExpectedLocations(blk, targets);
+ setExpectedLocations(blk, targets, isStriped);
}
/** Set expected locations */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
index 5bfae42,33c68f3..51d62c1
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
@@@ -129,18 -118,14 +125,18 @@@ class BlocksMap
if (blockInfo == null)
return;
- blockInfo.setBlockCollection(null);
- final int size = blockInfo instanceof BlockInfoContiguous ?
- blockInfo.numNodes() : blockInfo.getCapacity();
+ blockInfo.setBlockCollectionId(INodeId.INVALID_INODE_ID);
- for(int idx = blockInfo.numNodes()-1; idx >= 0; idx--) {
++ final int size = blockInfo.isStriped() ?
++ blockInfo.getCapacity() : blockInfo.numNodes();
+ for(int idx = size - 1; idx >= 0; idx--) {
DatanodeDescriptor dn = blockInfo.getDatanode(idx);
- dn.removeBlock(blockInfo); // remove from the list and wipe the location
+ if (dn != null) {
+ dn.removeBlock(blockInfo); // remove from the list and wipe the location
+ }
}
}
-
- /** Returns the block object it it exists in the map. */
+
+ /** Returns the block object if it exists in the map. */
BlockInfo getStoredBlock(Block b) {
return blocks.get(b);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index a4d5442,7e3c59b..29e541c
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@@ -38,11 -39,10 +38,12 @@@ import org.apache.hadoop.fs.StorageType
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.hdfs.util.EnumCounters;
@@@ -223,12 -222,8 +224,11 @@@ public class DatanodeDescriptor extend
/** A queue of blocks to be replicated by this datanode */
private final BlockQueue<BlockTargetPair> replicateBlocks =
new BlockQueue<>();
+ /** A queue of blocks to be erasure coded by this datanode */
+ private final BlockQueue<BlockECRecoveryInfo> erasurecodeBlocks =
+ new BlockQueue<>();
/** A queue of blocks to be recovered by this datanode */
- private final BlockQueue<BlockInfo> recoverBlocks =
- new BlockQueue<>();
+ private final BlockQueue<BlockInfo> recoverBlocks = new BlockQueue<>();
/** A set of blocks to be invalidated by this datanode */
private final LightWeightHashSet<Block> invalidateBlocks =
new LightWeightHashSet<>();
@@@ -696,24 -662,27 +696,34 @@@
}
}
+ @VisibleForTesting
+ public boolean containsInvalidateBlock(Block block) {
+ synchronized (invalidateBlocks) {
+ return invalidateBlocks.contains(block);
+ }
+ }
+
/**
- * @return Approximate number of blocks currently scheduled to be written
+ * Return the sum of remaining spaces of the specified type. If the remaining
+ * space of a storage is less than minSize, it won't be counted toward the
+ * sum.
+ *
+ * @param t The storage type. If null, the type is ignored.
+ * @param minSize The minimum free space required.
+ * @return the sum of remaining spaces that are bigger than minSize.
*/
- public long getRemaining(StorageType t) {
+ public long getRemaining(StorageType t, long minSize) {
long remaining = 0;
- for(DatanodeStorageInfo s : getStorageInfos()) {
- if (s.getStorageType() == t) {
- remaining += s.getRemaining();
+ for (DatanodeStorageInfo s : getStorageInfos()) {
+ if (s.getState() == State.NORMAL &&
+ (t == null || s.getStorageType() == t)) {
+ long r = s.getRemaining();
+ if (r >= minSize) {
+ remaining += r;
+ }
}
}
- return remaining;
+ return remaining;
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
index 5e3cac2,1a20ab0..a80bfd6
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
@@@ -233,16 -234,16 +234,16 @@@ public class DecommissionManager
}
/**
- * Checks whether a block is sufficiently replicated for decommissioning.
- * Full-strength replication is not always necessary, hence "sufficient".
+ * Checks whether a block is sufficiently replicated/stored for
+ * decommissioning. For replicated blocks or striped blocks, full-strength
+ * replication or storage is not always necessary, hence "sufficient".
* @return true if sufficient, else false.
*/
- private boolean isSufficientlyReplicated(BlockInfo block,
- BlockCollection bc,
+ private boolean isSufficient(BlockInfo block, BlockCollection bc,
NumberReplicas numberReplicas) {
- final int numExpected = blockManager.getExpectedReplicaNum(bc, block);
- final int numExpected = block.getReplication();
++ final int numExpected = blockManager.getExpectedReplicaNum(block);
final int numLive = numberReplicas.liveReplicas();
- if (!blockManager.isNeededReplication(block, numExpected, numLive)) {
+ if (!blockManager.isNeededReplication(block, numLive)) {
// Block doesn't need replication. Skip.
LOG.trace("Block {} does not need replication.", block);
return true;
@@@ -274,11 -274,12 +275,12 @@@
return false;
}
- private void logBlockReplicationInfo(BlockInfo block, BlockCollection bc,
- private static void logBlockReplicationInfo(BlockInfo block,
++ private void logBlockReplicationInfo(BlockInfo block,
+ BlockCollection bc,
DatanodeDescriptor srcNode, NumberReplicas num,
Iterable<DatanodeStorageInfo> storages) {
int curReplicas = num.liveReplicas();
- int curExpectedReplicas = blockManager.getExpectedReplicaNum(bc, block);
- int curExpectedReplicas = block.getReplication();
++ int curExpectedReplicas = blockManager.getExpectedReplicaNum(block);
StringBuilder nodeList = new StringBuilder();
for (DatanodeStorageInfo storage : storages) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
@@@ -530,8 -536,10 +533,9 @@@
continue;
}
+ BlockCollection bc = namesystem.getBlockCollection(bcId);
final NumberReplicas num = blockManager.countNodes(block);
final int liveReplicas = num.liveReplicas();
- final int curReplicas = liveReplicas;
// Schedule under-replicated blocks for replication if not already
// pending
@@@ -542,9 -549,9 +545,9 @@@
namesystem.isPopulatingReplQueues()) {
// Process these blocks only when active NN is out of safe mode.
blockManager.neededReplications.add(block,
- curReplicas,
+ liveReplicas,
num.decommissionedAndDecommissioning(),
- blockManager.getExpectedReplicaNum(bc, block));
- block.getReplication());
++ blockManager.getExpectedReplicaNum(block));
}
}
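
The decommissioning check asks the same question per block, now against that per-block expectation: does the block still need replication work (too few live copies, or too few racks)? If not, it is "sufficient" and the node can keep draining. A condensed sketch of that first gate follows; helper names are illustrative, and the real isSufficient() performs additional checks not shown in this hunk.

    // Sketch only: the first gate in the decommissioning sufficiency check.
    public final class DecommissionGateSketch {
      // A block still needs replication work if it has fewer live copies than
      // expected or does not span enough racks (isNeededReplication()).
      static boolean needsWork(int liveReplicas, int expectedReplicas,
          boolean enoughRacks) {
        return liveReplicas < expectedReplicas || !enoughRacks;
      }

      public static void main(String[] args) {
        System.out.println(!needsWork(3, 3, true)); // true: sufficient, skip it
        System.out.println(!needsWork(8, 9, true)); // false: striped group short one block
      }
    }
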
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockGroupIdGenerator.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockGroupIdGenerator.java
index 479ee4c,0000000..7a52273
mode 100644,000000..100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockGroupIdGenerator.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockGroupIdGenerator.java
@@@ -1,85 -1,0 +1,86 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.util.SequentialNumber;
+
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BLOCK_GROUP_INDEX_MASK;
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_BLOCKS_IN_GROUP;
+
+/**
+ * Generate the next valid block group ID by incrementing the maximum block
+ * group ID allocated so far, with the first 2^10 block group IDs reserved.
+ * HDFS-EC introduces a hierarchical protocol to name blocks and groups:
+ * Contiguous: {reserved block IDs | flag | block ID}
+ * Striped: {reserved block IDs | flag | block group ID | index in group}
+ *
+ * Following the n bits of reserved block IDs, the (n+1)th bit in an ID
+ * distinguishes contiguous (0) and striped (1) blocks. For a striped block,
+ * bits (n+2) to (64-m) represent the ID of its block group, while the last m
+ * bits represent its index in the group. The value m is determined by the
+ * maximum number of blocks in a group (MAX_BLOCKS_IN_GROUP).
+ *
+ * Note that the {@link #nextValue()} method requires an external lock to
+ * guarantee that IDs have no conflicts.
+ */
+@InterfaceAudience.Private
+public class SequentialBlockGroupIdGenerator extends SequentialNumber {
+
+ private final BlockManager blockManager;
+
+ SequentialBlockGroupIdGenerator(BlockManager blockManagerRef) {
+ super(Long.MIN_VALUE);
+ this.blockManager = blockManagerRef;
+ }
+
+ @Override // NumberGenerator
+ public long nextValue() {
+ skipTo((getCurrentValue() & ~BLOCK_GROUP_INDEX_MASK) + MAX_BLOCKS_IN_GROUP);
+ // Make sure there's no conflict with existing random block IDs
+ final Block b = new Block(getCurrentValue());
+ while (hasValidBlockInRange(b)) {
+ skipTo(getCurrentValue() + MAX_BLOCKS_IN_GROUP);
+ b.setBlockId(getCurrentValue());
+ }
+ if (b.getBlockId() >= 0) {
+ throw new IllegalStateException("All negative block group IDs are used, "
+ + "growing into positive IDs, "
+ + "which might conflict with non-erasure coded blocks.");
+ }
+ return getCurrentValue();
+ }
+
+ /**
+ * @param b A block object whose id is set to the starting point for the check
+ * @return true if any ID in the range
- * {id, id+HdfsConstants.MAX_BLOCKS_IN_GROUP} is pointed-to by a file
++ * {id, id+HdfsConstants.MAX_BLOCKS_IN_GROUP} is pointed-to by a stored
++ * block.
+ */
+ private boolean hasValidBlockInRange(Block b) {
+ final long id = b.getBlockId();
+ for (int i = 0; i < MAX_BLOCKS_IN_GROUP; i++) {
+ b.setBlockId(id + i);
- if (blockManager.getBlockCollection(b) != null) {
++ if (blockManager.getStoredBlock(b) != null) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
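
The ID layout described in the class Javadoc can be exercised with two lines of bit arithmetic. The sketch below assumes 16 blocks per group, i.e. a 4-bit index mask, which is what BLOCK_GROUP_INDEX_MASK and MAX_BLOCKS_IN_GROUP imply in this file; the class and method names are illustrative.

    // Sketch only: splitting a striped block ID into group ID and in-group index.
    public final class BlockGroupIdSketch {
      static final long MAX_BLOCKS_IN_GROUP = 16;                 // assumed
      static final long GROUP_INDEX_MASK = MAX_BLOCKS_IN_GROUP - 1;

      static long groupId(long blockId)      { return blockId & ~GROUP_INDEX_MASK; }
      static int  indexInGroup(long blockId) { return (int) (blockId & GROUP_INDEX_MASK); }

      public static void main(String[] args) {
        // The generator starts at Long.MIN_VALUE, so (absent collisions) the
        // first group ID it hands out is Long.MIN_VALUE + MAX_BLOCKS_IN_GROUP.
        long firstGroup = Long.MIN_VALUE + MAX_BLOCKS_IN_GROUP;
        long thirdInternalBlock = firstGroup + 2;
        System.out.println(groupId(thirdInternalBlock) == firstGroup); // true
        System.out.println(indexInGroup(thirdInternalBlock));          // 2
      }
    }
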
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockIdGenerator.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockIdGenerator.java
index 6074784,f053b7b..631b435
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockIdGenerator.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockIdGenerator.java
@@@ -19,6 -19,8 +19,7 @@@ package org.apache.hadoop.hdfs.server.b
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+ import org.apache.hadoop.hdfs.server.namenode.INodeId;
import org.apache.hadoop.util.SequentialNumber;
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 1b695e3,5bc50b0..82a0f62
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@@ -2158,11 -2103,14 +2128,11 @@@ public class DataNode extends Reconfigu
//
// Header info
//
- Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
- if (isBlockTokenEnabled) {
- accessToken = blockPoolTokenSecretManager.generateToken(b,
- EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
- }
+ Token<BlockTokenIdentifier> accessToken = getBlockAccessToken(b,
+ EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
long writeTimeout = dnConf.socketWriteTimeout +
- HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
+ HdfsConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
InputStream unbufIn = NetUtils.getInputStream(sock);
DataEncryptionKeyFactory keyFactory =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java
index 22d821f,0000000..a0ac033
mode 100644,000000..100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java
@@@ -1,163 -1,0 +1,163 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import org.apache.hadoop.fs.XAttr;
+import org.apache.hadoop.fs.XAttrSetFlag;
+import org.apache.hadoop.hdfs.XAttrHelper;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingZone;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.WritableUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_ERASURECODING_ZONE;
+
+/**
+ * Manages the list of erasure coding zones in the filesystem.
+ * <p/>
+ * The ErasureCodingZoneManager has its own lock, but relies on the FSDirectory
+ * lock being held for many operations. The FSDirectory lock should not be
+ * taken if the manager lock is already held.
+ * TODO: consolidate zone logic w/ encrypt. zones {@link EncryptionZoneManager}
+ */
+public class ErasureCodingZoneManager {
+ private final FSDirectory dir;
+
+ /**
+ * Construct a new ErasureCodingZoneManager.
+ *
+ * @param dir Enclosing FSDirectory
+ */
+ public ErasureCodingZoneManager(FSDirectory dir) {
+ this.dir = dir;
+ }
+
+ ErasureCodingPolicy getErasureCodingPolicy(INodesInPath iip) throws IOException {
+ ErasureCodingZone ecZone = getErasureCodingZone(iip);
+ return ecZone == null ? null : ecZone.getErasureCodingPolicy();
+ }
+
+ ErasureCodingZone getErasureCodingZone(INodesInPath iip) throws IOException {
+ assert dir.hasReadLock();
+ Preconditions.checkNotNull(iip, "INodes cannot be null");
+ List<INode> inodes = iip.getReadOnlyINodes();
+ for (int i = inodes.size() - 1; i >= 0; i--) {
+ final INode inode = inodes.get(i);
+ if (inode == null) {
+ continue;
+ }
+ // We don't allow symlinks in an EC zone, or pointing to a file/dir in
+ // an EC zone. Therefore, if a symlink is encountered, the directory
+ // shouldn't be in an EC zone.
+ // TODO: properly support symlinks in EC zones
+ if (inode.isSymlink()) {
+ return null;
+ }
+ final List<XAttr> xAttrs = inode.getXAttrFeature() == null ?
+ new ArrayList<XAttr>(0)
+ : inode.getXAttrFeature().getXAttrs();
+ for (XAttr xAttr : xAttrs) {
- if (XATTR_ERASURECODING_ZONE.equals(XAttrHelper.getPrefixName(xAttr))) {
++ if (XATTR_ERASURECODING_ZONE.equals(XAttrHelper.getPrefixedName(xAttr))) {
+ ByteArrayInputStream bIn=new ByteArrayInputStream(xAttr.getValue());
+ DataInputStream dIn=new DataInputStream(bIn);
+ String ecPolicyName = WritableUtils.readString(dIn);
+ ErasureCodingPolicy ecPolicy = dir.getFSNamesystem()
+ .getErasureCodingPolicyManager().getPolicy(ecPolicyName);
+ return new ErasureCodingZone(dir.getInode(inode.getId())
+ .getFullPathName(), ecPolicy);
+ }
+ }
+ }
+ return null;
+ }
+
+ List<XAttr> createErasureCodingZone(final INodesInPath srcIIP,
+ ErasureCodingPolicy ecPolicy) throws IOException {
+ assert dir.hasWriteLock();
+ Preconditions.checkNotNull(srcIIP, "INodes cannot be null");
+ String src = srcIIP.getPath();
+ if (dir.isNonEmptyDirectory(srcIIP)) {
+ throw new IOException(
+ "Attempt to create an erasure coding zone for a " +
+ "non-empty directory " + src);
+ }
+ if (srcIIP.getLastINode() != null &&
+ !srcIIP.getLastINode().isDirectory()) {
+ throw new IOException("Attempt to create an erasure coding zone " +
+ "for a file " + src);
+ }
+ if (getErasureCodingPolicy(srcIIP) != null) {
+ throw new IOException("Directory " + src + " is already in an " +
+ "erasure coding zone.");
+ }
+
+ // The system default erasure coding policy will be used since none is specified.
+ if (ecPolicy == null) {
+ ecPolicy = ErasureCodingPolicyManager.getSystemDefaultPolicy();
+ }
+
+ final XAttr ecXAttr;
+ DataOutputStream dOut = null;
+ try {
+ ByteArrayOutputStream bOut = new ByteArrayOutputStream();
+ dOut = new DataOutputStream(bOut);
+ WritableUtils.writeString(dOut, ecPolicy.getName());
+ ecXAttr = XAttrHelper.buildXAttr(XATTR_ERASURECODING_ZONE,
+ bOut.toByteArray());
+ } finally {
+ IOUtils.closeStream(dOut);
+ }
+ final List<XAttr> xattrs = Lists.newArrayListWithCapacity(1);
+ xattrs.add(ecXAttr);
+ FSDirXAttrOp.unprotectedSetXAttrs(dir, src, xattrs,
+ EnumSet.of(XAttrSetFlag.CREATE));
+ return xattrs;
+ }
+
+ void checkMoveValidity(INodesInPath srcIIP, INodesInPath dstIIP, String src)
+ throws IOException {
+ assert dir.hasReadLock();
+ final ErasureCodingZone srcZone = getErasureCodingZone(srcIIP);
+ final ErasureCodingZone dstZone = getErasureCodingZone(dstIIP);
+ if (srcZone != null && srcZone.getDir().equals(src) && dstZone == null) {
+ return;
+ }
+ final ErasureCodingPolicy srcECPolicy =
+ srcZone != null ? srcZone.getErasureCodingPolicy() : null;
+ final ErasureCodingPolicy dstECPolicy =
+ dstZone != null ? dstZone.getErasureCodingPolicy() : null;
+ if (srcECPolicy != null && !srcECPolicy.equals(dstECPolicy) ||
+ dstECPolicy != null && !dstECPolicy.equals(srcECPolicy)) {
+ throw new IOException(
+ src + " can't be moved because the source and destination have " +
+ "different erasure coding policies.");
+ }
+ }
+}
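
The zone marker itself is small: createErasureCodingZone() serializes just the policy name with WritableUtils into the xattr value, and getErasureCodingZone() reads it back. That round trip can be reproduced outside the NameNode as below; the policy name is an example value, not necessarily one registered with ErasureCodingPolicyManager.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.io.WritableUtils;

    // Sketch only: the encode/decode of the erasure coding zone xattr value.
    public final class EcZoneXAttrSketch {
      public static void main(String[] args) throws IOException {
        // Encode, as in createErasureCodingZone(): just the policy name.
        ByteArrayOutputStream bOut = new ByteArrayOutputStream();
        DataOutputStream dOut = new DataOutputStream(bOut);
        WritableUtils.writeString(dOut, "RS-6-3-64k");   // example policy name
        byte[] xattrValue = bOut.toByteArray();

        // Decode, as in getErasureCodingZone(): read the name back out.
        DataInputStream dIn =
            new DataInputStream(new ByteArrayInputStream(xattrValue));
        System.out.println(WritableUtils.readString(dIn)); // RS-6-3-64k
      }
    }
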
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
index 9b08092,df0bc20..4bed13e
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
@@@ -122,7 -122,7 +122,7 @@@ public class FSDirAttrOp
" does not exist.");
}
boolean changed = unprotectedSetTimes(fsd, inode, mtime, atime, true,
-- iip.getLatestSnapshotId());
++ iip.getLatestSnapshotId());
if (changed) {
fsd.getEditLog().logTimes(src, mtime, atime);
}
@@@ -399,30 -397,26 +397,30 @@@
}
static BlockInfo[] unprotectedSetReplication(
- FSDirectory fsd, String src, short replication, short[] blockRepls)
+ FSDirectory fsd, String src, short replication)
throws QuotaExceededException, UnresolvedLinkException,
- SnapshotAccessControlException {
+ SnapshotAccessControlException, UnsupportedActionException {
assert fsd.hasWriteLock();
+ final BlockManager bm = fsd.getBlockManager();
final INodesInPath iip = fsd.getINodesInPath4Write(src, true);
final INode inode = iip.getLastINode();
if (inode == null || !inode.isFile()) {
return null;
}
INodeFile file = inode.asFile();
+ if (file.isStriped()) {
+ throw new UnsupportedActionException(
+ "Cannot set replication to a file with striped blocks");
+ }
- final short oldBR = file.getPreferredBlockReplication();
+ // Make sure the directory has sufficient quotas
+ short oldBR = file.getPreferredBlockReplication();
- // before setFileReplication, check for increasing block replication.
- // if replication > oldBR, then newBR == replication.
- // if replication < oldBR, we don't know newBR yet.
- if (replication > oldBR) {
- long dsDelta = file.storagespaceConsumed(null).getStorageSpace() / oldBR;
- fsd.updateCount(iip, 0L, dsDelta, oldBR, replication, true);
+ // Ensure the quota will not be exceeded
+ if (oldBR < replication) {
+ long size = file.computeFileSize(true, true);
+ fsd.updateCount(iip, 0L, size, oldBR, replication, true);
}
file.setFileReplication(replication, iip.getLatestSnapshotId());
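
The reworked setReplication path only has to charge quota up front when the factor goes up; a back-of-the-envelope view of the space it must cover is simply the file size times the increase in replication. Sketch with illustrative names (the real updateCount() call works through the quota machinery rather than this bare multiplication):

    // Sketch only: extra storage space a replication increase must fit in quota.
    public final class ReplicationQuotaSketch {
      static long storageSpaceDelta(long fileSizeBytes, short oldBR, short newBR) {
        return newBR > oldBR ? fileSizeBytes * (newBR - oldBR) : 0L;
      }

      public static void main(String[] args) {
        // Raising a 1 GB file from 2x to 3x replication needs 1 GB more space.
        System.out.println(storageSpaceDelta(1L << 30, (short) 2, (short) 3));
      }
    }
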
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
index 68aef76,e9d0806..e480959
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
@@@ -46,7 -44,6 +46,8 @@@ import org.apache.hadoop.hdfs.protocol.
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
++
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
@@@ -529,32 -516,15 +530,32 @@@ class FSDirWriteFileOp
final INodeFile fileINode = inodesInPath.getLastINode().asFile();
Preconditions.checkState(fileINode.isUnderConstruction());
- // check quota limits and updated space consumed
- fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
- fileINode.getFileReplication(), true);
-
// associate new last block for the file
- BlockInfo blockInfo = new BlockInfoContiguous(block,
- fileINode.getFileReplication());
- blockInfo.convertToBlockUnderConstruction(
- HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets);
+ final BlockInfo blockInfo;
+ if (isStriped) {
+ ErasureCodingZone ecZone = FSDirErasureCodingOp.getErasureCodingZone(
+ fsd.getFSNamesystem(), inodesInPath);
+ ErasureCodingPolicy ecPolicy = ecZone.getErasureCodingPolicy();
+ short numDataUnits = (short) ecPolicy.getNumDataUnits();
+ short numParityUnits = (short) ecPolicy.getNumParityUnits();
+ short numLocations = (short) (numDataUnits + numParityUnits);
+
+ // check quota limits and updated space consumed
+ fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
+ numLocations, true);
+ blockInfo = new BlockInfoStriped(block, ecPolicy);
+ blockInfo.convertToBlockUnderConstruction(
+ HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets);
+ } else {
+ // check quota limits and updated space consumed
+ fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
- fileINode.getPreferredBlockReplication(), true);
++ fileINode.getFileReplication(), true);
+
+ short numLocations = fileINode.getFileReplication();
+ blockInfo = new BlockInfoContiguous(block, numLocations);
+ blockInfo.convertToBlockUnderConstruction(
+ HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets);
+ }
fsd.getBlockManager().addBlockCollection(blockInfo, fileINode);
fileINode.addBlock(blockInfo);
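
The branch added here changes only how many copies of the preferred block size the allocation reserves: the file's replication factor for a contiguous block, or data-plus-parity internal blocks for a striped group. Roughly, under illustrative names (the real accounting goes through fsd.updateCount()):

    // Sketch only: worst-case space reserved for a freshly allocated last block.
    public final class NewBlockQuotaSketch {
      static long reservedSpace(long preferredBlockSize, boolean striped,
          short fileReplication, int dataUnits, int parityUnits) {
        int copies = striped ? dataUnits + parityUnits : fileReplication;
        return preferredBlockSize * copies;
      }

      public static void main(String[] args) {
        long blockSize = 128L << 20;  // example 128 MB preferred block size
        System.out.println(reservedSpace(blockSize, false, (short) 3, 0, 0)); // 3 copies
        System.out.println(reservedSpace(blockSize, true, (short) 0, 6, 3));  // 9 internal blocks
      }
    }
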
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index a61161f,f22762c..7203316
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@@ -1077,29 -1048,19 +1077,29 @@@ public class FSEditLogLoader
// TODO: shouldn't this only be true for the last block?
// what about an old-version fsync() where fsync isn't called
// until several blocks in?
- newBI = new BlockInfoContiguous(newBlock,
- file.getPreferredBlockReplication());
+ if (isStriped) {
+ newBI = new BlockInfoStriped(newBlock,
+ ecZone.getErasureCodingPolicy());
+ } else {
+ newBI = new BlockInfoContiguous(newBlock,
+ file.getPreferredBlockReplication());
+ }
- newBI.convertToBlockUnderConstruction(BlockUCState.UNDER_CONSTRUCTION,
- null);
+ newBI.convertToBlockUnderConstruction(
- HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, null);
++ BlockUCState.UNDER_CONSTRUCTION, null);
} else {
// OP_CLOSE should add finalized blocks. This code path
// is only executed when loading edits written by prior
// versions of Hadoop. Current versions always log
// OP_ADD operations as each block is allocated.
- newBI = new BlockInfoContiguous(newBlock,
- file.getFileReplication());
+ if (isStriped) {
+ newBI = new BlockInfoStriped(newBlock,
+ ErasureCodingPolicyManager.getSystemDefaultPolicy());
+ } else {
+ newBI = new BlockInfoContiguous(newBlock,
- file.getPreferredBlockReplication());
++ file.getFileReplication());
+ }
}
- fsNamesys.getBlockManager().addBlockCollection(newBI, file);
+ fsNamesys.getBlockManager().addBlockCollectionWithCheck(newBI, file);
file.addBlock(newBI);
fsNamesys.getBlockManager().processQueuedMessagesForBlock(newBlock);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
index ffaf86b,ac88919..a115138
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
@@@ -43,9 -42,9 +43,10 @@@ import org.apache.hadoop.hdfs.protocol.
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+ import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.LoaderContext;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 0d9d427,f4952f7..5f39446
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@@ -7423,11 -7287,30 +7435,35 @@@ public class FSNamesystem implements Na
logger.addAppender(asyncAppender);
}
}
-
+ /**
+ * Return total number of Sync Operations on FSEditLog.
+ */
+ @Override
+ @Metric({"TotalSyncCount",
+ "Total number of sync operations performed on edit logs"})
+ public long getTotalSyncCount() {
+ return fsImage.editLog.getTotalSyncCount();
+ }
+
+ /**
+ * Return total time spent doing sync operations on FSEditLog.
+ */
+ @Override
+ @Metric({"TotalSyncTimes",
+ "Total time spent in sync operations on various edit logs"})
+ public String getTotalSyncTimes() {
+ JournalSet journalSet = fsImage.editLog.getJournalSet();
+ if (journalSet != null) {
+ return journalSet.getSyncTimes();
+ } else {
+ return "";
+ }
+ }
+
+ @Override
+ public ErasureCodingZone getErasureCodingZoneForPath(String src)
+ throws IOException {
+ return FSDirErasureCodingOp.getErasureCodingZone(this, src);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
index 4fa457d,d546905..ae9b0d2
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
@@@ -38,7 -37,7 +38,8 @@@ import org.apache.hadoop.hdfs.protocol.
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
+ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
index 9d43c15,7ebe859..e3363a4
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
@@@ -258,8 -254,7 +258,8 @@@ public class NamenodeFsck implements Da
NumberReplicas numberReplicas= bm.countNodes(blockInfo);
out.println("Block Id: " + blockId);
out.println("Block belongs to: "+iNode.getFullPathName());
- out.println("No. of Expected Replica: " + blockInfo.getReplication());
+ out.println("No. of Expected Replica: " +
- bm.getExpectedReplicaNum(bc, blockInfo));
++ bm.getExpectedReplicaNum(blockInfo));
out.println("No. of live Replica: " + numberReplicas.liveReplicas());
out.println("No. of excess Replica: " + numberReplicas.excessReplicas());
out.println("No. of stale Replica: " +
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
index bae033b,4a208d8..923a335
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
@@@ -17,11 -17,9 +17,12 @@@
*/
package org.apache.hadoop.hdfs.server.namenode;
+import java.io.IOException;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingZone;
+ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.util.RwLock;
@@@ -48,17 -48,7 +51,17 @@@ public interface Namesystem extends RwL
void checkOperation(OperationCategory read) throws StandbyException;
- boolean isInSnapshot(BlockCollection bc);
-
+ /**
+ * Gets the erasure coding zone for the given path.
+ * @param src the path
+ * @return the {@link ErasureCodingZone} for the path
+ * @throws IOException
+ */
+ ErasureCodingZone getErasureCodingZoneForPath(String src)
+ throws IOException;
+
+ boolean isInSnapshot(BlockInfo blockUC);
+
CacheManager getCacheManager();
}
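
[Editorial note] To make the intent of the new interface method concrete, here is a small hypothetical helper showing how NameNode-side code could use the zone lookup to decide between striped and contiguous block layout. Only the getErasureCodingZoneForPath signature comes from the diff above; the helper class and the null-means-no-zone convention are assumptions for illustration.

import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.ErasureCodingZone;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;

// Hypothetical helper, not part of this patch.
final class EcLayoutCheck {
  private EcLayoutCheck() {}

  // Returns true if the given path should get striped (erasure coded) blocks.
  static boolean useStripedLayout(Namesystem ns, String src) throws IOException {
    ErasureCodingZone zone = ns.getErasureCodingZoneForPath(src);
    // Assumption: a null result means the path is outside any erasure coding
    // zone, so the file keeps the ordinary contiguous, replicated layout.
    return zone != null;
  }
}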
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
index 0162f85,91ebaaf..450d981
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
@@@ -245,11 -245,10 +247,11 @@@ public class FSImageFormatPBSnapshot
BlockInfo[] blocks = new BlockInfo[bpl.size()];
for(int j = 0, e = bpl.size(); j < e; ++j) {
Block blk = PBHelper.convert(bpl.get(j));
- BlockInfo storedBlock = fsn.getBlockManager().getStoredBlock(blk);
+ BlockInfo storedBlock = bm.getStoredBlock(blk);
if(storedBlock == null) {
- storedBlock = bm.addBlockCollection(
- new BlockInfoContiguous(blk, copy.getFileReplication()), file);
+ storedBlock = (BlockInfoContiguous) fsn.getBlockManager()
+ .addBlockCollectionWithCheck(new BlockInfoContiguous(blk,
+ copy.getFileReplication()), file);
}
blocks[j] = storedBlock;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index fcf2bc1,96776e4..0db56dd
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@@ -2413,24 -2403,13 +2403,34 @@@
</property>
<property>
+ <name>dfs.datanode.stripedread.threshold.millis</name>
+ <value>5000</value>
+ <description>DataNode striped read threshold, in milliseconds.
+ </description>
+</property>
+
+<property>
+ <name>dfs.datanode.stripedread.threads</name>
+ <value>20</value>
+ <description>DataNode striped read thread pool size.
+ </description>
+</property>
+
+<property>
+ <name>dfs.datanode.stripedread.buffer.size</name>
+ <value>262144</value>
+ <description>DataNode striped read buffer size, in bytes.
+ </description>
+</property>
+
++<property>
+ <name>dfs.namenode.quota.init-threads</name>
+ <value>4</value>
+ <description>
+ The number of concurrent threads used during quota initialization. Quota
+ initialization speed also affects NameNode failover latency, so consider
+ increasing this value if the namespace is large.
+ </description>
+ </property>
+
</configuration>
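
[Editorial note] The new keys can also be set programmatically, for example when wiring up a test cluster. A minimal sketch, assuming only the property names added above; the values are placeholders taken from the defaults, not tuning advice.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class StripedReadConfExample {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration();
    // DataNode striped read tuning, using the property names added above.
    conf.setLong("dfs.datanode.stripedread.threshold.millis", 5000L);
    conf.setInt("dfs.datanode.stripedread.threads", 20);
    conf.setInt("dfs.datanode.stripedread.buffer.size", 256 * 1024);
    // NameNode quota initialization parallelism.
    conf.setInt("dfs.namenode.quota.init-threads", 4);
    System.out.println("striped read threads = "
        + conf.getInt("dfs.datanode.stripedread.threads", -1));
  }
}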
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 59daba4,24e0965..4bb5c64
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@@ -119,8 -119,8 +119,9 @@@ import org.apache.hadoop.security.UserG
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.ExitUtil;
+ import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import com.google.common.base.Joiner;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index ad8f204,c1ed758..eb24fb0
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@@ -955,9 -939,9 +958,9 @@@ public class TestBalancer
void testBalancer1Internal(Configuration conf) throws Exception {
initConf(conf);
testUnevenDistribution(conf,
-- new long[] {50*CAPACITY/100, 10*CAPACITY/100},
++ new long[]{50 * CAPACITY / 100, 10 * CAPACITY / 100},
new long[]{CAPACITY, CAPACITY},
-- new String[] {RACK0, RACK1});
++ new String[]{RACK0, RACK1});
}
@Test(expected=HadoopIllegalArgumentException.class)
@@@ -971,7 -955,7 +974,7 @@@
public void testBalancerWithNonZeroThreadsForMove() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 8);
-- testBalancer1Internal (conf);
++ testBalancer1Internal(conf);
}
@Test(timeout=100000)
@@@ -981,8 -965,8 +984,8 @@@
void testBalancer2Internal(Configuration conf) throws Exception {
initConf(conf);
-- testBalancerDefaultConstructor(conf, new long[] { CAPACITY, CAPACITY },
-- new String[] { RACK0, RACK1 }, CAPACITY, RACK2);
++ testBalancerDefaultConstructor(conf, new long[]{CAPACITY, CAPACITY},
++ new String[]{RACK0, RACK1}, CAPACITY, RACK2);
}
private void testBalancerDefaultConstructor(Configuration conf,
@@@ -1555,75 -1540,116 +1559,183 @@@
}
}
+ /** Balancer should not move blocks with size < minBlockSize. */
+ @Test(timeout=60000)
+ public void testMinBlockSizeAndSourceNodes() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ initConf(conf);
+
+ final short replication = 3;
+ final long[] lengths = {10, 10, 10, 10};
+ final long[] capacities = new long[replication];
+ final long totalUsed = capacities.length * sum(lengths);
+ Arrays.fill(capacities, 1000);
+
+ cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(capacities.length)
+ .simulatedCapacities(capacities)
+ .build();
+ final DistributedFileSystem dfs = cluster.getFileSystem();
+
+ try {
+ cluster.waitActive();
+ client = NameNodeProxies.createProxy(conf, dfs.getUri(),
+ ClientProtocol.class).getProxy();
+
+ // create small files whose blocks fall below the min block size used below
+ for(int i = 0; i < lengths.length; i++) {
+ final long size = lengths[i];
+ final Path p = new Path("/file" + i + "_size" + size);
+ try(final OutputStream out = dfs.create(p)) {
+ for(int j = 0; j < size; j++) {
+ out.write(j);
+ }
+ }
+ }
+
+ // start up an empty node with the same capacity
+ cluster.startDataNodes(conf, capacities.length, true, null, null, capacities);
+ LOG.info("capacities = " + Arrays.toString(capacities));
+ LOG.info("totalUsedSpace= " + totalUsed);
+ LOG.info("lengths = " + Arrays.toString(lengths) + ", #=" + lengths.length);
+ waitForHeartBeat(totalUsed, 2*capacities[0]*capacities.length, client, cluster);
+
+ final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+
+ { // run Balancer with min-block-size=50
+ final Parameters p = new Parameters(
+ BalancingPolicy.Node.INSTANCE, 1,
+ NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
+ Collections.<String> emptySet(), Collections.<String> emptySet(),
+ Collections.<String> emptySet(), false);
+
+ conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
+ final int r = Balancer.run(namenodes, p, conf);
+ assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
+ }
+
+ conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
+
+ { // run Balancer with empty nodes as source nodes
+ final Set<String> sourceNodes = new HashSet<>();
+ final List<DataNode> datanodes = cluster.getDataNodes();
+ for(int i = capacities.length; i < datanodes.size(); i++) {
+ sourceNodes.add(datanodes.get(i).getDisplayName());
+ }
+ final Parameters p = new Parameters(
+ BalancingPolicy.Node.INSTANCE, 1,
+ NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
+ Collections.<String> emptySet(), Collections.<String> emptySet(),
+ sourceNodes, false);
+
+ conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
+ final int r = Balancer.run(namenodes, p, conf);
+ assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r);
+ }
+
+ { // run Balancer with a filled node as a source node
+ final Set<String> sourceNodes = new HashSet<>();
+ final List<DataNode> datanodes = cluster.getDataNodes();
+ sourceNodes.add(datanodes.get(0).getDisplayName());
+ final Parameters p = new Parameters(
+ BalancingPolicy.Node.INSTANCE, 1,
+ NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
+ Collections.<String> emptySet(), Collections.<String> emptySet(),
+ sourceNodes, false);
+
+ conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
+ final int r = Balancer.run(namenodes, p, conf);
+ assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r);
+ }
+
+ { // run Balancer with all filled node as source nodes
+ final Set<String> sourceNodes = new HashSet<>();
+ final List<DataNode> datanodes = cluster.getDataNodes();
+ for(int i = 0; i < capacities.length; i++) {
+ sourceNodes.add(datanodes.get(i).getDisplayName());
+ }
+ final Parameters p = new Parameters(
+ BalancingPolicy.Node.INSTANCE, 1,
+ NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
+ Collections.<String> emptySet(), Collections.<String> emptySet(),
+ sourceNodes, false);
+
+ conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
+ final int r = Balancer.run(namenodes, p, conf);
+ assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
+ }
+ } finally {
+ cluster.shutdown();
+ }
+ }
-
+ public void integrationTestWithStripedFile(Configuration conf) throws Exception {
+ initConfWithStripe(conf);
+ doTestBalancerWithStripedFile(conf);
+ }
+
+ @Test(timeout = 100000)
+ public void testBalancerWithStripedFile() throws Exception {
+ Configuration conf = new Configuration();
+ initConfWithStripe(conf);
+ doTestBalancerWithStripedFile(conf);
+ }
+
+ private void doTestBalancerWithStripedFile(Configuration conf) throws Exception {
+ int numOfDatanodes = dataBlocks + parityBlocks + 2;
+ int numOfRacks = dataBlocks;
+ long capacity = 20 * DEFAULT_STRIPE_BLOCK_SIZE;
+ long[] capacities = new long[numOfDatanodes];
+ for (int i = 0; i < capacities.length; i++) {
+ capacities[i] = capacity;
+ }
+ String[] racks = new String[numOfDatanodes];
+ for (int i = 0; i < numOfDatanodes; i++) {
+ racks[i] = "/rack" + (i % numOfRacks);
+ }
+ cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(numOfDatanodes)
+ .racks(racks)
+ .simulatedCapacities(capacities)
+ .build();
+
+ try {
+ cluster.waitActive();
+ client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(),
+ ClientProtocol.class).getProxy();
+ client.createErasureCodingZone("/", null);
+
+ long totalCapacity = sum(capacities);
+
+ // write data equal to 30% of capacity; including parity the cluster will be 45% full
+ long fileLen = totalCapacity * 3 / 10;
+ long totalUsedSpace = fileLen * (dataBlocks + parityBlocks) / dataBlocks;
+ FileSystem fs = cluster.getFileSystem(0);
+ DFSTestUtil.createFile(fs, filePath, fileLen, (short) 3, r.nextLong());
+
+ // verify locations of striped blocks
+ LocatedBlocks locatedBlocks = client.getBlockLocations(fileName, 0, fileLen);
+ DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
+
+ // add one datanode
+ String newRack = "/rack" + (++numOfRacks);
+ cluster.startDataNodes(conf, 1, true, null,
+ new String[]{newRack}, null, new long[]{capacity});
+ totalCapacity += capacity;
+ cluster.triggerHeartbeats();
+
+ // run balancer and validate results
+ Balancer.Parameters p = Balancer.Parameters.DEFAULT;
+ Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+ runBalancer(conf, totalUsedSpace, totalCapacity, p, 0);
+
+ // verify locations of striped blocks
+ locatedBlocks = client.getBlockLocations(fileName, 0, fileLen);
+ DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
-
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
/**
* @param args
*/
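
[Editorial note] The 30%/45% comment in doTestBalancerWithStripedFile is simply the erasure coding overhead: on-disk usage is the logical file length scaled by (data + parity) / data. The check below reproduces that arithmetic; the 6 data and 3 parity units are an assumption matching the schema commonly used on this branch, and the capacity figure is illustrative.

public class StripeOverheadCheck {
  public static void main(String[] args) {
    // Assumed block group layout: 6 data units + 3 parity units.
    final int dataBlocks = 6;
    final int parityBlocks = 3;
    final long totalCapacity = 20L * 9 * 1024 * 1024;  // illustrative capacity

    // Logical data written by the test: 30% of raw capacity.
    long fileLen = totalCapacity * 3 / 10;
    // On-disk usage includes parity: scale by (data + parity) / data.
    long usedSpace = fileLen * (dataBlocks + parityBlocks) / dataBlocks;

    // Prints 45.0: 30% of logical data occupies 45% of raw capacity.
    System.out.println(100.0 * usedSpace / totalCapacity + "% of capacity used");
  }
}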
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
index bae4f1d,ceef9f2..d6213ff
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
@@@ -70,18 -71,18 +71,6 @@@ public class TestBlockInfo
}
@Test
-- public void testCopyConstructor() {
- BlockInfo old = new BlockInfoContiguous((short) 3);
- BlockInfoContiguous old = new BlockInfoContiguous((short) 3);
-- try {
- BlockInfo copy = new BlockInfoContiguous((BlockInfoContiguous)old);
- assertEquals(old.getBlockCollection(), copy.getBlockCollection());
- BlockInfoContiguous copy = new BlockInfoContiguous(old);
- assertEquals(old.getBlockCollectionId(), copy.getBlockCollectionId());
-- assertEquals(old.getCapacity(), copy.getCapacity());
-- } catch (Exception e) {
-- Assert.fail("Copy constructor throws exception: " + e);
-- }
-- }
--
-- @Test
public void testReplaceStorage() throws Exception {
// Create two dummy storages.