Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/07/29 18:28:51 UTC
svn commit: r1152295 [1/10] - in /hadoop/common/trunk/hdfs: ./ bin/ ivy/
src/docs/src/documentation/content/xdocs/ src/java/
src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/
src/java/org/apache/hadoop/hdfs/server/common/ src/j...
Author: todd
Date: Fri Jul 29 16:28:45 2011
New Revision: 1152295
URL: http://svn.apache.org/viewvc?rev=1152295&view=rev
Log:
HDFS-1073. Redesign the NameNode's storage layout for image checkpoints and edit logs to introduce transaction IDs and be more robust. Contributed by Todd Lipcon and Ivan Kelly.
Added:
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImagePreTransactionalStorageInspector.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageStorageInspector.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/JournalProtocol.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLog.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLogManifest.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/util/AtomicFileOutputStream.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/util/MD5FileUtils.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/common/StorageAdapter.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSImageStorageInspector.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionFunctional.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/util/TestAtomicFileOutputStream.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/util/TestMD5FileUtils.java
Removed:
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
Modified:
hadoop/common/trunk/hdfs/CHANGES.txt
hadoop/common/trunk/hdfs/bin/hdfs
hadoop/common/trunk/hdfs/ivy.xml
hadoop/common/trunk/hdfs/ivy/libraries.properties
hadoop/common/trunk/hdfs/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml
hadoop/common/trunk/hdfs/src/java/hdfs-default.xml
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/HdfsConfiguration.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/common/Storage.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/CheckpointCommand.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/NamenodeRegistration.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsElement.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/OfflineEditsViewer.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/Tokenizer.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageVisitor.java
hadoop/common/trunk/hdfs/src/test/findbugsExcludeFile.xml
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSFinalize.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSRollback.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSStorageStateRecovery.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend4.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/UpgradeUtilities.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestClusterId.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsLimits.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNameEditsConfigs.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestParallelImageWrite.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSaveNamespace.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStartup.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStartupOptionUpgrade.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestTransferFsImage.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/tools/offlineEditsViewer/TestOfflineEditsViewer.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/tools/offlineEditsViewer/editsStored
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/tools/offlineEditsViewer/editsStored.xml
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java
hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/test/GenericTestUtils.java
Modified: hadoop/common/trunk/hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/CHANGES.txt?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hdfs/CHANGES.txt Fri Jul 29 16:28:45 2011
@@ -11,6 +11,10 @@ Trunk (unreleased changes)
HDFS-2210. Remove hdfsproxy. (eli)
+ HDFS-1073. Redesign the NameNode's storage layout for image checkpoints
+ and edit logs to introduce transaction IDs and be more robust.
+ Please see HDFS-1073 section below for a breakdown of individual patches.
+
NEW FEATURES
HDFS-1359. Add BlockPoolID to Block. (suresh)
@@ -904,6 +908,57 @@ Trunk (unreleased changes)
HDFS-1776. Bug in Concat code. (Bharath Mundlapudi via Dmytro Molkov)
+ BREAKDOWN OF HDFS-1073 SUBTASKS
+
+ HDFS-1521. Persist transaction ID on disk between NN restarts.
+ (Ivan Kelly and Todd Lipcon via todd)
+ HDFS-1538. Refactor more startup and image loading code out of FSImage.
+ (todd)
+ HDFS-1729. Add code to detect valid length of an edits file. (todd)
+ HDFS-1793. Add code to inspect a storage directory with txid-based
+ filenames (todd)
+ HDFS-1794. Add code to list which edit logs are available on a remote NN
+ (todd)
+ HDFS-1858. Add state management variables to FSEditLog (Ivan Kelly and Todd
+ Lipcon via todd)
+ HDFS-1859. Add some convenience functions to iterate over edit log streams
+ (Ivan Kelly and Todd Lipcon via todd)
+ HDFS-1894. Add constants for LAYOUT_VERSIONs in edits log branch (todd)
+ HDFS-1892. Fix EditLogFileInputStream.getValidLength to be aware of
+ OP_INVALID filler (todd)
+ HDFS-1799. Refactor log rolling and filename management out of FSEditLog
+ (Ivan Kelly and Todd Lipcon via todd)
+ HDFS-1801. Remove use of timestamps to identify checkpoints and logs (todd)
+ HDFS-1930. TestDFSUpgrade failing in HDFS-1073 branch (todd)
+ HDFS-1800. Extend image checksumming to function with multiple fsimage
+ files per directory. (todd)
+ HDFS-1725. Set storage directories only at FSImage construction (Ivan Kelly
+ via todd)
+ HDFS-1926. Remove references to StorageDirectory from JournalManager
+ interface (Ivan Kelly via todd)
+ HDFS-1893. Change edit logs and images to be named based on txid (todd)
+ HDFS-1985. Clean up image transfer servlet (todd)
+ HDFS-1984. Enable multiple secondary namenodes to run simultaneously (todd)
+ HDFS-1987. Re-enable TestCheckpoint.testSecondaryImageDownload which was
+ not running previously. (todd)
+ HDFS-1993. TestCheckpoint needs to clean up between cases (todd)
+ HDFS-1992. Remove vestiges of NNStorageListener. (todd)
+ HDFS-1991. Some refactoring of Secondary NameNode to be able to share more
+ code with the BackupNode or CheckpointNode. (todd)
+ HDFS-1994. Fix race conditions when running two rapidly checkpointing
+ Secondary NameNodes. (todd)
+ HDFS-2001. Remove use of previous.checkpoint and lastcheckpoint.tmp
+ directories (todd)
+ HDFS-2015. Remove checkpointTxId from VERSION file. (todd)
+ HDFS-2016. Add infrastructure to remove or archive old and unneeded storage
+ files within the name directories. (todd)
+ HDFS-2047. Improve TestNamespace and TestEditLog in HDFS-1073 branch.
+ (todd)
+ HDFS-2048. Add upgrade tests and fix upgrade from 0.22 with corrupt image.
+ (todd)
+ HDFS-2027. Image inspector should return finalized logs before unfinalized
+ logs. (todd)
+
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hdfs/bin/hdfs
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/bin/hdfs?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/bin/hdfs (original)
+++ hadoop/common/trunk/hdfs/bin/hdfs Fri Jul 29 16:28:45 2011
@@ -106,9 +106,7 @@ elif [ "$COMMAND" = "getconf" ] ; then
elif [ "$COMMAND" = "groups" ] ; then
CLASS=org.apache.hadoop.hdfs.tools.GetGroups
else
- echo $COMMAND - invalid command
- print_usage
- exit
+ CLASS="$COMMAND"
fi
# for developers, add hdfs classes to CLASSPATH
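With this change, an unrecognized COMMAND is no longer rejected; it is taken as a fully-qualified class name and run on the HDFS classpath, e.g. (hypothetical tool name, not part of this patch):

    hdfs org.example.MyDebugTool arg1 arg2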
Modified: hadoop/common/trunk/hdfs/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/ivy.xml?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/ivy.xml (original)
+++ hadoop/common/trunk/hdfs/ivy.xml Fri Jul 29 16:28:45 2011
@@ -67,6 +67,7 @@
<dependency org="commons-logging" name="commons-logging" rev="${commons-logging.version}" conf="compile->master"/>
<dependency org="commons-daemon" name="commons-daemon" rev="${commons-daemon.version}" conf="hdfs->default" />
<dependency org="log4j" name="log4j" rev="${log4j.version}" conf="common->master"/>
+ <dependency org="com.google.guava" name="guava" rev="${guava.version}" conf="hdfs->default" />
<dependency org="com.google.protobuf" name="protobuf-java" rev="2.4.0a" conf="common->master"/>
<dependency org="org.apache.hadoop" name="avro" rev="${avro.version}" conf="compile->master">
<exclude module="ant"/>
Modified: hadoop/common/trunk/hdfs/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/ivy/libraries.properties?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/ivy/libraries.properties (original)
+++ hadoop/common/trunk/hdfs/ivy/libraries.properties Fri Jul 29 16:28:45 2011
@@ -34,6 +34,8 @@ commons-net.version=1.4.1
core.version=3.1.1
coreplugin.version=1.3.2
+guava.version=r09
+
hadoop-common.version=0.23.0-SNAPSHOT
hadoop-hdfs.version=0.23.0-SNAPSHOT
Modified: hadoop/common/trunk/hdfs/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml (original)
+++ hadoop/common/trunk/hdfs/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml Fri Jul 29 16:28:45 2011
@@ -271,9 +271,9 @@
the maximum delay between two consecutive checkpoints, and
</li>
<li>
- <code>dfs.namenode.checkpoint.size</code>, set to 64MB by default, defines the
- size of the edits log file that forces an urgent checkpoint even if
- the maximum checkpoint delay is not reached.
+ <code>dfs.namenode.checkpoint.txns</code>, set to 40000 by default, defines the
+ number of uncheckpointed transactions on the NameNode which will force
+ an urgent checkpoint, even if the checkpoint period has not been reached.
</li>
</ul>
<p>
@@ -322,9 +322,9 @@
the maximum delay between two consecutive checkpoints
</li>
<li>
- <code>dfs.namenode.checkpoint.size</code>, set to 64MB by default, defines the
- size of the edits log file that forces an urgent checkpoint even if
- the maximum checkpoint delay is not reached.
+ <code>dfs.namenode.checkpoint.txns</code>, set to 40000 by default, defines the
+ number of uncheckpointed transactions on the NameNode which will force
+ an urgent checkpoint, even if the checkpoint period has not been reached.
</li>
</ul>
<p>
Modified: hadoop/common/trunk/hdfs/src/java/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/hdfs-default.xml?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/hdfs-default.xml (original)
+++ hadoop/common/trunk/hdfs/src/java/hdfs-default.xml Fri Jul 29 16:28:45 2011
@@ -582,10 +582,30 @@ creations/deletions), or "all".</descrip
</property>
<property>
- <name>dfs.namenode.checkpoint.size</name>
- <value>67108864</value>
- <description>The size of the current edit log (in bytes) that triggers
- a periodic checkpoint even if the dfs.namenode.checkpoint.period hasn't expired.
+ <name>dfs.namenode.checkpoint.txns</name>
+ <value>40000</value>
+ <description>The Secondary NameNode or CheckpointNode will create a checkpoint
+ of the namespace every 'dfs.namenode.checkpoint.txns' transactions, regardless
+ of whether 'dfs.namenode.checkpoint.period' has expired.
+ </description>
+</property>
+
+<property>
+ <name>dfs.namenode.checkpoint.check.period</name>
+ <value>60</value>
+ <description>The SecondaryNameNode and CheckpointNode will poll the NameNode
+ every 'dfs.namenode.checkpoint.check.period' seconds to query the number
+ of uncheckpointed transactions.
+ </description>
+</property>
+
+<property>
+ <name>dfs.namenode.num.checkpoints.retained</name>
+ <value>2</value>
+ <description>The number of image checkpoint files that will be retained by
+ the NameNode and Secondary NameNode in their storage directories. All edit
+ logs necessary to recover an up-to-date namespace from the oldest retained
+ checkpoint will also be retained.
</description>
</property>
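Taken together, these settings define when a checkpoint is triggered. A minimal sketch of the decision made on each poll, assuming hypothetical field names for the three values above (this is not the patch's actual code):

    // Evaluated every dfs.namenode.checkpoint.check.period seconds.
    boolean shouldCheckpoint(long nowMs, long lastCheckpointMs,
                             long uncheckpointedTxns) {
      return uncheckpointedTxns >= checkpointTxnCount        // dfs.namenode.checkpoint.txns
          || nowMs - lastCheckpointMs >= checkpointPeriodMs; // dfs.namenode.checkpoint.period
    }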
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Jul 29 16:28:45 2011
@@ -73,10 +73,12 @@ public class DFSConfigKeys extends Commo
public static final int DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT = 0;
public static final String DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY = "dfs.namenode.secondary.http-address";
public static final String DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_DEFAULT = "0.0.0.0:50090";
+ public static final String DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY = "dfs.namenode.checkpoint.check.period";
+ public static final long DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_DEFAULT = 60;
public static final String DFS_NAMENODE_CHECKPOINT_PERIOD_KEY = "dfs.namenode.checkpoint.period";
public static final long DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT = 3600;
- public static final String DFS_NAMENODE_CHECKPOINT_SIZE_KEY = "dfs.namenode.checkpoint.size";
- public static final long DFS_NAMENODE_CHECKPOINT_SIZE_DEFAULT = 4194304;
+ public static final String DFS_NAMENODE_CHECKPOINT_TXNS_KEY = "dfs.namenode.checkpoint.txns";
+ public static final long DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT = 40000;
public static final String DFS_NAMENODE_UPGRADE_PERMISSION_KEY = "dfs.namenode.upgrade.permission";
public static final int DFS_NAMENODE_UPGRADE_PERMISSION_DEFAULT = 00777;
public static final String DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY = "dfs.namenode.heartbeat.recheck-interval";
@@ -110,6 +112,9 @@ public class DFSConfigKeys extends Commo
public static final boolean DFS_NAMENODE_NAME_DIR_RESTORE_DEFAULT = false;
public static final String DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_KEY = "dfs.namenode.support.allow.format";
public static final boolean DFS_NAMENODE_SUPPORT_ALLOW_FORMAT_DEFAULT = true;
+ public static final String DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_KEY = "dfs.namenode.num.checkpoints.retained";
+ public static final int DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_DEFAULT = 2;
+
public static final String DFS_LIST_LIMIT = "dfs.ls.limit";
public static final int DFS_LIST_LIMIT_DEFAULT = 1000;
public static final String DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY = "dfs.datanode.failed.volumes.tolerated";
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/HdfsConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/HdfsConfiguration.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/HdfsConfiguration.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/HdfsConfiguration.java Fri Jul 29 16:28:45 2011
@@ -85,7 +85,6 @@ public class HdfsConfiguration extends C
deprecate("fs.checkpoint.dir", DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY);
deprecate("fs.checkpoint.edits.dir", DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY);
deprecate("fs.checkpoint.period", DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY);
- deprecate("fs.checkpoint.size", DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_SIZE_KEY);
deprecate("dfs.upgrade.permission", DFSConfigKeys.DFS_NAMENODE_UPGRADE_PERMISSION_KEY);
deprecate("heartbeat.recheck.interval", DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY);
deprecate("StorageId", DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY);
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java Fri Jul 29 16:28:45 2011
@@ -65,6 +65,9 @@ public interface FSConstants {
// type of the datanode report
public static enum DatanodeReportType {ALL, LIVE, DEAD }
+
+ // An invalid transaction ID that will never be seen in a real namesystem.
+ public static final long INVALID_TXID = -12345;
/**
* Distributed upgrade actions:
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java Fri Jul 29 16:28:45 2011
@@ -78,7 +78,9 @@ public class LayoutVersion {
RESERVED_REL22(-33, -27, "Reserved for release 0.22"),
RESERVED_REL23(-34, -30, "Reserved for release 0.23"),
FEDERATION(-35, "Support for namenode federation"),
- LEASE_REASSIGNMENT(-36, "Support for persisting lease holder reassignment");
+ LEASE_REASSIGNMENT(-36, "Support for persisting lease holder reassignment"),
+ STORED_TXIDS(-37, "Transaction IDs are stored in edits log and image files"),
+ TXID_BASED_LAYOUT(-38, "File names in NN Storage are based on transaction IDs");
final int lv;
final int ancestorLV;
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/common/Storage.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/common/Storage.java Fri Jul 29 16:28:45 2011
@@ -181,6 +181,16 @@ public abstract class Storage extends St
return new DirIterator(dirType);
}
+ public Iterable<StorageDirectory> dirIterable(final StorageDirType dirType) {
+ return new Iterable<StorageDirectory>() {
+ @Override
+ public Iterator<StorageDirectory> iterator() {
+ return dirIterator(dirType);
+ }
+ };
+ }
+
+
/**
* generate storage list (debug line)
*/
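The new dirIterable() wrapper lets callers use the enhanced for loop instead of managing an Iterator explicitly; an illustrative caller (not from this patch):

    for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.EDITS)) {
      LOG.info("edits directory: " + sd.getRoot());  // visit each EDITS dir
    }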
@@ -568,13 +578,17 @@ public abstract class Storage extends St
LOG.info("Locking is disabled");
return;
}
- this.lock = tryLock();
- if (lock == null) {
+ FileLock newLock = tryLock();
+ if (newLock == null) {
String msg = "Cannot lock storage " + this.root
+ ". The directory is already locked.";
LOG.info(msg);
throw new IOException(msg);
}
+ // Don't overwrite lock until success - this way if we accidentally
+ // call lock twice, the internal state won't be cleared by the second
+ // (failed) lock attempt
+ lock = newLock;
}
/**
@@ -614,6 +628,45 @@ public abstract class Storage extends St
lock.channel().close();
lock = null;
}
+
+ @Override
+ public String toString() {
+ return "Storage Directory " + this.root;
+ }
+
+ /**
+ * Check whether underlying file system supports file locking.
+ *
+ * @return <code>true</code> if exclusive locks are supported or
+ * <code>false</code> otherwise.
+ * @throws IOException
+ * @see StorageDirectory#lock()
+ */
+ public boolean isLockSupported() throws IOException {
+ FileLock firstLock = null;
+ FileLock secondLock = null;
+ try {
+ firstLock = lock;
+ if(firstLock == null) {
+ firstLock = tryLock();
+ if(firstLock == null)
+ return true;
+ }
+ secondLock = tryLock();
+ if(secondLock == null)
+ return true;
+ } finally {
+ if(firstLock != null && firstLock != lock) {
+ firstLock.release();
+ firstLock.channel().close();
+ }
+ if(secondLock != null) {
+ secondLock.release();
+ secondLock.channel().close();
+ }
+ }
+ return false;
+ }
}
/**
@@ -829,41 +882,6 @@ public abstract class Storage extends St
}
}
- /**
- * Check whether underlying file system supports file locking.
- *
- * @return <code>true</code> if exclusive locks are supported or
- * <code>false</code> otherwise.
- * @throws IOException
- * @see StorageDirectory#lock()
- */
- public boolean isLockSupported(int idx) throws IOException {
- StorageDirectory sd = storageDirs.get(idx);
- FileLock firstLock = null;
- FileLock secondLock = null;
- try {
- firstLock = sd.lock;
- if(firstLock == null) {
- firstLock = sd.tryLock();
- if(firstLock == null)
- return true;
- }
- secondLock = sd.tryLock();
- if(secondLock == null)
- return true;
- } finally {
- if(firstLock != null && firstLock != sd.lock) {
- firstLock.release();
- firstLock.channel().close();
- }
- if(secondLock != null) {
- secondLock.release();
- secondLock.channel().close();
- }
- }
- return false;
- }
-
public static String getBuildVersion() {
return VersionInfo.getRevision();
}
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java Fri Jul 29 16:28:45 2011
@@ -25,6 +25,8 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
+import com.google.common.base.Joiner;
+
/**
* Common class for storage information.
*
@@ -105,4 +107,9 @@ public class StorageInfo implements Writ
.append(";nsid=").append(namespaceID).append(";c=").append(cTime);
return sb.toString();
}
+
+ public String toColonSeparatedString() {
+ return Joiner.on(":").join(
+ layoutVersion, namespaceID, cTime, clusterID);
+ }
}
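For illustration (values invented), a StorageInfo with layoutVersion=-38, namespaceID=463031076, cTime=0 and clusterID="CID-test" would serialize as:

    -38:463031076:0:CID-test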
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java Fri Jul 29 16:28:45 2011
@@ -19,29 +19,21 @@ package org.apache.hadoop.hdfs.server.na
import java.io.BufferedInputStream;
import java.io.DataInputStream;
-import java.io.File;
import java.io.IOException;
-import java.net.URI;
-import java.util.Collection;
import java.util.Iterator;
-import java.util.zip.CheckedInputStream;
import java.util.zip.Checksum;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.LayoutVersion;
-import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
-import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
-import static org.apache.hadoop.hdfs.server.common.Util.now;
-import org.apache.hadoop.hdfs.server.namenode.FSImage;
-import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
-import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
-import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.LogLoadPlan;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.base.Preconditions;
/**
* Extension of FSImage for the backup node.
@@ -50,29 +42,56 @@ import org.apache.hadoop.io.LongWritable
*/
@InterfaceAudience.Private
public class BackupImage extends FSImage {
- // Names of the journal spool directory and the spool file
- private static final String STORAGE_JSPOOL_DIR = "jspool";
- private static final String STORAGE_JSPOOL_FILE =
- NNStorage.NameNodeFile.EDITS_NEW.getName();
-
/** Backup input stream for loading edits into memory */
- private EditLogBackupInputStream backupInputStream;
-
- /** Is journal spooling in progress */
- volatile JSpoolState jsState;
-
- static enum JSpoolState {
- OFF,
- INPROGRESS,
- WAIT;
+ private EditLogBackupInputStream backupInputStream =
+ new EditLogBackupInputStream("Data from remote NameNode");
+
+ /**
+ * Current state of the BackupNode. The BackupNode's state
+ * transitions are as follows:
+ *
+ * Initial: DROP_UNTIL_NEXT_ROLL
+ * - Transitions to JOURNAL_ONLY the next time the log rolls
+ * - Transitions to IN_SYNC in convergeJournalSpool
+ * - Transitions back to JOURNAL_ONLY if the log rolls while
+ * stopApplyingEditsOnNextRoll is true.
+ */
+ volatile BNState bnState;
+ static enum BNState {
+ /**
+ * Edits from the NN should be dropped. On the next log roll,
+ * transition to JOURNAL_ONLY state
+ */
+ DROP_UNTIL_NEXT_ROLL,
+ /**
+ * Edits from the NN should be written to the local edits log
+ * but not applied to the namespace.
+ */
+ JOURNAL_ONLY,
+ /**
+ * Edits should be written to the local edits log and applied
+ * to the local namespace.
+ */
+ IN_SYNC;
}
/**
+ * Flag to indicate that the next time the NN rolls, the BN
+ * should transition to JOURNAL_ONLY state.
+ * @see #freezeNamespaceAtNextRoll()
+ */
+ private boolean stopApplyingEditsOnNextRoll = false;
+
+ /**
+ * Construct a backup image.
+ * @param conf Configuration
+ * @throws IOException if storage cannot be initialised.
*/
- BackupImage() {
- super();
+ BackupImage(Configuration conf) throws IOException {
+ super(conf);
storage.setDisablePreUpgradableLayoutCheck(true);
- jsState = JSpoolState.OFF;
+ bnState = BNState.DROP_UNTIL_NEXT_ROLL;
+ editLog.initJournals();
}
/**
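The BNState transitions documented above amount to a small transition function on log roll; a sketch under those rules (hypothetical helper, mirroring the logic of namenodeStartedLogSegment() further down):

    BNState nextStateOnLogRoll(BNState current, boolean stopApplyingEditsOnNextRoll) {
      switch (current) {
      case DROP_UNTIL_NEXT_ROLL:
        return BNState.JOURNAL_ONLY;           // first roll: begin journaling locally
      case IN_SYNC:
        return stopApplyingEditsOnNextRoll
            ? BNState.JOURNAL_ONLY             // namespace frozen for a checkpoint
            : BNState.IN_SYNC;
      case JOURNAL_ONLY:
        return BNState.JOURNAL_ONLY;           // leaves only via convergeJournalSpool()
      default:
        throw new AssertionError("Unhandled state: " + current);
      }
    }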
@@ -81,14 +100,9 @@ public class BackupImage extends FSImage
* Read VERSION and fstime files if exist.<br>
* Do not load image or edits.
*
- * @param imageDirs list of image directories as URI.
- * @param editsDirs list of edits directories URI.
* @throws IOException if the node should shutdown.
*/
- void recoverCreateRead(Collection<URI> imageDirs,
- Collection<URI> editsDirs) throws IOException {
- storage.setStorageDirectories(imageDirs, editsDirs);
- storage.setCheckpointTime(0L);
+ void recoverCreateRead() throws IOException {
for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
StorageDirectory sd = it.next();
StorageState curState;
@@ -123,282 +137,260 @@ public class BackupImage extends FSImage
}
/**
- * Reset storage directories.
- * <p>
- * Unlock the storage.
- * Rename <code>current</code> to <code>lastcheckpoint.tmp</code>
- * and recreate empty <code>current</code>.
- * @throws IOException
+ * Save meta-data into fsimage files.
+ * and create empty edits.
*/
- synchronized void reset() throws IOException {
- // reset NameSpace tree
- FSDirectory fsDir = getFSNamesystem().dir;
- fsDir.reset();
-
- // unlock, close and rename storage directories
- storage.unlockAll();
- // recover from unsuccessful checkpoint if necessary
- recoverCreateRead(storage.getImageDirectories(),
- storage.getEditsDirectories());
- // rename and recreate
- for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
- StorageDirectory sd = it.next();
- // rename current to lastcheckpoint.tmp
- storage.moveCurrent(sd);
- }
+ void saveCheckpoint() throws IOException {
+ saveNamespace();
}
/**
- * Load checkpoint from local files only if the memory state is empty.<br>
- * Set new checkpoint time received from the name-node.<br>
- * Move <code>lastcheckpoint.tmp</code> to <code>previous.checkpoint</code>.
+ * Receive a batch of edits from the NameNode.
+ *
+ * Depending on bnState, different actions are taken. See
+ * {@link BackupImage.BNState}
+ *
+ * @param firstTxId first txid in batch
+ * @param numTxns number of transactions
+ * @param data serialized journal records.
* @throws IOException
+ * @see #convergeJournalSpool()
*/
- void loadCheckpoint(CheckpointSignature sig) throws IOException {
- // load current image and journal if it is not in memory already
- if(!editLog.isOpen())
- editLog.open();
-
- FSDirectory fsDir = getFSNamesystem().dir;
- if(fsDir.isEmpty()) {
- Iterator<StorageDirectory> itImage
- = storage.dirIterator(NameNodeDirType.IMAGE);
- Iterator<StorageDirectory> itEdits
- = storage.dirIterator(NameNodeDirType.EDITS);
- if(!itImage.hasNext() || ! itEdits.hasNext())
- throw new IOException("Could not locate checkpoint directories");
- StorageDirectory sdName = itImage.next();
- StorageDirectory sdEdits = itEdits.next();
- getFSDirectoryRootLock().writeLock();
- try { // load image under rootDir lock
- loadFSImage(NNStorage.getStorageFile(sdName, NameNodeFile.IMAGE));
- } finally {
- getFSDirectoryRootLock().writeUnlock();
- }
- loadFSEdits(sdEdits);
- }
+ synchronized void journal(long firstTxId, int numTxns, byte[] data) throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Got journal, " +
+ "state = " + bnState +
+ "; firstTxId = " + firstTxId +
+ "; numTxns = " + numTxns);
+ }
+
+ switch(bnState) {
+ case DROP_UNTIL_NEXT_ROLL:
+ return;
- // set storage fields
- storage.setStorageInfo(sig);
- storage.setImageDigest(sig.imageDigest);
- storage.setCheckpointTime(sig.checkpointTime);
+ case IN_SYNC:
+ // update NameSpace in memory
+ applyEdits(firstTxId, numTxns, data);
+ break;
+
+ case JOURNAL_ONLY:
+ break;
+
+ default:
+ throw new AssertionError("Unhandled state: " + bnState);
+ }
+
+ // write to BN's local edit log.
+ logEditsLocally(firstTxId, numTxns, data);
}
/**
- * Save meta-data into fsimage files.
- * and create empty edits.
+ * Write the batch of edits to the local copy of the edit logs.
*/
- void saveCheckpoint() throws IOException {
- saveNamespace(false);
- }
-
- private FSDirectory getFSDirectoryRootLock() {
- return getFSNamesystem().dir;
- }
-
- static File getJSpoolDir(StorageDirectory sd) {
- return new File(sd.getRoot(), STORAGE_JSPOOL_DIR);
- }
-
- static File getJSpoolFile(StorageDirectory sd) {
- return new File(getJSpoolDir(sd), STORAGE_JSPOOL_FILE);
+ private void logEditsLocally(long firstTxId, int numTxns, byte[] data) {
+ long expectedTxId = editLog.getLastWrittenTxId() + 1;
+ Preconditions.checkState(firstTxId == expectedTxId,
+ "received txid batch starting at %s but expected txn %s",
+ firstTxId, expectedTxId);
+ editLog.setNextTxId(firstTxId + numTxns - 1);
+ editLog.logEdit(data.length, data);
+ editLog.logSync();
}
/**
- * Journal writer journals new meta-data state.
- * <ol>
- * <li> If Journal Spool state is OFF then journal records (edits)
- * are applied directly to meta-data state in memory and are written
- * to the edits file(s).</li>
- * <li> If Journal Spool state is INPROGRESS then records are only
- * written to edits.new file, which is called Spooling.</li>
- * <li> Journal Spool state WAIT blocks journaling until the
- * Journal Spool reader finalizes merging of the spooled data and
- * switches to applying journal to memory.</li>
- * </ol>
- * @param length length of data.
- * @param data serialized journal records.
- * @throws IOException
- * @see #convergeJournalSpool()
+ * Apply the batch of edits to the local namespace.
*/
- synchronized void journal(int length, byte[] data) throws IOException {
+ private synchronized void applyEdits(long firstTxId, int numTxns, byte[] data)
+ throws IOException {
+ Preconditions.checkArgument(firstTxId == lastAppliedTxId + 1,
+ "Received txn batch starting at %s but expected %s",
+ firstTxId, lastAppliedTxId + 1);
assert backupInputStream.length() == 0 : "backup input stream is not empty";
try {
- switch(jsState) {
- case WAIT:
- case OFF:
- // wait until spooling is off
- waitSpoolEnd();
- // update NameSpace in memory
- backupInputStream.setBytes(data);
- FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
- int logVersion = storage.getLayoutVersion();
- BufferedInputStream bin = new BufferedInputStream(backupInputStream);
- DataInputStream in = new DataInputStream(bin);
- Checksum checksum = null;
- if (LayoutVersion.supports(Feature.EDITS_CHESKUM, logVersion)) {
- checksum = FSEditLog.getChecksum();
- in = new DataInputStream(new CheckedInputStream(bin, checksum));
- }
- logLoader.loadEditRecords(logVersion, in, checksum, true);
- getFSNamesystem().dir.updateCountForINodeWithQuota(); // inefficient!
- break;
- case INPROGRESS:
- break;
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("data:" + StringUtils.byteToHexString(data));
}
- // write to files
- editLog.logEdit(length, data);
- editLog.logSync();
+ backupInputStream.setBytes(data);
+ FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
+ int logVersion = storage.getLayoutVersion();
+ BufferedInputStream bin = new BufferedInputStream(backupInputStream);
+ DataInputStream in = new DataInputStream(bin);
+ Checksum checksum = FSEditLog.getChecksum();
+ int numLoaded = logLoader.loadEditRecords(logVersion, in, checksum, true,
+ lastAppliedTxId + 1);
+ if (numLoaded != numTxns) {
+ throw new IOException("Batch of txns starting at txnid " +
+ firstTxId + " was supposed to contain " + numTxns +
+ " transactions but only was able to apply " + numLoaded);
+ }
+ lastAppliedTxId += numTxns;
+
+ getFSNamesystem().dir.updateCountForINodeWithQuota(); // inefficient!
} finally {
backupInputStream.clear();
}
}
- private synchronized void waitSpoolEnd() {
- while(jsState == JSpoolState.WAIT) {
+ /**
+ * Transition the BackupNode from JOURNAL_ONLY state to IN_SYNC state.
+ * This is done by repeated invocations of tryConvergeJournalSpool until
+ * we are caught up to the latest in-progress edits file.
+ */
+ void convergeJournalSpool() throws IOException {
+ Preconditions.checkState(bnState == BNState.JOURNAL_ONLY,
+ "bad state: %s", bnState);
+
+ while (!tryConvergeJournalSpool()) {
+ ;
+ }
+ assert bnState == BNState.IN_SYNC;
+ }
+
+ private boolean tryConvergeJournalSpool() throws IOException {
+ Preconditions.checkState(bnState == BNState.JOURNAL_ONLY,
+ "bad state: %s", bnState);
+
+ // This section is unsynchronized so we can continue to apply
+ // ahead of where we're reading, concurrently. Since the state
+ // is JOURNAL_ONLY at this point, we know that lastAppliedTxId
+ // doesn't change, and curSegmentTxId only increases
+
+ while (lastAppliedTxId < editLog.getCurSegmentTxId() - 1) {
+ long target = editLog.getCurSegmentTxId();
+ LOG.info("Loading edits into backupnode to try to catch up from txid "
+ + lastAppliedTxId + " to " + target);
+ FSImageTransactionalStorageInspector inspector =
+ new FSImageTransactionalStorageInspector();
+
+ storage.inspectStorageDirs(inspector);
+ LogLoadPlan logLoadPlan = inspector.createLogLoadPlan(lastAppliedTxId,
+ target - 1);
+
+ logLoadPlan.doRecovery();
+ loadEdits(logLoadPlan.getEditsFiles());
+ }
+
+ // now, need to load the in-progress file
+ synchronized (this) {
+ if (lastAppliedTxId != editLog.getCurSegmentTxId() - 1) {
+ LOG.debug("Logs rolled while catching up to current segment");
+ return false; // drop lock and try again to load local logs
+ }
+
+ EditLogInputStream stream = getEditLog().getInProgressFileInputStream();
try {
- wait();
- } catch (InterruptedException e) {}
+ long remainingTxns = getEditLog().getLastWrittenTxId() - lastAppliedTxId;
+
+ LOG.info("Going to finish converging with remaining " + remainingTxns
+ + " txns from in-progress stream " + stream);
+
+ FSEditLogLoader loader = new FSEditLogLoader(namesystem);
+ int numLoaded = loader.loadFSEdits(stream, lastAppliedTxId + 1);
+ lastAppliedTxId += numLoaded;
+ assert numLoaded == remainingTxns :
+ "expected to load " + remainingTxns + " but loaded " +
+ numLoaded + " from " + stream;
+ } finally {
+ IOUtils.closeStream(stream);
+ }
+
+ LOG.info("Successfully synced BackupNode with NameNode at txnid " +
+ lastAppliedTxId);
+ setState(BNState.IN_SYNC);
}
- // now spooling should be off, verifying just in case
- assert jsState == JSpoolState.OFF : "Unexpected JSpool state: " + jsState;
+ return true;
}
/**
- * Start journal spool.
- * Switch to writing into edits.new instead of edits.
- *
- * edits.new for spooling is in separate directory "spool" rather than in
- * "current" because the two directories should be independent.
- * While spooling a checkpoint can happen and current will first
- * move to lastcheckpoint.tmp and then to previous.checkpoint
- * spool/edits.new will remain in place during that.
- */
- synchronized void startJournalSpool(NamenodeRegistration nnReg)
- throws IOException {
- switch(jsState) {
- case OFF:
- break;
- case INPROGRESS:
- return;
- case WAIT:
- waitSpoolEnd();
+ * Transition edit log to a new state, logging as necessary.
+ */
+ private synchronized void setState(BNState newState) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("State transition " + bnState + " -> " + newState,
+ new Exception("trace"));
}
+ bnState = newState;
+ }
- // create journal spool directories
- for (Iterator<StorageDirectory> it
- = storage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
- StorageDirectory sd = it.next();
- File jsDir = getJSpoolDir(sd);
- if (!jsDir.exists() && !jsDir.mkdirs()) {
- throw new IOException("Mkdirs failed to create "
- + jsDir.getCanonicalPath());
+ /**
+ * Receive a notification that the NameNode has begun a new edit log.
+ * This causes the BN to also start the new edit log in its local
+ * directories.
+ */
+ synchronized void namenodeStartedLogSegment(long txid)
+ throws IOException {
+ LOG.info("NameNode started a new log segment at txid " + txid);
+ if (editLog.isOpen()) {
+ if (editLog.getLastWrittenTxId() == txid - 1) {
+ // We are in sync with the NN, so end and finalize the current segment
+ editLog.endCurrentLogSegment(false);
+ } else {
+ // We appear to have missed some transactions -- the NN probably
+ // lost contact with us temporarily. So, mark the current segment
+ // as aborted.
+ LOG.warn("NN started new log segment at txid " + txid +
+ ", but BN had only written up to txid " +
+ editLog.getLastWrittenTxId() +
+ "in the log segment starting at " +
+ editLog.getCurSegmentTxId() + ". Aborting this " +
+ "log segment.");
+ editLog.abortCurrentLogSegment();
}
- // create edit file if missing
- File eFile = storage.getEditFile(sd);
- if(!eFile.exists()) {
- editLog.createEditLogFile(eFile);
+ }
+ editLog.setNextTxId(txid);
+ editLog.startLogSegment(txid, false);
+ if (bnState == BNState.DROP_UNTIL_NEXT_ROLL) {
+ setState(BNState.JOURNAL_ONLY);
+ }
+
+ if (stopApplyingEditsOnNextRoll) {
+ if (bnState == BNState.IN_SYNC) {
+ LOG.info("Stopped applying edits to prepare for checkpoint.");
+ setState(BNState.JOURNAL_ONLY);
}
+ stopApplyingEditsOnNextRoll = false;
+ notifyAll();
}
-
- if(!editLog.isOpen())
- editLog.open();
-
- // create streams pointing to the journal spool files
- // subsequent journal records will go directly to the spool
- editLog.divertFileStreams(STORAGE_JSPOOL_DIR + "/" + STORAGE_JSPOOL_FILE);
- setCheckpointState(CheckpointStates.ROLLED_EDITS);
-
- // set up spooling
- if(backupInputStream == null)
- backupInputStream = new EditLogBackupInputStream(nnReg.getAddress());
- jsState = JSpoolState.INPROGRESS;
}
- synchronized void setCheckpointTime(int length, byte[] data)
- throws IOException {
- assert backupInputStream.length() == 0 : "backup input stream is not empty";
- try {
- // unpack new checkpoint time
- backupInputStream.setBytes(data);
- DataInputStream in = backupInputStream.getDataInputStream();
- byte op = in.readByte();
- assert op == NamenodeProtocol.JA_CHECKPOINT_TIME;
- LongWritable lw = new LongWritable();
- lw.readFields(in);
- storage.setCheckpointTimeInStorage(lw.get());
- } finally {
- backupInputStream.clear();
- }
+ /**
+ * Request that the next time the BN receives a log roll, it should
+ * stop applying the edits log to the local namespace. This is
+ * typically followed on by a call to {@link #waitUntilNamespaceFrozen()}
+ */
+ synchronized void freezeNamespaceAtNextRoll() {
+ stopApplyingEditsOnNextRoll = true;
}
/**
- * Merge Journal Spool to memory.<p>
- * Journal Spool reader reads journal records from edits.new.
- * When it reaches the end of the file it sets {@link JSpoolState} to WAIT.
- * This blocks journaling (see {@link #journal(int,byte[])}.
- * The reader
- * <ul>
- * <li> reads remaining journal records if any,</li>
- * <li> renames edits.new to edits,</li>
- * <li> sets {@link JSpoolState} to OFF,</li>
- * <li> and notifies the journaling thread.</li>
- * </ul>
- * Journaling resumes with applying new journal records to the memory state,
- * and writing them into edits file(s).
+ * After {@link #freezeNamespaceAtNextRoll()} has been called, wait until
+ * the BN receives notification of the next log roll.
*/
- void convergeJournalSpool() throws IOException {
- Iterator<StorageDirectory> itEdits
- = storage.dirIterator(NameNodeDirType.EDITS);
- if(! itEdits.hasNext())
- throw new IOException("Could not locate checkpoint directories");
- StorageDirectory sdEdits = itEdits.next();
- int numEdits = 0;
- File jSpoolFile = getJSpoolFile(sdEdits);
- long startTime = now();
- if(jSpoolFile.exists()) {
- // load edits.new
- EditLogFileInputStream edits = new EditLogFileInputStream(jSpoolFile);
- BufferedInputStream bin = new BufferedInputStream(edits);
- DataInputStream in = new DataInputStream(bin);
- FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
- int logVersion = logLoader.readLogVersion(in);
- Checksum checksum = null;
- if (LayoutVersion.supports(Feature.EDITS_CHESKUM, logVersion)) {
- checksum = FSEditLog.getChecksum();
- in = new DataInputStream(new CheckedInputStream(bin, checksum));
- }
- numEdits += logLoader.loadEditRecords(logVersion, in, checksum, false);
+ synchronized void waitUntilNamespaceFrozen() throws IOException {
+ if (bnState != BNState.IN_SYNC) return;
- // first time reached the end of spool
- jsState = JSpoolState.WAIT;
- numEdits += logLoader.loadEditRecords(logVersion,
- in, checksum, true);
- getFSNamesystem().dir.updateCountForINodeWithQuota();
- edits.close();
- }
-
- FSImage.LOG.info("Edits file " + jSpoolFile.getCanonicalPath()
- + " of size " + jSpoolFile.length() + " edits # " + numEdits
- + " loaded in " + (now()-startTime)/1000 + " seconds.");
-
- // rename spool edits.new to edits making it in sync with the active node
- // subsequent journal records will go directly to edits
- editLog.revertFileStreams(STORAGE_JSPOOL_DIR + "/" + STORAGE_JSPOOL_FILE);
-
- // write version file
- resetVersion(false, storage.getImageDigest());
-
- // wake up journal writer
- synchronized(this) {
- jsState = JSpoolState.OFF;
- notifyAll();
+ LOG.info("Waiting until the NameNode rolls its edit logs in order " +
+ "to freeze the BackupNode namespace.");
+ while (bnState == BNState.IN_SYNC) {
+ Preconditions.checkState(stopApplyingEditsOnNextRoll,
+ "If still in sync, we should still have the flag set to " +
+ "freeze at next roll");
+ try {
+ wait();
+ } catch (InterruptedException ie) {
+ LOG.warn("Interrupted waiting for namespace to freeze", ie);
+ throw new IOException(ie);
+ }
}
+ LOG.info("BackupNode namespace frozen.");
+ }
- // Rename lastcheckpoint.tmp to previous.checkpoint
- for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
- StorageDirectory sd = it.next();
- storage.moveLastCheckpoint(sd);
- }
+ /**
+ * Override close() so that we don't finalize edit logs.
+ */
+ @Override
+ public synchronized void close() throws IOException {
+ editLog.abortCurrentLogSegment();
+ storage.close();
}
}
Added: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java?rev=1152295&view=auto
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java (added)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java Fri Jul 29 16:28:45 2011
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+
+/**
+ * A JournalManager implementation that uses RPCs to log transactions
+ * to a BackupNode.
+ */
+class BackupJournalManager implements JournalManager {
+
+ private final NamenodeRegistration nnReg;
+ private final NamenodeRegistration bnReg;
+
+ BackupJournalManager(NamenodeRegistration bnReg,
+ NamenodeRegistration nnReg) {
+ this.bnReg = bnReg;
+ this.nnReg = nnReg;
+ }
+
+ @Override
+ public EditLogOutputStream startLogSegment(long txId) throws IOException {
+ EditLogBackupOutputStream stm = new EditLogBackupOutputStream(bnReg, nnReg);
+ stm.startLogSegment(txId);
+ return stm;
+ }
+
+ @Override
+ public void finalizeLogSegment(long firstTxId, long lastTxId)
+ throws IOException {
+ }
+
+ @Override
+ public void setOutputBufferCapacity(int size) {
+ }
+
+ @Override
+ public void purgeLogsOlderThan(long minTxIdToKeep, StoragePurger purger)
+ throws IOException {
+ }
+
+ public boolean matchesRegistration(NamenodeRegistration bnReg) {
+ return bnReg.getAddress().equals(this.bnReg.getAddress());
+ }
+
+ @Override
+ public EditLogInputStream getInProgressInputStream(long segmentStartsAtTxId) {
+ return null;
+ }
+}
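The JournalManager contract this class implements is easiest to see from the caller's side; a sketch of the segment lifecycle (hypothetical driver code; only the interface methods shown above are assumed):

    JournalManager jm = new BackupJournalManager(bnReg, nnReg);
    EditLogOutputStream out = jm.startLogSegment(firstTxId);  // open a new segment
    // ... write and sync transactions through 'out' ...
    jm.finalizeLogSegment(firstTxId, lastTxId);               // seal the finished segment
    jm.purgeLogsOlderThan(minTxIdToKeep, purger);             // retention (no-op here)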
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java Fri Jul 29 16:28:45 2011
@@ -22,21 +22,20 @@ import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
-import org.apache.hadoop.hdfs.server.common.Storage;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
-import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.CheckpointStates;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
/**
@@ -53,7 +52,7 @@ import org.apache.hadoop.net.NetUtils;
* </ol>
*/
@InterfaceAudience.Private
-public class BackupNode extends NameNode {
+public class BackupNode extends NameNode implements JournalProtocol {
private static final String BN_ADDRESS_NAME_KEY = DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY;
private static final String BN_ADDRESS_DEFAULT = DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT;
private static final String BN_HTTP_ADDRESS_NAME_KEY = DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY;
@@ -119,10 +118,9 @@ public class BackupNode extends NameNode
@Override // NameNode
protected void loadNamesystem(Configuration conf) throws IOException {
- BackupImage bnImage = new BackupImage();
+ BackupImage bnImage = new BackupImage(conf);
this.namesystem = new FSNamesystem(conf, bnImage);
- bnImage.recoverCreateRead(FSNamesystem.getNamespaceDirs(conf),
- FSNamesystem.getNamespaceEditsDirs(conf));
+ bnImage.recoverCreateRead();
}
@Override // NameNode
@@ -179,6 +177,17 @@ public class BackupNode extends NameNode
super.stop();
}
+
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion)
+ throws IOException {
+ if (protocol.equals(JournalProtocol.class.getName())) {
+ return JournalProtocol.versionID;
+ } else {
+ return super.getProtocolVersion(protocol, clientVersion);
+ }
+ }
+
/////////////////////////////////////////////////////
// NamenodeProtocol implementation for backup node.
/////////////////////////////////////////////////////
@@ -205,34 +214,36 @@ public class BackupNode extends NameNode
public void endCheckpoint(NamenodeRegistration registration,
CheckpointSignature sig) throws IOException {
throw new UnsupportedActionException("endCheckpoint");
- }
+ }
- @Override // NamenodeProtocol
+ /////////////////////////////////////////////////////
+ // JournalProtocol implementation for backup node.
+ /////////////////////////////////////////////////////
+
+ @Override
public void journal(NamenodeRegistration nnReg,
- int jAction,
- int length,
- byte[] args) throws IOException {
+ long firstTxId, int numTxns,
+ byte[] records) throws IOException {
verifyRequest(nnReg);
if(!nnRpcAddress.equals(nnReg.getAddress()))
throw new IOException("Journal request from unexpected name-node: "
+ nnReg.getAddress() + " expecting " + nnRpcAddress);
- BackupImage bnImage = (BackupImage)getFSImage();
- switch(jAction) {
- case (int)JA_IS_ALIVE:
- return;
- case (int)JA_JOURNAL:
- bnImage.journal(length, args);
- return;
- case (int)JA_JSPOOL_START:
- bnImage.startJournalSpool(nnReg);
- return;
- case (int)JA_CHECKPOINT_TIME:
- bnImage.setCheckpointTime(length, args);
- setRegistration(); // keep registration up to date
- return;
- default:
- throw new IOException("Unexpected journal action: " + jAction);
- }
+ getBNImage().journal(firstTxId, numTxns, records);
+ }
+
+ @Override
+ public void startLogSegment(NamenodeRegistration registration, long txid)
+ throws IOException {
+ verifyRequest(registration);
+
+ getBNImage().namenodeStartedLogSegment(txid);
+ }
+
+ //////////////////////////////////////////////////////
+
+
+ BackupImage getBNImage() {
+ return (BackupImage)getFSImage();
}
boolean shouldCheckpointAtStartup() {
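
For orientation, a hedged sketch of the caller's side of the two JournalProtocol methods above, with the NamenodeRegistration argument dropped for brevity (the real calls carry it, as the hunk shows):

    import java.io.IOException;

    // Simplified stand-in for JournalProtocol as seen by the active NN.
    interface JournalProtocolSketch {
      void startLogSegment(long txid) throws IOException;
      void journal(long firstTxId, int numTxns, byte[] records)
          throws IOException;
    }

    class EditsSenderSketch {
      // Ship one batch of serialized edit ops: the BN opens a matching
      // segment once, then applies and logs each subsequent batch.
      static void send(JournalProtocolSketch bn, long firstTxId,
                       int numTxns, byte[] records) throws IOException {
        bn.startLogSegment(firstTxId);           // once per segment
        bn.journal(firstTxId, numTxns, records); // repeated per batch
      }
    }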
@@ -241,9 +252,9 @@ public class BackupNode extends NameNode
assert fsImage.getStorage().getNumStorageDirs() > 0;
return ! fsImage.getStorage().getStorageDir(0).getVersionFile().exists();
}
- if(namesystem == null || namesystem.dir == null || getFSImage() == null)
- return true;
- return fsImage.getEditLog().getNumEditStreams() == 0;
+
+ // BN always checkpoints on startup to get in sync with the active namespace
+ return true;
}
private NamespaceInfo handshake(Configuration conf) throws IOException {
@@ -287,14 +298,6 @@ public class BackupNode extends NameNode
checkpointManager.doCheckpoint();
}
- CheckpointStates getCheckpointState() {
- return getFSImage().getCheckpointState();
- }
-
- void setCheckpointState(CheckpointStates cs) {
- getFSImage().setCheckpointState(cs);
- }
-
/**
* Register this backup node with the active name-node.
* @param nsInfo
@@ -302,14 +305,15 @@ public class BackupNode extends NameNode
*/
private void registerWith(NamespaceInfo nsInfo) throws IOException {
BackupImage bnImage = (BackupImage)getFSImage();
+ NNStorage storage = bnImage.getStorage();
// verify namespaceID
- if(bnImage.getStorage().getNamespaceID() == 0) // new backup storage
- bnImage.getStorage().setStorageInfo(nsInfo);
- else if(bnImage.getStorage().getNamespaceID() != nsInfo.getNamespaceID())
- throw new IOException("Incompatible namespaceIDs"
- + ": active node namespaceID = " + nsInfo.getNamespaceID()
- + "; backup node namespaceID = " + bnImage.getStorage().getNamespaceID());
-
+ if (storage.getNamespaceID() == 0) { // new backup storage
+ storage.setStorageInfo(nsInfo);
+ storage.setBlockPoolID(nsInfo.getBlockPoolID());
+ storage.setClusterID(nsInfo.getClusterID());
+ } else {
+ nsInfo.validateStorage(storage);
+ }
setRegistration();
NamenodeRegistration nnReg = null;
while(!isStopRequested()) {
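
The adopt-or-validate branch above is the whole registration contract: fresh backup storage copies the active node's identifiers, existing storage must already match them. A minimal standalone sketch with simplified types (validateStorage itself is not reproduced here):

    import java.io.IOException;

    class StorageIdsSketch {
      int namespaceID; String clusterID; String blockPoolID;

      // Mirror of the branch in registerWith(): adopt ids when storage is
      // new (namespaceID == 0), otherwise insist they already match.
      void adoptOrValidate(StorageIdsSketch active) throws IOException {
        if (namespaceID == 0) {                  // new backup storage
          namespaceID = active.namespaceID;
          clusterID = active.clusterID;
          blockPoolID = active.blockPoolID;
        } else if (namespaceID != active.namespaceID
            || !clusterID.equals(active.clusterID)
            || !blockPoolID.equals(active.blockPoolID)) {
          throw new IOException("Incompatible storage: backup node does"
              + " not match the active name-node's namespace");
        }
      }
    }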
@@ -338,23 +342,6 @@ public class BackupNode extends NameNode
nnRpcAddress = nnReg.getAddress();
}
- /**
- * Reset node namespace state in memory and in storage directories.
- * @throws IOException
- */
- void resetNamespace() throws IOException {
- ((BackupImage)getFSImage()).reset();
- }
-
- /**
- * Get size of the local journal (edit log).
- * @return size of the current journal
- * @throws IOException
- */
- long journalSize() throws IOException {
- return namesystem.getEditLogSize();
- }
-
// TODO: move to a common with DataNode util class
private static NamespaceInfo handshake(NamenodeProtocol namenode)
throws IOException, SocketTimeoutException {
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java Fri Jul 29 16:28:45 2011
@@ -24,10 +24,11 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
-import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
+import com.google.common.collect.ComparisonChain;
+
/**
* A unique signature intended to identify checkpoint transactions.
*/
@@ -35,41 +36,35 @@ import org.apache.hadoop.io.WritableUtil
public class CheckpointSignature extends StorageInfo
implements WritableComparable<CheckpointSignature> {
private static final String FIELD_SEPARATOR = ":";
- long editsTime = -1L;
- long checkpointTime = -1L;
- MD5Hash imageDigest = null;
+ private static final int NUM_FIELDS = 7;
+
String blockpoolID = "";
+
+ long mostRecentCheckpointTxId;
+ long curSegmentTxId;
public CheckpointSignature() {}
CheckpointSignature(FSImage fsImage) {
super(fsImage.getStorage());
blockpoolID = fsImage.getBlockPoolID();
- editsTime = fsImage.getEditLog().getFsEditTime();
- checkpointTime = fsImage.getStorage().getCheckpointTime();
- imageDigest = fsImage.getStorage().getImageDigest();
- checkpointTime = fsImage.getStorage().getCheckpointTime();
+
+ mostRecentCheckpointTxId = fsImage.getStorage().getMostRecentCheckpointTxId();
+ curSegmentTxId = fsImage.getEditLog().getCurSegmentTxId();
}
CheckpointSignature(String str) {
String[] fields = str.split(FIELD_SEPARATOR);
- assert fields.length == 8 : "Must be 8 fields in CheckpointSignature";
- layoutVersion = Integer.valueOf(fields[0]);
- namespaceID = Integer.valueOf(fields[1]);
- cTime = Long.valueOf(fields[2]);
- editsTime = Long.valueOf(fields[3]);
- checkpointTime = Long.valueOf(fields[4]);
- imageDigest = new MD5Hash(fields[5]);
- clusterID = fields[6];
- blockpoolID = fields[7];
- }
-
- /**
- * Get the MD5 image digest
- * @return the MD5 image digest
- */
- MD5Hash getImageDigest() {
- return imageDigest;
+ assert fields.length == NUM_FIELDS :
+ "Must be " + NUM_FIELDS + " fields in CheckpointSignature";
+ int i = 0;
+ layoutVersion = Integer.valueOf(fields[i++]);
+ namespaceID = Integer.valueOf(fields[i++]);
+ cTime = Long.valueOf(fields[i++]);
+ mostRecentCheckpointTxId = Long.valueOf(fields[i++]);
+ curSegmentTxId = Long.valueOf(fields[i++]);
+ clusterID = fields[i++];
+ blockpoolID = fields[i++];
}
/**
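
For reference, toString() below joins the seven fields in the same order the parser above reads them: layoutVersion, namespaceID, cTime, mostRecentCheckpointTxId, curSegmentTxId, clusterID, blockpoolID. A hypothetical serialized signature (every value invented for illustration):

    -38:463031076:0:41:42:CID-11111111-2222-3333-4444-555555555555:BP-1311953325-10.0.0.1-1311953325000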
@@ -101,33 +96,26 @@ public class CheckpointSignature extends
return String.valueOf(layoutVersion) + FIELD_SEPARATOR
+ String.valueOf(namespaceID) + FIELD_SEPARATOR
+ String.valueOf(cTime) + FIELD_SEPARATOR
- + String.valueOf(editsTime) + FIELD_SEPARATOR
- + String.valueOf(checkpointTime) + FIELD_SEPARATOR
- + imageDigest.toString() + FIELD_SEPARATOR
+ + String.valueOf(mostRecentCheckpointTxId) + FIELD_SEPARATOR
+ + String.valueOf(curSegmentTxId) + FIELD_SEPARATOR
+ clusterID + FIELD_SEPARATOR
+ blockpoolID ;
}
void validateStorageInfo(FSImage si) throws IOException {
- if(layoutVersion != si.getLayoutVersion()
- || namespaceID != si.getNamespaceID()
- || cTime != si.getStorage().cTime
- || checkpointTime != si.getStorage().getCheckpointTime()
- || !imageDigest.equals(si.getStorage().imageDigest)
- || !clusterID.equals(si.getClusterID())
- || !blockpoolID.equals(si.getBlockPoolID())) {
- // checkpointTime can change when the image is saved - do not compare
+ if(layoutVersion != si.getStorage().layoutVersion
+ || namespaceID != si.getStorage().namespaceID
+ || cTime != si.getStorage().cTime
+ || !clusterID.equals(si.getClusterID())
+ || !blockpoolID.equals(si.getBlockPoolID())) {
throw new IOException("Inconsistent checkpoint fields.\n"
+ "LV = " + layoutVersion + " namespaceID = " + namespaceID
- + " cTime = " + cTime + "; checkpointTime = " + checkpointTime
- + " ; imageDigest = " + imageDigest
+ + " cTime = " + cTime
+ " ; clusterId = " + clusterID
+ " ; blockpoolId = " + blockpoolID
+ ".\nExpecting respectively: "
- + si.getLayoutVersion() + "; "
- + si.getNamespaceID() + "; " + si.getStorage().cTime
- + "; " + si.getStorage().getCheckpointTime() + "; "
- + si.getStorage().imageDigest
+ + si.getStorage().layoutVersion + "; "
+ + si.getStorage().namespaceID + "; " + si.getStorage().cTime
+ "; " + si.getClusterID() + "; "
+ si.getBlockPoolID() + ".");
}
@@ -137,19 +125,15 @@ public class CheckpointSignature extends
// Comparable interface
//
public int compareTo(CheckpointSignature o) {
- return
- (layoutVersion < o.layoutVersion) ? -1 :
- (layoutVersion > o.layoutVersion) ? 1 :
- (namespaceID < o.namespaceID) ? -1 : (namespaceID > o.namespaceID) ? 1 :
- (cTime < o.cTime) ? -1 : (cTime > o.cTime) ? 1 :
- (editsTime < o.editsTime) ? -1 : (editsTime > o.editsTime) ? 1 :
- (checkpointTime < o.checkpointTime) ? -1 :
- (checkpointTime > o.checkpointTime) ? 1 :
- (clusterID.compareTo(o.clusterID) < 0) ? -1 :
- (clusterID.compareTo(o.clusterID) > 0) ? 1 :
- (blockpoolID.compareTo(o.blockpoolID) < 0) ? -1 :
- (blockpoolID.compareTo(o.blockpoolID) > 0) ? 1 :
- imageDigest.compareTo(o.imageDigest);
+ return ComparisonChain.start()
+ .compare(layoutVersion, o.layoutVersion)
+ .compare(namespaceID, o.namespaceID)
+ .compare(cTime, o.cTime)
+ .compare(mostRecentCheckpointTxId, o.mostRecentCheckpointTxId)
+ .compare(curSegmentTxId, o.curSegmentTxId)
+ .compare(clusterID, o.clusterID)
+ .compare(blockpoolID, o.blockpoolID)
+ .result();
}
public boolean equals(Object o) {
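
Aside: Guava's ComparisonChain, newly imported above, evaluates fields in order and locks in the first non-zero comparison, which is exactly the behavior the old nested ternaries hand-coded. A standalone illustration:

    import com.google.common.collect.ComparisonChain;

    class ChainDemo {
      public static void main(String[] args) {
        int c = ComparisonChain.start()
            .compare(7L, 7L)     // equal, chain stays undecided
            .compare("a", "b")   // negative, chain is now decided
            .compare(99, 0)      // no effect once decided
            .result();
        System.out.println(c < 0); // prints true
      }
    }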
@@ -161,9 +145,8 @@ public class CheckpointSignature extends
public int hashCode() {
return layoutVersion ^ namespaceID ^
- (int)(cTime ^ editsTime ^ checkpointTime) ^
- imageDigest.hashCode() ^ clusterID.hashCode()
- ^ blockpoolID.hashCode();
+ (int)(cTime ^ mostRecentCheckpointTxId ^ curSegmentTxId)
+ ^ clusterID.hashCode() ^ blockpoolID.hashCode();
}
/////////////////////////////////////////////////
@@ -172,17 +155,14 @@ public class CheckpointSignature extends
public void write(DataOutput out) throws IOException {
super.write(out);
WritableUtils.writeString(out, blockpoolID);
- out.writeLong(editsTime);
- out.writeLong(checkpointTime);
- imageDigest.write(out);
+ out.writeLong(mostRecentCheckpointTxId);
+ out.writeLong(curSegmentTxId);
}
public void readFields(DataInput in) throws IOException {
super.readFields(in);
blockpoolID = WritableUtils.readString(in);
- editsTime = in.readLong();
- checkpointTime = in.readLong();
- imageDigest = new MD5Hash();
- imageDigest.readFields(in);
+ mostRecentCheckpointTxId = in.readLong();
+ curSegmentTxId = in.readLong();
}
}
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java Fri Jul 29 16:28:45 2011
@@ -17,29 +17,30 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
-import java.io.IOException;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+
import java.io.File;
+import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.Collection;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
-import static org.apache.hadoop.hdfs.server.common.Util.now;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.CheckpointStates;
-import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.http.HttpServer;
+import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.util.Daemon;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_DEFAULT;
+
+import com.google.common.collect.Lists;
/**
* The Checkpointer is responsible for supporting periodic checkpoints
@@ -59,7 +60,7 @@ class Checkpointer extends Daemon {
private BackupNode backupNode;
volatile boolean shouldRun;
private long checkpointPeriod; // in seconds
- private long checkpointSize; // size (in MB) of current Edit Log
+ private long checkpointTxnCount; // txn count that triggers a checkpoint
private String infoBindAddress;
@@ -79,6 +80,7 @@ class Checkpointer extends Daemon {
try {
initialize(conf);
} catch(IOException e) {
+ LOG.warn("Checkpointer got exception", e);
shutdown();
throw e;
}
@@ -87,6 +89,7 @@ class Checkpointer extends Daemon {
/**
* Initialize checkpoint.
*/
+ @SuppressWarnings("deprecation")
private void initialize(Configuration conf) throws IOException {
// Create connection to the namenode.
shouldRun = true;
@@ -94,8 +97,9 @@ class Checkpointer extends Daemon {
// Initialize other scheduling parameters from the configuration
checkpointPeriod = conf.getLong(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY,
DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT);
- checkpointSize = conf.getLong(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_SIZE_KEY,
- DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_SIZE_DEFAULT);
+ checkpointTxnCount = conf.getLong(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY,
+ DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT);
+ SecondaryNameNode.warnForDeprecatedConfigs(conf);
// Pull out exact http address for posting url to avoid ip aliasing issues
String fullInfoAddr = conf.get(DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY,
@@ -104,8 +108,7 @@ class Checkpointer extends Daemon {
LOG.info("Checkpoint Period : " + checkpointPeriod + " secs " +
"(" + checkpointPeriod/60 + " min)");
- LOG.info("Log Size Trigger : " + checkpointSize + " bytes " +
- "(" + checkpointSize/1024 + " KB)");
+ LOG.info("Log Size Trigger : " + checkpointTxnCount + " txns ");
}
/**
@@ -128,8 +131,9 @@ class Checkpointer extends Daemon {
periodMSec *= 1000;
long lastCheckpointTime = 0;
- if(!backupNode.shouldCheckpointAtStartup())
+ if (!backupNode.shouldCheckpointAtStartup()) {
lastCheckpointTime = now();
+ }
while(shouldRun) {
try {
long now = now();
@@ -137,8 +141,8 @@ class Checkpointer extends Daemon {
if(now >= lastCheckpointTime + periodMSec) {
shouldCheckpoint = true;
} else {
- long size = getJournalSize();
- if(size >= checkpointSize)
+ long txns = countUncheckpointedTxns();
+ if(txns >= checkpointTxnCount)
shouldCheckpoint = true;
}
if(shouldCheckpoint) {
@@ -160,64 +164,24 @@ class Checkpointer extends Daemon {
}
}
- private long getJournalSize() throws IOException {
- // If BACKUP node has been loaded
- // get edits size from the local file. ACTIVE has the same.
- if(backupNode.isRole(NamenodeRole.BACKUP)
- && getFSImage().getEditLog().isOpen())
- return backupNode.journalSize();
- // Go to the ACTIVE node for its size
- return getNamenode().journalSize(backupNode.getRegistration());
- }
-
- /**
- * Download <code>fsimage</code> and <code>edits</code>
- * files from the remote name-node.
- */
- private void downloadCheckpoint(CheckpointSignature sig) throws IOException {
- // Retrieve image file
- String fileid = "getimage=1";
- Collection<File> list = getFSImage()
- .getStorage().getFiles(NameNodeFile.IMAGE, NameNodeDirType.IMAGE);
- File[] files = list.toArray(new File[list.size()]);
- assert files.length > 0 : "No checkpoint targets.";
- String nnHttpAddr = backupNode.nnHttpAddress;
- TransferFsImage.getFileClient(nnHttpAddr, fileid, files, false);
- LOG.info("Downloaded file " + files[0].getName() + " size " +
- files[0].length() + " bytes.");
-
- // Retrieve edits file
- fileid = "getedit=1";
- list = getFSImage()
- .getStorage().getFiles(NameNodeFile.EDITS, NameNodeDirType.EDITS);
- files = list.toArray(new File[list.size()]);
- assert files.length > 0 : "No checkpoint targets.";
- TransferFsImage.getFileClient(nnHttpAddr, fileid, files, false);
- LOG.info("Downloaded file " + files[0].getName() + " size " +
- files[0].length() + " bytes.");
- }
-
- /**
- * Copy the new image into remote name-node.
- */
- private void uploadCheckpoint(CheckpointSignature sig) throws IOException {
- // Use the exact http addr as specified in config to deal with ip aliasing
- InetSocketAddress httpSocAddr = backupNode.getHttpAddress();
- int httpPort = httpSocAddr.getPort();
- String fileid = "putimage=1&port=" + httpPort +
- "&machine=" + infoBindAddress +
- "&token=" + sig.toString() +
- "&newChecksum=" + getFSImage().getStorage().getImageDigest().toString();
- LOG.info("Posted URL " + backupNode.nnHttpAddress + fileid);
- TransferFsImage.getFileClient(backupNode.nnHttpAddress,
- fileid, (File[])null, false);
+ private long countUncheckpointedTxns() throws IOException {
+ long curTxId = getNamenode().getTransactionID();
+ long uncheckpointedTxns = curTxId -
+ getFSImage().getStorage().getMostRecentCheckpointTxId();
+ assert uncheckpointedTxns >= 0;
+ return uncheckpointedTxns;
}
/**
* Create a new checkpoint
*/
void doCheckpoint() throws IOException {
+ BackupImage bnImage = getFSImage();
+ NNStorage bnStorage = bnImage.getStorage();
+
long startTime = now();
+ bnImage.freezeNamespaceAtNextRoll();
+
NamenodeCommand cmd =
getNamenode().startCheckpoint(backupNode.getRegistration());
CheckpointCommand cpCmd = null;
@@ -233,37 +197,94 @@ class Checkpointer extends Daemon {
throw new IOException("Unsupported NamenodeCommand: "+cmd.getAction());
}
+ bnImage.waitUntilNamespaceFrozen();
+
CheckpointSignature sig = cpCmd.getSignature();
- assert FSConstants.LAYOUT_VERSION == sig.getLayoutVersion() :
- "Signature should have current layout version. Expected: "
- + FSConstants.LAYOUT_VERSION + " actual "+ sig.getLayoutVersion();
- assert !backupNode.isRole(NamenodeRole.CHECKPOINT) ||
- cpCmd.isImageObsolete() : "checkpoint node should always download image.";
- backupNode.setCheckpointState(CheckpointStates.UPLOAD_START);
- if(cpCmd.isImageObsolete()) {
- // First reset storage on disk and memory state
- backupNode.resetNamespace();
- downloadCheckpoint(sig);
- }
- BackupImage bnImage = getFSImage();
- bnImage.getStorage().setBlockPoolID(backupNode.getBlockPoolId());
- bnImage.getStorage().setClusterID(backupNode.getClusterId());
- bnImage.loadCheckpoint(sig);
+ // Make sure we're talking to the same NN!
sig.validateStorageInfo(bnImage);
- bnImage.saveCheckpoint();
- if(cpCmd.needToReturnImage())
- uploadCheckpoint(sig);
+ long lastApplied = bnImage.getLastAppliedTxId();
+ LOG.debug("Doing checkpoint. Last applied: " + lastApplied);
+ RemoteEditLogManifest manifest =
+ getNamenode().getEditLogManifest(bnImage.getLastAppliedTxId());
+
+ if (!manifest.getLogs().isEmpty()) {
+ RemoteEditLog firstRemoteLog = manifest.getLogs().get(0);
+ // If the first remote log starts past the last applied txn + 1, we
+ // cannot roll forward from logs alone; download and load the image.
+ if (firstRemoteLog.getStartTxId() > lastApplied + 1) {
+ LOG.info("Unable to roll forward using only logs. Downloading " +
+ "image with txid " + sig.mostRecentCheckpointTxId);
+ MD5Hash downloadedHash = TransferFsImage.downloadImageToStorage(
+ backupNode.nnHttpAddress, sig.mostRecentCheckpointTxId,
+ bnStorage, true);
+ bnImage.saveDigestAndRenameCheckpointImage(
+ sig.mostRecentCheckpointTxId, downloadedHash);
+
+ LOG.info("Loading image with txid " + sig.mostRecentCheckpointTxId);
+ File file = bnStorage.findImageFile(sig.mostRecentCheckpointTxId);
+ bnImage.reloadFromImageFile(file);
+ }
+
+ lastApplied = bnImage.getLastAppliedTxId();
+ if (firstRemoteLog.getStartTxId() > lastApplied + 1) {
+ throw new IOException("No logs to roll forward from " + lastApplied);
+ }
+
+ // get edits files
+ for (RemoteEditLog log : manifest.getLogs()) {
+ TransferFsImage.downloadEditsToStorage(
+ backupNode.nnHttpAddress, log, bnStorage);
+ }
+
+ rollForwardByApplyingLogs(manifest, bnImage);
+ }
+
+ long txid = bnImage.getLastAppliedTxId();
+ bnImage.saveFSImageInAllDirs(txid);
+ bnStorage.writeAll();
+
+ if(cpCmd.needToReturnImage()) {
+ TransferFsImage.uploadImageFromStorage(
+ backupNode.nnHttpAddress, getImageListenAddress(),
+ bnStorage, txid);
+ }
getNamenode().endCheckpoint(backupNode.getRegistration(), sig);
- bnImage.convergeJournalSpool();
+ if (backupNode.getRole() == NamenodeRole.BACKUP) {
+ bnImage.convergeJournalSpool();
+ }
backupNode.setRegistration(); // keep registration up to date
- if(backupNode.isRole(NamenodeRole.CHECKPOINT))
- getFSImage().getEditLog().close();
+
+ long imageSize = bnImage.getStorage().getFsImageName(txid).length();
LOG.info("Checkpoint completed in "
+ (now() - startTime)/1000 + " seconds."
- + " New Image Size: " + bnImage.getStorage().getFsImageName().length());
+ + " New Image Size: " + imageSize);
+ }
+
+ private InetSocketAddress getImageListenAddress() {
+ InetSocketAddress httpSocAddr = backupNode.getHttpAddress();
+ int httpPort = httpSocAddr.getPort();
+ return new InetSocketAddress(infoBindAddress, httpPort);
+ }
+
+ static void rollForwardByApplyingLogs(
+ RemoteEditLogManifest manifest,
+ FSImage dstImage) throws IOException {
+ NNStorage dstStorage = dstImage.getStorage();
+
+ List<File> editsFiles = Lists.newArrayList();
+ for (RemoteEditLog log : manifest.getLogs()) {
+ File f = dstStorage.findFinalizedEditsFile(
+ log.getStartTxId(), log.getEndTxId());
+ if (log.getStartTxId() > dstImage.getLastAppliedTxId()) {
+ editsFiles.add(f);
+ }
+ }
+ LOG.info("Checkpointer about to load edits from " +
+ editsFiles.size() + " file(s).");
+ dstImage.loadEdits(editsFiles);
}
}
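
Taken together, the new doCheckpoint() reduces to: freeze, maybe download an image, download and replay the finalized edit segments, save, maybe upload. A hedged restatement with hypothetical simplified types (namespace freezing, the image upload, and the journal-spool convergence are elided):

    import java.io.IOException;

    class CheckpointFlowSketch {
      interface Image {
        long getLastAppliedTxId();
        void downloadAndReloadImage(long txid) throws IOException; // fsimage_N
        void downloadAndApplyEditLogs() throws IOException;        // edits files
        void saveImageInAllDirs(long txid) throws IOException;
      }

      static void checkpoint(Image img, long firstRemoteLogStartTxId,
                             long mostRecentCheckpointTxId) throws IOException {
        // If the oldest remote log starts past our last applied txn + 1,
        // logs alone cannot roll us forward: fetch the checkpoint image.
        if (firstRemoteLogStartTxId > img.getLastAppliedTxId() + 1) {
          img.downloadAndReloadImage(mostRecentCheckpointTxId);
        }
        if (firstRemoteLogStartTxId > img.getLastAppliedTxId() + 1) {
          throw new IOException(
              "No logs to roll forward from " + img.getLastAppliedTxId());
        }
        img.downloadAndApplyEditLogs();                   // roll forward
        img.saveImageInAllDirs(img.getLastAppliedTxId()); // new checkpoint
      }
    }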
Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java?rev=1152295&r1=1152294&r2=1152295&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java Fri Jul 29 16:28:45 2011
@@ -42,10 +42,6 @@ class EditLogBackupInputStream extends E
super(new byte[0]);
}
- byte[] getData() {
- return super.buf;
- }
-
void setData(byte[] newBytes) {
super.buf = newBytes;
super.count = newBytes == null ? 0 : newBytes.length;