Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/07/14 20:53:13 UTC
svn commit: r1146845 [1/2] - in /hadoop/common/branches/HDFS-1073/hdfs: ./
src/java/org/apache/hadoop/hdfs/server/namenode/
src/java/org/apache/hadoop/hdfs/server/protocol/
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/
Author: todd
Date: Thu Jul 14 18:53:11 2011
New Revision: 1146845
URL: http://svn.apache.org/viewvc?rev=1146845&view=rev
Log:
HDFS-1979. Fix backupnode for new edits/image layout. Contributed by Todd Lipcon.
Added:
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/BackupNodeProtocol.java
Modified:
hadoop/common/branches/HDFS-1073/hdfs/CHANGES.HDFS-1073.txt
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java
hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
Modified: hadoop/common/branches/HDFS-1073/hdfs/CHANGES.HDFS-1073.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/CHANGES.HDFS-1073.txt?rev=1146845&r1=1146844&r2=1146845&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/CHANGES.HDFS-1073.txt (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/CHANGES.HDFS-1073.txt Thu Jul 14 18:53:11 2011
@@ -67,3 +67,4 @@ HDFS-2102. Zero-pad edits filename to ma
HDFS-2010. Fix NameNode to exit if all edit streams become inaccessible. (atm
via todd)
HDFS-2123. Checkpoint interval should be based on txn count, not size. (todd)
+HDFS-1979. Fix backupnode for new edits/image layout. (todd)
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java?rev=1146845&r1=1146844&r2=1146845&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java Thu Jul 14 18:53:11 2011
@@ -19,31 +19,21 @@ package org.apache.hadoop.hdfs.server.na
import java.io.BufferedInputStream;
import java.io.DataInputStream;
-import java.io.File;
import java.io.IOException;
import java.util.Iterator;
-import java.util.List;
-import java.util.zip.CheckedInputStream;
import java.util.zip.Checksum;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.LayoutVersion;
-import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
-import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
-import static org.apache.hadoop.hdfs.server.common.Util.now;
+import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.LogLoadPlan;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.LogHeader;
-import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
-import org.apache.hadoop.hdfs.util.MD5FileUtils;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.MD5Hash;
-import org.apache.hadoop.conf.Configuration;
+import com.google.common.base.Preconditions;
/**
* Extension of FSImage for the backup node.
@@ -52,30 +42,39 @@ import org.apache.hadoop.conf.Configurat
*/
@InterfaceAudience.Private
public class BackupImage extends FSImage {
- // Names of the journal spool directory and the spool file
- private static final String STORAGE_JSPOOL_DIR = "jspool";
- private static final String STORAGE_JSPOOL_FILE =
- NNStorage.NameNodeFile.EDITS_NEW.getName();
-
/** Backup input stream for loading edits into memory */
- private EditLogBackupInputStream backupInputStream;
-
- /** Is journal spooling in progress */
- volatile JSpoolState jsState;
- private long lastAppliedTxId = 0;
-
- static enum JSpoolState {
- OFF,
- INPROGRESS,
- WAIT;
+ private EditLogBackupInputStream backupInputStream =
+ new EditLogBackupInputStream("Data from remote NameNode");
+
+ /**
+ * Current state of the BackupNode. The BackupNode's state
+ * transitions are as follows:
+ *
+ * Initial: DROP_UNTIL_NEXT_ROLL
+ * - Transitions to JOURNAL_ONLY the next time the log rolls
+ * - Transitions to IN_SYNC in convergeJournalSpool
+ * - Transitions back to JOURNAL_ONLY if the log rolls while
+   *     stopApplyingEditsOnNextRoll is true.
+ */
+ BNState bnState;
+ static enum BNState {
+ // Edits from the NN should be dropped. On the next log roll,
+ // transition to JOURNAL_ONLY state
+ DROP_UNTIL_NEXT_ROLL,
+ // Edits from the NN should be written to the local edits log
+ // but not applied to the namespace.
+ JOURNAL_ONLY,
+ // Edits should be written to the local edits log and applied
+ // to the local namespace.
+ IN_SYNC;
}
-
/**
- * Place-holder for a txid that still needs to be addressed
- * in HDFS-1073 branch before merging into trunk.
+ * Flag to indicate that the next time the NN rolls, the BN
+ * should transition to JOURNAL_ONLY state.
+ * {@link #freezeNamespaceAtNextRoll()}
*/
- private static final long TODO_TXID = 0xDEADBEEF;
+ private boolean stopApplyingEditsOnNextRoll = false;
/**
* Construct a backup image.
@@ -85,7 +84,8 @@ public class BackupImage extends FSImage
BackupImage(Configuration conf) throws IOException {
super(conf);
storage.setDisablePreUpgradableLayoutCheck(true);
- jsState = JSpoolState.OFF;
+ bnState = BNState.DROP_UNTIL_NEXT_ROLL;
+ editLog.initJournals();
}
/**
@@ -130,279 +130,259 @@ public class BackupImage extends FSImage
}
/**
- * Reset storage directories.
- * <p>
- * Unlock the storage.
- * Rename <code>current</code> to <code>lastcheckpoint.tmp</code>
- * and recreate empty <code>current</code>.
- * @throws IOException
+ * Save meta-data into fsimage files.
+ * and create empty edits.
*/
- synchronized void reset() throws IOException {
- /* TODO: BackupNode
- // reset NameSpace tree
- FSDirectory fsDir = getFSNamesystem().dir;
- fsDir.reset();
-
- // unlock, close and rename storage directories
- storage.unlockAll();
-
- // recover from unsuccessful checkpoint if necessary
- recoverCreateRead();
- // rename and recreate
- for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
- StorageDirectory sd = it.next();
- // rename current to lastcheckpoint.tmp
- storage.moveCurrent(sd);
- }
- */
+ void saveCheckpoint() throws IOException {
+ saveNamespace();
}
/**
- * Load checkpoint from local files only if the memory state is empty.<br>
- * Set new checkpoint time received from the name-node.<br>
- * Move <code>lastcheckpoint.tmp</code> to <code>previous.checkpoint</code>.
+ * Receive a batch of edits from the NameNode.
+ *
+ * Depending on bnState, different actions are taken. See
+ * {@link BackupImage.BNState}
+ *
+ * @param firstTxId first txid in batch
+ * @param numTxns number of transactions
+ * @param data serialized journal records.
* @throws IOException
+ * @see #convergeJournalSpool()
*/
- void loadCheckpoint(CheckpointSignature sig) throws IOException {
- // load current image and journal if it is not in memory already
- if(!editLog.isOpen())
- editLog.startLogSegment(TODO_TXID);
-
- // set storage fields
- storage.setStorageInfo(sig);
-
- FSDirectory fsDir = getFSNamesystem().dir;
- if(fsDir.isEmpty()) {
- Iterator<StorageDirectory> itImage
- = storage.dirIterator(NameNodeDirType.IMAGE);
- Iterator<StorageDirectory> itEdits
- = storage.dirIterator(NameNodeDirType.EDITS);
- if(!itImage.hasNext() || ! itEdits.hasNext())
- throw new IOException("Could not locate checkpoint directories");
- StorageDirectory sdName = itImage.next();
- StorageDirectory sdEdits = itEdits.next();
-
- getFSDirectoryRootLock().writeLock();
- try { // load image under rootDir lock
- File imageFile = null; // TODO
- MD5Hash expectedMD5 = MD5FileUtils.readStoredMd5ForFile(imageFile);
- loadFSImage(imageFile, expectedMD5);
- } finally {
- getFSDirectoryRootLock().writeUnlock();
- }
- List<File> editsFiles =
- FSImageOldStorageInspector.getEditsInStorageDir(sdEdits);
- loadEdits(editsFiles);
- lastAppliedTxId = getEditLog().getLastWrittenTxId();
+ synchronized void journal(long firstTxId, int numTxns, byte[] data) throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Got journal, " +
+ "state = " + bnState +
+ "; firstTxId = " + firstTxId +
+ "; numTxns = " + numTxns);
}
+
+ switch(bnState) {
+ case DROP_UNTIL_NEXT_ROLL:
+ return;
+
+ case IN_SYNC:
+ // update NameSpace in memory
+ applyEdits(firstTxId, numTxns, data);
+ break;
+
+ case JOURNAL_ONLY:
+ break;
+
+ default:
+ throw new AssertionError("Unhandled state: " + bnState);
+ }
+
+ // write to BN's local edit log.
+ logEditsLocally(firstTxId, numTxns, data);
}
/**
- * Save meta-data into fsimage files.
- * and create empty edits.
+ * Write the batch of edits to the local copy of the edit logs.
*/
- void saveCheckpoint() throws IOException {
- saveNamespace();
- }
-
- private FSDirectory getFSDirectoryRootLock() {
- return getFSNamesystem().dir;
- }
-
- static File getJSpoolDir(StorageDirectory sd) {
- return new File(sd.getRoot(), STORAGE_JSPOOL_DIR);
- }
-
- static File getJSpoolFile(StorageDirectory sd) {
- return new File(getJSpoolDir(sd), STORAGE_JSPOOL_FILE);
+ private void logEditsLocally(long firstTxId, int numTxns, byte[] data) {
+ long expectedTxId = editLog.getLastWrittenTxId() + 1;
+ Preconditions.checkState(firstTxId == expectedTxId,
+ "received txid batch starting at %s but expected txn %s",
+ firstTxId, expectedTxId);
+ editLog.setNextTxId(firstTxId + numTxns - 1);
+ editLog.logEdit(data.length, data);
+ editLog.logSync();
}
/**
- * Journal writer journals new meta-data state.
- * <ol>
- * <li> If Journal Spool state is OFF then journal records (edits)
- * are applied directly to meta-data state in memory and are written
- * to the edits file(s).</li>
- * <li> If Journal Spool state is INPROGRESS then records are only
- * written to edits.new file, which is called Spooling.</li>
- * <li> Journal Spool state WAIT blocks journaling until the
- * Journal Spool reader finalizes merging of the spooled data and
- * switches to applying journal to memory.</li>
- * </ol>
- * @param length length of data.
- * @param data serialized journal records.
- * @throws IOException
- * @see #convergeJournalSpool()
+ * Apply the batch of edits to the local namespace.
*/
- synchronized void journal(int length, byte[] data) throws IOException {
+ private synchronized void applyEdits(long firstTxId, int numTxns, byte[] data)
+ throws IOException {
+ Preconditions.checkArgument(firstTxId == lastAppliedTxId + 1,
+ "Received txn batch starting at %s but expected %s",
+ firstTxId, lastAppliedTxId + 1);
assert backupInputStream.length() == 0 : "backup input stream is not empty";
try {
- switch(jsState) {
- case WAIT:
- case OFF:
- // wait until spooling is off
- waitSpoolEnd();
- // update NameSpace in memory
- backupInputStream.setBytes(data);
- FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
- int logVersion = storage.getLayoutVersion();
- BufferedInputStream bin = new BufferedInputStream(backupInputStream);
- DataInputStream in = new DataInputStream(bin);
- Checksum checksum = null;
- if (LayoutVersion.supports(Feature.EDITS_CHESKUM, logVersion)) {
- checksum = FSEditLog.getChecksum();
- in = new DataInputStream(new CheckedInputStream(bin, checksum));
- }
- logLoader.loadEditRecords(logVersion, in, checksum, true,
- lastAppliedTxId + 1);
- getFSNamesystem().dir.updateCountForINodeWithQuota(); // inefficient!
- break;
- case INPROGRESS:
- break;
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("data:" + StringUtils.byteToHexString(data));
+ }
+ backupInputStream.setBytes(data);
+ FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
+ int logVersion = storage.getLayoutVersion();
+ BufferedInputStream bin = new BufferedInputStream(backupInputStream);
+ DataInputStream in = new DataInputStream(bin);
+ Checksum checksum = FSEditLog.getChecksum();
+ int numLoaded = logLoader.loadEditRecords(logVersion, in, checksum, true,
+ lastAppliedTxId + 1);
+ if (numLoaded != numTxns) {
+ throw new IOException("Batch of txns starting at txnid " +
+ firstTxId + " was supposed to contain " + numTxns +
+ " transactions but only was able to apply " + numLoaded);
}
- // write to files
- editLog.logEdit(length, data);
- editLog.logSync();
+ lastAppliedTxId += numTxns;
+
+ getFSNamesystem().dir.updateCountForINodeWithQuota(); // inefficient!
} finally {
backupInputStream.clear();
}
}
- private synchronized void waitSpoolEnd() {
- while(jsState == JSpoolState.WAIT) {
+ /**
+ * Transition the BackupNode from JOURNAL_ONLY state to IN_SYNC state.
+ * This is done by repeated invocations of tryConvergeJournalSpool until
+ * we are caught up to the latest in-progress edits file.
+ */
+ void convergeJournalSpool() throws IOException {
+ Preconditions.checkState(bnState == BNState.JOURNAL_ONLY,
+ "bad state: %s", bnState);
+
+ while (!tryConvergeJournalSpool()) {
+ ;
+ }
+ assert bnState == BNState.IN_SYNC;
+ }
+
+ private boolean tryConvergeJournalSpool() throws IOException {
+ Preconditions.checkState(bnState == BNState.JOURNAL_ONLY,
+ "bad state: %s", bnState);
+
+ // This section is unsynchronized so we can continue to apply
+ // ahead of where we're reading, concurrently. Since the state
+ // is JOURNAL_ONLY at this point, we know that lastAppliedTxId
+ // doesn't change, and curSegmentTxId only increases
+
+ while (lastAppliedTxId < editLog.getCurSegmentTxId() - 1) {
+ long target = editLog.getCurSegmentTxId();
+ LOG.info("Loading edits into backupnode to try to catch up from txid "
+ + lastAppliedTxId + " to " + target);
+ FSImageTransactionalStorageInspector inspector =
+ new FSImageTransactionalStorageInspector();
+
+ storage.inspectStorageDirs(inspector);
+ LogLoadPlan logLoadPlan = inspector.createLogLoadPlan(lastAppliedTxId,
+ target - 1);
+
+ logLoadPlan.doRecovery();
+ loadEdits(logLoadPlan.getEditsFiles());
+ }
+
+ // now, need to load the in-progress file
+ synchronized (this) {
+ if (lastAppliedTxId != editLog.getCurSegmentTxId() - 1) {
+ LOG.debug("Logs rolled while catching up to current segment");
+ return false; // drop lock and try again to load local logs
+ }
+
+ EditLogInputStream stream = getEditLog().getInProgressFileInputStream();
try {
- wait();
- } catch (InterruptedException e) {}
+ long remainingTxns = getEditLog().getLastWrittenTxId() - lastAppliedTxId;
+
+ LOG.info("Going to finish converging with remaining " + remainingTxns
+ + " txns from in-progress stream " + stream);
+
+ FSEditLogLoader loader = new FSEditLogLoader(namesystem);
+ int numLoaded = loader.loadFSEdits(stream, lastAppliedTxId + 1);
+ lastAppliedTxId += numLoaded;
+ assert numLoaded == remainingTxns :
+ "expected to load " + remainingTxns + " but loaded " +
+ numLoaded + " from " + stream;
+ } finally {
+ IOUtils.closeStream(stream);
+ }
+
+ LOG.info("Successfully synced BackupNode with NameNode at txnid " +
+ lastAppliedTxId);
+ setState(BNState.IN_SYNC);
}
- // now spooling should be off, verifying just in case
- assert jsState == JSpoolState.OFF : "Unexpected JSpool state: " + jsState;
+ return true;
}
/**
- * Start journal spool.
- * Switch to writing into edits.new instead of edits.
- *
- * edits.new for spooling is in separate directory "spool" rather than in
- * "current" because the two directories should be independent.
- * While spooling a checkpoint can happen and current will first
- * move to lastcheckpoint.tmp and then to previous.checkpoint
- * spool/edits.new will remain in place during that.
- */
- synchronized void startJournalSpool(NamenodeRegistration nnReg)
- throws IOException {
- switch(jsState) {
- case OFF:
- break;
- case INPROGRESS:
- return;
- case WAIT:
- waitSpoolEnd();
+ * Transition edit log to a new state, logging as necessary.
+ */
+ private synchronized void setState(BNState newState) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("State transition " + bnState + " -> " + newState,
+ new Exception("trace"));
}
+ bnState = newState;
+ }
- // create journal spool directories
- for (Iterator<StorageDirectory> it
- = storage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
- StorageDirectory sd = it.next();
- File jsDir = getJSpoolDir(sd);
- if (!jsDir.exists() && !jsDir.mkdirs()) {
- throw new IOException("Mkdirs failed to create "
- + jsDir.getCanonicalPath());
+ /**
+ * Receive a notification that the NameNode has begun a new edit log.
+ * This causes the BN to also start the new edit log in its local
+ * directories.
+ */
+ synchronized void namenodeStartedLogSegment(long txid) {
+ LOG.info("NameNode started a new log segment at txid " + txid);
+ if (editLog.isOpen()) {
+ if (editLog.getLastWrittenTxId() == txid - 1) {
+ // We are in sync with the NN, so end and finalize the current segment
+ editLog.endCurrentLogSegment(false);
+ } else {
+ // We appear to have missed some transactions -- the NN probably
+ // lost contact with us temporarily. So, mark the current segment
+ // as aborted.
+ LOG.warn("NN started new log segment at txid " + txid +
+ ", but BN had only written up to txid " +
+ editLog.getLastWrittenTxId() +
+ "in the log segment starting at " +
+ editLog.getCurSegmentTxId() + ". Aborting this " +
+ "log segment.");
+ editLog.abortCurrentLogSegment();
}
- // create edit file if missing
- /*File eFile = storage.getEditFile(sd); TODO
- if(!eFile.exists()) {
- editLog.createEditLogFile(eFile);
- }*/
- }
-
- if(!editLog.isOpen())
- editLog.startLogSegment(TODO_TXID);
-
- // create streams pointing to the journal spool files
- // subsequent journal records will go directly to the spool
-// TODO editLog.divertFileStreams(STORAGE_JSPOOL_DIR + "/" + STORAGE_JSPOOL_FILE);
-
- // set up spooling
- if(backupInputStream == null)
- backupInputStream = new EditLogBackupInputStream(nnReg.getAddress());
- jsState = JSpoolState.INPROGRESS;
+ }
+ editLog.setNextTxId(txid);
+ editLog.startLogSegment(txid, false);
+ if (bnState == BNState.DROP_UNTIL_NEXT_ROLL) {
+ setState(BNState.JOURNAL_ONLY);
+ }
+
+ if (stopApplyingEditsOnNextRoll) {
+ if (bnState == BNState.IN_SYNC) {
+ LOG.info("Stopped applying edits to prepare for checkpoint.");
+ setState(BNState.JOURNAL_ONLY);
+ }
+ stopApplyingEditsOnNextRoll = false;
+ notifyAll();
+ }
}
/**
- * Merge Journal Spool to memory.<p>
- * Journal Spool reader reads journal records from edits.new.
- * When it reaches the end of the file it sets {@link JSpoolState} to WAIT.
- * This blocks journaling (see {@link #journal(int,byte[])}.
- * The reader
- * <ul>
- * <li> reads remaining journal records if any,</li>
- * <li> renames edits.new to edits,</li>
- * <li> sets {@link JSpoolState} to OFF,</li>
- * <li> and notifies the journaling thread.</li>
- * </ul>
- * Journaling resumes with applying new journal records to the memory state,
- * and writing them into edits file(s).
+ * Request that the next time the BN receives a log roll, it should
+ * stop applying the edits log to the local namespace. This is
+ * typically followed on by a call to {@link #waitUntilNamespaceFrozen()}
*/
- void convergeJournalSpool() throws IOException {
- Iterator<StorageDirectory> itEdits
- = storage.dirIterator(NameNodeDirType.EDITS);
- if(! itEdits.hasNext())
- throw new IOException("Could not locate checkpoint directories");
- StorageDirectory sdEdits = itEdits.next();
- int numEdits = 0;
- File jSpoolFile = getJSpoolFile(sdEdits);
- long startTime = now();
- if(jSpoolFile.exists()) {
- // load edits.new
- EditLogFileInputStream edits = new EditLogFileInputStream(jSpoolFile);
- BufferedInputStream bin = new BufferedInputStream(edits);
- DataInputStream in = new DataInputStream(bin);
- FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
+ synchronized void freezeNamespaceAtNextRoll() {
+ stopApplyingEditsOnNextRoll = true;
+ }
- LogHeader header = FSEditLogOp.LogHeader.read(in);
- int loaded = logLoader.loadEditRecords(
- header.logVersion, in, header.checksum, false,
- lastAppliedTxId + 1);
-
- lastAppliedTxId += loaded;
- numEdits += loaded;
-
- // first time reached the end of spool
- jsState = JSpoolState.WAIT;
- loaded = logLoader.loadEditRecords(
- header.logVersion, in, header.checksum,
- true, lastAppliedTxId + 1);
- numEdits += loaded;
- lastAppliedTxId += loaded;
-
- getFSNamesystem().dir.updateCountForINodeWithQuota();
- edits.close();
- }
-
- FSImage.LOG.info("Edits file " + jSpoolFile.getCanonicalPath()
- + " of size " + jSpoolFile.length() + " edits # " + numEdits
- + " loaded in " + (now()-startTime)/1000 + " seconds.");
-
- // rename spool edits.new to edits making it in sync with the active node
- // subsequent journal records will go directly to edits
- // TODO editLog.revertFileStreams(STORAGE_JSPOOL_DIR + "/" + STORAGE_JSPOOL_FILE);
-
- // write version file
- // TODO resetVersion(storage.getImageDigest());
-
- // wake up journal writer
- synchronized(this) {
- jsState = JSpoolState.OFF;
- notifyAll();
- }
+ /**
+ * After {@link #freezeNamespaceAtNextRoll()} has been called, wait until
+ * the BN receives notification of the next log roll.
+ */
+ synchronized void waitUntilNamespaceFrozen() throws IOException {
+ if (bnState != BNState.IN_SYNC) return;
- /*
- * TODO: bn
- // Rename lastcheckpoint.tmp to previous.checkpoint
- for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
- StorageDirectory sd = it.next();
- storage.moveLastCheckpoint(sd);
+ LOG.info("Waiting until the NameNode rolls its edit logs in order " +
+ "to freeze the BackupNode namespace.");
+ while (bnState == BNState.IN_SYNC) {
+ Preconditions.checkState(stopApplyingEditsOnNextRoll,
+ "If still in sync, we should still have the flag set to " +
+ "freeze at next roll");
+ try {
+ wait();
+ } catch (InterruptedException ie) {
+ LOG.warn("Interrupted waiting for namespace to freeze", ie);
+ throw new IOException(ie);
+ }
}
- */
+ LOG.info("BackupNode namespace frozen.");
+ }
+
+ /**
+ * Override close() so that we don't finalize edit logs.
+ */
+ @Override
+ public synchronized void close() throws IOException {
+ editLog.abortCurrentLogSegment();
+ storage.close();
}
}
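
The BNState javadoc above describes a small state machine. As a reading aid, here is a minimal, compilable sketch of just that machine; the three state names come from the patch, while the class, fields, and helper methods are illustrative stand-ins for the real BackupImage API.

    // Illustrative sketch of the BackupImage state machine described above.
    // Only the BNState names come from the patch; the rest is simplified.
    public class BnStateMachineSketch {
      enum BNState { DROP_UNTIL_NEXT_ROLL, JOURNAL_ONLY, IN_SYNC }

      private BNState state = BNState.DROP_UNTIL_NEXT_ROLL;
      private boolean stopApplyingEditsOnNextRoll = false;

      // Called for each batch of edits received from the active NameNode.
      synchronized void journal(long firstTxId, int numTxns, byte[] data) {
        switch (state) {
          case DROP_UNTIL_NEXT_ROLL:
            return;                     // not yet aligned on a segment boundary
          case IN_SYNC:
            applyToNamespace(data);     // apply, then also journal locally
            break;
          case JOURNAL_ONLY:
            break;                      // journal locally without applying
        }
        writeToLocalEditLog(firstTxId, numTxns, data);
      }

      // Called when the NameNode rolls its edit log.
      synchronized void namenodeStartedLogSegment(long txid) {
        if (state == BNState.DROP_UNTIL_NEXT_ROLL) {
          state = BNState.JOURNAL_ONLY; // segment boundary: safe to journal now
        }
        if (stopApplyingEditsOnNextRoll) {
          if (state == BNState.IN_SYNC) {
            state = BNState.JOURNAL_ONLY; // freeze namespace for a checkpoint
          }
          stopApplyingEditsOnNextRoll = false;
          notifyAll();
        }
      }

      private void applyToNamespace(byte[] data) { /* elided */ }
      private void writeToLocalEditLog(long t, int n, byte[] d) { /* elided */ }
    }

DROP_UNTIL_NEXT_ROLL exists because a batch of edits can only be journaled safely once the BackupNode is aligned on a segment boundary; the transition back from IN_SYNC to JOURNAL_ONLY is what freezeNamespaceAtNextRoll() and waitUntilNamespaceFrozen() rely on during a checkpoint.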
Added: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java?rev=1146845&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java (added)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java Thu Jul 14 18:53:11 2011
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.namenode.NNStorageArchivalManager.StorageArchiver;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+
+public class BackupJournalManager implements JournalManager {
+
+ private final NamenodeRegistration nnReg;
+ private final NamenodeRegistration bnReg;
+
+ BackupJournalManager(NamenodeRegistration bnReg,
+ NamenodeRegistration nnReg) {
+ this.bnReg = bnReg;
+ this.nnReg = nnReg;
+ }
+
+ @Override
+ public EditLogOutputStream startLogSegment(long txId) throws IOException {
+ EditLogBackupOutputStream stm = new EditLogBackupOutputStream(bnReg, nnReg);
+ stm.startLogSegment(txId);
+ return stm;
+ }
+
+ @Override
+ public void finalizeLogSegment(long firstTxId, long lastTxId)
+ throws IOException {
+ }
+
+ @Override
+ public void setOutputBufferCapacity(int size) {
+ }
+
+ @Override
+ public void archiveLogsOlderThan(long minTxIdToKeep, StorageArchiver archiver)
+ throws IOException {
+ }
+
+ public boolean matchesRegistration(NamenodeRegistration bnReg) {
+ return bnReg.getAddress().equals(this.bnReg.getAddress());
+ }
+
+ @Override
+ public EditLogInputStream getInProgressInputStream(long segmentStartsAtTxId) {
+ return null;
+ }
+}
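
BackupJournalManager leaves finalizeLogSegment, setOutputBufferCapacity, and archiveLogsOlderThan as no-ops; the overrides suggest that segment finalization and archival are the BackupNode's own concern, so the NameNode side only needs to open one output stream per segment. A rough sketch of the fan-out pattern such a JournalManager plugs into (all names below are simplified stand-ins, not the HDFS classes):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical fan-out over JournalManager-like objects.
    interface Journal {
      void startLogSegment(long txId) throws IOException;
      void write(byte[] record) throws IOException;
    }

    class FanOutEditLog {
      private final List<Journal> journals = new ArrayList<>();

      void addJournal(Journal j) { journals.add(j); }

      // A roll starts the same segment on every journal: local file
      // directories and remote backup nodes are treated uniformly.
      void startLogSegment(long txId) throws IOException {
        for (Journal j : journals) {
          j.startLogSegment(txId);
        }
      }

      void logEdit(byte[] record) throws IOException {
        for (Journal j : journals) {
          j.write(record);
        }
      }
    }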
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java?rev=1146845&r1=1146844&r2=1146845&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java Thu Jul 14 18:53:11 2011
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.protocol.D
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.protocol.BackupNodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
@@ -51,7 +52,7 @@ import org.apache.hadoop.net.NetUtils;
* </ol>
*/
@InterfaceAudience.Private
-public class BackupNode extends NameNode {
+public class BackupNode extends NameNode implements BackupNodeProtocol {
private static final String BN_ADDRESS_NAME_KEY = DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY;
private static final String BN_ADDRESS_DEFAULT = DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT;
private static final String BN_HTTP_ADDRESS_NAME_KEY = DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY;
@@ -176,6 +177,17 @@ public class BackupNode extends NameNode
super.stop();
}
+
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion)
+ throws IOException {
+ if (protocol.equals(BackupNodeProtocol.class.getName())) {
+ return BackupNodeProtocol.versionID;
+ } else {
+ return super.getProtocolVersion(protocol, clientVersion);
+ }
+ }
+
/////////////////////////////////////////////////////
// NamenodeProtocol implementation for backup node.
/////////////////////////////////////////////////////
@@ -202,30 +214,36 @@ public class BackupNode extends NameNode
public void endCheckpoint(NamenodeRegistration registration,
CheckpointSignature sig) throws IOException {
throw new UnsupportedActionException("endCheckpoint");
- }
+ }
- @Override // NamenodeProtocol
+ /////////////////////////////////////////////////////
+ // BackupNodeProtocol implementation for backup node.
+ /////////////////////////////////////////////////////
+
+ @Override
public void journal(NamenodeRegistration nnReg,
- int jAction,
- int length,
- byte[] args) throws IOException {
+ long firstTxId, int numTxns,
+ byte[] records) throws IOException {
verifyRequest(nnReg);
if(!nnRpcAddress.equals(nnReg.getAddress()))
throw new IOException("Journal request from unexpected name-node: "
+ nnReg.getAddress() + " expecting " + nnRpcAddress);
- BackupImage bnImage = (BackupImage)getFSImage();
- switch(jAction) {
- case (int)JA_IS_ALIVE:
- return;
- case (int)JA_JOURNAL:
- bnImage.journal(length, args);
- return;
- case (int)JA_JSPOOL_START:
- bnImage.startJournalSpool(nnReg);
- return;
- default:
- throw new IOException("Unexpected journal action: " + jAction);
- }
+ getBNImage().journal(firstTxId, numTxns, records);
+ }
+
+ @Override
+ public void startLogSegment(NamenodeRegistration registration, long txid)
+ throws IOException {
+ verifyRequest(registration);
+
+ getBNImage().namenodeStartedLogSegment(txid);
+ }
+
+ //////////////////////////////////////////////////////
+
+
+ BackupImage getBNImage() {
+ return (BackupImage)getFSImage();
}
boolean shouldCheckpointAtStartup() {
@@ -234,9 +252,9 @@ public class BackupNode extends NameNode
assert fsImage.getStorage().getNumStorageDirs() > 0;
return ! fsImage.getStorage().getStorageDir(0).getVersionFile().exists();
}
- if(namesystem == null || namesystem.dir == null || getFSImage() == null)
- return true;
- return false; // TODO fsImage.getEditLog().getNumJournals() == 0;
+
+ // BN always checkpoints on startup in order to get in sync with namespace
+ return true;
}
private NamespaceInfo handshake(Configuration conf) throws IOException {
@@ -287,14 +305,15 @@ public class BackupNode extends NameNode
*/
private void registerWith(NamespaceInfo nsInfo) throws IOException {
BackupImage bnImage = (BackupImage)getFSImage();
+ NNStorage storage = bnImage.getStorage();
// verify namespaceID
- if(bnImage.getStorage().getNamespaceID() == 0) // new backup storage
- bnImage.getStorage().setStorageInfo(nsInfo);
- else if(bnImage.getStorage().getNamespaceID() != nsInfo.getNamespaceID())
- throw new IOException("Incompatible namespaceIDs"
- + ": active node namespaceID = " + nsInfo.getNamespaceID()
- + "; backup node namespaceID = " + bnImage.getStorage().getNamespaceID());
-
+ if (storage.getNamespaceID() == 0) { // new backup storage
+ storage.setStorageInfo(nsInfo);
+ storage.setBlockPoolID(nsInfo.getBlockPoolID());
+ storage.setClusterID(nsInfo.getClusterID());
+ } else {
+ nsInfo.validateStorage(storage);
+ }
setRegistration();
NamenodeRegistration nnReg = null;
while(!isStopRequested()) {
@@ -323,14 +342,6 @@ public class BackupNode extends NameNode
nnRpcAddress = nnReg.getAddress();
}
- /**
- * Reset node namespace state in memory and in storage directories.
- * @throws IOException
- */
- void resetNamespace() throws IOException {
- ((BackupImage)getFSImage()).reset();
- }
-
// TODO: move to a common with DataNode util class
private static NamespaceInfo handshake(NamenodeProtocol namenode)
throws IOException, SocketTimeoutException {
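
Note how the journal RPC above no longer multiplexes behavior through jAction codes (JA_IS_ALIVE, JA_JOURNAL, JA_JSPOOL_START); each operation is now a typed method carrying explicit transaction IDs. The real interface is the BackupNodeProtocol file added by this commit (its body appears in part 2 of this message); the sketch below only illustrates the shape and is not the actual API:

    import java.io.IOException;

    // Rough shape of a typed journal protocol replacing the old jAction codes.
    interface JournalProtocolSketch {
      long versionID = 1L;

      // Ship a batch of numTxns serialized transactions starting at firstTxId.
      void journal(String nnId, long firstTxId, int numTxns, byte[] records)
          throws IOException;

      // Tell the backup node that the NameNode opened a new segment at txid.
      void startLogSegment(String nnId, long txid) throws IOException;
    }

Carrying firstTxId and numTxns on every call is what lets the receiving BackupImage assert batch contiguity (see the Preconditions checks in applyEdits and logEditsLocally above) instead of trusting an opaque byte stream.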
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java?rev=1146845&r1=1146844&r2=1146845&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java Thu Jul 14 18:53:11 2011
@@ -110,7 +110,6 @@ public class CheckpointSignature extends
|| cTime != si.getStorage().cTime
|| !clusterID.equals(si.getClusterID())
|| !blockpoolID.equals(si.getBlockPoolID())) {
- // checkpointTime can change when the image is saved - do not compare
throw new IOException("Inconsistent checkpoint fields.\n"
+ "LV = " + layoutVersion + " namespaceID = " + namespaceID
+ " cTime = " + cTime
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java?rev=1146845&r1=1146844&r2=1146845&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java Thu Jul 14 18:53:11 2011
@@ -17,30 +17,27 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
-import java.io.IOException;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+
import java.io.File;
+import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
-import static org.apache.hadoop.hdfs.server.common.Util.now;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.CheckpointStates;
-import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.util.Daemon;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_DEFAULT;
/**
* The Checkpointer is responsible for supporting periodic checkpoints
@@ -80,6 +77,7 @@ class Checkpointer extends Daemon {
try {
initialize(conf);
} catch(IOException e) {
+ LOG.warn("Checkpointer got exception", e);
shutdown();
throw e;
}
@@ -134,8 +132,9 @@ class Checkpointer extends Daemon {
periodMSec *= 1000;
long lastCheckpointTime = 0;
- if(!backupNode.shouldCheckpointAtStartup())
+ if (!backupNode.shouldCheckpointAtStartup()) {
lastCheckpointTime = now();
+ }
while(shouldRun) {
try {
long now = now();
@@ -175,44 +174,15 @@ class Checkpointer extends Daemon {
}
/**
- * Download <code>fsimage</code> and <code>edits</code>
- * files from the remote name-node.
- */
- private void downloadCheckpoint(CheckpointSignature sig) throws IOException {
- String nnHttpAddr = backupNode.nnHttpAddress;
-
- // Retrieve image file
- MD5Hash hash = TransferFsImage.downloadImageToStorage(
- nnHttpAddr, sig.mostRecentCheckpointTxId,
- getFSImage().getStorage(), true);
- getFSImage().saveDigestAndRenameCheckpointImage(sig.mostRecentCheckpointTxId, hash);
-
- // Retrieve edits file
- // TODO!
- }
-
- /**
- * Copy the new image into remote name-node.
- */
- private void uploadCheckpoint(CheckpointSignature sig) throws IOException {
- // TODO: checkpoint node disabled in 1073 branch
-/* // Use the exact http addr as specified in config to deal with ip aliasing
- InetSocketAddress httpSocAddr = backupNode.getHttpAddress();
- int httpPort = httpSocAddr.getPort();
- String fileid = "putimage=1&port=" + httpPort +
- "&machine=" + infoBindAddress +
- "&token=" + sig.toString();
- LOG.info("Posted URL " + backupNode.nnHttpAddress + fileid);
- TransferFsImage.getFileClient(backupNode.nnHttpAddress,
- fileid, null, false);
- */
- }
-
- /**
* Create a new checkpoint
*/
void doCheckpoint() throws IOException {
+ BackupImage bnImage = getFSImage();
+ NNStorage bnStorage = bnImage.getStorage();
+
long startTime = now();
+ bnImage.freezeNamespaceAtNextRoll();
+
NamenodeCommand cmd =
getNamenode().startCheckpoint(backupNode.getRegistration());
CheckpointCommand cpCmd = null;
@@ -228,36 +198,76 @@ class Checkpointer extends Daemon {
throw new IOException("Unsupported NamenodeCommand: "+cmd.getAction());
}
+ bnImage.waitUntilNamespaceFrozen();
+
CheckpointSignature sig = cpCmd.getSignature();
- assert FSConstants.LAYOUT_VERSION == sig.getLayoutVersion() :
- "Signature should have current layout version. Expected: "
- + FSConstants.LAYOUT_VERSION + " actual "+ sig.getLayoutVersion();
- assert !backupNode.isRole(NamenodeRole.CHECKPOINT) ||
- cpCmd.isImageObsolete() : "checkpoint node should always download image.";
- if(cpCmd.isImageObsolete()) {
- // First reset storage on disk and memory state
- backupNode.resetNamespace();
- downloadCheckpoint(sig);
- }
- BackupImage bnImage = getFSImage();
- bnImage.getStorage().setBlockPoolID(backupNode.getBlockPoolId());
- bnImage.getStorage().setClusterID(backupNode.getClusterId());
- bnImage.loadCheckpoint(sig);
+ // Make sure we're talking to the same NN!
sig.validateStorageInfo(bnImage);
- bnImage.saveCheckpoint();
- if(cpCmd.needToReturnImage())
- uploadCheckpoint(sig);
+ long lastApplied = bnImage.getLastAppliedTxId();
+ LOG.debug("Doing checkpoint. Last applied: " + lastApplied);
+ RemoteEditLogManifest manifest =
+ getNamenode().getEditLogManifest(bnImage.getLastAppliedTxId());
+
+ if (!manifest.getLogs().isEmpty()) {
+ RemoteEditLog firstRemoteLog = manifest.getLogs().get(0);
+ // we don't have enough logs to roll forward using only logs. Need
+ // to download and load the image.
+ if (firstRemoteLog.getStartTxId() > lastApplied + 1) {
+ LOG.info("Unable to roll forward using only logs. Downloading " +
+ "image with txid " + sig.mostRecentCheckpointTxId);
+ MD5Hash downloadedHash = TransferFsImage.downloadImageToStorage(
+ backupNode.nnHttpAddress, sig.mostRecentCheckpointTxId,
+ bnStorage, true);
+ bnImage.saveDigestAndRenameCheckpointImage(
+ sig.mostRecentCheckpointTxId, downloadedHash);
+
+ LOG.info("Loading image with txid " + sig.mostRecentCheckpointTxId);
+ File file = bnStorage.findImageFile(sig.mostRecentCheckpointTxId);
+ bnImage.reloadFromImageFile(file);
+ }
+
+ lastApplied = bnImage.getLastAppliedTxId();
+ if (firstRemoteLog.getStartTxId() > lastApplied + 1) {
+ throw new IOException("No logs to roll forward from " + lastApplied);
+ }
+
+ // get edits files
+ for (RemoteEditLog log : manifest.getLogs()) {
+ TransferFsImage.downloadEditsToStorage(
+ backupNode.nnHttpAddress, log, bnStorage);
+ }
+
+ SecondaryNameNode.rollForwardByApplyingLogs(manifest, bnImage);
+ }
+
+ long txid = bnImage.getLastAppliedTxId();
+ bnImage.saveFSImageInAllDirs(txid);
+ bnStorage.writeAll();
+
+ if(cpCmd.needToReturnImage()) {
+ TransferFsImage.uploadImageFromStorage(
+ backupNode.nnHttpAddress, getImageListenAddress(),
+ bnStorage, txid);
+ }
getNamenode().endCheckpoint(backupNode.getRegistration(), sig);
- bnImage.convergeJournalSpool();
+ if (backupNode.getRole() == NamenodeRole.BACKUP) {
+ bnImage.convergeJournalSpool();
+ }
backupNode.setRegistration(); // keep registration up to date
- if(backupNode.isRole(NamenodeRole.CHECKPOINT))
- getFSImage().getEditLog().close();
+
+ long imageSize = bnImage.getStorage().getFsImageName(txid).length();
LOG.info("Checkpoint completed in "
+ (now() - startTime)/1000 + " seconds."
- + " New Image Size: " + bnImage.getStorage().getFsImageName(0 /* TODO */).length());
+ + " New Image Size: " + imageSize);
+ }
+
+ private InetSocketAddress getImageListenAddress() {
+ InetSocketAddress httpSocAddr = backupNode.getHttpAddress();
+ int httpPort = httpSocAddr.getPort();
+ return new InetSocketAddress(infoBindAddress, httpPort);
}
}
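
The rewritten doCheckpoint() above replaces the old download/upload pair with a txid-based roll-forward. Condensed into a compilable outline (every type below is a local stub; only the step sequence mirrors the patch):

    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;

    class CheckpointFlowSketch {
      static class Log { long startTxId; Log(long s) { startTxId = s; } }

      long lastAppliedTxId = 100;         // where the backup namespace stands
      long mostRecentCheckpointTxId = 90; // from the CheckpointSignature

      void doCheckpoint(List<Log> manifest, boolean needToReturnImage)
          throws IOException {
        freezeNamespaceAtNextRoll();      // 1. stop applying edits at next roll
        waitUntilNamespaceFrozen();       // 2. namespace is now stable

        if (!manifest.isEmpty()) {
          if (manifest.get(0).startTxId > lastAppliedTxId + 1) {
            // 3. gap before the first available log: reload the latest image
            downloadAndReloadImage(mostRecentCheckpointTxId);
          }
          downloadEditLogs(manifest);     // 4. fetch finalized segments over HTTP
          rollForwardByApplyingLogs(manifest); // 5. replay into the namespace
        }

        saveFSImageInAllDirs(lastAppliedTxId); // 6. write the checkpoint image
        if (needToReturnImage) {
          uploadImage(lastAppliedTxId);   // 7. push the image back to the NN
        }
        convergeJournalSpool();           // 8. BACKUP role only: back to IN_SYNC
      }

      // Stubs standing in for the real BackupImage/TransferFsImage calls.
      void freezeNamespaceAtNextRoll() {}
      void waitUntilNamespaceFrozen() {}
      void downloadAndReloadImage(long txid) {}
      void downloadEditLogs(List<Log> logs) {}
      void rollForwardByApplyingLogs(List<Log> logs) {}
      void saveFSImageInAllDirs(long txid) {}
      void uploadImage(long txid) {}
      void convergeJournalSpool() {}

      public static void main(String[] args) throws IOException {
        new CheckpointFlowSketch().doCheckpoint(Collections.emptyList(), false);
      }
    }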
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java?rev=1146845&r1=1146844&r2=1146845&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java Thu Jul 14 18:53:11 2011
@@ -17,14 +17,14 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.Arrays;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.common.Storage;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.BackupNodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Writable;
@@ -42,7 +42,7 @@ import org.apache.hadoop.net.NetUtils;
class EditLogBackupOutputStream extends EditLogOutputStream {
static int DEFAULT_BUFFER_SIZE = 256;
- private NamenodeProtocol backupNode; // RPC proxy to backup node
+ private BackupNodeProtocol backupNode; // RPC proxy to backup node
private NamenodeRegistration bnRegistration; // backup node registration
private NamenodeRegistration nnRegistration; // active node registration
private ArrayList<JournalRecord> bufCurrent; // current buffer for writing
@@ -60,13 +60,8 @@ class EditLogBackupOutputStream extends
this.args = writables;
}
- void write(DataOutputStream out) throws IOException {
- out.write(op);
- out.writeLong(txid);
- if(args == null)
- return;
- for(Writable w : args)
- w.write(out);
+ void write(DataOutputBuffer out) throws IOException {
+ writeChecksummedOp(out, op, txid, args);
}
}
@@ -81,8 +76,8 @@ class EditLogBackupOutputStream extends
Storage.LOG.info("EditLogBackupOutputStream connects to: " + bnAddress);
try {
this.backupNode =
- (NamenodeProtocol) RPC.getProxy(NamenodeProtocol.class,
- NamenodeProtocol.versionID, bnAddress, new HdfsConfiguration());
+ (BackupNodeProtocol) RPC.getProxy(BackupNodeProtocol.class,
+ BackupNodeProtocol.versionID, bnAddress, new HdfsConfiguration());
} catch(IOException e) {
Storage.LOG.error("Error connecting to: " + bnAddress, e);
throw e;
@@ -91,7 +86,7 @@ class EditLogBackupOutputStream extends
this.bufReady = new ArrayList<JournalRecord>();
this.out = new DataOutputBuffer(DEFAULT_BUFFER_SIZE);
}
-
+
@Override // JournalStream
public String getName() {
return bnRegistration.getAddress();
@@ -156,23 +151,14 @@ class EditLogBackupOutputStream extends
@Override // EditLogOutputStream
protected void flushAndSync() throws IOException {
assert out.size() == 0 : "Output buffer is not empty";
- int bufReadySize = bufReady.size();
- for(int idx = 0; idx < bufReadySize; idx++) {
- JournalRecord jRec = null;
- for(; idx < bufReadySize; idx++) {
- jRec = bufReady.get(idx);
- if(jRec.op >= FSEditLogOpCodes.OP_JSPOOL_START.getOpCode())
- break; // special operation should be sent in a separate call to BN
- jRec.write(out);
- }
- if(out.size() > 0)
- send(NamenodeProtocol.JA_JOURNAL);
- if(idx == bufReadySize)
- break;
- // operation like start journal spool or increment checkpoint time
- // is a separate call to BN
+ for (JournalRecord jRec : bufReady) {
jRec.write(out);
- send(jRec.op);
+ }
+ if (out.size() > 0) {
+ byte[] data = Arrays.copyOf(out.getData(), out.getLength());
+ backupNode.journal(nnRegistration,
+ bufReady.get(0).txid, bufReady.size(),
+ data);
}
bufReady.clear(); // erase all data in the buffer
out.reset(); // reset buffer to the start position
@@ -188,16 +174,6 @@ class EditLogBackupOutputStream extends
return 0;
}
- private void send(int ja) throws IOException {
- try {
- int length = out.getLength();
- out.write(FSEditLogOpCodes.OP_INVALID.getOpCode());
- backupNode.journal(nnRegistration, ja, length, out.getData());
- } finally {
- out.reset();
- }
- }
-
/**
* Get backup node registration.
*/
@@ -205,17 +181,7 @@ class EditLogBackupOutputStream extends
return bnRegistration;
}
- /**
- * Verify that the backup node is alive.
- */
- boolean isAlive() {
- try {
- send(NamenodeProtocol.JA_IS_ALIVE);
- } catch(IOException ei) {
- Storage.LOG.info(bnRegistration.getRole() + " "
- + bnRegistration.getAddress() + " is not alive. ", ei);
- return false;
- }
- return true;
+ void startLogSegment(long txId) throws IOException {
+ backupNode.startLogSegment(nnRegistration, txId);
}
}
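
flushAndSync() is now much simpler: instead of splitting the buffer around special spool opcodes, it serializes every ready record and ships the whole batch in one journal() call carrying the first txid and the batch size. A self-contained sketch of that batching pattern (the journal() shape matches the patch; the buffer and record types are simplified):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    class BatchedJournalSender {
      interface BackupRpc {
        void journal(long firstTxId, int numTxns, byte[] data) throws IOException;
      }

      static class Record {
        final long txid;
        final byte[] payload;
        Record(long txid, byte[] payload) { this.txid = txid; this.payload = payload; }
      }

      private final List<Record> bufReady = new ArrayList<>();
      private final BackupRpc backupNode;

      BatchedJournalSender(BackupRpc rpc) { this.backupNode = rpc; }

      void buffer(Record r) { bufReady.add(r); }

      // Serialize every buffered record, then send them as one batch.
      void flushAndSync() throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (Record r : bufReady) {
          out.write(r.payload, 0, r.payload.length);
        }
        if (out.size() > 0) {
          backupNode.journal(bufReady.get(0).txid, bufReady.size(),
              out.toByteArray());
        }
        bufReady.clear();
      }
    }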
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java?rev=1146845&r1=1146844&r2=1146845&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java Thu Jul 14 18:53:11 2011
@@ -72,5 +72,10 @@ class EditLogFileInputStream extends Edi
// file size + size of both buffers
return file.length();
}
+
+ @Override
+ public String toString() {
+ return getName();
+ }
}
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java?rev=1146845&r1=1146844&r2=1146845&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java Thu Jul 14 18:53:11 2011
@@ -111,21 +111,9 @@ class EditLogFileOutputStream extends Ed
if (fp == null) {
throw new IOException("Trying to use aborted output stream");
}
- int start = bufCurrent.getLength();
- write(op);
- bufCurrent.writeLong(txid);
- for (Writable w : writables) {
- w.write(bufCurrent);
- }
- // write transaction checksum
- int end = bufCurrent.getLength();
- Checksum checksum = FSEditLog.getChecksum();
- checksum.reset();
- checksum.update(bufCurrent.getData(), start, end-start);
- int sum = (int)checksum.getValue();
- bufCurrent.writeInt(sum);
+ writeChecksummedOp(bufCurrent, op, txid, writables);
}
-
+
@Override
void write(final byte[] data, int off, int len) throws IOException {
bufCurrent.write(data, off, len);
@@ -151,7 +139,7 @@ class EditLogFileOutputStream extends Ed
setReadyToFlush();
flush();
-
+
// close should have been called after all pending transactions
// have been flushed & synced.
// if already closed, just skip
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java?rev=1146845&r1=1146844&r2=1146845&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java Thu Jul 14 18:53:11 2011
@@ -19,8 +19,11 @@ package org.apache.hadoop.hdfs.server.na
import java.io.IOException;
import java.io.OutputStream;
+import java.util.zip.Checksum;
import static org.apache.hadoop.hdfs.server.common.Util.now;
+
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Writable;
/**
@@ -135,5 +138,25 @@ implements JournalStream {
return getName();
}
-
+ /**
+ * Write the given operation to the specified buffer, including
+ * the transaction ID and checksum.
+ */
+ protected static void writeChecksummedOp(
+ DataOutputBuffer buf, byte op, long txid, Writable... writables)
+ throws IOException {
+ int start = buf.getLength();
+ buf.write(op);
+ buf.writeLong(txid);
+ for (Writable w : writables) {
+ w.write(buf);
+ }
+ // write transaction checksum
+ int end = buf.getLength();
+ Checksum checksum = FSEditLog.getChecksum();
+ checksum.reset();
+ checksum.update(buf.getData(), start, end-start);
+ int sum = (int)checksum.getValue();
+ buf.writeInt(sum);
+ }
}
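
writeChecksummedOp defines the record layout both output streams now share: an opcode byte, the txid, the serialized args, then a 4-byte checksum over exactly that span. A standalone illustration of the layout (the patch gets its Checksum from FSEditLog.getChecksum(); plain java.util.zip.CRC32 is used here only to keep the sketch self-contained):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.zip.CRC32;

    class ChecksummedOpSketch {
      static byte[] writeChecksummedOp(byte op, long txid, byte[] args)
          throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeByte(op);     // 1-byte opcode
        out.writeLong(txid);   // 8-byte transaction ID
        out.write(args);       // serialized operation arguments
        out.flush();
        // the checksum covers everything written so far for this transaction
        CRC32 checksum = new CRC32();
        byte[] body = bytes.toByteArray();
        checksum.update(body, 0, body.length);
        out.writeInt((int) checksum.getValue());
        out.flush();
        return bytes.toByteArray();
      }

      public static void main(String[] a) throws IOException {
        byte[] rec = writeChecksummedOp((byte) 9, 42L, new byte[] {1, 2, 3});
        System.out.println("record length = " + rec.length); // 1 + 8 + 3 + 4 = 16
      }
    }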
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1146845&r1=1146844&r2=1146845&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Thu Jul 14 18:53:11 2011
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
+import java.util.Iterator;
import java.util.List;
import java.util.zip.Checksum;
@@ -31,6 +32,7 @@ import org.apache.hadoop.hdfs.Deprecated
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import static org.apache.hadoop.hdfs.server.common.Util.now;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
@@ -151,7 +153,7 @@ public class FSEditLog {
/**
* Initialize the list of edit journals
*/
- private void initJournals() {
+ synchronized void initJournals() {
assert journals.isEmpty();
Preconditions.checkState(state == State.UNINITIALIZED,
"Bad state: %s", state);
@@ -167,10 +169,6 @@ public class FSEditLog {
state = State.BETWEEN_LOG_SEGMENTS;
}
- private int getNumEditsDirs() {
- return storage.getNumStorageDirs(NameNodeDirType.EDITS);
- }
-
/**
* Initialize the output stream for logging, opening the first
* log segment.
@@ -179,7 +177,7 @@ public class FSEditLog {
Preconditions.checkState(state == State.UNINITIALIZED);
initJournals();
- startLogSegment(getLastWrittenTxId() + 1);
+ startLogSegment(getLastWrittenTxId() + 1, true);
assert state == State.IN_SEGMENT : "Bad state: " + state;
}
@@ -199,7 +197,7 @@ public class FSEditLog {
if (state == State.IN_SEGMENT) {
assert !journals.isEmpty();
waitForSyncToFinish();
- endCurrentLogSegment();
+ endCurrentLogSegment(true);
}
state = State.CLOSED;
@@ -335,11 +333,12 @@ public class FSEditLog {
* Set the transaction ID to use for the next transaction written.
*/
synchronized void setNextTxId(long nextTxId) {
- assert synctxid <= txid &&
- nextTxId >= txid : "May not decrease txid." +
- " synctxid=" + synctxid +
- " txid=" + txid +
- " nextTxid=" + nextTxId;
+ Preconditions.checkArgument(synctxid <= txid &&
+ nextTxId >= txid,
+ "May not decrease txid." +
+ " synctxid=%s txid=%s nextTxId=%s",
+ synctxid, txid, nextTxId);
+
txid = nextTxId - 1;
}
@@ -470,7 +469,8 @@ public class FSEditLog {
if (badJournals.size() >= journals.size()) {
LOG.fatal("Could not sync any journal to persistent storage. " +
- "Unsynced transactions: " + (txid - synctxid));
+ "Unsynced transactions: " + (txid - synctxid),
+ new Exception());
runtime.exit(1);
}
} finally {
@@ -726,36 +726,6 @@ public class FSEditLog {
}
/**
- * Return the size of the current EditLog
- */
- // TODO who uses this, does it make sense with transactions?
- synchronized long getEditLogSize() throws IOException {
- assert getNumEditsDirs() <= journals.size() :
- "Number of edits directories should not exceed the number of streams.";
- long size = -1;
-
- List<JournalAndStream> badJournals = Lists.newArrayList();
-
- for (JournalAndStream j : journals) {
- if (!j.isActive()) continue;
- EditLogOutputStream es = j.getCurrentStream();
- try {
- long curSize = es.length();
- assert (size == 0 || size == curSize || curSize ==0) :
- "Wrong streams size";
- size = Math.max(size, curSize);
- } catch (IOException e) {
- LOG.error("getEditLogSize: editstream.length failed. removing journal " + j, e);
- badJournals.add(j);
- }
- }
- disableAndReportErrorOnJournals(badJournals);
-
- assert size != -1;
- return size;
- }
-
- /**
* Used only by unit tests.
*/
@VisibleForTesting
@@ -793,10 +763,10 @@ public class FSEditLog {
*/
synchronized long rollEditLog() throws IOException {
LOG.info("Rolling edit logs.");
- endCurrentLogSegment();
+ endCurrentLogSegment(true);
long nextTxId = getLastWrittenTxId() + 1;
- startLogSegment(nextTxId);
+ startLogSegment(nextTxId, true);
assert curSegmentTxId == nextTxId;
return nextTxId;
@@ -806,14 +776,20 @@ public class FSEditLog {
* Start writing to the log segment with the given txid.
* Transitions from BETWEEN_LOG_SEGMENTS state to IN_LOG_SEGMENT state.
*/
- synchronized void startLogSegment(final long txId) {
- LOG.info("Starting log segment at " + txId);
+ synchronized void startLogSegment(final long segmentTxId,
+ boolean writeHeaderTxn) {
+ LOG.info("Starting log segment at " + segmentTxId);
+ Preconditions.checkArgument(segmentTxId > 0,
+ "Bad txid: %s", segmentTxId);
Preconditions.checkState(state == State.BETWEEN_LOG_SEGMENTS,
"Bad state: %s", state);
- Preconditions.checkState(txId > curSegmentTxId,
- "Cannot start writing to log segment " + txId +
+ Preconditions.checkState(segmentTxId > curSegmentTxId,
+ "Cannot start writing to log segment " + segmentTxId +
" when previous log segment started at " + curSegmentTxId);
- curSegmentTxId = txId;
+ Preconditions.checkArgument(segmentTxId == txid + 1,
+ "Cannot start log segment at txid %s when next expected " +
+ "txid is %s", segmentTxId, txid + 1);
+ curSegmentTxId = segmentTxId;
numTransactions = totalTimeTransactions = numTransactionsBatchedInSync = 0;
@@ -823,26 +799,32 @@ public class FSEditLog {
mapJournalsAndReportErrors(new JournalClosure() {
@Override
public void apply(JournalAndStream jas) throws IOException {
- jas.startLogSegment(txId);
+ jas.startLogSegment(segmentTxId);
}
- }, "starting log segment " + txId);
+ }, "starting log segment " + segmentTxId);
state = State.IN_SEGMENT;
- logEdit(FSEditLogOpCodes.OP_START_LOG_SEGMENT);
- logSync();
+ if (writeHeaderTxn) {
+ logEdit(FSEditLogOpCodes.OP_START_LOG_SEGMENT);
+ logSync();
+ }
}
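
The new boolean exists because the active NameNode writes the OP_START_LOG_SEGMENT marker itself, whereas a BackupNode receives that same record over the replicated edit stream and must not write it a second time. A toy model of the two call paths (an illustrative stand-in class, not the HDFS one):

    import java.util.ArrayList;
    import java.util.List;

    class ToyEditLog {
      final List<String> records = new ArrayList<String>();
      long curSegmentTxId = -1;

      void startLogSegment(long segmentTxId, boolean writeHeaderTxn) {
        curSegmentTxId = segmentTxId;
        if (writeHeaderTxn) {
          // active NameNode path: write the header transaction locally
          records.add("OP_START_LOG_SEGMENT@" + segmentTxId);
        }
        // A BackupNode passes false: the same record arrives via the
        // replicated edit stream, so writing it here would duplicate it.
      }

      public static void main(String[] args) {
        ToyEditLog active = new ToyEditLog();
        active.startLogSegment(1, true);   // header written locally

        ToyEditLog backup = new ToyEditLog();
        backup.startLogSegment(1, false);  // header arrives off the wire
        backup.records.add("OP_START_LOG_SEGMENT@1");

        System.out.println(active.records.equals(backup.records)); // true
      }
    }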
/**
* Finalize the current log segment.
* Transitions from IN_SEGMENT state to BETWEEN_LOG_SEGMENTS state.
*/
- synchronized void endCurrentLogSegment() {
+ synchronized void endCurrentLogSegment(boolean writeEndTxn) {
LOG.info("Ending log segment " + curSegmentTxId);
Preconditions.checkState(state == State.IN_SEGMENT,
"Bad state: %s", state);
- logEdit(FSEditLogOpCodes.OP_END_LOG_SEGMENT);
- waitForSyncToFinish();
+
+ if (writeEndTxn) {
+ logEdit(FSEditLogOpCodes.OP_END_LOG_SEGMENT);
+ logSync();
+ }
+
printStatistics(true);
final long lastTxId = getLastWrittenTxId();
@@ -858,6 +840,20 @@ public class FSEditLog {
state = State.BETWEEN_LOG_SEGMENTS;
}
+
+ /**
+ * Abort all current logs. Called from the backup node.
+ */
+ synchronized void abortCurrentLogSegment() {
+ mapJournalsAndReportErrors(new JournalClosure() {
+
+ @Override
+ public void apply(JournalAndStream jas) throws IOException {
+ jas.abort();
+ }
+ }, "aborting all streams");
+ state = State.BETWEEN_LOG_SEGMENTS;
+ }
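
abortCurrentLogSegment reuses the closure-over-journals idiom, in which a failure in one journal disables that journal without stopping the others. A self-contained sketch of the pattern, with hypothetical stand-in types rather than the HDFS implementations:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    class JournalDemo {
      interface JournalClosure {
        void apply(Journal j) throws IOException;
      }
      static class Journal {
        final String name;
        Journal(String name) { this.name = name; }
        void abort() throws IOException { /* close streams, drop buffers */ }
      }

      static List<Journal> journals = new ArrayList<Journal>();

      /** Apply op to every journal; a failure disables that journal only. */
      static void mapJournalsAndReportErrors(JournalClosure op, String status) {
        List<Journal> bad = new ArrayList<Journal>();
        for (Journal j : journals) {
          try {
            op.apply(j);
          } catch (IOException ioe) {
            bad.add(j);          // one faulty journal must not stop the rest
          }
        }
        journals.removeAll(bad); // stand-in for disableAndReportErrorOnJournals
      }

      public static void main(String[] args) {
        journals.add(new Journal("file:/dfs/name"));
        mapJournalsAndReportErrors(new JournalClosure() {
          @Override
          public void apply(Journal j) throws IOException { j.abort(); }
        }, "aborting all streams");
      }
    }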
/**
* Archive any log files that are older than the given txid.
@@ -910,36 +906,62 @@ public class FSEditLog {
/**
* Create (or find if already exists) an edit output stream, which
* streams journal records (edits) to the specified backup node.<br>
- * Send a record, prescribing to start journal spool.<br>
- * This should be sent via regular stream of journal records so that
- * the backup node knew exactly after which record it should start spooling.
+ *
+ * The new BackupNode will start receiving edits the next time this
+ * NameNode's logs roll.
*
* @param bnReg the backup node registration information.
* @param nnReg this (active) name-node registration.
* @throws IOException
*/
- synchronized void logJSpoolStart(NamenodeRegistration bnReg, // backup node
- NamenodeRegistration nnReg) // active name-node
+ synchronized void registerBackupNode(
+ NamenodeRegistration bnReg, // backup node
+ NamenodeRegistration nnReg) // active name-node
throws IOException {
- /*
if(bnReg.isRole(NamenodeRole.CHECKPOINT))
return; // checkpoint node does not stream edits
- if(editStreams == null)
- editStreams = new ArrayList<EditLogOutputStream>();
- EditLogOutputStream boStream = null;
- for(EditLogOutputStream eStream : editStreams) {
- if(eStream.getName().equals(bnReg.getAddress())) {
- boStream = eStream; // already there
- break;
+
+ JournalAndStream jas = findBackupJournalAndStream(bnReg);
+ if (jas != null) {
+ // already registered
+ LOG.info("Backup node " + bnReg + " re-registers");
+ return;
+ }
+
+ LOG.info("Registering new backup node: " + bnReg);
+ BackupJournalManager bjm = new BackupJournalManager(bnReg, nnReg);
+ journals.add(new JournalAndStream(bjm));
+ }
+
+ synchronized void releaseBackupStream(NamenodeRegistration registration) {
+ for (Iterator<JournalAndStream> iter = journals.iterator();
+ iter.hasNext();) {
+ JournalAndStream jas = iter.next();
+ if (jas.manager instanceof BackupJournalManager &&
+ ((BackupJournalManager)jas.manager).matchesRegistration(
+ registration)) {
+ jas.abort();
+ LOG.info("Removing backup journal " + jas);
+ iter.remove();
+ }
+ }
+ }
+
+ /**
+ * Find the JournalAndStream associated with this BackupNode.
+ * @return null if it cannot be found
+ */
+ private synchronized JournalAndStream findBackupJournalAndStream(
+ NamenodeRegistration bnReg) {
+ for (JournalAndStream jas : journals) {
+ if (jas.manager instanceof BackupJournalManager) {
+ BackupJournalManager bjm = (BackupJournalManager)jas.manager;
+ if (bjm.matchesRegistration(bnReg)) {
+ return jas;
+ }
}
}
- if(boStream == null) {
- boStream = new EditLogBackupOutputStream(bnReg, nnReg);
- editStreams.add(boStream);
- }
- logEdit(OP_JSPOOL_START, (Writable[])null);
- TODO: backupnode is disabled
- */
+ return null;
}
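
The effect of registerBackupNode is that a BackupNode becomes one more entry in the journal list, receiving the same traffic as a local edits directory from the next roll onward. A simplified sketch of that polymorphism (stand-in types, not the HDFS classes):

    import java.util.ArrayList;
    import java.util.List;

    class JournalSetDemo {
      interface JournalManager { void write(String op); }

      static class FileJournal implements JournalManager {
        public void write(String op) { /* append to a local edits file */ }
      }
      static class BackupJournal implements JournalManager {
        final String backupAddress;
        BackupJournal(String addr) { this.backupAddress = addr; }
        public void write(String op) { /* send the record to the backup */ }
        boolean matchesRegistration(String addr) {
          return backupAddress.equals(addr);
        }
      }

      static List<JournalManager> journals = new ArrayList<JournalManager>();

      static void registerBackupNode(String addr) {
        for (JournalManager jm : journals) {
          if (jm instanceof BackupJournal
              && ((BackupJournal) jm).matchesRegistration(addr)) {
            return;              // already registered; treat as re-register
          }
        }
        journals.add(new BackupJournal(addr));
      }

      public static void main(String[] args) {
        journals.add(new FileJournal());
        registerBackupNode("bn-host:50100");
        for (JournalManager jm : journals) {
          jm.write("OP_MKDIR");  // both journals see every edit
        }
      }
    }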
/**
@@ -961,62 +983,6 @@ public class FSEditLog {
endTransaction(start);
}
- synchronized void releaseBackupStream(NamenodeRegistration registration) {
- /*
- Iterator<EditLogOutputStream> it =
- getOutputStreamIterator(JournalType.BACKUP);
- ArrayList<EditLogOutputStream> errorStreams = null;
- NamenodeRegistration backupNode = null;
- while(it.hasNext()) {
- EditLogBackupOutputStream eStream = (EditLogBackupOutputStream)it.next();
- backupNode = eStream.getRegistration();
- if(backupNode.getAddress().equals(registration.getAddress()) &&
- backupNode.isRole(registration.getRole())) {
- errorStreams = new ArrayList<EditLogOutputStream>(1);
- errorStreams.add(eStream);
- break;
- }
- }
- assert backupNode == null || backupNode.isRole(NamenodeRole.BACKUP) :
- "Not a backup node corresponds to a backup stream";
- disableAndReportErrorOnJournals(errorStreams);
- TODO BN currently disabled
- */
- }
-
- synchronized boolean checkBackupRegistration(
- NamenodeRegistration registration) {
- /*
- Iterator<EditLogOutputStream> it =
- getOutputStreamIterator(JournalType.BACKUP);
- boolean regAllowed = !it.hasNext();
- NamenodeRegistration backupNode = null;
- ArrayList<EditLogOutputStream> errorStreams = null;
- while(it.hasNext()) {
- EditLogBackupOutputStream eStream = (EditLogBackupOutputStream)it.next();
- backupNode = eStream.getRegistration();
- if(backupNode.getAddress().equals(registration.getAddress()) &&
- backupNode.isRole(registration.getRole())) {
- regAllowed = true; // same node re-registers
- break;
- }
- if(!eStream.isAlive()) {
- if(errorStreams == null)
- errorStreams = new ArrayList<EditLogOutputStream>(1);
- errorStreams.add(eStream);
- regAllowed = true; // previous backup node failed
- }
- }
- assert backupNode == null || backupNode.isRole(NamenodeRole.BACKUP) :
- "Not a backup node corresponds to a backup stream";
- disableAndReportErrorOnJournals(errorStreams);
- return regAllowed;
-
- TODO BN currently disabled
- */
- return false;
- }
-
static BytesWritable toBytesWritable(Options.Rename... options) {
byte[] bytes = new byte[options.length];
for (int i = 0; i < options.length; i++) {
@@ -1061,12 +1027,7 @@ public class FSEditLog {
for (JournalAndStream j : badJournals) {
LOG.error("Disabling journal " + j);
- try {
- j.abort();
- } catch (IOException ioe) {
- LOG.warn("Failed to abort faulty journal " + j
- + " before removing it (might be OK)", ioe);
- }
+ j.abort();
}
}
@@ -1093,13 +1054,17 @@ public class FSEditLog {
}
public void close(long lastTxId) throws IOException {
+ Preconditions.checkArgument(lastTxId >= segmentStartsAtTxId,
+ "invalid segment: lastTxId %s >= " +
+ "segment starting txid %s", lastTxId, segmentStartsAtTxId);
+
if (stream == null) return;
stream.close();
manager.finalizeLogSegment(segmentStartsAtTxId, lastTxId);
stream = null;
}
- public void abort() throws IOException {
+ public void abort() {
if (stream == null) return;
try {
stream.abort();
@@ -1133,5 +1098,30 @@ public class FSEditLog {
JournalManager getManager() {
return manager;
}
+
+ public EditLogInputStream getInProgressInputStream() throws IOException {
+ return manager.getInProgressInputStream(segmentStartsAtTxId);
+ }
+ }
+
+ /**
+ * @return an EditLogInputStream that reads from the same log that
+ * the edit log is currently writing. This is used from the BackupNode
+ * during edits synchronization.
+ * @throws IOException if no valid logs are available.
+ */
+ synchronized EditLogInputStream getInProgressFileInputStream()
+ throws IOException {
+ for (JournalAndStream jas : journals) {
+ if (!jas.isActive()) continue;
+ try {
+ EditLogInputStream in = jas.getInProgressInputStream();
+ if (in != null) return in;
+ } catch (IOException ioe) {
+ LOG.warn("Unable to get the in-progress input stream from " + jas,
+ ioe);
+ }
+ }
+ throw new IOException("No in-progress stream provided edits");
}
}
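
getInProgressFileInputStream exists so that edits synchronization can read from a segment that is still being written. A small runnable sketch of the consumer side, with a plain temp file standing in for an edit log segment:

    import java.io.*;

    class TailDemo {
      public static void main(String[] args) throws IOException {
        File segment = File.createTempFile("edits_inprogress_", "");
        FileOutputStream writer = new FileOutputStream(segment);
        writer.write("txn-1\n".getBytes("UTF-8")); // visible to readers at once

        // A second stream over the same file reads everything written so
        // far, which is how a tailing reader can catch up while the
        // segment is still open for writing.
        BufferedReader reader = new BufferedReader(
            new InputStreamReader(new FileInputStream(segment), "UTF-8"));
        System.out.println(reader.readLine());     // txn-1
        reader.close();
        writer.close();
        segment.delete();
      }
    }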
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1146845&r1=1146844&r2=1146845&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Thu Jul 14 18:53:11 2011
@@ -21,16 +21,13 @@ import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.net.URI;
-import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -45,7 +42,6 @@ import org.apache.hadoop.hdfs.server.com
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
-import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.common.Util;
import static org.apache.hadoop.hdfs.server.common.Util.now;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
@@ -74,14 +70,17 @@ import com.google.common.collect.Lists;
public class FSImage implements Closeable {
protected static final Log LOG = LogFactory.getLog(FSImage.class.getName());
- // checkpoint states
- enum CheckpointStates{START, ROLLED_EDITS, UPLOAD_START, UPLOAD_DONE; }
-
protected FSNamesystem namesystem = null;
protected FSEditLog editLog = null;
private boolean isUpgradeFinalized = false;
protected NNStorage storage;
+
+ /**
+ * The last transaction ID that was either loaded from an image
+ * or loaded by loading edits files.
+ */
+ protected long lastAppliedTxId = 0;
/**
* URIs for importing an image from a checkpoint. In the default case,
@@ -92,10 +91,6 @@ public class FSImage implements Closeabl
final private Configuration conf;
- /**
- * Can fs-image be rolled?
- */
- volatile protected CheckpointStates ckptState = FSImage.CheckpointStates.START;
private final NNStorageArchivalManager archivalManager;
/**
@@ -559,6 +554,18 @@ public class FSImage implements Closeabl
editLog.open();
storage.writeTransactionIdFileToStorage(editLog.getCurSegmentTxId());
};
+
+ /**
+ * Toss the current image and namesystem, reloading from the specified
+ * file.
+ */
+ void reloadFromImageFile(File file) throws IOException {
+ // TODO: namesystem.close(); ??
+ namesystem.dir.reset();
+
+ LOG.debug("Reloading namespace from " + file);
+ loadFSImage(file);
+ }
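
reloadFromImageFile resets the directory tree before loading, since loading an image into a non-empty namespace would merge the two. A toy illustration of that ordering (stand-in types, not the HDFS classes):

    import java.util.HashMap;
    import java.util.Map;

    class ReloadDemo {
      static Map<String, String> namespace = new HashMap<String, String>();

      static void loadImage(Map<String, String> checkpoint) {
        namespace.putAll(checkpoint);
      }

      static void reloadFromImage(Map<String, String> checkpoint) {
        namespace.clear();       // analogue of namesystem.dir.reset()
        loadImage(checkpoint);   // otherwise stale entries would survive
      }

      public static void main(String[] args) {
        namespace.put("/old", "stale");
        Map<String, String> ckpt = new HashMap<String, String>();
        ckpt.put("/new", "fresh");
        reloadFromImage(ckpt);
        System.out.println(namespace); // {/new=fresh}
      }
    }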
/**
* Choose latest image from one of the directories,
@@ -626,8 +633,12 @@ public class FSImage implements Closeabl
} catch (IOException ioe) {
throw new IOException("Failed to load image from " + loadPlan.getImageFile(), ioe);
}
-
- needToSave |= loadEdits(loadPlan.getEditsFiles());
+
+ long numLoaded = loadEdits(loadPlan.getEditsFiles());
+ needToSave |= numLoaded > 0;
+
+ // update the txid for the edit log
+ editLog.setNextTxId(storage.getMostRecentCheckpointTxId() + numLoaded + 1);
/* TODO(todd) Need to discuss whether we should force a re-save
* of the image if one of the edits or images has an old format
@@ -640,13 +651,14 @@ public class FSImage implements Closeabl
/**
* Load the specified list of edit files into the image.
- * @return true if the image should be re-saved
+ * @return the number of transactions loaded
*/
- protected boolean loadEdits(List<File> editLogs) throws IOException {
+ protected long loadEdits(List<File> editLogs) throws IOException {
LOG.debug("About to load edits:\n " + Joiner.on("\n ").join(editLogs));
-
+
+ long startingTxId = getLastAppliedTxId() + 1;
+
FSEditLogLoader loader = new FSEditLogLoader(namesystem);
- long startingTxId = storage.getMostRecentCheckpointTxId() + 1;
int numLoaded = 0;
// Load latest edits
for (File edits : editLogs) {
@@ -655,17 +667,13 @@ public class FSImage implements Closeabl
int thisNumLoaded = loader.loadFSEdits(editIn, startingTxId);
startingTxId += thisNumLoaded;
numLoaded += thisNumLoaded;
+ lastAppliedTxId += thisNumLoaded;
editIn.close();
}
// update the counts
getFSNamesystem().dir.updateCountForINodeWithQuota();
-
- // update the txid for the edit log
- editLog.setNextTxId(storage.getMostRecentCheckpointTxId() + numLoaded + 1);
-
- // If we loaded any edits, need to save.
- return numLoaded > 0;
+ return numLoaded;
}
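
With loadEdits now returning a transaction count, the caller derives both needToSave and the edit log's next txid from it. A worked example of the arithmetic (plain Java, no HDFS types): if the image was saved at txid 100 and two segments contribute 5 and 7 transactions, lastAppliedTxId ends at 112 and the log resumes at 113.

    class TxIdMath {
      public static void main(String[] args) {
        long mostRecentCheckpointTxId = 100; // image saved at txid 100
        long[] txnsPerEditFile = {5, 7};     // two finalized segments
        long lastAppliedTxId = mostRecentCheckpointTxId;
        long numLoaded = 0;
        for (long n : txnsPerEditFile) {
          lastAppliedTxId += n;              // 105, then 112
          numLoaded += n;
        }
        // The edit log resumes at the first unused transaction id:
        long nextTxId = mostRecentCheckpointTxId + numLoaded + 1; // 113
        System.out.println(nextTxId == lastAppliedTxId + 1);      // true
      }
    }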
@@ -673,7 +681,7 @@ public class FSImage implements Closeabl
* Load the image namespace from the given image file, verifying
* it against the MD5 sum stored in its associated .md5 file.
*/
- void loadFSImage(File imageFile) throws IOException {
+ private void loadFSImage(File imageFile) throws IOException {
MD5Hash expectedMD5 = MD5FileUtils.readStoredMd5ForFile(imageFile);
if (expectedMD5 == null) {
throw new IOException("No MD5 file found corresponding to image file "
@@ -687,7 +695,7 @@ public class FSImage implements Closeabl
* filenames and blocks. Return whether we should
* "re-save" and consolidate the edit-logs
*/
- void loadFSImage(File curFile, MD5Hash expectedMd5) throws IOException {
+ private void loadFSImage(File curFile, MD5Hash expectedMd5) throws IOException {
FSImageFormat.Loader loader = new FSImageFormat.Loader(
conf, getFSNamesystem());
loader.load(curFile);
@@ -704,8 +712,9 @@ public class FSImage implements Closeabl
}
long txId = loader.getLoadedImageTxId();
+ LOG.info("Loaded image for txid " + txId + " from " + curFile);
+ lastAppliedTxId = txId;
storage.setMostRecentCheckpointTxId(txId);
- editLog.setNextTxId(txId + 1);
}
@@ -718,10 +727,10 @@ public class FSImage implements Closeabl
FSImageFormat.Saver saver = new FSImageFormat.Saver();
FSImageCompression compression = FSImageCompression.createCompression(conf);
- saver.save(newFile, getFSNamesystem(), compression);
+ saver.save(newFile, txid, getFSNamesystem(), compression);
MD5FileUtils.saveMD5File(dstFile, saver.getSavedDigest());
- storage.setMostRecentCheckpointTxId(editLog.getLastWrittenTxId());
+ storage.setMostRecentCheckpointTxId(txid);
}
/**
@@ -784,7 +793,7 @@ public class FSImage implements Closeabl
boolean editLogWasOpen = editLog.isOpen();
if (editLogWasOpen) {
- editLog.endCurrentLogSegment();
+ editLog.endCurrentLogSegment(true);
}
long imageTxId = editLog.getLastWrittenTxId();
try {
@@ -793,7 +802,7 @@ public class FSImage implements Closeabl
} finally {
if (editLogWasOpen) {
- editLog.startLogSegment(imageTxId + 1);
+ editLog.startLogSegment(imageTxId + 1, true);
// Take this opportunity to note the current transaction
storage.writeTransactionIdFileToStorage(imageTxId + 1);
}
@@ -951,7 +960,6 @@ public class FSImage implements Closeabl
// do not return image if there are no image directories
needToReturnImg = false;
CheckpointSignature sig = rollEditLog();
- getEditLog().logJSpoolStart(bnReg, nnReg);
return new CheckpointCommand(sig, isImgObsolete, needToReturnImg);
}
@@ -1003,7 +1011,9 @@ public class FSImage implements Closeabl
}
synchronized public void close() throws IOException {
- getEditLog().close();
+ if (editLog != null) { // 2NN doesn't have any edit log
+ getEditLog().close();
+ }
storage.close();
}
@@ -1054,4 +1064,8 @@ public class FSImage implements Closeabl
public String getBlockPoolID() {
return storage.getBlockPoolID();
}
+
+ public synchronized long getLastAppliedTxId() {
+ return lastAppliedTxId;
+ }
}
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1146845&r1=1146844&r2=1146845&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Thu Jul 14 18:53:11 2011
@@ -539,6 +539,7 @@ class FSImageFormat {
}
void save(File newFile,
+ long txid,
FSNamesystem sourceNamesystem,
FSImageCompression compression)
throws IOException {
@@ -559,7 +560,6 @@ class FSImageFormat {
.getStorage().getNamespaceID()); // TODO bad dependency
out.writeLong(fsDir.rootDir.numItemsInTree());
out.writeLong(sourceNamesystem.getGenerationStamp());
- long txid = sourceNamesystem.getEditLog().getLastWrittenTxId();
out.writeLong(txid);
// write compression info and set up compressed stream
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java?rev=1146845&r1=1146844&r2=1146845&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java Thu Jul 14 18:53:11 2011
@@ -106,14 +106,15 @@ class FSImageTransactionalStorageInspect
"not configured to contain images.");
}
}
+ }
+
- // Check for a seen_txid file, which marks a minimum transaction ID that
- // must be included in our load plan.
- try {
- maxSeenTxId = Math.max(maxSeenTxId, NNStorage.readTransactionIdFile(sd));
- } catch (IOException ioe) {
- LOG.warn("Unable to determine the max transaction ID seen by " + sd, ioe);
- }
+ // Check for a seen_txid file, which marks a minimum transaction ID that
+ // must be included in our load plan.
+ try {
+ maxSeenTxId = Math.max(maxSeenTxId, NNStorage.readTransactionIdFile(sd));
+ } catch (IOException ioe) {
+ LOG.warn("Unable to determine the max transaction ID seen by " + sd, ioe);
}
List<FoundEditLog> editLogs = matchEditLogs(filesInStorage);
@@ -215,14 +216,45 @@ class FSImageTransactionalStorageInspect
}
FoundFSImage recoveryImage = getLatestImage();
- long expectedTxId = recoveryImage.txId + 1;
+ LogLoadPlan logPlan = createLogLoadPlan(recoveryImage.txId, Long.MAX_VALUE);
+
+ return new TransactionalLoadPlan(recoveryImage,
+ logPlan);
+ }
+
+ /**
+ * Plan which logs to load in order to bring the namespace up-to-date.
+ * Log segments are considered if their starting txid lies in the range
+ * (sinceTxId, maxStartTxId).
+ *
+ * @param sinceTxId the highest txid that is already loaded
+ * (eg from the image checkpoint)
+ * @param maxStartTxId ignore any log files that start after this txid
+ */
+ LogLoadPlan createLogLoadPlan(long sinceTxId, long maxStartTxId) throws IOException {
+ long expectedTxId = sinceTxId + 1;
List<FoundEditLog> recoveryLogs = new ArrayList<FoundEditLog>();
- SortedMap<Long, LogGroup> usefulGroups = logGroups.tailMap(expectedTxId);
- LOG.debug("Excluded " + (logGroups.size() - usefulGroups.size()) +
- " groups of logs because they start with a txid less than image " +
- "txid " + recoveryImage.txId);
+ SortedMap<Long, LogGroup> tailGroups = logGroups.tailMap(expectedTxId);
+ if (logGroups.size() > tailGroups.size()) {
+ LOG.debug("Excluded " + (logGroups.size() - tailGroups.size()) +
+ " groups of logs because they start with a txid less than image " +
+ "txid " + sinceTxId);
+ }
+
+ SortedMap<Long, LogGroup> usefulGroups;
+ if (maxStartTxId > sinceTxId) {
+ usefulGroups = tailGroups.headMap(maxStartTxId);
+ } else {
+ usefulGroups = new TreeMap<Long, LogGroup>();
+ }
+
+ if (usefulGroups.size() < tailGroups.size()) {
+ LOG.debug("Excluded " + (tailGroups.size() - usefulGroups.size()) +
+ " groups of logs because they start with a txid higher than max " +
+ "txid " + maxStartTxId);
+ }
+
for (Map.Entry<Long, LogGroup> entry : usefulGroups.entrySet()) {
long logStartTxId = entry.getKey();
@@ -251,7 +283,7 @@ class FSImageTransactionalStorageInspect
long lastLogGroupStartTxId = usefulGroups.isEmpty() ?
0 : usefulGroups.lastKey();
- if (maxSeenTxId > recoveryImage.txId &&
+ if (maxSeenTxId > sinceTxId &&
maxSeenTxId > lastLogGroupStartTxId) {
String msg = "At least one storage directory indicated it has seen a " +
"log segment starting at txid " + maxSeenTxId;
@@ -263,9 +295,10 @@ class FSImageTransactionalStorageInspect
}
throw new IOException(msg);
}
-
- return new TransactionalLoadPlan(recoveryImage, recoveryLogs,
+
+ return new LogLoadPlan(recoveryLogs,
Lists.newArrayList(usefulGroups.values()));
+
}
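
createLogLoadPlan selects candidate log groups by windowing a SortedMap keyed on segment start txid. A runnable sketch of the same tailMap/headMap mechanics (note that headMap's bound is exclusive):

    import java.util.SortedMap;
    import java.util.TreeMap;

    class LogWindowDemo {
      public static void main(String[] args) {
        // start txid of each log group -> description
        TreeMap<Long, String> logGroups = new TreeMap<Long, String>();
        logGroups.put(1L,   "edits_1-100");
        logGroups.put(101L, "edits_101-200");
        logGroups.put(201L, "edits_inprogress_201");

        long sinceTxId = 100;     // highest txid already loaded
        long maxStartTxId = 201;  // ignore segments starting at/after this

        // tailMap: drop groups entirely covered by the image ...
        SortedMap<Long, String> tail = logGroups.tailMap(sinceTxId + 1);
        // ... headMap: drop groups starting too late. The bound is
        // exclusive, so the group starting exactly at 201 is excluded.
        SortedMap<Long, String> useful = (maxStartTxId > sinceTxId)
            ? tail.headMap(maxStartTxId)
            : new TreeMap<Long, String>();

        System.out.println(useful); // {101=edits_101-200}
      }
    }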
@Override
@@ -595,23 +628,18 @@ class FSImageTransactionalStorageInspect
static class TransactionalLoadPlan extends LoadPlan {
final FoundFSImage image;
- final List<FoundEditLog> editLogs;
- final List<LogGroup> logGroupsToRecover;
+ final LogLoadPlan logPlan;
public TransactionalLoadPlan(FoundFSImage image,
- List<FoundEditLog> editLogs,
- List<LogGroup> logGroupsToRecover) {
+ LogLoadPlan logPlan) {
super();
this.image = image;
- this.editLogs = editLogs;
- this.logGroupsToRecover = logGroupsToRecover;
+ this.logPlan = logPlan;
}
@Override
boolean doRecovery() throws IOException {
- for (LogGroup g : logGroupsToRecover) {
- g.recover();
- }
+ logPlan.doRecovery();
return false;
}
@@ -622,11 +650,7 @@ class FSImageTransactionalStorageInspect
@Override
List<File> getEditsFiles() {
- List<File> ret = new ArrayList<File>();
- for (FoundEditLog log : editLogs) {
- ret.add(log.getFile());
- }
- return ret;
+ return logPlan.getEditsFiles();
}
@Override
@@ -634,4 +658,29 @@ class FSImageTransactionalStorageInspect
return image.sd;
}
}
+
+ static class LogLoadPlan {
+ final List<FoundEditLog> editLogs;
+ final List<LogGroup> logGroupsToRecover;
+
+ LogLoadPlan(List<FoundEditLog> editLogs,
+ List<LogGroup> logGroupsToRecover) {
+ this.editLogs = editLogs;
+ this.logGroupsToRecover = logGroupsToRecover;
+ }
+
+ public void doRecovery() throws IOException {
+ for (LogGroup g : logGroupsToRecover) {
+ g.recover();
+ }
+ }
+
+ public List<File> getEditsFiles() {
+ List<File> ret = new ArrayList<File>();
+ for (FoundEditLog log : editLogs) {
+ ret.add(log.getFile());
+ }
+ return ret;
+ }
+ }
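
Splitting LogLoadPlan out of TransactionalLoadPlan lets a caller drive edits recovery without an image. A hypothetical usage fragment, using only names that appear in this patch (surrounding setup elided, so this is a sketch rather than compilable code):

    // FSImageTransactionalStorageInspector inspector = ...;
    // LogLoadPlan logPlan =
    //     inspector.createLogLoadPlan(lastAppliedTxId, Long.MAX_VALUE);
    // logPlan.doRecovery();                   // finalize in-progress logs
    // for (File f : logPlan.getEditsFiles()) {
    //   ... feed each file to FSEditLogLoader, in order ...
    // }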
}