Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/05/21 01:48:34 UTC
[10/50] [abbrv] hadoop git commit: HDFS-8394. Move getAdditionalBlock() and related functionalities into a separate class. Contributed by Haohui Mai.
HDFS-8394. Move getAdditionalBlock() and related functionalities into a separate class. Contributed by Haohui Mai.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e5afac58
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e5afac58
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e5afac58
Branch: refs/heads/HDFS-7240
Commit: e5afac5896a1a88e152746598527d91f73cbb724
Parents: 8f37873
Author: Haohui Mai <wh...@apache.org>
Authored: Fri May 15 19:09:59 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Fri May 15 19:09:59 2015 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../hdfs/server/namenode/FSDirWriteFileOp.java | 563 +++++++++++++++++++
.../hdfs/server/namenode/FSDirectory.java | 78 +--
.../hdfs/server/namenode/FSEditLogLoader.java | 3 +-
.../hdfs/server/namenode/FSNamesystem.java | 497 ++--------------
.../hdfs/server/namenode/NameNodeRpcServer.java | 30 +-
.../hdfs/server/namenode/TestAddBlockRetry.java | 30 +-
.../TestCommitBlockSynchronization.java | 3 +
8 files changed, 648 insertions(+), 559 deletions(-)
----------------------------------------------------------------------
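Overview for reviewers: the patch splits getAdditionalBlock() into three
phases, all hosted in the new FSDirWriteFileOp class -- validation under the
read lock, target selection with no namesystem lock held, and block
allocation under the write lock. The following condensed sketch (not part of
the patch; checkOperation(), waitForLoadingFSImage(), debug logging and the
trailing logSync() are elided) shows the resulting control flow in
FSNamesystem.getAdditionalBlock() as introduced below:

  LocatedBlock getAdditionalBlock(String src, long fileId, String clientName,
      ExtendedBlock previous, DatanodeInfo[] excludedNodes,
      String[] favoredNodes) throws IOException {
    LocatedBlock[] onRetryBlock = new LocatedBlock[1];
    FSPermissionChecker pc = getPermissionChecker();
    FSDirWriteFileOp.ValidateAddBlockResult r;
    readLock();
    try {
      // Part I: validate under the read lock; returns null when the call
      // is a retry that can be answered with the previously allocated block.
      r = FSDirWriteFileOp.validateAddBlock(this, pc, src, fileId,
          clientName, previous, onRetryBlock);
    } finally {
      readUnlock();
    }
    if (r == null) {
      return onRetryBlock[0];  // retry: re-deliver the last block
    }
    // Choose targets with no namesystem lock held.
    DatanodeStorageInfo[] targets = FSDirWriteFileOp.chooseTargetForNewBlock(
        blockManager, src, excludedNodes, favoredNodes, r);
    writeLock();
    try {
      // Part II: repeat the analysis and allocate under the write lock.
      return FSDirWriteFileOp.storeAllocatedBlock(this, src, fileId,
          clientName, previous, targets);
    } finally {
      writeUnlock();
    }
  }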
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5afac58/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 35e81f9..4a33987 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -557,6 +557,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8397. Refactor the error handling code in DataStreamer.
(Tsz Wo Nicholas Sze via jing9)
+ HDFS-8394. Move getAdditionalBlock() and related functionalities into a
+ separate class. (wheat9)
+
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5afac58/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
new file mode 100644
index 0000000..1ff0899
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
@@ -0,0 +1,563 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+class FSDirWriteFileOp {
+ private FSDirWriteFileOp() {}
+ static boolean unprotectedRemoveBlock(
+ FSDirectory fsd, String path, INodesInPath iip, INodeFile fileNode,
+ Block block) throws IOException {
+ // modify the file -> block mapping and the blocksMap
+ // fileNode should be under construction
+ BlockInfoContiguousUnderConstruction uc = fileNode.removeLastBlock(block);
+ if (uc == null) {
+ return false;
+ }
+ fsd.getBlockManager().removeBlockFromMap(block);
+
+ if(NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("DIR* FSDirectory.removeBlock: "
+ +path+" with "+block
+ +" block is removed from the file system");
+ }
+
+ // update space consumed
+ fsd.updateCount(iip, 0, -fileNode.getPreferredBlockSize(),
+ fileNode.getPreferredBlockReplication(), true);
+ return true;
+ }
+
+ /**
+ * Persist the block list for the inode.
+ */
+ static void persistBlocks(
+ FSDirectory fsd, String path, INodeFile file, boolean logRetryCache) {
+ assert fsd.getFSNamesystem().hasWriteLock();
+ Preconditions.checkArgument(file.isUnderConstruction());
+ fsd.getEditLog().logUpdateBlocks(path, file, logRetryCache);
+ if(NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("persistBlocks: " + path
+ + " with " + file.getBlocks().length + " blocks is persisted to" +
+ " the file system");
+ }
+ }
+
+ static void abandonBlock(
+ FSDirectory fsd, FSPermissionChecker pc, ExtendedBlock b, long fileId,
+ String src, String holder) throws IOException {
+ byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+ src = fsd.resolvePath(pc, src, pathComponents);
+
+ final INode inode;
+ final INodesInPath iip;
+ if (fileId == HdfsConstants.GRANDFATHER_INODE_ID) {
+ // Older clients may not have given us an inode ID to work with.
+ // In this case, we have to try to resolve the path and hope it
+ // hasn't changed or been deleted since the file was opened for write.
+ iip = fsd.getINodesInPath(src, true);
+ inode = iip.getLastINode();
+ } else {
+ inode = fsd.getInode(fileId);
+ iip = INodesInPath.fromINode(inode);
+ if (inode != null) {
+ src = iip.getPath();
+ }
+ }
+ FSNamesystem fsn = fsd.getFSNamesystem();
+ final INodeFile file = fsn.checkLease(src, holder, inode, fileId);
+ Preconditions.checkState(file.isUnderConstruction());
+
+ Block localBlock = ExtendedBlock.getLocalBlock(b);
+ fsd.writeLock();
+ try {
+ // Remove the block from the pending creates list
+ if (!unprotectedRemoveBlock(fsd, src, iip, file, localBlock)) {
+ return;
+ }
+ } finally {
+ fsd.writeUnlock();
+ }
+ persistBlocks(fsd, src, file, false);
+ }
+
+ static void checkBlock(FSNamesystem fsn, ExtendedBlock block)
+ throws IOException {
+ String bpId = fsn.getBlockPoolId();
+ if (block != null && !bpId.equals(block.getBlockPoolId())) {
+ throw new IOException("Unexpected BlockPoolId " + block.getBlockPoolId()
+ + " - expected " + bpId);
+ }
+ }
+
+ /**
+ * Part I of getAdditionalBlock().
+ * Analyze the state of the file under the read lock to determine whether
+ * the client can add a new block, detect potential retries and lease
+ * mismatches, and verify minimal replication of the penultimate block.
+ *
+ * Generate target DataNode locations for the new block,
+ * but do not create the new block yet.
+ */
+ static ValidateAddBlockResult validateAddBlock(
+ FSNamesystem fsn, FSPermissionChecker pc,
+ String src, long fileId, String clientName,
+ ExtendedBlock previous, LocatedBlock[] onRetryBlock) throws IOException {
+ final long blockSize;
+ final int replication;
+ final byte storagePolicyID;
+ String clientMachine;
+
+ byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+ src = fsn.dir.resolvePath(pc, src, pathComponents);
+ FileState fileState = analyzeFileState(fsn, src, fileId, clientName,
+ previous, onRetryBlock);
+ final INodeFile pendingFile = fileState.inode;
+ // Check if the penultimate block is minimally replicated
+ if (!fsn.checkFileProgress(src, pendingFile, false)) {
+ throw new NotReplicatedYetException("Not replicated yet: " + src);
+ }
+
+ if (onRetryBlock[0] != null && onRetryBlock[0].getLocations().length > 0) {
+ // This is a retry. No need to generate new locations.
+ // Use the last block if it has locations.
+ return null;
+ }
+ if (pendingFile.getBlocks().length >= fsn.maxBlocksPerFile) {
+ throw new IOException("File has reached the limit on maximum number of"
+ + " blocks (" + DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY
+ + "): " + pendingFile.getBlocks().length + " >= "
+ + fsn.maxBlocksPerFile);
+ }
+ blockSize = pendingFile.getPreferredBlockSize();
+ clientMachine = pendingFile.getFileUnderConstructionFeature()
+ .getClientMachine();
+ replication = pendingFile.getFileReplication();
+ storagePolicyID = pendingFile.getStoragePolicyID();
+ return new ValidateAddBlockResult(blockSize, replication, storagePolicyID,
+ clientMachine);
+ }
+
+ static LocatedBlock makeLocatedBlock(FSNamesystem fsn, Block blk,
+ DatanodeStorageInfo[] locs, long offset) throws IOException {
+ LocatedBlock lBlk = BlockManager.newLocatedBlock(fsn.getExtendedBlock(blk),
+ locs, offset, false);
+ fsn.getBlockManager().setBlockToken(lBlk,
+ BlockTokenIdentifier.AccessMode.WRITE);
+ return lBlk;
+ }
+
+ /**
+ * Part II of getAdditionalBlock().
+ * Should repeat the same analysis of the file state as in Part I,
+ * but under the write lock.
+ * If the conditions still hold, then allocate a new block with
+ * the new targets, add it to the INode and to the BlocksMap.
+ */
+ static LocatedBlock storeAllocatedBlock(FSNamesystem fsn, String src,
+ long fileId, String clientName, ExtendedBlock previous,
+ DatanodeStorageInfo[] targets) throws IOException {
+ long offset;
+ // Run the full analysis again, since things could have changed
+ // while chooseTarget() was executing.
+ LocatedBlock[] onRetryBlock = new LocatedBlock[1];
+ FileState fileState = analyzeFileState(fsn, src, fileId, clientName,
+ previous, onRetryBlock);
+ final INodeFile pendingFile = fileState.inode;
+ src = fileState.path;
+
+ if (onRetryBlock[0] != null) {
+ if (onRetryBlock[0].getLocations().length > 0) {
+ // This is a retry. Just return the last block if it has locations.
+ return onRetryBlock[0];
+ } else {
+ // add new chosen targets to already allocated block and return
+ BlockInfoContiguous lastBlockInFile = pendingFile.getLastBlock();
+ ((BlockInfoContiguousUnderConstruction) lastBlockInFile)
+ .setExpectedLocations(targets);
+ offset = pendingFile.computeFileSize();
+ return makeLocatedBlock(fsn, lastBlockInFile, targets, offset);
+ }
+ }
+
+ // commit the last block and complete it if it has minimum replicas
+ fsn.commitOrCompleteLastBlock(pendingFile, fileState.iip,
+ ExtendedBlock.getLocalBlock(previous));
+
+ // allocate new block, record block locations in INode.
+ Block newBlock = fsn.createNewBlock();
+ INodesInPath inodesInPath = INodesInPath.fromINode(pendingFile);
+ saveAllocatedBlock(fsn, src, inodesInPath, newBlock, targets);
+
+ persistNewBlock(fsn, src, pendingFile);
+ offset = pendingFile.computeFileSize();
+
+ // Return located block
+ return makeLocatedBlock(fsn, newBlock, targets, offset);
+ }
+
+ static DatanodeStorageInfo[] chooseTargetForNewBlock(
+ BlockManager bm, String src, DatanodeInfo[] excludedNodes, String[]
+ favoredNodes, ValidateAddBlockResult r) throws IOException {
+ Node clientNode = bm.getDatanodeManager()
+ .getDatanodeByHost(r.clientMachine);
+ if (clientNode == null) {
+ clientNode = getClientNode(bm, r.clientMachine);
+ }
+
+ Set<Node> excludedNodesSet = null;
+ if (excludedNodes != null) {
+ excludedNodesSet = new HashSet<>(excludedNodes.length);
+ Collections.addAll(excludedNodesSet, excludedNodes);
+ }
+ List<String> favoredNodesList = (favoredNodes == null) ? null
+ : Arrays.asList(favoredNodes);
+
+ // choose targets for the new block to be allocated.
+ return bm.chooseTarget4NewBlock(src, r.replication, clientNode,
+ excludedNodesSet, r.blockSize,
+ favoredNodesList, r.storagePolicyID);
+ }
+
+ /**
+ * Resolve the clientMachine address to get a network location path.
+ */
+ static Node getClientNode(BlockManager bm, String clientMachine) {
+ List<String> hosts = new ArrayList<>(1);
+ hosts.add(clientMachine);
+ List<String> rName = bm.getDatanodeManager()
+ .resolveNetworkLocation(hosts);
+ Node clientNode = null;
+ if (rName != null) {
+ // Able to resolve clientMachine mapping.
+ // Create a temp node to find out the rack-local nodes
+ clientNode = new NodeBase(rName.get(0) + NodeBase.PATH_SEPARATOR_STR
+ + clientMachine);
+ }
+ return clientNode;
+ }
+
+ /**
+ * Add a block to the file. Returns a reference to the added block.
+ */
+ private static BlockInfoContiguous addBlock(
+ FSDirectory fsd, String path, INodesInPath inodesInPath, Block block,
+ DatanodeStorageInfo[] targets) throws IOException {
+ fsd.writeLock();
+ try {
+ final INodeFile fileINode = inodesInPath.getLastINode().asFile();
+ Preconditions.checkState(fileINode.isUnderConstruction());
+
+ // check quota limits and update space consumed
+ fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
+ fileINode.getPreferredBlockReplication(), true);
+
+ // associate new last block for the file
+ BlockInfoContiguousUnderConstruction blockInfo =
+ new BlockInfoContiguousUnderConstruction(
+ block,
+ fileINode.getFileReplication(),
+ HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION,
+ targets);
+ fsd.getBlockManager().addBlockCollection(blockInfo, fileINode);
+ fileINode.addBlock(blockInfo);
+
+ if(NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("DIR* FSDirectory.addBlock: "
+ + path + " with " + block
+ + " block is added to the in-memory "
+ + "file system");
+ }
+ return blockInfo;
+ } finally {
+ fsd.writeUnlock();
+ }
+ }
+
+ private static FileState analyzeFileState(
+ FSNamesystem fsn, String src, long fileId, String clientName,
+ ExtendedBlock previous, LocatedBlock[] onRetryBlock)
+ throws IOException {
+ assert fsn.hasReadLock();
+
+ checkBlock(fsn, previous);
+ onRetryBlock[0] = null;
+ fsn.checkNameNodeSafeMode("Cannot add block to " + src);
+
+ // have we exceeded the configured limit of fs objects.
+ fsn.checkFsObjectLimit();
+
+ Block previousBlock = ExtendedBlock.getLocalBlock(previous);
+ final INode inode;
+ final INodesInPath iip;
+ if (fileId == HdfsConstants.GRANDFATHER_INODE_ID) {
+ // Older clients may not have given us an inode ID to work with.
+ // In this case, we have to try to resolve the path and hope it
+ // hasn't changed or been deleted since the file was opened for write.
+ iip = fsn.dir.getINodesInPath4Write(src);
+ inode = iip.getLastINode();
+ } else {
+ // Newer clients pass the inode ID, so we can just get the inode
+ // directly.
+ inode = fsn.dir.getInode(fileId);
+ iip = INodesInPath.fromINode(inode);
+ if (inode != null) {
+ src = iip.getPath();
+ }
+ }
+ final INodeFile file = fsn.checkLease(src, clientName,
+ inode, fileId);
+ BlockInfoContiguous lastBlockInFile = file.getLastBlock();
+ if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
+ // The block that the client claims is the current last block
+ // doesn't match up with what we think is the last block. There are
+ // four possibilities:
+ // 1) This is the first block allocation of an append() pipeline
+ // which started appending exactly at or exceeding the block boundary.
+ // In this case, the client isn't passed the previous block,
+ // so it makes the allocateBlock() call with previous=null.
+ // We can distinguish this since the last block of the file
+ // will be exactly a full block.
+ // 2) This is a retry from a client that missed the response of a
+ // prior getAdditionalBlock() call, perhaps because of a network
+ // timeout, or because of an HA failover. In that case, we know
+ // by the fact that the client is re-issuing the RPC that it
+ // never began to write to the old block. Hence it is safe to
+ // return the existing block.
+ // 3) This is an entirely bogus request/bug -- we should error out
+ // rather than potentially appending a new block with an empty
+ // one in the middle, etc
+ // 4) This is a retry from a client that timed out while
+ // the prior getAdditionalBlock() is still being processed,
+ // currently working on chooseTarget().
+ // There are no means to distinguish between the first and
+ // the second attempts in Part I, because the first one hasn't
+ // changed the namesystem state yet.
+ // We run this analysis again in Part II where case 4 is impossible.
+
+ BlockInfoContiguous penultimateBlock = file.getPenultimateBlock();
+ if (previous == null &&
+ lastBlockInFile != null &&
+ lastBlockInFile.getNumBytes() >= file.getPreferredBlockSize() &&
+ lastBlockInFile.isComplete()) {
+ // Case 1
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug(
+ "BLOCK* NameSystem.allocateBlock: handling block allocation" +
+ " writing to a file with a complete previous block: src=" +
+ src + " lastBlock=" + lastBlockInFile);
+ }
+ } else if (Block.matchingIdAndGenStamp(penultimateBlock, previousBlock)) {
+ if (lastBlockInFile.getNumBytes() != 0) {
+ throw new IOException(
+ "Request looked like a retry to allocate block " +
+ lastBlockInFile + " but it already contains " +
+ lastBlockInFile.getNumBytes() + " bytes");
+ }
+
+ // Case 2
+ // Return the last block.
+ NameNode.stateChangeLog.info("BLOCK* allocateBlock: caught retry for " +
+ "allocation of a new block in " + src + ". Returning previously" +
+ " allocated block " + lastBlockInFile);
+ long offset = file.computeFileSize();
+ BlockInfoContiguousUnderConstruction lastBlockUC =
+ (BlockInfoContiguousUnderConstruction) lastBlockInFile;
+ onRetryBlock[0] = makeLocatedBlock(fsn, lastBlockInFile,
+ lastBlockUC.getExpectedStorageLocations(), offset);
+ return new FileState(file, src, iip);
+ } else {
+ // Case 3
+ throw new IOException("Cannot allocate block in " + src + ": " +
+ "passed 'previous' block " + previous + " does not match actual " +
+ "last block in file " + lastBlockInFile);
+ }
+ }
+ return new FileState(file, src, iip);
+ }
+
+ static boolean completeFile(FSNamesystem fsn, FSPermissionChecker pc,
+ final String srcArg, String holder, ExtendedBlock last, long fileId)
+ throws IOException {
+ String src = srcArg;
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " +
+ src + " for " + holder);
+ }
+ checkBlock(fsn, last);
+ byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+ src = fsn.dir.resolvePath(pc, src, pathComponents);
+ boolean success = completeFileInternal(fsn, src, holder,
+ ExtendedBlock.getLocalBlock(last),
+ fileId);
+ if (success) {
+ NameNode.stateChangeLog.info("DIR* completeFile: " + srcArg
+ + " is closed by " + holder);
+ }
+ return success;
+ }
+
+ private static boolean completeFileInternal(
+ FSNamesystem fsn, String src, String holder, Block last, long fileId)
+ throws IOException {
+ assert fsn.hasWriteLock();
+ final INodeFile pendingFile;
+ final INodesInPath iip;
+ INode inode = null;
+ try {
+ if (fileId == HdfsConstants.GRANDFATHER_INODE_ID) {
+ // Older clients may not have given us an inode ID to work with.
+ // In this case, we have to try to resolve the path and hope it
+ // hasn't changed or been deleted since the file was opened for write.
+ iip = fsn.dir.getINodesInPath(src, true);
+ inode = iip.getLastINode();
+ } else {
+ inode = fsn.dir.getInode(fileId);
+ iip = INodesInPath.fromINode(inode);
+ if (inode != null) {
+ src = iip.getPath();
+ }
+ }
+ pendingFile = fsn.checkLease(src, holder, inode, fileId);
+ } catch (LeaseExpiredException lee) {
+ if (inode != null && inode.isFile() &&
+ !inode.asFile().isUnderConstruction()) {
+ // This could be a retry RPC - i.e. the client tried to close
+ // the file, but missed the RPC response. Thus, it is trying
+ // again to close the file. If the file still exists and
+ // the client's view of the last block matches the actual
+ // last block, then we'll treat it as a successful close.
+ // See HDFS-3031.
+ final Block realLastBlock = inode.asFile().getLastBlock();
+ if (Block.matchingIdAndGenStamp(last, realLastBlock)) {
+ NameNode.stateChangeLog.info("DIR* completeFile: " +
+ "request from " + holder + " to complete inode " + fileId +
+ "(" + src + ") which is already closed. But, it appears to be " +
+ "an RPC retry. Returning success");
+ return true;
+ }
+ }
+ throw lee;
+ }
+ // Check the state of the penultimate block. It should be completed
+ // before attempting to complete the last one.
+ if (!fsn.checkFileProgress(src, pendingFile, false)) {
+ return false;
+ }
+
+ // commit the last block and complete it if it has minimum replicas
+ fsn.commitOrCompleteLastBlock(pendingFile, iip, last);
+
+ if (!fsn.checkFileProgress(src, pendingFile, true)) {
+ return false;
+ }
+
+ fsn.finalizeINodeFileUnderConstruction(src, pendingFile,
+ Snapshot.CURRENT_STATE_ID);
+ return true;
+ }
+
+ /**
+ * Persist the new block (the last block of the given file).
+ */
+ private static void persistNewBlock(
+ FSNamesystem fsn, String path, INodeFile file) {
+ Preconditions.checkArgument(file.isUnderConstruction());
+ fsn.getEditLog().logAddBlock(path, file);
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("persistNewBlock: "
+ + path + " with new block " + file.getLastBlock().toString()
+ + ", current total block count is " + file.getBlocks().length);
+ }
+ }
+
+ /**
+ * Save allocated block at the given pending filename
+ *
+ * @param fsn FSNamesystem
+ * @param src path to the file
+ * @param inodesInPath representing each of the components of src.
+ * The last INode is the INode for {@code src} file.
+ * @param newBlock newly allocated block to be saved
+ * @param targets target datanodes where replicas of the new block are placed
+ * @throws QuotaExceededException If addition of block exceeds space quota
+ */
+ private static void saveAllocatedBlock(
+ FSNamesystem fsn, String src, INodesInPath inodesInPath, Block newBlock,
+ DatanodeStorageInfo[] targets)
+ throws IOException {
+ assert fsn.hasWriteLock();
+ BlockInfoContiguous b = addBlock(fsn.dir, src, inodesInPath, newBlock,
+ targets);
+ NameNode.stateChangeLog.info("BLOCK* allocate " + b + " for " + src);
+ DatanodeStorageInfo.incrementBlocksScheduled(targets);
+ }
+
+ private static class FileState {
+ final INodeFile inode;
+ final String path;
+ final INodesInPath iip;
+
+ FileState(INodeFile inode, String fullPath, INodesInPath iip) {
+ this.inode = inode;
+ this.path = fullPath;
+ this.iip = iip;
+ }
+ }
+
+ static class ValidateAddBlockResult {
+ final long blockSize;
+ final int replication;
+ final byte storagePolicyID;
+ final String clientMachine;
+
+ ValidateAddBlockResult(
+ long blockSize, int replication, byte storagePolicyID,
+ String clientMachine) {
+ this.blockSize = blockSize;
+ this.replication = replication;
+ this.storagePolicyID = storagePolicyID;
+ this.clientMachine = clientMachine;
+ }
+ }
+}
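The four-way case analysis in analyzeFileState() above is easiest to follow
as a decision tree. A minimal restatement (illustrative only; the
authoritative logic and comments are in the method above):

  // Inside analyzeFileState(), after resolving file and previousBlock.
  if (Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
    // Client and NameNode agree on the last block: fall through and allocate.
  } else if (previous == null
      && lastBlockInFile != null
      && lastBlockInFile.getNumBytes() >= file.getPreferredBlockSize()
      && lastBlockInFile.isComplete()) {
    // Case 1: append() starting exactly at a block boundary, so the
    // client legitimately passed previous=null.
  } else if (Block.matchingIdAndGenStamp(penultimateBlock, previousBlock)) {
    // Case 2: retry of a completed allocation; re-deliver the (still
    // empty) last block through onRetryBlock[0].
  } else {
    // Case 3: bogus request; throw IOException.
  }
  // Case 4 (a retry racing a chooseTarget() still in flight) cannot be
  // distinguished from Case 2 in Part I; Part II repeats this analysis
  // under the write lock, where Case 4 is impossible.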
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5afac58/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 1583815..c2ed956 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -55,12 +55,9 @@ import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
import org.apache.hadoop.hdfs.util.ByteArray;
import org.apache.hadoop.hdfs.util.EnumCounters;
@@ -308,7 +305,7 @@ public class FSDirectory implements Closeable {
return namesystem;
}
- private BlockManager getBlockManager() {
+ BlockManager getBlockManager() {
return getFSNamesystem().getBlockManager();
}
@@ -479,79 +476,6 @@ public class FSDirectory implements Closeable {
}
/**
- * Add a block to the file. Returns a reference to the added block.
- */
- BlockInfoContiguous addBlock(String path, INodesInPath inodesInPath,
- Block block, DatanodeStorageInfo[] targets) throws IOException {
- writeLock();
- try {
- final INodeFile fileINode = inodesInPath.getLastINode().asFile();
- Preconditions.checkState(fileINode.isUnderConstruction());
-
- // check quota limits and updated space consumed
- updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
- fileINode.getPreferredBlockReplication(), true);
-
- // associate new last block for the file
- BlockInfoContiguousUnderConstruction blockInfo =
- new BlockInfoContiguousUnderConstruction(
- block,
- fileINode.getFileReplication(),
- BlockUCState.UNDER_CONSTRUCTION,
- targets);
- getBlockManager().addBlockCollection(blockInfo, fileINode);
- fileINode.addBlock(blockInfo);
-
- if(NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("DIR* FSDirectory.addBlock: "
- + path + " with " + block
- + " block is added to the in-memory "
- + "file system");
- }
- return blockInfo;
- } finally {
- writeUnlock();
- }
- }
-
- /**
- * Remove a block from the file.
- * @return Whether the block exists in the corresponding file
- */
- boolean removeBlock(String path, INodesInPath iip, INodeFile fileNode,
- Block block) throws IOException {
- Preconditions.checkArgument(fileNode.isUnderConstruction());
- writeLock();
- try {
- return unprotectedRemoveBlock(path, iip, fileNode, block);
- } finally {
- writeUnlock();
- }
- }
-
- boolean unprotectedRemoveBlock(String path, INodesInPath iip,
- INodeFile fileNode, Block block) throws IOException {
- // modify file-> block and blocksMap
- // fileNode should be under construction
- BlockInfoContiguousUnderConstruction uc = fileNode.removeLastBlock(block);
- if (uc == null) {
- return false;
- }
- getBlockManager().removeBlockFromMap(block);
-
- if(NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("DIR* FSDirectory.removeBlock: "
- +path+" with "+block
- +" block is removed from the file system");
- }
-
- // update space consumed
- updateCount(iip, 0, -fileNode.getPreferredBlockSize(),
- fileNode.getPreferredBlockReplication(), true);
- return true;
- }
-
- /**
* This is a wrapper for resolvePath(). If the path passed
* is prefixed with /.reserved/raw, then it checks to ensure that the caller
* has super user privileges.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5afac58/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index f75c117..dec1298 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -1037,7 +1037,8 @@ public class FSEditLogLoader {
+ path);
}
Block oldBlock = oldBlocks[oldBlocks.length - 1];
- boolean removed = fsDir.unprotectedRemoveBlock(path, iip, file, oldBlock);
+ boolean removed = FSDirWriteFileOp.unprotectedRemoveBlock(
+ fsDir, path, iip, file, oldBlock);
if (!removed && !(op instanceof UpdateBlocksOp)) {
throw new IOException("Trying to delete non-existant block " + oldBlock);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5afac58/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 4d82fab..0fec5ee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -268,7 +268,6 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
-import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
@@ -484,7 +483,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
private final long maxFsObjects; // maximum number of fs objects
private final long minBlockSize; // minimum block size
- private final long maxBlocksPerFile; // maximum # of blocks per file
+ final long maxBlocksPerFile; // maximum # of blocks per file
// precision of access times.
private final long accessTimePrecision;
@@ -614,7 +613,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
boolean isHaEnabled() {
return haEnabled;
}
-
+
/**
* Check the supplied configuration for correctness.
* @param conf Supplies the configuration to validate.
@@ -1863,8 +1862,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
: dir.getFileEncryptionInfo(inode, iip.getPathSnapshotId(), iip);
final LocatedBlocks blocks = blockManager.createLocatedBlocks(
- inode.getBlocks(iip.getPathSnapshotId()), fileSize,
- isUc, offset, length, needBlockToken, iip.isSnapshot(), feInfo);
+ inode.getBlocks(iip.getPathSnapshotId()), fileSize, isUc, offset,
+ length, needBlockToken, iip.isSnapshot(), feInfo);
// Set caching information for the located blocks.
for (LocatedBlock lb : blocks.getLocatedBlocks()) {
@@ -2232,8 +2231,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot set storage policy for " + src);
- auditStat = FSDirAttrOp.setStoragePolicy(
- dir, blockManager, src, policyName);
+ auditStat = FSDirAttrOp.setStoragePolicy(dir, blockManager, src,
+ policyName);
} catch (AccessControlException e) {
logAuditEvent(false, "setStoragePolicy", src);
throw e;
@@ -2621,7 +2620,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return toRemoveBlocks;
} catch (IOException ie) {
NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: " + src + " " +
- ie.getMessage());
+ ie.getMessage());
throw ie;
}
}
@@ -2703,8 +2702,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
"Cannot append to lazy persist file " + src);
}
// Opening an existing file for append - may need to recover lease.
- recoverLeaseInternal(RecoverLeaseOp.APPEND_FILE,
- iip, src, holder, clientMachine, false);
+ recoverLeaseInternal(RecoverLeaseOp.APPEND_FILE, iip, src, holder,
+ clientMachine, false);
final BlockInfoContiguous lastBlock = myFile.getLastBlock();
// Check that the block has at least minimum replication.
@@ -3042,290 +3041,49 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* are replicated. Will return an empty 2-elt array if we want the
* client to "try again later".
*/
- LocatedBlock getAdditionalBlock(String src, long fileId, String clientName,
- ExtendedBlock previous, Set<Node> excludedNodes,
- List<String> favoredNodes) throws IOException {
- LocatedBlock[] onRetryBlock = new LocatedBlock[1];
- DatanodeStorageInfo targets[] = getNewBlockTargets(src, fileId,
- clientName, previous, excludedNodes, favoredNodes, onRetryBlock);
- if (targets == null) {
- assert onRetryBlock[0] != null : "Retry block is null";
- // This is a retry. Just return the last block.
- return onRetryBlock[0];
- }
- LocatedBlock newBlock = storeAllocatedBlock(
- src, fileId, clientName, previous, targets);
- return newBlock;
- }
-
- /**
- * Part I of getAdditionalBlock().
- * Analyze the state of the file under read lock to determine if the client
- * can add a new block, detect potential retries, lease mismatches,
- * and minimal replication of the penultimate block.
- *
- * Generate target DataNode locations for the new block,
- * but do not create the new block yet.
- */
- DatanodeStorageInfo[] getNewBlockTargets(String src, long fileId,
- String clientName, ExtendedBlock previous, Set<Node> excludedNodes,
- List<String> favoredNodes, LocatedBlock[] onRetryBlock) throws IOException {
- final long blockSize;
- final int replication;
- final byte storagePolicyID;
- Node clientNode = null;
- String clientMachine = null;
-
+ LocatedBlock getAdditionalBlock(
+ String src, long fileId, String clientName, ExtendedBlock previous,
+ DatanodeInfo[] excludedNodes, String[] favoredNodes) throws IOException {
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("BLOCK* getAdditionalBlock: "
+ src + " inodeId " + fileId + " for " + clientName);
}
- checkOperation(OperationCategory.READ);
- byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+ waitForLoadingFSImage();
+ LocatedBlock[] onRetryBlock = new LocatedBlock[1];
+ FSDirWriteFileOp.ValidateAddBlockResult r;
FSPermissionChecker pc = getPermissionChecker();
+ checkOperation(OperationCategory.READ);
readLock();
try {
checkOperation(OperationCategory.READ);
- src = dir.resolvePath(pc, src, pathComponents);
- FileState fileState = analyzeFileState(
- src, fileId, clientName, previous, onRetryBlock);
- final INodeFile pendingFile = fileState.inode;
- // Check if the penultimate block is minimally replicated
- if (!checkFileProgress(src, pendingFile, false)) {
- throw new NotReplicatedYetException("Not replicated yet: " + src);
- }
- src = fileState.path;
-
- if (onRetryBlock[0] != null && onRetryBlock[0].getLocations().length > 0) {
- // This is a retry. No need to generate new locations.
- // Use the last block if it has locations.
- return null;
- }
- if (pendingFile.getBlocks().length >= maxBlocksPerFile) {
- throw new IOException("File has reached the limit on maximum number of"
- + " blocks (" + DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY
- + "): " + pendingFile.getBlocks().length + " >= "
- + maxBlocksPerFile);
- }
- blockSize = pendingFile.getPreferredBlockSize();
- clientMachine = pendingFile.getFileUnderConstructionFeature()
- .getClientMachine();
- clientNode = blockManager.getDatanodeManager().getDatanodeByHost(
- clientMachine);
- replication = pendingFile.getFileReplication();
- storagePolicyID = pendingFile.getStoragePolicyID();
+ r = FSDirWriteFileOp.validateAddBlock(this, pc, src, fileId, clientName,
+ previous, onRetryBlock);
} finally {
readUnlock();
}
- if (clientNode == null) {
- clientNode = getClientNode(clientMachine);
+ if (r == null) {
+ assert onRetryBlock[0] != null : "Retry block is null";
+ // This is a retry. Just return the last block.
+ return onRetryBlock[0];
}
- // choose targets for the new block to be allocated.
- return getBlockManager().chooseTarget4NewBlock(
- src, replication, clientNode, excludedNodes, blockSize, favoredNodes,
- storagePolicyID);
- }
+ DatanodeStorageInfo[] targets = FSDirWriteFileOp.chooseTargetForNewBlock(
+ blockManager, src, excludedNodes, favoredNodes, r);
- /**
- * Part II of getAdditionalBlock().
- * Should repeat the same analysis of the file state as in Part 1,
- * but under the write lock.
- * If the conditions still hold, then allocate a new block with
- * the new targets, add it to the INode and to the BlocksMap.
- */
- LocatedBlock storeAllocatedBlock(String src, long fileId, String clientName,
- ExtendedBlock previous, DatanodeStorageInfo[] targets) throws IOException {
- Block newBlock = null;
- long offset;
checkOperation(OperationCategory.WRITE);
- waitForLoadingFSImage();
writeLock();
+ LocatedBlock lb;
try {
checkOperation(OperationCategory.WRITE);
- // Run the full analysis again, since things could have changed
- // while chooseTarget() was executing.
- LocatedBlock[] onRetryBlock = new LocatedBlock[1];
- FileState fileState =
- analyzeFileState(src, fileId, clientName, previous, onRetryBlock);
- final INodeFile pendingFile = fileState.inode;
- src = fileState.path;
-
- if (onRetryBlock[0] != null) {
- if (onRetryBlock[0].getLocations().length > 0) {
- // This is a retry. Just return the last block if having locations.
- return onRetryBlock[0];
- } else {
- // add new chosen targets to already allocated block and return
- BlockInfoContiguous lastBlockInFile = pendingFile.getLastBlock();
- ((BlockInfoContiguousUnderConstruction) lastBlockInFile)
- .setExpectedLocations(targets);
- offset = pendingFile.computeFileSize();
- return makeLocatedBlock(lastBlockInFile, targets, offset);
- }
- }
-
- // commit the last block and complete it if it has minimum replicas
- commitOrCompleteLastBlock(pendingFile, fileState.iip,
- ExtendedBlock.getLocalBlock(previous));
-
- // allocate new block, record block locations in INode.
- newBlock = createNewBlock();
- INodesInPath inodesInPath = INodesInPath.fromINode(pendingFile);
- saveAllocatedBlock(src, inodesInPath, newBlock, targets);
-
- persistNewBlock(src, pendingFile);
- offset = pendingFile.computeFileSize();
+ lb = FSDirWriteFileOp.storeAllocatedBlock(
+ this, src, fileId, clientName, previous, targets);
} finally {
writeUnlock();
}
getEditLog().logSync();
-
- // Return located block
- return makeLocatedBlock(newBlock, targets, offset);
- }
-
- /*
- * Resolve clientmachine address to get a network location path
- */
- private Node getClientNode(String clientMachine) {
- List<String> hosts = new ArrayList<String>(1);
- hosts.add(clientMachine);
- List<String> rName = getBlockManager().getDatanodeManager()
- .resolveNetworkLocation(hosts);
- Node clientNode = null;
- if (rName != null) {
- // Able to resolve clientMachine mapping.
- // Create a temp node to findout the rack local nodes
- clientNode = new NodeBase(rName.get(0) + NodeBase.PATH_SEPARATOR_STR
- + clientMachine);
- }
- return clientNode;
- }
-
- static class FileState {
- public final INodeFile inode;
- public final String path;
- public final INodesInPath iip;
-
- public FileState(INodeFile inode, String fullPath, INodesInPath iip) {
- this.inode = inode;
- this.path = fullPath;
- this.iip = iip;
- }
- }
-
- FileState analyzeFileState(String src,
- long fileId,
- String clientName,
- ExtendedBlock previous,
- LocatedBlock[] onRetryBlock)
- throws IOException {
- assert hasReadLock();
-
- checkBlock(previous);
- onRetryBlock[0] = null;
- checkNameNodeSafeMode("Cannot add block to " + src);
-
- // have we exceeded the configured limit of fs objects.
- checkFsObjectLimit();
-
- Block previousBlock = ExtendedBlock.getLocalBlock(previous);
- final INode inode;
- final INodesInPath iip;
- if (fileId == HdfsConstants.GRANDFATHER_INODE_ID) {
- // Older clients may not have given us an inode ID to work with.
- // In this case, we have to try to resolve the path and hope it
- // hasn't changed or been deleted since the file was opened for write.
- iip = dir.getINodesInPath4Write(src);
- inode = iip.getLastINode();
- } else {
- // Newer clients pass the inode ID, so we can just get the inode
- // directly.
- inode = dir.getInode(fileId);
- iip = INodesInPath.fromINode(inode);
- if (inode != null) {
- src = iip.getPath();
- }
- }
- final INodeFile pendingFile = checkLease(src, clientName, inode, fileId);
- BlockInfoContiguous lastBlockInFile = pendingFile.getLastBlock();
- if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
- // The block that the client claims is the current last block
- // doesn't match up with what we think is the last block. There are
- // four possibilities:
- // 1) This is the first block allocation of an append() pipeline
- // which started appending exactly at or exceeding the block boundary.
- // In this case, the client isn't passed the previous block,
- // so it makes the allocateBlock() call with previous=null.
- // We can distinguish this since the last block of the file
- // will be exactly a full block.
- // 2) This is a retry from a client that missed the response of a
- // prior getAdditionalBlock() call, perhaps because of a network
- // timeout, or because of an HA failover. In that case, we know
- // by the fact that the client is re-issuing the RPC that it
- // never began to write to the old block. Hence it is safe to
- // to return the existing block.
- // 3) This is an entirely bogus request/bug -- we should error out
- // rather than potentially appending a new block with an empty
- // one in the middle, etc
- // 4) This is a retry from a client that timed out while
- // the prior getAdditionalBlock() is still being processed,
- // currently working on chooseTarget().
- // There are no means to distinguish between the first and
- // the second attempts in Part I, because the first one hasn't
- // changed the namesystem state yet.
- // We run this analysis again in Part II where case 4 is impossible.
-
- BlockInfoContiguous penultimateBlock = pendingFile.getPenultimateBlock();
- if (previous == null &&
- lastBlockInFile != null &&
- lastBlockInFile.getNumBytes() >= pendingFile.getPreferredBlockSize() &&
- lastBlockInFile.isComplete()) {
- // Case 1
- if (NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug(
- "BLOCK* NameSystem.allocateBlock: handling block allocation" +
- " writing to a file with a complete previous block: src=" +
- src + " lastBlock=" + lastBlockInFile);
- }
- } else if (Block.matchingIdAndGenStamp(penultimateBlock, previousBlock)) {
- if (lastBlockInFile.getNumBytes() != 0) {
- throw new IOException(
- "Request looked like a retry to allocate block " +
- lastBlockInFile + " but it already contains " +
- lastBlockInFile.getNumBytes() + " bytes");
- }
-
- // Case 2
- // Return the last block.
- NameNode.stateChangeLog.info("BLOCK* allocateBlock: " +
- "caught retry for allocation of a new block in " +
- src + ". Returning previously allocated block " + lastBlockInFile);
- long offset = pendingFile.computeFileSize();
- onRetryBlock[0] = makeLocatedBlock(lastBlockInFile,
- ((BlockInfoContiguousUnderConstruction)lastBlockInFile).getExpectedStorageLocations(),
- offset);
- return new FileState(pendingFile, src, iip);
- } else {
- // Case 3
- throw new IOException("Cannot allocate block in " + src + ": " +
- "passed 'previous' block " + previous + " does not match actual " +
- "last block in file " + lastBlockInFile);
- }
- }
- return new FileState(pendingFile, src, iip);
- }
-
- LocatedBlock makeLocatedBlock(Block blk, DatanodeStorageInfo[] locs,
- long offset) throws IOException {
- LocatedBlock lBlk = BlockManager.newLocatedBlock(
- getExtendedBlock(blk), locs, offset, false);
- getBlockManager().setBlockToken(
- lBlk, BlockTokenIdentifier.AccessMode.WRITE);
- return lBlk;
+ return lb;
}
/** @see ClientProtocol#getAdditionalDatanode */
@@ -3378,7 +3136,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
if (clientnode == null) {
- clientnode = getClientNode(clientMachine);
+ clientnode = FSDirWriteFileOp.getClientNode(blockManager, clientMachine);
}
// choose new datanodes.
@@ -3394,60 +3152,32 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
/**
* The client would like to let go of the given block
*/
- boolean abandonBlock(ExtendedBlock b, long fileId, String src, String holder)
+ void abandonBlock(ExtendedBlock b, long fileId, String src, String holder)
throws IOException {
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: " + b
+ "of file " + src);
}
+ waitForLoadingFSImage();
checkOperation(OperationCategory.WRITE);
- byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
FSPermissionChecker pc = getPermissionChecker();
- waitForLoadingFSImage();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot abandon block " + b + " for file" + src);
- src = dir.resolvePath(pc, src, pathComponents);
-
- final INode inode;
- final INodesInPath iip;
- if (fileId == HdfsConstants.GRANDFATHER_INODE_ID) {
- // Older clients may not have given us an inode ID to work with.
- // In this case, we have to try to resolve the path and hope it
- // hasn't changed or been deleted since the file was opened for write.
- iip = dir.getINodesInPath(src, true);
- inode = iip.getLastINode();
- } else {
- inode = dir.getInode(fileId);
- iip = INodesInPath.fromINode(inode);
- if (inode != null) {
- src = iip.getPath();
- }
- }
- final INodeFile file = checkLease(src, holder, inode, fileId);
-
- // Remove the block from the pending creates list
- boolean removed = dir.removeBlock(src, iip, file,
- ExtendedBlock.getLocalBlock(b));
- if (!removed) {
- return true;
- }
+ FSDirWriteFileOp.abandonBlock(dir, pc, b, fileId, src, holder);
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
+ b + " is removed from pendingCreates");
}
- persistBlocks(src, file, false);
} finally {
writeUnlock();
}
getEditLog().logSync();
-
- return true;
}
- private INodeFile checkLease(String src, String holder, INode inode,
- long fileId) throws LeaseExpiredException, FileNotFoundException {
+ INodeFile checkLease(
+ String src, String holder, INode inode, long fileId)
+ throws LeaseExpiredException, FileNotFoundException {
assert hasReadLock();
final String ident = src + " (inode " + fileId + ")";
if (inode == null) {
@@ -3492,120 +3222,30 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* (e.g if not all blocks have reached minimum replication yet)
* @throws IOException on error (eg lease mismatch, file not open, file deleted)
*/
- boolean completeFile(final String srcArg, String holder,
+ boolean completeFile(final String src, String holder,
ExtendedBlock last, long fileId)
- throws SafeModeException, UnresolvedLinkException, IOException {
- String src = srcArg;
- if (NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " +
- src + " for " + holder);
- }
- checkBlock(last);
+ throws IOException {
boolean success = false;
checkOperation(OperationCategory.WRITE);
- byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
- FSPermissionChecker pc = getPermissionChecker();
waitForLoadingFSImage();
+ FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot complete file " + src);
- src = dir.resolvePath(pc, src, pathComponents);
- success = completeFileInternal(src, holder,
- ExtendedBlock.getLocalBlock(last), fileId);
+ success = FSDirWriteFileOp.completeFile(this, pc, src, holder, last,
+ fileId);
} finally {
writeUnlock();
}
getEditLog().logSync();
- if (success) {
- NameNode.stateChangeLog.info("DIR* completeFile: " + srcArg
- + " is closed by " + holder);
- }
return success;
}
- private boolean completeFileInternal(String src, String holder, Block last,
- long fileId) throws IOException {
- assert hasWriteLock();
- final INodeFile pendingFile;
- final INodesInPath iip;
- INode inode = null;
- try {
- if (fileId == HdfsConstants.GRANDFATHER_INODE_ID) {
- // Older clients may not have given us an inode ID to work with.
- // In this case, we have to try to resolve the path and hope it
- // hasn't changed or been deleted since the file was opened for write.
- iip = dir.getINodesInPath(src, true);
- inode = iip.getLastINode();
- } else {
- inode = dir.getInode(fileId);
- iip = INodesInPath.fromINode(inode);
- if (inode != null) {
- src = iip.getPath();
- }
- }
- pendingFile = checkLease(src, holder, inode, fileId);
- } catch (LeaseExpiredException lee) {
- if (inode != null && inode.isFile() &&
- !inode.asFile().isUnderConstruction()) {
- // This could be a retry RPC - i.e the client tried to close
- // the file, but missed the RPC response. Thus, it is trying
- // again to close the file. If the file still exists and
- // the client's view of the last block matches the actual
- // last block, then we'll treat it as a successful close.
- // See HDFS-3031.
- final Block realLastBlock = inode.asFile().getLastBlock();
- if (Block.matchingIdAndGenStamp(last, realLastBlock)) {
- NameNode.stateChangeLog.info("DIR* completeFile: " +
- "request from " + holder + " to complete inode " + fileId +
- "(" + src + ") which is already closed. But, it appears to be " +
- "an RPC retry. Returning success");
- return true;
- }
- }
- throw lee;
- }
- // Check the state of the penultimate block. It should be completed
- // before attempting to complete the last one.
- if (!checkFileProgress(src, pendingFile, false)) {
- return false;
- }
-
- // commit the last block and complete it if it has minimum replicas
- commitOrCompleteLastBlock(pendingFile, iip, last);
-
- if (!checkFileProgress(src, pendingFile, true)) {
- return false;
- }
-
- finalizeINodeFileUnderConstruction(src, pendingFile,
- Snapshot.CURRENT_STATE_ID);
- return true;
- }
-
- /**
- * Save allocated block at the given pending filename
- *
- * @param src path to the file
- * @param inodesInPath representing each of the components of src.
- * The last INode is the INode for {@code src} file.
- * @param newBlock newly allocated block to be save
- * @param targets target datanodes where replicas of the new block is placed
- * @throws QuotaExceededException If addition of block exceeds space quota
- */
- private void saveAllocatedBlock(String src, INodesInPath inodesInPath,
- Block newBlock, DatanodeStorageInfo[] targets)
- throws IOException {
- assert hasWriteLock();
- BlockInfoContiguous b = dir.addBlock(src, inodesInPath, newBlock, targets);
- NameNode.stateChangeLog.info("BLOCK* allocate " + b + " for " + src);
- DatanodeStorageInfo.incrementBlocksScheduled(targets);
- }
-
/**
* Create new block with a unique block id and a new generation stamp.
*/
- private Block createNewBlock() throws IOException {
+ Block createNewBlock() throws IOException {
assert hasWriteLock();
Block b = new Block(nextBlockId(), 0, 0);
// Increment the generation stamp for every new block.
@@ -3997,7 +3637,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
pendingFile.getFileUnderConstructionFeature().updateLengthOfLastBlock(
pendingFile, lastBlockLength);
}
- persistBlocks(src, pendingFile, false);
+ FSDirWriteFileOp.persistBlocks(dir, src, pendingFile, false);
} finally {
writeUnlock();
}
@@ -4167,8 +3807,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return leaseManager.reassignLease(lease, pendingFile, newHolder);
}
- private void commitOrCompleteLastBlock(final INodeFile fileINode,
- final INodesInPath iip, final Block commitBlock) throws IOException {
+ void commitOrCompleteLastBlock(
+ final INodeFile fileINode, final INodesInPath iip,
+ final Block commitBlock) throws IOException {
assert hasWriteLock();
Preconditions.checkArgument(fileINode.isUnderConstruction());
if (!blockManager.commitOrCompleteLastBlock(fileINode, commitBlock)) {
@@ -4186,14 +3827,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
}
- private void finalizeINodeFileUnderConstruction(String src,
- INodeFile pendingFile, int latestSnapshot) throws IOException {
+ void finalizeINodeFileUnderConstruction(
+ String src, INodeFile pendingFile, int latestSnapshot) throws IOException {
assert hasWriteLock();
FileUnderConstructionFeature uc = pendingFile.getFileUnderConstructionFeature();
Preconditions.checkArgument(uc != null);
leaseManager.removeLease(uc.getClientName(), pendingFile);
-
+
pendingFile.recordModification(latestSnapshot);
// The file is no longer pending.
@@ -4405,7 +4046,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
} else {
// If this commit does not want to close the file, persist blocks
src = iFile.getFullPathName();
- persistBlocks(src, iFile, false);
+ FSDirWriteFileOp.persistBlocks(dir, src, iFile, false);
}
} finally {
writeUnlock();
@@ -4596,24 +4237,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
/**
- * Persist the block list for the inode.
- * @param path
- * @param file
- * @param logRetryCache
- */
- private void persistBlocks(String path, INodeFile file,
- boolean logRetryCache) {
- assert hasWriteLock();
- Preconditions.checkArgument(file.isUnderConstruction());
- getEditLog().logUpdateBlocks(path, file, logRetryCache);
- if(NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("persistBlocks: " + path
- + " with " + file.getBlocks().length + " blocks is persisted to" +
- " the file system");
- }
- }
-
- /**
* Close file.
* @param path
* @param file
@@ -4800,13 +4423,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
public FSEditLog getEditLog() {
return getFSImage().getEditLog();
- }
-
- private void checkBlock(ExtendedBlock block) throws IOException {
- if (block != null && !this.blockPoolId.equals(block.getBlockPoolId())) {
- throw new IOException("Unexpected BlockPoolId " + block.getBlockPoolId()
- + " - expected " + blockPoolId);
- }
}
@Metric({"MissingBlocks", "Number of missing blocks"})
@@ -5080,21 +4696,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
/**
- * Persist the new block (the last block of the given file).
- * @param path
- * @param file
- */
- private void persistNewBlock(String path, INodeFile file) {
- Preconditions.checkArgument(file.isUnderConstruction());
- getEditLog().logAddBlock(path, file);
- if (NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("persistNewBlock: "
- + path + " with new block " + file.getLastBlock().toString()
- + ", current total block count is " + file.getBlocks().length);
- }
- }
-
- /**
* SafeModeInfo contains information related to the safe mode.
* <p>
* An instance of {@link SafeModeInfo} is created when the name node
@@ -6399,7 +6000,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
blockinfo.setExpectedLocations(storages);
String src = pendingFile.getFullPathName();
- persistBlocks(src, pendingFile, logRetryCache);
+ FSDirWriteFileOp.persistBlocks(dir, src, pendingFile, logRetryCache);
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5afac58/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 3311609..0d416a6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -713,23 +713,11 @@ class NameNodeRpcServer implements NamenodeProtocols {
String[] favoredNodes)
throws IOException {
checkNNStartup();
- if (stateChangeLog.isDebugEnabled()) {
- stateChangeLog.debug("*BLOCK* NameNode.addBlock: file " + src
- + " fileId=" + fileId + " for " + clientName);
- }
- Set<Node> excludedNodesSet = null;
- if (excludedNodes != null) {
- excludedNodesSet = new HashSet<Node>(excludedNodes.length);
- for (Node node : excludedNodes) {
- excludedNodesSet.add(node);
- }
- }
- List<String> favoredNodesList = (favoredNodes == null) ? null
- : Arrays.asList(favoredNodes);
LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId,
- clientName, previous, excludedNodesSet, favoredNodesList);
- if (locatedBlock != null)
+ clientName, previous, excludedNodes, favoredNodes);
+ if (locatedBlock != null) {
metrics.incrAddBlockOps();
+ }
return locatedBlock;
}
@@ -770,13 +758,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
public void abandonBlock(ExtendedBlock b, long fileId, String src,
String holder) throws IOException {
checkNNStartup();
- if(stateChangeLog.isDebugEnabled()) {
- stateChangeLog.debug("*BLOCK* NameNode.abandonBlock: "
- +b+" of file "+src);
- }
- if (!namesystem.abandonBlock(b, fileId, src, holder)) {
- throw new IOException("Cannot abandon block during write to " + src);
- }
+ namesystem.abandonBlock(b, fileId, src, holder);
}
@Override // ClientProtocol
@@ -784,10 +766,6 @@ class NameNodeRpcServer implements NamenodeProtocols {
ExtendedBlock last, long fileId)
throws IOException {
checkNNStartup();
- if(stateChangeLog.isDebugEnabled()) {
- stateChangeLog.debug("*DIR* NameNode.complete: "
- + src + " fileId=" + fileId +" for " + clientName);
- }
return namesystem.completeFile(src, clientName, last, fileId);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5afac58/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java
index 5a4134c..c92e79b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.io.EnumSetWritable;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
/**
* Race between two threads simultaneously calling
@@ -88,25 +89,40 @@ public class TestAddBlockRetry {
// start first addBlock()
LOG.info("Starting first addBlock for " + src);
LocatedBlock[] onRetryBlock = new LocatedBlock[1];
- DatanodeStorageInfo targets[] = ns.getNewBlockTargets(
- src, HdfsConstants.GRANDFATHER_INODE_ID, "clientName",
- null, null, null, onRetryBlock);
+ ns.readLock();
+ FSDirWriteFileOp.ValidateAddBlockResult r;
+ FSPermissionChecker pc = Mockito.mock(FSPermissionChecker.class);
+ try {
+ r = FSDirWriteFileOp.validateAddBlock(ns, pc, src,
+ HdfsConstants.GRANDFATHER_INODE_ID,
+ "clientName", null, onRetryBlock);
+ } finally {
+ ns.readUnlock();
+ }
+ DatanodeStorageInfo targets[] = FSDirWriteFileOp.chooseTargetForNewBlock(
+ ns.getBlockManager(), src, null, null, r);
assertNotNull("Targets must be generated", targets);
// run second addBlock()
LOG.info("Starting second addBlock for " + src);
nn.addBlock(src, "clientName", null, null,
- HdfsConstants.GRANDFATHER_INODE_ID, null);
+ HdfsConstants.GRANDFATHER_INODE_ID, null);
assertTrue("Penultimate block must be complete",
- checkFileProgress(src, false));
+ checkFileProgress(src, false));
LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE);
assertEquals("Must be one block", 1, lbs.getLocatedBlocks().size());
LocatedBlock lb2 = lbs.get(0);
assertEquals("Wrong replication", REPLICATION, lb2.getLocations().length);
// continue first addBlock()
- LocatedBlock newBlock = ns.storeAllocatedBlock(
- src, HdfsConstants.GRANDFATHER_INODE_ID, "clientName", null, targets);
+ ns.writeLock();
+ LocatedBlock newBlock;
+ try {
+ newBlock = FSDirWriteFileOp.storeAllocatedBlock(ns, src,
+ HdfsConstants.GRANDFATHER_INODE_ID, "clientName", null, targets);
+ } finally {
+ ns.writeUnlock();
+ }
assertEquals("Blocks are not equal", lb2.getBlock(), newBlock.getBlock());
// check locations
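Design note on the test change above: the relocated helpers assert the
caller's locking context rather than taking the locks themselves --
analyzeFileState() asserts fsn.hasReadLock() on the validate path, and
storeAllocatedBlock() reaches commitOrCompleteLastBlock(), which asserts
hasWriteLock(). The test therefore brackets each phase explicitly, a pattern
any direct caller of FSDirWriteFileOp has to follow (illustrative sketch):

  ns.readLock();
  try {
    // Part I: analyzeFileState() asserts the read lock is held.
    r = FSDirWriteFileOp.validateAddBlock(ns, pc, src, fileId,
        clientName, previous, onRetryBlock);
  } finally {
    ns.readUnlock();
  }
  // chooseTargetForNewBlock() runs with no namesystem lock held.
  ns.writeLock();
  try {
    // Part II: the write path asserts the write lock is held.
    lb = FSDirWriteFileOp.storeAllocatedBlock(ns, src, fileId,
        clientName, previous, targets);
  } finally {
    ns.writeUnlock();
  }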
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5afac58/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java
index 3049612..ea560fe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderCon
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
import java.io.IOException;
@@ -45,7 +46,9 @@ public class TestCommitBlockSynchronization {
private FSNamesystem makeNameSystemSpy(Block block, INodeFile file)
throws IOException {
Configuration conf = new Configuration();
+ FSEditLog editlog = mock(FSEditLog.class);
FSImage image = new FSImage(conf);
+ Whitebox.setInternalState(image, "editLog", editlog);
final DatanodeStorageInfo[] targets = {};
FSNamesystem namesystem = new FSNamesystem(conf, image);