You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by wh...@apache.org on 2014/11/24 20:11:39 UTC
hadoop git commit: HDFS-7412. Move RetryCache to NameNodeRpcServer.
Contributed by Haohui Mai.
Repository: hadoop
Updated Branches:
refs/heads/branch-2 4435ac9af -> 9db888b89
HDFS-7412. Move RetryCache to NameNodeRpcServer. Contributed by Haohui Mai.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9db888b8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9db888b8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9db888b8
Branch: refs/heads/branch-2
Commit: 9db888b896430e631b44c6aece6152e47b4d0a1a
Parents: 4435ac9
Author: Haohui Mai <wh...@apache.org>
Authored: Mon Nov 24 11:11:15 2014 -0800
Committer: Haohui Mai <wh...@apache.org>
Committed: Mon Nov 24 11:11:35 2014 -0800
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 +
.../hdfs/server/namenode/FSNamesystem.java | 300 +++++------------
.../hdfs/server/namenode/NameNodeRpcServer.java | 322 +++++++++++++++++--
.../TestDefaultBlockPlacementPolicy.java | 22 +-
.../hdfs/server/namenode/TestFsLimits.java | 4 +-
5 files changed, 383 insertions(+), 267 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9db888b8/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index fec258d..add37f4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -131,6 +131,8 @@ Release 2.7.0 - UNRELEASED
HDFS-7331. Add Datanode network counts to datanode jmx page. (Charles Lamb
via atm)
+ HDFS-7412. Move RetryCache to NameNodeRpcServer. (wheat9)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9db888b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 77304ad..8ecb50e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -515,7 +515,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
private volatile boolean startingActiveService = false;
private INodeId inodeId;
-
+
private final RetryCache retryCache;
private final NNConf nnConf;
@@ -1933,28 +1933,18 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* @param srcs file that will be concatenated
* @throws IOException on error
*/
- void concat(String target, String [] srcs)
+ void concat(String target, String [] srcs, boolean logRetryCache)
throws IOException, UnresolvedLinkException {
- CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
- if (cacheEntry != null && cacheEntry.isSuccess()) {
- return; // Return previous response
- }
-
- // Either there is no previous request in progress or it has failed
if(FSNamesystem.LOG.isDebugEnabled()) {
FSNamesystem.LOG.debug("concat " + Arrays.toString(srcs) +
" to " + target);
}
-
- boolean success = false;
+
try {
- concatInt(target, srcs, cacheEntry != null);
- success = true;
+ concatInt(target, srcs, logRetryCache);
} catch (AccessControlException e) {
logAuditEvent(false, "concat", Arrays.toString(srcs), target, null);
throw e;
- } finally {
- RetryCache.setState(cacheEntry, success);
}
}
@@ -2172,7 +2162,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
*/
@SuppressWarnings("deprecation")
void createSymlink(String target, String link,
- PermissionStatus dirPerms, boolean createParent)
+ PermissionStatus dirPerms, boolean createParent, boolean logRetryCache)
throws IOException, UnresolvedLinkException {
if (!FileSystem.areSymlinksEnabled()) {
throw new UnsupportedOperationException("Symlinks not supported");
@@ -2183,19 +2173,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
if (FSDirectory.isReservedName(target)) {
throw new InvalidPathException("Invalid target name: " + target);
}
- CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
- if (cacheEntry != null && cacheEntry.isSuccess()) {
- return; // Return previous response
- }
+
boolean success = false;
try {
- createSymlinkInt(target, link, dirPerms, createParent, cacheEntry != null);
+ createSymlinkInt(target, link, dirPerms, createParent, logRetryCache);
success = true;
} catch (AccessControlException e) {
logAuditEvent(false, "createSymlink", link, target, null);
throw e;
- } finally {
- RetryCache.setState(cacheEntry, success);
}
}
@@ -2483,26 +2468,19 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
HdfsFileStatus startFile(String src, PermissionStatus permissions,
String holder, String clientMachine, EnumSet<CreateFlag> flag,
boolean createParent, short replication, long blockSize,
- CryptoProtocolVersion[] supportedVersions)
+ CryptoProtocolVersion[] supportedVersions, boolean logRetryCache)
throws AccessControlException, SafeModeException,
FileAlreadyExistsException, UnresolvedLinkException,
FileNotFoundException, ParentNotDirectoryException, IOException {
+
HdfsFileStatus status = null;
- CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
- null);
- if (cacheEntry != null && cacheEntry.isSuccess()) {
- return (HdfsFileStatus) cacheEntry.getPayload();
- }
-
try {
status = startFileInt(src, permissions, holder, clientMachine, flag,
createParent, replication, blockSize, supportedVersions,
- cacheEntry != null);
+ logRetryCache);
} catch (AccessControlException e) {
logAuditEvent(false, "create", src);
throw e;
- } finally {
- RetryCache.setState(cacheEntry, status != null, status);
}
return status;
}
@@ -3029,27 +3007,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
/**
* Append to an existing file in the namespace.
*/
- LocatedBlock appendFile(String src, String holder, String clientMachine)
- throws AccessControlException, SafeModeException,
- FileAlreadyExistsException, FileNotFoundException,
- ParentNotDirectoryException, IOException {
- LocatedBlock lb = null;
- CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
- null);
- if (cacheEntry != null && cacheEntry.isSuccess()) {
- return (LocatedBlock) cacheEntry.getPayload();
- }
-
- boolean success = false;
+ LocatedBlock appendFile(
+ String src, String holder, String clientMachine, boolean logRetryCache)
+ throws IOException {
try {
- lb = appendFileInt(src, holder, clientMachine, cacheEntry != null);
- success = true;
- return lb;
+ return appendFileInt(src, holder, clientMachine, logRetryCache);
} catch (AccessControlException e) {
logAuditEvent(false, "append", src);
throw e;
- } finally {
- RetryCache.setState(cacheEntry, success, lb);
}
}
@@ -3717,20 +3682,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* @deprecated Use {@link #renameTo(String, String, Options.Rename...)} instead.
*/
@Deprecated
- boolean renameTo(String src, String dst)
+ boolean renameTo(String src, String dst, boolean logRetryCache)
throws IOException, UnresolvedLinkException {
- CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
- if (cacheEntry != null && cacheEntry.isSuccess()) {
- return true; // Return previous response
- }
boolean ret = false;
try {
- ret = renameToInt(src, dst, cacheEntry != null);
+ ret = renameToInt(src, dst, logRetryCache);
} catch (AccessControlException e) {
logAuditEvent(false, "rename", src, dst, null);
throw e;
- } finally {
- RetryCache.setState(cacheEntry, ret);
}
return ret;
}
@@ -3775,7 +3734,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return status;
}
- /** @deprecated See {@link #renameTo(String, String)} */
+ /** @deprecated See {@link #renameTo(String, String, boolean)} */
@Deprecated
private boolean renameToInternal(FSPermissionChecker pc, String src,
String dst, boolean logRetryCache) throws IOException,
@@ -3808,7 +3767,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
/** Rename src to dst */
- void renameTo(final String srcArg, final String dstArg,
+ void renameTo(final String srcArg, final String dstArg, boolean logRetryCache,
Options.Rename... options) throws IOException, UnresolvedLinkException {
String src = srcArg;
String dst = dstArg;
@@ -3822,14 +3781,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
final FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
- CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
- if (cacheEntry != null && cacheEntry.isSuccess()) {
- return; // Return previous response
- }
+
byte[][] srcComponents = FSDirectory.getPathComponentsForReservedPath(src);
byte[][] dstComponents = FSDirectory.getPathComponentsForReservedPath(dst);
HdfsFileStatus resultingStat = null;
- boolean success = false;
writeLock();
BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
try {
@@ -3837,13 +3792,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
checkNameNodeSafeMode("Cannot rename " + src);
src = dir.resolvePath(pc, src, srcComponents);
dst = dir.resolvePath(pc, dst, dstComponents);
- renameToInternal(pc, src, dst, cacheEntry != null,
- collectedBlocks, options);
+ renameToInternal(pc, src, dst, logRetryCache, collectedBlocks, options);
resultingStat = getAuditFileInfo(dst, false);
- success = true;
} finally {
writeUnlock();
- RetryCache.setState(cacheEntry, success);
}
getEditLog().logSync();
if (!collectedBlocks.getToDeleteList().isEmpty()) {
@@ -3886,21 +3838,16 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* @see ClientProtocol#delete(String, boolean) for detailed description and
* description of exceptions
*/
- boolean delete(String src, boolean recursive)
+ boolean delete(String src, boolean recursive, boolean logRetryCache)
throws AccessControlException, SafeModeException,
UnresolvedLinkException, IOException {
- CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
- if (cacheEntry != null && cacheEntry.isSuccess()) {
- return true; // Return previous response
- }
+
boolean ret = false;
try {
- ret = deleteInt(src, recursive, cacheEntry != null);
+ ret = deleteInt(src, recursive, logRetryCache);
} catch (AccessControlException e) {
logAuditEvent(false, "delete", src);
throw e;
- } finally {
- RetryCache.setState(cacheEntry, ret);
}
return ret;
}
@@ -5478,12 +5425,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
void saveNamespace() throws AccessControlException, IOException {
checkOperation(OperationCategory.UNCHECKED);
checkSuperuserPrivilege();
-
- CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
- if (cacheEntry != null && cacheEntry.isSuccess()) {
- return; // Return previous response
- }
- boolean success = false;
+
+
readLock();
try {
checkOperation(OperationCategory.UNCHECKED);
@@ -5493,10 +5436,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
+ "in order to create namespace image.");
}
getFSImage().saveNamespace(this);
- success = true;
} finally {
readUnlock();
- RetryCache.setState(cacheEntry, success);
}
LOG.info("New namespace image has been created");
}
@@ -6327,24 +6268,18 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
NamenodeCommand startCheckpoint(NamenodeRegistration backupNode,
NamenodeRegistration activeNamenode) throws IOException {
checkOperation(OperationCategory.CHECKPOINT);
- CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
- null);
- if (cacheEntry != null && cacheEntry.isSuccess()) {
- return (NamenodeCommand) cacheEntry.getPayload();
- }
writeLock();
- NamenodeCommand cmd = null;
try {
checkOperation(OperationCategory.CHECKPOINT);
checkNameNodeSafeMode("Checkpoint not started");
LOG.info("Start checkpoint for " + backupNode.getAddress());
- cmd = getFSImage().startCheckpoint(backupNode, activeNamenode);
+ NamenodeCommand cmd = getFSImage().startCheckpoint(backupNode,
+ activeNamenode);
getEditLog().logSync();
return cmd;
} finally {
writeUnlock();
- RetryCache.setState(cacheEntry, cmd != null, cmd);
}
}
@@ -6362,22 +6297,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
void endCheckpoint(NamenodeRegistration registration,
CheckpointSignature sig) throws IOException {
checkOperation(OperationCategory.CHECKPOINT);
- CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
- if (cacheEntry != null && cacheEntry.isSuccess()) {
- return; // Return previous response
- }
- boolean success = false;
readLock();
try {
checkOperation(OperationCategory.CHECKPOINT);
-
checkNameNodeSafeMode("Checkpoint not ended");
LOG.info("End checkpoint for " + registration.getAddress());
getFSImage().endCheckpoint(sig);
- success = true;
} finally {
readUnlock();
- RetryCache.setState(cacheEntry, success);
}
}
@@ -6821,14 +6748,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* @param newNodes datanodes in the pipeline
* @throws IOException if any error occurs
*/
- void updatePipeline(String clientName, ExtendedBlock oldBlock,
- ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs)
+ void updatePipeline(
+ String clientName, ExtendedBlock oldBlock, ExtendedBlock newBlock,
+ DatanodeID[] newNodes, String[] newStorageIDs, boolean logRetryCache)
throws IOException {
checkOperation(OperationCategory.WRITE);
- CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
- if (cacheEntry != null && cacheEntry.isSuccess()) {
- return; // Return previous response
- }
+
LOG.info("updatePipeline(" + oldBlock.getLocalBlock()
+ ", newGS=" + newBlock.getGenerationStamp()
+ ", newLength=" + newBlock.getNumBytes()
@@ -6837,18 +6762,15 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
+ ")");
waitForLoadingFSImage();
writeLock();
- boolean success = false;
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Pipeline not updated");
assert newBlock.getBlockId()==oldBlock.getBlockId() : newBlock + " and "
+ oldBlock + " has different block identifier";
updatePipelineInternal(clientName, oldBlock, newBlock, newNodes,
- newStorageIDs, cacheEntry != null);
- success = true;
+ newStorageIDs, logRetryCache);
} finally {
writeUnlock();
- RetryCache.setState(cacheEntry, success);
}
getEditLog().logSync();
LOG.info("updatePipeline(" + oldBlock.getLocalBlock() + " => "
@@ -7844,15 +7766,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* @param snapshotRoot The directory path where the snapshot is taken
* @param snapshotName The name of the snapshot
*/
- String createSnapshot(String snapshotRoot, String snapshotName)
+ String createSnapshot(String snapshotRoot, String snapshotName,
+ boolean logRetryCache)
throws SafeModeException, IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker();
- CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
- null);
- if (cacheEntry != null && cacheEntry.isSuccess()) {
- return (String) cacheEntry.getPayload();
- }
+
String snapshotPath = null;
writeLock();
try {
@@ -7878,11 +7797,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
} finally {
dir.writeUnlock();
}
- getEditLog().logCreateSnapshot(snapshotRoot, snapshotName,
- cacheEntry != null);
+ getEditLog().logCreateSnapshot(snapshotRoot, snapshotName, logRetryCache);
} finally {
writeUnlock();
- RetryCache.setState(cacheEntry, snapshotPath != null, snapshotPath);
}
getEditLog().logSync();
@@ -7900,16 +7817,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* @throws SafeModeException
* @throws IOException
*/
- void renameSnapshot(String path, String snapshotOldName,
- String snapshotNewName) throws SafeModeException, IOException {
+ void renameSnapshot(
+ String path, String snapshotOldName, String snapshotNewName,
+ boolean logRetryCache) throws IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker();
- CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
- if (cacheEntry != null && cacheEntry.isSuccess()) {
- return; // Return previous response
- }
+
writeLock();
- boolean success = false;
+
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot rename snapshot for " + path);
@@ -7920,11 +7835,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
snapshotManager.renameSnapshot(path, snapshotOldName, snapshotNewName);
getEditLog().logRenameSnapshot(path, snapshotOldName, snapshotNewName,
- cacheEntry != null);
- success = true;
+ logRetryCache);
} finally {
writeUnlock();
- RetryCache.setState(cacheEntry, success);
+
}
getEditLog().logSync();
@@ -8014,16 +7928,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* @throws SafeModeException
* @throws IOException
*/
- void deleteSnapshot(String snapshotRoot, String snapshotName)
+ void deleteSnapshot(String snapshotRoot, String snapshotName,
+ boolean logRetryCache)
throws SafeModeException, IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker();
-
- CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
- if (cacheEntry != null && cacheEntry.isSuccess()) {
- return; // Return previous response
- }
- boolean success = false;
+
BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
writeLock();
try {
@@ -8043,12 +7953,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
dir.writeUnlock();
}
removedINodes.clear();
- getEditLog().logDeleteSnapshot(snapshotRoot, snapshotName,
- cacheEntry != null);
- success = true;
+ getEditLog().logDeleteSnapshot(snapshotRoot, snapshotName, logRetryCache);
} finally {
writeUnlock();
- RetryCache.setState(cacheEntry, success);
+
}
getEditLog().logSync();
@@ -8246,20 +8154,17 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return new RollingUpgradeInfo(blockPoolId, false, startTime, finalizeTime);
}
- long addCacheDirective(CacheDirectiveInfo directive, EnumSet<CacheFlag> flags)
+ long addCacheDirective(CacheDirectiveInfo directive,
+ EnumSet<CacheFlag> flags, boolean logRetryCache)
throws IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = isPermissionEnabled ?
getPermissionChecker() : null;
- CacheEntryWithPayload cacheEntry =
- RetryCache.waitForCompletion(retryCache, null);
- if (cacheEntry != null && cacheEntry.isSuccess()) {
- return (Long) cacheEntry.getPayload();
- }
- boolean success = false;
+
if (!flags.contains(CacheFlag.FORCE)) {
cacheManager.waitForRescanIfNeeded();
}
+ boolean success = false;
writeLock();
String effectiveDirectiveStr = null;
Long result = null;
@@ -8275,8 +8180,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
CacheDirectiveInfo effectiveDirective =
cacheManager.addDirective(directive, pc, flags);
- getEditLog().logAddCacheDirectiveInfo(effectiveDirective,
- cacheEntry != null);
+ getEditLog().logAddCacheDirectiveInfo(effectiveDirective, logRetryCache);
result = effectiveDirective.getId();
effectiveDirectiveStr = effectiveDirective.toString();
success = true;
@@ -8288,21 +8192,18 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(success, "addCacheDirective", effectiveDirectiveStr, null, null);
}
- RetryCache.setState(cacheEntry, success, result);
+
}
return result;
}
void modifyCacheDirective(CacheDirectiveInfo directive,
- EnumSet<CacheFlag> flags) throws IOException {
+ EnumSet<CacheFlag> flags, boolean logRetryCache) throws IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = isPermissionEnabled ?
getPermissionChecker() : null;
boolean success = false;
- CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
- if (cacheEntry != null && cacheEntry.isSuccess()) {
- return;
- }
+
if (!flags.contains(CacheFlag.FORCE)) {
cacheManager.waitForRescanIfNeeded();
}
@@ -8314,8 +8215,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
"Cannot add cache directive", safeMode);
}
cacheManager.modifyDirective(directive, pc, flags);
- getEditLog().logModifyCacheDirectiveInfo(directive,
- cacheEntry != null);
+ getEditLog().logModifyCacheDirectiveInfo(directive, logRetryCache);
success = true;
} finally {
writeUnlock();
@@ -8326,18 +8226,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
String idStr = "{id: " + directive.getId().toString() + "}";
logAuditEvent(success, "modifyCacheDirective", idStr, directive.toString(), null);
}
- RetryCache.setState(cacheEntry, success);
}
}
- void removeCacheDirective(Long id) throws IOException {
+ void removeCacheDirective(long id, boolean logRetryCache) throws IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = isPermissionEnabled ?
getPermissionChecker() : null;
- CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
- if (cacheEntry != null && cacheEntry.isSuccess()) {
- return;
- }
+
boolean success = false;
writeLock();
try {
@@ -8347,16 +8243,15 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
"Cannot remove cache directives", safeMode);
}
cacheManager.removeDirective(id, pc);
- getEditLog().logRemoveCacheDirectiveInfo(id, cacheEntry != null);
+ getEditLog().logRemoveCacheDirectiveInfo(id, logRetryCache);
success = true;
} finally {
writeUnlock();
if (isAuditEnabled() && isExternalInvocation()) {
- String idStr = "{id: " + id.toString() + "}";
+ String idStr = "{id: " + Long.toString(id) + "}";
logAuditEvent(success, "removeCacheDirective", idStr, null,
null);
}
- RetryCache.setState(cacheEntry, success);
}
getEditLog().logSync();
}
@@ -8385,14 +8280,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return results;
}
- public void addCachePool(CachePoolInfo req) throws IOException {
+ public void addCachePool(CachePoolInfo req, boolean logRetryCache)
+ throws IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = isPermissionEnabled ?
getPermissionChecker() : null;
- CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
- if (cacheEntry != null && cacheEntry.isSuccess()) {
- return; // Return previous response
- }
+
writeLock();
boolean success = false;
String poolInfoStr = null;
@@ -8407,27 +8300,24 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
CachePoolInfo info = cacheManager.addCachePool(req);
poolInfoStr = info.toString();
- getEditLog().logAddCachePool(info, cacheEntry != null);
+ getEditLog().logAddCachePool(info, logRetryCache);
success = true;
} finally {
writeUnlock();
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(success, "addCachePool", poolInfoStr, null, null);
}
- RetryCache.setState(cacheEntry, success);
}
getEditLog().logSync();
}
- public void modifyCachePool(CachePoolInfo req) throws IOException {
+ public void modifyCachePool(CachePoolInfo req, boolean logRetryCache)
+ throws IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc =
isPermissionEnabled ? getPermissionChecker() : null;
- CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
- if (cacheEntry != null && cacheEntry.isSuccess()) {
- return; // Return previous response
- }
+
writeLock();
boolean success = false;
try {
@@ -8440,7 +8330,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
pc.checkSuperuserPrivilege();
}
cacheManager.modifyCachePool(req);
- getEditLog().logModifyCachePool(req, cacheEntry != null);
+ getEditLog().logModifyCachePool(req, logRetryCache);
success = true;
} finally {
writeUnlock();
@@ -8448,20 +8338,17 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
String poolNameStr = "{poolName: " + req.getPoolName() + "}";
logAuditEvent(success, "modifyCachePool", poolNameStr, req.toString(), null);
}
- RetryCache.setState(cacheEntry, success);
}
getEditLog().logSync();
}
- public void removeCachePool(String cachePoolName) throws IOException {
+ public void removeCachePool(String cachePoolName, boolean logRetryCache)
+ throws IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc =
isPermissionEnabled ? getPermissionChecker() : null;
- CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
- if (cacheEntry != null && cacheEntry.isSuccess()) {
- return; // Return previous response
- }
+
writeLock();
boolean success = false;
try {
@@ -8474,7 +8361,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
pc.checkSuperuserPrivilege();
}
cacheManager.removeCachePool(cachePoolName);
- getEditLog().logRemoveCachePool(cachePoolName, cacheEntry != null);
+ getEditLog().logRemoveCachePool(cachePoolName, logRetryCache);
success = true;
} finally {
writeUnlock();
@@ -8482,7 +8369,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
String poolNameStr = "{poolName: " + cachePoolName + "}";
logAuditEvent(success, "removeCachePool", poolNameStr, null, null);
}
- RetryCache.setState(cacheEntry, success);
}
getEditLog().logSync();
@@ -8675,15 +8561,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* @throws UnresolvedLinkException if the path can't be resolved.
* @throws SafeModeException if the Namenode is in safe mode.
*/
- void createEncryptionZone(final String src, final String keyName)
+ void createEncryptionZone(final String src, final String keyName,
+ boolean logRetryCache)
throws IOException, UnresolvedLinkException,
SafeModeException, AccessControlException {
- final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
- if (cacheEntry != null && cacheEntry.isSuccess()) {
- return; // Return previous response
- }
-
- boolean success = false;
try {
if (provider == null) {
throw new IOException(
@@ -8709,13 +8590,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
// If the provider supports pool for EDEKs, this will fill in the pool
generateEncryptedDataEncryptionKey(keyName);
createEncryptionZoneInt(src, metadata.getCipher(),
- keyName, cacheEntry != null);
- success = true;
+ keyName, logRetryCache);
} catch (AccessControlException e) {
logAuditEvent(false, "createEncryptionZone", src);
throw e;
- } finally {
- RetryCache.setState(cacheEntry, success);
}
}
@@ -8820,22 +8698,15 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* @throws UnresolvedLinkException
* @throws IOException
*/
- void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag)
+ void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag,
+ boolean logRetryCache)
throws AccessControlException, SafeModeException,
UnresolvedLinkException, IOException {
- CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
- if (cacheEntry != null && cacheEntry.isSuccess()) {
- return; // Return previous response
- }
- boolean success = false;
try {
- setXAttrInt(src, xAttr, flag, cacheEntry != null);
- success = true;
+ setXAttrInt(src, xAttr, flag, logRetryCache);
} catch (AccessControlException e) {
logAuditEvent(false, "setXAttr", src);
throw e;
- } finally {
- RetryCache.setState(cacheEntry, success);
}
}
@@ -8985,20 +8856,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* @throws UnresolvedLinkException
* @throws IOException
*/
- void removeXAttr(String src, XAttr xAttr) throws IOException {
- CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
- if (cacheEntry != null && cacheEntry.isSuccess()) {
- return; // Return previous response
- }
- boolean success = false;
+ void removeXAttr(String src, XAttr xAttr, boolean logRetryCache)
+ throws IOException {
try {
- removeXAttrInt(src, xAttr, cacheEntry != null);
- success = true;
+ removeXAttrInt(src, xAttr, logRetryCache);
} catch (AccessControlException e) {
logAuditEvent(false, "removeXAttr", src);
throw e;
- } finally {
- RetryCache.setState(cacheEntry, success);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9db888b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 1b5d7e6..7ba0921 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -138,6 +138,9 @@ import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RetryCache;
+import org.apache.hadoop.ipc.RetryCache.CacheEntry;
+import org.apache.hadoop.ipc.RetryCache.CacheEntryWithPayload;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.WritableRpcEngine;
import org.apache.hadoop.ipc.RefreshRegistry;
@@ -166,7 +169,6 @@ import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos.GetUserMappin
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolServerSideTranslatorPB;
import org.apache.hadoop.tracing.SpanReceiverInfo;
-import org.apache.hadoop.tracing.TraceAdminPB;
import org.apache.hadoop.tracing.TraceAdminPB.TraceAdminService;
import org.apache.hadoop.tracing.TraceAdminProtocolPB;
import org.apache.hadoop.tracing.TraceAdminProtocolServerSideTranslatorPB;
@@ -190,7 +192,9 @@ class NameNodeRpcServer implements NamenodeProtocols {
protected final FSNamesystem namesystem;
protected final NameNode nn;
private final NameNodeMetrics metrics;
-
+
+ private final RetryCache retryCache;
+
private final boolean serviceAuthEnabled;
/** The RPC server that listens to requests from DataNodes */
@@ -207,6 +211,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
throws IOException {
this.nn = nn;
this.namesystem = nn.getNamesystem();
+ this.retryCache = namesystem.getRetryCache();
this.metrics = NameNode.getNameNodeMetrics();
int handlerCount =
@@ -505,14 +510,36 @@ class NameNodeRpcServer implements NamenodeProtocols {
verifyRequest(registration);
if(!nn.isRole(NamenodeRole.NAMENODE))
throw new IOException("Only an ACTIVE node can invoke startCheckpoint.");
- return namesystem.startCheckpoint(registration, nn.setRegistration());
+
+ CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
+ null);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return (NamenodeCommand) cacheEntry.getPayload();
+ }
+ NamenodeCommand ret = null;
+ try {
+ ret = namesystem.startCheckpoint(registration, nn.setRegistration());
+ } finally {
+ RetryCache.setState(cacheEntry, ret != null, ret);
+ }
+ return ret;
}
@Override // NamenodeProtocol
public void endCheckpoint(NamenodeRegistration registration,
CheckpointSignature sig) throws IOException {
namesystem.checkSuperuserPrivilege();
- namesystem.endCheckpoint(registration, sig);
+ CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return; // Return previous response
+ }
+ boolean success = false;
+ try {
+ namesystem.endCheckpoint(registration, sig);
+ success = true;
+ } finally {
+ RetryCache.setState(cacheEntry, success);
+ }
}
@Override // ClientProtocol
@@ -557,19 +584,32 @@ class NameNodeRpcServer implements NamenodeProtocols {
String clientMachine = getClientMachine();
if (stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.create: file "
- +src+" for "+clientName+" at "+clientMachine);
+ +src+" for "+clientName+" at "+clientMachine);
}
if (!checkPathLength(src)) {
throw new IOException("create: Pathname too long. Limit "
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
}
- HdfsFileStatus fileStatus = namesystem.startFile(src, new PermissionStatus(
- getRemoteUser().getShortUserName(), null, masked),
- clientName, clientMachine, flag.get(), createParent, replication,
- blockSize, supportedVersions);
+
+ CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return (HdfsFileStatus) cacheEntry.getPayload();
+ }
+
+ HdfsFileStatus status = null;
+ try {
+ PermissionStatus perm = new PermissionStatus(getRemoteUser()
+ .getShortUserName(), null, masked);
+ status = namesystem.startFile(src, perm, clientName, clientMachine,
+ flag.get(), createParent, replication, blockSize, supportedVersions,
+ cacheEntry != null);
+ } finally {
+ RetryCache.setState(cacheEntry, status != null, status);
+ }
+
metrics.incrFilesCreated();
metrics.incrCreateFileOps();
- return fileStatus;
+ return status;
}
@Override // ClientProtocol
@@ -580,7 +620,20 @@ class NameNodeRpcServer implements NamenodeProtocols {
stateChangeLog.debug("*DIR* NameNode.append: file "
+src+" for "+clientName+" at "+clientMachine);
}
- LocatedBlock info = namesystem.appendFile(src, clientName, clientMachine);
+ CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return (LocatedBlock) cacheEntry.getPayload();
+ }
+
+ LocatedBlock info = null;
+ boolean success = false;
+ try {
+ info = namesystem.appendFile(src, clientName, clientMachine,
+ cacheEntry != null);
+ success = true;
+ } finally {
+ RetryCache.setState(cacheEntry, success, info);
+ }
metrics.incrFilesAppended();
return info;
}
@@ -722,7 +775,19 @@ class NameNodeRpcServer implements NamenodeProtocols {
public void updatePipeline(String clientName, ExtendedBlock oldBlock,
ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs)
throws IOException {
- namesystem.updatePipeline(clientName, oldBlock, newBlock, newNodes, newStorageIDs);
+ CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return; // Return previous response
+ }
+
+ boolean success = false;
+ try {
+ namesystem.updatePipeline(clientName, oldBlock, newBlock, newNodes,
+ newStorageIDs, cacheEntry != null);
+ success = true;
+ } finally {
+ RetryCache.setState(cacheEntry, success);
+ }
}
@Override // DatanodeProtocol
@@ -751,7 +816,18 @@ class NameNodeRpcServer implements NamenodeProtocols {
throw new IOException("rename: Pathname too long. Limit "
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
}
- boolean ret = namesystem.renameTo(src, dst);
+
+ CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return true; // Return previous response
+ }
+
+ boolean ret = false;
+ try {
+ ret = namesystem.renameTo(src, dst, cacheEntry != null);
+ } finally {
+ RetryCache.setState(cacheEntry, ret);
+ }
if (ret) {
metrics.incrFilesRenamed();
}
@@ -760,7 +836,18 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override // ClientProtocol
public void concat(String trg, String[] src) throws IOException {
- namesystem.concat(trg, src);
+ CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return; // Return previous response
+ }
+ boolean success = false;
+
+ try {
+ namesystem.concat(trg, src, cacheEntry != null);
+ success = true;
+ } finally {
+ RetryCache.setState(cacheEntry, success);
+ }
}
@Override // ClientProtocol
@@ -773,7 +860,17 @@ class NameNodeRpcServer implements NamenodeProtocols {
throw new IOException("rename: Pathname too long. Limit "
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
}
- namesystem.renameTo(src, dst, options);
+ CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return; // Return previous response
+ }
+ boolean success = false;
+ try {
+ namesystem.renameTo(src, dst, cacheEntry != null, options);
+ success = true;
+ } finally {
+ RetryCache.setState(cacheEntry, success);
+ }
metrics.incrFilesRenamed();
}
@@ -783,7 +880,17 @@ class NameNodeRpcServer implements NamenodeProtocols {
stateChangeLog.debug("*DIR* Namenode.delete: src=" + src
+ ", recursive=" + recursive);
}
- boolean ret = namesystem.delete(src, recursive);
+ CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return true; // Return previous response
+ }
+
+ boolean ret = false;
+ try {
+ ret = namesystem.delete(src, recursive, cacheEntry != null);
+ } finally {
+ RetryCache.setState(cacheEntry, ret);
+ }
if (ret)
metrics.incrDeleteFileOps();
return ret;
@@ -899,7 +1006,17 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override // ClientProtocol
public void saveNamespace() throws IOException {
- namesystem.saveNamespace();
+ CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return; // Return previous response
+ }
+ boolean success = false;
+ try {
+ namesystem.saveNamespace();
+ success = true;
+ } finally {
+ RetryCache.setState(cacheEntry, success);
+ }
}
@Override // ClientProtocol
@@ -1020,6 +1137,11 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override // ClientProtocol
public void createSymlink(String target, String link, FsPermission dirPerms,
boolean createParent) throws IOException {
+ CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return; // Return previous response
+ }
+
metrics.incrCreateSymlinkOps();
/* We enforce the MAX_PATH_LENGTH limit even though a symlink target
* URI may refer to a non-HDFS file system.
@@ -1033,8 +1155,17 @@ class NameNodeRpcServer implements NamenodeProtocols {
throw new IOException("Invalid symlink target");
}
final UserGroupInformation ugi = getRemoteUser();
- namesystem.createSymlink(target, link,
- new PermissionStatus(ugi.getShortUserName(), null, dirPerms), createParent);
+
+ boolean success = false;
+ try {
+ PermissionStatus perm = new PermissionStatus(ugi.getShortUserName(),
+ null, dirPerms);
+ namesystem.createSymlink(target, link, perm, createParent,
+ cacheEntry != null);
+ success = true;
+ } finally {
+ RetryCache.setState(cacheEntry, success);
+ }
}
@Override // ClientProtocol
@@ -1322,15 +1453,38 @@ class NameNodeRpcServer implements NamenodeProtocols {
throw new IOException("createSnapshot: Pathname too long. Limit "
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
}
+ CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
+ null);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return (String) cacheEntry.getPayload();
+ }
+
metrics.incrCreateSnapshotOps();
- return namesystem.createSnapshot(snapshotRoot, snapshotName);
+ String ret = null;
+ try {
+ ret = namesystem.createSnapshot(snapshotRoot, snapshotName,
+ cacheEntry != null);
+ } finally {
+ RetryCache.setState(cacheEntry, ret != null, ret);
+ }
+ return ret;
}
@Override
public void deleteSnapshot(String snapshotRoot, String snapshotName)
throws IOException {
metrics.incrDeleteSnapshotOps();
- namesystem.deleteSnapshot(snapshotRoot, snapshotName);
+ CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return; // Return previous response
+ }
+ boolean success = false;
+ try {
+ namesystem.deleteSnapshot(snapshotRoot, snapshotName, cacheEntry != null);
+ success = true;
+ } finally {
+ RetryCache.setState(cacheEntry, success);
+ }
}
@Override
@@ -1354,7 +1508,18 @@ class NameNodeRpcServer implements NamenodeProtocols {
throw new IOException("The new snapshot name is null or empty.");
}
metrics.incrRenameSnapshotOps();
- namesystem.renameSnapshot(snapshotRoot, snapshotOldName, snapshotNewName);
+ CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return; // Return previous response
+ }
+ boolean success = false;
+ try {
+ namesystem.renameSnapshot(snapshotRoot, snapshotOldName,
+ snapshotNewName, cacheEntry != null);
+ success = true;
+ } finally {
+ RetryCache.setState(cacheEntry, success);
+ }
}
@Override // Client Protocol
@@ -1378,18 +1543,53 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override
public long addCacheDirective(
CacheDirectiveInfo path, EnumSet<CacheFlag> flags) throws IOException {
- return namesystem.addCacheDirective(path, flags);
+ CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion
+ (retryCache, null);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return (Long) cacheEntry.getPayload();
+ }
+
+ boolean success = false;
+ long ret = 0;
+ try {
+ ret = namesystem.addCacheDirective(path, flags, cacheEntry != null);
+ success = true;
+ } finally {
+ RetryCache.setState(cacheEntry, success, ret);
+ }
+ return ret;
}
@Override
public void modifyCacheDirective(
CacheDirectiveInfo directive, EnumSet<CacheFlag> flags) throws IOException {
- namesystem.modifyCacheDirective(directive, flags);
+ CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return;
+ }
+
+ boolean success = false;
+ try {
+ namesystem.modifyCacheDirective(directive, flags, cacheEntry != null);
+ success = true;
+ } finally {
+ RetryCache.setState(cacheEntry, success);
+ }
}
@Override
public void removeCacheDirective(long id) throws IOException {
- namesystem.removeCacheDirective(id);
+ CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return;
+ }
+ boolean success = false;
+ try {
+ namesystem.removeCacheDirective(id, cacheEntry != null);
+ success = true;
+ } finally {
+ RetryCache.setState(cacheEntry, success);
+ }
}
@Override
@@ -1403,17 +1603,47 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override
public void addCachePool(CachePoolInfo info) throws IOException {
- namesystem.addCachePool(info);
+ CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return; // Return previous response
+ }
+ boolean success = false;
+ try {
+ namesystem.addCachePool(info, cacheEntry != null);
+ success = true;
+ } finally {
+ RetryCache.setState(cacheEntry, success);
+ }
}
@Override
public void modifyCachePool(CachePoolInfo info) throws IOException {
- namesystem.modifyCachePool(info);
+ CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return; // Return previous response
+ }
+ boolean success = false;
+ try {
+ namesystem.modifyCachePool(info, cacheEntry != null);
+ success = true;
+ } finally {
+ RetryCache.setState(cacheEntry, success);
+ }
}
@Override
public void removeCachePool(String cachePoolName) throws IOException {
- namesystem.removeCachePool(cachePoolName);
+ CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return;
+ }
+ boolean success = false;
+ try {
+ namesystem.removeCachePool(cachePoolName, cacheEntry != null);
+ success = true;
+ } finally {
+ RetryCache.setState(cacheEntry, success);
+ }
}
@Override
@@ -1457,7 +1687,17 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override
public void createEncryptionZone(String src, String keyName)
throws IOException {
- namesystem.createEncryptionZone(src, keyName);
+ final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return;
+ }
+ boolean success = false;
+ try {
+ namesystem.createEncryptionZone(src, keyName, cacheEntry != null);
+ success = true;
+ } finally {
+ RetryCache.setState(cacheEntry, success);
+ }
}
@Override
@@ -1475,7 +1715,17 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override
public void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag)
throws IOException {
- namesystem.setXAttr(src, xAttr, flag);
+ CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return; // Return previous response
+ }
+ boolean success = false;
+ try {
+ namesystem.setXAttr(src, xAttr, flag, cacheEntry != null);
+ success = true;
+ } finally {
+ RetryCache.setState(cacheEntry, success);
+ }
}
@Override
@@ -1491,7 +1741,17 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override
public void removeXAttr(String src, XAttr xAttr) throws IOException {
- namesystem.removeXAttr(src, xAttr);
+ CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return; // Return previous response
+ }
+ boolean success = false;
+ try {
+ namesystem.removeXAttr(src, xAttr, cacheEntry != null);
+ success = true;
+ } finally {
+ RetryCache.setState(cacheEntry, success);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9db888b8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java
index 867c389..38daddc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java
@@ -19,15 +19,11 @@ package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.*;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.EnumSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.ParentNotDirectoryException;
-import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -37,16 +33,14 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.net.StaticMapping;
-import org.apache.hadoop.security.AccessControlException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestDefaultBlockPlacementPolicy {
- private Configuration conf;
- private final short REPLICATION_FACTOR = (short) 3;
- private final int DEFAULT_BLOCK_SIZE = 1024;
+ private static final short REPLICATION_FACTOR = (short) 3;
+ private static final int DEFAULT_BLOCK_SIZE = 1024;
private MiniDFSCluster cluster = null;
private NamenodeProtocols nameNodeRpc = null;
private FSNamesystem namesystem = null;
@@ -55,13 +49,12 @@ public class TestDefaultBlockPlacementPolicy {
@Before
public void setup() throws IOException {
StaticMapping.resetMap();
- conf = new HdfsConfiguration();
+ Configuration conf = new HdfsConfiguration();
final String[] racks = { "/RACK0", "/RACK0", "/RACK2", "/RACK3", "/RACK2" };
final String[] hosts = { "/host0", "/host1", "/host2", "/host3", "/host4" };
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
- conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
- DEFAULT_BLOCK_SIZE / 2);
+ conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE / 2);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(5).racks(racks)
.hosts(hosts).build();
cluster.waitActive();
@@ -104,17 +97,14 @@ public class TestDefaultBlockPlacementPolicy {
}
private void testPlacement(String clientMachine,
- String clientRack) throws AccessControlException,
- SafeModeException, FileAlreadyExistsException, UnresolvedLinkException,
- FileNotFoundException, ParentNotDirectoryException, IOException,
- NotReplicatedYetException {
+ String clientRack) throws IOException {
// write 5 files and check whether all times block placed
for (int i = 0; i < 5; i++) {
String src = "/test-" + i;
// Create the file with client machine
HdfsFileStatus fileStatus = namesystem.startFile(src, perm,
clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true,
- REPLICATION_FACTOR, DEFAULT_BLOCK_SIZE, null);
+ REPLICATION_FACTOR, DEFAULT_BLOCK_SIZE, null, false);
LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine,
null, null, fileStatus.getFileId(), null);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9db888b8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsLimits.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsLimits.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsLimits.java
index 577d505..945972d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsLimits.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsLimits.java
@@ -191,7 +191,7 @@ public class TestFsLimits {
lazyInitFSDirectory();
Class<?> generated = null;
try {
- fs.renameTo(src, dst, new Rename[] { });
+ fs.renameTo(src, dst, false, new Rename[] { });
} catch (Throwable e) {
generated = e.getClass();
}
@@ -204,7 +204,7 @@ public class TestFsLimits {
lazyInitFSDirectory();
Class<?> generated = null;
try {
- fs.renameTo(src, dst);
+ fs.renameTo(src, dst, false);
} catch (Throwable e) {
generated = e.getClass();
}