You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2013/07/26 03:09:27 UTC
svn commit: r1507170 - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/
src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/main/resourc...
Author: suresh
Date: Fri Jul 26 01:09:27 2013
New Revision: 1507170
URL: http://svn.apache.org/r1507170
Log:
HDFS-4979. Implement retry cache on Namenode. Contributed by Suresh Srinivas.
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1507170&r1=1507169&r2=1507170&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Jul 26 01:09:27 2013
@@ -336,6 +336,8 @@ Release 2.1.0-beta - 2013-07-02
HDFS-4974. Add Idempotent and AtMostOnce annotations to namenode
protocol methods. (suresh)
+
+ HDFS-4979. Implement retry cache on Namenode. (suresh)
IMPROVEMENTS
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1507170&r1=1507169&r2=1507170&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Jul 26 01:09:27 2013
@@ -488,4 +488,11 @@ public class DFSConfigKeys extends Commo
public static final String DFS_MAX_NUM_BLOCKS_TO_LOG_KEY = "dfs.namenode.max-num-blocks-to-log";
public static final long DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT = 1000l;
+
+ public static final String DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY = "dfs.namenode.enable.retrycache";
+ public static final boolean DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT = true;
+ public static final String DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_KEY = "dfs.namenode.retrycache.expirytime.millis";
+ public static final long DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT = 600000; // 10 minutes
+ public static final String DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY = "dfs.namenode.retrycache.heap.percent";
+ public static final float DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT = 0.03f;
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1507170&r1=1507169&r2=1507170&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Fri Jul 26 01:09:27 2013
@@ -87,7 +87,7 @@ import com.google.common.collect.Sets;
public class BlockManager {
static final Log LOG = LogFactory.getLog(BlockManager.class);
- static final Log blockLog = NameNode.blockStateChangeLog;
+ public static final Log blockLog = NameNode.blockStateChangeLog;
/** Default load factor of map */
public static final float DEFAULT_MAP_LOAD_FACTOR = 0.75f;
@@ -2686,64 +2686,58 @@ assert storedBlock.findDatanode(dn) < 0
* The given node is reporting incremental information about some blocks.
* This includes blocks that are starting to be received, completed being
* received, or deleted.
+ *
+ * This method must be called with the FSNamesystem write lock held.
*/
- public void processIncrementalBlockReport(final DatanodeID nodeID,
- final String poolId,
- final ReceivedDeletedBlockInfo blockInfos[]
- ) throws IOException {
- namesystem.writeLock();
+ public void processIncrementalBlockReport(final DatanodeID nodeID,
+ final String poolId, final ReceivedDeletedBlockInfo blockInfos[])
+ throws IOException {
+ assert namesystem.hasWriteLock();
int received = 0;
int deleted = 0;
int receiving = 0;
- try {
- final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
- if (node == null || !node.isAlive) {
- blockLog
- .warn("BLOCK* processIncrementalBlockReport"
- + " is received from dead or unregistered node "
- + nodeID);
- throw new IOException(
- "Got incremental block report from unregistered or dead node");
- }
+ final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
+ if (node == null || !node.isAlive) {
+ blockLog
+ .warn("BLOCK* processIncrementalBlockReport"
+ + " is received from dead or unregistered node "
+ + nodeID);
+ throw new IOException(
+ "Got incremental block report from unregistered or dead node");
+ }
- for (ReceivedDeletedBlockInfo rdbi : blockInfos) {
- switch (rdbi.getStatus()) {
- case DELETED_BLOCK:
- removeStoredBlock(rdbi.getBlock(), node);
- deleted++;
- break;
- case RECEIVED_BLOCK:
- addBlock(node, rdbi.getBlock(), rdbi.getDelHints());
- received++;
- break;
- case RECEIVING_BLOCK:
- receiving++;
- processAndHandleReportedBlock(node, rdbi.getBlock(),
- ReplicaState.RBW, null);
- break;
- default:
- String msg =
- "Unknown block status code reported by " + nodeID +
- ": " + rdbi;
- blockLog.warn(msg);
- assert false : msg; // if assertions are enabled, throw.
- break;
- }
- if (blockLog.isDebugEnabled()) {
- blockLog.debug("BLOCK* block "
- + (rdbi.getStatus()) + ": " + rdbi.getBlock()
- + " is received from " + nodeID);
- }
+ for (ReceivedDeletedBlockInfo rdbi : blockInfos) {
+ switch (rdbi.getStatus()) {
+ case DELETED_BLOCK:
+ removeStoredBlock(rdbi.getBlock(), node);
+ deleted++;
+ break;
+ case RECEIVED_BLOCK:
+ addBlock(node, rdbi.getBlock(), rdbi.getDelHints());
+ received++;
+ break;
+ case RECEIVING_BLOCK:
+ receiving++;
+ processAndHandleReportedBlock(node, rdbi.getBlock(),
+ ReplicaState.RBW, null);
+ break;
+ default:
+ String msg =
+ "Unknown block status code reported by " + nodeID +
+ ": " + rdbi;
+ blockLog.warn(msg);
+ assert false : msg; // if assertions are enabled, throw.
+ break;
+ }
+ if (blockLog.isDebugEnabled()) {
+ blockLog.debug("BLOCK* block "
+ + (rdbi.getStatus()) + ": " + rdbi.getBlock()
+ + " is received from " + nodeID);
}
- } finally {
- namesystem.writeUnlock();
- blockLog
- .debug("*BLOCK* NameNode.processIncrementalBlockReport: " + "from "
- + nodeID
- + " receiving: " + receiving + ", "
- + " received: " + received + ", "
- + " deleted: " + deleted);
}
+ blockLog.debug("*BLOCK* NameNode.processIncrementalBlockReport: " + "from "
+ + nodeID + " receiving: " + receiving + ", " + " received: " + received
+ + ", " + " deleted: " + deleted);
}
/**
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1507170&r1=1507169&r2=1507170&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Fri Jul 26 01:09:27 2013
@@ -63,8 +63,6 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.io.MD5Hash;
-import org.apache.hadoop.util.IdGenerator;
-import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1507170&r1=1507169&r2=1507170&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Jul 26 01:09:27 2013
@@ -47,6 +47,8 @@ import static org.apache.hadoop.hdfs.DFS
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_REQUIRED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
@@ -55,6 +57,10 @@ import static org.apache.hadoop.hdfs.DFS
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY;
@@ -195,8 +201,12 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RetryCache;
+import org.apache.hadoop.ipc.RetryCache.CacheEntry;
+import org.apache.hadoop.ipc.RetryCache.CacheEntryWithPayload;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.metrics2.annotation.Metric;
@@ -264,7 +274,8 @@ public class FSNamesystem implements Nam
}
};
- private boolean isAuditEnabled() {
+ @VisibleForTesting
+ public boolean isAuditEnabled() {
return !isDefaultAuditLogger || auditLog.isInfoEnabled();
}
@@ -437,6 +448,8 @@ public class FSNamesystem implements Nam
private INodeId inodeId;
+ private final RetryCache retryCache;
+
/**
* Set the last allocated inode id when fsimage or editlog is loaded.
*/
@@ -671,6 +684,7 @@ public class FSNamesystem implements Nam
this.auditLoggers = initAuditLoggers(conf);
this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
auditLoggers.get(0) instanceof DefaultAuditLogger;
+ this.retryCache = initRetryCache(conf);
} catch(IOException e) {
LOG.error(getClass().getSimpleName() + " initialization failed.", e);
close();
@@ -681,6 +695,28 @@ public class FSNamesystem implements Nam
throw re;
}
}
+
+ @VisibleForTesting
+ static RetryCache initRetryCache(Configuration conf) {
+ boolean enable = conf.getBoolean(DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY,
+ DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT);
+ LOG.info("Retry cache on namenode is " + (enable ? "enabled" : "disabled"));
+ if (enable) {
+ float heapPercent = conf.getFloat(
+ DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY,
+ DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT);
+ long entryExpiryMillis = conf.getLong(
+ DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_KEY,
+ DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT);
+ LOG.info("Retry cache will use " + heapPercent
+ + " of total heap and retry cache entry expiry time is "
+ + entryExpiryMillis + " millis");
+ long entryExpiryNanos = entryExpiryMillis * 1000 * 1000;
+ return new RetryCache("Namenode Retry Cache", heapPercent,
+ entryExpiryNanos);
+ }
+ return null;
+ }
private List<AuditLogger> initAuditLoggers(Configuration conf) {
// Initialize the custom access loggers if configured.
@@ -741,7 +777,6 @@ public class FSNamesystem implements Nam
if (!haEnabled) {
fsImage.openEditLogForWrite();
}
-
success = true;
} finally {
if (!success) {
@@ -818,6 +853,7 @@ public class FSNamesystem implements Nam
} finally {
writeUnlock();
}
+ RetryCache.clear(retryCache);
}
/**
@@ -1487,15 +1523,26 @@ public class FSNamesystem implements Nam
*/
void concat(String target, String [] srcs)
throws IOException, UnresolvedLinkException {
+ CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return; // Return previous response
+ }
+
+ // Either there is no previous request in progress or it has failed
if(FSNamesystem.LOG.isDebugEnabled()) {
FSNamesystem.LOG.debug("concat " + Arrays.toString(srcs) +
" to " + target);
}
+
+ boolean success = false;
try {
concatInt(target, srcs);
+ success = true;
} catch (AccessControlException e) {
logAuditEvent(false, "concat", Arrays.toString(srcs), target, null);
throw e;
+ } finally {
+ RetryCache.setState(cacheEntry, success);
}
}
@@ -1704,17 +1751,25 @@ public class FSNamesystem implements Nam
void createSymlink(String target, String link,
PermissionStatus dirPerms, boolean createParent)
throws IOException, UnresolvedLinkException {
+ CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return; // Return previous response
+ }
if (!DFSUtil.isValidName(link)) {
throw new InvalidPathException("Invalid link name: " + link);
}
if (FSDirectory.isReservedName(target)) {
throw new InvalidPathException("Invalid target name: " + target);
}
+ boolean success = false;
try {
createSymlinkInt(target, link, dirPerms, createParent);
+ success = true;
} catch (AccessControlException e) {
logAuditEvent(false, "createSymlink", link, target, null);
throw e;
+ } finally {
+ RetryCache.setState(cacheEntry, success);
}
}
@@ -1853,13 +1908,17 @@ public class FSNamesystem implements Nam
}
}
}
-
+
/**
* Create a new file entry in the namespace.
*
- * For description of parameters and exceptions thrown see
- * {@link ClientProtocol#create()}, except it returns valid file status
- * upon success
+ * For description of parameters and exceptions thrown see
+ * {@link ClientProtocol#create()}, except it returns valid file status upon
+ * success
+ *
+ * For retryCache handling details see -
+ * {@link #getFileStatus(boolean, CacheEntryWithPayload)}
+ *
*/
HdfsFileStatus startFile(String src, PermissionStatus permissions,
String holder, String clientMachine, EnumSet<CreateFlag> flag,
@@ -1867,13 +1926,23 @@ public class FSNamesystem implements Nam
throws AccessControlException, SafeModeException,
FileAlreadyExistsException, UnresolvedLinkException,
FileNotFoundException, ParentNotDirectoryException, IOException {
+ HdfsFileStatus status = null;
+ CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
+ null);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return (HdfsFileStatus) cacheEntry.getPayload();
+ }
+
try {
- return startFileInt(src, permissions, holder, clientMachine, flag,
+ status = startFileInt(src, permissions, holder, clientMachine, flag,
createParent, replication, blockSize);
} catch (AccessControlException e) {
logAuditEvent(false, "create", src);
throw e;
+ } finally {
+ RetryCache.setState(cacheEntry, status != null, status);
}
+ return status;
}
private HdfsFileStatus startFileInt(String src, PermissionStatus permissions,
@@ -1896,7 +1965,7 @@ public class FSNamesystem implements Nam
blockManager.verifyReplication(src, replication, clientMachine);
boolean skipSync = false;
- final HdfsFileStatus stat;
+ HdfsFileStatus stat = null;
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
if (blockSize < minBlockSize) {
@@ -1977,7 +2046,12 @@ public class FSNamesystem implements Nam
}
} else {
if (overwrite) {
- delete(src, true); // File exists - delete if overwrite
+ try {
+ deleteInt(src, true); // File exists - delete if overwrite
+ } catch (AccessControlException e) {
+ logAuditEvent(false, "delete", src);
+ throw e;
+ }
} else {
// If lease soft limit time is expired, recover the lease
recoverLeaseInternal(myFile, src, holder, clientMachine, false);
@@ -2226,16 +2300,28 @@ public class FSNamesystem implements Nam
throws AccessControlException, SafeModeException,
FileAlreadyExistsException, FileNotFoundException,
ParentNotDirectoryException, IOException {
+ LocatedBlock lb = null;
+ CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
+ null);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return (LocatedBlock) cacheEntry.getPayload();
+ }
+
+ boolean success = false;
try {
- return appendFileInt(src, holder, clientMachine);
+ lb = appendFileInt(src, holder, clientMachine);
+ success = true;
+ return lb;
} catch (AccessControlException e) {
logAuditEvent(false, "append", src);
throw e;
+ } finally {
+ RetryCache.setState(cacheEntry, success, lb);
}
}
- private LocatedBlock appendFileInt(String src, String holder, String clientMachine)
- throws AccessControlException, SafeModeException,
+ private LocatedBlock appendFileInt(String src, String holder,
+ String clientMachine) throws AccessControlException, SafeModeException,
FileAlreadyExistsException, FileNotFoundException,
ParentNotDirectoryException, IOException {
if (NameNode.stateChangeLog.isDebugEnabled()) {
@@ -2798,12 +2884,20 @@ public class FSNamesystem implements Nam
@Deprecated
boolean renameTo(String src, String dst)
throws IOException, UnresolvedLinkException {
+ CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return true; // Return previous response
+ }
+ boolean ret = false;
try {
- return renameToInt(src, dst);
+ ret = renameToInt(src, dst);
} catch (AccessControlException e) {
logAuditEvent(false, "rename", src, dst, null);
throw e;
+ } finally {
+ RetryCache.setState(cacheEntry, ret);
}
+ return ret;
}
private boolean renameToInt(String src, String dst)
@@ -2870,6 +2964,10 @@ public class FSNamesystem implements Nam
/** Rename src to dst */
void renameTo(String src, String dst, Options.Rename... options)
throws IOException, UnresolvedLinkException {
+ CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return; // Return previous response
+ }
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - "
+ src + " to " + dst);
@@ -2882,6 +2980,7 @@ public class FSNamesystem implements Nam
byte[][] srcComponents = FSDirectory.getPathComponentsForReservedPath(src);
byte[][] dstComponents = FSDirectory.getPathComponentsForReservedPath(dst);
HdfsFileStatus resultingStat = null;
+ boolean success = false;
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@@ -2892,8 +2991,10 @@ public class FSNamesystem implements Nam
dst = FSDirectory.resolvePath(dst, dstComponents, dir);
renameToInternal(pc, src, dst, options);
resultingStat = getAuditFileInfo(dst, false);
+ success = true;
} finally {
writeUnlock();
+ RetryCache.setState(cacheEntry, success);
}
getEditLog().logSync();
if (resultingStat != null) {
@@ -2925,12 +3026,20 @@ public class FSNamesystem implements Nam
boolean delete(String src, boolean recursive)
throws AccessControlException, SafeModeException,
UnresolvedLinkException, IOException {
+ CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return true; // Return previous response
+ }
+ boolean ret = false;
try {
- return deleteInt(src, recursive);
+ ret = deleteInt(src, recursive);
} catch (AccessControlException e) {
logAuditEvent(false, "delete", src);
throw e;
+ } finally {
+ RetryCache.setState(cacheEntry, ret);
}
+ return ret;
}
private boolean deleteInt(String src, boolean recursive)
@@ -2954,6 +3063,7 @@ public class FSNamesystem implements Nam
throw new AccessControlException(ioe);
}
}
+
/**
* Remove a file/directory from the namespace.
* <p>
@@ -2974,6 +3084,7 @@ public class FSNamesystem implements Nam
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+ boolean ret = false;
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@@ -2992,6 +3103,7 @@ public class FSNamesystem implements Nam
if (!dir.delete(src, collectedBlocks, removedINodes)) {
return false;
}
+ ret = true;
} finally {
writeUnlock();
}
@@ -3009,7 +3121,7 @@ public class FSNamesystem implements Nam
NameNode.stateChangeLog.debug("DIR* Namesystem.delete: "
+ src +" is removed");
}
- return true;
+ return ret;
}
/**
@@ -3175,12 +3287,14 @@ public class FSNamesystem implements Nam
*/
boolean mkdirs(String src, PermissionStatus permissions,
boolean createParent) throws IOException, UnresolvedLinkException {
+ boolean ret = false;
try {
- return mkdirsInt(src, permissions, createParent);
+ ret = mkdirsInt(src, permissions, createParent);
} catch (AccessControlException e) {
logAuditEvent(false, "mkdirs", src);
throw e;
}
+ return ret;
}
private boolean mkdirsInt(String src, PermissionStatus permissions,
@@ -3240,7 +3354,7 @@ public class FSNamesystem implements Nam
}
// validate that we have enough inodes. This is, at best, a
- // heuristic because the mkdirs() operation migth need to
+ // heuristic because the mkdirs() operation might need to
// create multiple inodes.
checkFsObjectLimit();
@@ -4040,8 +4154,13 @@ public class FSNamesystem implements Nam
* @throws IOException if
*/
void saveNamespace() throws AccessControlException, IOException {
+ CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return; // Return previous response
+ }
checkSuperuserPrivilege();
checkOperation(OperationCategory.UNCHECKED);
+ boolean success = false;
readLock();
try {
checkOperation(OperationCategory.UNCHECKED);
@@ -4050,10 +4169,12 @@ public class FSNamesystem implements Nam
"in order to create namespace image.");
}
getFSImage().saveNamespace(this);
- LOG.info("New namespace image has been created");
+ success = true;
} finally {
readUnlock();
+ RetryCache.setState(cacheEntry, success);
}
+ LOG.info("New namespace image has been created");
}
/**
@@ -4611,6 +4732,7 @@ public class FSNamesystem implements Nam
try {
Thread.sleep(recheckInterval);
} catch (InterruptedException ie) {
+ // Ignored
}
}
if (!fsRunning) {
@@ -4869,30 +4991,51 @@ public class FSNamesystem implements Nam
}
}
- NamenodeCommand startCheckpoint(
- NamenodeRegistration bnReg, // backup node
- NamenodeRegistration nnReg) // active name-node
- throws IOException {
+ NamenodeCommand startCheckpoint(NamenodeRegistration backupNode,
+ NamenodeRegistration activeNamenode) throws IOException {
checkOperation(OperationCategory.CHECKPOINT);
+ CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
+ null);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return (NamenodeCommand) cacheEntry.getPayload();
+ }
writeLock();
+ NamenodeCommand cmd = null;
try {
checkOperation(OperationCategory.CHECKPOINT);
if (isInSafeMode()) {
throw new SafeModeException("Checkpoint not started", safeMode);
}
- LOG.info("Start checkpoint for " + bnReg.getAddress());
- NamenodeCommand cmd = getFSImage().startCheckpoint(bnReg, nnReg);
+ LOG.info("Start checkpoint for " + backupNode.getAddress());
+ cmd = getFSImage().startCheckpoint(backupNode, activeNamenode);
getEditLog().logSync();
return cmd;
} finally {
writeUnlock();
+ RetryCache.setState(cacheEntry, cmd != null, cmd);
}
}
+ public void processIncrementalBlockReport(final DatanodeID nodeID,
+ final String poolId, final ReceivedDeletedBlockInfo blockInfos[])
+ throws IOException {
+ writeLock();
+ try {
+ blockManager.processIncrementalBlockReport(nodeID, poolId, blockInfos);
+ } finally {
+ writeUnlock();
+ }
+ }
+
void endCheckpoint(NamenodeRegistration registration,
CheckpointSignature sig) throws IOException {
+ CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return; // Return previous response
+ }
checkOperation(OperationCategory.CHECKPOINT);
+ boolean success = false;
readLock();
try {
checkOperation(OperationCategory.CHECKPOINT);
@@ -4902,8 +5045,10 @@ public class FSNamesystem implements Nam
}
LOG.info("End checkpoint for " + registration.getAddress());
getFSImage().endCheckpoint(sig);
+ success = true;
} finally {
readUnlock();
+ RetryCache.setState(cacheEntry, success);
}
}
@@ -5421,6 +5566,10 @@ public class FSNamesystem implements Nam
void updatePipeline(String clientName, ExtendedBlock oldBlock,
ExtendedBlock newBlock, DatanodeID[] newNodes)
throws IOException {
+ CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return; // Return previous response
+ }
checkOperation(OperationCategory.WRITE);
LOG.info("updatePipeline(block=" + oldBlock
+ ", newGenerationStamp=" + newBlock.getGenerationStamp()
@@ -5429,17 +5578,19 @@ public class FSNamesystem implements Nam
+ ", clientName=" + clientName
+ ")");
writeLock();
+ boolean success = false;
try {
checkOperation(OperationCategory.WRITE);
-
if (isInSafeMode()) {
throw new SafeModeException("Pipeline not updated", safeMode);
}
assert newBlock.getBlockId()==oldBlock.getBlockId() : newBlock + " and "
+ oldBlock + " has different block identifier";
updatePipelineInternal(clientName, oldBlock, newBlock, newNodes);
+ success = true;
} finally {
writeUnlock();
+ RetryCache.setState(cacheEntry, success);
}
getEditLog().logSync();
LOG.info("updatePipeline(" + oldBlock + ") successfully to " + newBlock);
@@ -6304,9 +6455,14 @@ public class FSNamesystem implements Nam
*/
String createSnapshot(String snapshotRoot, String snapshotName)
throws SafeModeException, IOException {
+ CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
+ null);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return (String) cacheEntry.getPayload();
+ }
final FSPermissionChecker pc = getPermissionChecker();
writeLock();
- final String snapshotPath;
+ String snapshotPath = null;
try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
@@ -6330,6 +6486,7 @@ public class FSNamesystem implements Nam
getEditLog().logCreateSnapshot(snapshotRoot, snapshotName);
} finally {
writeUnlock();
+ RetryCache.setState(cacheEntry, snapshotPath != null, snapshotPath);
}
getEditLog().logSync();
@@ -6349,8 +6506,13 @@ public class FSNamesystem implements Nam
*/
void renameSnapshot(String path, String snapshotOldName,
String snapshotNewName) throws SafeModeException, IOException {
+ CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return; // Return previous response
+ }
final FSPermissionChecker pc = getPermissionChecker();
writeLock();
+ boolean success = false;
try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
@@ -6364,8 +6526,10 @@ public class FSNamesystem implements Nam
snapshotManager.renameSnapshot(path, snapshotOldName, snapshotNewName);
getEditLog().logRenameSnapshot(path, snapshotOldName, snapshotNewName);
+ success = true;
} finally {
writeUnlock();
+ RetryCache.setState(cacheEntry, success);
}
getEditLog().logSync();
@@ -6458,6 +6622,11 @@ public class FSNamesystem implements Nam
void deleteSnapshot(String snapshotRoot, String snapshotName)
throws SafeModeException, IOException {
final FSPermissionChecker pc = getPermissionChecker();
+ CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return; // Return previous response
+ }
+ boolean success = false;
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@@ -6481,8 +6650,10 @@ public class FSNamesystem implements Nam
this.removeBlocks(collectedBlocks);
collectedBlocks.clear();
getEditLog().logDeleteSnapshot(snapshotRoot, snapshotName);
+ success = true;
} finally {
writeUnlock();
+ RetryCache.setState(cacheEntry, success);
}
getEditLog().logSync();
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1507170&r1=1507169&r2=1507170&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Fri Jul 26 01:09:27 2013
@@ -960,7 +960,7 @@ class NameNodeRpcServer implements Namen
+"from "+nodeReg+" "+receivedAndDeletedBlocks.length
+" blocks.");
}
- namesystem.getBlockManager().processIncrementalBlockReport(
+ namesystem.processIncrementalBlockReport(
nodeReg, poolId, receivedAndDeletedBlocks[0].getBlocks());
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1507170&r1=1507169&r2=1507170&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Fri Jul 26 01:09:27 2013
@@ -1352,4 +1352,43 @@
</description>
</property>
+<property>
+ <name>dfs.namenode.enable.retrycache</name>
+ <value>true</value>
+ <description>
+ This enables the retry cache on the namenode. For non-idempotent
+ requests, the namenode tracks the corresponding response. If a client
+ retries the request, the response from the retry cache is sent. Such
+ operations are tagged with annotation @AtMostOnce in namenode protocols.
+ It is recommended that this flag be set to true. Setting it to false will
+ result in clients getting failure responses to retried requests. This flag
+ must be enabled in HA setup for transparent fail-overs.
+
+ The entries in the cache have expiration time configurable
+ using dfs.namenode.retrycache.expirytime.millis.
+ </description>
+</property>
+
+<property>
+ <name>dfs.namenode.retrycache.expirytime.millis</name>
+ <value>600000</value>
+ <description>
+ The time, in milliseconds, for which retry cache entries are retained.
+ </description>
+</property>
+
+<property>
+ <name>dfs.namenode.retrycache.heap.percent</name>
+ <value>0.03f</value>
+ <description>
+ This parameter configures the heap size allocated for retry cache
+ (excluding the cached responses). This corresponds to approximately
+ 4096 entries for every 64MB of namenode process java heap size.
+ Assuming retry cache entry expiration time (configured using
+ dfs.namenode.retrycache.expirytime.millis) of 10 minutes, this
+ enables retry cache to support 7 operations per second sustained
+ for 10 minutes. As the heap size is increased, the operation rate
+ linearly increases.
+ </description>
+</property>
</configuration>
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java?rev=1507170&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java Fri Jul 26 01:09:27 2013
@@ -0,0 +1,355 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+
+
+import java.io.IOException;
+import java.util.EnumSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.ipc.RPC.RpcKind;
+import org.apache.hadoop.ipc.RetryCache;
+import org.apache.hadoop.ipc.RpcConstants;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.util.StringUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests for ensuring the namenode retry cache works correctly for
+ * non-idempotent requests.
+ *
+ * Retry cache works based on tracking previously received request based on the
+ * ClientId and CallId received in RPC requests and storing the response. The
+ * response is replayed on retry when the same request is received again.
+ *
+ * The test works by manipulating the Rpc {@link Server} current RPC call. For
+ * testing retried requests, an Rpc callId is generated only once using
+ * {@link #newCall()} and reused for many method calls. For testing non-retried
+ * request, a new callId is generated using {@link #newCall()}.
+ */
+public class TestNamenodeRetryCache {
+ // Client id placed in every Server.Call built by newCall()
+ private static final byte[] CLIENT_ID = StringUtils.getUuidBytes();
+ // Cluster, namesystem and filesystem are initialized once in setup()
+ private static MiniDFSCluster cluster;
+ private static FSNamesystem namesystem;
+ // Permission status passed to the mkdirs/createSymlink/startFile calls
+ private static PermissionStatus perm = new PermissionStatus(
+ "TestNamenodeRetryCache", null, FsPermission.getDefault());
+ private static FileSystem filesystem;
+ // Last call id handed out; advanced by newCall() and incrementCallId()
+ private static int callId = 100;
+
+ /**
+ * Start a {@link MiniDFSCluster} with the block size set to "512" and the
+ * namenode retry cache explicitly enabled, then wait for it to become
+ * active and keep references to its namesystem and filesystem for the tests.
+ */
+ @BeforeClass
+ public static void setup() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ conf.set(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, "512");
+ conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY, true);
+ cluster = new MiniDFSCluster.Builder(conf).build();
+ cluster.waitActive();
+ namesystem = cluster.getNamesystem();
+ filesystem = cluster.getFileSystem();
+ }
+
+ /**
+ * Cleanup after each test by deleting "/" from the namesystem. The second
+ * argument is {@code true} (presumably the recursive flag - confirm against
+ * FSNamesystem#delete).
+ * @throws IOException if the delete call fails
+ */
+ @After
+ public void cleanup() throws IOException {
+ namesystem.delete("/", true);
+ }
+
+ /**
+ * Advance the call id counter by one without installing a new current
+ * RPC call.
+ */
+ public static void incrementCallId() {
+ callId++;
+ }
+
+ /**
+ * Set the current Server RPC call to a new call that carries the next
+ * call id (the counter is incremented first) and this test's
+ * {@link #CLIENT_ID}. Namesystem calls made afterwards, until the next
+ * {@link #newCall()} or {@link #resetCall()}, all run under this call.
+ */
+ public static void newCall() {
+ Server.Call call = new Server.Call(++callId, 1, null, null,
+ RpcKind.RPC_PROTOCOL_BUFFER, CLIENT_ID);
+ Server.getCurCall().set(call);
+ }
+
+ /**
+ * Set the current Server RPC call to one built with
+ * {@link RpcConstants#INVALID_CALL_ID} and
+ * {@link RpcConstants#DUMMY_CLIENT_ID}. The tests call this before setup
+ * operations; presumably such a call is not tracked by the retry cache -
+ * confirm against RetryCache.
+ */
+ public static void resetCall() {
+ Server.Call call = new Server.Call(RpcConstants.INVALID_CALL_ID, 1, null,
+ null, RpcKind.RPC_PROTOCOL_BUFFER, RpcConstants.DUMMY_CLIENT_ID);
+ Server.getCurCall().set(call);
+ }
+
+ /**
+ * Create the two files used by {@link #testConcat()} through the test
+ * filesystem. Both are created with the same arguments (512, (short)1, 0L);
+ * presumably length, replication and seed - confirm against DFSTestUtil.
+ */
+ private void concatSetup(String file1, String file2) throws Exception {
+ DFSTestUtil.createFile(filesystem, new Path(file1), 512, (short)1, 0L);
+ DFSTestUtil.createFile(filesystem, new Path(file2), 512, (short)1, 0L);
+ }
+
+ /**
+ * Tests for concat call: repeating the call under the same callId must
+ * succeed, while issuing it again under a new callId must fail.
+ */
+ @Test
+ public void testConcat() throws Exception {
+ resetCall();
+ String file1 = "/testNamenodeRetryCache/testConcat/file1";
+ String file2 = "/testNamenodeRetryCache/testConcat/file2";
+
+ // The original concat call and two retries (same callId) succeed
+ concatSetup(file1, file2);
+ newCall();
+ namesystem.concat(file1, new String[]{file2});
+ namesystem.concat(file1, new String[]{file2});
+ namesystem.concat(file1, new String[]{file2});
+
+ // A non-retried concat request fails
+ newCall();
+ try {
+ // Same request under a new callId should fail with an exception
+ namesystem.concat(file1, new String[]{file2});
+ Assert.fail("testConcat - expected exception is not thrown");
+ } catch (IOException e) {
+ // Expected
+ }
+ }
+
+ /**
+ * Tests for delete call: repeating the call under the same callId must
+ * keep returning true, while a new callId must get false.
+ */
+ @Test
+ public void testDelete() throws Exception {
+ String dir = "/testNamenodeRetryCache/testDelete";
+ // Create the directory that the delete calls below operate on
+ newCall();
+ namesystem.mkdirs(dir, perm, true);
+ // The original delete call and two retries (same callId) return true
+ newCall();
+ Assert.assertTrue(namesystem.delete(dir, false));
+ Assert.assertTrue(namesystem.delete(dir, false));
+ Assert.assertTrue(namesystem.delete(dir, false));
+
+ // non-retried call fails and gets false as return
+ newCall();
+ Assert.assertFalse(namesystem.delete(dir, false));
+ }
+
+ /**
+ * Test for createSymlink: repeating the call under the same callId must
+ * succeed, while issuing it again under a new callId must fail.
+ */
+ @Test
+ public void testCreateSymlink() throws Exception {
+ String target = "/testNamenodeRetryCache/testCreateSymlink/target";
+
+ // The original symlink call and two retries (same callId) succeed
+ newCall();
+ namesystem.createSymlink(target, "/a/b", perm, true);
+ namesystem.createSymlink(target, "/a/b", perm, true);
+ namesystem.createSymlink(target, "/a/b", perm, true);
+
+ // non-retried call fails
+ newCall();
+ try {
+ // Same request under a new callId should fail with an exception
+ namesystem.createSymlink(target, "/a/b", perm, true);
+ Assert.fail("testCreateSymlink - expected exception is not thrown");
+ } catch (IOException e) {
+ // Expected
+ }
+ }
+
+ /**
+ * Test for create file: repeating startFile under the same callId must
+ * return a status equal to the first one, while issuing it again under a
+ * new callId must fail.
+ */
+ @Test
+ public void testCreate() throws Exception {
+ String src = "/testNamenodeRetryCache/testCreate/file";
+ // The original call and two retries (same callId) return equal statuses
+ newCall();
+ HdfsFileStatus status = namesystem.startFile(src, perm, "holder",
+ "clientmachine", EnumSet.of(CreateFlag.CREATE), true, (short) 1, 512);
+ Assert.assertEquals(status, namesystem.startFile(src, perm,
+ "holder", "clientmachine", EnumSet.of(CreateFlag.CREATE),
+ true, (short) 1, 512));
+ Assert.assertEquals(status, namesystem.startFile(src, perm,
+ "holder", "clientmachine", EnumSet.of(CreateFlag.CREATE),
+ true, (short) 1, 512));
+
+ // A non-retried call fails
+ newCall();
+ try {
+ namesystem.startFile(src, perm, "holder", "clientmachine",
+ EnumSet.of(CreateFlag.CREATE), true, (short) 1, 512);
+ Assert.fail("testCreate - expected exception is not thrown");
+ } catch (IOException e) {
+ // expected
+ }
+ }
+
+ /**
+ * Test for append: repeating appendFile under the same callId must return
+ * a block equal to the first one, while a new callId must fail.
+ */
+ @Test
+ public void testAppend() throws Exception {
+ String src = "/testNamenodeRetryCache/testAppend/src";
+ resetCall();
+ // Create a file with partial block
+ DFSTestUtil.createFile(filesystem, new Path(src), 128, (short)1, 0L);
+
+ // Retried append requests succeed
+ newCall();
+ LocatedBlock b = namesystem.appendFile(src, "holder", "clientMachine");
+ Assert.assertEquals(b, namesystem.appendFile(src, "holder", "clientMachine"));
+ Assert.assertEquals(b, namesystem.appendFile(src, "holder", "clientMachine"));
+
+ // non-retried call fails
+ newCall();
+ try {
+ namesystem.appendFile(src, "holder", "clientMachine");
+ Assert.fail("testAppend - expected exception is not thrown");
+ } catch (Exception e) {
+ // Expected
+ }
+ }
+
+ /**
+ * Test for rename1 (the two-argument renameTo that returns a boolean):
+ * repeating the call under the same callId must keep returning true, while
+ * a new callId must get false.
+ */
+ @SuppressWarnings("deprecation")
+ @Test
+ public void testRename1() throws Exception {
+ String src = "/testNamenodeRetryCache/testRename1/src";
+ String target = "/testNamenodeRetryCache/testRename1/target";
+ resetCall();
+ namesystem.mkdirs(src, perm, true);
+
+ // Retried renames succeed
+ newCall();
+ Assert.assertTrue(namesystem.renameTo(src, target));
+ Assert.assertTrue(namesystem.renameTo(src, target));
+ Assert.assertTrue(namesystem.renameTo(src, target));
+
+ // A non-retried request fails
+ newCall();
+ Assert.assertFalse(namesystem.renameTo(src, target));
+ }
+
+ /**
+ * Test for rename2 (renameTo taking {@link Rename} options): repeating the
+ * call under the same callId must succeed, while a new callId must fail
+ * with an IOException.
+ */
+ @Test
+ public void testRename2() throws Exception {
+ String src = "/testNamenodeRetryCache/testRename2/src";
+ String target = "/testNamenodeRetryCache/testRename2/target";
+ resetCall();
+ namesystem.mkdirs(src, perm, true);
+
+ // Retried renames succeed
+ newCall();
+ namesystem.renameTo(src, target, Rename.NONE);
+ namesystem.renameTo(src, target, Rename.NONE);
+ namesystem.renameTo(src, target, Rename.NONE);
+
+ // A non-retried request fails
+ newCall();
+ try {
+ namesystem.renameTo(src, target, Rename.NONE);
+ Assert.fail("testRename 2 expected exception is not thrown");
+ } catch (IOException e) {
+ // expected
+ }
+ }
+
+ /**
+ * Test for createSnapshot, renameSnapshot and deleteSnapshot
+ */
+ @Test
+ public void testSnapshotMethods() throws Exception {
+ String dir = "/testNamenodeRetryCache/testCreateSnapshot/src";
+ resetCall();
+ namesystem.mkdirs(dir, perm, true);
+ namesystem.allowSnapshot(dir);
+
+ // Test retry of create snapshot
+ newCall();
+ String name = namesystem.createSnapshot(dir, "snap1");
+ Assert.assertEquals(name, namesystem.createSnapshot(dir, "snap1"));
+ Assert.assertEquals(name, namesystem.createSnapshot(dir, "snap1"));
+ Assert.assertEquals(name, namesystem.createSnapshot(dir, "snap1"));
+
+ // Non retried calls should fail
+ newCall();
+ try {
+ namesystem.createSnapshot(dir, "snap1");
+ Assert.fail("testSnapshotMethods expected exception is not thrown");
+ } catch (IOException e) {
+ // expected
+ }
+
+ // Test retry of rename snapshot
+ newCall();
+ namesystem.renameSnapshot(dir, "snap1", "snap2");
+ namesystem.renameSnapshot(dir, "snap1", "snap2");
+ namesystem.renameSnapshot(dir, "snap1", "snap2");
+
+ // Non retried calls should fail
+ newCall();
+ try {
+ namesystem.renameSnapshot(dir, "snap1", "snap2");
+ Assert.fail("testSnapshotMethods expected exception is not thrown");
+ } catch (IOException e) {
+ // expected
+ }
+
+ // Test retry of delete snapshot
+ newCall();
+ namesystem.deleteSnapshot(dir, "snap2");
+ namesystem.deleteSnapshot(dir, "snap2");
+ namesystem.deleteSnapshot(dir, "snap2");
+
+ // Non retried calls should fail
+ newCall();
+ try {
+ namesystem.deleteSnapshot(dir, "snap2");
+ Assert.fail("testSnapshotMethods expected exception is not thrown");
+ } catch (IOException e) {
+ // expected
+ }
+ }
+
+ /**
+ * Test that FSNamesystem.initRetryCache returns a non-null cache for a
+ * default configuration and null once the enable flag is set to false.
+ */
+ @Test
+ public void testRetryCacheConfig() {
+ // By default the retry cache should be enabled
+ Configuration conf = new HdfsConfiguration();
+ Assert.assertNotNull(FSNamesystem.initRetryCache(conf));
+
+ // If retry cache is disabled, it should not be created
+ conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY, false);
+ Assert.assertNull(FSNamesystem.initRetryCache(conf));
+ }
+}