Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2012/08/15 02:57:25 UTC

svn commit: r1373183 - in /hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/qjournal/client/ src/main/java/org/apache/hadoop/hdfs/qjournal/server/ src/main/java/org/apache/hadoop/hdfs/server/name...

Author: todd
Date: Wed Aug 15 00:57:24 2012
New Revision: 1373183

URL: http://svn.apache.org/viewvc?rev=1373183&view=rev
Log:
HDFS-3799. QJM: handle empty log segments during recovery. Contributed by Todd Lipcon.

Modified:
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt?rev=1373183&r1=1373182&r2=1373183&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt Wed Aug 15 00:57:24 2012
@@ -20,3 +20,5 @@ HDFS-3793. Implement genericized format(
 HDFS-3795. QJM: validate journal dir at startup (todd)
 
 HDFS-3798. Avoid throwing NPE when finalizeSegment() is called on invalid segment (todd)
+
+HDFS-3799. QJM: handle empty log segments during recovery (todd)

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java?rev=1373183&r1=1373182&r2=1373183&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java Wed Aug 15 00:57:24 2012
@@ -23,7 +23,9 @@ import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -111,15 +113,8 @@ public class IPCLoggerChannel implements
         DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_DEFAULT);
     
     executor = MoreExecutors.listeningDecorator(
-        Executors.newSingleThreadExecutor(
-          new ThreadFactoryBuilder()
-            .setDaemon(true)
-            .setNameFormat("Logger channel to " + addr)
-            .setUncaughtExceptionHandler(
-                UncaughtExceptionHandlers.systemExit())
-            .build()));
+        createExecutor());
   }
-  
   @Override
   public synchronized void setEpoch(long epoch) {
     this.epoch = epoch;
@@ -154,6 +149,21 @@ public class IPCLoggerChannel implements
     return new QJournalProtocolTranslatorPB(pbproxy);
   }
   
+  
+  /**
+   * Separated out for easy overriding in tests.
+   */
+  @VisibleForTesting
+  protected ExecutorService createExecutor() {
+    return Executors.newSingleThreadExecutor(
+        new ThreadFactoryBuilder()
+          .setDaemon(true)
+          .setNameFormat("Logger channel to " + addr)
+          .setUncaughtExceptionHandler(
+              UncaughtExceptionHandlers.systemExit())
+          .build());
+  }
+  
   @Override
   public URL buildURLToFetchLogs(long segmentTxId) {
     Preconditions.checkArgument(segmentTxId > 0,

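The change above turns the executor construction into a protected createExecutor() seam so tests can substitute their own threading. A minimal sketch of such an override, assuming conf, nsInfo, journalId, and addr are in scope (the patch's own override appears in TestQuorumJournalManager further down):

    // Sketch only: swap the async executor for a direct one in a test.
    IPCLoggerChannel ch = new IPCLoggerChannel(conf, nsInfo, journalId, addr) {
      @Override
      protected ExecutorService createExecutor() {
        // Run each logger call on the caller's thread, making call
        // ordering deterministic; production uses a single daemon thread.
        return MoreExecutors.sameThreadExecutor();
      }
    };

This leaves the production behavior (one daemon thread per logger, exiting the JVM on uncaught exceptions) untouched while giving tests a deterministic hook.
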
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java?rev=1373183&r1=1373182&r2=1373183&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java Wed Aug 15 00:57:24 2012
@@ -235,15 +235,30 @@ public class QuorumJournalManager implem
     } else if (bestResponse.hasSegmentState()) {
       LOG.info("Using longest log: " + bestEntry);
     } else {
-      // TODO: can we get here? what about the following case:
-      // - 3 JNs, JN1, JN2, JN3
-      // - writer starts segment 101 on JN1, then crashes
-      // - during newEpoch(), we saw the segment on JN1 and decide to recover segment 101
-      // - during prepare(), JN1 has actually crashed, and we only talk to JN2 and JN3,
+      // None of the responses to prepareRecovery() had a segment at the given
+      // txid. This can happen for example in the following situation:
+      // - 3 JNs: JN1, JN2, JN3
+      // - writer starts segment 101 on JN1, then crashes before
+      //   writing to JN2 and JN3
+      // - during newEpoch(), we saw the segment on JN1 and decide to
+      //   recover segment 101
+      // - before prepare(), JN1 crashes, and we only talk to JN2 and JN3,
       //   neither of which has any entry for this log.
-      // Write a test case.
-      throw new AssertionError("None of the responses " +
-          "had a log to recover: " + QuorumCall.mapToString(prepareResponses));
+      // In this case, it is allowed to do nothing for recovery, since the
+      // segment wasn't started on a quorum of nodes.
+
+      // Sanity check: we should only get here if none of the responses had
+      // a log. This should be a postcondition of the recovery comparator,
+      // but a bug in the comparator might cause us to get here.
+      for (PrepareRecoveryResponseProto resp : prepareResponses.values()) {
+        assert !resp.hasSegmentState() :
+          "One of the loggers had a response, but no best logger " +
+          "was found.";
+      }
+
+      LOG.info("None of the responders had a log to recover: " +
+          QuorumCall.mapToString(prepareResponses));
+      return;
     }
     
     

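In effect, the hunk above downgrades what used to be an AssertionError into a documented no-op. A hedged restatement of the decision it encodes (not the actual method body, which is shown in the hunk):

    // If no prepareRecovery() response carries a segment state, the segment
    // was never started on a quorum of JournalNodes, so there is nothing to
    // recover; the writer may simply start a fresh segment at this txid.
    boolean anyHasSegment = false;
    for (PrepareRecoveryResponseProto resp : prepareResponses.values()) {
      anyHasSegment |= resp.hasSegmentState();
    }
    if (!anyHasSegment) {
      return;  // no-op recovery
    }
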
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java?rev=1373183&r1=1373182&r2=1373183&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java Wed Aug 15 00:57:24 2012
@@ -101,9 +101,21 @@ class Journal implements Closeable {
     }
     LOG.info("Scanning storage " + fjm);
     List<EditLogFile> files = fjm.getLogFiles(0);
-    if (!files.isEmpty()) {
-      EditLogFile latestLog = files.get(files.size() - 1);
-      LOG.info("Latest log is " + latestLog);
+    if (files.isEmpty()) {
+      curSegmentTxId = HdfsConstants.INVALID_TXID;
+      return;
+    }
+    
+    EditLogFile latestLog = files.get(files.size() - 1);
+    latestLog.validateLog();
+    LOG.info("Latest log is " + latestLog);
+    if (latestLog.getLastTxId() == HdfsConstants.INVALID_TXID) {
+      // the log contains no transactions
+      LOG.warn("Latest log " + latestLog + " has no transactions. " +
+          "moving it aside");
+      latestLog.moveAsideEmptyFile();
+      curSegmentTxId = HdfsConstants.INVALID_TXID;
+    } else {
       curSegmentTxId = latestLog.getFirstTxId();
     }
   }
@@ -166,6 +178,7 @@ class Journal implements Closeable {
     if (curSegment != null) {
       curSegment.close();
       curSegment = null;
+      curSegmentTxId = HdfsConstants.INVALID_TXID;
     }
     
     NewEpochResponseProto.Builder builder =
@@ -248,10 +261,37 @@ class Journal implements Closeable {
     checkRequest(reqInfo);
     checkFormatted();
     
-    Preconditions.checkState(curSegment == null,
-        "Can't start a log segment, already writing " + curSegment);
-    Preconditions.checkState(nextTxId == txid || nextTxId == HdfsConstants.INVALID_TXID,
-        "Can't start log segment " + txid + " expecting nextTxId=" + nextTxId);
+    if (curSegment != null) {
+      LOG.warn("Client is requesting a new log segment " + txid + 
+          " though we are already writing " + curSegment + ". " +
+          "Aborting the current segment in order to begin the new one.");
+      // The writer may have lost a connection to us and is now
+      // re-connecting after the connection came back.
+      // We should abort our own old segment.
+      curSegment.abort();
+      curSegment = null;
+    }
+
+    // Paranoid sanity check: we should never overwrite a finalized log file.
+    // Additionally, if it's in-progress, it should have at most 1 transaction.
+    // This can happen if the writer crashes exactly at the start of a segment.
+    EditLogFile existing = fjm.getLogFile(txid);
+    if (existing != null) {
+      if (!existing.isInProgress()) {
+        throw new IllegalStateException("Already have a finalized segment " +
+            existing + " beginning at " + txid);
+      }
+      
+      // If it's in-progress, it should only contain one transaction,
+      // because the "startLogSegment" transaction is written alone at the
+      // start of each segment. 
+      existing.validateLog();
+      if (existing.getLastTxId() != existing.getFirstTxId()) {
+        throw new IllegalStateException("The log file " +
+            existing + " seems to contain valid transactions");
+      }
+    }
+    
     curSegment = fjm.startLogSegment(txid);
     curSegmentTxId = txid;
     nextTxId = txid;
@@ -360,9 +400,10 @@ class Journal implements Closeable {
       elf.validateLog();
     }
     if (elf.getLastTxId() == HdfsConstants.INVALID_TXID) {
-      // no transactions in file
-      throw new AssertionError("TODO: no transactions in file " +
-          elf);
+      LOG.info("Edit log file " + elf + " appears to be empty. " +
+          "Moving it aside...");
+      elf.moveAsideEmptyFile();
+      return null;
     }
     SegmentStateProto ret = SegmentStateProto.newBuilder()
         .setStartTxId(segmentTxId)
@@ -433,13 +474,16 @@ class Journal implements Closeable {
     }
 
     SegmentStateProto currentSegment = getSegmentInfo(segmentTxId);
-    // TODO: this can be null, in the case that one of the loggers started
-    // the next segment, but others did not! add regression test and null
-    // check in next condition below.
-    
-    // TODO: what if they have the same length but one is finalized and the
-    // other isn't! cover that case.
-    if (currentSegment.getEndTxId() != segment.getEndTxId()) {
+    if (currentSegment == null ||
+        currentSegment.getEndTxId() != segment.getEndTxId()) {
+      if (currentSegment == null) {
+        LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) +
+            ": no current segment in place");
+      } else {
+        LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) +
+            ": old segment " + TextFormat.shortDebugString(segment) + " is " +
+            "not the right length");
+      }
       syncLog(reqInfo, segment, fromUrl);
     } else {
       LOG.info("Skipping download of log " +
@@ -481,14 +525,6 @@ class Journal implements Closeable {
     try {
       success = tmpFile.renameTo(storage.getInProgressEditLog(
           segment.getStartTxId()));
-      if (success) {
-        // If we're synchronizing the latest segment, update our cached
-        // info.
-        // TODO: can this be done more generally?
-        if (curSegmentTxId == segment.getStartTxId()) {
-          nextTxId = segment.getEndTxId() + 1;
-        }
-      }
     } finally {
       if (!success) {
         if (!tmpFile.delete()) {

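Two of the hunks above apply the same rule in different places: scanStorage() and getSegmentInfo() both treat a segment whose validateLog() leaves lastTxId at INVALID_TXID as empty, and move it aside rather than try to recover it. A sketch of that shared rule as a hypothetical helper (illustration only, not part of the patch):

    // Hypothetical helper illustrating the empty-segment rule.
    private static boolean scrubIfEmpty(EditLogFile elf) throws IOException {
      elf.validateLog();  // scans the file and fills in lastTxId
      if (elf.getLastTxId() == HdfsConstants.INVALID_TXID) {
        // No transactions were ever written: rename the file aside with
        // an ".empty" suffix and treat the segment as if it never existed.
        elf.moveAsideEmptyFile();
        return true;
      }
      return false;
    }
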
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java?rev=1373183&r1=1373182&r2=1373183&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java Wed Aug 15 00:57:24 2012
@@ -460,7 +460,7 @@ public class FileJournalManager implemen
       renameSelf(".corrupt");
     }
 
-    void moveAsideEmptyFile() throws IOException {
+    public void moveAsideEmptyFile() throws IOException {
       assert lastTxId == HdfsConstants.INVALID_TXID;
       renameSelf(".empty");
     }

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java?rev=1373183&r1=1373182&r2=1373183&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java Wed Aug 15 00:57:24 2012
@@ -30,6 +30,8 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URISyntaxException;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -54,6 +56,7 @@ import org.junit.Test;
 import org.mockito.Mockito;
 
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.MoreExecutors;
 
 /**
  * Functional tests for QuorumJournalManager.
@@ -206,6 +209,124 @@ public class TestQuorumJournalManager {
     }
   }
   
+  /**
+   * Test the case where the NN crashes after starting a new segment
+   * on all nodes, but before writing the first transaction to it.
+   */
+  @Test
+  public void testCrashAtBeginningOfSegment() throws Exception {
+    writeSegment(cluster, qjm, 1, 3, true);
+    waitForAllPendingCalls(qjm.getLoggerSetForTests());
+    
+    EditLogOutputStream stm = qjm.startLogSegment(4);
+    try {
+      waitForAllPendingCalls(qjm.getLoggerSetForTests());
+    } finally {
+      stm.abort();
+    }
+    
+    
+    // Make a new QJM
+    qjm = new QuorumJournalManager(
+        conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO);
+    qjm.recoverUnfinalizedSegments();
+    checkRecovery(cluster, 1, 3);
+
+    writeSegment(cluster, qjm, 4, 3, true);
+  }
+  
+  @Test
+  public void testOutOfSyncAtBeginningOfSegment0() throws Exception {
+    doTestOutOfSyncAtBeginningOfSegment(0);
+  }
+  
+  @Test
+  public void testOutOfSyncAtBeginningOfSegment1() throws Exception {
+    doTestOutOfSyncAtBeginningOfSegment(1);
+  }
+
+  @Test
+  public void testOutOfSyncAtBeginningOfSegment2() throws Exception {
+    doTestOutOfSyncAtBeginningOfSegment(2);
+  }
+  
+  /**
+   * Test the case where, at the beginning of a segment, transactions
+   * have been written to one JN but not others.
+   */
+  public void doTestOutOfSyncAtBeginningOfSegment(int nodeWithOneTxn)
+      throws Exception {
+    
+    int nodeWithEmptySegment = (nodeWithOneTxn + 1) % 3;
+    int nodeMissingSegment = (nodeWithOneTxn + 2) % 3;
+    
+    writeSegment(cluster, qjm, 1, 3, true);
+    waitForAllPendingCalls(qjm.getLoggerSetForTests());
+    cluster.getJournalNode(nodeMissingSegment).stopAndJoin(0);
+    
+    // Open segment on 2/3 nodes
+    EditLogOutputStream stm = qjm.startLogSegment(4);
+    try {
+      waitForAllPendingCalls(qjm.getLoggerSetForTests());
+      
+      // Write transactions to only 1/3 nodes
+      failLoggerAtTxn(spies.get(nodeWithEmptySegment), 4);
+      try {
+        writeTxns(stm, 4, 1);
+        fail("Did not fail even though 2/3 failed");
+      } catch (QuorumException qe) {
+        GenericTestUtils.assertExceptionContains("mock failure", qe);
+      }
+    } finally {
+      stm.abort();
+    }
+    
+    // Bring back the down JN.
+    cluster.restartJournalNode(nodeMissingSegment);
+    
+    // Make a new QJM. At this point, the state is as follows:
+    // A: nodeWithEmptySegment: 1-3 finalized, 4_inprogress (empty)    
+    // B: nodeWithOneTxn:       1-3 finalized, 4_inprogress (1 txn)
+    // C: nodeMissingSegment:   1-3 finalized
+    GenericTestUtils.assertGlobEquals(
+        cluster.getCurrentDir(nodeWithEmptySegment, JID),
+        "edits_.*",
+        NNStorage.getFinalizedEditsFileName(1, 3),
+        NNStorage.getInProgressEditsFileName(4));
+    GenericTestUtils.assertGlobEquals(
+        cluster.getCurrentDir(nodeWithOneTxn, JID),
+        "edits_.*",
+        NNStorage.getFinalizedEditsFileName(1, 3),
+        NNStorage.getInProgressEditsFileName(4));
+    GenericTestUtils.assertGlobEquals(
+        cluster.getCurrentDir(nodeMissingSegment, JID),
+        "edits_.*",
+        NNStorage.getFinalizedEditsFileName(1, 3));
+    
+
+    // Stop one of the nodes. Since we run this test three
+    // times, rotating the roles of the nodes, we'll test
+    // all the permutations.
+    cluster.getJournalNode(2).stopAndJoin(0);
+  
+    qjm = createSpyingQJM();
+    qjm.recoverUnfinalizedSegments();
+    
+    if (nodeWithOneTxn == 0 ||
+        nodeWithOneTxn == 1) {
+      // If the node that had the transaction committed was one of the nodes
+      // that responded during recovery, then we should have recovered txid
+      // 4.
+      checkRecovery(cluster, 4, 4);
+      writeSegment(cluster, qjm, 5, 3, true);
+    } else {
+      // Otherwise, we should have recovered only 1-3 and should be able to
+      // start a segment at 4.
+      checkRecovery(cluster, 1, 3);
+      writeSegment(cluster, qjm, 4, 3, true);
+    }
+  }
+
   
   /**
    * Test case where a new writer picks up from an old one with no failures
@@ -408,8 +529,15 @@ public class TestQuorumJournalManager {
       @Override
       public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
           String journalId, InetSocketAddress addr) {
-        return Mockito.spy(IPCLoggerChannel.FACTORY.createLogger(
-            conf, nsInfo, journalId, addr));
+        AsyncLogger logger = new IPCLoggerChannel(conf, nsInfo, journalId, addr) {
+          protected ExecutorService createExecutor() {
+            // Don't parallelize calls to the quorum in the tests.
+            // This makes the tests more deterministic.
+            return MoreExecutors.sameThreadExecutor();
+          }
+        };
+        
+        return Mockito.spy(logger);
       }
     };
     return new QuorumJournalManager(

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java?rev=1373183&r1=1373182&r2=1373183&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java Wed Aug 15 00:57:24 2012
@@ -204,6 +204,89 @@ public class TestJournal {
           "No log file to finalize at transaction ID 1000", ise);
     }
   }
+
+  /**
+   * Assume that a client is writing to a journal, but loses its connection
+   * in the middle of a segment. Thus, any future journal() calls in that
+   * segment may fail, because some txns were missed while the connection was
+   * down.
+   *
+   * Eventually, the connection comes back, and the NN tries to start a new
+   * segment at a higher txid. This should abort the old one and succeed.
+   */
+  @Test
+  public void testAbortOldSegmentIfFinalizeIsMissed() throws Exception {
+    journal.newEpoch(FAKE_NSINFO, 1);
+    
+    // Start a segment at txid 1, and write a batch of 3 txns.
+    journal.startLogSegment(makeRI(1), 1);
+    journal.journal(makeRI(2), 1, 3,
+        QJMTestUtil.createTxnData(1, 3));
+
+    GenericTestUtils.assertExists(
+        journal.getStorage().getInProgressEditLog(1));
+    
+    // Try to start a new segment at txid 6; this should abort the old
+    // segment and then succeed, allowing us to write txns 6-8.
+    journal.startLogSegment(makeRI(3), 6);
+    journal.journal(makeRI(4), 6, 3,
+        QJMTestUtil.createTxnData(6, 3));
+
+    // The old segment should *not* be finalized.
+    GenericTestUtils.assertExists(
+        journal.getStorage().getInProgressEditLog(1));
+    GenericTestUtils.assertExists(
+        journal.getStorage().getInProgressEditLog(6));
+  }
+  
+  /**
+   * Test behavior of startLogSegment() when a segment with the
+   * same transaction ID already exists.
+   */
+  @Test
+  public void testStartLogSegmentWhenAlreadyExists() throws Exception {
+    journal.newEpoch(FAKE_NSINFO, 1);
+    
+    // Start a segment at txid 1, and write just 1 transaction. This
+    // would normally be the START_LOG_SEGMENT transaction.
+    journal.startLogSegment(makeRI(1), 1);
+    journal.journal(makeRI(2), 1, 1,
+        QJMTestUtil.createTxnData(1, 1));
+    
+    // Try to start a new segment at txid 1; this should succeed, because
+    // we are allowed to re-start a segment if we only ever had the
+    // START_LOG_SEGMENT transaction logged.
+    journal.startLogSegment(makeRI(3), 1);
+    journal.journal(makeRI(4), 1, 1,
+        QJMTestUtil.createTxnData(1, 1));
+
+    // This time through, write more transactions afterwards, simulating
+    // real user transactions.
+    journal.journal(makeRI(5), 2, 3,
+        QJMTestUtil.createTxnData(2, 3));
+
+    try {
+      journal.startLogSegment(makeRI(6), 1);
+      fail("Did not fail to start log segment which would overwrite " +
+          "an existing one");
+    } catch (IllegalStateException ise) {
+      GenericTestUtils.assertExceptionContains(
+          "seems to contain valid transactions", ise);
+    }
+    
+    journal.finalizeLogSegment(makeRI(7), 1, 4);
+    
+    // Ensure that we cannot overwrite a finalized segment
+    try {
+      journal.startLogSegment(makeRI(8), 1);
+      fail("Did not fail to start log segment which would overwrite " +
+          "an existing one");
+    } catch (IllegalStateException ise) {
+      GenericTestUtils.assertExceptionContains(
+          "have a finalized segment", ise);
+    }
+
+  }
   
   private static RequestInfo makeRI(int serial) {
     return new RequestInfo(JID, 1, serial);

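Taken together, these two tests pin down the new startLogSegment() contract on the JournalNode side. A condensed caller's-eye sketch (mirroring testStartLogSegmentWhenAlreadyExists above, not a verbatim excerpt):

    journal.startLogSegment(makeRI(1), 1);   // opens segment starting at txid 1
    journal.journal(makeRI(2), 1, 1, QJMTestUtil.createTxnData(1, 1));
    journal.startLogSegment(makeRI(3), 1);   // OK: segment held only 1 txn
    journal.journal(makeRI(4), 1, 2, QJMTestUtil.createTxnData(1, 2));
    journal.startLogSegment(makeRI(5), 1);   // IllegalStateException: the
                                             // segment now holds real txns
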
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java?rev=1373183&r1=1373182&r2=1373183&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java Wed Aug 15 00:57:24 2012
@@ -118,7 +118,9 @@ public class TestJournalNode {
     ch.startLogSegment(3).get();
     response = ch.newEpoch(4).get();
     ch.setEpoch(4);
-    assertEquals(3, response.getLastSegmentTxId());
+    // Because the new segment is empty, it is equivalent to not having
+    // started writing it.
+    assertEquals(0, response.getLastSegmentTxId());
   }
   
   @Test
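
Because the scrubbed JournalNode no longer reports a segment, getLastSegmentTxId() falls back to the protobuf default of 0 in the assertion above. A hedged caller-side sketch, assuming the proto exposes the standard hasLastSegmentTxId() accessor for the optional field:

    NewEpochResponseProto response = ch.newEpoch(4).get();
    if (!response.hasLastSegmentTxId()) {
      // This JN has no segment to recover (any empty segment was moved
      // aside), so recovery can treat it as having no log at all.
    }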