You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wh...@apache.org on 2014/11/25 00:44:20 UTC
hadoop git commit: HDFS-7436. Consolidate implementation of concat().
Contributed by Haohui Mai.
Repository: hadoop
Updated Branches:
refs/heads/branch-2 57d62d4de -> 9d462fb60
HDFS-7436. Consolidate implementation of concat(). Contributed by Haohui Mai.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9d462fb6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9d462fb6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9d462fb6
Branch: refs/heads/branch-2
Commit: 9d462fb60c6cb46797e5a82bd75a27908d713aad
Parents: 57d62d4
Author: Haohui Mai <wh...@apache.org>
Authored: Mon Nov 24 15:42:38 2014 -0800
Committer: Haohui Mai <wh...@apache.org>
Committed: Mon Nov 24 15:42:54 2014 -0800
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 +
.../hdfs/server/namenode/FSDirConcatOp.java | 233 +++++++++++++++++++
.../hdfs/server/namenode/FSDirectory.java | 99 ++------
.../hdfs/server/namenode/FSEditLogLoader.java | 2 +-
.../hdfs/server/namenode/FSNamesystem.java | 174 +-------------
5 files changed, 268 insertions(+), 242 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d462fb6/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index af7abe6..7e84948 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -136,6 +136,8 @@ Release 2.7.0 - UNRELEASED
HDFS-7419. Improve error messages for DataNode hot swap drive feature (Lei
Xu via Colin P. Mccabe)
+ HDFS-7436. Consolidate implementation of concat(). (wheat9)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d462fb6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
new file mode 100644
index 0000000..12feb33
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.SnapshotException;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.hadoop.util.Time.now;
+
+class FSDirConcatOp {
+ static HdfsFileStatus concat(
+ FSDirectory fsd, String target, String[] srcs,
+ boolean logRetryCache) throws IOException {
+ Preconditions.checkArgument(!target.isEmpty(), "Target file name is empty");
+ Preconditions.checkArgument(srcs != null && srcs.length > 0,
+ "No sources given");
+ assert srcs != null;
+
+ FSDirectory.LOG.debug("concat {} to {}", Arrays.toString(srcs), target);
+ // We require all files be in the same directory
+ String trgParent =
+ target.substring(0, target.lastIndexOf(Path.SEPARATOR_CHAR));
+ for (String s : srcs) {
+ String srcParent = s.substring(0, s.lastIndexOf(Path.SEPARATOR_CHAR));
+ if (!srcParent.equals(trgParent)) {
+ throw new IllegalArgumentException(
+ "Sources and target are not in the same directory");
+ }
+ }
+
+ // write permission for the target
+ if (fsd.isPermissionEnabled()) {
+ FSPermissionChecker pc = fsd.getPermissionChecker();
+ fsd.checkPathAccess(pc, target, FsAction.WRITE);
+
+ // and srcs
+ for(String aSrc: srcs) {
+ fsd.checkPathAccess(pc, aSrc, FsAction.READ); // read the file
+ fsd.checkParentAccess(pc, aSrc, FsAction.WRITE); // for delete
+ }
+ }
+
+ // to make sure no two files are the same
+ Set<INode> si = new HashSet<INode>();
+
+ // we put the following prerequisite for the operation
+ // replication and blocks sizes should be the same for ALL the blocks
+
+ // check the target
+ final INodesInPath trgIip = fsd.getINodesInPath4Write(target);
+ if (fsd.getEZForPath(trgIip) != null) {
+ throw new HadoopIllegalArgumentException(
+ "concat can not be called for files in an encryption zone.");
+ }
+ final INodeFile trgInode = INodeFile.valueOf(trgIip.getLastINode(), target);
+ if(trgInode.isUnderConstruction()) {
+ throw new HadoopIllegalArgumentException("concat: target file "
+ + target + " is under construction");
+ }
+ // per design target shouldn't be empty and all the blocks same size
+ if(trgInode.numBlocks() == 0) {
+ throw new HadoopIllegalArgumentException("concat: target file "
+ + target + " is empty");
+ }
+ if (trgInode.isWithSnapshot()) {
+ throw new HadoopIllegalArgumentException("concat: target file "
+ + target + " is in a snapshot");
+ }
+
+ long blockSize = trgInode.getPreferredBlockSize();
+
+ // check the end block to be full
+ final BlockInfo last = trgInode.getLastBlock();
+ if(blockSize != last.getNumBytes()) {
+ throw new HadoopIllegalArgumentException("The last block in " + target
+ + " is not full; last block size = " + last.getNumBytes()
+ + " but file block size = " + blockSize);
+ }
+
+ si.add(trgInode);
+ final short repl = trgInode.getFileReplication();
+
+ // now check the srcs
+ boolean endSrc = false; // final src file doesn't have to have full end block
+ for(int i=0; i< srcs.length; i++) {
+ String src = srcs[i];
+ if(i== srcs.length-1)
+ endSrc=true;
+
+ final INodeFile srcInode = INodeFile.valueOf(fsd.getINode4Write(src), src);
+ if(src.isEmpty()
+ || srcInode.isUnderConstruction()
+ || srcInode.numBlocks() == 0) {
+ throw new HadoopIllegalArgumentException("concat: source file " + src
+ + " is invalid or empty or underConstruction");
+ }
+
+ // check replication and blocks size
+ if(repl != srcInode.getBlockReplication()) {
+ throw new HadoopIllegalArgumentException("concat: the source file "
+ + src + " and the target file " + target
+ + " should have the same replication: source replication is "
+ + srcInode.getBlockReplication()
+ + " but target replication is " + repl);
+ }
+
+ //boolean endBlock=false;
+ // verify that all the blocks are of the same length as target
+ // should be enough to check the end blocks
+ final BlockInfo[] srcBlocks = srcInode.getBlocks();
+ int idx = srcBlocks.length-1;
+ if(endSrc)
+ idx = srcBlocks.length-2; // end block of endSrc is OK not to be full
+ if(idx >= 0 && srcBlocks[idx].getNumBytes() != blockSize) {
+ throw new HadoopIllegalArgumentException("concat: the source file "
+ + src + " and the target file " + target
+ + " should have the same blocks sizes: target block size is "
+ + blockSize + " but the size of source block " + idx + " is "
+ + srcBlocks[idx].getNumBytes());
+ }
+
+ si.add(srcInode);
+ }
+
+ // make sure no two files are the same
+ if(si.size() < srcs.length+1) { // trg + srcs
+ // it means at least two files are the same
+ throw new HadoopIllegalArgumentException(
+ "concat: at least two of the source files are the same");
+ }
+
+ if(NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("DIR* NameSystem.concat: " +
+ Arrays.toString(srcs) + " to " + target);
+ }
+
+ long timestamp = now();
+ fsd.writeLock();
+ try {
+ unprotectedConcat(fsd, target, srcs, timestamp);
+ } finally {
+ fsd.writeUnlock();
+ }
+ fsd.getEditLog().logConcat(target, srcs, timestamp, logRetryCache);
+ return fsd.getAuditFileInfo(target, false);
+ }
+
+ /**
+ * Concat all the blocks from srcs to trg and delete the srcs files
+ * @param fsd FSDirectory
+ * @param target target file to move the blocks to
+ * @param srcs list of file to move the blocks from
+ */
+ static void unprotectedConcat(
+ FSDirectory fsd, String target, String[] srcs, long timestamp)
+ throws IOException {
+ assert fsd.hasWriteLock();
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("DIR* FSNamesystem.concat to "+target);
+ }
+ // do the move
+
+ final INodesInPath trgIIP = fsd.getINodesInPath4Write(target, true);
+ final INode[] trgINodes = trgIIP.getINodes();
+ final INodeFile trgInode = trgIIP.getLastINode().asFile();
+ INodeDirectory trgParent = trgINodes[trgINodes.length-2].asDirectory();
+ final int trgLatestSnapshot = trgIIP.getLatestSnapshotId();
+
+ final INodeFile [] allSrcInodes = new INodeFile[srcs.length];
+ for(int i = 0; i < srcs.length; i++) {
+ final INodesInPath iip = fsd.getINodesInPath4Write(srcs[i]);
+ final int latest = iip.getLatestSnapshotId();
+ final INode inode = iip.getLastINode();
+
+ // check if the file in the latest snapshot
+ if (inode.isInLatestSnapshot(latest)) {
+ throw new SnapshotException("Concat: the source file " + srcs[i]
+ + " is in snapshot " + latest);
+ }
+
+ // check if the file has other references.
+ if (inode.isReference() && ((INodeReference.WithCount)
+ inode.asReference().getReferredINode()).getReferenceCount() > 1) {
+ throw new SnapshotException("Concat: the source file " + srcs[i]
+ + " is referred by some other reference in some snapshot.");
+ }
+
+ allSrcInodes[i] = inode.asFile();
+ }
+ trgInode.concatBlocks(allSrcInodes);
+
+ // since we are in the same dir - we can use same parent to remove files
+ int count = 0;
+ for(INodeFile nodeToRemove: allSrcInodes) {
+ if(nodeToRemove == null) continue;
+
+ nodeToRemove.setBlocks(null);
+ trgParent.removeChild(nodeToRemove, trgLatestSnapshot);
+ fsd.getINodeMap().remove(nodeToRemove);
+ count++;
+ }
+
+ trgInode.setModificationTime(timestamp, trgLatestSnapshot);
+ trgParent.updateModificationTime(timestamp, trgLatestSnapshot);
+ // update quota on the parent directory ('count' files removed, 0 space)
+ FSDirectory.unprotectedUpdateCount(trgIIP, trgINodes.length - 1, -count, 0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d462fb6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 5187fcf..46b0460 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -98,6 +98,8 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Both FSDirectory and FSNamesystem manage the state of the namespace.
@@ -108,6 +110,7 @@ import org.apache.hadoop.security.UserGroupInformation;
**/
@InterfaceAudience.Private
public class FSDirectory implements Closeable {
+ static final Logger LOG = LoggerFactory.getLogger(FSDirectory.class);
private static INodeDirectory createRoot(FSNamesystem namesystem) {
final INodeDirectory r = new INodeDirectory(
INodeId.ROOT_INODE_ID,
@@ -157,6 +160,8 @@ public class FSDirectory implements Closeable {
private final String fsOwnerShortUserName;
private final String supergroup;
+ private final FSEditLog editLog;
+
// utility methods to acquire and release read lock and write lock
void readLock() {
this.dirLock.readLock().lock();
@@ -250,7 +255,7 @@ public class FSDirectory implements Closeable {
+ " times");
nameCache = new NameCache<ByteArray>(threshold);
namesystem = ns;
-
+ this.editLog = ns.getEditLog();
ezManager = new EncryptionZoneManager(this, conf);
}
@@ -267,6 +272,14 @@ public class FSDirectory implements Closeable {
return rootDir;
}
+ boolean isPermissionEnabled() {
+ return isPermissionEnabled;
+ }
+
+ FSEditLog getEditLog() {
+ return editLog;
+ }
+
/**
* Shutdown the filestore
*/
@@ -1174,81 +1187,6 @@ public class FSDirectory implements Closeable {
}
/**
- * Concat all the blocks from srcs to trg and delete the srcs files
- */
- void concat(String target, String[] srcs, long timestamp)
- throws UnresolvedLinkException, QuotaExceededException,
- SnapshotAccessControlException, SnapshotException {
- writeLock();
- try {
- // actual move
- unprotectedConcat(target, srcs, timestamp);
- } finally {
- writeUnlock();
- }
- }
-
- /**
- * Concat all the blocks from srcs to trg and delete the srcs files
- * @param target target file to move the blocks to
- * @param srcs list of file to move the blocks from
- */
- void unprotectedConcat(String target, String [] srcs, long timestamp)
- throws UnresolvedLinkException, QuotaExceededException,
- SnapshotAccessControlException, SnapshotException {
- assert hasWriteLock();
- if (NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("DIR* FSNamesystem.concat to "+target);
- }
- // do the move
-
- final INodesInPath trgIIP = getINodesInPath4Write(target, true);
- final INode[] trgINodes = trgIIP.getINodes();
- final INodeFile trgInode = trgIIP.getLastINode().asFile();
- INodeDirectory trgParent = trgINodes[trgINodes.length-2].asDirectory();
- final int trgLatestSnapshot = trgIIP.getLatestSnapshotId();
-
- final INodeFile [] allSrcInodes = new INodeFile[srcs.length];
- for(int i = 0; i < srcs.length; i++) {
- final INodesInPath iip = getINodesInPath4Write(srcs[i]);
- final int latest = iip.getLatestSnapshotId();
- final INode inode = iip.getLastINode();
-
- // check if the file in the latest snapshot
- if (inode.isInLatestSnapshot(latest)) {
- throw new SnapshotException("Concat: the source file " + srcs[i]
- + " is in snapshot " + latest);
- }
-
- // check if the file has other references.
- if (inode.isReference() && ((INodeReference.WithCount)
- inode.asReference().getReferredINode()).getReferenceCount() > 1) {
- throw new SnapshotException("Concat: the source file " + srcs[i]
- + " is referred by some other reference in some snapshot.");
- }
-
- allSrcInodes[i] = inode.asFile();
- }
- trgInode.concatBlocks(allSrcInodes);
-
- // since we are in the same dir - we can use same parent to remove files
- int count = 0;
- for(INodeFile nodeToRemove: allSrcInodes) {
- if(nodeToRemove == null) continue;
-
- nodeToRemove.setBlocks(null);
- trgParent.removeChild(nodeToRemove, trgLatestSnapshot);
- inodeMap.remove(nodeToRemove);
- count++;
- }
-
- trgInode.setModificationTime(timestamp, trgLatestSnapshot);
- trgParent.updateModificationTime(timestamp, trgLatestSnapshot);
- // update quota on the parent directory ('count' files removed, 0 space)
- unprotectedUpdateCount(trgIIP, trgINodes.length-1, -count, 0);
- }
-
- /**
* Delete the target directory and collect the blocks under it
*
* @param src Path of a directory to delete
@@ -1790,8 +1728,7 @@ public class FSDirectory implements Closeable {
* updates quota without verification
* callers responsibility is to make sure quota is not exceeded
*/
- private static void unprotectedUpdateCount(INodesInPath inodesInPath,
- int numOfINodes, long nsDelta, long dsDelta) {
+ static void unprotectedUpdateCount(INodesInPath inodesInPath, int numOfINodes, long nsDelta, long dsDelta) {
final INode[] inodes = inodesInPath.getINodes();
for(int i=0; i < numOfINodes; i++) {
if (inodes[i].isQuotaSet()) { // a directory with quota
@@ -3415,4 +3352,10 @@ public class FSDirectory implements Closeable {
}
}
}
+
+ HdfsFileStatus getAuditFileInfo(String path, boolean resolveSymlink)
+ throws IOException {
+ return (namesystem.isAuditEnabled() && namesystem.isExternalInvocation())
+ ? getFileInfo(path, resolveSymlink, false, false) : null;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d462fb6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index 4603513..8df7d48 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -491,7 +491,7 @@ public class FSEditLogLoader {
srcs[i] =
renameReservedPathsOnUpgrade(concatDeleteOp.srcs[i], logVersion);
}
- fsDir.unprotectedConcat(trg, srcs, concatDeleteOp.timestamp);
+ FSDirConcatOp.unprotectedConcat(fsDir, trg, srcs, concatDeleteOp.timestamp);
if (toAddRetryCache) {
fsNamesys.addCacheEntry(concatDeleteOp.rpcClientId,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d462fb6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 8ecb50e..db1a938 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -262,8 +262,6 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.RetryCache;
-import org.apache.hadoop.ipc.RetryCache.CacheEntry;
-import org.apache.hadoop.ipc.RetryCache.CacheEntryWithPayload;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.metrics2.annotation.Metric;
@@ -331,8 +329,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
private HdfsFileStatus getAuditFileInfo(String path, boolean resolveSymlink)
throws IOException {
- return (isAuditEnabled() && isExternalInvocation())
- ? dir.getFileInfo(path, resolveSymlink, false, false) : null;
+ return dir.getAuditFileInfo(path, resolveSymlink);
}
private void logAuditEvent(boolean succeeded, String cmd, String src)
@@ -1934,175 +1931,26 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* @throws IOException on error
*/
void concat(String target, String [] srcs, boolean logRetryCache)
- throws IOException, UnresolvedLinkException {
- if(FSNamesystem.LOG.isDebugEnabled()) {
- FSNamesystem.LOG.debug("concat " + Arrays.toString(srcs) +
- " to " + target);
- }
-
- try {
- concatInt(target, srcs, logRetryCache);
- } catch (AccessControlException e) {
- logAuditEvent(false, "concat", Arrays.toString(srcs), target, null);
- throw e;
- }
- }
-
- private void concatInt(String target, String [] srcs,
- boolean logRetryCache) throws IOException, UnresolvedLinkException {
- // verify args
- if(target.isEmpty()) {
- throw new IllegalArgumentException("Target file name is empty");
- }
- if(srcs == null || srcs.length == 0) {
- throw new IllegalArgumentException("No sources given");
- }
-
- // We require all files be in the same directory
- String trgParent =
- target.substring(0, target.lastIndexOf(Path.SEPARATOR_CHAR));
- for (String s : srcs) {
- String srcParent = s.substring(0, s.lastIndexOf(Path.SEPARATOR_CHAR));
- if (!srcParent.equals(trgParent)) {
- throw new IllegalArgumentException(
- "Sources and target are not in the same directory");
- }
- }
-
- HdfsFileStatus resultingStat = null;
- FSPermissionChecker pc = getPermissionChecker();
+ throws IOException {
checkOperation(OperationCategory.WRITE);
waitForLoadingFSImage();
+ HdfsFileStatus stat = null;
+ boolean success = false;
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot concat " + target);
- concatInternal(pc, target, srcs, logRetryCache);
- resultingStat = getAuditFileInfo(target, false);
+ stat = FSDirConcatOp.concat(dir, target, srcs, logRetryCache);
+ success = true;
} finally {
writeUnlock();
- }
- getEditLog().logSync();
- logAuditEvent(true, "concat", Arrays.toString(srcs), target, resultingStat);
- }
-
- /** See {@link #concat(String, String[])} */
- private void concatInternal(FSPermissionChecker pc, String target,
- String[] srcs, boolean logRetryCache) throws IOException,
- UnresolvedLinkException {
- assert hasWriteLock();
-
- // write permission for the target
- if (isPermissionEnabled) {
- checkPathAccess(pc, target, FsAction.WRITE);
-
- // and srcs
- for(String aSrc: srcs) {
- checkPathAccess(pc, aSrc, FsAction.READ); // read the file
- checkParentAccess(pc, aSrc, FsAction.WRITE); // for delete
+ if (success) {
+ getEditLog().logSync();
}
+ logAuditEvent(success, "concat", Arrays.toString(srcs), target, stat);
}
-
- // to make sure no two files are the same
- Set<INode> si = new HashSet<INode>();
-
- // we put the following prerequisite for the operation
- // replication and blocks sizes should be the same for ALL the blocks
-
- // check the target
- final INodesInPath trgIip = dir.getINodesInPath4Write(target);
- if (dir.getEZForPath(trgIip) != null) {
- throw new HadoopIllegalArgumentException(
- "concat can not be called for files in an encryption zone.");
- }
- final INodeFile trgInode = INodeFile.valueOf(trgIip.getLastINode(),
- target);
- if(trgInode.isUnderConstruction()) {
- throw new HadoopIllegalArgumentException("concat: target file "
- + target + " is under construction");
- }
- // per design target shouldn't be empty and all the blocks same size
- if(trgInode.numBlocks() == 0) {
- throw new HadoopIllegalArgumentException("concat: target file "
- + target + " is empty");
- }
- if (trgInode.isWithSnapshot()) {
- throw new HadoopIllegalArgumentException("concat: target file "
- + target + " is in a snapshot");
- }
-
- long blockSize = trgInode.getPreferredBlockSize();
-
- // check the end block to be full
- final BlockInfo last = trgInode.getLastBlock();
- if(blockSize != last.getNumBytes()) {
- throw new HadoopIllegalArgumentException("The last block in " + target
- + " is not full; last block size = " + last.getNumBytes()
- + " but file block size = " + blockSize);
- }
-
- si.add(trgInode);
- final short repl = trgInode.getFileReplication();
-
- // now check the srcs
- boolean endSrc = false; // final src file doesn't have to have full end block
- for(int i=0; i<srcs.length; i++) {
- String src = srcs[i];
- if(i==srcs.length-1)
- endSrc=true;
-
- final INodeFile srcInode = INodeFile.valueOf(dir.getINode4Write(src), src);
- if(src.isEmpty()
- || srcInode.isUnderConstruction()
- || srcInode.numBlocks() == 0) {
- throw new HadoopIllegalArgumentException("concat: source file " + src
- + " is invalid or empty or underConstruction");
- }
-
- // check replication and blocks size
- if(repl != srcInode.getBlockReplication()) {
- throw new HadoopIllegalArgumentException("concat: the source file "
- + src + " and the target file " + target
- + " should have the same replication: source replication is "
- + srcInode.getBlockReplication()
- + " but target replication is " + repl);
- }
-
- //boolean endBlock=false;
- // verify that all the blocks are of the same length as target
- // should be enough to check the end blocks
- final BlockInfo[] srcBlocks = srcInode.getBlocks();
- int idx = srcBlocks.length-1;
- if(endSrc)
- idx = srcBlocks.length-2; // end block of endSrc is OK not to be full
- if(idx >= 0 && srcBlocks[idx].getNumBytes() != blockSize) {
- throw new HadoopIllegalArgumentException("concat: the source file "
- + src + " and the target file " + target
- + " should have the same blocks sizes: target block size is "
- + blockSize + " but the size of source block " + idx + " is "
- + srcBlocks[idx].getNumBytes());
- }
-
- si.add(srcInode);
- }
-
- // make sure no two files are the same
- if(si.size() < srcs.length+1) { // trg + srcs
- // it means at least two files are the same
- throw new HadoopIllegalArgumentException(
- "concat: at least two of the source files are the same");
- }
-
- if(NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("DIR* NameSystem.concat: " +
- Arrays.toString(srcs) + " to " + target);
- }
-
- long timestamp = now();
- dir.concat(target, srcs, timestamp);
- getEditLog().logConcat(target, srcs, timestamp, logRetryCache);
}
-
+
/**
* stores the modification and access time for this inode.
* The access time is precise up to an hour. The transaction, if needed, is
@@ -7240,7 +7088,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* Client invoked methods are invoked over RPC and will be in
* RPC call context even if the client exits.
*/
- private boolean isExternalInvocation() {
+ boolean isExternalInvocation() {
return Server.isRpcInvocation() || NamenodeWebHdfsMethods.isWebHdfsInvocation();
}