You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2013/07/26 03:34:19 UTC
svn commit: r1507176 - in
/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/
src/main/java/org/apache/hadoop/hdfs/server/namenod...
Author: suresh
Date: Fri Jul 26 01:34:18 2013
New Revision: 1507176
URL: http://svn.apache.org/r1507176
Log:
HDFS-4979. Merge 1507173 from branch-2
Added:
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
- copied unchanged from r1507173, hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
Modified:
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1507176&r1=1507175&r2=1507176&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Jul 26 01:34:18 2013
@@ -75,6 +75,8 @@ Release 2.1.0-beta - 2013-07-02
HDFS-4974. Add Idempotent and AtMostOnce annotations to namenode
protocol methods. (suresh)
+
+ HDFS-4979. Implement retry cache on Namenode. (suresh)
IMPROVEMENTS
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1507176&r1=1507175&r2=1507176&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Jul 26 01:34:18 2013
@@ -482,4 +482,11 @@ public class DFSConfigKeys extends Commo
public static final int DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_DEFAULT = 120000;
public static final int DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_DEFAULT = 120000;
public static final int DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_DEFAULT = 20000;
+
+ public static final String DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY = "dfs.namenode.enable.retrycache";
+ public static final boolean DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT = true;
+ public static final String DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_KEY = "dfs.namenode.retrycache.expirytime.millis";
+ public static final long DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT = 600000; // 10 minutes
+ public static final String DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY = "dfs.namenode.retrycache.heap.percent";
+ public static final float DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT = 0.03f;
}
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1507176&r1=1507175&r2=1507176&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Fri Jul 26 01:34:18 2013
@@ -86,7 +86,7 @@ import com.google.common.collect.Sets;
public class BlockManager {
static final Log LOG = LogFactory.getLog(BlockManager.class);
- static final Log blockLog = NameNode.blockStateChangeLog;
+ public static final Log blockLog = NameNode.blockStateChangeLog;
/** Default load factor of map */
public static final float DEFAULT_MAP_LOAD_FACTOR = 0.75f;
@@ -2659,64 +2659,58 @@ assert storedBlock.findDatanode(dn) < 0
* The given node is reporting incremental information about some blocks.
* This includes blocks that are starting to be received, completed being
* received, or deleted.
+ *
+ * This method must be called with FSNamesystem lock held.
*/
- public void processIncrementalBlockReport(final DatanodeID nodeID,
- final String poolId,
- final ReceivedDeletedBlockInfo blockInfos[]
- ) throws IOException {
- namesystem.writeLock();
+ public void processIncrementalBlockReport(final DatanodeID nodeID,
+ final String poolId, final ReceivedDeletedBlockInfo blockInfos[])
+ throws IOException {
+ assert namesystem.hasWriteLock();
int received = 0;
int deleted = 0;
int receiving = 0;
- try {
- final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
- if (node == null || !node.isAlive) {
- blockLog
- .warn("BLOCK* processIncrementalBlockReport"
- + " is received from dead or unregistered node "
- + nodeID);
- throw new IOException(
- "Got incremental block report from unregistered or dead node");
- }
+ final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
+ if (node == null || !node.isAlive) {
+ blockLog
+ .warn("BLOCK* processIncrementalBlockReport"
+ + " is received from dead or unregistered node "
+ + nodeID);
+ throw new IOException(
+ "Got incremental block report from unregistered or dead node");
+ }
- for (ReceivedDeletedBlockInfo rdbi : blockInfos) {
- switch (rdbi.getStatus()) {
- case DELETED_BLOCK:
- removeStoredBlock(rdbi.getBlock(), node);
- deleted++;
- break;
- case RECEIVED_BLOCK:
- addBlock(node, rdbi.getBlock(), rdbi.getDelHints());
- received++;
- break;
- case RECEIVING_BLOCK:
- receiving++;
- processAndHandleReportedBlock(node, rdbi.getBlock(),
- ReplicaState.RBW, null);
- break;
- default:
- String msg =
- "Unknown block status code reported by " + nodeID +
- ": " + rdbi;
- blockLog.warn(msg);
- assert false : msg; // if assertions are enabled, throw.
- break;
- }
- if (blockLog.isDebugEnabled()) {
- blockLog.debug("BLOCK* block "
- + (rdbi.getStatus()) + ": " + rdbi.getBlock()
- + " is received from " + nodeID);
- }
+ for (ReceivedDeletedBlockInfo rdbi : blockInfos) {
+ switch (rdbi.getStatus()) {
+ case DELETED_BLOCK:
+ removeStoredBlock(rdbi.getBlock(), node);
+ deleted++;
+ break;
+ case RECEIVED_BLOCK:
+ addBlock(node, rdbi.getBlock(), rdbi.getDelHints());
+ received++;
+ break;
+ case RECEIVING_BLOCK:
+ receiving++;
+ processAndHandleReportedBlock(node, rdbi.getBlock(),
+ ReplicaState.RBW, null);
+ break;
+ default:
+ String msg =
+ "Unknown block status code reported by " + nodeID +
+ ": " + rdbi;
+ blockLog.warn(msg);
+ assert false : msg; // if assertions are enabled, throw.
+ break;
+ }
+ if (blockLog.isDebugEnabled()) {
+ blockLog.debug("BLOCK* block "
+ + (rdbi.getStatus()) + ": " + rdbi.getBlock()
+ + " is received from " + nodeID);
}
- } finally {
- namesystem.writeUnlock();
- blockLog
- .debug("*BLOCK* NameNode.processIncrementalBlockReport: " + "from "
- + nodeID
- + " receiving: " + receiving + ", "
- + " received: " + received + ", "
- + " deleted: " + deleted);
}
+ blockLog.debug("*BLOCK* NameNode.processIncrementalBlockReport: " + "from "
+ + nodeID + " receiving: " + receiving + ", " + " received: " + received
+ + ", " + " deleted: " + deleted);
}
/**
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1507176&r1=1507175&r2=1507176&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Fri Jul 26 01:34:18 2013
@@ -66,8 +66,6 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.io.MD5Hash;
-import org.apache.hadoop.util.IdGenerator;
-import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1507176&r1=1507175&r2=1507176&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Jul 26 01:34:18 2013
@@ -47,6 +47,8 @@ import static org.apache.hadoop.hdfs.DFS
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_REQUIRED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
@@ -55,6 +57,10 @@ import static org.apache.hadoop.hdfs.DFS
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY;
@@ -194,8 +200,12 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RetryCache;
+import org.apache.hadoop.ipc.RetryCache.CacheEntry;
+import org.apache.hadoop.ipc.RetryCache.CacheEntryWithPayload;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.metrics2.annotation.Metric;
@@ -247,7 +257,8 @@ public class FSNamesystem implements Nam
}
};
- private boolean isAuditEnabled() {
+ @VisibleForTesting
+ public boolean isAuditEnabled() {
return !isDefaultAuditLogger || auditLog.isInfoEnabled();
}
@@ -420,6 +431,8 @@ public class FSNamesystem implements Nam
private INodeId inodeId;
+ private final RetryCache retryCache;
+
/**
* Set the last allocated inode id when fsimage or editlog is loaded.
*/
@@ -654,6 +667,7 @@ public class FSNamesystem implements Nam
this.auditLoggers = initAuditLoggers(conf);
this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
auditLoggers.get(0) instanceof DefaultAuditLogger;
+ this.retryCache = initRetryCache(conf);
} catch(IOException e) {
LOG.error(getClass().getSimpleName() + " initialization failed.", e);
close();
@@ -664,6 +678,28 @@ public class FSNamesystem implements Nam
throw re;
}
}
+
+ @VisibleForTesting
+ static RetryCache initRetryCache(Configuration conf) {
+ boolean enable = conf.getBoolean(DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY,
+ DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT);
+ LOG.info("Retry cache on namenode is " + (enable ? "enabled" : "disabled"));
+ if (enable) {
+ float heapPercent = conf.getFloat(
+ DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY,
+ DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT);
+ long entryExpiryMillis = conf.getLong(
+ DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_KEY,
+ DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT);
+ LOG.info("Retry cache will use " + heapPercent
+ + " of total heap and retry cache entry expiry time is "
+ + entryExpiryMillis + " millis");
+ long entryExpiryNanos = entryExpiryMillis * 1000 * 1000;
+ return new RetryCache("Namenode Retry Cache", heapPercent,
+ entryExpiryNanos);
+ }
+ return null;
+ }
private List<AuditLogger> initAuditLoggers(Configuration conf) {
// Initialize the custom access loggers if configured.
@@ -724,7 +760,6 @@ public class FSNamesystem implements Nam
if (!haEnabled) {
fsImage.openEditLogForWrite();
}
-
success = true;
} finally {
if (!success) {
@@ -801,6 +836,7 @@ public class FSNamesystem implements Nam
} finally {
writeUnlock();
}
+ RetryCache.clear(retryCache);
}
/**
@@ -1469,15 +1505,26 @@ public class FSNamesystem implements Nam
*/
void concat(String target, String [] srcs)
throws IOException, UnresolvedLinkException {
+ CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return; // Return previous response
+ }
+
+ // Either there is no previous request in progress or it has failed
if(FSNamesystem.LOG.isDebugEnabled()) {
FSNamesystem.LOG.debug("concat " + Arrays.toString(srcs) +
" to " + target);
}
+
+ boolean success = false;
try {
concatInt(target, srcs);
+ success = true;
} catch (AccessControlException e) {
logAuditEvent(false, "concat", Arrays.toString(srcs), target, null);
throw e;
+ } finally {
+ RetryCache.setState(cacheEntry, success);
}
}
@@ -1686,17 +1733,25 @@ public class FSNamesystem implements Nam
void createSymlink(String target, String link,
PermissionStatus dirPerms, boolean createParent)
throws IOException, UnresolvedLinkException {
+ CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return; // Return previous response
+ }
if (!DFSUtil.isValidName(link)) {
throw new InvalidPathException("Invalid link name: " + link);
}
if (FSDirectory.isReservedName(target)) {
throw new InvalidPathException("Invalid target name: " + target);
}
+ boolean success = false;
try {
createSymlinkInt(target, link, dirPerms, createParent);
+ success = true;
} catch (AccessControlException e) {
logAuditEvent(false, "createSymlink", link, target, null);
throw e;
+ } finally {
+ RetryCache.setState(cacheEntry, success);
}
}
@@ -1835,13 +1890,17 @@ public class FSNamesystem implements Nam
}
}
}
-
+
/**
* Create a new file entry in the namespace.
*
- * For description of parameters and exceptions thrown see
- * {@link ClientProtocol#create()}, except it returns valid file status
- * upon success
+ * For description of parameters and exceptions thrown see
+ * {@link ClientProtocol#create()}, except it returns valid file status upon
+ * success
+ *
+ * For retryCache handling details see -
+ * {@link #getFileStatus(boolean, CacheEntryWithPayload)}
+ *
*/
HdfsFileStatus startFile(String src, PermissionStatus permissions,
String holder, String clientMachine, EnumSet<CreateFlag> flag,
@@ -1849,13 +1908,23 @@ public class FSNamesystem implements Nam
throws AccessControlException, SafeModeException,
FileAlreadyExistsException, UnresolvedLinkException,
FileNotFoundException, ParentNotDirectoryException, IOException {
+ HdfsFileStatus status = null;
+ CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
+ null);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return (HdfsFileStatus) cacheEntry.getPayload();
+ }
+
try {
- return startFileInt(src, permissions, holder, clientMachine, flag,
+ status = startFileInt(src, permissions, holder, clientMachine, flag,
createParent, replication, blockSize);
} catch (AccessControlException e) {
logAuditEvent(false, "create", src);
throw e;
+ } finally {
+ RetryCache.setState(cacheEntry, status != null, status);
}
+ return status;
}
private HdfsFileStatus startFileInt(String src, PermissionStatus permissions,
@@ -1878,7 +1947,7 @@ public class FSNamesystem implements Nam
blockManager.verifyReplication(src, replication, clientMachine);
boolean skipSync = false;
- final HdfsFileStatus stat;
+ HdfsFileStatus stat = null;
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
if (blockSize < minBlockSize) {
@@ -1958,7 +2027,12 @@ public class FSNamesystem implements Nam
}
} else {
if (overwrite) {
- delete(src, true); // File exists - delete if overwrite
+ try {
+ deleteInt(src, true); // File exists - delete if overwrite
+ } catch (AccessControlException e) {
+ logAuditEvent(false, "delete", src);
+ throw e;
+ }
} else {
// If lease soft limit time is expired, recover the lease
recoverLeaseInternal(myFile, src, holder, clientMachine, false);
@@ -2207,16 +2281,28 @@ public class FSNamesystem implements Nam
throws AccessControlException, SafeModeException,
FileAlreadyExistsException, FileNotFoundException,
ParentNotDirectoryException, IOException {
+ LocatedBlock lb = null;
+ CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
+ null);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return (LocatedBlock) cacheEntry.getPayload();
+ }
+
+ boolean success = false;
try {
- return appendFileInt(src, holder, clientMachine);
+ lb = appendFileInt(src, holder, clientMachine);
+ success = true;
+ return lb;
} catch (AccessControlException e) {
logAuditEvent(false, "append", src);
throw e;
+ } finally {
+ RetryCache.setState(cacheEntry, success, lb);
}
}
- private LocatedBlock appendFileInt(String src, String holder, String clientMachine)
- throws AccessControlException, SafeModeException,
+ private LocatedBlock appendFileInt(String src, String holder,
+ String clientMachine) throws AccessControlException, SafeModeException,
FileAlreadyExistsException, FileNotFoundException,
ParentNotDirectoryException, IOException {
if (NameNode.stateChangeLog.isDebugEnabled()) {
@@ -2779,12 +2865,20 @@ public class FSNamesystem implements Nam
@Deprecated
boolean renameTo(String src, String dst)
throws IOException, UnresolvedLinkException {
+ CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return true; // Return previous response
+ }
+ boolean ret = false;
try {
- return renameToInt(src, dst);
+ ret = renameToInt(src, dst);
} catch (AccessControlException e) {
logAuditEvent(false, "rename", src, dst, null);
throw e;
+ } finally {
+ RetryCache.setState(cacheEntry, ret);
}
+ return ret;
}
private boolean renameToInt(String src, String dst)
@@ -2851,6 +2945,10 @@ public class FSNamesystem implements Nam
/** Rename src to dst */
void renameTo(String src, String dst, Options.Rename... options)
throws IOException, UnresolvedLinkException {
+ CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return; // Return previous response
+ }
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - "
+ src + " to " + dst);
@@ -2863,6 +2961,7 @@ public class FSNamesystem implements Nam
byte[][] srcComponents = FSDirectory.getPathComponentsForReservedPath(src);
byte[][] dstComponents = FSDirectory.getPathComponentsForReservedPath(dst);
HdfsFileStatus resultingStat = null;
+ boolean success = false;
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@@ -2873,8 +2972,10 @@ public class FSNamesystem implements Nam
dst = FSDirectory.resolvePath(dst, dstComponents, dir);
renameToInternal(pc, src, dst, options);
resultingStat = getAuditFileInfo(dst, false);
+ success = true;
} finally {
writeUnlock();
+ RetryCache.setState(cacheEntry, success);
}
getEditLog().logSync();
if (resultingStat != null) {
@@ -2906,12 +3007,20 @@ public class FSNamesystem implements Nam
boolean delete(String src, boolean recursive)
throws AccessControlException, SafeModeException,
UnresolvedLinkException, IOException {
+ CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return true; // Return previous response
+ }
+ boolean ret = false;
try {
- return deleteInt(src, recursive);
+ ret = deleteInt(src, recursive);
} catch (AccessControlException e) {
logAuditEvent(false, "delete", src);
throw e;
+ } finally {
+ RetryCache.setState(cacheEntry, ret);
}
+ return ret;
}
private boolean deleteInt(String src, boolean recursive)
@@ -2935,6 +3044,7 @@ public class FSNamesystem implements Nam
throw new AccessControlException(ioe);
}
}
+
/**
* Remove a file/directory from the namespace.
* <p>
@@ -2955,6 +3065,7 @@ public class FSNamesystem implements Nam
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+ boolean ret = false;
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@@ -2973,6 +3084,7 @@ public class FSNamesystem implements Nam
if (!dir.delete(src, collectedBlocks, removedINodes)) {
return false;
}
+ ret = true;
} finally {
writeUnlock();
}
@@ -2990,7 +3102,7 @@ public class FSNamesystem implements Nam
NameNode.stateChangeLog.debug("DIR* Namesystem.delete: "
+ src +" is removed");
}
- return true;
+ return ret;
}
/**
@@ -3156,12 +3268,14 @@ public class FSNamesystem implements Nam
*/
boolean mkdirs(String src, PermissionStatus permissions,
boolean createParent) throws IOException, UnresolvedLinkException {
+ boolean ret = false;
try {
- return mkdirsInt(src, permissions, createParent);
+ ret = mkdirsInt(src, permissions, createParent);
} catch (AccessControlException e) {
logAuditEvent(false, "mkdirs", src);
throw e;
}
+ return ret;
}
private boolean mkdirsInt(String src, PermissionStatus permissions,
@@ -3221,7 +3335,7 @@ public class FSNamesystem implements Nam
}
// validate that we have enough inodes. This is, at best, a
- // heuristic because the mkdirs() operation migth need to
+ // heuristic because the mkdirs() operation might need to
// create multiple inodes.
checkFsObjectLimit();
@@ -4021,8 +4135,13 @@ public class FSNamesystem implements Nam
* @throws IOException if
*/
void saveNamespace() throws AccessControlException, IOException {
+ CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return; // Return previous response
+ }
checkSuperuserPrivilege();
checkOperation(OperationCategory.UNCHECKED);
+ boolean success = false;
readLock();
try {
checkOperation(OperationCategory.UNCHECKED);
@@ -4031,10 +4150,12 @@ public class FSNamesystem implements Nam
"in order to create namespace image.");
}
getFSImage().saveNamespace(this);
- LOG.info("New namespace image has been created");
+ success = true;
} finally {
readUnlock();
+ RetryCache.setState(cacheEntry, success);
}
+ LOG.info("New namespace image has been created");
}
/**
@@ -4592,6 +4713,7 @@ public class FSNamesystem implements Nam
try {
Thread.sleep(recheckInterval);
} catch (InterruptedException ie) {
+ // Ignored
}
}
if (!fsRunning) {
@@ -4850,30 +4972,51 @@ public class FSNamesystem implements Nam
}
}
- NamenodeCommand startCheckpoint(
- NamenodeRegistration bnReg, // backup node
- NamenodeRegistration nnReg) // active name-node
- throws IOException {
+ NamenodeCommand startCheckpoint(NamenodeRegistration backupNode,
+ NamenodeRegistration activeNamenode) throws IOException {
checkOperation(OperationCategory.CHECKPOINT);
+ CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
+ null);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return (NamenodeCommand) cacheEntry.getPayload();
+ }
writeLock();
+ NamenodeCommand cmd = null;
try {
checkOperation(OperationCategory.CHECKPOINT);
if (isInSafeMode()) {
throw new SafeModeException("Checkpoint not started", safeMode);
}
- LOG.info("Start checkpoint for " + bnReg.getAddress());
- NamenodeCommand cmd = getFSImage().startCheckpoint(bnReg, nnReg);
+ LOG.info("Start checkpoint for " + backupNode.getAddress());
+ cmd = getFSImage().startCheckpoint(backupNode, activeNamenode);
getEditLog().logSync();
return cmd;
} finally {
writeUnlock();
+ RetryCache.setState(cacheEntry, cmd != null, cmd);
}
}
+ public void processIncrementalBlockReport(final DatanodeID nodeID,
+ final String poolId, final ReceivedDeletedBlockInfo blockInfos[])
+ throws IOException {
+ writeLock();
+ try {
+ blockManager.processIncrementalBlockReport(nodeID, poolId, blockInfos);
+ } finally {
+ writeUnlock();
+ }
+ }
+
void endCheckpoint(NamenodeRegistration registration,
CheckpointSignature sig) throws IOException {
+ CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return; // Return previous response
+ }
checkOperation(OperationCategory.CHECKPOINT);
+ boolean success = false;
readLock();
try {
checkOperation(OperationCategory.CHECKPOINT);
@@ -4883,8 +5026,10 @@ public class FSNamesystem implements Nam
}
LOG.info("End checkpoint for " + registration.getAddress());
getFSImage().endCheckpoint(sig);
+ success = true;
} finally {
readUnlock();
+ RetryCache.setState(cacheEntry, success);
}
}
@@ -5380,6 +5525,10 @@ public class FSNamesystem implements Nam
void updatePipeline(String clientName, ExtendedBlock oldBlock,
ExtendedBlock newBlock, DatanodeID[] newNodes)
throws IOException {
+ CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return; // Return previous response
+ }
checkOperation(OperationCategory.WRITE);
LOG.info("updatePipeline(block=" + oldBlock
+ ", newGenerationStamp=" + newBlock.getGenerationStamp()
@@ -5388,17 +5537,19 @@ public class FSNamesystem implements Nam
+ ", clientName=" + clientName
+ ")");
writeLock();
+ boolean success = false;
try {
checkOperation(OperationCategory.WRITE);
-
if (isInSafeMode()) {
throw new SafeModeException("Pipeline not updated", safeMode);
}
assert newBlock.getBlockId()==oldBlock.getBlockId() : newBlock + " and "
+ oldBlock + " has different block identifier";
updatePipelineInternal(clientName, oldBlock, newBlock, newNodes);
+ success = true;
} finally {
writeUnlock();
+ RetryCache.setState(cacheEntry, success);
}
getEditLog().logSync();
LOG.info("updatePipeline(" + oldBlock + ") successfully to " + newBlock);
@@ -6178,9 +6329,14 @@ public class FSNamesystem implements Nam
*/
String createSnapshot(String snapshotRoot, String snapshotName)
throws SafeModeException, IOException {
+ CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
+ null);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return (String) cacheEntry.getPayload();
+ }
final FSPermissionChecker pc = getPermissionChecker();
writeLock();
- final String snapshotPath;
+ String snapshotPath = null;
try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
@@ -6204,6 +6360,7 @@ public class FSNamesystem implements Nam
getEditLog().logCreateSnapshot(snapshotRoot, snapshotName);
} finally {
writeUnlock();
+ RetryCache.setState(cacheEntry, snapshotPath != null, snapshotPath);
}
getEditLog().logSync();
@@ -6223,8 +6380,13 @@ public class FSNamesystem implements Nam
*/
void renameSnapshot(String path, String snapshotOldName,
String snapshotNewName) throws SafeModeException, IOException {
+ CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return; // Return previous response
+ }
final FSPermissionChecker pc = getPermissionChecker();
writeLock();
+ boolean success = false;
try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
@@ -6238,8 +6400,10 @@ public class FSNamesystem implements Nam
snapshotManager.renameSnapshot(path, snapshotOldName, snapshotNewName);
getEditLog().logRenameSnapshot(path, snapshotOldName, snapshotNewName);
+ success = true;
} finally {
writeUnlock();
+ RetryCache.setState(cacheEntry, success);
}
getEditLog().logSync();
@@ -6332,6 +6496,11 @@ public class FSNamesystem implements Nam
void deleteSnapshot(String snapshotRoot, String snapshotName)
throws SafeModeException, IOException {
final FSPermissionChecker pc = getPermissionChecker();
+ CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+ if (cacheEntry != null && cacheEntry.isSuccess()) {
+ return; // Return previous response
+ }
+ boolean success = false;
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@@ -6355,8 +6524,10 @@ public class FSNamesystem implements Nam
this.removeBlocks(collectedBlocks);
collectedBlocks.clear();
getEditLog().logDeleteSnapshot(snapshotRoot, snapshotName);
+ success = true;
} finally {
writeUnlock();
+ RetryCache.setState(cacheEntry, success);
}
getEditLog().logSync();
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1507176&r1=1507175&r2=1507176&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Fri Jul 26 01:34:18 2013
@@ -951,7 +951,7 @@ class NameNodeRpcServer implements Namen
+"from "+nodeReg+" "+receivedAndDeletedBlocks.length
+" blocks.");
}
- namesystem.getBlockManager().processIncrementalBlockReport(
+ namesystem.processIncrementalBlockReport(
nodeReg, poolId, receivedAndDeletedBlocks[0].getBlocks());
}
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1507176&r1=1507175&r2=1507176&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Fri Jul 26 01:34:18 2013
@@ -1307,4 +1307,43 @@
</description>
</property>
+<property>
+ <name>dfs.namenode.enable.retrycache</name>
+ <value>true</value>
+ <description>
+ This enables the retry cache on the namenode. For non-idempotent requests,
+ the namenode records the corresponding response. If a client retries such a
+ request, the response from the retry cache is sent. Such operations
+ are tagged with the @AtMostOnce annotation in the namenode protocols. It is
+ recommended that this flag be set to true. Setting it to false will result
+ in clients getting failure responses to retried requests. This flag must
+ be enabled in an HA setup for transparent failovers.
+
+ The entries in the cache have expiration time configurable
+ using dfs.namenode.retrycache.expirytime.millis.
+ </description>
+</property>
+
+<property>
+ <name>dfs.namenode.retrycache.expirytime.millis</name>
+ <value>600000</value>
+ <description>
+ The time, in milliseconds, for which retry cache entries are retained.
+ </description>
+</property>
+
+<property>
+ <name>dfs.namenode.retrycache.heap.percent</name>
+ <value>0.03f</value>
+ <description>
+ This parameter configures the percentage of the namenode's Java heap
+ allocated to the retry cache (excluding the cached responses themselves).
+ This corresponds to approximately 4096 entries for every 64MB of namenode
+ Java heap. Assuming a retry cache entry expiration time (configured using
+ dfs.namenode.retrycache.expirytime.millis) of 10 minutes, this allows the
+ retry cache to sustain approximately 7 operations per second over those
+ 10 minutes. The supported operation rate increases linearly with the
+ heap size.
+ </description>
+</property>
</configuration>