You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by aw...@apache.org on 2015/07/23 17:20:16 UTC
[04/14] hadoop git commit: HDFS-8495. Consolidate append() related implementation into a single class. Contributed by Rakesh R.
HDFS-8495. Consolidate append() related implementation into a single class. Contributed by Rakesh R.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/31f11713
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/31f11713
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/31f11713
Branch: refs/heads/HADOOP-12111
Commit: 31f117138a00794de4951ee8433e304d72b04094
Parents: 393fe71
Author: Haohui Mai <wh...@apache.org>
Authored: Tue Jul 21 17:25:23 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Tue Jul 21 17:25:23 2015 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../hdfs/server/namenode/FSDirAppendOp.java | 261 +++++++++++++++++++
.../server/namenode/FSDirStatAndListingOp.java | 2 +-
.../hdfs/server/namenode/FSDirTruncateOp.java | 16 +-
.../hdfs/server/namenode/FSDirWriteFileOp.java | 6 +-
.../hdfs/server/namenode/FSEditLogLoader.java | 4 +-
.../hdfs/server/namenode/FSNamesystem.java | 241 ++---------------
7 files changed, 304 insertions(+), 229 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31f11713/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 8122045..50803de 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -737,6 +737,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8721. Add a metric for number of encryption zones.
(Rakesh R via cnauroth)
+ HDFS-8495. Consolidate append() related implementation into a single class.
+ (Rakesh R via wheat9)
+
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31f11713/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java
new file mode 100644
index 0000000..abb2dc8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.RecoverLeaseOp;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion.Feature;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Helper class to perform append operation.
+ */
+final class FSDirAppendOp {
+
+ /**
+ * Private constructor for preventing FSDirAppendOp object creation.
+ * Static-only class.
+ */
+ private FSDirAppendOp() {}
+
+ /**
+ * Append to an existing file.
+ * <p>
+ *
+ * The method returns the last block of the file if this is a partial block,
+ * which can still be used for writing more data. The client uses the
+ * returned block locations to form the data pipeline for this block.<br>
+ * The {@link LocatedBlock} will be null if the last block is full.
+ * The client then allocates a new block with the next call using
+ * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#addBlock}.
+ * <p>
+ *
+ * For description of parameters and exceptions thrown see
+ * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#append}
+ *
+ * @param fsn namespace
+ * @param srcArg path name
+ * @param pc permission checker to check fs permission
+ * @param holder client name
+ * @param clientMachine client machine info
+ * @param newBlock if the data is appended to a new block
+ * @param logRetryCache whether to record RPC ids in editlog for retry cache
+ * rebuilding
+ * @return the last block with status
+ */
+ static LastBlockWithStatus appendFile(final FSNamesystem fsn,
+ final String srcArg, final FSPermissionChecker pc, final String holder,
+ final String clientMachine, final boolean newBlock,
+ final boolean logRetryCache) throws IOException {
+ assert fsn.hasWriteLock();
+
+ final byte[][] pathComponents = FSDirectory
+ .getPathComponentsForReservedPath(srcArg);
+ final LocatedBlock lb;
+ final FSDirectory fsd = fsn.getFSDirectory();
+ final String src;
+ fsd.writeLock();
+ try {
+ src = fsd.resolvePath(pc, srcArg, pathComponents);
+ final INodesInPath iip = fsd.getINodesInPath4Write(src);
+ // Verify that the destination does not exist as a directory already
+ final INode inode = iip.getLastINode();
+ final String path = iip.getPath();
+ if (inode != null && inode.isDirectory()) {
+ throw new FileAlreadyExistsException("Cannot append to directory "
+ + path + "; already exists as a directory.");
+ }
+ if (fsd.isPermissionEnabled()) {
+ fsd.checkPathAccess(pc, iip, FsAction.WRITE);
+ }
+
+ if (inode == null) {
+ throw new FileNotFoundException(
+ "Failed to append to non-existent file " + path + " for client "
+ + clientMachine);
+ }
+ final INodeFile file = INodeFile.valueOf(inode, path, true);
+ BlockManager blockManager = fsd.getBlockManager();
+ final BlockStoragePolicy lpPolicy = blockManager
+ .getStoragePolicy("LAZY_PERSIST");
+ if (lpPolicy != null && lpPolicy.getId() == file.getStoragePolicyID()) {
+ throw new UnsupportedOperationException(
+ "Cannot append to lazy persist file " + path);
+ }
+ // Opening an existing file for append - may need to recover lease.
+ fsn.recoverLeaseInternal(RecoverLeaseOp.APPEND_FILE, iip, path, holder,
+ clientMachine, false);
+
+ final BlockInfo lastBlock = file.getLastBlock();
+ // Check that the block has at least minimum replication.
+ if (lastBlock != null && lastBlock.isComplete()
+ && !blockManager.isSufficientlyReplicated(lastBlock)) {
+ throw new IOException("append: lastBlock=" + lastBlock + " of src="
+ + path + " is not sufficiently replicated yet.");
+ }
+ lb = prepareFileForAppend(fsn, iip, holder, clientMachine, newBlock,
+ true, logRetryCache);
+ } catch (IOException ie) {
+ NameNode.stateChangeLog
+ .warn("DIR* NameSystem.append: " + ie.getMessage());
+ throw ie;
+ } finally {
+ fsd.writeUnlock();
+ }
+
+ HdfsFileStatus stat = FSDirStatAndListingOp.getFileInfo(fsd, src, false,
+ FSDirectory.isReservedRawName(srcArg), true);
+ if (lb != null) {
+ NameNode.stateChangeLog.debug(
+ "DIR* NameSystem.appendFile: file {} for {} at {} block {} block"
+ + " size {}", srcArg, holder, clientMachine, lb.getBlock(), lb
+ .getBlock().getNumBytes());
+ }
+ return new LastBlockWithStatus(lb, stat);
+ }
+
+ /**
+ * Convert current node to under construction.
+ * Recreate in-memory lease record.
+ *
+ * @param fsn namespace
+ * @param iip inodes in the path containing the file
+ * @param leaseHolder identifier of the lease holder on this file
+ * @param clientMachine identifier of the client machine
+ * @param newBlock if the data is appended to a new block
+ * @param writeToEditLog whether to persist this change to the edit log
+ * @param logRetryCache whether to record RPC ids in editlog for retry cache
+ * rebuilding
+ * @return the last block locations if the block is partial or null otherwise
+ * @throws IOException
+ */
+ static LocatedBlock prepareFileForAppend(final FSNamesystem fsn,
+ final INodesInPath iip, final String leaseHolder,
+ final String clientMachine, final boolean newBlock,
+ final boolean writeToEditLog, final boolean logRetryCache)
+ throws IOException {
+ assert fsn.hasWriteLock();
+
+ final INodeFile file = iip.getLastINode().asFile();
+ final QuotaCounts delta = verifyQuotaForUCBlock(fsn, file, iip);
+
+ file.recordModification(iip.getLatestSnapshotId());
+ file.toUnderConstruction(leaseHolder, clientMachine);
+
+ fsn.getLeaseManager().addLease(
+ file.getFileUnderConstructionFeature().getClientName(), file.getId());
+
+ LocatedBlock ret = null;
+ if (!newBlock) {
+ FSDirectory fsd = fsn.getFSDirectory();
+ ret = fsd.getBlockManager().convertLastBlockToUnderConstruction(file, 0);
+ if (ret != null && delta != null) {
+ Preconditions.checkState(delta.getStorageSpace() >= 0, "appending to"
+ + " a block with size larger than the preferred block size");
+ fsd.writeLock();
+ try {
+ fsd.updateCountNoQuotaCheck(iip, iip.length() - 1, delta);
+ } finally {
+ fsd.writeUnlock();
+ }
+ }
+ } else {
+ BlockInfo lastBlock = file.getLastBlock();
+ if (lastBlock != null) {
+ ExtendedBlock blk = new ExtendedBlock(fsn.getBlockPoolId(), lastBlock);
+ ret = new LocatedBlock(blk, new DatanodeInfo[0]);
+ }
+ }
+
+ if (writeToEditLog) {
+ final String path = iip.getPath();
+ if (NameNodeLayoutVersion.supports(Feature.APPEND_NEW_BLOCK,
+ fsn.getEffectiveLayoutVersion())) {
+ fsn.getEditLog().logAppendFile(path, file, newBlock, logRetryCache);
+ } else {
+ fsn.getEditLog().logOpenFile(path, file, false, logRetryCache);
+ }
+ }
+ return ret;
+ }
+
+ /**
+ * Verify quota when using the preferred block size for UC block. This is
+ * usually used by append and truncate.
+ *
+ * @throws QuotaExceededException when violating the storage quota
+ * @return expected quota usage update. null means no change or no need to
+ * update quota usage later
+ */
+ private static QuotaCounts verifyQuotaForUCBlock(FSNamesystem fsn,
+ INodeFile file, INodesInPath iip) throws QuotaExceededException {
+ FSDirectory fsd = fsn.getFSDirectory();
+ if (!fsn.isImageLoaded() || fsd.shouldSkipQuotaChecks()) {
+ // Do not check quota if editlog is still being processed
+ return null;
+ }
+ if (file.getLastBlock() != null) {
+ final QuotaCounts delta = computeQuotaDeltaForUCBlock(fsn, file);
+ fsd.readLock();
+ try {
+ FSDirectory.verifyQuota(iip, iip.length() - 1, delta, null);
+ return delta;
+ } finally {
+ fsd.readUnlock();
+ }
+ }
+ return null;
+ }
+
+ /** Compute quota change for converting a complete block to a UC block. */
+ private static QuotaCounts computeQuotaDeltaForUCBlock(FSNamesystem fsn,
+ INodeFile file) {
+ final QuotaCounts delta = new QuotaCounts.Builder().build();
+ final BlockInfo lastBlock = file.getLastBlock();
+ if (lastBlock != null) {
+ final long diff = file.getPreferredBlockSize() - lastBlock.getNumBytes();
+ final short repl = file.getPreferredBlockReplication();
+ delta.addStorageSpace(diff * repl);
+ final BlockStoragePolicy policy = fsn.getFSDirectory()
+ .getBlockStoragePolicySuite().getPolicy(file.getStoragePolicyID());
+ List<StorageType> types = policy.chooseStorageTypes(repl);
+ for (StorageType t : types) {
+ if (t.supportTypeQuota()) {
+ delta.addTypeSpace(t, diff);
+ }
+ }
+ }
+ return delta;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31f11713/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
index 201dabc..14f4d66 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
@@ -508,7 +508,7 @@ class FSDirStatAndListingOp {
final long fileSize = !inSnapshot && isUc ?
fileNode.computeFileSizeNotIncludingLastUcBlock() : size;
- loc = fsd.getFSNamesystem().getBlockManager().createLocatedBlocks(
+ loc = fsd.getBlockManager().createLocatedBlocks(
fileNode.getBlocks(snapshot), fileSize, isUc, 0L, size, false,
inSnapshot, feInfo);
if (loc == null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31f11713/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java
index 9fc9def..e24bb2f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstructionContiguous;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.RecoverLeaseOp;
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
@@ -79,11 +80,11 @@ final class FSDirTruncateOp {
try {
src = fsd.resolvePath(pc, srcArg, pathComponents);
iip = fsd.getINodesInPath4Write(src, true);
- if (fsn.isPermissionEnabled()) {
+ if (fsd.isPermissionEnabled()) {
fsd.checkPathAccess(pc, iip, FsAction.WRITE);
}
INodeFile file = INodeFile.valueOf(iip.getLastINode(), src);
- final BlockStoragePolicy lpPolicy = fsn.getBlockManager()
+ final BlockStoragePolicy lpPolicy = fsd.getBlockManager()
.getStoragePolicy("LAZY_PERSIST");
if (lpPolicy != null && lpPolicy.getId() == file.getStoragePolicyID()) {
@@ -178,7 +179,7 @@ final class FSDirTruncateOp {
"Should be the same block.";
if (oldBlock.getBlockId() != tBlk.getBlockId()
&& !file.isBlockInLatestSnapshot(oldBlock)) {
- fsn.getBlockManager().removeBlockFromMap(oldBlock);
+ fsd.getBlockManager().removeBlockFromMap(oldBlock);
}
}
assert onBlockBoundary == (truncateBlock == null) :
@@ -223,6 +224,7 @@ final class FSDirTruncateOp {
}
BlockInfoUnderConstruction truncatedBlockUC;
+ BlockManager blockManager = fsn.getFSDirectory().getBlockManager();
if (shouldCopyOnTruncate) {
// Add new truncateBlock into blocksMap and
// use oldBlock as a source for copy-on-truncate recovery
@@ -230,9 +232,8 @@ final class FSDirTruncateOp {
file.getPreferredBlockReplication());
truncatedBlockUC.setNumBytes(oldBlock.getNumBytes() - lastBlockDelta);
truncatedBlockUC.setTruncateBlock(oldBlock);
- file.setLastBlock(truncatedBlockUC,
- fsn.getBlockManager().getStorages(oldBlock));
- fsn.getBlockManager().addBlockCollection(truncatedBlockUC, file);
+ file.setLastBlock(truncatedBlockUC, blockManager.getStorages(oldBlock));
+ blockManager.addBlockCollection(truncatedBlockUC, file);
NameNode.stateChangeLog.debug(
"BLOCK* prepareFileForTruncate: Scheduling copy-on-truncate to new"
@@ -241,8 +242,7 @@ final class FSDirTruncateOp {
truncatedBlockUC.getTruncateBlock());
} else {
// Use new generation stamp for in-place truncate recovery
- fsn.getBlockManager().convertLastBlockToUnderConstruction(file,
- lastBlockDelta);
+ blockManager.convertLastBlockToUnderConstruction(file, lastBlockDelta);
oldBlock = file.getLastBlock();
assert !oldBlock.isComplete() : "oldBlock should be under construction";
truncatedBlockUC = (BlockInfoUnderConstruction) oldBlock;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31f11713/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
index 4830d5d..008a945 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
@@ -206,8 +206,8 @@ class FSDirWriteFileOp {
DatanodeStorageInfo[] locs, long offset) throws IOException {
LocatedBlock lBlk = BlockManager.newLocatedBlock(fsn.getExtendedBlock(blk),
locs, offset, false);
- fsn.getBlockManager().setBlockToken(lBlk,
- BlockTokenIdentifier.AccessMode.WRITE);
+ fsn.getFSDirectory().getBlockManager()
+ .setBlockToken(lBlk, BlockTokenIdentifier.AccessMode.WRITE);
return lBlk;
}
@@ -426,7 +426,7 @@ class FSDirWriteFileOp {
fsd.setFileEncryptionInfo(src, feInfo);
newNode = fsd.getInode(newNode.getId()).asFile();
}
- setNewINodeStoragePolicy(fsn.getBlockManager(), newNode, iip,
+ setNewINodeStoragePolicy(fsd.getBlockManager(), newNode, iip,
isLazyPersist);
fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
if (NameNode.stateChangeLog.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31f11713/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index 63ef985..357684a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -392,7 +392,7 @@ public class FSEditLogLoader {
FSNamesystem.LOG.debug("Reopening an already-closed file " +
"for append");
}
- LocatedBlock lb = fsNamesys.prepareFileForAppend(path, iip,
+ LocatedBlock lb = FSDirAppendOp.prepareFileForAppend(fsNamesys, iip,
addCloseOp.clientName, addCloseOp.clientMachine, false, false,
false);
// add the op into retry cache if necessary
@@ -466,7 +466,7 @@ public class FSEditLogLoader {
INodesInPath iip = fsDir.getINodesInPath4Write(path);
INodeFile file = INodeFile.valueOf(iip.getLastINode(), path);
if (!file.isUnderConstruction()) {
- LocatedBlock lb = fsNamesys.prepareFileForAppend(path, iip,
+ LocatedBlock lb = FSDirAppendOp.prepareFileForAppend(fsNamesys, iip,
appendOp.clientName, appendOp.clientMachine, appendOp.newBlock,
false, false);
// add the op into retry cache if necessary
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31f11713/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index fd37fbe..0b44431 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -142,7 +142,6 @@ import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsServerDefaults;
@@ -185,7 +184,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeException;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
@@ -250,7 +248,6 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
-import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RetriableException;
@@ -2174,175 +2171,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
/**
- * Append to an existing file for append.
- * <p>
- *
- * The method returns the last block of the file if this is a partial block,
- * which can still be used for writing more data. The client uses the returned
- * block locations to form the data pipeline for this block.<br>
- * The method returns null if the last block is full. The client then
- * allocates a new block with the next call using
- * {@link ClientProtocol#addBlock}.
- * <p>
- *
- * For description of parameters and exceptions thrown see
- * {@link ClientProtocol#append(String, String, EnumSetWritable)}
- *
- * @return the last block locations if the block is partial or null otherwise
- */
- private LocatedBlock appendFileInternal(FSPermissionChecker pc,
- INodesInPath iip, String holder, String clientMachine, boolean newBlock,
- boolean logRetryCache) throws IOException {
- assert hasWriteLock();
- // Verify that the destination does not exist as a directory already.
- final INode inode = iip.getLastINode();
- final String src = iip.getPath();
- if (inode != null && inode.isDirectory()) {
- throw new FileAlreadyExistsException("Cannot append to directory " + src
- + "; already exists as a directory.");
- }
- if (isPermissionEnabled) {
- dir.checkPathAccess(pc, iip, FsAction.WRITE);
- }
-
- try {
- if (inode == null) {
- throw new FileNotFoundException("failed to append to non-existent file "
- + src + " for client " + clientMachine);
- }
- INodeFile myFile = INodeFile.valueOf(inode, src, true);
- final BlockStoragePolicy lpPolicy =
- blockManager.getStoragePolicy("LAZY_PERSIST");
- if (lpPolicy != null &&
- lpPolicy.getId() == myFile.getStoragePolicyID()) {
- throw new UnsupportedOperationException(
- "Cannot append to lazy persist file " + src);
- }
- // Opening an existing file for append - may need to recover lease.
- recoverLeaseInternal(RecoverLeaseOp.APPEND_FILE, iip, src, holder,
- clientMachine, false);
-
- final BlockInfo lastBlock = myFile.getLastBlock();
- // Check that the block has at least minimum replication.
- if(lastBlock != null && lastBlock.isComplete() &&
- !getBlockManager().isSufficientlyReplicated(lastBlock)) {
- throw new IOException("append: lastBlock=" + lastBlock +
- " of src=" + src + " is not sufficiently replicated yet.");
- }
- return prepareFileForAppend(src, iip, holder, clientMachine, newBlock,
- true, logRetryCache);
- } catch (IOException ie) {
- NameNode.stateChangeLog.warn("DIR* NameSystem.append: " +ie.getMessage());
- throw ie;
- }
- }
-
- /**
- * Convert current node to under construction.
- * Recreate in-memory lease record.
- *
- * @param src path to the file
- * @param leaseHolder identifier of the lease holder on this file
- * @param clientMachine identifier of the client machine
- * @param newBlock if the data is appended to a new block
- * @param writeToEditLog whether to persist this change to the edit log
- * @param logRetryCache whether to record RPC ids in editlog for retry cache
- * rebuilding
- * @return the last block locations if the block is partial or null otherwise
- * @throws UnresolvedLinkException
- * @throws IOException
- */
- LocatedBlock prepareFileForAppend(String src, INodesInPath iip,
- String leaseHolder, String clientMachine, boolean newBlock,
- boolean writeToEditLog, boolean logRetryCache) throws IOException {
- final INodeFile file = iip.getLastINode().asFile();
- final QuotaCounts delta = verifyQuotaForUCBlock(file, iip);
-
- file.recordModification(iip.getLatestSnapshotId());
- file.toUnderConstruction(leaseHolder, clientMachine);
-
- leaseManager.addLease(
- file.getFileUnderConstructionFeature().getClientName(), file.getId());
-
- LocatedBlock ret = null;
- if (!newBlock) {
- ret = blockManager.convertLastBlockToUnderConstruction(file, 0);
- if (ret != null && delta != null) {
- Preconditions.checkState(delta.getStorageSpace() >= 0,
- "appending to a block with size larger than the preferred block size");
- dir.writeLock();
- try {
- dir.updateCountNoQuotaCheck(iip, iip.length() - 1, delta);
- } finally {
- dir.writeUnlock();
- }
- }
- } else {
- BlockInfo lastBlock = file.getLastBlock();
- if (lastBlock != null) {
- ExtendedBlock blk = new ExtendedBlock(this.getBlockPoolId(), lastBlock);
- ret = new LocatedBlock(blk, new DatanodeInfo[0]);
- }
- }
-
- if (writeToEditLog) {
- if (NameNodeLayoutVersion.supports(Feature.APPEND_NEW_BLOCK,
- getEffectiveLayoutVersion())) {
- getEditLog().logAppendFile(src, file, newBlock, logRetryCache);
- } else {
- getEditLog().logOpenFile(src, file, false, logRetryCache);
- }
- }
- return ret;
- }
-
- /**
- * Verify quota when using the preferred block size for UC block. This is
- * usually used by append and truncate
- * @throws QuotaExceededException when violating the storage quota
- * @return expected quota usage update. null means no change or no need to
- * update quota usage later
- */
- private QuotaCounts verifyQuotaForUCBlock(INodeFile file, INodesInPath iip)
- throws QuotaExceededException {
- if (!isImageLoaded() || dir.shouldSkipQuotaChecks()) {
- // Do not check quota if editlog is still being processed
- return null;
- }
- if (file.getLastBlock() != null) {
- final QuotaCounts delta = computeQuotaDeltaForUCBlock(file);
- dir.readLock();
- try {
- FSDirectory.verifyQuota(iip, iip.length() - 1, delta, null);
- return delta;
- } finally {
- dir.readUnlock();
- }
- }
- return null;
- }
-
- /** Compute quota change for converting a complete block to a UC block */
- private QuotaCounts computeQuotaDeltaForUCBlock(INodeFile file) {
- final QuotaCounts delta = new QuotaCounts.Builder().build();
- final BlockInfo lastBlock = file.getLastBlock();
- if (lastBlock != null) {
- final long diff = file.getPreferredBlockSize() - lastBlock.getNumBytes();
- final short repl = file.getPreferredBlockReplication();
- delta.addStorageSpace(diff * repl);
- final BlockStoragePolicy policy = dir.getBlockStoragePolicySuite()
- .getPolicy(file.getStoragePolicyID());
- List<StorageType> types = policy.chooseStorageTypes(repl);
- for (StorageType t : types) {
- if (t.supportTypeQuota()) {
- delta.addTypeSpace(t, diff);
- }
- }
- }
- return delta;
- }
-
- /**
* Recover lease;
* Immediately revoke the lease of the current lease holder and start lease
* recovery so that the file can be forced to be closed.
@@ -2487,62 +2315,45 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
/**
* Append to an existing file in the namespace.
*/
- LastBlockWithStatus appendFile(String src, String holder,
+ LastBlockWithStatus appendFile(String srcArg, String holder,
String clientMachine, EnumSet<CreateFlag> flag, boolean logRetryCache)
throws IOException {
boolean newBlock = flag.contains(CreateFlag.NEW_BLOCK);
if (newBlock) {
requireEffectiveLayoutVersionForFeature(Feature.APPEND_NEW_BLOCK);
}
- try {
- return appendFileInt(src, holder, clientMachine, newBlock, logRetryCache);
- } catch (AccessControlException e) {
- logAuditEvent(false, "append", src);
- throw e;
- }
- }
- private LastBlockWithStatus appendFileInt(final String srcArg, String holder,
- String clientMachine, boolean newBlock, boolean logRetryCache)
- throws IOException {
- String src = srcArg;
NameNode.stateChangeLog.debug(
"DIR* NameSystem.appendFile: src={}, holder={}, clientMachine={}",
- src, holder, clientMachine);
- boolean skipSync = false;
- LocatedBlock lb = null;
- HdfsFileStatus stat = null;
- FSPermissionChecker pc = getPermissionChecker();
- byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
- writeLock();
+ srcArg, holder, clientMachine);
try {
+ boolean skipSync = false;
+ LastBlockWithStatus lbs = null;
+ final FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
- checkNameNodeSafeMode("Cannot append to file" + src);
- src = dir.resolvePath(pc, src, pathComponents);
- final INodesInPath iip = dir.getINodesInPath4Write(src);
- lb = appendFileInternal(pc, iip, holder, clientMachine, newBlock,
- logRetryCache);
- stat = FSDirStatAndListingOp.getFileInfo(dir, src, false,
- FSDirectory.isReservedRawName(srcArg), true);
- } catch (StandbyException se) {
- skipSync = true;
- throw se;
- } finally {
- writeUnlock();
- // There might be transactions logged while trying to recover the lease.
- // They need to be sync'ed even when an exception was thrown.
- if (!skipSync) {
- getEditLog().logSync();
+ writeLock();
+ try {
+ checkOperation(OperationCategory.WRITE);
+ checkNameNodeSafeMode("Cannot append to file" + srcArg);
+ lbs = FSDirAppendOp.appendFile(this, srcArg, pc, holder, clientMachine,
+ newBlock, logRetryCache);
+ } catch (StandbyException se) {
+ skipSync = true;
+ throw se;
+ } finally {
+ writeUnlock();
+ // There might be transactions logged while trying to recover the lease
+ // They need to be sync'ed even when an exception was thrown.
+ if (!skipSync) {
+ getEditLog().logSync();
+ }
}
+ logAuditEvent(true, "append", srcArg);
+ return lbs;
+ } catch (AccessControlException e) {
+ logAuditEvent(false, "append", srcArg);
+ throw e;
}
- if (lb != null) {
- NameNode.stateChangeLog.debug(
- "DIR* NameSystem.appendFile: file {} for {} at {} block {} block" +
- " size {}", src, holder, clientMachine, lb.getBlock(),
- lb.getBlock().getNumBytes());
- }
- logAuditEvent(true, "append", srcArg);
- return new LastBlockWithStatus(lb, stat);
}
ExtendedBlock getExtendedBlock(Block blk) {