Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/07/14 20:53:13 UTC

svn commit: r1146845 [1/2] - in /hadoop/common/branches/HDFS-1073/hdfs: ./ src/java/org/apache/hadoop/hdfs/server/namenode/ src/java/org/apache/hadoop/hdfs/server/protocol/ src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/

Author: todd
Date: Thu Jul 14 18:53:11 2011
New Revision: 1146845

URL: http://svn.apache.org/viewvc?rev=1146845&view=rev
Log:
HDFS-1979. Fix backupnode for new edits/image layout. Contributed by Todd Lipcon.

Added:
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/BackupNodeProtocol.java
Modified:
    hadoop/common/branches/HDFS-1073/hdfs/CHANGES.HDFS-1073.txt
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
    hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java
    hadoop/common/branches/HDFS-1073/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java

Modified: hadoop/common/branches/HDFS-1073/hdfs/CHANGES.HDFS-1073.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/CHANGES.HDFS-1073.txt?rev=1146845&r1=1146844&r2=1146845&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/CHANGES.HDFS-1073.txt (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/CHANGES.HDFS-1073.txt Thu Jul 14 18:53:11 2011
@@ -67,3 +67,4 @@ HDFS-2102. Zero-pad edits filename to ma
 HDFS-2010. Fix NameNode to exit if all edit streams become inaccessible. (atm
            via todd)
 HDFS-2123. Checkpoint interval should be based on txn count, not size. (todd)
+HDFS-1979. Fix backupnode for new edits/image layout. (todd)

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java?rev=1146845&r1=1146844&r2=1146845&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java Thu Jul 14 18:53:11 2011
@@ -19,31 +19,21 @@ package org.apache.hadoop.hdfs.server.na
 
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
-import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
-import java.util.List;
-import java.util.zip.CheckedInputStream;
 import java.util.zip.Checksum;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.LayoutVersion;
-import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
-import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
-import static org.apache.hadoop.hdfs.server.common.Util.now;
+import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.LogLoadPlan;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.StringUtils;
 
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.LogHeader;
-import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
-import org.apache.hadoop.hdfs.util.MD5FileUtils;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.MD5Hash;
-import org.apache.hadoop.conf.Configuration;
+import com.google.common.base.Preconditions;
 
 /**
  * Extension of FSImage for the backup node.
@@ -52,30 +42,39 @@ import org.apache.hadoop.conf.Configurat
  */
 @InterfaceAudience.Private
 public class BackupImage extends FSImage {
-  // Names of the journal spool directory and the spool file
-  private static final String STORAGE_JSPOOL_DIR = "jspool";
-  private static final String STORAGE_JSPOOL_FILE =
-    NNStorage.NameNodeFile.EDITS_NEW.getName();
-
   /** Backup input stream for loading edits into memory */
-  private EditLogBackupInputStream backupInputStream;
-
-  /** Is journal spooling in progress */
-  volatile JSpoolState jsState;
-  private long lastAppliedTxId = 0;
-
-  static enum JSpoolState {
-    OFF,
-    INPROGRESS,
-    WAIT;
+  private EditLogBackupInputStream backupInputStream =
+    new EditLogBackupInputStream("Data from remote NameNode");
+  
+  /**
+   * Current state of the BackupNode. The BackupNode's state
+   * transitions are as follows:
+   * 
+   * Initial: DROP_UNTIL_NEXT_ROLL
+   * - Transitions to JOURNAL_ONLY the next time the log rolls
+   * - Transitions to IN_SYNC in convergeJournalSpool
+   * - Transitions back to JOURNAL_ONLY if the log rolls while
+   *   stopApplyingEditsOnNextRoll is true.
+   */
+  BNState bnState;
+  static enum BNState {
+    // Edits from the NN should be dropped. On the next log roll,
+    // transition to JOURNAL_ONLY state
+    DROP_UNTIL_NEXT_ROLL,
+    // Edits from the NN should be written to the local edits log
+    // but not applied to the namespace.
+    JOURNAL_ONLY,
+    // Edits should be written to the local edits log and applied
+    // to the local namespace.
+    IN_SYNC;
   }
 
-
   /**
-   * Place-holder for a txid that still needs to be addressed
-   * in HDFS-1073 branch before merging into trunk.
+   * Flag to indicate that the next time the NN rolls, the BN
+   * should transition to JOURNAL_ONLY state.
+   * @see #freezeNamespaceAtNextRoll()
    */
-  private static final long TODO_TXID = 0xDEADBEEF;
+  private boolean stopApplyingEditsOnNextRoll = false;
 
   /**
    * Construct a backup image.
@@ -85,7 +84,8 @@ public class BackupImage extends FSImage
   BackupImage(Configuration conf) throws IOException {
     super(conf);
     storage.setDisablePreUpgradableLayoutCheck(true);
-    jsState = JSpoolState.OFF;
+    bnState = BNState.DROP_UNTIL_NEXT_ROLL;
+    editLog.initJournals();
   }
 
   /**
@@ -130,279 +130,259 @@ public class BackupImage extends FSImage
   }
 
   /**
-   * Reset storage directories.
-   * <p>
-   * Unlock the storage.
-   * Rename <code>current</code> to <code>lastcheckpoint.tmp</code>
-   * and recreate empty <code>current</code>.
-   * @throws IOException
+   * Save meta-data into fsimage files
+   * and create empty edits.
    */
-  synchronized void reset() throws IOException {
-    /* TODO: BackupNode
-    // reset NameSpace tree
-    FSDirectory fsDir = getFSNamesystem().dir;
-    fsDir.reset();
-
-    // unlock, close and rename storage directories
-    storage.unlockAll();
-    
-    // recover from unsuccessful checkpoint if necessary
-    recoverCreateRead();
-    // rename and recreate
-    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
-      StorageDirectory sd = it.next();
-      // rename current to lastcheckpoint.tmp
-      storage.moveCurrent(sd);
-    }
-    */
+  void saveCheckpoint() throws IOException {
+    saveNamespace();
   }
 
   /**
-   * Load checkpoint from local files only if the memory state is empty.<br>
-   * Set new checkpoint time received from the name-node.<br>
-   * Move <code>lastcheckpoint.tmp</code> to <code>previous.checkpoint</code>.
+   * Receive a batch of edits from the NameNode.
+   * 
+   * Depending on bnState, different actions are taken. See
+   * {@link BackupImage.BNState}
+   * 
+   * @param firstTxId first txid in batch
+   * @param numTxns number of transactions
+   * @param data serialized journal records.
    * @throws IOException
+   * @see #convergeJournalSpool()
    */
-  void loadCheckpoint(CheckpointSignature sig) throws IOException {
-    // load current image and journal if it is not in memory already
-    if(!editLog.isOpen())
-      editLog.startLogSegment(TODO_TXID);
-
-    // set storage fields
-    storage.setStorageInfo(sig);
-
-    FSDirectory fsDir = getFSNamesystem().dir;
-    if(fsDir.isEmpty()) {
-      Iterator<StorageDirectory> itImage
-        = storage.dirIterator(NameNodeDirType.IMAGE);
-      Iterator<StorageDirectory> itEdits
-        = storage.dirIterator(NameNodeDirType.EDITS);
-      if(!itImage.hasNext() || ! itEdits.hasNext())
-        throw new IOException("Could not locate checkpoint directories");
-      StorageDirectory sdName = itImage.next();
-      StorageDirectory sdEdits = itEdits.next();
-
-      getFSDirectoryRootLock().writeLock();
-      try { // load image under rootDir lock
-        File imageFile = null; // TODO
-        MD5Hash expectedMD5 = MD5FileUtils.readStoredMd5ForFile(imageFile);
-        loadFSImage(imageFile, expectedMD5);
-      } finally {
-        getFSDirectoryRootLock().writeUnlock();
-      }
-      List<File> editsFiles =
-        FSImageOldStorageInspector.getEditsInStorageDir(sdEdits);
-      loadEdits(editsFiles);
-      lastAppliedTxId = getEditLog().getLastWrittenTxId();
+  synchronized void journal(long firstTxId, int numTxns, byte[] data) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Got journal, " +
+          "state = " + bnState +
+          "; firstTxId = " + firstTxId +
+          "; numTxns = " + numTxns);
     }
+    
+    switch(bnState) {
+      case DROP_UNTIL_NEXT_ROLL:
+        return;
+
+      case IN_SYNC:
+        // update NameSpace in memory
+        applyEdits(firstTxId, numTxns, data);
+        break;
+      
+      case JOURNAL_ONLY:
+        break;
+      
+      default:
+        throw new AssertionError("Unhandled state: " + bnState);
+    }
+    
+    // write to BN's local edit log.
+    logEditsLocally(firstTxId, numTxns, data);
   }
 
   /**
-   * Save meta-data into fsimage files.
-   * and create empty edits.
+   * Write the batch of edits to the local copy of the edit logs.
    */
-  void saveCheckpoint() throws IOException {
-    saveNamespace();
-  }
-
-  private FSDirectory getFSDirectoryRootLock() {
-    return getFSNamesystem().dir;
-  }
-
-  static File getJSpoolDir(StorageDirectory sd) {
-    return new File(sd.getRoot(), STORAGE_JSPOOL_DIR);
-  }
-
-  static File getJSpoolFile(StorageDirectory sd) {
-    return new File(getJSpoolDir(sd), STORAGE_JSPOOL_FILE);
+  private void logEditsLocally(long firstTxId, int numTxns, byte[] data) {
+    long expectedTxId = editLog.getLastWrittenTxId() + 1;
+    Preconditions.checkState(firstTxId == expectedTxId,
+        "received txid batch starting at %s but expected txn %s",
+        firstTxId, expectedTxId);
+    editLog.setNextTxId(firstTxId + numTxns - 1);
+    editLog.logEdit(data.length, data);
+    editLog.logSync();
   }
 
   /**
-   * Journal writer journals new meta-data state.
-   * <ol>
-   * <li> If Journal Spool state is OFF then journal records (edits)
-   * are applied directly to meta-data state in memory and are written
-   * to the edits file(s).</li>
-   * <li> If Journal Spool state is INPROGRESS then records are only
-   * written to edits.new file, which is called Spooling.</li>
-   * <li> Journal Spool state WAIT blocks journaling until the
-   * Journal Spool reader finalizes merging of the spooled data and
-   * switches to applying journal to memory.</li>
-   * </ol>
-   * @param length length of data.
-   * @param data serialized journal records.
-   * @throws IOException
-   * @see #convergeJournalSpool()
+   * Apply the batch of edits to the local namespace.
    */
-  synchronized void journal(int length, byte[] data) throws IOException {
+  private synchronized void applyEdits(long firstTxId, int numTxns, byte[] data)
+      throws IOException {
+    Preconditions.checkArgument(firstTxId == lastAppliedTxId + 1,
+        "Received txn batch starting at %s but expected %s",
+        firstTxId, lastAppliedTxId + 1);
     assert backupInputStream.length() == 0 : "backup input stream is not empty";
     try {
-      switch(jsState) {
-        case WAIT:
-        case OFF:
-          // wait until spooling is off
-          waitSpoolEnd();
-          // update NameSpace in memory
-          backupInputStream.setBytes(data);
-          FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
-          int logVersion = storage.getLayoutVersion();
-          BufferedInputStream bin = new BufferedInputStream(backupInputStream);
-          DataInputStream in = new DataInputStream(bin);
-          Checksum checksum = null;
-          if (LayoutVersion.supports(Feature.EDITS_CHESKUM, logVersion)) {
-            checksum = FSEditLog.getChecksum();
-            in = new DataInputStream(new CheckedInputStream(bin, checksum));
-          }
-          logLoader.loadEditRecords(logVersion, in, checksum, true,
-                                    lastAppliedTxId + 1);
-          getFSNamesystem().dir.updateCountForINodeWithQuota(); // inefficient!
-          break;
-        case INPROGRESS:
-          break;
+      if (LOG.isTraceEnabled()) {
+        LOG.debug("data:" + StringUtils.byteToHexString(data));
+      }
+      backupInputStream.setBytes(data);
+      FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
+      int logVersion = storage.getLayoutVersion();
+      BufferedInputStream bin = new BufferedInputStream(backupInputStream);
+      DataInputStream in = new DataInputStream(bin);
+      Checksum checksum = FSEditLog.getChecksum();
+      int numLoaded = logLoader.loadEditRecords(logVersion, in, checksum, true,
+                                lastAppliedTxId + 1);
+      if (numLoaded != numTxns) {
+        throw new IOException("Batch of txns starting at txnid " +
+            firstTxId + " was supposed to contain " + numTxns +
+            " transactions but only was able to apply " + numLoaded);
       }
-      // write to files
-      editLog.logEdit(length, data);
-      editLog.logSync();
+      lastAppliedTxId += numTxns;
+      
+      getFSNamesystem().dir.updateCountForINodeWithQuota(); // inefficient!
     } finally {
       backupInputStream.clear();
     }
   }
 
-  private synchronized void waitSpoolEnd() {
-    while(jsState == JSpoolState.WAIT) {
+  /**
+   * Transition the BackupNode from JOURNAL_ONLY state to IN_SYNC state.
+   * This is done by repeated invocations of tryConvergeJournalSpool until
+   * we are caught up to the latest in-progress edits file.
+   */
+  void convergeJournalSpool() throws IOException {
+    Preconditions.checkState(bnState == BNState.JOURNAL_ONLY,
+        "bad state: %s", bnState);
+
+    while (!tryConvergeJournalSpool()) {
+      ;
+    }
+    assert bnState == BNState.IN_SYNC;
+  }
+  
+  private boolean tryConvergeJournalSpool() throws IOException {
+    Preconditions.checkState(bnState == BNState.JOURNAL_ONLY,
+        "bad state: %s", bnState);
+    
+    // This section is unsynchronized so we can continue to apply
+    // ahead of where we're reading, concurrently. Since the state
+    // is JOURNAL_ONLY at this point, we know that lastAppliedTxId
+    // doesn't change, and curSegmentTxId only increases
+
+    while (lastAppliedTxId < editLog.getCurSegmentTxId() - 1) {
+      long target = editLog.getCurSegmentTxId();
+      LOG.info("Loading edits into backupnode to try to catch up from txid "
+          + lastAppliedTxId + " to " + target);
+      FSImageTransactionalStorageInspector inspector =
+        new FSImageTransactionalStorageInspector();
+      
+      storage.inspectStorageDirs(inspector);
+      LogLoadPlan logLoadPlan = inspector.createLogLoadPlan(lastAppliedTxId,
+          target - 1);
+  
+      logLoadPlan.doRecovery();
+      loadEdits(logLoadPlan.getEditsFiles());
+    }
+    
+    // now, need to load the in-progress file
+    synchronized (this) {
+      if (lastAppliedTxId != editLog.getCurSegmentTxId() - 1) {
+        LOG.debug("Logs rolled while catching up to current segment");
+        return false; // drop lock and try again to load local logs
+      }
+      
+      EditLogInputStream stream = getEditLog().getInProgressFileInputStream();
       try {
-        wait();
-      } catch (InterruptedException  e) {}
+        long remainingTxns = getEditLog().getLastWrittenTxId() - lastAppliedTxId;
+        
+        LOG.info("Going to finish converging with remaining " + remainingTxns
+            + " txns from in-progress stream " + stream);
+        
+        FSEditLogLoader loader = new FSEditLogLoader(namesystem);
+        int numLoaded = loader.loadFSEdits(stream, lastAppliedTxId + 1);
+        lastAppliedTxId += numLoaded;
+        assert numLoaded == remainingTxns :
+          "expected to load " + remainingTxns + " but loaded " +
+          numLoaded + " from " + stream;
+      } finally {
+        IOUtils.closeStream(stream);
+      }
+
+      LOG.info("Successfully synced BackupNode with NameNode at txnid " +
+          lastAppliedTxId);
+      setState(BNState.IN_SYNC);
     }
-    // now spooling should be off, verifying just in case
-    assert jsState == JSpoolState.OFF : "Unexpected JSpool state: " + jsState;
+    return true;
   }
 
   /**
-   * Start journal spool.
-   * Switch to writing into edits.new instead of edits.
-   *
-   * edits.new for spooling is in separate directory "spool" rather than in
-   * "current" because the two directories should be independent.
-   * While spooling a checkpoint can happen and current will first
-   * move to lastcheckpoint.tmp and then to previous.checkpoint
-   * spool/edits.new will remain in place during that.
-   */
-  synchronized void startJournalSpool(NamenodeRegistration nnReg)
-  throws IOException {
-    switch(jsState) {
-      case OFF:
-        break;
-      case INPROGRESS:
-        return;
-      case WAIT:
-        waitSpoolEnd();
+   * Transition edit log to a new state, logging as necessary.
+   */
+  private synchronized void setState(BNState newState) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("State transition " + bnState + " -> " + newState,
+          new Exception("trace"));
     }
+    bnState = newState;
+  }
 
-    // create journal spool directories
-    for (Iterator<StorageDirectory> it
-           = storage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
-      StorageDirectory sd = it.next();
-      File jsDir = getJSpoolDir(sd);
-      if (!jsDir.exists() && !jsDir.mkdirs()) {
-        throw new IOException("Mkdirs failed to create "
-                              + jsDir.getCanonicalPath());
+  /**
+   * Receive a notification that the NameNode has begun a new edit log.
+   * This causes the BN to also start the new edit log in its local
+   * directories.
+   */
+  synchronized void namenodeStartedLogSegment(long txid) {
+    LOG.info("NameNode started a new log segment at txid " + txid);
+    if (editLog.isOpen()) {
+      if (editLog.getLastWrittenTxId() == txid - 1) {
+        // We are in sync with the NN, so end and finalize the current segment
+        editLog.endCurrentLogSegment(false);
+      } else {
+        // We appear to have missed some transactions -- the NN probably
+        // lost contact with us temporarily. So, mark the current segment
+        // as aborted.
+        LOG.warn("NN started new log segment at txid " + txid +
+            ", but BN had only written up to txid " +
+            editLog.getLastWrittenTxId() +
+            "in the log segment starting at " + 
+        		editLog.getCurSegmentTxId() + ". Aborting this " +
+        		"log segment.");
+        editLog.abortCurrentLogSegment();
       }
-      // create edit file if missing
-      /*File eFile = storage.getEditFile(sd); TODO
-      if(!eFile.exists()) {
-        editLog.createEditLogFile(eFile);
-      }*/
-    }
-
-    if(!editLog.isOpen())
-      editLog.startLogSegment(TODO_TXID);
-
-    // create streams pointing to the journal spool files
-    // subsequent journal records will go directly to the spool
-// TODO    editLog.divertFileStreams(STORAGE_JSPOOL_DIR + "/" + STORAGE_JSPOOL_FILE);
-
-    // set up spooling
-    if(backupInputStream == null)
-      backupInputStream = new EditLogBackupInputStream(nnReg.getAddress());
-    jsState = JSpoolState.INPROGRESS;
+    }
+    editLog.setNextTxId(txid);
+    editLog.startLogSegment(txid, false);
+    if (bnState == BNState.DROP_UNTIL_NEXT_ROLL) {
+      setState(BNState.JOURNAL_ONLY);
+    }
+    
+    if (stopApplyingEditsOnNextRoll) {
+      if (bnState == BNState.IN_SYNC) {
+        LOG.info("Stopped applying edits to prepare for checkpoint.");
+        setState(BNState.JOURNAL_ONLY);
+      }
+      stopApplyingEditsOnNextRoll = false;
+      notifyAll();
+    }
   }
 
   /**
-   * Merge Journal Spool to memory.<p>
-   * Journal Spool reader reads journal records from edits.new.
-   * When it reaches the end of the file it sets {@link JSpoolState} to WAIT.
-   * This blocks journaling (see {@link #journal(int,byte[])}.
-   * The reader
-   * <ul>
-   * <li> reads remaining journal records if any,</li>
-   * <li> renames edits.new to edits,</li>
-   * <li> sets {@link JSpoolState} to OFF,</li>
-   * <li> and notifies the journaling thread.</li>
-   * </ul>
-   * Journaling resumes with applying new journal records to the memory state,
-   * and writing them into edits file(s).
+   * Request that the next time the BN receives a log roll, it should
+   * stop applying edits to the local namespace. This is
+   * typically followed by a call to {@link #waitUntilNamespaceFrozen()}.
    */
-  void convergeJournalSpool() throws IOException {
-    Iterator<StorageDirectory> itEdits
-      = storage.dirIterator(NameNodeDirType.EDITS);
-    if(! itEdits.hasNext())
-      throw new IOException("Could not locate checkpoint directories");
-    StorageDirectory sdEdits = itEdits.next();
-    int numEdits = 0;
-    File jSpoolFile = getJSpoolFile(sdEdits);
-    long startTime = now();
-    if(jSpoolFile.exists()) {
-      // load edits.new
-      EditLogFileInputStream edits = new EditLogFileInputStream(jSpoolFile);
-      BufferedInputStream bin = new BufferedInputStream(edits);
-      DataInputStream in = new DataInputStream(bin);
-      FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
+  synchronized void freezeNamespaceAtNextRoll() {
+    stopApplyingEditsOnNextRoll = true;
+  }
 
-      LogHeader header = FSEditLogOp.LogHeader.read(in);
-      int loaded = logLoader.loadEditRecords(
-          header.logVersion, in, header.checksum, false,
-          lastAppliedTxId + 1);
-
-      lastAppliedTxId += loaded;
-      numEdits += loaded;
-
-      // first time reached the end of spool
-      jsState = JSpoolState.WAIT;
-      loaded = logLoader.loadEditRecords(
-          header.logVersion, in, header.checksum,
-          true, lastAppliedTxId + 1);
-      numEdits += loaded;
-      lastAppliedTxId += loaded;
-
-      getFSNamesystem().dir.updateCountForINodeWithQuota();
-      edits.close();
-    }
-
-    FSImage.LOG.info("Edits file " + jSpoolFile.getCanonicalPath()
-        + " of size " + jSpoolFile.length() + " edits # " + numEdits
-        + " loaded in " + (now()-startTime)/1000 + " seconds.");
-
-    // rename spool edits.new to edits making it in sync with the active node
-    // subsequent journal records will go directly to edits
-    // TODO editLog.revertFileStreams(STORAGE_JSPOOL_DIR + "/" + STORAGE_JSPOOL_FILE);
-
-    // write version file
-    // TODO resetVersion(storage.getImageDigest());
-
-    // wake up journal writer
-    synchronized(this) {
-      jsState = JSpoolState.OFF;
-      notifyAll();
-    }
+  /**
+   * After {@link #freezeNamespaceAtNextRoll()} has been called, wait until
+   * the BN receives notification of the next log roll.
+   */
+  synchronized void waitUntilNamespaceFrozen() throws IOException {
+    if (bnState != BNState.IN_SYNC) return;
 
-    /*
-     * TODO: bn
-    // Rename lastcheckpoint.tmp to previous.checkpoint
-    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
-      StorageDirectory sd = it.next();
-      storage.moveLastCheckpoint(sd);
+    LOG.info("Waiting until the NameNode rolls its edit logs in order " +
+        "to freeze the BackupNode namespace.");
+    while (bnState == BNState.IN_SYNC) {
+      Preconditions.checkState(stopApplyingEditsOnNextRoll,
+        "If still in sync, we should still have the flag set to " +
+        "freeze at next roll");
+      try {
+        wait();
+      } catch (InterruptedException ie) {
+        LOG.warn("Interrupted waiting for namespace to freeze", ie);
+        throw new IOException(ie);
+      }
     }
-    */
+    LOG.info("BackupNode namespace frozen.");
+  }
+
+  /**
+   * Override close() so that we don't finalize edit logs.
+   */
+  @Override
+  public synchronized void close() throws IOException {
+    editLog.abortCurrentLogSegment();
+    storage.close();
   }
 }
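
For readers following the patch: the rewritten BackupImage replaces the old journal-spool machinery (JSpoolState OFF/INPROGRESS/WAIT) with the three-state BNState machine documented in the comments above. Below is a minimal standalone sketch of those transitions, assuming only what the patch's comments state; the class and its trigger methods are hypothetical and exist purely for illustration.

// Simplified, hypothetical model of the BNState transitions in BackupImage.
public class BNStateModel {
  enum BNState { DROP_UNTIL_NEXT_ROLL, JOURNAL_ONLY, IN_SYNC }

  private BNState state = BNState.DROP_UNTIL_NEXT_ROLL;
  private boolean stopApplyingEditsOnNextRoll = false;

  /** Mirrors namenodeStartedLogSegment(): a log roll moves the BN forward. */
  synchronized void onLogRoll() {
    if (state == BNState.DROP_UNTIL_NEXT_ROLL) {
      state = BNState.JOURNAL_ONLY;             // start journaling locally
    } else if (state == BNState.IN_SYNC && stopApplyingEditsOnNextRoll) {
      state = BNState.JOURNAL_ONLY;             // freeze namespace for a checkpoint
      stopApplyingEditsOnNextRoll = false;
    }
  }

  /** Mirrors convergeJournalSpool(): catch-up to the NN succeeded. */
  synchronized void onConverged() {
    if (state == BNState.JOURNAL_ONLY) {
      state = BNState.IN_SYNC;
    }
  }

  /** Mirrors freezeNamespaceAtNextRoll(). */
  synchronized void freezeNamespaceAtNextRoll() {
    stopApplyingEditsOnNextRoll = true;
  }
}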

Added: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java?rev=1146845&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java (added)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java Thu Jul 14 18:53:11 2011
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.namenode.NNStorageArchivalManager.StorageArchiver;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+
+public class BackupJournalManager implements JournalManager {
+
+  private final NamenodeRegistration nnReg;
+  private final NamenodeRegistration bnReg;
+  
+  BackupJournalManager(NamenodeRegistration bnReg,
+      NamenodeRegistration nnReg) {
+    this.bnReg = bnReg;
+    this.nnReg = nnReg;
+  }
+
+  @Override
+  public EditLogOutputStream startLogSegment(long txId) throws IOException {
+    EditLogBackupOutputStream stm = new EditLogBackupOutputStream(bnReg, nnReg);
+    stm.startLogSegment(txId);
+    return stm;
+  }
+
+  @Override
+  public void finalizeLogSegment(long firstTxId, long lastTxId)
+      throws IOException {
+  }
+
+  @Override
+  public void setOutputBufferCapacity(int size) {
+  }
+
+  @Override
+  public void archiveLogsOlderThan(long minTxIdToKeep, StorageArchiver archiver)
+      throws IOException {
+  }
+
+  public boolean matchesRegistration(NamenodeRegistration bnReg) {
+    return bnReg.getAddress().equals(this.bnReg.getAddress());
+  }
+
+  @Override
+  public EditLogInputStream getInProgressInputStream(long segmentStartsAtTxId) {
+    return null;
+  }
+}
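
BackupJournalManager slots the BackupNode into the same JournalManager abstraction used for local edits directories; finalizeLogSegment, setOutputBufferCapacity, and archiveLogsOlderThan are deliberate no-ops because the BN manages its own storage. A rough sketch of the fan-out pattern this enables follows; the interface is abbreviated from the patch and the helper class is hypothetical, not FSEditLog's actual code.

import java.io.IOException;
import java.util.List;

// Illustrative fan-out over pluggable journals (abbreviated interface).
class JournalFanOut {
  interface SimpleJournalManager {
    void startLogSegment(long txId) throws IOException;
  }

  // Every journal -- a local edits directory or a remote BackupNode --
  // is asked to begin the same segment at the same transaction ID.
  static void startSegmentOnAll(List<SimpleJournalManager> journals, long txId)
      throws IOException {
    for (SimpleJournalManager jm : journals) {
      jm.startLogSegment(txId);
    }
  }
}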

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java?rev=1146845&r1=1146844&r2=1146845&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java Thu Jul 14 18:53:11 2011
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.protocol.BackupNodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
@@ -51,7 +52,7 @@ import org.apache.hadoop.net.NetUtils;
  * </ol>
  */
 @InterfaceAudience.Private
-public class BackupNode extends NameNode {
+public class BackupNode extends NameNode implements BackupNodeProtocol {
   private static final String BN_ADDRESS_NAME_KEY = DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY;
   private static final String BN_ADDRESS_DEFAULT = DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT;
   private static final String BN_HTTP_ADDRESS_NAME_KEY = DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY;
@@ -176,6 +177,17 @@ public class BackupNode extends NameNode
     super.stop();
   }
 
+  
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion)
+      throws IOException {
+    if (protocol.equals(BackupNodeProtocol.class.getName())) {
+      return BackupNodeProtocol.versionID;
+    } else {
+      return super.getProtocolVersion(protocol, clientVersion);
+    }
+  }
+
   /////////////////////////////////////////////////////
   // NamenodeProtocol implementation for backup node.
   /////////////////////////////////////////////////////
@@ -202,30 +214,36 @@ public class BackupNode extends NameNode
   public void endCheckpoint(NamenodeRegistration registration,
                             CheckpointSignature sig) throws IOException {
     throw new UnsupportedActionException("endCheckpoint");
-  }
+  }  
 
-  @Override // NamenodeProtocol
+  /////////////////////////////////////////////////////
+  // BackupNodeProtocol implementation for backup node.
+  /////////////////////////////////////////////////////
+
+  @Override
   public void journal(NamenodeRegistration nnReg,
-                      int jAction,
-                      int length,
-                      byte[] args) throws IOException {
+      long firstTxId, int numTxns,
+      byte[] records) throws IOException {
     verifyRequest(nnReg);
     if(!nnRpcAddress.equals(nnReg.getAddress()))
       throw new IOException("Journal request from unexpected name-node: "
           + nnReg.getAddress() + " expecting " + nnRpcAddress);
-    BackupImage bnImage = (BackupImage)getFSImage();
-    switch(jAction) {
-      case (int)JA_IS_ALIVE:
-        return;
-      case (int)JA_JOURNAL:
-        bnImage.journal(length, args);
-        return;
-      case (int)JA_JSPOOL_START:
-        bnImage.startJournalSpool(nnReg);
-        return;
-      default:
-        throw new IOException("Unexpected journal action: " + jAction);
-    }
+    getBNImage().journal(firstTxId, numTxns, records);
+  }
+
+  @Override
+  public void startLogSegment(NamenodeRegistration registration, long txid)
+      throws IOException {
+    verifyRequest(registration);
+  
+    getBNImage().namenodeStartedLogSegment(txid);
+  }
+
+  //////////////////////////////////////////////////////
+  
+  
+  BackupImage getBNImage() {
+    return (BackupImage)getFSImage();
   }
 
   boolean shouldCheckpointAtStartup() {
@@ -234,9 +252,9 @@ public class BackupNode extends NameNode
       assert fsImage.getStorage().getNumStorageDirs() > 0;
       return ! fsImage.getStorage().getStorageDir(0).getVersionFile().exists();
     }
-    if(namesystem == null || namesystem.dir == null || getFSImage() == null)
-      return true;
-    return false; // TODO fsImage.getEditLog().getNumJournals() == 0;
+    
+    // BN always checkpoints on startup in order to get in sync with namespace
+    return true;
   }
 
   private NamespaceInfo handshake(Configuration conf) throws IOException {
@@ -287,14 +305,15 @@ public class BackupNode extends NameNode
    */
   private void registerWith(NamespaceInfo nsInfo) throws IOException {
     BackupImage bnImage = (BackupImage)getFSImage();
+    NNStorage storage = bnImage.getStorage();
     // verify namespaceID
-    if(bnImage.getStorage().getNamespaceID() == 0) // new backup storage
-      bnImage.getStorage().setStorageInfo(nsInfo);
-    else if(bnImage.getStorage().getNamespaceID() != nsInfo.getNamespaceID())
-      throw new IOException("Incompatible namespaceIDs"
-          + ": active node namespaceID = " + nsInfo.getNamespaceID() 
-          + "; backup node namespaceID = " + bnImage.getStorage().getNamespaceID());
-
+    if (storage.getNamespaceID() == 0) { // new backup storage
+      storage.setStorageInfo(nsInfo);
+      storage.setBlockPoolID(nsInfo.getBlockPoolID());
+      storage.setClusterID(nsInfo.getClusterID());
+    } else {
+      nsInfo.validateStorage(storage);
+    }
     setRegistration();
     NamenodeRegistration nnReg = null;
     while(!isStopRequested()) {
@@ -323,14 +342,6 @@ public class BackupNode extends NameNode
     nnRpcAddress = nnReg.getAddress();
   }
 
-  /**
-   * Reset node namespace state in memory and in storage directories.
-   * @throws IOException
-   */
-  void resetNamespace() throws IOException {
-    ((BackupImage)getFSImage()).reset();
-  }
-
   // TODO: move to a common with DataNode util class
   private static NamespaceInfo handshake(NamenodeProtocol namenode)
   throws IOException, SocketTimeoutException {
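
The getProtocolVersion override above follows the usual Hadoop RPC idiom for a server that speaks several protocols: dispatch on the requested protocol's class name and defer to the superclass for anything else. A condensed sketch of that idiom with stand-in interfaces (not real Hadoop classes):

import java.io.IOException;

// Stand-in protocol interfaces; illustrates per-protocol version dispatch only.
class MultiProtocolServer {
  interface ProtoA { long versionID = 1L; }
  interface ProtoB { long versionID = 3L; }

  public long getProtocolVersion(String protocol) throws IOException {
    if (protocol.equals(ProtoA.class.getName())) {
      return ProtoA.versionID;          // serve this protocol's version
    } else if (protocol.equals(ProtoB.class.getName())) {
      return ProtoB.versionID;          // the real code defers to super here
    }
    throw new IOException("Unknown protocol: " + protocol);
  }
}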

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java?rev=1146845&r1=1146844&r2=1146845&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java Thu Jul 14 18:53:11 2011
@@ -110,7 +110,6 @@ public class CheckpointSignature extends
        || cTime != si.getStorage().cTime
        || !clusterID.equals(si.getClusterID())
        || !blockpoolID.equals(si.getBlockPoolID())) {
-      // checkpointTime can change when the image is saved - do not compare
       throw new IOException("Inconsistent checkpoint fields.\n"
           + "LV = " + layoutVersion + " namespaceID = " + namespaceID
           + " cTime = " + cTime

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java?rev=1146845&r1=1146844&r2=1146845&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java Thu Jul 14 18:53:11 2011
@@ -17,30 +17,27 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import java.io.IOException;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+
 import java.io.File;
+import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.Collection;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
-import static org.apache.hadoop.hdfs.server.common.Util.now;
-import org.apache.hadoop.hdfs.server.namenode.FSImage.CheckpointStates;
-import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.util.Daemon;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_DEFAULT;
 
 /**
  * The Checkpointer is responsible for supporting periodic checkpoints 
@@ -80,6 +77,7 @@ class Checkpointer extends Daemon {
     try {
       initialize(conf);
     } catch(IOException e) {
+      LOG.warn("Checkpointer got exception", e);
       shutdown();
       throw e;
     }
@@ -134,8 +132,9 @@ class Checkpointer extends Daemon {
     periodMSec *= 1000;
 
     long lastCheckpointTime = 0;
-    if(!backupNode.shouldCheckpointAtStartup())
+    if (!backupNode.shouldCheckpointAtStartup()) {
       lastCheckpointTime = now();
+    }
     while(shouldRun) {
       try {
         long now = now();
@@ -175,44 +174,15 @@ class Checkpointer extends Daemon {
   }
 
   /**
-   * Download <code>fsimage</code> and <code>edits</code>
-   * files from the remote name-node.
-   */
-  private void downloadCheckpoint(CheckpointSignature sig) throws IOException {
-    String nnHttpAddr = backupNode.nnHttpAddress;
-
-    // Retrieve image file
-    MD5Hash hash = TransferFsImage.downloadImageToStorage(
-        nnHttpAddr, sig.mostRecentCheckpointTxId,
-        getFSImage().getStorage(), true);
-    getFSImage().saveDigestAndRenameCheckpointImage(sig.mostRecentCheckpointTxId, hash);
-    
-    // Retrieve edits file
-    // TODO!
-  }
-
-  /**
-   * Copy the new image into remote name-node.
-   */
-  private void uploadCheckpoint(CheckpointSignature sig) throws IOException {
-    // TODO: checkpoint node disabled in 1073 branch
-/*    // Use the exact http addr as specified in config to deal with ip aliasing
-    InetSocketAddress httpSocAddr = backupNode.getHttpAddress();
-    int httpPort = httpSocAddr.getPort();
-    String fileid = "putimage=1&port=" + httpPort +
-      "&machine=" + infoBindAddress +
-      "&token=" + sig.toString();
-    LOG.info("Posted URL " + backupNode.nnHttpAddress + fileid);
-    TransferFsImage.getFileClient(backupNode.nnHttpAddress, 
-        fileid, null, false);
-        */
-  }
-
-  /**
    * Create a new checkpoint
    */
   void doCheckpoint() throws IOException {
+    BackupImage bnImage = getFSImage();
+    NNStorage bnStorage = bnImage.getStorage();
+
     long startTime = now();
+    bnImage.freezeNamespaceAtNextRoll();
+    
     NamenodeCommand cmd = 
       getNamenode().startCheckpoint(backupNode.getRegistration());
     CheckpointCommand cpCmd = null;
@@ -228,36 +198,76 @@ class Checkpointer extends Daemon {
         throw new IOException("Unsupported NamenodeCommand: "+cmd.getAction());
     }
 
+    bnImage.waitUntilNamespaceFrozen();
+    
     CheckpointSignature sig = cpCmd.getSignature();
-    assert FSConstants.LAYOUT_VERSION == sig.getLayoutVersion() :
-      "Signature should have current layout version. Expected: "
-      + FSConstants.LAYOUT_VERSION + " actual "+ sig.getLayoutVersion();
-    assert !backupNode.isRole(NamenodeRole.CHECKPOINT) ||
-      cpCmd.isImageObsolete() : "checkpoint node should always download image.";
-    if(cpCmd.isImageObsolete()) {
-      // First reset storage on disk and memory state
-      backupNode.resetNamespace();
-      downloadCheckpoint(sig);
-    }
 
-    BackupImage bnImage = getFSImage();
-    bnImage.getStorage().setBlockPoolID(backupNode.getBlockPoolId());
-    bnImage.getStorage().setClusterID(backupNode.getClusterId());
-    bnImage.loadCheckpoint(sig);
+    // Make sure we're talking to the same NN!
     sig.validateStorageInfo(bnImage);
-    bnImage.saveCheckpoint();
 
-    if(cpCmd.needToReturnImage())
-      uploadCheckpoint(sig);
+    long lastApplied = bnImage.getLastAppliedTxId();
+    LOG.debug("Doing checkpoint. Last applied: " + lastApplied);
+    RemoteEditLogManifest manifest =
+      getNamenode().getEditLogManifest(bnImage.getLastAppliedTxId());
+
+    if (!manifest.getLogs().isEmpty()) {
+      RemoteEditLog firstRemoteLog = manifest.getLogs().get(0);
+      // we don't have enough logs to roll forward using only logs. Need
+      // to download and load the image.
+      if (firstRemoteLog.getStartTxId() > lastApplied + 1) {
+        LOG.info("Unable to roll forward using only logs. Downloading " +
+            "image with txid " + sig.mostRecentCheckpointTxId);
+        MD5Hash downloadedHash = TransferFsImage.downloadImageToStorage(
+            backupNode.nnHttpAddress, sig.mostRecentCheckpointTxId,
+            bnStorage, true);
+        bnImage.saveDigestAndRenameCheckpointImage(
+            sig.mostRecentCheckpointTxId, downloadedHash);
+        
+        LOG.info("Loading image with txid " + sig.mostRecentCheckpointTxId);
+        File file = bnStorage.findImageFile(sig.mostRecentCheckpointTxId);
+        bnImage.reloadFromImageFile(file);
+      }
+      
+      lastApplied = bnImage.getLastAppliedTxId();
+      if (firstRemoteLog.getStartTxId() > lastApplied + 1) {
+        throw new IOException("No logs to roll forward from " + lastApplied);
+      }
+  
+      // get edits files
+      for (RemoteEditLog log : manifest.getLogs()) {
+        TransferFsImage.downloadEditsToStorage(
+            backupNode.nnHttpAddress, log, bnStorage);
+      }
+  
+      SecondaryNameNode.rollForwardByApplyingLogs(manifest, bnImage);
+    }
+
+    long txid = bnImage.getLastAppliedTxId();
+    bnImage.saveFSImageInAllDirs(txid);
+    bnStorage.writeAll();
+
+    if(cpCmd.needToReturnImage()) {
+      TransferFsImage.uploadImageFromStorage(
+          backupNode.nnHttpAddress, getImageListenAddress(),
+          bnStorage, txid);
+    }
 
     getNamenode().endCheckpoint(backupNode.getRegistration(), sig);
 
-    bnImage.convergeJournalSpool();
+    if (backupNode.getRole() == NamenodeRole.BACKUP) {
+      bnImage.convergeJournalSpool();
+    }
     backupNode.setRegistration(); // keep registration up to date
-    if(backupNode.isRole(NamenodeRole.CHECKPOINT))
-        getFSImage().getEditLog().close();
+    
+    long imageSize = bnImage.getStorage().getFsImageName(txid).length();
     LOG.info("Checkpoint completed in "
         + (now() - startTime)/1000 + " seconds."
-        + " New Image Size: " + bnImage.getStorage().getFsImageName(0 /* TODO */).length());
+        + " New Image Size: " + imageSize);
+  }
+
+  private InetSocketAddress getImageListenAddress() {
+    InetSocketAddress httpSocAddr = backupNode.getHttpAddress();
+    int httpPort = httpSocAddr.getPort();
+    return new InetSocketAddress(infoBindAddress, httpPort);
   }
 }
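
The reworked doCheckpoint hinges on one test: if the first log in the NameNode's manifest starts beyond lastApplied + 1, edits alone cannot bridge the gap, so the BN must download and reload the image before applying logs. A hypothetical reduction of just that decision:

// Reduction of the roll-forward decision in Checkpointer.doCheckpoint().
class RollForwardDecision {
  /** Returns true if the image must be downloaded before applying logs. */
  static boolean needsImageDownload(long lastAppliedTxId, long firstLogStartTxId) {
    // A gap between lastApplied and the oldest available log means
    // edits alone cannot reconstruct the namespace.
    return firstLogStartTxId > lastAppliedTxId + 1;
  }

  public static void main(String[] args) {
    System.out.println(needsImageDownload(100, 101)); // false: logs suffice
    System.out.println(needsImageDownload(100, 150)); // true: download image
  }
}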

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java?rev=1146845&r1=1146844&r2=1146845&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java Thu Jul 14 18:53:11 2011
@@ -17,14 +17,14 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Arrays;
 
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.common.Storage;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.BackupNodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Writable;
@@ -42,7 +42,7 @@ import org.apache.hadoop.net.NetUtils;
 class EditLogBackupOutputStream extends EditLogOutputStream {
   static int DEFAULT_BUFFER_SIZE = 256;
 
-  private NamenodeProtocol backupNode;          // RPC proxy to backup node
+  private BackupNodeProtocol backupNode;        // RPC proxy to backup node
   private NamenodeRegistration bnRegistration;  // backup node registration
   private NamenodeRegistration nnRegistration;  // active node registration
   private ArrayList<JournalRecord> bufCurrent;  // current buffer for writing
@@ -60,13 +60,8 @@ class EditLogBackupOutputStream extends 
       this.args = writables;
     }
 
-    void write(DataOutputStream out) throws IOException {
-      out.write(op);
-      out.writeLong(txid);
-      if(args == null)
-        return;
-      for(Writable w : args)
-        w.write(out);
+    void write(DataOutputBuffer out) throws IOException {
+      writeChecksummedOp(out, op, txid, args);
     }
   }
 
@@ -81,8 +76,8 @@ class EditLogBackupOutputStream extends 
     Storage.LOG.info("EditLogBackupOutputStream connects to: " + bnAddress);
     try {
       this.backupNode =
-        (NamenodeProtocol) RPC.getProxy(NamenodeProtocol.class,
-            NamenodeProtocol.versionID, bnAddress, new HdfsConfiguration());
+        (BackupNodeProtocol) RPC.getProxy(BackupNodeProtocol.class,
+            BackupNodeProtocol.versionID, bnAddress, new HdfsConfiguration());
     } catch(IOException e) {
       Storage.LOG.error("Error connecting to: " + bnAddress, e);
       throw e;
@@ -91,7 +86,7 @@ class EditLogBackupOutputStream extends 
     this.bufReady = new ArrayList<JournalRecord>();
     this.out = new DataOutputBuffer(DEFAULT_BUFFER_SIZE);
   }
-
+  
   @Override // JournalStream
   public String getName() {
     return bnRegistration.getAddress();
@@ -156,23 +151,14 @@ class EditLogBackupOutputStream extends 
   @Override // EditLogOutputStream
   protected void flushAndSync() throws IOException {
     assert out.size() == 0 : "Output buffer is not empty";
-    int bufReadySize = bufReady.size();
-    for(int idx = 0; idx < bufReadySize; idx++) {
-      JournalRecord jRec = null;
-      for(; idx < bufReadySize; idx++) {
-        jRec = bufReady.get(idx);
-        if(jRec.op >= FSEditLogOpCodes.OP_JSPOOL_START.getOpCode())
-          break;  // special operation should be sent in a separate call to BN
-        jRec.write(out);
-      }
-      if(out.size() > 0)
-        send(NamenodeProtocol.JA_JOURNAL);
-      if(idx == bufReadySize)
-        break;
-      // operation like start journal spool or increment checkpoint time
-      // is a separate call to BN
+    for (JournalRecord jRec : bufReady) {
       jRec.write(out);
-      send(jRec.op);
+    }
+    if (out.size() > 0) {
+      byte[] data = Arrays.copyOf(out.getData(), out.getLength());
+      backupNode.journal(nnRegistration,
+          bufReady.get(0).txid, bufReady.size(),
+          data);
     }
     bufReady.clear();         // erase all data in the buffer
     out.reset();              // reset buffer to the start position
@@ -188,16 +174,6 @@ class EditLogBackupOutputStream extends 
     return 0;
   }
 
-  private void send(int ja) throws IOException {
-    try {
-      int length = out.getLength();
-      out.write(FSEditLogOpCodes.OP_INVALID.getOpCode());
-      backupNode.journal(nnRegistration, ja, length, out.getData());
-    } finally {
-      out.reset();
-    }
-  }
-
   /**
    * Get backup node registration.
    */
@@ -205,17 +181,7 @@ class EditLogBackupOutputStream extends 
     return bnRegistration;
   }
 
-  /**
-   * Verify that the backup node is alive.
-   */
-  boolean isAlive() {
-    try {
-      send(NamenodeProtocol.JA_IS_ALIVE);
-    } catch(IOException ei) {
-      Storage.LOG.info(bnRegistration.getRole() + " "
-                      + bnRegistration.getAddress() + " is not alive. ", ei);
-      return false;
-    }
-    return true;
+  void startLogSegment(long txId) throws IOException {
+    backupNode.startLogSegment(nnRegistration, txId);
   }
 }
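
flushAndSync now serializes every buffered record and ships the whole contiguous batch in a single journal(firstTxId, numTxns, data) RPC, replacing the old per-opcode JA_* dispatch. A toy version of that batching, assuming a stand-in transport interface and record class (neither is the patch's actual type):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;

// Toy sketch of the new flushAndSync() batching.
class JournalBatcher {
  interface JournalTransport {
    void journal(long firstTxId, int numTxns, byte[] data) throws IOException;
  }

  static class JournalRecord {
    final long txid;
    final byte[] payload;
    JournalRecord(long txid, byte[] payload) {
      this.txid = txid;
      this.payload = payload;
    }
  }

  static void flush(List<JournalRecord> bufReady, JournalTransport bn)
      throws IOException {
    if (bufReady.isEmpty()) {
      return;
    }
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buf);
    for (JournalRecord r : bufReady) {   // serialize every pending record
      out.writeLong(r.txid);
      out.write(r.payload);
    }
    // One RPC carries the whole contiguous batch, tagged with its first txid.
    bn.journal(bufReady.get(0).txid, bufReady.size(), buf.toByteArray());
    bufReady.clear();
  }
}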

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java?rev=1146845&r1=1146844&r2=1146845&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java Thu Jul 14 18:53:11 2011
@@ -72,5 +72,10 @@ class EditLogFileInputStream extends Edi
     // file size + size of both buffers
     return file.length();
   }
+  
+  @Override
+  public String toString() {
+    return getName();
+  }
 
 }

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java?rev=1146845&r1=1146844&r2=1146845&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java Thu Jul 14 18:53:11 2011
@@ -111,21 +111,9 @@ class EditLogFileOutputStream extends Ed
     if (fp == null) {
       throw new IOException("Trying to use aborted output stream");
     }
-    int start = bufCurrent.getLength();
-    write(op);
-    bufCurrent.writeLong(txid);
-    for (Writable w : writables) {
-      w.write(bufCurrent);
-    }
-    // write transaction checksum
-    int end = bufCurrent.getLength();
-    Checksum checksum = FSEditLog.getChecksum();
-    checksum.reset();
-    checksum.update(bufCurrent.getData(), start, end-start);
-    int sum = (int)checksum.getValue();
-    bufCurrent.writeInt(sum);
+    writeChecksummedOp(bufCurrent, op, txid, writables);
   }
-  
+
   @Override
   void write(final byte[] data, int off, int len) throws IOException {
     bufCurrent.write(data, off, len);
@@ -151,7 +139,7 @@ class EditLogFileOutputStream extends Ed
 
     setReadyToFlush();
     flush();
-    
+
     // close should have been called after all pending transactions
     // have been flushed & synced.
     // if already closed, just skip

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java?rev=1146845&r1=1146844&r2=1146845&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java Thu Jul 14 18:53:11 2011
@@ -19,8 +19,11 @@ package org.apache.hadoop.hdfs.server.na
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.zip.Checksum;
 
 import static org.apache.hadoop.hdfs.server.common.Util.now;
+
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Writable;
 
 /**
@@ -135,5 +138,25 @@ implements JournalStream {
     return getName();
   }
 
-
+  /**
+   * Write the given operation to the specified buffer, including
+   * the transaction ID and checksum.
+   */
+  protected static void writeChecksummedOp(
+      DataOutputBuffer buf, byte op, long txid, Writable... writables)
+      throws IOException {
+    int start = buf.getLength();
+    buf.write(op);
+    buf.writeLong(txid);
+    for (Writable w : writables) {
+      w.write(buf);
+    }
+    // write transaction checksum
+    int end = buf.getLength();
+    Checksum checksum = FSEditLog.getChecksum();
+    checksum.reset();
+    checksum.update(buf.getData(), start, end-start);
+    int sum = (int)checksum.getValue();
+    buf.writeInt(sum);
+  }
 }
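
writeChecksummedOp centralizes the record framing previously open-coded in EditLogFileOutputStream, and the backup stream now uses it too: opcode byte, txid, payload, then a checksum computed over exactly those bytes and appended as an int. A self-contained sketch of the same framing; plain java.util.zip.CRC32 is assumed here, whereas the patch uses FSEditLog.getChecksum().

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.zip.CRC32;

// Self-contained sketch of the checksummed record framing.
class ChecksummedRecord {
  static byte[] frame(byte op, long txid, byte[] payload) throws IOException {
    ByteArrayOutputStream body = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(body);
    out.writeByte(op);                   // opcode
    out.writeLong(txid);                 // transaction ID
    out.write(payload);                  // serialized operation body

    byte[] soFar = body.toByteArray();   // everything written so far
    CRC32 checksum = new CRC32();
    checksum.update(soFar, 0, soFar.length);
    out.writeInt((int) checksum.getValue()); // checksum trails the record
    return body.toByteArray();
  }
}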

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1146845&r1=1146844&r2=1146845&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Thu Jul 14 18:53:11 2011
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.List;
 import java.util.zip.Checksum;
 
@@ -31,6 +32,7 @@ import org.apache.hadoop.hdfs.Deprecated
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
@@ -151,7 +153,7 @@ public class FSEditLog  {
   /**
    * Initialize the list of edit journals
    */
-  private void initJournals() {
+  synchronized void initJournals() {
     assert journals.isEmpty();
     Preconditions.checkState(state == State.UNINITIALIZED,
         "Bad state: %s", state);
@@ -167,10 +169,6 @@ public class FSEditLog  {
     state = State.BETWEEN_LOG_SEGMENTS;
   }
   
-  private int getNumEditsDirs() {
-   return storage.getNumStorageDirs(NameNodeDirType.EDITS);
-  }
-
   /**
    * Initialize the output stream for logging, opening the first
    * log segment.
@@ -179,7 +177,7 @@ public class FSEditLog  {
     Preconditions.checkState(state == State.UNINITIALIZED);
     initJournals();
 
-    startLogSegment(getLastWrittenTxId() + 1);
+    startLogSegment(getLastWrittenTxId() + 1, true);
     assert state == State.IN_SEGMENT : "Bad state: " + state;
   }
   
@@ -199,7 +197,7 @@ public class FSEditLog  {
     if (state == State.IN_SEGMENT) {
       assert !journals.isEmpty();
       waitForSyncToFinish();
-      endCurrentLogSegment();
+      endCurrentLogSegment(true);
     }
 
     state = State.CLOSED;
@@ -335,11 +333,12 @@ public class FSEditLog  {
    * Set the transaction ID to use for the next transaction written.
    */
   synchronized void setNextTxId(long nextTxId) {
-    assert synctxid <= txid &&
-       nextTxId >= txid : "May not decrease txid." +
-      " synctxid=" + synctxid +
-      " txid=" + txid +
-      " nextTxid=" + nextTxId;
+    Preconditions.checkArgument(synctxid <= txid &&
+       nextTxId >= txid,
+       "May not decrease txid." +
+      " synctxid=%s txid=%s nextTxId=%s",
+      synctxid, txid, nextTxId);
+      
     txid = nextTxId - 1;
   }
     
@@ -470,7 +469,8 @@ public class FSEditLog  {
       
       if (badJournals.size() >= journals.size()) {
         LOG.fatal("Could not sync any journal to persistent storage. " +
-            "Unsynced transactions: " + (txid - synctxid));
+            "Unsynced transactions: " + (txid - synctxid),
+            new Exception());
         runtime.exit(1);
       }
     } finally {
@@ -726,36 +726,6 @@ public class FSEditLog  {
   }
 
   /**
-   * Return the size of the current EditLog
-   */
-  // TODO who uses this, does it make sense with transactions?
-  synchronized long getEditLogSize() throws IOException {
-    assert getNumEditsDirs() <= journals.size() :
-        "Number of edits directories should not exceed the number of streams.";
-    long size = -1;
-        
-    List<JournalAndStream> badJournals = Lists.newArrayList();
-    
-    for (JournalAndStream j : journals) {
-      if (!j.isActive()) continue;
-      EditLogOutputStream es = j.getCurrentStream();
-      try {
-        long curSize = es.length();
-        assert (size == 0 || size == curSize || curSize ==0) :
-          "Wrong streams size";
-        size = Math.max(size, curSize);
-      } catch (IOException e) {
-        LOG.error("getEditLogSize: editstream.length failed. removing journal " + j, e);
-        badJournals.add(j);
-      }
-    }
-    disableAndReportErrorOnJournals(badJournals);
-    
-    assert size != -1;
-    return size;
-  }
-  
-  /**
    * Used only by unit tests.
    */
   @VisibleForTesting
@@ -793,10 +763,10 @@ public class FSEditLog  {
    */
   synchronized long rollEditLog() throws IOException {
     LOG.info("Rolling edit logs.");
-    endCurrentLogSegment();
+    endCurrentLogSegment(true);
     
     long nextTxId = getLastWrittenTxId() + 1;
-    startLogSegment(nextTxId);
+    startLogSegment(nextTxId, true);
     
     assert curSegmentTxId == nextTxId;
     return nextTxId;
@@ -806,14 +776,20 @@ public class FSEditLog  {
    * Start writing to the log segment with the given txid.
   * Transitions from BETWEEN_LOG_SEGMENTS state to IN_SEGMENT state.
    */
-  synchronized void startLogSegment(final long txId) {
-    LOG.info("Starting log segment at " + txId);
+  synchronized void startLogSegment(final long segmentTxId,
+      boolean writeHeaderTxn) {
+    LOG.info("Starting log segment at " + segmentTxId);
+    Preconditions.checkArgument(segmentTxId > 0,
+        "Bad txid: %s", segmentTxId);
     Preconditions.checkState(state == State.BETWEEN_LOG_SEGMENTS,
         "Bad state: %s", state);
-    Preconditions.checkState(txId > curSegmentTxId,
-        "Cannot start writing to log segment " + txId +
+    Preconditions.checkState(segmentTxId > curSegmentTxId,
+        "Cannot start writing to log segment " + segmentTxId +
         " when previous log segment started at " + curSegmentTxId);
-    curSegmentTxId = txId;
+    Preconditions.checkArgument(segmentTxId == txid + 1,
+        "Cannot start log segment at txid %s when next expected " +
+        "txid is %s", segmentTxId, txid + 1);
+    curSegmentTxId = segmentTxId;
     
     numTransactions = totalTimeTransactions = numTransactionsBatchedInSync = 0;
 
@@ -823,26 +799,32 @@ public class FSEditLog  {
     mapJournalsAndReportErrors(new JournalClosure() {
       @Override
       public void apply(JournalAndStream jas) throws IOException {
-        jas.startLogSegment(txId);
+        jas.startLogSegment(segmentTxId);
       }
-    }, "starting log segment " + txId);
+    }, "starting log segment " + segmentTxId);
 
     state = State.IN_SEGMENT;
 
-    logEdit(FSEditLogOpCodes.OP_START_LOG_SEGMENT);
-    logSync();    
+    if (writeHeaderTxn) {
+      logEdit(FSEditLogOpCodes.OP_START_LOG_SEGMENT);
+      logSync();
+    }
   }
 
   /**
    * Finalize the current log segment.
    * Transitions from IN_SEGMENT state to BETWEEN_LOG_SEGMENTS state.
    */
-  synchronized void endCurrentLogSegment() {
+  synchronized void endCurrentLogSegment(boolean writeEndTxn) {
     LOG.info("Ending log segment " + curSegmentTxId);
     Preconditions.checkState(state == State.IN_SEGMENT,
         "Bad state: %s", state);
-    logEdit(FSEditLogOpCodes.OP_END_LOG_SEGMENT);
-    waitForSyncToFinish();
+    
+    if (writeEndTxn) {
+      logEdit(FSEditLogOpCodes.OP_END_LOG_SEGMENT);
+      logSync();
+    }
+
     printStatistics(true);
     
     final long lastTxId = getLastWrittenTxId();
@@ -858,6 +840,20 @@ public class FSEditLog  {
     
     state = State.BETWEEN_LOG_SEGMENTS;
   }
+  
+  /**
+   * Abort all current logs. Called from the backup node.
+   */
+  synchronized void abortCurrentLogSegment() {
+    mapJournalsAndReportErrors(new JournalClosure() {
+      @Override
+      public void apply(JournalAndStream jas) throws IOException {
+        jas.abort();
+      }
+    }, "aborting all streams");
+    state = State.BETWEEN_LOG_SEGMENTS;
+  }
 
   /**
    * Archive any log files that are older than the given txid.
@@ -910,36 +906,62 @@ public class FSEditLog  {
   /**
    * Create (or find if already exists) an edit output stream, which
    * streams journal records (edits) to the specified backup node.<br>
-   * Send a record, prescribing to start journal spool.<br>
-   * This should be sent via regular stream of journal records so that
-   * the backup node new exactly after which record it should start spooling.
+   * 
+   * The new BackupNode will start receiving edits the next time this
+   * NameNode's logs roll.
    * 
    * @param bnReg the backup node registration information.
    * @param nnReg this (active) name-node registration.
    * @throws IOException
    */
-  synchronized void logJSpoolStart(NamenodeRegistration bnReg, // backup node
-                      NamenodeRegistration nnReg) // active name-node
+  synchronized void registerBackupNode(
+      NamenodeRegistration bnReg, // backup node
+      NamenodeRegistration nnReg) // active name-node
   throws IOException {
-    /*
     if(bnReg.isRole(NamenodeRole.CHECKPOINT))
       return; // checkpoint node does not stream edits
-    if(editStreams == null)
-      editStreams = new ArrayList<EditLogOutputStream>();
-    EditLogOutputStream boStream = null;
-    for(EditLogOutputStream eStream : editStreams) {
-      if(eStream.getName().equals(bnReg.getAddress())) {
-        boStream = eStream; // already there
-        break;
+    
+    JournalAndStream jas = findBackupJournalAndStream(bnReg);
+    if (jas != null) {
+      // already registered
+      LOG.info("Backup node " + bnReg + " re-registers");
+      return;
+    }
+    
+    LOG.info("Registering new backup node: " + bnReg);
+    BackupJournalManager bjm = new BackupJournalManager(bnReg, nnReg);
+    journals.add(new JournalAndStream(bjm));
+  }
+  
+  synchronized void releaseBackupStream(NamenodeRegistration registration) {
+    for (Iterator<JournalAndStream> iter = journals.iterator();
+         iter.hasNext();) {
+      JournalAndStream jas = iter.next();
+      if (jas.manager instanceof BackupJournalManager &&
+          ((BackupJournalManager)jas.manager).matchesRegistration(
+              registration)) {
+        jas.abort();        
+        LOG.info("Removing backup journal " + jas);
+        iter.remove();
+      }
+    }
+  }
+  
+  /**
+   * Find the JournalAndStream associated with this BackupNode.
+   * @return null if it cannot be found
+   */
+  private synchronized JournalAndStream findBackupJournalAndStream(
+      NamenodeRegistration bnReg) {
+    for (JournalAndStream jas : journals) {
+      if (jas.manager instanceof BackupJournalManager) {
+        BackupJournalManager bjm = (BackupJournalManager)jas.manager;
+        if (bjm.matchesRegistration(bnReg)) {
+          return jas;
+        }
       }
     }
-    if(boStream == null) {
-      boStream = new EditLogBackupOutputStream(bnReg, nnReg);
-      editStreams.add(boStream);
-    }
-    logEdit(OP_JSPOOL_START, (Writable[])null);
-    TODO: backupnode is disabled
-    */
+    return null;
   }
 
   /**
@@ -961,62 +983,6 @@ public class FSEditLog  {
     endTransaction(start);
   }
 
-  synchronized void releaseBackupStream(NamenodeRegistration registration) {
-    /*
-    Iterator<EditLogOutputStream> it =
-                                  getOutputStreamIterator(JournalType.BACKUP);
-    ArrayList<EditLogOutputStream> errorStreams = null;
-    NamenodeRegistration backupNode = null;
-    while(it.hasNext()) {
-      EditLogBackupOutputStream eStream = (EditLogBackupOutputStream)it.next();
-      backupNode = eStream.getRegistration();
-      if(backupNode.getAddress().equals(registration.getAddress()) &&
-            backupNode.isRole(registration.getRole())) {
-        errorStreams = new ArrayList<EditLogOutputStream>(1);
-        errorStreams.add(eStream);
-        break;
-      }
-    }
-    assert backupNode == null || backupNode.isRole(NamenodeRole.BACKUP) :
-      "Not a backup node corresponds to a backup stream";
-    disableAndReportErrorOnJournals(errorStreams);
-    TODO BN currently disabled
-    */
-  }
-
-  synchronized boolean checkBackupRegistration(
-      NamenodeRegistration registration) {
-    /*
-    Iterator<EditLogOutputStream> it =
-                                  getOutputStreamIterator(JournalType.BACKUP);
-    boolean regAllowed = !it.hasNext();
-    NamenodeRegistration backupNode = null;
-    ArrayList<EditLogOutputStream> errorStreams = null;
-    while(it.hasNext()) {
-      EditLogBackupOutputStream eStream = (EditLogBackupOutputStream)it.next();
-      backupNode = eStream.getRegistration();
-      if(backupNode.getAddress().equals(registration.getAddress()) &&
-          backupNode.isRole(registration.getRole())) {
-        regAllowed = true; // same node re-registers
-        break;
-      }
-      if(!eStream.isAlive()) {
-        if(errorStreams == null)
-          errorStreams = new ArrayList<EditLogOutputStream>(1);
-        errorStreams.add(eStream);
-        regAllowed = true; // previous backup node failed
-      }
-    }
-    assert backupNode == null || backupNode.isRole(NamenodeRole.BACKUP) :
-      "Not a backup node corresponds to a backup stream";
-    disableAndReportErrorOnJournals(errorStreams);
-    return regAllowed;
-    
-    TODO BN currently disabled
-    */
-    return false;
-  }
-  
   static BytesWritable toBytesWritable(Options.Rename... options) {
     byte[] bytes = new byte[options.length];
     for (int i = 0; i < options.length; i++) {
@@ -1061,12 +1027,7 @@ public class FSEditLog  {
  
     for (JournalAndStream j : badJournals) {
       LOG.error("Disabling journal " + j);
-      try {
-        j.abort();
-      } catch (IOException ioe) {
-        LOG.warn("Failed to abort faulty journal " + j
-                 + " before removing it (might be OK)", ioe);
-      }
+      j.abort();
     }
   }
 
@@ -1093,13 +1054,17 @@ public class FSEditLog  {
     }
 
     public void close(long lastTxId) throws IOException {
+      Preconditions.checkArgument(lastTxId >= segmentStartsAtTxId,
+          "invalid segment: lastTxId %s is less than the segment " +
+          "start txid %s", lastTxId, segmentStartsAtTxId);
+
       if (stream == null) return;
       stream.close();
       manager.finalizeLogSegment(segmentStartsAtTxId, lastTxId);
       stream = null;
     }
     
-    public void abort() throws IOException {
+    public void abort() {
       if (stream == null) return;
       try {
         stream.abort();
@@ -1133,5 +1098,30 @@ public class FSEditLog  {
     JournalManager getManager() {
       return manager;
     }
+
+    public EditLogInputStream getInProgressInputStream() throws IOException {
+      return manager.getInProgressInputStream(segmentStartsAtTxId);
+    }
+  }
+
+  /**
+   * @return an EditLogInputStream that reads from the same log that
+   * the edit log is currently writing. This is used from the BackupNode
+   * during edits synchronization.
+   * @throws IOException if no valid logs are available.
+   */
+  synchronized EditLogInputStream getInProgressFileInputStream()
+      throws IOException {
+    for (JournalAndStream jas : journals) {
+      if (!jas.isActive()) continue;
+      try {
+        EditLogInputStream in = jas.getInProgressInputStream();
+        if (in != null) return in;
+      } catch (IOException ioe) {
+        LOG.warn("Unable to get the in-progress input stream from " + jas,
+            ioe);
+      }
+    }
+    throw new IOException("No in-progress stream provided edits");
   }
 }

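The boolean added to startLogSegment()/endCurrentLogSegment() above is the
crux of the BackupNode fix: only the active NameNode authors the
OP_START_LOG_SEGMENT and OP_END_LOG_SEGMENT marker transactions, while a
BackupNode receives those markers through the replicated edit stream and
must open or finalize its local segment files without logging them a second
time. (getInProgressFileInputStream is the read-side counterpart, letting
the BackupNode tail the segment the active NN is still writing.) A rough
sketch of the two call patterns; the driver class is illustrative, not
committed code, and would have to live in
org.apache.hadoop.hdfs.server.namenode to see FSEditLog.

import java.io.IOException;

// Illustrative driver contrasting the two uses of the new
// writeHeaderTxn / writeEndTxn flags on FSEditLog.
class SegmentLifecycleSketch {
  /** Active NN rolling its log: it authors the marker txns itself. */
  long roll(FSEditLog log) throws IOException {
    log.endCurrentLogSegment(true);       // logs OP_END_LOG_SEGMENT, syncs
    long nextTxId = log.getLastWrittenTxId() + 1;
    log.startLogSegment(nextTxId, true);  // logs OP_START_LOG_SEGMENT, syncs
    return nextTxId;
  }

  /**
   * BackupNode applying markers that arrived via the edit stream: it
   * only opens and finalizes its local files, since logging the markers
   * again would duplicate them in its journal.
   */
  void onStartMarker(FSEditLog log, long segmentTxId) {
    log.startLogSegment(segmentTxId, false);
  }

  void onEndMarker(FSEditLog log) {
    log.endCurrentLogSegment(false);
  }
}
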
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1146845&r1=1146844&r2=1146845&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Thu Jul 14 18:53:11 2011
@@ -21,16 +21,13 @@ import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -45,7 +42,6 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
-import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.common.Util;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
@@ -74,14 +70,17 @@ import com.google.common.collect.Lists;
 public class FSImage implements Closeable {
   protected static final Log LOG = LogFactory.getLog(FSImage.class.getName());
 
-  // checkpoint states
-  enum CheckpointStates{START, ROLLED_EDITS, UPLOAD_START, UPLOAD_DONE; }
-
   protected FSNamesystem namesystem = null;
   protected FSEditLog editLog = null;
   private boolean isUpgradeFinalized = false;
 
   protected NNStorage storage;
+  
+  /**
+   * The last transaction ID that was either loaded from an image
+   * or loaded by loading edits files.
+   */
+  protected long lastAppliedTxId = 0;
 
   /**
    * URIs for importing an image from a checkpoint. In the default case,
@@ -92,10 +91,6 @@ public class FSImage implements Closeabl
 
   final private Configuration conf;
 
-  /**
-   * Can fs-image be rolled?
-   */
-  volatile protected CheckpointStates ckptState = FSImage.CheckpointStates.START;
   private final NNStorageArchivalManager archivalManager; 
 
   /**
@@ -559,6 +554,18 @@ public class FSImage implements Closeabl
     editLog.open();
     storage.writeTransactionIdFileToStorage(editLog.getCurSegmentTxId());
   };
+  
+  /**
+   * Toss the current image and namesystem, reloading from the specified
+   * file.
+   */
+  void reloadFromImageFile(File file) throws IOException {
+    // TODO: namesystem.close(); ??
+    namesystem.dir.reset();
+
+    LOG.debug("Reloading namespace from " + file);
+    loadFSImage(file);
+  }
 
   /**
    * Choose latest image from one of the directories,
@@ -626,8 +633,12 @@ public class FSImage implements Closeabl
     } catch (IOException ioe) {
       throw new IOException("Failed to load image from " + loadPlan.getImageFile(), ioe);
     }
-
-    needToSave |= loadEdits(loadPlan.getEditsFiles());
+    
+    long numLoaded = loadEdits(loadPlan.getEditsFiles());
+    needToSave |= numLoaded > 0;
+    
+    // update the txid for the edit log
+    editLog.setNextTxId(storage.getMostRecentCheckpointTxId() + numLoaded + 1);
 
     /* TODO(todd) Need to discuss whether we should force a re-save
      * of the image if one of the edits or images has an old format
@@ -640,13 +651,14 @@ public class FSImage implements Closeabl
 
   /**
    * Load the specified list of edit files into the image.
-   * @return true if the image should be re-saved
+   * @return the number of transactions loaded
    */
-  protected boolean loadEdits(List<File> editLogs) throws IOException {
+  protected long loadEdits(List<File> editLogs) throws IOException {
     LOG.debug("About to load edits:\n  " + Joiner.on("\n  ").join(editLogs));
-      
+
+    long startingTxId = getLastAppliedTxId() + 1;
+    
     FSEditLogLoader loader = new FSEditLogLoader(namesystem);
-    long startingTxId = storage.getMostRecentCheckpointTxId() + 1;
     int numLoaded = 0;
     // Load latest edits
     for (File edits : editLogs) {
@@ -655,17 +667,13 @@ public class FSImage implements Closeabl
       int thisNumLoaded = loader.loadFSEdits(editIn, startingTxId);
       startingTxId += thisNumLoaded;
       numLoaded += thisNumLoaded;
+      lastAppliedTxId += thisNumLoaded;
       editIn.close();
     }
 
     // update the counts
     getFSNamesystem().dir.updateCountForINodeWithQuota();    
-    
-    // update the txid for the edit log
-    editLog.setNextTxId(storage.getMostRecentCheckpointTxId() + numLoaded + 1);
-
-    // If we loaded any edits, need to save.
-    return numLoaded > 0;
+    return numLoaded;
   }
 
 
@@ -673,7 +681,7 @@ public class FSImage implements Closeabl
    * Load the image namespace from the given image file, verifying
    * it against the MD5 sum stored in its associated .md5 file.
    */
-  void loadFSImage(File imageFile) throws IOException {
+  private void loadFSImage(File imageFile) throws IOException {
     MD5Hash expectedMD5 = MD5FileUtils.readStoredMd5ForFile(imageFile);
     if (expectedMD5 == null) {
       throw new IOException("No MD5 file found corresponding to image file "
@@ -687,7 +695,7 @@ public class FSImage implements Closeabl
   * filenames and blocks.
    */
-  void loadFSImage(File curFile, MD5Hash expectedMd5) throws IOException {
+  private void loadFSImage(File curFile, MD5Hash expectedMd5) throws IOException {
     FSImageFormat.Loader loader = new FSImageFormat.Loader(
         conf, getFSNamesystem());
     loader.load(curFile);
@@ -704,8 +712,9 @@ public class FSImage implements Closeabl
     }
 
     long txId = loader.getLoadedImageTxId();
+    LOG.info("Loaded image for txid " + txId + " from " + curFile);
+    lastAppliedTxId = txId;
     storage.setMostRecentCheckpointTxId(txId);
-    editLog.setNextTxId(txId + 1);
   }
 
 
@@ -718,10 +727,10 @@ public class FSImage implements Closeabl
     
     FSImageFormat.Saver saver = new FSImageFormat.Saver();
     FSImageCompression compression = FSImageCompression.createCompression(conf);
-    saver.save(newFile, getFSNamesystem(), compression);
+    saver.save(newFile, txid, getFSNamesystem(), compression);
     
     MD5FileUtils.saveMD5File(dstFile, saver.getSavedDigest());
-    storage.setMostRecentCheckpointTxId(editLog.getLastWrittenTxId());
+    storage.setMostRecentCheckpointTxId(txid);
   }
 
   /**
@@ -784,7 +793,7 @@ public class FSImage implements Closeabl
     boolean editLogWasOpen = editLog.isOpen();
     
     if (editLogWasOpen) {
-      editLog.endCurrentLogSegment();
+      editLog.endCurrentLogSegment(true);
     }
     long imageTxId = editLog.getLastWrittenTxId();
     try {
@@ -793,7 +802,7 @@ public class FSImage implements Closeabl
       
     } finally {
       if (editLogWasOpen) {
-        editLog.startLogSegment(imageTxId + 1);
+        editLog.startLogSegment(imageTxId + 1, true);
         // Take this opportunity to note the current transaction
         storage.writeTransactionIdFileToStorage(imageTxId + 1);
       }
@@ -951,7 +960,6 @@ public class FSImage implements Closeabl
       // do not return image if there are no image directories
       needToReturnImg = false;
     CheckpointSignature sig = rollEditLog();
-    getEditLog().logJSpoolStart(bnReg, nnReg);
     return new CheckpointCommand(sig, isImgObsolete, needToReturnImg);
   }
 
@@ -1003,7 +1011,9 @@ public class FSImage implements Closeabl
   }
 
   synchronized public void close() throws IOException {
-    getEditLog().close();
+    if (editLog != null) { // 2NN doesn't have any edit log
+      getEditLog().close();
+    }
     storage.close();
   }
 
@@ -1054,4 +1064,8 @@ public class FSImage implements Closeabl
   public String getBlockPoolID() {
     return storage.getBlockPoolID();
   }
+
+  public synchronized long getLastAppliedTxId() {
+    return lastAppliedTxId;
+  }
 }

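The FSImage changes above move the txid bookkeeping out of loadEdits'
return value and into the explicit lastAppliedTxId field: loading an image
checkpointed at txid N sets the field to N, replaying each edits file
advances it, and the caller alone then tells the edit log where the next
transaction starts. A toy model of that bookkeeping, with the Hadoop types
stripped out (each "edits file" here simply reports how many transactions
it held):

import java.util.Arrays;
import java.util.List;

// Toy model of the new FSImage txid bookkeeping; illustrative only.
class TxIdBookkeeping {
  private long lastAppliedTxId = 0;

  /** Loading an image checkpointed at txid N means txns 1..N are applied. */
  void loadImage(long imageTxId) {
    lastAppliedTxId = imageTxId;
  }

  /** Mirrors loadEdits(): replay each log file, advancing the counter. */
  long loadEdits(List<Long> txnsPerEditsFile) {
    long numLoaded = 0;
    for (long thisNumLoaded : txnsPerEditsFile) {
      numLoaded += thisNumLoaded;
      lastAppliedTxId += thisNumLoaded;
    }
    return numLoaded;  // caller uses this only to decide needToSave
  }

  /** What the caller now passes to editLog.setNextTxId(). */
  long nextTxId() {
    return lastAppliedTxId + 1;
  }

  public static void main(String[] args) {
    TxIdBookkeeping fsImage = new TxIdBookkeeping();
    fsImage.loadImage(100);                      // fsimage_100
    fsImage.loadEdits(Arrays.asList(50L, 25L));  // two finalized segments
    System.out.println(fsImage.nextTxId());      // prints 176
  }
}
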
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1146845&r1=1146844&r2=1146845&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Thu Jul 14 18:53:11 2011
@@ -539,6 +539,7 @@ class FSImageFormat {
     }
 
     void save(File newFile,
+              long txid,
               FSNamesystem sourceNamesystem,
               FSImageCompression compression)
       throws IOException {
@@ -559,7 +560,6 @@ class FSImageFormat {
                      .getStorage().getNamespaceID()); // TODO bad dependency
         out.writeLong(fsDir.rootDir.numItemsInTree());
         out.writeLong(sourceNamesystem.getGenerationStamp());
-        long txid = sourceNamesystem.getEditLog().getLastWrittenTxId();
         out.writeLong(txid);
 
         // write compression info and set up compressed stream

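Threading the txid into Saver.save() above, rather than having the saver
read editLog.getLastWrittenTxId() itself, decouples image saving from
owning an open edit log; a checkpointing node (the 2NN has no edit log at
all, per the null check added to FSImage.close()) can stamp an image with
whichever checkpoint txid it is actually writing. The call-site shape,
taken from FSImage.saveFSImage in this commit:

FSImageFormat.Saver saver = new FSImageFormat.Saver();
FSImageCompression compression = FSImageCompression.createCompression(conf);
saver.save(newFile, txid, getFSNamesystem(), compression);

MD5FileUtils.saveMD5File(dstFile, saver.getSavedDigest());
// The recorded checkpoint txid is the caller's txid, not whatever
// the (possibly absent) edit log last wrote.
storage.setMostRecentCheckpointTxId(txid);
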
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java?rev=1146845&r1=1146844&r2=1146845&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java Thu Jul 14 18:53:11 2011
@@ -106,14 +106,15 @@ class FSImageTransactionalStorageInspect
                    "not configured to contain images.");
         }
       }
+    }
+    
 
-      // Check for a seen_txid file, which marks a minimum transaction ID that
-      // must be included in our load plan.
-      try {
-        maxSeenTxId = Math.max(maxSeenTxId, NNStorage.readTransactionIdFile(sd));
-      } catch (IOException ioe) {
-        LOG.warn("Unable to determine the max transaction ID seen by " + sd, ioe);
-      }
+    // Check for a seen_txid file, which marks a minimum transaction ID that
+    // must be included in our load plan.
+    try {
+      maxSeenTxId = Math.max(maxSeenTxId, NNStorage.readTransactionIdFile(sd));
+    } catch (IOException ioe) {
+      LOG.warn("Unable to determine the max transaction ID seen by " + sd, ioe);
     }
     
     List<FoundEditLog> editLogs = matchEditLogs(filesInStorage);
@@ -215,14 +216,45 @@ class FSImageTransactionalStorageInspect
     }
 
     FoundFSImage recoveryImage = getLatestImage();
-    long expectedTxId = recoveryImage.txId + 1;
+    LogLoadPlan logPlan = createLogLoadPlan(recoveryImage.txId, Long.MAX_VALUE);
+
+    return new TransactionalLoadPlan(recoveryImage,
+        logPlan);
+  }
+  
+  /**
+   * Plan which logs to load in order to bring the namespace up-to-date.
+   * Only logs whose starting txid lies in the range (sinceTxId, maxStartTxId)
+   * are considered.
+   * 
+   * @param sinceTxId the highest txid that is already loaded 
+   *                  (eg from the image checkpoint)
+   * @param maxStartTxId ignore any log files that start at or after this txid
+   */
+  LogLoadPlan createLogLoadPlan(long sinceTxId, long maxStartTxId) throws IOException {
+    long expectedTxId = sinceTxId + 1;
     
     List<FoundEditLog> recoveryLogs = new ArrayList<FoundEditLog>();
     
-    SortedMap<Long, LogGroup> usefulGroups = logGroups.tailMap(expectedTxId);
-    LOG.debug("Excluded " + (logGroups.size() - usefulGroups.size()) + 
-        " groups of logs because they start with a txid less than image " +
-        "txid " + recoveryImage.txId);
+    SortedMap<Long, LogGroup> tailGroups = logGroups.tailMap(expectedTxId);
+    if (logGroups.size() > tailGroups.size()) {
+      LOG.debug("Excluded " + (logGroups.size() - tailGroups.size()) + 
+          " groups of logs because they start with a txid less than image " +
+          "txid " + sinceTxId);
+    }
+    
+    SortedMap<Long, LogGroup> usefulGroups;
+    if (maxStartTxId > sinceTxId) {
+      usefulGroups = tailGroups.headMap(maxStartTxId);
+    } else {
+      usefulGroups = new TreeMap<Long, LogGroup>();
+    }
+    
+    if (usefulGroups.size() < tailGroups.size()) {
+      LOG.debug("Excluded " + (tailGroups.size() - usefulGroups.size()) +
+          " groups of logs because they start with a txid at or above the " +
+          "max start txid " + maxStartTxId);
+    }
 
     for (Map.Entry<Long, LogGroup> entry : usefulGroups.entrySet()) {
       long logStartTxId = entry.getKey();
@@ -251,7 +283,7 @@ class FSImageTransactionalStorageInspect
     
     long lastLogGroupStartTxId = usefulGroups.isEmpty() ?
         0 : usefulGroups.lastKey();
-    if (maxSeenTxId > recoveryImage.txId &&
+    if (maxSeenTxId > sinceTxId &&
         maxSeenTxId > lastLogGroupStartTxId) {
       String msg = "At least one storage directory indicated it has seen a " +
         "log segment starting at txid " + maxSeenTxId;
@@ -263,9 +295,10 @@ class FSImageTransactionalStorageInspect
       }
       throw new IOException(msg);
     }
-
-    return new TransactionalLoadPlan(recoveryImage, recoveryLogs,
+
+    return new LogLoadPlan(recoveryLogs,
         Lists.newArrayList(usefulGroups.values()));
   }
 
   @Override
@@ -595,23 +628,18 @@ class FSImageTransactionalStorageInspect
 
   static class TransactionalLoadPlan extends LoadPlan {
     final FoundFSImage image;
-    final List<FoundEditLog> editLogs;
-    final List<LogGroup> logGroupsToRecover;
+    final LogLoadPlan logPlan;
     
     public TransactionalLoadPlan(FoundFSImage image,
-        List<FoundEditLog> editLogs,
-        List<LogGroup> logGroupsToRecover) {
+        LogLoadPlan logPlan) {
       super();
       this.image = image;
-      this.editLogs = editLogs;
-      this.logGroupsToRecover = logGroupsToRecover;
+      this.logPlan = logPlan;
     }
 
     @Override
     boolean doRecovery() throws IOException {
-      for (LogGroup g : logGroupsToRecover) {
-        g.recover();
-      }
+      logPlan.doRecovery();
       return false;
     }
 
@@ -622,11 +650,7 @@ class FSImageTransactionalStorageInspect
 
     @Override
     List<File> getEditsFiles() {
-      List<File> ret = new ArrayList<File>();
-      for (FoundEditLog log : editLogs) {
-        ret.add(log.getFile());
-      }
-      return ret;
+      return logPlan.getEditsFiles();
     }
 
     @Override
@@ -634,4 +658,29 @@ class FSImageTransactionalStorageInspect
       return image.sd;
     }
   }
+  
+  static class LogLoadPlan {
+    final List<FoundEditLog> editLogs;
+    final List<LogGroup> logGroupsToRecover;
+    
+    LogLoadPlan(List<FoundEditLog> editLogs,
+        List<LogGroup> logGroupsToRecover) {
+      this.editLogs = editLogs;
+      this.logGroupsToRecover = logGroupsToRecover;
+    }
+
+    public void doRecovery() throws IOException {
+      for (LogGroup g : logGroupsToRecover) {
+        g.recover();
+      }
+    }
+
+    public List<File> getEditsFiles() {
+      List<File> ret = new ArrayList<File>();
+      for (FoundEditLog log : editLogs) {
+        ret.add(log.getFile());
+      }
+      return ret;
+    }
+  }
 }
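
createLogLoadPlan's selection above is a two-sided window over logGroups,
the sorted map of log groups keyed by starting txid: tailMap() drops groups
that start at or below sinceTxId, then headMap(), whose bound is exclusive,
drops groups that start at or above maxStartTxId. A self-contained sketch
of that windowing, with a TreeMap<Long, String> standing in for the
inspector's map of LogGroups:

import java.util.SortedMap;
import java.util.TreeMap;

public class LogWindowDemo {
  /** Select log groups whose start txid lies in (sinceTxId, maxStartTxId). */
  static SortedMap<Long, String> usefulGroups(
      TreeMap<Long, String> logGroups, long sinceTxId, long maxStartTxId) {
    // Keep groups whose first txid is sinceTxId + 1 or later...
    SortedMap<Long, String> tail = logGroups.tailMap(sinceTxId + 1);
    if (maxStartTxId <= sinceTxId) {
      return new TreeMap<Long, String>();  // empty window
    }
    // ...then cut off groups starting at maxStartTxId or later
    // (headMap's bound is exclusive, matching the inspector's use).
    return tail.headMap(maxStartTxId);
  }

  public static void main(String[] args) {
    TreeMap<Long, String> groups = new TreeMap<Long, String>();
    groups.put(1L, "edits_1-100");
    groups.put(101L, "edits_101-200");
    groups.put(201L, "edits_inprogress_201");
    // Image covers through txid 100; take everything after it.
    System.out.println(usefulGroups(groups, 100, Long.MAX_VALUE).values());
    // Prints: [edits_101-200, edits_inprogress_201]
  }
}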