You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2013/07/30 10:01:01 UTC
svn commit: r1508336 [1/2] - in
/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/protocol/
src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/m...
Author: suresh
Date: Tue Jul 30 08:01:00 2013
New Revision: 1508336
URL: http://svn.apache.org/r1508336
Log:
HDFS-5025. Merge 1508335 from branch-2
Added:
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
- copied unchanged from r1508335, hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
Modified:
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored
hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored.xml
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1508336&r1=1508335&r2=1508336&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Jul 30 08:01:00 2013
@@ -77,6 +77,9 @@ Release 2.1.0-beta - 2013-07-02
protocol methods. (suresh)
HDFS-4979. Implement retry cache on Namenode. (suresh)
+
+ HDFS-5025. Record ClientId and CallId in EditLog to enable rebuilding
+ retry cache in case of HA failover. (Jing Zhao via suresh)
IMPROVEMENTS
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1508336&r1=1508335&r2=1508336&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Tue Jul 30 08:01:00 2013
@@ -449,7 +449,8 @@ public class DFSClient implements java.i
* Create a new DFSClient connected to the given nameNodeUri or rpcNamenode.
* Exactly one of nameNodeUri or rpcNamenode must be null.
*/
- DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
+ @VisibleForTesting
+ public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
Configuration conf, FileSystem.Statistics stats)
throws IOException {
// Copy only the required DFSClient configuration
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java?rev=1508336&r1=1508335&r2=1508336&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java Tue Jul 30 08:01:00 2013
@@ -68,6 +68,7 @@ import org.apache.hadoop.tools.GetUserMa
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolClientSideTranslatorPB;
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
@@ -307,7 +308,8 @@ public class NameNodeProxies {
}
/** Gets the configured Failover proxy provider's class */
- private static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass(
+ @VisibleForTesting
+ public static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass(
Configuration conf, URI nameNodeUri, Class<T> xface) throws IOException {
if (nameNodeUri == null) {
return null;
@@ -344,7 +346,8 @@ public class NameNodeProxies {
}
/** Creates the Failover proxy provider instance*/
- private static <T> FailoverProxyProvider<T> createFailoverProxyProvider(
+ @VisibleForTesting
+ public static <T> FailoverProxyProvider<T> createFailoverProxyProvider(
Configuration conf, Class<FailoverProxyProvider<T>> failoverProxyProviderClass,
Class<T> xface, URI nameNodeUri) throws IOException {
Preconditions.checkArgument(
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java?rev=1508336&r1=1508335&r2=1508336&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java Tue Jul 30 08:01:00 2013
@@ -104,7 +104,9 @@ public class LayoutVersion {
OPTIMIZE_SNAPSHOT_INODES(-45, -43,
"Reduce snapshot inode memory footprint", false),
SEQUENTIAL_BLOCK_ID(-46, "Allocate block IDs sequentially and store " +
- "block IDs in the edits log and image files");
+ "block IDs in the edits log and image files"),
+ EDITLOG_SUPPORT_RETRYCACHE(-47, "Record ClientId and CallId in editlog to "
+ + "enable rebuilding retry cache in case of HA failover");
final int lv;
final int ancestorLV;
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1508336&r1=1508335&r2=1508336&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Tue Jul 30 08:01:00 2013
@@ -381,12 +381,13 @@ public class FSDirectory implements Clos
/**
* Persist the block list for the inode.
*/
- void persistBlocks(String path, INodeFileUnderConstruction file) {
+ void persistBlocks(String path, INodeFileUnderConstruction file,
+ boolean logRetryCache) {
waitForReady();
writeLock();
try {
- fsImage.getEditLog().logUpdateBlocks(path, file);
+ fsImage.getEditLog().logUpdateBlocks(path, file, logRetryCache);
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* FSDirectory.persistBlocks: "
+path+" with "+ file.getBlocks().length
@@ -459,7 +460,7 @@ public class FSDirectory implements Clos
* @deprecated Use {@link #renameTo(String, String, Rename...)} instead.
*/
@Deprecated
- boolean renameTo(String src, String dst)
+ boolean renameTo(String src, String dst, boolean logRetryCache)
throws QuotaExceededException, UnresolvedLinkException,
FileAlreadyExistsException, SnapshotAccessControlException, IOException {
if (NameNode.stateChangeLog.isDebugEnabled()) {
@@ -475,14 +476,15 @@ public class FSDirectory implements Clos
} finally {
writeUnlock();
}
- fsImage.getEditLog().logRename(src, dst, now);
+ fsImage.getEditLog().logRename(src, dst, now, logRetryCache);
return true;
}
/**
* @see #unprotectedRenameTo(String, String, long, Options.Rename...)
*/
- void renameTo(String src, String dst, Options.Rename... options)
+ void renameTo(String src, String dst, boolean logRetryCache,
+ Options.Rename... options)
throws FileAlreadyExistsException, FileNotFoundException,
ParentNotDirectoryException, QuotaExceededException,
UnresolvedLinkException, IOException {
@@ -500,7 +502,7 @@ public class FSDirectory implements Clos
} finally {
writeUnlock();
}
- fsImage.getEditLog().logRename(src, dst, now, options);
+ fsImage.getEditLog().logRename(src, dst, now, logRetryCache, options);
}
/**
@@ -1176,7 +1178,7 @@ public class FSDirectory implements Clos
/**
* Concat all the blocks from srcs to trg and delete the srcs files
*/
- void concat(String target, String [] srcs)
+ void concat(String target, String [] srcs, boolean supportRetryCache)
throws UnresolvedLinkException, QuotaExceededException,
SnapshotAccessControlException, SnapshotException {
writeLock();
@@ -1186,7 +1188,8 @@ public class FSDirectory implements Clos
long timestamp = now();
unprotectedConcat(target, srcs, timestamp);
// do the commit
- fsImage.getEditLog().logConcat(target, srcs, timestamp);
+ fsImage.getEditLog().logConcat(target, srcs, timestamp,
+ supportRetryCache);
} finally {
writeUnlock();
}
@@ -1261,10 +1264,12 @@ public class FSDirectory implements Clos
* @param src Path of a directory to delete
* @param collectedBlocks Blocks under the deleted directory
* @param removedINodes INodes that should be removed from {@link #inodeMap}
+ * @param logRetryCache Whether to record RPC IDs in editlog to support retry
+ * cache rebuilding.
* @return true on successful deletion; else false
*/
boolean delete(String src, BlocksMapUpdateInfo collectedBlocks,
- List<INode> removedINodes) throws IOException {
+ List<INode> removedINodes, boolean logRetryCache) throws IOException {
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: " + src);
}
@@ -1299,7 +1304,7 @@ public class FSDirectory implements Clos
if (filesRemoved < 0) {
return false;
}
- fsImage.getEditLog().logDelete(src, now);
+ fsImage.getEditLog().logDelete(src, now, logRetryCache);
incrDeletedFileCount(filesRemoved);
// Blocks/INodes will be handled later by the caller of this method
getFSNamesystem().removePathAndBlocks(src, null, null);
@@ -2523,7 +2528,7 @@ public class FSDirectory implements Clos
/**
* Create FileStatus by file INode
*/
- private HdfsFileStatus createFileStatus(byte[] path, INode node,
+ HdfsFileStatus createFileStatus(byte[] path, INode node,
Snapshot snapshot) {
long size = 0; // length is zero for directories
short replication = 0;
@@ -2596,7 +2601,7 @@ public class FSDirectory implements Clos
* Add the given symbolic link to the fs. Record it in the edits log.
*/
INodeSymlink addSymlink(String path, String target,
- PermissionStatus dirPerms, boolean createParent)
+ PermissionStatus dirPerms, boolean createParent, boolean logRetryCache)
throws UnresolvedLinkException, FileAlreadyExistsException,
QuotaExceededException, SnapshotAccessControlException {
waitForReady();
@@ -2622,7 +2627,8 @@ public class FSDirectory implements Clos
NameNode.stateChangeLog.info("DIR* addSymlink: failed to add " + path);
return null;
}
- fsImage.getEditLog().logSymlink(path, target, modTime, modTime, newNode);
+ fsImage.getEditLog().logSymlink(path, target, modTime, modTime, newNode,
+ logRetryCache);
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* addSymlink: " + path + " is added");
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1508336&r1=1508335&r2=1508336&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Tue Jul 30 08:01:00 2013
@@ -78,6 +78,7 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.conf.Configuration;
@@ -662,11 +663,20 @@ public class FSEditLog implements LogsPu
LOG.info(buf);
}
+ /** Record the RPC IDs if necessary */
+ private void logRpcIds(FSEditLogOp op, boolean toLogRpcIds) {
+ if (toLogRpcIds) {
+ op.setRpcClientId(Server.getClientId());
+ op.setRpcCallId(Server.getCallId());
+ }
+ }
+
/**
* Add open lease record to edit log.
* Records the block locations of the last block.
*/
- public void logOpenFile(String path, INodeFileUnderConstruction newNode) {
+ public void logOpenFile(String path, INodeFileUnderConstruction newNode,
+ boolean toLogRpcIds) {
AddOp op = AddOp.getInstance(cache.get())
.setInodeId(newNode.getId())
.setPath(path)
@@ -678,8 +688,8 @@ public class FSEditLog implements LogsPu
.setPermissionStatus(newNode.getPermissionStatus())
.setClientName(newNode.getClientName())
.setClientMachine(newNode.getClientMachine());
-
- logEdit(op);
+ logRpcIds(op, toLogRpcIds);
+ logEdit(op);
}
/**
@@ -698,10 +708,12 @@ public class FSEditLog implements LogsPu
logEdit(op);
}
- public void logUpdateBlocks(String path, INodeFileUnderConstruction file) {
+ public void logUpdateBlocks(String path, INodeFileUnderConstruction file,
+ boolean toLogRpcIds) {
UpdateBlocksOp op = UpdateBlocksOp.getInstance(cache.get())
.setPath(path)
.setBlocks(file.getBlocks());
+ logRpcIds(op, toLogRpcIds);
logEdit(op);
}
@@ -721,23 +733,26 @@ public class FSEditLog implements LogsPu
* Add rename record to edit log
* TODO: use String parameters until just before writing to disk
*/
- void logRename(String src, String dst, long timestamp) {
+ void logRename(String src, String dst, long timestamp, boolean toLogRpcIds) {
RenameOldOp op = RenameOldOp.getInstance(cache.get())
.setSource(src)
.setDestination(dst)
.setTimestamp(timestamp);
+ logRpcIds(op, toLogRpcIds);
logEdit(op);
}
/**
* Add rename record to edit log
*/
- void logRename(String src, String dst, long timestamp, Options.Rename... options) {
+ void logRename(String src, String dst, long timestamp, boolean toLogRpcIds,
+ Options.Rename... options) {
RenameOp op = RenameOp.getInstance(cache.get())
.setSource(src)
.setDestination(dst)
.setTimestamp(timestamp)
.setOptions(options);
+ logRpcIds(op, toLogRpcIds);
logEdit(op);
}
@@ -784,21 +799,23 @@ public class FSEditLog implements LogsPu
/**
* concat(trg,src..) log
*/
- void logConcat(String trg, String [] srcs, long timestamp) {
+ void logConcat(String trg, String[] srcs, long timestamp, boolean toLogRpcIds) {
ConcatDeleteOp op = ConcatDeleteOp.getInstance(cache.get())
.setTarget(trg)
.setSources(srcs)
.setTimestamp(timestamp);
+ logRpcIds(op, toLogRpcIds);
logEdit(op);
}
/**
* Add delete file record to edit log
*/
- void logDelete(String src, long timestamp) {
+ void logDelete(String src, long timestamp, boolean toLogRpcIds) {
DeleteOp op = DeleteOp.getInstance(cache.get())
.setPath(src)
.setTimestamp(timestamp);
+ logRpcIds(op, toLogRpcIds);
logEdit(op);
}
@@ -843,8 +860,8 @@ public class FSEditLog implements LogsPu
/**
* Add a create symlink record.
*/
- void logSymlink(String path, String value, long mtime,
- long atime, INodeSymlink node) {
+ void logSymlink(String path, String value, long mtime, long atime,
+ INodeSymlink node, boolean toLogRpcIds) {
SymlinkOp op = SymlinkOp.getInstance(cache.get())
.setId(node.getId())
.setPath(path)
@@ -852,6 +869,7 @@ public class FSEditLog implements LogsPu
.setModificationTime(mtime)
.setAccessTime(atime)
.setPermissionStatus(node.getPermissionStatus());
+ logRpcIds(op, toLogRpcIds);
logEdit(op);
}
@@ -896,22 +914,26 @@ public class FSEditLog implements LogsPu
logEdit(op);
}
- void logCreateSnapshot(String snapRoot, String snapName) {
+ void logCreateSnapshot(String snapRoot, String snapName, boolean toLogRpcIds) {
CreateSnapshotOp op = CreateSnapshotOp.getInstance(cache.get())
.setSnapshotRoot(snapRoot).setSnapshotName(snapName);
+ logRpcIds(op, toLogRpcIds);
logEdit(op);
}
- void logDeleteSnapshot(String snapRoot, String snapName) {
+ void logDeleteSnapshot(String snapRoot, String snapName, boolean toLogRpcIds) {
DeleteSnapshotOp op = DeleteSnapshotOp.getInstance(cache.get())
.setSnapshotRoot(snapRoot).setSnapshotName(snapName);
+ logRpcIds(op, toLogRpcIds);
logEdit(op);
}
- void logRenameSnapshot(String path, String snapOldName, String snapNewName) {
+ void logRenameSnapshot(String path, String snapOldName, String snapNewName,
+ boolean toLogRpcIds) {
RenameSnapshotOp op = RenameSnapshotOp.getInstance(cache.get())
.setSnapshotRoot(path).setSnapshotOldName(snapOldName)
.setSnapshotNewName(snapNewName);
+ logRpcIds(op, toLogRpcIds);
logEdit(op);
}
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1508336&r1=1508335&r2=1508336&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Tue Jul 30 08:01:00 2013
@@ -34,8 +34,10 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.common.Storage;
@@ -277,7 +279,8 @@ public class FSEditLogLoader {
if (LOG.isTraceEnabled()) {
LOG.trace("replaying edit log: " + op);
}
-
+ final boolean toAddRetryCache = fsNamesys.hasRetryCache() && op.hasRpcIds();
+
switch (op.opCode) {
case OP_ADD: {
AddCloseOp addCloseOp = (AddCloseOp)op;
@@ -300,8 +303,8 @@ public class FSEditLogLoader {
if (oldFile == null) { // this is OP_ADD on a new file (case 1)
// versions > 0 support per file replication
// get name and replication
- final short replication = fsNamesys.getBlockManager(
- ).adjustReplication(addCloseOp.replication);
+ final short replication = fsNamesys.getBlockManager()
+ .adjustReplication(addCloseOp.replication);
assert addCloseOp.blocks.length == 0;
// add to the file tree
@@ -313,6 +316,13 @@ public class FSEditLogLoader {
addCloseOp.clientName, addCloseOp.clientMachine);
fsNamesys.leaseManager.addLease(addCloseOp.clientName, addCloseOp.path);
+ // add the op into retry cache if necessary
+ if (toAddRetryCache) {
+ HdfsFileStatus stat = fsNamesys.dir.createFileStatus(
+ HdfsFileStatus.EMPTY_NAME, newFile, null);
+ fsNamesys.addCacheEntryWithPayload(addCloseOp.rpcClientId,
+ addCloseOp.rpcCallId, stat);
+ }
} else { // This is OP_ADD on an existing file
if (!oldFile.isUnderConstruction()) {
// This is case 3: a call to append() on an already-closed file.
@@ -320,11 +330,17 @@ public class FSEditLogLoader {
FSNamesystem.LOG.debug("Reopening an already-closed file " +
"for append");
}
- fsNamesys.prepareFileForWrite(addCloseOp.path, oldFile,
- addCloseOp.clientName, addCloseOp.clientMachine, null,
- false, iip.getLatestSnapshot());
+ LocatedBlock lb = fsNamesys.prepareFileForWrite(addCloseOp.path,
+ oldFile, addCloseOp.clientName, addCloseOp.clientMachine, null,
+ false, iip.getLatestSnapshot(), false);
newFile = INodeFile.valueOf(fsDir.getINode(addCloseOp.path),
addCloseOp.path, true);
+
+      // add the op into retry cache if necessary
+ if (toAddRetryCache) {
+ fsNamesys.addCacheEntryWithPayload(addCloseOp.rpcClientId,
+ addCloseOp.rpcCallId, lb);
+ }
}
}
// Fall-through for case 2.
@@ -384,6 +400,10 @@ public class FSEditLogLoader {
updateOp.path);
// Update in-memory data structures
updateBlocks(fsDir, updateOp, oldFile);
+
+ if (toAddRetryCache) {
+ fsNamesys.addCacheEntry(updateOp.rpcClientId, updateOp.rpcCallId);
+ }
break;
}
@@ -399,17 +419,30 @@ public class FSEditLogLoader {
ConcatDeleteOp concatDeleteOp = (ConcatDeleteOp)op;
fsDir.unprotectedConcat(concatDeleteOp.trg, concatDeleteOp.srcs,
concatDeleteOp.timestamp);
+
+ if (toAddRetryCache) {
+ fsNamesys.addCacheEntry(concatDeleteOp.rpcClientId,
+ concatDeleteOp.rpcCallId);
+ }
break;
}
case OP_RENAME_OLD: {
RenameOldOp renameOp = (RenameOldOp)op;
fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst,
renameOp.timestamp);
+
+ if (toAddRetryCache) {
+ fsNamesys.addCacheEntry(renameOp.rpcClientId, renameOp.rpcCallId);
+ }
break;
}
case OP_DELETE: {
DeleteOp deleteOp = (DeleteOp)op;
fsDir.unprotectedDelete(deleteOp.path, deleteOp.timestamp);
+
+ if (toAddRetryCache) {
+ fsNamesys.addCacheEntry(deleteOp.rpcClientId, deleteOp.rpcCallId);
+ }
break;
}
case OP_MKDIR: {
@@ -474,12 +507,20 @@ public class FSEditLogLoader {
fsDir.unprotectedAddSymlink(inodeId, symlinkOp.path,
symlinkOp.value, symlinkOp.mtime,
symlinkOp.atime, symlinkOp.permissionStatus);
+
+ if (toAddRetryCache) {
+ fsNamesys.addCacheEntry(symlinkOp.rpcClientId, symlinkOp.rpcCallId);
+ }
break;
}
case OP_RENAME: {
RenameOp renameOp = (RenameOp)op;
fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst,
renameOp.timestamp, renameOp.options);
+
+ if (toAddRetryCache) {
+ fsNamesys.addCacheEntry(renameOp.rpcClientId, renameOp.rpcCallId);
+ }
break;
}
case OP_GET_DELEGATION_TOKEN: {
@@ -532,8 +573,12 @@ public class FSEditLogLoader {
}
case OP_CREATE_SNAPSHOT: {
CreateSnapshotOp createSnapshotOp = (CreateSnapshotOp) op;
- fsNamesys.getSnapshotManager().createSnapshot(
+ String path = fsNamesys.getSnapshotManager().createSnapshot(
createSnapshotOp.snapshotRoot, createSnapshotOp.snapshotName);
+ if (toAddRetryCache) {
+ fsNamesys.addCacheEntryWithPayload(createSnapshotOp.rpcClientId,
+ createSnapshotOp.rpcCallId, path);
+ }
break;
}
case OP_DELETE_SNAPSHOT: {
@@ -547,6 +592,11 @@ public class FSEditLogLoader {
collectedBlocks.clear();
fsNamesys.dir.removeFromInodeMap(removedINodes);
removedINodes.clear();
+
+ if (toAddRetryCache) {
+ fsNamesys.addCacheEntry(deleteSnapshotOp.rpcClientId,
+ deleteSnapshotOp.rpcCallId);
+ }
break;
}
case OP_RENAME_SNAPSHOT: {
@@ -554,6 +604,11 @@ public class FSEditLogLoader {
fsNamesys.getSnapshotManager().renameSnapshot(
renameSnapshotOp.snapshotRoot, renameSnapshotOp.snapshotOldName,
renameSnapshotOp.snapshotNewName);
+
+ if (toAddRetryCache) {
+ fsNamesys.addCacheEntry(renameSnapshotOp.rpcClientId,
+ renameSnapshotOp.rpcCallId);
+ }
break;
}
case OP_ALLOW_SNAPSHOT: {
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1508336&r1=1508335&r2=1508336&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Tue Jul 30 08:01:00 2013
@@ -17,55 +17,88 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
-import java.util.zip.CheckedInputStream;
-import java.util.zip.Checksum;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ALLOCATE_BLOCK_ID;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ALLOW_SNAPSHOT;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CANCEL_DELEGATION_TOKEN;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CLEAR_NS_QUOTA;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CLOSE;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CONCAT_DELETE;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CREATE_SNAPSHOT;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_DELETE;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_DELETE_SNAPSHOT;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_DISALLOW_SNAPSHOT;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_END_LOG_SEGMENT;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_GET_DELEGATION_TOKEN;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_INVALID;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MKDIR;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REASSIGN_LEASE;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_OLD;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_SNAPSHOT;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENEW_DELEGATION_TOKEN;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_GENSTAMP_V1;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_GENSTAMP_V2;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_NS_QUOTA;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_OWNER;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_PERMISSIONS;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_QUOTA;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_REPLICATION;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_START_LOG_SEGMENT;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SYMLINK;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_TIMES;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_UPDATE_BLOCKS;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_UPDATE_MASTER_KEY;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.Checksum;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
-import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DeprecatedUTF8;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
-import org.apache.hadoop.util.PureJavaCrc32;
-
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*;
-import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.util.XMLUtils;
+import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
+import org.apache.hadoop.hdfs.util.XMLUtils.Stanza;
+import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
-import org.apache.hadoop.hdfs.util.XMLUtils;
-import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
-import org.apache.hadoop.hdfs.util.XMLUtils.Stanza;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DeprecatedUTF8;
+import org.apache.hadoop.ipc.ClientId;
+import org.apache.hadoop.ipc.RpcConstants;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.util.PureJavaCrc32;
import org.xml.sax.ContentHandler;
import org.xml.sax.SAXException;
import org.xml.sax.helpers.AttributesImpl;
import com.google.common.base.Preconditions;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.EOFException;
-
/**
* Helper classes for reading the ops from an InputStream.
* All ops derive from FSEditLogOp and are only
@@ -76,6 +109,8 @@ import java.io.EOFException;
public abstract class FSEditLogOp {
public final FSEditLogOpCodes opCode;
long txid;
+ byte[] rpcClientId = RpcConstants.DUMMY_CLIENT_ID;
+ int rpcCallId = RpcConstants.INVALID_CALL_ID;
@SuppressWarnings("deprecation")
final public static class OpInstanceCache {
@@ -150,6 +185,31 @@ public abstract class FSEditLogOp {
public void setTransactionId(long txid) {
this.txid = txid;
}
+
+ public boolean hasRpcIds() {
+ return rpcClientId != RpcConstants.DUMMY_CLIENT_ID
+ && rpcCallId != RpcConstants.INVALID_CALL_ID;
+ }
+
+ /** Callers must first verify that {@link #hasRpcIds()} returns true. */
+ public byte[] getClientId() {
+ Preconditions.checkState(rpcClientId != RpcConstants.DUMMY_CLIENT_ID);
+ return rpcClientId;
+ }
+
+ public void setRpcClientId(byte[] clientId) {
+ this.rpcClientId = clientId;
+ }
+
+ /** Callers must first verify that {@link #hasRpcIds()} returns true. */
+ public int getCallId() {
+ Preconditions.checkState(rpcCallId != RpcConstants.INVALID_CALL_ID);
+ return rpcCallId;
+ }
+
+ public void setRpcCallId(int callId) {
+ this.rpcCallId = callId;
+ }
abstract void readFields(DataInputStream in, int logVersion)
throws IOException;
@@ -163,6 +223,46 @@ public abstract class FSEditLogOp {
boolean shouldCompleteLastBlock();
}
+ private static void writeRpcIds(final byte[] clientId, final int callId,
+ DataOutputStream out) throws IOException {
+ FSImageSerialization.writeBytes(clientId, out);
+ FSImageSerialization.writeInt(callId, out);
+ }
+
+ void readRpcIds(DataInputStream in, int logVersion)
+ throws IOException {
+ if (LayoutVersion.supports(Feature.EDITLOG_SUPPORT_RETRYCACHE,
+ logVersion)) {
+ this.rpcClientId = FSImageSerialization.readBytes(in);
+ this.rpcCallId = FSImageSerialization.readInt(in);
+ }
+ }
+
+ void readRpcIdsFromXml(Stanza st) {
+ this.rpcClientId = st.hasChildren("RPC_CLIENTID") ?
+ ClientId.toBytes(st.getValue("RPC_CLIENTID"))
+ : RpcConstants.DUMMY_CLIENT_ID;
+ this.rpcCallId = st.hasChildren("RPC_CALLID") ?
+ Integer.valueOf(st.getValue("RPC_CALLID"))
+ : RpcConstants.INVALID_CALL_ID;
+ }
+
+ private static void appendRpcIdsToString(final StringBuilder builder,
+ final byte[] clientId, final int callId) {
+ builder.append(", RpcClientId=");
+ builder.append(ClientId.toString(clientId));
+ builder.append(", RpcCallId=");
+ builder.append(callId);
+ }
+
+ private static void appendRpcIdsToXml(ContentHandler contentHandler,
+ final byte[] clientId, final int callId) throws SAXException {
+ XMLUtils.addSaxString(contentHandler, "RPC_CLIENTID",
+ ClientId.toString(clientId));
+ XMLUtils.addSaxString(contentHandler, "RPC_CALLID",
+ Integer.valueOf(callId).toString());
+ }
+
@SuppressWarnings("unchecked")
static abstract class AddCloseOp extends FSEditLogOp implements BlockListUpdatingOp {
int length;
@@ -176,7 +276,7 @@ public abstract class FSEditLogOp {
PermissionStatus permissions;
String clientName;
String clientMachine;
-
+
private AddCloseOp(FSEditLogOpCodes opCode) {
super(opCode);
assert(opCode == OP_ADD || opCode == OP_CLOSE);
@@ -247,8 +347,7 @@ public abstract class FSEditLogOp {
}
@Override
- public
- void writeFields(DataOutputStream out) throws IOException {
+ public void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeLong(inodeId, out);
FSImageSerialization.writeString(path, out);
FSImageSerialization.writeShort(replication, out);
@@ -261,6 +360,8 @@ public abstract class FSEditLogOp {
if (this.opCode == OP_ADD) {
FSImageSerialization.writeString(clientName,out);
FSImageSerialization.writeString(clientMachine,out);
+ // write clientId and callId
+ writeRpcIds(rpcClientId, rpcCallId, out);
}
}
@@ -317,6 +418,8 @@ public abstract class FSEditLogOp {
if (this.opCode == OP_ADD) {
this.clientName = FSImageSerialization.readString(in);
this.clientMachine = FSImageSerialization.readString(in);
+ // read clientId and callId
+ readRpcIds(in, logVersion);
} else {
this.clientName = "";
this.clientMachine = "";
@@ -368,6 +471,9 @@ public abstract class FSEditLogOp {
builder.append(clientName);
builder.append(", clientMachine=");
builder.append(clientMachine);
+ if (this.opCode == OP_ADD) {
+ appendRpcIdsToString(builder, rpcClientId, rpcCallId);
+ }
builder.append(", opCode=");
builder.append(opCode);
builder.append(", txid=");
@@ -397,9 +503,13 @@ public abstract class FSEditLogOp {
FSEditLogOp.blockToXml(contentHandler, b);
}
FSEditLogOp.permissionStatusToXml(contentHandler, permissions);
+ if (this.opCode == OP_ADD) {
+ appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
+ }
}
- @Override void fromXml(Stanza st) throws InvalidXmlException {
+ @Override
+ void fromXml(Stanza st) throws InvalidXmlException {
this.length = Integer.valueOf(st.getValue("LENGTH"));
this.inodeId = Long.valueOf(st.getValue("INODEID"));
this.path = st.getValue("PATH");
@@ -420,9 +530,14 @@ public abstract class FSEditLogOp {
}
this.permissions =
permissionStatusFromXml(st.getChildren("PERMISSION_STATUS").get(0));
+ readRpcIdsFromXml(st);
}
}
+ /**
+ * {@literal @AtMostOnce} for {@link ClientProtocol#startFile} and
+ * {@link ClientProtocol#appendFile}
+ */
static class AddOp extends AddCloseOp {
private AddOp() {
super(OP_ADD);
@@ -446,6 +561,11 @@ public abstract class FSEditLogOp {
}
}
+ /**
+ * Although {@link ClientProtocol#appendFile} may also log a close op, we do
+ * not need to record the rpc ids here since a successful appendFile op will
+ * eventually log an AddOp.
+ */
static class CloseOp extends AddCloseOp {
private CloseOp() {
super(OP_CLOSE);
@@ -469,6 +589,10 @@ public abstract class FSEditLogOp {
}
}
+ /**
+ * {@literal @AtMostOnce} for {@link ClientProtocol#updatePipeline}, but
+ * {@literal @Idempotent} for some other ops.
+ */
static class UpdateBlocksOp extends FSEditLogOp implements BlockListUpdatingOp {
String path;
Block[] blocks;
@@ -481,7 +605,6 @@ public abstract class FSEditLogOp {
return (UpdateBlocksOp)cache.get(OP_UPDATE_BLOCKS);
}
-
UpdateBlocksOp setPath(String path) {
this.path = path;
return this;
@@ -507,6 +630,8 @@ public abstract class FSEditLogOp {
void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeString(path, out);
FSImageSerialization.writeCompactBlockArray(blocks, out);
+ // clientId and callId
+ writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
@@ -514,6 +639,7 @@ public abstract class FSEditLogOp {
path = FSImageSerialization.readString(in);
this.blocks = FSImageSerialization.readCompactBlockArray(
in, logVersion);
+ readRpcIds(in, logVersion);
}
@Override
@@ -527,8 +653,9 @@ public abstract class FSEditLogOp {
sb.append("UpdateBlocksOp [path=")
.append(path)
.append(", blocks=")
- .append(Arrays.toString(blocks))
- .append("]");
+ .append(Arrays.toString(blocks));
+ appendRpcIdsToString(sb, rpcClientId, rpcCallId);
+ sb.append("]");
return sb.toString();
}
@@ -538,6 +665,7 @@ public abstract class FSEditLogOp {
for (Block b : blocks) {
FSEditLogOp.blockToXml(contentHandler, b);
}
+ appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override void fromXml(Stanza st) throws InvalidXmlException {
@@ -547,9 +675,11 @@ public abstract class FSEditLogOp {
for (int i = 0; i < blocks.size(); i++) {
this.blocks[i] = FSEditLogOp.blockFromXml(blocks.get(i));
}
+ readRpcIdsFromXml(st);
}
}
+ /** {@literal @Idempotent} for {@link ClientProtocol#setReplication} */
static class SetReplicationOp extends FSEditLogOp {
String path;
short replication;
@@ -618,6 +748,7 @@ public abstract class FSEditLogOp {
}
}
+ /** {@literal @AtMostOnce} for {@link ClientProtocol#concat} */
static class ConcatDeleteOp extends FSEditLogOp {
int length;
String trg;
@@ -654,8 +785,7 @@ public abstract class FSEditLogOp {
}
@Override
- public
- void writeFields(DataOutputStream out) throws IOException {
+ public void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeString(trg, out);
DeprecatedUTF8 info[] = new DeprecatedUTF8[srcs.length];
@@ -666,6 +796,9 @@ public abstract class FSEditLogOp {
new ArrayWritable(DeprecatedUTF8.class, info).write(out);
FSImageSerialization.writeLong(timestamp, out);
+
+ // rpc ids
+ writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
@@ -704,6 +837,8 @@ public abstract class FSEditLogOp {
} else {
this.timestamp = readLong(in);
}
+ // read RPC ids if necessary
+ readRpcIds(in, logVersion);
}
@Override
@@ -717,6 +852,7 @@ public abstract class FSEditLogOp {
builder.append(Arrays.toString(srcs));
builder.append(", timestamp=");
builder.append(timestamp);
+ appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append(", opCode=");
builder.append(opCode);
builder.append(", txid=");
@@ -738,6 +874,7 @@ public abstract class FSEditLogOp {
"SOURCE" + (i + 1), srcs[i]);
}
contentHandler.endElement("", "", "SOURCES");
+ appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override void fromXml(Stanza st) throws InvalidXmlException {
@@ -755,9 +892,11 @@ public abstract class FSEditLogOp {
for (i = 0; i < srcs.length; i++) {
srcs[i] = sources.get(0).getValue("SOURCE" + (i + 1));
}
+ readRpcIdsFromXml(st);
}
}
+ /** {@literal @AtMostOnce} for {@link ClientProtocol#rename} */
static class RenameOldOp extends FSEditLogOp {
int length;
String src;
@@ -793,6 +932,7 @@ public abstract class FSEditLogOp {
FSImageSerialization.writeString(src, out);
FSImageSerialization.writeString(dst, out);
FSImageSerialization.writeLong(timestamp, out);
+ writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
@@ -812,6 +952,9 @@ public abstract class FSEditLogOp {
} else {
this.timestamp = readLong(in);
}
+
+ // read RPC ids if necessary
+ readRpcIds(in, logVersion);
}
@Override
@@ -825,6 +968,7 @@ public abstract class FSEditLogOp {
builder.append(dst);
builder.append(", timestamp=");
builder.append(timestamp);
+ appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append(", opCode=");
builder.append(opCode);
builder.append(", txid=");
@@ -841,16 +985,21 @@ public abstract class FSEditLogOp {
XMLUtils.addSaxString(contentHandler, "DST", dst);
XMLUtils.addSaxString(contentHandler, "TIMESTAMP",
Long.valueOf(timestamp).toString());
+ appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
- @Override void fromXml(Stanza st) throws InvalidXmlException {
+ @Override
+ void fromXml(Stanza st) throws InvalidXmlException {
this.length = Integer.valueOf(st.getValue("LENGTH"));
this.src = st.getValue("SRC");
this.dst = st.getValue("DST");
this.timestamp = Long.valueOf(st.getValue("TIMESTAMP"));
+
+ readRpcIdsFromXml(st);
}
}
+ /** {@literal @AtMostOnce} for {@link ClientProtocol#delete} */
static class DeleteOp extends FSEditLogOp {
int length;
String path;
@@ -879,6 +1028,7 @@ public abstract class FSEditLogOp {
void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeString(path, out);
FSImageSerialization.writeLong(timestamp, out);
+ writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
@@ -896,6 +1046,8 @@ public abstract class FSEditLogOp {
} else {
this.timestamp = readLong(in);
}
+ // read RPC ids if necessary
+ readRpcIds(in, logVersion);
}
@Override
@@ -907,6 +1059,7 @@ public abstract class FSEditLogOp {
builder.append(path);
builder.append(", timestamp=");
builder.append(timestamp);
+ appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append(", opCode=");
builder.append(opCode);
builder.append(", txid=");
@@ -922,15 +1075,19 @@ public abstract class FSEditLogOp {
XMLUtils.addSaxString(contentHandler, "PATH", path);
XMLUtils.addSaxString(contentHandler, "TIMESTAMP",
Long.valueOf(timestamp).toString());
+ appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override void fromXml(Stanza st) throws InvalidXmlException {
this.length = Integer.valueOf(st.getValue("LENGTH"));
this.path = st.getValue("PATH");
this.timestamp = Long.valueOf(st.getValue("TIMESTAMP"));
+
+ readRpcIdsFromXml(st);
}
}
-
+
+ /** {@literal @Idempotent} for {@link ClientProtocol#mkdirs} */
static class MkdirOp extends FSEditLogOp {
int length;
long inodeId;
@@ -1056,6 +1213,13 @@ public abstract class FSEditLogOp {
}
}
+ /**
+ * The corresponding operations are either {@literal @Idempotent} (
+ * {@link ClientProtocol#updateBlockForPipeline},
+ * {@link ClientProtocol#recoverLease}, {@link ClientProtocol#addBlock}) or
+ * already bound with other editlog op which records rpc ids (
+ * {@link ClientProtocol#startFile}). Thus no need to record rpc ids here.
+ */
static class SetGenstampV1Op extends FSEditLogOp {
long genStampV1;
@@ -1108,6 +1272,7 @@ public abstract class FSEditLogOp {
}
}
+ /** Similar to {@link SetGenstampV1Op} */
static class SetGenstampV2Op extends FSEditLogOp {
long genStampV2;
@@ -1160,6 +1325,7 @@ public abstract class FSEditLogOp {
}
}
+ /** {@literal @Idempotent} for {@link ClientProtocol#addBlock} */
static class AllocateBlockIdOp extends FSEditLogOp {
long blockId;
@@ -1212,6 +1378,7 @@ public abstract class FSEditLogOp {
}
}
+ /** {@literal @Idempotent} for {@link ClientProtocol#setPermission} */
static class SetPermissionsOp extends FSEditLogOp {
String src;
FsPermission permissions;
@@ -1277,6 +1444,7 @@ public abstract class FSEditLogOp {
}
}
+ /** {@literal @Idempotent} for {@link ClientProtocol#setOwner} */
static class SetOwnerOp extends FSEditLogOp {
String src;
String username;
@@ -1357,7 +1525,7 @@ public abstract class FSEditLogOp {
st.getValue("GROUPNAME") : null;
}
}
-
+
static class SetNSQuotaOp extends FSEditLogOp {
String src;
long nsQuota;
@@ -1457,6 +1625,7 @@ public abstract class FSEditLogOp {
}
}
+ /** {@literal @Idempotent} for {@link ClientProtocol#setQuota} */
static class SetQuotaOp extends FSEditLogOp {
String src;
long nsQuota;
@@ -1534,6 +1703,7 @@ public abstract class FSEditLogOp {
}
}
+ /** {@literal @Idempotent} for {@link ClientProtocol#setTimes} */
static class TimesOp extends FSEditLogOp {
int length;
String path;
@@ -1629,6 +1799,7 @@ public abstract class FSEditLogOp {
}
}
+ /** {@literal @AtMostOnce} for {@link ClientProtocol#createSymlink} */
static class SymlinkOp extends FSEditLogOp {
int length;
long inodeId;
@@ -1677,14 +1848,14 @@ public abstract class FSEditLogOp {
}
@Override
- public
- void writeFields(DataOutputStream out) throws IOException {
+ public void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeLong(inodeId, out);
FSImageSerialization.writeString(path, out);
FSImageSerialization.writeString(value, out);
FSImageSerialization.writeLong(mtime, out);
FSImageSerialization.writeLong(atime, out);
permissionStatus.write(out);
+ writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
@@ -1714,6 +1885,9 @@ public abstract class FSEditLogOp {
this.atime = readLong(in);
}
this.permissionStatus = PermissionStatus.read(in);
+
+ // read RPC ids if necessary
+ readRpcIds(in, logVersion);
}
@Override
@@ -1733,6 +1907,7 @@ public abstract class FSEditLogOp {
builder.append(atime);
builder.append(", permissionStatus=");
builder.append(permissionStatus);
+ appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append(", opCode=");
builder.append(opCode);
builder.append(", txid=");
@@ -1754,9 +1929,11 @@ public abstract class FSEditLogOp {
XMLUtils.addSaxString(contentHandler, "ATIME",
Long.valueOf(atime).toString());
FSEditLogOp.permissionStatusToXml(contentHandler, permissionStatus);
+ appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
- @Override void fromXml(Stanza st) throws InvalidXmlException {
+ @Override
+ void fromXml(Stanza st) throws InvalidXmlException {
this.length = Integer.valueOf(st.getValue("LENGTH"));
this.inodeId = Long.valueOf(st.getValue("INODEID"));
this.path = st.getValue("PATH");
@@ -1765,9 +1942,12 @@ public abstract class FSEditLogOp {
this.atime = Long.valueOf(st.getValue("ATIME"));
this.permissionStatus =
permissionStatusFromXml(st.getChildren("PERMISSION_STATUS").get(0));
+
+ readRpcIdsFromXml(st);
}
}
+ /** {@literal @AtMostOnce} for {@link ClientProtocol#rename2} */
static class RenameOp extends FSEditLogOp {
int length;
String src;
@@ -1810,6 +1990,7 @@ public abstract class FSEditLogOp {
FSImageSerialization.writeString(dst, out);
FSImageSerialization.writeLong(timestamp, out);
toBytesWritable(options).write(out);
+ writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
@@ -1830,6 +2011,9 @@ public abstract class FSEditLogOp {
this.timestamp = readLong(in);
}
this.options = readRenameOptions(in);
+
+ // read RPC ids if necessary
+ readRpcIds(in, logVersion);
}
private static Rename[] readRenameOptions(DataInputStream in) throws IOException {
@@ -1866,6 +2050,7 @@ public abstract class FSEditLogOp {
builder.append(timestamp);
builder.append(", options=");
builder.append(Arrays.toString(options));
+ appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append(", opCode=");
builder.append(opCode);
builder.append(", txid=");
@@ -1889,6 +2074,7 @@ public abstract class FSEditLogOp {
prefix = "|";
}
XMLUtils.addSaxString(contentHandler, "OPTIONS", bld.toString());
+ appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override void fromXml(Stanza st) throws InvalidXmlException {
@@ -1910,9 +2096,15 @@ public abstract class FSEditLogOp {
}
}
}
+ readRpcIdsFromXml(st);
}
}
-
+
+ /**
+ * {@literal @Idempotent} for {@link ClientProtocol#recoverLease}. Meanwhile,
+ * startFile and appendFile both have their own corresponding
+ * editlog op.
+ */
static class ReassignLeaseOp extends FSEditLogOp {
String leaseHolder;
String path;
@@ -1988,6 +2180,7 @@ public abstract class FSEditLogOp {
}
}
+ /** {@literal @Idempotent} for {@link ClientProtocol#getDelegationToken} */
static class GetDelegationTokenOp extends FSEditLogOp {
DelegationTokenIdentifier token;
long expiryTime;
@@ -2059,6 +2252,7 @@ public abstract class FSEditLogOp {
}
}
+ /** {@literal @Idempotent} for {@link ClientProtocol#renewDelegationToken} */
static class RenewDelegationTokenOp extends FSEditLogOp {
DelegationTokenIdentifier token;
long expiryTime;
@@ -2130,6 +2324,7 @@ public abstract class FSEditLogOp {
}
}
+ /** {@literal @Idempotent} for {@link ClientProtocol#cancelDelegationToken} */
static class CancelDelegationTokenOp extends FSEditLogOp {
DelegationTokenIdentifier token;
@@ -2323,7 +2518,8 @@ public abstract class FSEditLogOp {
}
/**
- * Operation corresponding to creating a snapshot
+ * Operation corresponding to creating a snapshot.
+ * {@literal @AtMostOnce} for {@link ClientProtocol#createSnapshot}.
*/
static class CreateSnapshotOp extends FSEditLogOp {
String snapshotRoot;
@@ -2351,24 +2547,31 @@ public abstract class FSEditLogOp {
void readFields(DataInputStream in, int logVersion) throws IOException {
snapshotRoot = FSImageSerialization.readString(in);
snapshotName = FSImageSerialization.readString(in);
+
+ // read RPC ids if necessary
+ readRpcIds(in, logVersion);
}
@Override
public void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeString(snapshotRoot, out);
FSImageSerialization.writeString(snapshotName, out);
+ writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
protected void toXml(ContentHandler contentHandler) throws SAXException {
XMLUtils.addSaxString(contentHandler, "SNAPSHOTROOT", snapshotRoot);
XMLUtils.addSaxString(contentHandler, "SNAPSHOTNAME", snapshotName);
+ appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override
void fromXml(Stanza st) throws InvalidXmlException {
snapshotRoot = st.getValue("SNAPSHOTROOT");
snapshotName = st.getValue("SNAPSHOTNAME");
+
+ readRpcIdsFromXml(st);
}
@Override
@@ -2378,13 +2581,15 @@ public abstract class FSEditLogOp {
builder.append(snapshotRoot);
builder.append(", snapshotName=");
builder.append(snapshotName);
+ appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append("]");
return builder.toString();
}
}
/**
- * Operation corresponding to delete a snapshot
+ * Operation corresponding to deleting a snapshot.
+ * {@literal @AtMostOnce} for {@link ClientProtocol#deleteSnapshot}.
*/
static class DeleteSnapshotOp extends FSEditLogOp {
String snapshotRoot;
@@ -2412,24 +2617,31 @@ public abstract class FSEditLogOp {
void readFields(DataInputStream in, int logVersion) throws IOException {
snapshotRoot = FSImageSerialization.readString(in);
snapshotName = FSImageSerialization.readString(in);
+
+ // read RPC ids if necessary
+ readRpcIds(in, logVersion);
}
@Override
public void writeFields(DataOutputStream out) throws IOException {
FSImageSerialization.writeString(snapshotRoot, out);
FSImageSerialization.writeString(snapshotName, out);
+ writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
protected void toXml(ContentHandler contentHandler) throws SAXException {
XMLUtils.addSaxString(contentHandler, "SNAPSHOTROOT", snapshotRoot);
XMLUtils.addSaxString(contentHandler, "SNAPSHOTNAME", snapshotName);
+ appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override
void fromXml(Stanza st) throws InvalidXmlException {
snapshotRoot = st.getValue("SNAPSHOTROOT");
snapshotName = st.getValue("SNAPSHOTNAME");
+
+ readRpcIdsFromXml(st);
}
@Override
@@ -2439,13 +2651,15 @@ public abstract class FSEditLogOp {
builder.append(snapshotRoot);
builder.append(", snapshotName=");
builder.append(snapshotName);
+ appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append("]");
return builder.toString();
}
}
/**
- * Operation corresponding to rename a snapshot
+ * Operation corresponding to renaming a snapshot.
+ * {@literal @AtMostOnce} for {@link ClientProtocol#renameSnapshot}.
*/
static class RenameSnapshotOp extends FSEditLogOp {
String snapshotRoot;
@@ -2480,6 +2694,9 @@ public abstract class FSEditLogOp {
snapshotRoot = FSImageSerialization.readString(in);
snapshotOldName = FSImageSerialization.readString(in);
snapshotNewName = FSImageSerialization.readString(in);
+
+ // read RPC ids if necessary
+ readRpcIds(in, logVersion);
}
@Override
@@ -2487,6 +2704,8 @@ public abstract class FSEditLogOp {
FSImageSerialization.writeString(snapshotRoot, out);
FSImageSerialization.writeString(snapshotOldName, out);
FSImageSerialization.writeString(snapshotNewName, out);
+
+ writeRpcIds(rpcClientId, rpcCallId, out);
}
@Override
@@ -2494,6 +2713,7 @@ public abstract class FSEditLogOp {
XMLUtils.addSaxString(contentHandler, "SNAPSHOTROOT", snapshotRoot);
XMLUtils.addSaxString(contentHandler, "SNAPSHOTOLDNAME", snapshotOldName);
XMLUtils.addSaxString(contentHandler, "SNAPSHOTNEWNAME", snapshotNewName);
+ appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@Override
@@ -2501,6 +2721,8 @@ public abstract class FSEditLogOp {
snapshotRoot = st.getValue("SNAPSHOTROOT");
snapshotOldName = st.getValue("SNAPSHOTOLDNAME");
snapshotNewName = st.getValue("SNAPSHOTNEWNAME");
+
+ readRpcIdsFromXml(st);
}
@Override
@@ -2512,6 +2734,7 @@ public abstract class FSEditLogOp {
builder.append(snapshotOldName);
builder.append(", snapshotNewName=");
builder.append(snapshotNewName);
+ appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append("]");
return builder.toString();
}
@@ -2520,7 +2743,7 @@ public abstract class FSEditLogOp {
/**
* Operation corresponding to allow creating snapshot on a directory
*/
- static class AllowSnapshotOp extends FSEditLogOp {
+ static class AllowSnapshotOp extends FSEditLogOp { // @Idempotent
String snapshotRoot;
public AllowSnapshotOp() {
@@ -2574,7 +2797,7 @@ public abstract class FSEditLogOp {
/**
* Operation corresponding to disallow creating snapshot on a directory
*/
- static class DisallowSnapshotOp extends FSEditLogOp {
+ static class DisallowSnapshotOp extends FSEditLogOp { // @Idempotent
String snapshotRoot;
public DisallowSnapshotOp() {
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java?rev=1508336&r1=1508335&r2=1508336&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java Tue Jul 30 08:01:00 2013
@@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat.ReferenceMap;
+import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.ShortWritable;
import org.apache.hadoop.io.Text;
@@ -80,6 +81,7 @@ public class FSImageSerialization {
static private final class TLData {
final DeprecatedUTF8 U_STR = new DeprecatedUTF8();
final ShortWritable U_SHORT = new ShortWritable();
+ final IntWritable U_INT = new IntWritable();
final LongWritable U_LONG = new LongWritable();
final FsPermission FILE_PERM = new FsPermission((short) 0);
}
@@ -350,9 +352,9 @@ public class FSImageSerialization {
/** read the long value */
static long readLong(DataInput in) throws IOException {
- LongWritable ustr = TL_DATA.get().U_LONG;
- ustr.readFields(in);
- return ustr.get();
+ LongWritable uLong = TL_DATA.get().U_LONG;
+ uLong.readFields(in);
+ return uLong.get();
}
/** write the long value */
@@ -361,6 +363,20 @@ public class FSImageSerialization {
uLong.set(value);
uLong.write(out);
}
+
+ /** read the int value */
+ static int readInt(DataInput in) throws IOException {
+ IntWritable uInt = TL_DATA.get().U_INT;
+ uInt.readFields(in);
+ return uInt.get();
+ }
+
+ /** write the int value */
+ static void writeInt(int value, DataOutputStream out) throws IOException {
+ IntWritable uInt = TL_DATA.get().U_INT;
+ uInt.set(value);
+ uInt.write(out);
+ }
/** read short value */
static short readShort(DataInput in) throws IOException {
@@ -414,8 +430,13 @@ public class FSImageSerialization {
private static void writeLocalName(INodeAttributes inode, DataOutput out)
throws IOException {
final byte[] name = inode.getLocalNameBytes();
- out.writeShort(name.length);
- out.write(name);
+ writeBytes(name, out);
+ }
+
+ public static void writeBytes(byte[] data, DataOutput out)
+ throws IOException {
+ out.writeShort(data.length);
+ out.write(data);
}
/**
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1508336&r1=1508335&r2=1508336&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Jul 30 08:01:00 2013
@@ -547,7 +547,7 @@ public class FSNamesystem implements Nam
FSImage fsImage = new FSImage(conf,
FSNamesystem.getNamespaceDirs(conf),
FSNamesystem.getNamespaceEditsDirs(conf));
- FSNamesystem namesystem = new FSNamesystem(conf, fsImage);
+ FSNamesystem namesystem = new FSNamesystem(conf, fsImage, false);
StartupOption startOpt = NameNode.getStartupOption(conf);
if (startOpt == StartupOption.RECOVER) {
namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
@@ -565,7 +565,11 @@ public class FSNamesystem implements Nam
}
return namesystem;
}
-
+
+ FSNamesystem(Configuration conf, FSImage fsImage) throws IOException {
+ this(conf, fsImage, false);
+ }
+
/**
* Create an FSNamesystem associated with the specified image.
*
@@ -574,9 +578,12 @@ public class FSNamesystem implements Nam
*
* @param conf configuration
* @param fsImage The FSImage to associate with
+ * @param ignoreRetryCache Whether or not to ignore the retry cache setup
+ * step. For Secondary NN this should be set to true.
* @throws IOException on bad configuration
*/
- FSNamesystem(Configuration conf, FSImage fsImage) throws IOException {
+ FSNamesystem(Configuration conf, FSImage fsImage, boolean ignoreRetryCache)
+ throws IOException {
try {
resourceRecheckInterval = conf.getLong(
DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY,
@@ -667,7 +674,7 @@ public class FSNamesystem implements Nam
this.auditLoggers = initAuditLoggers(conf);
this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
auditLoggers.get(0) instanceof DefaultAuditLogger;
- this.retryCache = initRetryCache(conf);
+ this.retryCache = ignoreRetryCache ? null : initRetryCache(conf);
} catch(IOException e) {
LOG.error(getClass().getSimpleName() + " initialization failed.", e);
close();
@@ -680,6 +687,28 @@ public class FSNamesystem implements Nam
}
@VisibleForTesting
+ public RetryCache getRetryCache() {
+ return retryCache;
+ }
+
+ /** Whether or not retry cache is enabled */
+ boolean hasRetryCache() {
+ return retryCache != null;
+ }
+
+ void addCacheEntryWithPayload(byte[] clientId, int callId, Object payload) {
+ if (retryCache != null) {
+ retryCache.addCacheEntryWithPayload(clientId, callId, payload);
+ }
+ }
+
+ void addCacheEntry(byte[] clientId, int callId) {
+ if (retryCache != null) {
+ retryCache.addCacheEntry(clientId, callId);
+ }
+ }
+
+ @VisibleForTesting
static RetryCache initRetryCache(Configuration conf) {
boolean enable = conf.getBoolean(DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY,
DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT);
@@ -1518,7 +1547,7 @@ public class FSNamesystem implements Nam
boolean success = false;
try {
- concatInt(target, srcs);
+ concatInt(target, srcs, cacheEntry != null);
success = true;
} catch (AccessControlException e) {
logAuditEvent(false, "concat", Arrays.toString(srcs), target, null);
@@ -1528,8 +1557,8 @@ public class FSNamesystem implements Nam
}
}
- private void concatInt(String target, String [] srcs)
- throws IOException, UnresolvedLinkException {
+ private void concatInt(String target, String [] srcs,
+ boolean logRetryCache) throws IOException, UnresolvedLinkException {
// verify args
if(target.isEmpty()) {
throw new IllegalArgumentException("Target file name is empty");
@@ -1558,7 +1587,7 @@ public class FSNamesystem implements Nam
if (isInSafeMode()) {
throw new SafeModeException("Cannot concat " + target, safeMode);
}
- concatInternal(pc, target, srcs);
+ concatInternal(pc, target, srcs, logRetryCache);
resultingStat = getAuditFileInfo(target, false);
} finally {
writeUnlock();
@@ -1568,8 +1597,9 @@ public class FSNamesystem implements Nam
}
/** See {@link #concat(String, String[])} */
- private void concatInternal(FSPermissionChecker pc, String target, String [] srcs)
- throws IOException, UnresolvedLinkException {
+ private void concatInternal(FSPermissionChecker pc, String target,
+ String[] srcs, boolean logRetryCache) throws IOException,
+ UnresolvedLinkException {
assert hasWriteLock();
// write permission for the target
@@ -1673,7 +1703,7 @@ public class FSNamesystem implements Nam
Arrays.toString(srcs) + " to " + target);
}
- dir.concat(target,srcs);
+ dir.concat(target,srcs, logRetryCache);
}
/**
@@ -1745,7 +1775,7 @@ public class FSNamesystem implements Nam
}
boolean success = false;
try {
- createSymlinkInt(target, link, dirPerms, createParent);
+ createSymlinkInt(target, link, dirPerms, createParent, cacheEntry != null);
success = true;
} catch (AccessControlException e) {
logAuditEvent(false, "createSymlink", link, target, null);
@@ -1756,7 +1786,7 @@ public class FSNamesystem implements Nam
}
private void createSymlinkInt(String target, String link,
- PermissionStatus dirPerms, boolean createParent)
+ PermissionStatus dirPerms, boolean createParent, boolean logRetryCache)
throws IOException, UnresolvedLinkException {
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.createSymlink: target="
@@ -1787,7 +1817,7 @@ public class FSNamesystem implements Nam
checkFsObjectLimit();
// add symbolic link to namespace
- dir.addSymlink(link, target, dirPerms, createParent);
+ dir.addSymlink(link, target, dirPerms, createParent, logRetryCache);
resultingStat = getAuditFileInfo(link, false);
} finally {
writeUnlock();
@@ -1917,7 +1947,7 @@ public class FSNamesystem implements Nam
try {
status = startFileInt(src, permissions, holder, clientMachine, flag,
- createParent, replication, blockSize);
+ createParent, replication, blockSize, cacheEntry != null);
} catch (AccessControlException e) {
logAuditEvent(false, "create", src);
throw e;
@@ -1929,8 +1959,8 @@ public class FSNamesystem implements Nam
private HdfsFileStatus startFileInt(String src, PermissionStatus permissions,
String holder, String clientMachine, EnumSet<CreateFlag> flag,
- boolean createParent, short replication, long blockSize)
- throws AccessControlException, SafeModeException,
+ boolean createParent, short replication, long blockSize,
+ boolean logRetryCache) throws AccessControlException, SafeModeException,
FileAlreadyExistsException, UnresolvedLinkException,
FileNotFoundException, ParentNotDirectoryException, IOException {
if (NameNode.stateChangeLog.isDebugEnabled()) {
@@ -1965,8 +1995,8 @@ public class FSNamesystem implements Nam
throw new SafeModeException("Cannot create file" + src, safeMode);
}
src = FSDirectory.resolvePath(src, pathComponents, dir);
- startFileInternal(pc, src, permissions, holder, clientMachine,
- create, overwrite, createParent, replication, blockSize);
+ startFileInternal(pc, src, permissions, holder, clientMachine, create,
+ overwrite, createParent, replication, blockSize, logRetryCache);
stat = dir.getFileInfo(src, false);
} catch (StandbyException se) {
skipSync = true;
@@ -1995,8 +2025,9 @@ public class FSNamesystem implements Nam
private void startFileInternal(FSPermissionChecker pc, String src,
PermissionStatus permissions, String holder, String clientMachine,
boolean create, boolean overwrite, boolean createParent,
- short replication, long blockSize) throws FileAlreadyExistsException,
- AccessControlException, UnresolvedLinkException, FileNotFoundException,
+ short replication, long blockSize, boolean logRetryEntry)
+ throws FileAlreadyExistsException, AccessControlException,
+ UnresolvedLinkException, FileNotFoundException,
ParentNotDirectoryException, IOException {
assert hasWriteLock();
// Verify that the destination does not exist as a directory already.
@@ -2028,7 +2059,7 @@ public class FSNamesystem implements Nam
} else {
if (overwrite) {
try {
- deleteInt(src, true); // File exists - delete if overwrite
+ deleteInt(src, true, false); // File exists - delete if overwrite
} catch (AccessControlException e) {
logAuditEvent(false, "delete", src);
throw e;
@@ -2054,7 +2085,7 @@ public class FSNamesystem implements Nam
leaseManager.addLease(newNode.getClientName(), src);
// record file record in log, record new generation stamp
- getEditLog().logOpenFile(src, newNode);
+ getEditLog().logOpenFile(src, newNode, logRetryEntry);
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
+"add "+src+" to namespace for "+holder);
@@ -2083,8 +2114,9 @@ public class FSNamesystem implements Nam
* @return the last block locations if the block is partial or null otherwise
*/
private LocatedBlock appendFileInternal(FSPermissionChecker pc, String src,
- String holder, String clientMachine) throws AccessControlException,
- UnresolvedLinkException, FileNotFoundException, IOException {
+ String holder, String clientMachine, boolean logRetryCache)
+ throws AccessControlException, UnresolvedLinkException,
+ FileNotFoundException, IOException {
assert hasWriteLock();
// Verify that the destination does not exist as a directory already.
final INodesInPath iip = dir.getINodesInPath4Write(src);
@@ -2109,7 +2141,7 @@ public class FSNamesystem implements Nam
final DatanodeDescriptor clientNode =
blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
return prepareFileForWrite(src, myFile, holder, clientMachine, clientNode,
- true, iip.getLatestSnapshot());
+ true, iip.getLatestSnapshot(), logRetryCache);
} catch (IOException ie) {
NameNode.stateChangeLog.warn("DIR* NameSystem.append: " +ie.getMessage());
throw ie;
@@ -2126,13 +2158,16 @@ public class FSNamesystem implements Nam
* @param clientMachine identifier of the client machine
* @param clientNode if the client is collocated with a DN, that DN's descriptor
* @param writeToEditLog whether to persist this change to the edit log
+ * @param logRetryCache whether to record RPC ids in editlog for retry cache
+ * rebuilding
* @return the last block locations if the block is partial or null otherwise
* @throws UnresolvedLinkException
* @throws IOException
*/
LocatedBlock prepareFileForWrite(String src, INodeFile file,
String leaseHolder, String clientMachine, DatanodeDescriptor clientNode,
- boolean writeToEditLog, Snapshot latestSnapshot) throws IOException {
+ boolean writeToEditLog, Snapshot latestSnapshot, boolean logRetryCache)
+ throws IOException {
file = file.recordModification(latestSnapshot, dir.getINodeMap());
final INodeFileUnderConstruction cons = file.toUnderConstruction(
leaseHolder, clientMachine, clientNode);
@@ -2142,7 +2177,7 @@ public class FSNamesystem implements Nam
LocatedBlock ret = blockManager.convertLastBlockToUnderConstruction(cons);
if (writeToEditLog) {
- getEditLog().logOpenFile(src, cons);
+ getEditLog().logOpenFile(src, cons, logRetryCache);
}
return ret;
}
@@ -2290,7 +2325,7 @@ public class FSNamesystem implements Nam
boolean success = false;
try {
- lb = appendFileInt(src, holder, clientMachine);
+ lb = appendFileInt(src, holder, clientMachine, cacheEntry != null);
success = true;
return lb;
} catch (AccessControlException e) {
@@ -2302,7 +2337,8 @@ public class FSNamesystem implements Nam
}
private LocatedBlock appendFileInt(String src, String holder,
- String clientMachine) throws AccessControlException, SafeModeException,
+ String clientMachine, boolean logRetryCache)
+ throws AccessControlException, SafeModeException,
FileAlreadyExistsException, FileNotFoundException,
ParentNotDirectoryException, IOException {
if (NameNode.stateChangeLog.isDebugEnabled()) {
@@ -2328,7 +2364,7 @@ public class FSNamesystem implements Nam
throw new SafeModeException("Cannot append to file" + src, safeMode);
}
src = FSDirectory.resolvePath(src, pathComponents, dir);
- lb = appendFileInternal(pc, src, holder, clientMachine);
+ lb = appendFileInternal(pc, src, holder, clientMachine, logRetryCache);
} catch (StandbyException se) {
skipSync = true;
throw se;
@@ -2452,7 +2488,7 @@ public class FSNamesystem implements Nam
newBlock = createNewBlock();
saveAllocatedBlock(src, inodesInPath, newBlock, targets);
- dir.persistBlocks(src, pendingFile);
+ dir.persistBlocks(src, pendingFile, false);
offset = pendingFile.computeFileSize();
} finally {
writeUnlock();
@@ -2654,7 +2690,7 @@ public class FSNamesystem implements Nam
NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
+ b + " is removed from pendingCreates");
}
- dir.persistBlocks(src, file);
+ dir.persistBlocks(src, file, false);
} finally {
writeUnlock();
}
@@ -2871,7 +2907,7 @@ public class FSNamesystem implements Nam
}
boolean ret = false;
try {
- ret = renameToInt(src, dst);
+ ret = renameToInt(src, dst, cacheEntry != null);
} catch (AccessControlException e) {
logAuditEvent(false, "rename", src, dst, null);
throw e;
@@ -2881,7 +2917,7 @@ public class FSNamesystem implements Nam
return ret;
}
- private boolean renameToInt(String src, String dst)
+ private boolean renameToInt(String src, String dst, boolean logRetryCache)
throws IOException, UnresolvedLinkException {
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src +
@@ -2905,7 +2941,7 @@ public class FSNamesystem implements Nam
src = FSDirectory.resolvePath(src, srcComponents, dir);
dst = FSDirectory.resolvePath(dst, dstComponents, dir);
checkOperation(OperationCategory.WRITE);
- status = renameToInternal(pc, src, dst);
+ status = renameToInternal(pc, src, dst, logRetryCache);
if (status) {
resultingStat = getAuditFileInfo(dst, false);
}
@@ -2921,8 +2957,9 @@ public class FSNamesystem implements Nam
/** @deprecated See {@link #renameTo(String, String)} */
@Deprecated
- private boolean renameToInternal(FSPermissionChecker pc, String src, String dst)
- throws IOException, UnresolvedLinkException {
+ private boolean renameToInternal(FSPermissionChecker pc, String src,
+ String dst, boolean logRetryCache) throws IOException,
+ UnresolvedLinkException {
assert hasWriteLock();
if (isPermissionEnabled) {
//We should not be doing this. This is move() not renameTo().
@@ -2935,7 +2972,7 @@ public class FSNamesystem implements Nam
checkAncestorAccess(pc, actualdst, FsAction.WRITE);
}
- if (dir.renameTo(src, dst)) {
+ if (dir.renameTo(src, dst, logRetryCache)) {
return true;
}
return false;
@@ -2970,7 +3007,7 @@ public class FSNamesystem implements Nam
}
src = FSDirectory.resolvePath(src, srcComponents, dir);
dst = FSDirectory.resolvePath(dst, dstComponents, dir);
- renameToInternal(pc, src, dst, options);
+ renameToInternal(pc, src, dst, cacheEntry != null, options);
resultingStat = getAuditFileInfo(dst, false);
success = true;
} finally {
@@ -2988,14 +3025,14 @@ public class FSNamesystem implements Nam
}
private void renameToInternal(FSPermissionChecker pc, String src, String dst,
- Options.Rename... options) throws IOException {
+ boolean logRetryCache, Options.Rename... options) throws IOException {
assert hasWriteLock();
if (isPermissionEnabled) {
checkParentAccess(pc, src, FsAction.WRITE);
checkAncestorAccess(pc, dst, FsAction.WRITE);
}
- dir.renameTo(src, dst, options);
+ dir.renameTo(src, dst, logRetryCache, options);
}
/**
@@ -3013,7 +3050,7 @@ public class FSNamesystem implements Nam
}
boolean ret = false;
try {
- ret = deleteInt(src, recursive);
+ ret = deleteInt(src, recursive, cacheEntry != null);
} catch (AccessControlException e) {
logAuditEvent(false, "delete", src);
throw e;
@@ -3023,13 +3060,13 @@ public class FSNamesystem implements Nam
return ret;
}
- private boolean deleteInt(String src, boolean recursive)
+ private boolean deleteInt(String src, boolean recursive, boolean logRetryCache)
throws AccessControlException, SafeModeException,
UnresolvedLinkException, IOException {
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src);
}
- boolean status = deleteInternal(src, recursive, true);
+ boolean status = deleteInternal(src, recursive, true, logRetryCache);
if (status) {
logAuditEvent(true, "delete", src);
}
@@ -3057,7 +3094,7 @@ public class FSNamesystem implements Nam
* @see ClientProtocol#delete(String, boolean) for description of exceptions
*/
private boolean deleteInternal(String src, boolean recursive,
- boolean enforcePermission)
+ boolean enforcePermission, boolean logRetryCache)
throws AccessControlException, SafeModeException, UnresolvedLinkException,
IOException {
BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
@@ -3081,7 +3118,7 @@ public class FSNamesystem implements Nam
FsAction.ALL, false);
}
// Unlink the target directory from directory tree
- if (!dir.delete(src, collectedBlocks, removedINodes)) {
+ if (!dir.delete(src, collectedBlocks, removedINodes, logRetryCache)) {
return false;
}
ret = true;
@@ -3410,7 +3447,7 @@ public class FSNamesystem implements Nam
if (lastBlockLength > 0) {
pendingFile.updateLengthOfLastBlock(lastBlockLength);
}
- dir.persistBlocks(src, pendingFile);
+ dir.persistBlocks(src, pendingFile, false);
} finally {
writeUnlock();
}
@@ -3707,7 +3744,7 @@ public class FSNamesystem implements Nam
src = closeFileCommitBlocks(pendingFile, storedBlock);
} else {
// If this commit does not want to close the file, persist blocks
- src = persistBlocks(pendingFile);
+ src = persistBlocks(pendingFile, false);
}
} finally {
writeUnlock();
@@ -3756,10 +3793,10 @@ public class FSNamesystem implements Nam
* @throws IOException
*/
@VisibleForTesting
- String persistBlocks(INodeFileUnderConstruction pendingFile)
- throws IOException {
+ String persistBlocks(INodeFileUnderConstruction pendingFile,
+ boolean logRetryCache) throws IOException {
String src = leaseManager.findPath(pendingFile);
- dir.persistBlocks(src, pendingFile);
+ dir.persistBlocks(src, pendingFile, logRetryCache);
return src;
}
@@ -5545,7 +5582,8 @@ public class FSNamesystem implements Nam
}
assert newBlock.getBlockId()==oldBlock.getBlockId() : newBlock + " and "
+ oldBlock + " has different block identifier";
- updatePipelineInternal(clientName, oldBlock, newBlock, newNodes);
+ updatePipelineInternal(clientName, oldBlock, newBlock, newNodes,
+ cacheEntry != null);
success = true;
} finally {
writeUnlock();
@@ -5557,7 +5595,7 @@ public class FSNamesystem implements Nam
/** @see #updatePipeline(String, ExtendedBlock, ExtendedBlock, DatanodeID[]) */
private void updatePipelineInternal(String clientName, ExtendedBlock oldBlock,
- ExtendedBlock newBlock, DatanodeID[] newNodes)
+ ExtendedBlock newBlock, DatanodeID[] newNodes, boolean logRetryCache)
throws IOException {
assert hasWriteLock();
// check the vadility of the block and lease holder name
@@ -5592,7 +5630,7 @@ public class FSNamesystem implements Nam
blockinfo.setExpectedLocations(descriptors);
String src = leaseManager.findPath(pendingFile);
- dir.persistBlocks(src, pendingFile);
+ dir.persistBlocks(src, pendingFile, logRetryCache);
}
// rename was successful. If any part of the renamed subtree had
@@ -6357,7 +6395,8 @@ public class FSNamesystem implements Nam
} finally {
dir.writeUnlock();
}
- getEditLog().logCreateSnapshot(snapshotRoot, snapshotName);
+ getEditLog().logCreateSnapshot(snapshotRoot, snapshotName,
+ cacheEntry != null);
} finally {
writeUnlock();
RetryCache.setState(cacheEntry, snapshotPath != null, snapshotPath);
@@ -6399,7 +6438,8 @@ public class FSNamesystem implements Nam
dir.verifySnapshotName(snapshotNewName, path);
snapshotManager.renameSnapshot(path, snapshotOldName, snapshotNewName);
- getEditLog().logRenameSnapshot(path, snapshotOldName, snapshotNewName);
+ getEditLog().logRenameSnapshot(path, snapshotOldName, snapshotNewName,
+ cacheEntry != null);
success = true;
} finally {
writeUnlock();
@@ -6523,7 +6563,8 @@ public class FSNamesystem implements Nam
removedINodes.clear();
this.removeBlocks(collectedBlocks);
collectedBlocks.clear();
- getEditLog().logDeleteSnapshot(snapshotRoot, snapshotName);
+ getEditLog().logDeleteSnapshot(snapshotRoot, snapshotName,
+ cacheEntry != null);
success = true;
} finally {
writeUnlock();
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1508336&r1=1508335&r2=1508336&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Tue Jul 30 08:01:00 2013
@@ -726,7 +726,8 @@ public class NameNode {
}
/** get FSImage */
- FSImage getFSImage() {
+ @VisibleForTesting
+ public FSImage getFSImage() {
return namesystem.dir.fsImage;
}