Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/09/30 17:42:03 UTC
[03/50] [abbrv] hadoop git commit: Merge commit
'456e901a4c5c639267ee87b8e5f1319f256d20c2' (HDFS-6407. Add sorting and
pagination in the datanode tab of the NN Web UI. Contributed by Haohui Mai.)
into HDFS-7285-merge
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 36ce133,508da85..dfea5f3
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@@ -42,7 -44,8 +44,9 @@@ import javax.management.ObjectName
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.StorageType;
+ import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
@@@ -52,12 -55,10 +56,11 @@@ import org.apache.hadoop.hdfs.protocol.
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingZone;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
--import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
@@@ -77,22 -79,17 +81,24 @@@ import org.apache.hadoop.hdfs.server.pr
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength;
+
+ import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.LightWeightGSet;
import org.apache.hadoop.util.Time;
@@@ -818,11 -786,12 +835,11 @@@ public class BlockManager implements Bl
}
return locations;
}
--
- private List<LocatedBlock> createLocatedBlockList(
- final BlockInfo[] blocks,
++
+ private List<LocatedBlock> createLocatedBlockList(final BlockInfo[] blocks,
final long offset, final long length, final int nrBlocksToReturn,
final AccessMode mode) throws IOException {
- int curBlk = 0;
+ int curBlk;
long curPos = 0, blkSize = 0;
int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
for (curBlk = 0; curBlk < nrBlocks; curBlk++) {
@@@ -875,25 -844,19 +892,26 @@@
}
/** @return a LocatedBlock for the given block */
- private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos) {
- private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos
- ) throws IOException {
- if (blk instanceof BlockInfoContiguousUnderConstruction) {
- if (blk.isComplete()) {
- throw new IOException(
- "blk instanceof BlockInfoUnderConstruction && blk.isComplete()"
- + ", blk=" + blk);
++ private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos)
++ throws IOException {
+ if (!blk.isComplete()) {
+ if (blk.isStriped()) {
- final BlockInfoUnderConstructionStriped uc =
- (BlockInfoUnderConstructionStriped) blk;
++ final BlockInfoStripedUnderConstruction uc =
++ (BlockInfoStripedUnderConstruction) blk;
+ final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
+ final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(),
+ blk);
+ return newLocatedStripedBlock(eb, storages, uc.getBlockIndices(), pos,
+ false);
+ } else {
- assert blk instanceof BlockInfoUnderConstructionContiguous;
- final BlockInfoUnderConstructionContiguous uc =
- (BlockInfoUnderConstructionContiguous) blk;
++ assert blk instanceof BlockInfoContiguousUnderConstruction;
++ final BlockInfoContiguousUnderConstruction uc =
++ (BlockInfoContiguousUnderConstruction) blk;
+ final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
+ final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(),
+ blk);
+ return newLocatedBlock(eb, storages, pos, false);
}
- final BlockInfoContiguousUnderConstruction uc =
- (BlockInfoContiguousUnderConstruction) blk;
- final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
- final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
- return newLocatedBlock(eb, storages, pos, false);
}
// get block locations
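[Note on the hunk above, not part of the patch: the reworked createLocatedBlock dispatches on blk.isStriped(). A striped under-construction block additionally carries per-storage block indices, which newLocatedStripedBlock packages into the LocatedStripedBlock returned to clients. A minimal sketch of how a reader could interpret that result, assuming only the public LocatedStripedBlock API; the helper method itself is hypothetical:

    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;

    // Sketch: for a striped block group, the i-th location stores the
    // internal block whose index within the group is indices[i].
    static void printStripedLayout(LocatedStripedBlock lsb) {
      DatanodeInfo[] locs = lsb.getLocations();
      int[] indices = lsb.getBlockIndices();
      for (int i = 0; i < locs.length; i++) {
        System.out.println(locs[i] + " holds internal block #" + indices[i]);
      }
    }
]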
@@@ -1188,17 -1121,13 +1206,17 @@@
return;
}
StringBuilder datanodes = new StringBuilder();
- for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
+ for(DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock,
+ State.NORMAL)) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
- invalidateBlocks.add(b, node, false);
- datanodes.append(node).append(" ");
+ final Block b = getBlockOnStorage(storedBlock, storage);
+ if (b != null) {
+ invalidateBlocks.add(b, node, false);
+ datanodes.append(node).append(" ");
+ }
}
if (datanodes.length() != 0) {
- blockLog.info("BLOCK* addToInvalidates: {} {}", storedBlock,
- blockLog.debug("BLOCK* addToInvalidates: {} {}", b,
++ blockLog.debug("BLOCK* addToInvalidates: {} {}", storedBlock,
datanodes.toString());
}
}
@@@ -1267,8 -1188,8 +1285,8 @@@
DatanodeStorageInfo storageInfo,
DatanodeDescriptor node) throws IOException {
- if (b.corrupted.isDeleted()) {
+ if (b.stored.isDeleted()) {
- blockLog.info("BLOCK markBlockAsCorrupt: {} cannot be marked as" +
+ blockLog.debug("BLOCK markBlockAsCorrupt: {} cannot be marked as" +
" corrupt as it does not belong to any file", b);
addToInvalidates(b.corrupted, node);
return;
@@@ -1323,9 -1237,9 +1341,9 @@@
* @return true if the block was successfully invalidated and no longer
* present in the BlocksMap
*/
- private boolean invalidateBlock(BlockToMarkCorrupt b, DatanodeInfo dn
- ) throws IOException {
+ private boolean invalidateBlock(BlockToMarkCorrupt b, DatanodeInfo dn,
+ NumberReplicas nr) throws IOException {
- blockLog.info("BLOCK* invalidateBlock: {} on {}", b, dn);
+ blockLog.debug("BLOCK* invalidateBlock: {} on {}", b, dn);
DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
if (node == null) {
throw new IOException("Cannot invalidate " + b
@@@ -1333,8 -1247,9 +1351,8 @@@
}
// Check how many copies we have of the block
- NumberReplicas nr = countNodes(b.stored);
if (nr.replicasOnStaleNodes() > 0) {
- blockLog.info("BLOCK* invalidateBlocks: postponing " +
+ blockLog.debug("BLOCK* invalidateBlocks: postponing " +
"invalidation of {} on {} because {} replica(s) are located on " +
"nodes with potentially out-of-date block reports", b, dn,
nr.replicasOnStaleNodes());
@@@ -1478,12 -1391,12 +1496,12 @@@
// do not schedule more if enough replicas is already pending
numEffectiveReplicas = numReplicas.liveReplicas() +
pendingReplications.getNumReplicas(block);
-
+
if (numEffectiveReplicas >= requiredReplication) {
if ( (pendingReplications.getNumReplicas(block) > 0) ||
- (blockHasEnoughRacks(block)) ) {
+ (blockHasEnoughRacks(block, requiredReplication)) ) {
neededReplications.remove(block, priority); // remove from neededReplications
- blockLog.info("BLOCK* Removing {} from neededReplications as" +
+ blockLog.debug("BLOCK* Removing {} from neededReplications as" +
" it has enough replicas", block);
continue;
}
@@@ -1565,10 -1463,10 +1583,10 @@@
if (numEffectiveReplicas >= requiredReplication) {
if ( (pendingReplications.getNumReplicas(block) > 0) ||
- (blockHasEnoughRacks(block)) ) {
+ (blockHasEnoughRacks(block, requiredReplication)) ) {
neededReplications.remove(block, priority); // remove from neededReplications
rw.targets = null;
- blockLog.info("BLOCK* Removing {} from neededReplications as" +
+ blockLog.debug("BLOCK* Removing {} from neededReplications as" +
" it has enough replicas", block);
continue;
}
@@@ -1637,11 -1510,11 +1655,11 @@@
DatanodeStorageInfo[] targets = rw.targets;
if (targets != null && targets.length != 0) {
StringBuilder targetList = new StringBuilder("datanode(s)");
- for (int k = 0; k < targets.length; k++) {
+ for (DatanodeStorageInfo target : targets) {
targetList.append(' ');
- targetList.append(targets[k].getDatanodeDescriptor());
+ targetList.append(target.getDatanodeDescriptor());
}
- blockLog.info("BLOCK* ask {} to replicate {} to {}", rw.srcNodes,
- blockLog.debug("BLOCK* ask {} to replicate {} to {}", rw.srcNode,
++ blockLog.debug("BLOCK* ask {} to replicate {} to {}", rw.srcNodes,
rw.block, targetList);
}
}
@@@ -1882,11 -1765,8 +1921,11 @@@
final Block reportedBlock;
final ReplicaState reportedState;
- StatefulBlockInfo(BlockInfoContiguousUnderConstruction storedBlock,
+ StatefulBlockInfo(BlockInfo storedBlock,
Block reportedBlock, ReplicaState reportedState) {
+ Preconditions.checkArgument(
- storedBlock instanceof BlockInfoUnderConstructionContiguous ||
- storedBlock instanceof BlockInfoUnderConstructionStriped);
++ storedBlock instanceof BlockInfoContiguousUnderConstruction ||
++ storedBlock instanceof BlockInfoStripedUnderConstruction);
this.storedBlock = storedBlock;
this.reportedBlock = reportedBlock;
this.reportedState = reportedState;
@@@ -2263,8 -2141,8 +2311,8 @@@
QUEUE_REASON_FUTURE_GENSTAMP);
continue;
}
--
- BlockInfo storedBlock = blocksMap.getStoredBlock(iblk);
++
+ BlockInfo storedBlock = getStoredBlock(iblk);
// If block does not belong to any file, we are done.
if (storedBlock == null) continue;
@@@ -2306,9 -2186,9 +2354,9 @@@
}
private void reportDiff(DatanodeStorageInfo storageInfo,
-- BlockListAsLongs newReport,
- Collection<BlockInfo> toAdd, // add to DatanodeDescriptor
- Collection<Block> toRemove, // remove from DatanodeDescriptor
++ BlockListAsLongs newReport,
+ Collection<BlockInfoToAdd> toAdd, // add to DatanodeDescriptor
+ Collection<BlockInfo> toRemove, // remove from DatanodeDescriptor
Collection<Block> toInvalidate, // should be removed from DN
Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
Collection<StatefulBlockInfo> toUC) { // add to under-construction list
@@@ -2342,10 -2220,10 +2390,11 @@@
// collect blocks that have not been reported
// all of them are next to the delimiter
- Iterator<BlockInfo> it = storageInfo.new BlockIterator(delimiter.getNext(0));
+ Iterator<BlockInfo> it =
+ storageInfo.new BlockIterator(delimiter.getNext(0));
- while(it.hasNext())
+ while (it.hasNext()) {
toRemove.add(it.next());
+ }
storageInfo.removeBlock(delimiter);
}
@@@ -2382,8 -2260,8 +2431,8 @@@
*/
private BlockInfo processReportedBlock(
final DatanodeStorageInfo storageInfo,
-- final Block block, final ReplicaState reportedState,
- final Collection<BlockInfo> toAdd,
++ final Block block, final ReplicaState reportedState,
+ final Collection<BlockInfoToAdd> toAdd,
final Collection<Block> toInvalidate,
final Collection<BlockToMarkCorrupt> toCorrupt,
final Collection<StatefulBlockInfo> toUC) {
@@@ -2717,10 -2578,9 +2766,10 @@@
assert block != null && namesystem.hasWriteLock();
BlockInfo storedBlock;
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
- if (block instanceof BlockInfoUnderConstructionContiguous ||
- block instanceof BlockInfoUnderConstructionStriped) {
- if (block instanceof BlockInfoContiguousUnderConstruction) {
++ if (block instanceof BlockInfoContiguousUnderConstruction ||
++ block instanceof BlockInfoStripedUnderConstruction) {
//refresh our copy in case the block got completed in another thread
- storedBlock = blocksMap.getStoredBlock(block);
+ storedBlock = getStoredBlock(block);
} else {
storedBlock = block;
}
@@@ -3275,26 -3055,6 +3325,26 @@@
}
}
+ private void processChosenExcessReplica(
+ final Collection<DatanodeStorageInfo> nonExcess,
+ final DatanodeStorageInfo chosen, BlockInfo storedBlock) {
+ nonExcess.remove(chosen);
+ addToExcessReplicate(chosen.getDatanodeDescriptor(), storedBlock);
+ //
+ // The 'excessblocks' tracks blocks until we get confirmation
+ // that the datanode has deleted them; the only way we remove them
+ // is when we get a "removeBlock" message.
+ //
+ // The 'invalidate' list is used to inform the datanode the block
+ // should be deleted. Items are removed from the invalidate list
+ // upon giving instructions to the datanodes.
+ //
+ final Block blockToInvalidate = getBlockOnStorage(storedBlock, chosen);
+ addToInvalidates(blockToInvalidate, chosen.getDatanodeDescriptor());
- blockLog.info("BLOCK* chooseExcessReplicates: "
- +"({}, {}) is added to invalidated blocks set", chosen, storedBlock);
++ blockLog.debug("BLOCK* chooseExcessReplicates: "
++ + "({}, {}) is added to invalidated blocks set", chosen, storedBlock);
+ }
+
/** Check if we can use delHint */
static boolean useDelHint(boolean isFirst, DatanodeStorageInfo delHint,
DatanodeStorageInfo added, List<DatanodeStorageInfo> moreThan1Racks,
@@@ -3356,6 -3116,19 +3406,19 @@@
return;
}
+ CachedBlock cblock = namesystem.getCacheManager().getCachedBlocks()
- .get(new CachedBlock(block.getBlockId(), (short) 0, false));
++ .get(new CachedBlock(storedBlock.getBlockId(), (short) 0, false));
+ if (cblock != null) {
+ boolean removed = false;
+ removed |= node.getPendingCached().remove(cblock);
+ removed |= node.getCached().remove(cblock);
+ removed |= node.getPendingUncached().remove(cblock);
+ if (removed) {
+ blockLog.debug("BLOCK* removeStoredBlock: {} removed from caching "
- + "related lists on node {}", block, node);
++ + "related lists on node {}", storedBlock, node);
+ }
+ }
+
//
// It's possible that the block was removed because of a datanode
// failure. If the block is still valid, check if replication is
@@@ -3454,7 -3212,10 +3517,10 @@@
//
// Modify the blocks->datanode map and node's map.
//
- pendingReplications.decrement(getStoredBlock(block), node);
+ BlockInfo storedBlock = getStoredBlock(block);
+ if (storedBlock != null) {
- pendingReplications.decrement(getStoredBlock(block), node);
++ pendingReplications.decrement(storedBlock, node);
+ }
processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED,
delHintNode);
}
@@@ -4138,57 -3819,22 +4204,57 @@@
null);
}
- private static class ReplicationWork {
-
- private final BlockInfo block;
- private final BlockCollection bc;
-
- private final DatanodeDescriptor srcNode;
- private final List<DatanodeDescriptor> containingNodes;
- private final List<DatanodeStorageInfo> liveReplicaStorages;
- private final int additionalReplRequired;
+ public static LocatedStripedBlock newLocatedStripedBlock(
+ ExtendedBlock b, DatanodeStorageInfo[] storages,
+ int[] indices, long startOffset, boolean corrupt) {
+ // startOffset is unknown
+ return new LocatedStripedBlock(
+ b, DatanodeStorageInfo.toDatanodeInfos(storages),
+ DatanodeStorageInfo.toStorageIDs(storages),
+ DatanodeStorageInfo.toStorageTypes(storages),
+ indices, startOffset, corrupt,
+ null);
+ }
- private DatanodeStorageInfo targets[];
- private final int priority;
+ public static LocatedBlock newLocatedBlock(ExtendedBlock eb, BlockInfo info,
+ DatanodeStorageInfo[] locs, long offset) throws IOException {
+ final LocatedBlock lb;
+ if (info.isStriped()) {
+ lb = newLocatedStripedBlock(eb, locs,
- ((BlockInfoUnderConstructionStriped)info).getBlockIndices(),
++ ((BlockInfoStripedUnderConstruction)info).getBlockIndices(),
+ offset, false);
+ } else {
+ lb = newLocatedBlock(eb, locs, offset, false);
+ }
+ return lb;
+ }
- public ReplicationWork(BlockInfo block,
+ /**
+ * This class is used internally by {@link this#computeRecoveryWorkForBlocks}
+ * to represent a task to recover a block through replication or erasure
+ * coding. Recovery is done by transferring data from srcNodes to targets.
+ */
+ private abstract static class BlockRecoveryWork {
+ final BlockInfo block;
+ final BlockCollection bc;
+
+ /**
+ * An erasure coding recovery task has multiple source nodes.
+ * A replication task has only one source node, stored at the head of the array.
+ */
+ final DatanodeDescriptor[] srcNodes;
+ /** Nodes containing the block; avoid them in choosing new targets */
+ final List<DatanodeDescriptor> containingNodes;
+ /** Required by {@link BlockPlacementPolicy#chooseTarget} */
+ final List<DatanodeStorageInfo> liveReplicaStorages;
+ final int additionalReplRequired;
+
+ DatanodeStorageInfo[] targets;
+ final int priority;
+
+ BlockRecoveryWork(BlockInfo block,
BlockCollection bc,
- DatanodeDescriptor srcNode,
+ DatanodeDescriptor[] srcNodes,
List<DatanodeDescriptor> containingNodes,
List<DatanodeStorageInfo> liveReplicaStorages,
int additionalReplRequired,
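[Note on the section above, not part of the patch: the final hunk generalizes the old ReplicationWork into an abstract BlockRecoveryWork, so one task type can carry several source nodes (an erasure coding recovery reads from multiple surviving internal blocks) while a plain replication task keeps its single source in srcNodes[0]. A minimal sketch of the shape, using hypothetical simplified types rather than the actual subclasses:

    // Sketch only: the srcNodes generalization BlockRecoveryWork introduces.
    abstract class RecoveryTaskSketch {
      final String[] srcNodes;  // replication: length 1; EC: one per source
      final String[] targets;   // where the recovered replicas should land
      RecoveryTaskSketch(String[] srcNodes, String[] targets) {
        this.srcNodes = srcNodes;
        this.targets = targets;
      }
      // Replication copies bytes; EC recovery decodes them from the sources.
      abstract void execute();
    }
]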
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
index 9173920,0dbf485..5bfae42
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
@@@ -130,17 -123,13 +130,17 @@@ class BlocksMap
return;
blockInfo.setBlockCollection(null);
- for(int idx = blockInfo.numNodes()-1; idx >= 0; idx--) {
+ final int size = blockInfo instanceof BlockInfoContiguous ?
+ blockInfo.numNodes() : blockInfo.getCapacity();
+ for(int idx = size - 1; idx >= 0; idx--) {
DatanodeDescriptor dn = blockInfo.getDatanode(idx);
- dn.removeBlock(blockInfo); // remove from the list and wipe the location
+ if (dn != null) {
+ dn.removeBlock(blockInfo); // remove from the list and wipe the location
+ }
}
}
--
- /** Returns the block object it it exists in the map. */
++
+ /** Returns the block object if it exists in the map. */
BlockInfo getStoredBlock(Block b) {
return blocks.get(b);
}
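[Note on the hunk above, not part of the patch: the change encodes a storage-layout difference. A contiguous block keeps its storages densely packed, so numNodes() bounds the scan; a striped block's storage array is, roughly speaking, indexed by internal block index and may contain null slots, so the full getCapacity() range must be scanned. The same rule with descriptive comments, names as in the hunk:

    // Contiguous: storages [0 .. numNodes) are all non-null.
    // Striped: slot i belongs to an internal block and may be null.
    final int size = blockInfo instanceof BlockInfoContiguous
        ? blockInfo.numNodes() : blockInfo.getCapacity();
    for (int idx = size - 1; idx >= 0; idx--) {
      DatanodeDescriptor dn = blockInfo.getDatanode(idx);
      if (dn != null) {
        dn.removeBlock(blockInfo);  // unlink and wipe the location
      }
    }
]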
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 108ce2f,87ce753..87394f6
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@@ -31,10 -31,8 +31,7 @@@ import java.util.Queue
import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
-
import com.google.common.collect.ImmutableList;
-
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.StorageType;
@@@ -50,9 -46,10 +47,11 @@@ import org.apache.hadoop.hdfs.server.pr
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.hdfs.util.EnumCounters;
import org.apache.hadoop.hdfs.util.LightWeightHashSet;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.util.IntrusiveCollection;
import org.apache.hadoop.util.Time;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
/**
* This class extends the DatanodeInfo class with ephemeral information (eg
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index 2275d91,216d6d2..bb9a706
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@@ -253,18 -252,10 +254,18 @@@ public class DatanodeStorageInfo
}
// add to the head of the data-node list
- b.addStorage(this);
+ b.addStorage(this, reportedBlock);
+ insertToList(b);
+ return result;
+ }
+
- AddBlockResult addBlock(BlockInfoContiguous b) {
++ AddBlockResult addBlock(BlockInfo b) {
+ return addBlock(b, b);
+ }
+
+ public void insertToList(BlockInfo b) {
blockList = b.listInsert(blockList, this);
numBlocks++;
- return result;
}
public boolean removeBlock(BlockInfo b) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
index 47afb05,ebc15b8..7e8f479
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
@@@ -240,10 -206,10 +241,10 @@@ class UnderReplicatedBlocks implements
/** remove a block from a under replication queue */
synchronized boolean remove(BlockInfo block,
- int oldReplicas,
+ int oldReplicas,
int decommissionedReplicas,
int oldExpectedReplicas) {
- int priLevel = getPriority(oldReplicas,
+ int priLevel = getPriority(block, oldReplicas,
decommissionedReplicas,
oldExpectedReplicas);
boolean removedBlock = remove(block, priLevel);
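[Note on the hunk above, not part of the patch: remove() now forwards the block itself into getPriority. A plausible reading (hedged; the real priority constants live elsewhere in this class) is that striped blocks need their urgency measured against block group geometry rather than a flat replica count, since losing more internal blocks than the parity count is unrecoverable. An illustrative sketch with made-up thresholds:

    // Hypothetical sketch, not UnderReplicatedBlocks' actual logic.
    static int prioritySketch(boolean striped, int live, int dataUnits,
        int expectedReplicas) {
      if (striped) {
        // At or below dataUnits live internal blocks, any further loss
        // destroys data: treat as highest priority.
        return live <= dataUnits ? 0 : 2;
      }
      return live == 1 ? 0 : (live < expectedReplicas ? 1 : 2);
    }
]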
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
index dc6acd5,afacebb..34d92d0
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
@@@ -368,17 -356,15 +369,16 @@@ public class Mover
final BlockStoragePolicy policy = blockStoragePolicies[policyId];
if (policy == null) {
LOG.warn("Failed to get the storage policy of file " + fullPath);
- return false;
+ return;
}
- final List<StorageType> types = policy.chooseStorageTypes(
+ List<StorageType> types = policy.chooseStorageTypes(
status.getReplication());
+ final ErasureCodingPolicy ecPolicy = status.getErasureCodingPolicy();
final LocatedBlocks locatedBlocks = status.getBlockLocations();
- boolean hasRemaining = false;
final boolean lastBlkComplete = locatedBlocks.isLastBlockComplete();
List<LocatedBlock> lbs = locatedBlocks.getLocatedBlocks();
- for(int i = 0; i < lbs.size(); i++) {
+ for (int i = 0; i < lbs.size(); i++) {
if (i == lbs.size() - 1 && !lastBlkComplete) {
// last block is incomplete, skip it
continue;
@@@ -390,22 -373,22 +390,25 @@@
final StorageTypeDiff diff = new StorageTypeDiff(types,
lb.getStorageTypes());
if (!diff.removeOverlap(true)) {
- if (scheduleMoves4Block(diff, lb)) {
+ if (scheduleMoves4Block(diff, lb, ecPolicy)) {
- hasRemaining |= (diff.existing.size() > 1 &&
- diff.expected.size() > 1);
+ result.updateHasRemaining(diff.existing.size() > 1
+ && diff.expected.size() > 1);
+ // One block scheduled successfully, set noBlockMoved to false
+ result.setNoBlockMoved(false);
+ } else {
+ result.updateHasRemaining(true);
}
}
}
- return hasRemaining;
}
- boolean scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb) {
+ boolean scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb,
+ ErasureCodingPolicy ecPolicy) {
final List<MLocation> locations = MLocation.toLocations(lb);
- Collections.shuffle(locations);
- final DBlock db = newDBlock(lb.getBlock().getLocalBlock(), locations);
+ if (!(lb instanceof LocatedStripedBlock)) {
+ Collections.shuffle(locations);
+ }
+ final DBlock db = newDBlock(lb, locations, ecPolicy);
for (final StorageType t : diff.existing) {
for (final MLocation ml : locations) {
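[Note on the hunk above, not part of the patch: locations are now shuffled only for non-striped blocks. For a LocatedStripedBlock the position of each location is tied to an internal block index, so reordering would break that correspondence, and the byte length of each internal block also depends on its index. A sketch using the StripedBlockUtil helper imported in BlockManager earlier; the wrapper method is illustrative:

    import org.apache.hadoop.hdfs.util.StripedBlockUtil;

    // Sketch: internal blocks of one striped group differ in length, as a
    // function of their index within the group.
    static long internalBlockLen(long groupDataSize, int cellSize,
        int numDataUnits, int indexInGroup) {
      return StripedBlockUtil.getInternalBlockLength(
          groupDataSize, cellSize, numDataUnits, indexInGroup);
    }
]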
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java
index 0000000,3d79d09..ef67c1a
mode 000000,100644..100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java
@@@ -1,0 -1,261 +1,268 @@@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.hadoop.hdfs.server.namenode;
+
+ import java.io.FileNotFoundException;
+ import java.io.IOException;
+ import java.util.List;
+
+ import org.apache.hadoop.fs.FileAlreadyExistsException;
+ import org.apache.hadoop.fs.StorageType;
+ import org.apache.hadoop.fs.permission.FsAction;
+ import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+ import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
+ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+ import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.RecoverLeaseOp;
+ import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion.Feature;
+
+ import com.google.common.base.Preconditions;
+
+ /**
+ * Helper class to perform append operation.
+ */
+ final class FSDirAppendOp {
+
+ /**
+ * Private constructor for preventing FSDirAppendOp object creation.
+ * Static-only class.
+ */
+ private FSDirAppendOp() {}
+
+ /**
+ * Append to an existing file.
+ * <p>
+ *
+ * The method returns the last block of the file if this is a partial block,
+ * which can still be used for writing more data. The client uses the
+ * returned block locations to form the data pipeline for this block.<br>
+ * The {@link LocatedBlock} will be null if the last block is full.
+ * The client then allocates a new block with the next call using
+ * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#addBlock}.
+ * <p>
+ *
+ * For description of parameters and exceptions thrown see
+ * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#append}
+ *
+ * @param fsn namespace
+ * @param srcArg path name
+ * @param pc permission checker to check fs permission
+ * @param holder client name
+ * @param clientMachine client machine info
+ * @param newBlock if the data is appended to a new block
+ * @param logRetryCache whether to record RPC ids in editlog for retry cache
+ * rebuilding
+ * @return the last block with status
+ */
+ static LastBlockWithStatus appendFile(final FSNamesystem fsn,
+ final String srcArg, final FSPermissionChecker pc, final String holder,
+ final String clientMachine, final boolean newBlock,
+ final boolean logRetryCache) throws IOException {
+ assert fsn.hasWriteLock();
+
+ final byte[][] pathComponents = FSDirectory
+ .getPathComponentsForReservedPath(srcArg);
+ final LocatedBlock lb;
+ final FSDirectory fsd = fsn.getFSDirectory();
+ final String src;
+ fsd.writeLock();
+ try {
+ src = fsd.resolvePath(pc, srcArg, pathComponents);
+ final INodesInPath iip = fsd.getINodesInPath4Write(src);
+ // Verify that the destination does not exist as a directory already
+ final INode inode = iip.getLastINode();
+ final String path = iip.getPath();
+ if (inode != null && inode.isDirectory()) {
+ throw new FileAlreadyExistsException("Cannot append to directory "
+ + path + "; already exists as a directory.");
+ }
+ if (fsd.isPermissionEnabled()) {
+ fsd.checkPathAccess(pc, iip, FsAction.WRITE);
+ }
+
+ if (inode == null) {
+ throw new FileNotFoundException(
+ "Failed to append to non-existent file " + path + " for client "
+ + clientMachine);
+ }
+ final INodeFile file = INodeFile.valueOf(inode, path, true);
++
++ // appending to a file with striped blocks is not supported
++ if (file.isStriped()) {
++ throw new UnsupportedOperationException(
++ "Cannot append to files with striped block " + src);
++ }
++
+ BlockManager blockManager = fsd.getBlockManager();
+ final BlockStoragePolicy lpPolicy = blockManager
+ .getStoragePolicy("LAZY_PERSIST");
+ if (lpPolicy != null && lpPolicy.getId() == file.getStoragePolicyID()) {
+ throw new UnsupportedOperationException(
+ "Cannot append to lazy persist file " + path);
+ }
+ // Opening an existing file for append - may need to recover lease.
+ fsn.recoverLeaseInternal(RecoverLeaseOp.APPEND_FILE, iip, path, holder,
+ clientMachine, false);
+
+ final BlockInfo lastBlock = file.getLastBlock();
+ // Check that the block has at least minimum replication.
+ if (lastBlock != null && lastBlock.isComplete()
+ && !blockManager.isSufficientlyReplicated(lastBlock)) {
+ throw new IOException("append: lastBlock=" + lastBlock + " of src="
+ + path + " is not sufficiently replicated yet.");
+ }
+ lb = prepareFileForAppend(fsn, iip, holder, clientMachine, newBlock,
+ true, logRetryCache);
+ } catch (IOException ie) {
+ NameNode.stateChangeLog
+ .warn("DIR* NameSystem.append: " + ie.getMessage());
+ throw ie;
+ } finally {
+ fsd.writeUnlock();
+ }
+
+ HdfsFileStatus stat = FSDirStatAndListingOp.getFileInfo(fsd, src, false,
+ FSDirectory.isReservedRawName(srcArg));
+ if (lb != null) {
+ NameNode.stateChangeLog.debug(
+ "DIR* NameSystem.appendFile: file {} for {} at {} block {} block"
+ + " size {}", srcArg, holder, clientMachine, lb.getBlock(), lb
+ .getBlock().getNumBytes());
+ }
+ return new LastBlockWithStatus(lb, stat);
+ }
+
+ /**
+ * Convert current node to under construction.
+ * Recreate in-memory lease record.
+ *
+ * @param fsn namespace
+ * @param iip inodes in the path containing the file
+ * @param leaseHolder identifier of the lease holder on this file
+ * @param clientMachine identifier of the client machine
+ * @param newBlock if the data is appended to a new block
+ * @param writeToEditLog whether to persist this change to the edit log
+ * @param logRetryCache whether to record RPC ids in editlog for retry cache
+ * rebuilding
+ * @return the last block locations if the block is partial or null otherwise
+ * @throws IOException
+ */
+ static LocatedBlock prepareFileForAppend(final FSNamesystem fsn,
+ final INodesInPath iip, final String leaseHolder,
+ final String clientMachine, final boolean newBlock,
+ final boolean writeToEditLog, final boolean logRetryCache)
+ throws IOException {
+ assert fsn.hasWriteLock();
+
+ final INodeFile file = iip.getLastINode().asFile();
+ final QuotaCounts delta = verifyQuotaForUCBlock(fsn, file, iip);
+
+ file.recordModification(iip.getLatestSnapshotId());
+ file.toUnderConstruction(leaseHolder, clientMachine);
+
+ fsn.getLeaseManager().addLease(
+ file.getFileUnderConstructionFeature().getClientName(), file.getId());
+
+ LocatedBlock ret = null;
+ if (!newBlock) {
+ FSDirectory fsd = fsn.getFSDirectory();
+ ret = fsd.getBlockManager().convertLastBlockToUnderConstruction(file, 0);
+ if (ret != null && delta != null) {
+ Preconditions.checkState(delta.getStorageSpace() >= 0, "appending to"
+ + " a block with size larger than the preferred block size");
+ fsd.writeLock();
+ try {
+ fsd.updateCountNoQuotaCheck(iip, iip.length() - 1, delta);
+ } finally {
+ fsd.writeUnlock();
+ }
+ }
+ } else {
+ BlockInfo lastBlock = file.getLastBlock();
+ if (lastBlock != null) {
+ ExtendedBlock blk = new ExtendedBlock(fsn.getBlockPoolId(), lastBlock);
+ ret = new LocatedBlock(blk, new DatanodeInfo[0]);
+ }
+ }
+
+ if (writeToEditLog) {
+ final String path = iip.getPath();
+ if (NameNodeLayoutVersion.supports(Feature.APPEND_NEW_BLOCK,
+ fsn.getEffectiveLayoutVersion())) {
+ fsn.getEditLog().logAppendFile(path, file, newBlock, logRetryCache);
+ } else {
+ fsn.getEditLog().logOpenFile(path, file, false, logRetryCache);
+ }
+ }
+ return ret;
+ }
+
+ /**
+ * Verify quota when using the preferred block size for UC block. This is
+ * usually used by append and truncate.
+ *
+ * @throws QuotaExceededException when violating the storage quota
+ * @return expected quota usage update. null means no change or no need to
+ * update quota usage later
+ */
+ private static QuotaCounts verifyQuotaForUCBlock(FSNamesystem fsn,
+ INodeFile file, INodesInPath iip) throws QuotaExceededException {
+ FSDirectory fsd = fsn.getFSDirectory();
+ if (!fsn.isImageLoaded() || fsd.shouldSkipQuotaChecks()) {
+ // Do not check quota if editlog is still being processed
+ return null;
+ }
+ if (file.getLastBlock() != null) {
+ final QuotaCounts delta = computeQuotaDeltaForUCBlock(fsn, file);
+ fsd.readLock();
+ try {
+ FSDirectory.verifyQuota(iip, iip.length() - 1, delta, null);
+ return delta;
+ } finally {
+ fsd.readUnlock();
+ }
+ }
+ return null;
+ }
+
+ /** Compute quota change for converting a complete block to a UC block. */
+ private static QuotaCounts computeQuotaDeltaForUCBlock(FSNamesystem fsn,
+ INodeFile file) {
+ final QuotaCounts delta = new QuotaCounts.Builder().build();
+ final BlockInfo lastBlock = file.getLastBlock();
+ if (lastBlock != null) {
+ final long diff = file.getPreferredBlockSize() - lastBlock.getNumBytes();
+ final short repl = file.getPreferredBlockReplication();
+ delta.addStorageSpace(diff * repl);
+ final BlockStoragePolicy policy = fsn.getFSDirectory()
+ .getBlockStoragePolicySuite().getPolicy(file.getStoragePolicyID());
+ List<StorageType> types = policy.chooseStorageTypes(repl);
+ for (StorageType t : types) {
+ if (t.supportTypeQuota()) {
+ delta.addTypeSpace(t, diff);
+ }
+ }
+ }
+ return delta;
+ }
+ }
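[Note on the new file above, not part of the patch: it pins down the client-visible append contract. The call returns the last block while it is still partial (so the client can extend its pipeline), returns a null LocatedBlock when it is full, and now rejects striped files outright. A minimal client-side sketch against the public FileSystem API; path and payload are illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class AppendExample {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path p = new Path("/tmp/example.txt");  // illustrative path
        // On a striped (erasure-coded) file the NameNode now fails this
        // call with an UnsupportedOperationException.
        try (FSDataOutputStream out = fs.append(p)) {
          out.writeBytes("more data\n");
        }
      }
    }
]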
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
index bad7c42,4a45074..6ec97c9
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
@@@ -28,9 -27,8 +28,10 @@@ import org.apache.hadoop.fs.InvalidPath
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSUtil;
+ import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingZone;
import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@@ -140,9 -139,69 +142,71 @@@ class FSDirStatAndListingOp
return getContentSummaryInt(fsd, iip);
}
+ /**
+ * Get block locations within the specified range.
+ * @see ClientProtocol#getBlockLocations(String, long, long)
+ * @throws IOException
+ */
+ static GetBlockLocationsResult getBlockLocations(
+ FSDirectory fsd, FSPermissionChecker pc, String src, long offset,
+ long length, boolean needBlockToken) throws IOException {
+ Preconditions.checkArgument(offset >= 0,
+ "Negative offset is not supported. File: " + src);
+ Preconditions.checkArgument(length >= 0,
+ "Negative length is not supported. File: " + src);
+ CacheManager cm = fsd.getFSNamesystem().getCacheManager();
+ BlockManager bm = fsd.getBlockManager();
+ byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+ boolean isReservedName = FSDirectory.isReservedRawName(src);
+ fsd.readLock();
+ try {
+ src = fsd.resolvePath(pc, src, pathComponents);
+ final INodesInPath iip = fsd.getINodesInPath(src, true);
+ final INodeFile inode = INodeFile.valueOf(iip.getLastINode(), src);
+ if (fsd.isPermissionEnabled()) {
+ fsd.checkPathAccess(pc, iip, FsAction.READ);
+ fsd.checkUnreadableBySuperuser(pc, inode, iip.getPathSnapshotId());
+ }
+
+ final long fileSize = iip.isSnapshot()
+ ? inode.computeFileSize(iip.getPathSnapshotId())
+ : inode.computeFileSizeNotIncludingLastUcBlock();
+
+ boolean isUc = inode.isUnderConstruction();
+ if (iip.isSnapshot()) {
+ // if src indicates a snapshot file, we need to make sure the returned
+ // blocks do not exceed the size of the snapshot file.
+ length = Math.min(length, fileSize - offset);
+ isUc = false;
+ }
+
+ final FileEncryptionInfo feInfo = isReservedName ? null
+ : fsd.getFileEncryptionInfo(inode, iip.getPathSnapshotId(), iip);
++ final ErasureCodingZone ecZone = FSDirErasureCodingOp.getErasureCodingZone(
++ fsd.getFSNamesystem(), iip);
+
+ final LocatedBlocks blocks = bm.createLocatedBlocks(
+ inode.getBlocks(iip.getPathSnapshotId()), fileSize, isUc, offset,
- length, needBlockToken, iip.isSnapshot(), feInfo);
++ length, needBlockToken, iip.isSnapshot(), feInfo, ecZone);
+
+ // Set caching information for the located blocks.
+ for (LocatedBlock lb : blocks.getLocatedBlocks()) {
+ cm.setCachedLocations(lb);
+ }
+
+ final long now = now();
+ boolean updateAccessTime = fsd.isAccessTimeSupported()
+ && !iip.isSnapshot()
+ && now > inode.getAccessTime() + fsd.getAccessTimePrecision();
+ return new GetBlockLocationsResult(updateAccessTime, blocks);
+ } finally {
+ fsd.readUnlock();
+ }
+ }
+
private static byte getStoragePolicyID(byte inodePolicy, byte parentPolicy) {
- return inodePolicy != HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED ? inodePolicy :
- parentPolicy;
+ return inodePolicy != HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED
+ ? inodePolicy : parentPolicy;
}
/**
@@@ -457,9 -505,9 +518,9 @@@
final long fileSize = !inSnapshot && isUc ?
fileNode.computeFileSizeNotIncludingLastUcBlock() : size;
- loc = fsd.getFSNamesystem().getBlockManager().createLocatedBlocks(
+ loc = fsd.getBlockManager().createLocatedBlocks(
fileNode.getBlocks(snapshot), fileSize, isUc, 0L, size, false,
- inSnapshot, feInfo);
+ inSnapshot, feInfo, ecZone);
if (loc == null) {
loc = new LocatedBlocks();
}
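[Note on the hunks above, not part of the patch: getBlockLocations validates the requested byte range, clamps the length for snapshot reads, and now threads the file's erasure coding zone into createLocatedBlocks alongside the encryption info. The same information is reachable from a client through the public API; a small sketch, path illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class LocationsExample {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        FileStatus st = fs.getFileStatus(new Path("/tmp/example.txt"));
        // Resolves through the server-side getBlockLocations path above.
        for (BlockLocation loc : fs.getFileBlockLocations(st, 0, st.getLen())) {
          System.out.println(loc);  // offset, length, hosts
        }
      }
    }
]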
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java
index 0000000,474c257..215a761
mode 000000,100644..100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java
@@@ -1,0 -1,360 +1,370 @@@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.hadoop.hdfs.server.namenode;
+
+ import java.io.IOException;
+
+ import org.apache.hadoop.HadoopIllegalArgumentException;
+ import org.apache.hadoop.fs.UnresolvedLinkException;
+ import org.apache.hadoop.fs.permission.FsAction;
+ import org.apache.hadoop.hdfs.protocol.Block;
+ import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+ import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+ import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
+ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
+ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
+ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.RecoverLeaseOp;
+ import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
+
+ import com.google.common.annotations.VisibleForTesting;
+
+ /**
+ * Helper class to perform truncate operation.
+ */
+ final class FSDirTruncateOp {
+
+ /**
+ * Private constructor for preventing FSDirTruncateOp object creation.
+ * Static-only class.
+ */
+ private FSDirTruncateOp() {}
+
+ /**
+ * Truncate a file to a given size.
+ *
+ * @param fsn namespace
+ * @param srcArg path name
+ * @param newLength the target file size
+ * @param clientName client name
+ * @param clientMachine client machine info
+ * @param mtime modified time
+ * @param toRemoveBlocks to be removed blocks
+ * @param pc permission checker to check fs permission
+ * @return truncate result
+ * @throws IOException
+ */
+ static TruncateResult truncate(final FSNamesystem fsn, final String srcArg,
+ final long newLength, final String clientName,
+ final String clientMachine, final long mtime,
+ final BlocksMapUpdateInfo toRemoveBlocks, final FSPermissionChecker pc)
+ throws IOException, UnresolvedLinkException {
+ assert fsn.hasWriteLock();
+
+ FSDirectory fsd = fsn.getFSDirectory();
+ byte[][] pathComponents = FSDirectory
+ .getPathComponentsForReservedPath(srcArg);
+ final String src;
+ final INodesInPath iip;
+ final boolean onBlockBoundary;
+ Block truncateBlock = null;
+ fsd.writeLock();
+ try {
+ src = fsd.resolvePath(pc, srcArg, pathComponents);
+ iip = fsd.getINodesInPath4Write(src, true);
+ if (fsd.isPermissionEnabled()) {
+ fsd.checkPathAccess(pc, iip, FsAction.WRITE);
+ }
+ INodeFile file = INodeFile.valueOf(iip.getLastINode(), src);
++
++ // truncating a file with striped blocks is not supported
++ if (file.isStriped()) {
++ throw new UnsupportedOperationException(
++ "Cannot truncate file with striped block " + src);
++ }
++
+ final BlockStoragePolicy lpPolicy = fsd.getBlockManager()
+ .getStoragePolicy("LAZY_PERSIST");
+
+ if (lpPolicy != null && lpPolicy.getId() == file.getStoragePolicyID()) {
+ throw new UnsupportedOperationException(
+ "Cannot truncate lazy persist file " + src);
+ }
+
+ // Check if the file is already being truncated with the same length
+ final BlockInfo last = file.getLastBlock();
+ if (last != null && last.getBlockUCState()
+ == BlockUCState.UNDER_RECOVERY) {
+ final Block truncatedBlock = ((BlockInfoContiguousUnderConstruction) last)
+ .getTruncateBlock();
+ if (truncatedBlock != null) {
+ final long truncateLength = file.computeFileSize(false, false)
+ + truncatedBlock.getNumBytes();
+ if (newLength == truncateLength) {
+ return new TruncateResult(false, fsd.getAuditFileInfo(iip));
+ }
+ }
+ }
+
+ // Opening an existing file for truncate. May need lease recovery.
+ fsn.recoverLeaseInternal(RecoverLeaseOp.TRUNCATE_FILE, iip, src,
+ clientName, clientMachine, false);
+ // Truncate length check.
+ long oldLength = file.computeFileSize();
+ if (oldLength == newLength) {
+ return new TruncateResult(true, fsd.getAuditFileInfo(iip));
+ }
+ if (oldLength < newLength) {
+ throw new HadoopIllegalArgumentException(
+ "Cannot truncate to a larger file size. Current size: " + oldLength
+ + ", truncate size: " + newLength + ".");
+ }
+ // Perform INodeFile truncation.
+ final QuotaCounts delta = new QuotaCounts.Builder().build();
+ onBlockBoundary = unprotectedTruncate(fsn, iip, newLength,
+ toRemoveBlocks, mtime, delta);
+ if (!onBlockBoundary) {
+ // Open file for write, but don't log into edits
+ long lastBlockDelta = file.computeFileSize() - newLength;
+ assert lastBlockDelta > 0 : "delta is 0 only if on block boundary";
+ truncateBlock = prepareFileForTruncate(fsn, iip, clientName,
+ clientMachine, lastBlockDelta, null);
+ }
+
+ // update the quota: use the preferred block size for UC block
+ fsd.updateCountNoQuotaCheck(iip, iip.length() - 1, delta);
+ } finally {
+ fsd.writeUnlock();
+ }
+
+ fsn.getEditLog().logTruncate(src, clientName, clientMachine, newLength,
+ mtime, truncateBlock);
+ return new TruncateResult(onBlockBoundary, fsd.getAuditFileInfo(iip));
+ }
+
+ /**
+ * Unprotected truncate implementation. Unlike
+ * {@link FSDirTruncateOp#truncate}, this will not schedule block recovery.
+ *
+ * @param fsn namespace
+ * @param src path name
+ * @param clientName client name
+ * @param clientMachine client machine info
+ * @param newLength the target file size
+ * @param mtime modified time
+ * @param truncateBlock truncate block
+ * @throws IOException
+ */
+ static void unprotectedTruncate(final FSNamesystem fsn, final String src,
+ final String clientName, final String clientMachine,
+ final long newLength, final long mtime, final Block truncateBlock)
+ throws UnresolvedLinkException, QuotaExceededException,
+ SnapshotAccessControlException, IOException {
+ assert fsn.hasWriteLock();
+
+ FSDirectory fsd = fsn.getFSDirectory();
+ INodesInPath iip = fsd.getINodesInPath(src, true);
+ INodeFile file = iip.getLastINode().asFile();
+ BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
+ boolean onBlockBoundary = unprotectedTruncate(fsn, iip, newLength,
+ collectedBlocks, mtime, null);
+
+ if (!onBlockBoundary) {
+ BlockInfo oldBlock = file.getLastBlock();
+ Block tBlk = prepareFileForTruncate(fsn, iip, clientName, clientMachine,
+ file.computeFileSize() - newLength, truncateBlock);
+ assert Block.matchingIdAndGenStamp(tBlk, truncateBlock) &&
+ tBlk.getNumBytes() == truncateBlock.getNumBytes() :
+ "Should be the same block.";
+ if (oldBlock.getBlockId() != tBlk.getBlockId()
+ && !file.isBlockInLatestSnapshot(oldBlock)) {
+ fsd.getBlockManager().removeBlockFromMap(oldBlock);
+ }
+ }
+ assert onBlockBoundary == (truncateBlock == null) :
+ "truncateBlock is null iff on block boundary: " + truncateBlock;
+ fsn.removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
+ }
+
+ /**
+ * Convert current INode to UnderConstruction. Recreate lease. Create new
+ * block for the truncated copy. Schedule truncation of the replicas.
+ *
+ * @param fsn namespace
+ * @param iip inodes in the path containing the file
+ * @param leaseHolder lease holder
+ * @param clientMachine client machine info
+ * @param lastBlockDelta last block delta size
+ * @param newBlock new block
+ * @return the returned block will be written to editLog and passed back
+ * into this method upon loading.
+ * @throws IOException
+ */
+ @VisibleForTesting
+ static Block prepareFileForTruncate(FSNamesystem fsn, INodesInPath iip,
+ String leaseHolder, String clientMachine, long lastBlockDelta,
+ Block newBlock) throws IOException {
+ assert fsn.hasWriteLock();
+
+ INodeFile file = iip.getLastINode().asFile();
++ assert !file.isStriped();
+ file.recordModification(iip.getLatestSnapshotId());
+ file.toUnderConstruction(leaseHolder, clientMachine);
+ assert file.isUnderConstruction() : "inode should be under construction.";
+ fsn.getLeaseManager().addLease(
+ file.getFileUnderConstructionFeature().getClientName(), file.getId());
+ boolean shouldRecoverNow = (newBlock == null);
+ BlockInfo oldBlock = file.getLastBlock();
++
+ boolean shouldCopyOnTruncate = shouldCopyOnTruncate(fsn, file, oldBlock);
+ if (newBlock == null) {
- newBlock = (shouldCopyOnTruncate) ? fsn.createNewBlock() : new Block(
- oldBlock.getBlockId(), oldBlock.getNumBytes(),
++ newBlock = (shouldCopyOnTruncate) ? fsn.createNewBlock(false)
++ : new Block(oldBlock.getBlockId(), oldBlock.getNumBytes(),
+ fsn.nextGenerationStamp(fsn.getBlockIdManager().isLegacyBlock(
+ oldBlock)));
+ }
+
+ BlockInfoContiguousUnderConstruction truncatedBlockUC;
+ BlockManager blockManager = fsn.getFSDirectory().getBlockManager();
+ if (shouldCopyOnTruncate) {
+ // Add new truncateBlock into blocksMap and
+ // use oldBlock as a source for copy-on-truncate recovery
+ truncatedBlockUC = new BlockInfoContiguousUnderConstruction(newBlock,
+ file.getPreferredBlockReplication());
+ truncatedBlockUC.setNumBytes(oldBlock.getNumBytes() - lastBlockDelta);
+ truncatedBlockUC.setTruncateBlock(oldBlock);
- file.setLastBlock(truncatedBlockUC, blockManager.getStorages(oldBlock));
++ file.convertLastBlockToUC(truncatedBlockUC,
++ blockManager.getStorages(oldBlock));
+ blockManager.addBlockCollection(truncatedBlockUC, file);
+
+ NameNode.stateChangeLog.debug(
+ "BLOCK* prepareFileForTruncate: Scheduling copy-on-truncate to new"
+ + " size {} new block {} old block {}",
+ truncatedBlockUC.getNumBytes(), newBlock,
+ truncatedBlockUC.getTruncateBlock());
+ } else {
+ // Use new generation stamp for in-place truncate recovery
+ blockManager.convertLastBlockToUnderConstruction(file, lastBlockDelta);
+ oldBlock = file.getLastBlock();
+ assert !oldBlock.isComplete() : "oldBlock should be under construction";
+ truncatedBlockUC = (BlockInfoContiguousUnderConstruction) oldBlock;
+ truncatedBlockUC.setTruncateBlock(new Block(oldBlock));
+ truncatedBlockUC.getTruncateBlock().setNumBytes(
+ oldBlock.getNumBytes() - lastBlockDelta);
+ truncatedBlockUC.getTruncateBlock().setGenerationStamp(
+ newBlock.getGenerationStamp());
+
+ NameNode.stateChangeLog.debug(
+ "BLOCK* prepareFileForTruncate: {} Scheduling in-place block "
+ + "truncate to new size {}", truncatedBlockUC.getTruncateBlock()
+ .getNumBytes(), truncatedBlockUC);
+ }
+ if (shouldRecoverNow) {
+ truncatedBlockUC.initializeBlockRecovery(newBlock.getGenerationStamp());
+ }
+
+ return newBlock;
+ }
+
+ /**
+ * Truncate has the following properties:
+ * 1.) Any block deletions occur now.
+ * 2.) INode length is truncated now - new clients can only read up to
+ * the truncated length.
+ * 3.) INode will be set to UC and lastBlock set to UNDER_RECOVERY.
+ * 4.) NN will trigger DN truncation recovery and waits for DNs to report.
+ * 5.) File is considered UNDER_RECOVERY until truncation recovery
+ * completes.
+ * 6.) Soft and hard Lease expiration require truncation recovery to
+ * complete.
+ *
+ * @return true if on the block boundary or false if recovery is needed
+ */
+ private static boolean unprotectedTruncate(FSNamesystem fsn,
+ INodesInPath iip, long newLength, BlocksMapUpdateInfo collectedBlocks,
+ long mtime, QuotaCounts delta) throws IOException {
+ assert fsn.hasWriteLock();
+
+ INodeFile file = iip.getLastINode().asFile();
+ int latestSnapshot = iip.getLatestSnapshotId();
+ file.recordModification(latestSnapshot, true);
+
+ verifyQuotaForTruncate(fsn, iip, file, newLength, delta);
+
+ long remainingLength =
+ file.collectBlocksBeyondMax(newLength, collectedBlocks);
+ file.excludeSnapshotBlocks(latestSnapshot, collectedBlocks);
+ file.setModificationTime(mtime);
+ // return whether on a block boundary
+ return (remainingLength - newLength) == 0;
+ }
+
+ private static void verifyQuotaForTruncate(FSNamesystem fsn,
+ INodesInPath iip, INodeFile file, long newLength, QuotaCounts delta)
+ throws QuotaExceededException {
+ FSDirectory fsd = fsn.getFSDirectory();
+ if (!fsn.isImageLoaded() || fsd.shouldSkipQuotaChecks()) {
+ // Do not check quota if edit log is still being processed
+ return;
+ }
+ final BlockStoragePolicy policy = fsd.getBlockStoragePolicySuite()
+ .getPolicy(file.getStoragePolicyID());
+ file.computeQuotaDeltaForTruncate(newLength, policy, delta);
+ fsd.readLock();
+ try {
+ FSDirectory.verifyQuota(iip, iip.length() - 1, delta, null);
+ } finally {
+ fsd.readUnlock();
+ }
+ }
+
+ /**
+ * Defines if a replica needs to be copied on truncate or
+ * can be truncated in place.
+ */
+ private static boolean shouldCopyOnTruncate(FSNamesystem fsn, INodeFile file,
+ BlockInfo blk) {
+ if (!fsn.isUpgradeFinalized()) {
+ return true;
+ }
+ if (fsn.isRollingUpgrade()) {
+ return true;
+ }
+ return file.isBlockInLatestSnapshot(blk);
+ }
+
+ /**
+ * Result of truncate operation.
+ */
+ static class TruncateResult {
+ private final boolean result;
+ private final HdfsFileStatus stat;
+
+ public TruncateResult(boolean result, HdfsFileStatus stat) {
+ this.result = result;
+ this.stat = stat;
+ }
+
+ /**
+ * @return true if client does not need to wait for block recovery,
+ * false if client needs to wait for block recovery.
+ */
+ boolean getResult() {
+ return result;
+ }
+
+ /**
+ * @return file information.
+ */
+ HdfsFileStatus getFileStatus() {
+ return stat;
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
index 8f4f51a,3d30a19..b9466f6
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
@@@ -45,9 -43,7 +45,9 @@@ import org.apache.hadoop.hdfs.protocol.
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
- import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstructionContiguous;
- import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstructionStriped;
+ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
++import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@@@ -527,31 -515,17 +526,31 @@@ class FSDirWriteFileOp
final INodeFile fileINode = inodesInPath.getLastINode().asFile();
Preconditions.checkState(fileINode.isUnderConstruction());
- // check quota limits and updated space consumed
- fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
- fileINode.getPreferredBlockReplication(), true);
-
// associate new last block for the file
- BlockInfoContiguousUnderConstruction blockInfo =
- new BlockInfoContiguousUnderConstruction(
- block,
- fileINode.getFileReplication(),
- HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION,
+ final BlockInfo blockInfo;
+ if (isStriped) {
+ ErasureCodingZone ecZone = FSDirErasureCodingOp.getErasureCodingZone(
+ fsd.getFSNamesystem(), inodesInPath);
+ ErasureCodingPolicy ecPolicy = ecZone.getErasureCodingPolicy();
+ short numDataUnits = (short) ecPolicy.getNumDataUnits();
+ short numParityUnits = (short) ecPolicy.getNumParityUnits();
+ short numLocations = (short) (numDataUnits + numParityUnits);
+
+ // check quota limits and updated space consumed
+ fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
+ numLocations, true);
- blockInfo = new BlockInfoUnderConstructionStriped(block, ecPolicy,
++ blockInfo = new BlockInfoStripedUnderConstruction(block, ecPolicy,
+ HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets);
+ } else {
+ // check quota limits and updated space consumed
+ fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
+ fileINode.getPreferredBlockReplication(), true);
+
+ short numLocations = fileINode.getFileReplication();
- blockInfo = new BlockInfoUnderConstructionContiguous(block,
++ blockInfo = new BlockInfoContiguousUnderConstruction(block,
+ numLocations, HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION,
targets);
+ }
fsd.getBlockManager().addBlockCollection(blockInfo, fileINode);
fileINode.addBlock(blockInfo);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index b5b7178,3dd076d..008a327
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@@ -36,20 -36,16 +36,20 @@@ import org.apache.hadoop.classification
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.XAttrSetFlag;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingZone;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
- import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstructionStriped;
++import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
- import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstructionContiguous;
+ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
@@@ -988,17 -969,10 +988,17 @@@ public class FSEditLogLoader
Preconditions.checkState(oldBlocks == null || oldBlocks.length == 0);
}
// add the new block
- BlockInfo newBI = new BlockInfoContiguousUnderConstruction(
- newBlock, file.getPreferredBlockReplication());
- fsNamesys.getBlockManager().addBlockCollection(newBI, file);
- file.addBlock(newBI);
+ final BlockInfo newBlockInfo;
+ boolean isStriped = ecZone != null;
+ if (isStriped) {
- newBlockInfo = new BlockInfoUnderConstructionStriped(newBlock,
++ newBlockInfo = new BlockInfoStripedUnderConstruction(newBlock,
+ ecZone.getErasureCodingPolicy());
+ } else {
- newBlockInfo = new BlockInfoUnderConstructionContiguous(newBlock,
++ newBlockInfo = new BlockInfoContiguousUnderConstruction(newBlock,
+ file.getPreferredBlockReplication());
+ }
+ fsNamesys.getBlockManager().addBlockCollectionWithCheck(newBlockInfo, file);
+ file.addBlock(newBlockInfo);
fsNamesys.getBlockManager().processQueuedMessagesForBlock(newBlock);
}
@@@ -1076,13 -1049,8 +1076,13 @@@
// TODO: shouldn't this only be true for the last block?
// what about an old-version fsync() where fsync isn't called
// until several blocks in?
- newBI = new BlockInfoContiguousUnderConstruction(
- newBlock, file.getPreferredBlockReplication());
+ if (isStriped) {
- newBI = new BlockInfoUnderConstructionStriped(newBlock,
++ newBI = new BlockInfoStripedUnderConstruction(newBlock,
+ ecZone.getErasureCodingPolicy());
+ } else {
- newBI = new BlockInfoUnderConstructionContiguous(newBlock,
++ newBI = new BlockInfoContiguousUnderConstruction(newBlock,
+ file.getPreferredBlockReplication());
+ }
} else {
// OP_CLOSE should add finalized blocks. This code path
// is only executed when loading edits written by prior
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
index 2e490e7,30517d0..e7c87d6
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
@@@ -778,9 -776,9 +778,9 @@@ public class FSImageFormat
clientMachine = FSImageSerialization.readString(in);
// convert the last block to BlockUC
if (blocks.length > 0) {
- BlockInfo lastBlk = blocks[blocks.length - 1];
- blocks[blocks.length - 1] = new BlockInfoContiguousUnderConstruction(
- lastBlk, replication);
+ Block lastBlk = blocks[blocks.length - 1];
+ blocks[blocks.length - 1] =
- new BlockInfoUnderConstructionContiguous(lastBlk, replication);
++ new BlockInfoContiguousUnderConstruction(lastBlk, replication);
}
}
}
@@@ -1144,7 -1141,7 +1144,7 @@@
+ " option to automatically rename these paths during upgrade.";
/**
-- * Same as {@link #renameReservedPathsOnUpgrade(String)}, but for a single
++ * Same as {@link #renameReservedPathsOnUpgrade}, but for a single
* byte array path component.
*/
private static byte[] renameReservedComponentOnUpgrade(byte[] component,
@@@ -1164,7 -1161,7 +1164,7 @@@
}
/**
-- * Same as {@link #renameReservedPathsOnUpgrade(String)}, but for a single
++ * Same as {@link #renameReservedPathsOnUpgrade}, but for a single
* byte array path component.
*/
private static byte[] renameReservedRootComponentOnUpgrade(byte[] component,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
index 653bd4a,e8378e5..51b04d0
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
@@@ -45,9 -44,7 +45,9 @@@ import org.apache.hadoop.hdfs.protocol.
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
- import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstructionContiguous;
+ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
- import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstructionStriped;
++import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.LoaderContext;
import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.SaverContext;
@@@ -366,15 -364,8 +375,15 @@@ public final class FSImageFormatPBINod
if (blocks.length > 0) {
BlockInfo lastBlk = file.getLastBlock();
// replace the last block of file
- file.setBlock(file.numBlocks() - 1, new BlockInfoContiguousUnderConstruction(
- lastBlk, replication));
+ final BlockInfo ucBlk;
+ if (isStriped) {
+ BlockInfoStriped striped = (BlockInfoStriped) lastBlk;
- ucBlk = new BlockInfoUnderConstructionStriped(striped, ecPolicy);
++ ucBlk = new BlockInfoStripedUnderConstruction(striped, ecPolicy);
+ } else {
- ucBlk = new BlockInfoUnderConstructionContiguous(lastBlk,
++ ucBlk = new BlockInfoContiguousUnderConstruction(lastBlk,
+ replication);
+ }
+ file.setBlock(file.numBlocks() - 1, ucBlk);
}
}
return file;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
index d87378c,f71cf0b..af3f813
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
@@@ -33,8 -33,9 +33,8 @@@ import org.apache.hadoop.hdfs.protocol.
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
- import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstructionContiguous;
+ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat.ReferenceMap;
@@@ -138,10 -137,9 +138,10 @@@ public class FSImageSerialization
// last block is UNDER_CONSTRUCTION
if(numBlocks > 0) {
blk.readFields(in);
- blocksContiguous[i] = new BlockInfoUnderConstructionContiguous(
- blocks[i] = new BlockInfoContiguousUnderConstruction(
- blk, blockReplication, BlockUCState.UNDER_CONSTRUCTION, null);
++ blocksContiguous[i] = new BlockInfoContiguousUnderConstruction(
+ blk, blockReplication, BlockUCState.UNDER_CONSTRUCTION, null);
}
+
PermissionStatus perm = PermissionStatus.read(in);
String clientName = readString(in);
String clientMachine = readString(in);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 657f29d,d34242c..a7107d7
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@@ -168,10 -166,8 +165,9 @@@ import org.apache.hadoop.hdfs.DFSUtil
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.UnknownCryptoProtocolVersionException;
- import org.apache.hadoop.hdfs.XAttrHelper;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
@@@ -207,10 -199,7 +203,10 @@@ import org.apache.hadoop.hdfs.security.
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
- import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstructionContiguous;
+ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
@@@ -4240,8 -3714,9 +3775,9 @@@ public class FSNamesystem implements Na
while (it.hasNext()) {
Block b = it.next();
- BlockInfo blockInfo = blockManager.getStoredBlock(b);
+ BlockInfo blockInfo = getStoredBlock(b);
- if (blockInfo.getBlockCollection().getStoragePolicyID() == lpPolicy.getId()) {
+ if (blockInfo.getBlockCollection().getStoragePolicyID()
+ == lpPolicy.getId()) {
filesToDelete.add(blockInfo.getBlockCollection());
}
}
@@@ -6670,8 -6142,8 +6221,9 @@@
public void setFSDirectory(FSDirectory dir) {
this.dir = dir;
}
+
/** @return the cache manager. */
+ @Override
public CacheManager getCacheManager() {
return cacheManager;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
index 25415ef,3f242e0..13f180a
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
@@@ -39,11 -37,10 +39,12 @@@ import org.apache.hadoop.hdfs.protocol.
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
+ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.namenode.snapshot.FileDiff;
import org.apache.hadoop.hdfs.server.namenode.snapshot.FileDiffList;
import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshotFeature;
@@@ -445,16 -413,6 +446,15 @@@ public class INodeFile extends INodeWit
setStoragePolicyID(storagePolicyId);
}
-
+ /**
+ * @return true if the file is in the striping layout.
+ */
+ @VisibleForTesting
+ @Override
+ public boolean isStriped() {
+ return HeaderFormat.isStriped(header);
+ }
+
@Override // INodeFileAttributes
public long getHeaderLong() {
return header;
@@@ -483,9 -439,9 +483,8 @@@
snapshotBlocks = getDiffs().findLaterSnapshotBlocks(snapshot);
return (snapshotBlocks == null) ? getBlocks() : snapshotBlocks;
}
--
- /** Used during concat to update the BlockCollection for each block */
- void updateBlockCollection() {
+ /** Used during concat to update the BlockCollection for each block. */
+ private void updateBlockCollection() {
if (blocks != null) {
for(BlockInfo b : blocks) {
b.setBlockCollection(this);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java
index f93218f,3a5dc12..2943fc2
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java
@@@ -63,17 -78,16 +78,17 @@@ public class NameNodeLayoutVersion
* </ul>
*/
public static enum Feature implements LayoutFeature {
- ROLLING_UPGRADE(-55, -53, "Support rolling upgrade", false),
- EDITLOG_LENGTH(-56, "Add length field to every edit log op"),
- XATTRS(-57, "Extended attributes"),
- CREATE_OVERWRITE(-58, "Use single editlog record for " +
+ ROLLING_UPGRADE(-55, -53, -55, "Support rolling upgrade", false),
+ EDITLOG_LENGTH(-56, -56, "Add length field to every edit log op"),
+ XATTRS(-57, -57, "Extended attributes"),
+ CREATE_OVERWRITE(-58, -58, "Use single editlog record for " +
"creating file with overwrite"),
- XATTRS_NAMESPACE_EXT(-59, "Increase number of xattr namespaces"),
- BLOCK_STORAGE_POLICY(-60, "Block Storage policy"),
- TRUNCATE(-61, "Truncate"),
- APPEND_NEW_BLOCK(-62, "Support appending to new block"),
- QUOTA_BY_STORAGE_TYPE(-63, "Support quota for specific storage types"),
- ERASURE_CODING(-64, "Support erasure coding");
+ XATTRS_NAMESPACE_EXT(-59, -59, "Increase number of xattr namespaces"),
+ BLOCK_STORAGE_POLICY(-60, -60, "Block Storage policy"),
+ TRUNCATE(-61, -61, "Truncate"),
+ APPEND_NEW_BLOCK(-62, -61, "Support appending to new block"),
- QUOTA_BY_STORAGE_TYPE(-63, -61, "Support quota for specific storage types");
++ QUOTA_BY_STORAGE_TYPE(-63, -61, "Support quota for specific storage types"),
++ ERASURE_CODING(-64, -61, "Support erasure coding");
private final FeatureInfo info;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6b6a63bb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
index 3a2c3d4,7d4cd7e..9d43c15
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
@@@ -455,17 -442,11 +455,17 @@@ public class NamenodeFsck implements Da
if (blocks == null) { // the file is deleted
return;
}
- collectFileSummary(path, file, res, blocks);
- collectBlocksSummary(parent, file, res, blocks);
+
- final Result r = file.getReplication() == 0? ecRes: replRes;
++ final Result r = file.getErasureCodingPolicy() != null ? ecRes : replRes;
+ collectFileSummary(path, file, r, blocks);
+ if (showprogress && (replRes.totalFiles + ecRes.totalFiles) % 100 == 0) {
+ out.println();
+ out.flush();
+ }
+ collectBlocksSummary(parent, file, r, blocks);
}
- private void checkDir(String path, Result res) throws IOException {
+ private void checkDir(String path, Result replRes, Result ecRes) throws IOException {
if (snapshottableDirs != null && snapshottableDirs.contains(path)) {
String snapshotPath = (path.endsWith(Path.SEPARATOR) ? path : path
+ Path.SEPARATOR)