You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wh...@apache.org on 2016/01/05 20:52:10 UTC
[10/50] [abbrv] hadoop git commit: [partial-ns] Implement getAdditionalBlock().
[partial-ns] Implement getAdditionalBlock().
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6a36f5bc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6a36f5bc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6a36f5bc
Branch: refs/heads/feature-HDFS-8286
Commit: 6a36f5bcc4edfba4e9779b0fd19923179138b551
Parents: e9c9c72b
Author: Haohui Mai <wh...@apache.org>
Authored: Thu May 14 17:02:18 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Fri Jun 12 13:56:57 2015 -0700
----------------------------------------------------------------------
.../server/blockmanagement/BlockManager.java | 109 +++++---
.../hdfs/server/blockmanagement/BlocksMap.java | 8 +-
.../hdfs/server/namenode/FSDirWriteFileOp.java | 247 ++++++++++---------
.../hadoop/hdfs/server/namenode/FSEditLog.java | 11 +-
.../hdfs/server/namenode/FSNamesystem.java | 73 +++++-
.../hdfs/server/namenode/RWTransaction.java | 4 +
.../hadoop/hdfs/server/namenode/Resolver.java | 4 +
7 files changed, 298 insertions(+), 158 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a36f5bc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 6139e37..9b18f45 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -22,6 +22,7 @@ import static org.apache.hadoop.util.ExitUtil.terminate;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
@@ -64,6 +65,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBloc
import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.namenode.FlatINodeFileFeature;
import org.apache.hadoop.hdfs.server.namenode.INodeId;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
@@ -638,17 +640,42 @@ public class BlockManager {
}
/**
- * Convert a specified block of the file to a complete block.
- * @param bc file
- * @param blkIndex block index in the file
- * @throws IOException if the block does not have at least a minimal number
- * of replicas reported from data-nodes.
+ * Commit or complete the last block. Return the length of the last block
*/
+ public long commitOrCompleteLastBlock(FlatINodeFileFeature file, Block
+ commitBlock) throws IOException {
+ if(commitBlock == null)
+ return 0; // not committing, this is a block allocation retry
+ BlockInfoContiguous lastBlock = getStoredBlock(file.lastBlock());
+ if(lastBlock == null)
+ return 0; // no blocks in file yet
+ if(lastBlock.isComplete())
+ return lastBlock.getNumBytes(); // already completed (e.g. by syncBlock)
+
+ commitBlock((BlockInfoContiguousUnderConstruction) lastBlock, commitBlock);
+ if(countNodes(lastBlock).liveReplicas() >= minReplication) {
+ return completeBlock(lastBlock, false).getNumBytes();
+ }
+ return lastBlock.getNumBytes();
+ }
+
private BlockInfoContiguous completeBlock(final BlockCollection bc,
final int blkIndex, boolean force) throws IOException {
if(blkIndex < 0)
return null;
- BlockInfoContiguous curBlock = bc.getBlocks()[blkIndex];
+ BlockInfoContiguous block = completeBlock(bc.getBlocks()[blkIndex], force);
+ // replace penultimate block in file
+ bc.setBlock(blkIndex, block);
+ return block;
+ }
+
+ /**
+ * Convert a specified block of the file to a complete block.
+ * @throws IOException if the block does not have at least a minimal number
+ * of replicas reported from data-nodes.
+ */
+ private BlockInfoContiguous completeBlock(BlockInfoContiguous curBlock,
+ boolean force) throws IOException {
if(curBlock.isComplete())
return curBlock;
BlockInfoContiguousUnderConstruction ucBlock =
@@ -661,9 +688,7 @@ public class BlockManager {
throw new IOException(
"Cannot complete block: block has not been COMMITTED by the client");
BlockInfoContiguous completeBlock = ucBlock.convertToCompleteBlock();
- // replace penultimate block in file
- bc.setBlock(blkIndex, completeBlock);
-
+
// Since safe-mode only counts complete blocks, and we now have
// one more complete block, we need to adjust the total up, and
// also count it as safe, if we have at least the minimum replica
@@ -2800,10 +2825,10 @@ public class BlockManager {
processOverReplicatedBlock(block, expectedReplication, null, null);
return MisReplicationResult.OVER_REPLICATED;
}
-
+
return MisReplicationResult.OK;
}
-
+
/** Set replication for the blocks. */
public void setReplication(final short oldRepl, final short newRepl,
final String src, final Block b) {
@@ -2812,7 +2837,7 @@ public class BlockManager {
}
// update needReplication priority queues
- updateNeededReplications(b, 0, newRepl-oldRepl);
+ updateNeededReplications(b, 0, newRepl - oldRepl);
if (oldRepl > newRepl) {
// old replication > the new one; need to remove copies
@@ -2845,8 +2870,7 @@ public class BlockManager {
if (storage.areBlockContentsStale()) {
LOG.info("BLOCK* processOverReplicatedBlock: " +
"Postponing processing of over-replicated " +
- block + " since storage + " + storage
- + "datanode " + cur + " does not yet have up-to-date " +
+ block + " since storage + " + storage + "datanode " + cur + " does not yet have up-to-date " +
"block information.");
postponeBlock(block);
return;
@@ -2862,8 +2886,8 @@ public class BlockManager {
}
}
}
- chooseExcessReplicates(nonExcess, block, replication,
- addedNode, delNodeHint, blockplacement);
+ chooseExcessReplicates(nonExcess, block, replication, addedNode,
+ delNodeHint, blockplacement);
}
@@ -2969,15 +2993,17 @@ public class BlockManager {
private void addToExcessReplicate(DatanodeInfo dn, Block block) {
assert namesystem.hasWriteLock();
- LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(dn.getDatanodeUuid());
+ LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(
+ dn.getDatanodeUuid());
if (excessBlocks == null) {
excessBlocks = new LightWeightLinkedSet<Block>();
excessReplicateMap.put(dn.getDatanodeUuid(), excessBlocks);
}
if (excessBlocks.add(block)) {
excessBlocksCount.incrementAndGet();
- blockLog.debug("BLOCK* addToExcessReplicate: ({}, {}) is added to"
- + " excessReplicateMap", dn, block);
+ blockLog.debug(
+ "BLOCK* addToExcessReplicate: ({}, {}) is added to" + " excessReplicateMap",
+ dn, block);
}
}
@@ -2985,8 +3011,7 @@ public class BlockManager {
DatanodeDescriptor node) {
if (shouldPostponeBlocksFromFuture &&
namesystem.isGenStampInFuture(block)) {
- queueReportedBlock(storageInfo, block, null,
- QUEUE_REASON_FUTURE_GENSTAMP);
+ queueReportedBlock(storageInfo, block, null, QUEUE_REASON_FUTURE_GENSTAMP);
return;
}
removeStoredBlock(block, node);
@@ -3093,7 +3118,7 @@ public class BlockManager {
processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED,
delHintNode);
}
-
+
private void processAndHandleReportedBlock(
DatanodeStorageInfo storageInfo, Block block,
ReplicaState reportedState, DatanodeDescriptor delHintNode)
@@ -3400,21 +3425,44 @@ public class BlockManager {
}
}
+ public boolean checkBlocksProperlyReplicated(String src,
+ final BlockInfoContiguous[] blocks) {
+ return checkBlocksProperlyReplicated(src, new Iterable<Block>() {
+ @Override
+ public Iterator<Block> iterator() {
+ return new Iterator<Block>() {
+ private int index;
+ @Override
+ public boolean hasNext() {
+ return index < blocks.length;
+ }
+
+ @Override
+ public Block next() {
+ return blocks[index++];
+ }
+ };
+ }
+ });
+ }
+
/**
* Check that the indicated blocks are present and
* replicated.
*/
- public boolean checkBlocksProperlyReplicated(
- String src, BlockInfoContiguous[] blocks) {
- for (BlockInfoContiguous b: blocks) {
+ public boolean checkBlocksProperlyReplicated(String src, Iterable<Block>
+ blocks) {
+ for (Block bid : blocks) {
+ BlockInfoContiguous b = bid instanceof BlockInfoContiguous
+ ? (BlockInfoContiguous) bid : getStoredBlock(bid);
if (!b.isComplete()) {
final BlockInfoContiguousUnderConstruction uc =
(BlockInfoContiguousUnderConstruction)b;
final int numNodes = b.numNodes();
LOG.info("BLOCK* " + b + " is not COMPLETE (ucState = "
- + uc.getBlockUCState() + ", replication# = " + numNodes
- + (numNodes < minReplication ? " < ": " >= ")
- + " minimum = " + minReplication + ") in file " + src);
+ + uc.getBlockUCState() + ", replication# = " + numNodes
+ + (numNodes < minReplication ? " < ": " >= ")
+ + " minimum = " + minReplication + ") in file " + src);
return false;
}
}
@@ -3527,6 +3575,11 @@ public class BlockManager {
return blocksMap.addBlockCollection(block, bc);
}
+ public BlockInfoContiguous addBlockCollection(BlockInfoContiguous block,
+ long bcId) {
+ return blocksMap.addBlockCollection(block, bcId);
+ }
+
public long getBlockCollectionId(Block b) {
BlockInfoContiguous bi = getStoredBlock(b);
return bi == null ? INodeId.INVALID_INODE_ID : bi.getBlockCollectionId();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a36f5bc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
index 9a1dc29..e18e384 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
@@ -97,16 +97,20 @@ class BlocksMap {
}
}
+ BlockInfoContiguous addBlockCollection(BlockInfoContiguous b, BlockCollection bc) {
+ return addBlockCollection(b, bc.getId());
+ }
+
/**
* Add block b belonging to the specified block collection to the map.
*/
- BlockInfoContiguous addBlockCollection(BlockInfoContiguous b, BlockCollection bc) {
+ BlockInfoContiguous addBlockCollection(BlockInfoContiguous b, long bcId) {
BlockInfoContiguous info = blocks.get(b);
if (info != b) {
info = b;
blocks.put(info);
}
- info.setBlockCollectionId(bc.getId());
+ info.setBlockCollectionId(bcId);
return info;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a36f5bc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
index 33e31e7..a136b1d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
@@ -32,6 +32,8 @@ import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.PermissionStatus;
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
@@ -173,34 +175,34 @@ class FSDirWriteFileOp {
final byte storagePolicyID;
String clientMachine;
- byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
- src = fsn.dir.resolvePath(pc, src, pathComponents);
- FileState fileState = analyzeFileState(fsn, src, fileId, clientName,
- previous, onRetryBlock);
- final INodeFile pendingFile = fileState.inode;
- // Check if the penultimate block is minimally replicated
- if (!fsn.checkFileProgress(src, pendingFile, false)) {
- throw new NotReplicatedYetException("Not replicated yet: " + src);
- }
+ FSDirectory fsd = fsn.getFSDirectory();
+ try (ROTransaction tx = fsd.newROTransaction().begin()) {
+ FileState fileState = analyzeFileState(tx, fsn, src, fileId, clientName,
+ previous, onRetryBlock);
+ final FlatINode pendingFile = fileState.inode;
+ FlatINodeFileFeature f = pendingFile.feature(FlatINodeFileFeature.class);
+ // Check if the penultimate block is minimally replicated
+ if (!fsn.checkFileProgress(src, f, false)) {
+ throw new NotReplicatedYetException("Not replicated yet: " + src);
+ }
- if (onRetryBlock[0] != null && onRetryBlock[0].getLocations().length > 0) {
- // This is a retry. No need to generate new locations.
- // Use the last block if it has locations.
- return null;
- }
- if (pendingFile.getBlocks().length >= fsn.maxBlocksPerFile) {
- throw new IOException("File has reached the limit on maximum number of"
- + " blocks (" + DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY
- + "): " + pendingFile.getBlocks().length + " >= "
- + fsn.maxBlocksPerFile);
+ if (onRetryBlock[0] != null && onRetryBlock[0].getLocations().length > 0) {
+ // This is a retry. No need to generate new locations.
+ // Use the last block if it has locations.
+ return null;
+ }
+ if (f.numBlocks() >= fsn.maxBlocksPerFile) {
+ throw new IOException("File has reached the limit on maximum number of"
+ + " blocks (" + DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY
+ + "): " + f.numBlocks() + " >= " + fsn.maxBlocksPerFile);
+ }
+ blockSize = f.blockSize();
+ clientMachine = f.clientMachine();
+ replication = f.replication();
+ storagePolicyID = f.storagePolicyId();
+ return new ValidateAddBlockResult(blockSize, replication, storagePolicyID,
+ clientMachine);
}
- blockSize = pendingFile.getPreferredBlockSize();
- clientMachine = pendingFile.getFileUnderConstructionFeature()
- .getClientMachine();
- replication = pendingFile.getFileReplication();
- storagePolicyID = pendingFile.getStoragePolicyID();
- return new ValidateAddBlockResult(blockSize, replication, storagePolicyID,
- clientMachine);
}
static LocatedBlock makeLocatedBlock(FSNamesystem fsn, Block blk,
@@ -226,39 +228,50 @@ class FSDirWriteFileOp {
// Run the full analysis again, since things could have changed
// while chooseTarget() was executing.
LocatedBlock[] onRetryBlock = new LocatedBlock[1];
- FileState fileState = analyzeFileState(fsn, src, fileId, clientName,
- previous, onRetryBlock);
- final INodeFile pendingFile = fileState.inode;
- src = fileState.path;
-
- if (onRetryBlock[0] != null) {
- if (onRetryBlock[0].getLocations().length > 0) {
- // This is a retry. Just return the last block if having locations.
- return onRetryBlock[0];
- } else {
- // add new chosen targets to already allocated block and return
- BlockInfoContiguous lastBlockInFile = pendingFile.getLastBlock();
- ((BlockInfoContiguousUnderConstruction) lastBlockInFile)
- .setExpectedLocations(targets);
- offset = pendingFile.computeFileSize();
- return makeLocatedBlock(fsn, lastBlockInFile, targets, offset);
+ FSDirectory fsd = fsn.getFSDirectory();
+ BlockManager bm = fsn.getBlockManager();
+ try (RWTransaction tx = fsd.newRWTransaction().begin()) {
+ FileState fileState = analyzeFileState(tx, fsn, src, fileId, clientName,
+ previous, onRetryBlock);
+ final FlatINode inode = fileState.inode;
+ FlatINodeFileFeature file = inode.feature(FlatINodeFileFeature.class);
+ src = fileState.path;
+
+ if (onRetryBlock[0] != null) {
+ if (onRetryBlock[0].getLocations().length > 0) {
+ // This is a retry. Just return the last block if having locations.
+ return onRetryBlock[0];
+ } else {
+ // add new chosen targets to already allocated block and return
+ Block lastBlock = file.lastBlock();
+ BlockInfoContiguous lastBlockInFile = bm.getStoredBlock(lastBlock);
+ ((BlockInfoContiguousUnderConstruction) lastBlockInFile)
+ .setExpectedLocations(targets);
+ offset = file.fileSize();
+ return makeLocatedBlock(fsn, lastBlockInFile, targets, offset);
+ }
}
- }
- // commit the last block and complete it if it has minimum replicas
- fsn.commitOrCompleteLastBlock(pendingFile, fileState.iip,
- ExtendedBlock.getLocalBlock(previous));
+ // commit the last block and complete it if it has minimum replicas
+ FlatINodeFileFeature.Builder newFile = fsn.commitOrCompleteLastBlock(
+ file, ExtendedBlock.getLocalBlock(previous));
- // allocate new block, record block locations in INode.
- Block newBlock = fsn.createNewBlock();
- INodesInPath inodesInPath = INodesInPath.fromINode(pendingFile);
- saveAllocatedBlock(fsn, src, inodesInPath, newBlock, targets);
+ // allocate new block, record block locations in INode.
+ Block newBlock = fsn.createNewBlock();
+ saveAllocatedBlock(fsn, src, inode, newBlock, targets);
+ FlatINode newInode = persistNewBlock(tx, src, inode, newFile, newBlock);
+ offset = newInode.<FlatINodeFileFeature>feature(
+ FlatINodeFileFeature.class).fileSize();
- persistNewBlock(fsn, src, pendingFile);
- offset = pendingFile.computeFileSize();
+ // TODO: Update quota
+ // check quota limits and updated space consumed
+// fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
+// fileINode.getFileReplication(), true);
- // Return located block
- return makeLocatedBlock(fsn, newBlock, targets, offset);
+ tx.commit();
+ // Return located block
+ return makeLocatedBlock(fsn, newBlock, targets, offset);
+ }
}
static DatanodeStorageInfo[] chooseTargetForNewBlock(
@@ -553,72 +566,56 @@ class FSDirWriteFileOp {
* Add a block to the file. Returns a reference to the added block.
*/
private static BlockInfoContiguous addBlock(
- FSDirectory fsd, String path, INodesInPath inodesInPath, Block block,
+ BlockManager bm, String path, FlatINode inode, Block block,
DatanodeStorageInfo[] targets) throws IOException {
- fsd.writeLock();
- try {
- final INodeFile fileINode = inodesInPath.getLastINode().asFile();
- Preconditions.checkState(fileINode.isUnderConstruction());
-
- // check quota limits and updated space consumed
- fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
- fileINode.getFileReplication(), true);
-
- // associate new last block for the file
- BlockInfoContiguousUnderConstruction blockInfo =
+ FlatINodeFileFeature f = inode.feature(FlatINodeFileFeature.class);
+ Preconditions.checkState(f.inConstruction());
+ // associate new last block for the file
+ BlockInfoContiguousUnderConstruction blockInfo =
new BlockInfoContiguousUnderConstruction(
- block,
- fileINode.getFileReplication(),
+ block, f.replication(),
HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION,
targets);
- fsd.getBlockManager().addBlockCollection(blockInfo, fileINode);
- fileINode.addBlock(blockInfo);
- if(NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("DIR* FSDirectory.addBlock: "
- + path + " with " + block
- + " block is added to the in-memory "
- + "file system");
- }
- return blockInfo;
- } finally {
- fsd.writeUnlock();
+ bm.addBlockCollection(blockInfo, inode.id());
+ if(NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("DIR* FSDirectory.addBlock: " + path
+ + " with " + block + " block is added to the in-memory file system");
}
+ return blockInfo;
}
private static FileState analyzeFileState(
- FSNamesystem fsn, String src, long fileId, String clientName,
+ Transaction tx, FSNamesystem fsn, String src, long fileId, String clientName,
ExtendedBlock previous, LocatedBlock[] onRetryBlock)
throws IOException {
assert fsn.hasReadLock();
-
+ BlockManager bm = fsn.getBlockManager();
checkBlock(fsn, previous);
onRetryBlock[0] = null;
- fsn.checkNameNodeSafeMode("Cannot add block to " + src);
-
- // have we exceeded the configured limit of fs objects.
- fsn.checkFsObjectLimit();
Block previousBlock = ExtendedBlock.getLocalBlock(previous);
- final INode inode;
- final INodesInPath iip;
- if (fileId == HdfsConstants.GRANDFATHER_INODE_ID) {
+ final Resolver.Result paths;
+ if (true || fileId == HdfsConstants.GRANDFATHER_INODE_ID) {
// Older clients may not have given us an inode ID to work with.
// In this case, we have to try to resolve the path and hope it
// hasn't changed or been deleted since the file was opened for write.
- iip = fsn.dir.getINodesInPath4Write(src);
- inode = iip.getLastINode();
+ paths = Resolver.resolve(tx, src);
} else {
// Newer clients pass the inode ID, so we can just get the inode
// directly.
- inode = fsn.dir.getInode(fileId);
- iip = INodesInPath.fromINode(inode);
- if (inode != null) {
- src = iip.getPath();
- }
+ paths = Resolver.resolveById(tx, fileId);
+ }
+ if (paths.invalidPath()) {
+ throw new InvalidPathException(src);
+ } else if (paths.notFound()) {
+ throw new FileNotFoundException(src);
}
- final INodeFile file = fsn.checkLease(src, clientName, inode, fileId);
- BlockInfoContiguous lastBlockInFile = file.getLastBlock();
+ FlatINode inode = paths.inodesInPath().getLastINode();
+ fsn.checkLease(src, clientName, inode);
+ FlatINodeFileFeature pendingFile = inode.feature(FlatINodeFileFeature.class);
+ BlockInfoContiguous lastBlockInFile = pendingFile.lastBlock() == null ?
+ null : bm.getStoredBlock(pendingFile.lastBlock());
if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
// The block that the client claims is the current last block
// doesn't match up with what we think is the last block. There are
@@ -646,10 +643,11 @@ class FSDirWriteFileOp {
// changed the namesystem state yet.
// We run this analysis again in Part II where case 4 is impossible.
- BlockInfoContiguous penultimateBlock = file.getPenultimateBlock();
+ BlockInfoContiguous penultimateBlock = bm.getStoredBlock(
+ pendingFile.penultimateBlock());
if (previous == null &&
lastBlockInFile != null &&
- lastBlockInFile.getNumBytes() >= file.getPreferredBlockSize() &&
+ lastBlockInFile.getNumBytes() >= pendingFile.blockSize() &&
lastBlockInFile.isComplete()) {
// Case 1
if (NameNode.stateChangeLog.isDebugEnabled()) {
@@ -668,15 +666,15 @@ class FSDirWriteFileOp {
// Case 2
// Return the last block.
- NameNode.stateChangeLog.info("BLOCK* allocateBlock: caught retry for " +
- "allocation of a new block in " + src + ". Returning previously" +
- " allocated block " + lastBlockInFile);
- long offset = file.computeFileSize();
+ NameNode.stateChangeLog.info("BLOCK* allocateBlock: " +
+ "caught retry for allocation of a new block in " +
+ src + ". Returning previously allocated block " + lastBlockInFile);
+ long offset = pendingFile.fileSize();
BlockInfoContiguousUnderConstruction lastBlockUC =
(BlockInfoContiguousUnderConstruction) lastBlockInFile;
onRetryBlock[0] = makeLocatedBlock(fsn, lastBlockInFile,
lastBlockUC.getExpectedStorageLocations(), offset);
- return new FileState(file, src, iip);
+ return new FileState(inode, src);
} else {
// Case 3
throw new IOException("Cannot allocate block in " + src + ": " +
@@ -684,7 +682,7 @@ class FSDirWriteFileOp {
"last block in file " + lastBlockInFile);
}
}
- return new FileState(file, src, iip);
+ return new FileState(inode, src);
}
static boolean completeFile(FSNamesystem fsn, FSPermissionChecker pc,
@@ -779,15 +777,25 @@ class FSDirWriteFileOp {
/**
* Persist the new block (the last block of the given file).
*/
- private static void persistNewBlock(
- FSNamesystem fsn, String path, INodeFile file) {
- Preconditions.checkArgument(file.isUnderConstruction());
- fsn.getEditLog().logAddBlock(path, file);
+ private static FlatINode persistNewBlock(
+ RWTransaction tx, String path, FlatINode inode,
+ FlatINodeFileFeature.Builder newFile, Block newBlock) {
+ Preconditions.checkArgument(newFile.inConstruction());
+ newFile.addBlock(newBlock);
+ FlatINodeFileFeature newFeature = FlatINodeFileFeature.wrap(newFile.build());
+
+ FlatINode.Builder builder = new FlatINode.Builder()
+ .mergeFrom(inode).replaceFeature(newFeature);
+ ByteString newFileBytes = builder.build();
+ FlatINode newInode = FlatINode.wrap(newFileBytes);
+ tx.putINode(inode.id(), newFileBytes);
+ tx.logAddBlock(path, newFeature);
if (NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("persistNewBlock: "
- + path + " with new block " + file.getLastBlock().toString()
- + ", current total block count is " + file.getBlocks().length);
+ NameNode.stateChangeLog.debug("persistNewBlock: " + path
+ + " with new block " + newBlock + ", current total block count is "
+ + newFeature.numBlocks());
}
+ return newInode;
}
/**
@@ -795,19 +803,18 @@ class FSDirWriteFileOp {
*
* @param fsn FSNamesystem
* @param src path to the file
- * @param inodesInPath representing each of the components of src.
- * The last INode is the INode for {@code src} file.
+ * @param inode the file
* @param newBlock newly allocated block to be save
* @param targets target datanodes where replicas of the new block is placed
* @throws QuotaExceededException If addition of block exceeds space quota
*/
private static void saveAllocatedBlock(
- FSNamesystem fsn, String src, INodesInPath inodesInPath, Block newBlock,
+ FSNamesystem fsn, String src, FlatINode inode, Block newBlock,
DatanodeStorageInfo[] targets)
throws IOException {
assert fsn.hasWriteLock();
- BlockInfoContiguous b = addBlock(fsn.dir, src, inodesInPath, newBlock,
- targets);
+ BlockManager bm = fsn.getBlockManager();
+ BlockInfoContiguous b = addBlock(bm, src, inode, newBlock, targets);
NameNode.stateChangeLog.info("BLOCK* allocate " + b + " for " + src);
DatanodeStorageInfo.incrementBlocksScheduled(targets);
}
@@ -842,14 +849,12 @@ class FSDirWriteFileOp {
}
private static class FileState {
- final INodeFile inode;
+ final FlatINode inode;
final String path;
- final INodesInPath iip;
- FileState(INodeFile inode, String fullPath, INodesInPath iip) {
+ FileState(FlatINode inode, String fullPath) {
this.inode = inode;
this.path = fullPath;
- this.iip = iip;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a36f5bc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
index 370050d..739b2d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -772,12 +772,11 @@ public class FSEditLog implements LogsPurgeable {
logEdit(op);
}
- public void logAddBlock(String path, INodeFile file) {
- Preconditions.checkArgument(file.isUnderConstruction());
- BlockInfoContiguous[] blocks = file.getBlocks();
- Preconditions.checkState(blocks != null && blocks.length > 0);
- BlockInfoContiguous pBlock = blocks.length > 1 ? blocks[blocks.length - 2] : null;
- BlockInfoContiguous lastBlock = blocks[blocks.length - 1];
+ public void logAddBlock(String path, FlatINodeFileFeature file) {
+ Preconditions.checkArgument(file.inConstruction());
+ Preconditions.checkState(file.numBlocks() > 0);
+ Block pBlock = file.penultimateBlock();
+ Block lastBlock = file.lastBlock();
AddBlockOp op = AddBlockOp.getInstance(cache.get()).setPath(path)
.setPenultimateBlock(pBlock).setLastBlock(lastBlock);
logEdit(op);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a36f5bc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index ed28547..5c12b50 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -2761,6 +2761,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
readLock();
try {
checkOperation(OperationCategory.READ);
+ checkNameNodeSafeMode("Cannot add block to " + src);
r = FSDirWriteFileOp.validateAddBlock(this, pc, src, fileId, clientName,
previous, onRetryBlock);
} finally {
@@ -2781,6 +2782,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
LocatedBlock lb;
try {
checkOperation(OperationCategory.WRITE);
+ checkNameNodeSafeMode("Cannot add block to " + src);
lb = FSDirWriteFileOp.storeAllocatedBlock(
this, src, fileId, clientName, previous, targets);
} finally {
@@ -2919,7 +2921,35 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
return file;
}
-
+
+ void checkLease(String src, String holder, FlatINode inode)
+ throws LeaseExpiredException, FileNotFoundException {
+ assert hasReadLock();
+ final String ident = src;
+ if (inode == null) {
+ throw new FileNotFoundException(src);
+ } else if (!inode.isFile()) {
+ Lease lease = leaseManager.getLease(holder);
+ throw new LeaseExpiredException(
+ "No lease on " + ident + ": INode is not a regular file. "
+ + (lease != null ? lease.toString()
+ : "Holder " + holder + " does not have any open files."));
+ }
+ FlatINodeFileFeature f = inode.feature(FlatINodeFileFeature.class);
+ if (!f.inConstruction()) {
+ Lease lease = leaseManager.getLease(holder);
+ throw new LeaseExpiredException(
+ "No lease on " + ident + ": File is not open for writing. "
+ + (lease != null ? lease.toString()
+ : "Holder " + holder + " does not have any open files."));
+ }
+ String clientName = f.clientName();
+ if (holder != null && !clientName.equals(holder)) {
+ throw new LeaseExpiredException("Lease mismatch on " + ident +
+ " owned by " + clientName + " but is accessed by " + holder);
+ }
+ }
+
/**
* Complete in-progress write to the given file.
* @return true if successful, false if the client should continue to retry
@@ -2977,6 +3007,25 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
/**
+ * Check that the indicated file's blocks are present and
+ * replicated. If not, return false. If checkall is true, then check
+ * all blocks, otherwise check only penultimate block.
+ */
+ boolean checkFileProgress(String src, FlatINodeFileFeature v,
+ boolean checkall) {
+ assert hasReadLock();
+ if (checkall) {
+ return blockManager.checkBlocksProperlyReplicated(src, v.blocks());
+ } else {
+ // check the penultimate block of this file
+ Block b = v.penultimateBlock();
+ return b == null ||
+ blockManager.checkBlocksProperlyReplicated(src, Lists.newArrayList
+ (b));
+ }
+ }
+
+ /**
* Change the indicated filename.
* @deprecated Use {@link #renameTo(String, String, boolean,
* Options.Rename...)} instead.
@@ -3537,6 +3586,28 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
}
+ FlatINodeFileFeature.Builder commitOrCompleteLastBlock(
+ FlatINodeFileFeature f, Block commitBlock)
+ throws IOException {
+ assert hasWriteLock();
+ Preconditions.checkArgument(f.inConstruction());
+ if (commitBlock == null) {
+ return new FlatINodeFileFeature.Builder().mergeFrom(f);
+ }
+
+ Preconditions.checkState(f.numBlocks() > 0);
+ long newBlockLength =
+ blockManager.commitOrCompleteLastBlock(f, commitBlock);
+ FlatINodeFileFeature.Builder b = new FlatINodeFileFeature.Builder()
+ .mergeFrom(f);
+ Block newBlock = new Block(commitBlock);
+ newBlock.setNumBytes(newBlockLength);
+ b.block(f.numBlocks() - 1, newBlock);
+ return b;
+ // TODO: Update quota
+ // Adjust disk space consumption if required
+ }
+
void finalizeINodeFileUnderConstruction(
String src, INodeFile pendingFile, int latestSnapshot) throws IOException {
assert hasWriteLock();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a36f5bc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RWTransaction.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RWTransaction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RWTransaction.java
index b97b11f..4db8fad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RWTransaction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RWTransaction.java
@@ -159,4 +159,8 @@ class RWTransaction extends Transaction {
boolean overwrite, boolean logRetryCache) {
fsd.getEditLog().logOpenFile(ugid, src, inode, overwrite, logRetryCache);
}
+
+ public void logAddBlock(String src, FlatINodeFileFeature file) {
+ fsd.getEditLog().logAddBlock(src, file);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a36f5bc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Resolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Resolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Resolver.java
index 5420084..06f9e90 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Resolver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Resolver.java
@@ -127,6 +127,10 @@ class Resolver {
return resolve(tx, path);
}
+ public static Result resolveById(Transaction tx, long id) {
+ throw new IllegalArgumentException("Unimplemented");
+ }
+
// public static Result getInodeById(Transaction tx, long id)
// throws IOException {
// byte[] inodeKey = Encoding.inodeIdKey(id);