You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2012/08/27 21:55:02 UTC

svn commit: r1377809 - in /hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/qjournal/client/ src/main/java/org/apache/hadoop/hdfs/qjournal/server/ src/main/proto/ src/test/java/org/apache/hadoop/...

Author: todd
Date: Mon Aug 27 19:55:01 2012
New Revision: 1377809

URL: http://svn.apache.org/viewvc?rev=1377809&view=rev
Log:
HDFS-3845. Fixes for edge cases in QJM recovery protocol. Contributed by Todd Lipcon.

Added:
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/SegmentRecoveryComparator.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestSegmentRecoveryComparator.java
Modified:
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt?rev=1377809&r1=1377808&r2=1377809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt Mon Aug 27 19:55:01 2012
@@ -32,3 +32,5 @@ HDFS-3823. QJM: TestQJMWithFaults fails 
 HDFS-3826. QJM: Some trivial logging / exception text improvements. (todd and atm)
 
 HDFS-3839. QJM: hadoop-daemon.sh should be updated to accept "journalnode" (eli)
+
+HDFS-3845. Fixes for edge cases in QJM recovery protocol (todd)

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java?rev=1377809&r1=1377808&r2=1377809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java Mon Aug 27 19:55:01 2012
@@ -23,7 +23,6 @@ import java.net.URI;
 import java.net.URL;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -52,7 +51,6 @@ import org.apache.hadoop.util.StringUtil
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.Lists;
 
 /**
@@ -223,7 +221,7 @@ public class QuorumJournalManager implem
     // TODO: we should collect any "ties" and pass the URL for all of them
     // when syncing, so we can tolerate failure during recovery better.
     Entry<AsyncLogger, PrepareRecoveryResponseProto> bestEntry = Collections.max(
-        prepareResponses.entrySet(), RECOVERY_COMPARATOR); 
+        prepareResponses.entrySet(), SegmentRecoveryComparator.INSTANCE); 
     AsyncLogger bestLogger = bestEntry.getKey();
     PrepareRecoveryResponseProto bestResponse = bestEntry.getValue();
     
@@ -282,35 +280,6 @@ public class QuorumJournalManager implem
     loggers.waitForWriteQuorum(finalize, finalizeSegmentTimeoutMs);
   }
   
-  private static final Comparator<Entry<AsyncLogger, PrepareRecoveryResponseProto>> RECOVERY_COMPARATOR =
-  new Comparator<Entry<AsyncLogger, PrepareRecoveryResponseProto>>() {
-      @Override
-      public int compare(
-          Entry<AsyncLogger, PrepareRecoveryResponseProto> a,
-          Entry<AsyncLogger, PrepareRecoveryResponseProto> b) {
-        
-        PrepareRecoveryResponseProto r1 = a.getValue();
-        PrepareRecoveryResponseProto r2 = b.getValue();
-        
-        if (r1.hasSegmentState() && r2.hasSegmentState()) {
-          assert r1.getSegmentState().getStartTxId() ==
-              r2.getSegmentState().getStartTxId() : "bad args: " + r1 + ", " + r2;
-        }
-        
-        return ComparisonChain.start()
-            // If one of them has accepted something and the other hasn't,
-            // use the one with an accepted recovery
-            .compare(r1.hasAcceptedInEpoch(), r2.hasAcceptedInEpoch())
-            // If they both accepted, use the one that's more recent
-            .compare(r1.getAcceptedInEpoch(),
-                     r2.getAcceptedInEpoch())
-            // Otherwise, choose based on which log is longer
-            .compare(r1.hasSegmentState(), r2.hasSegmentState())
-            .compare(r1.getSegmentState().getEndTxId(), r2.getSegmentState().getEndTxId())
-            .result();
-      }
-  };
-
   static List<AsyncLogger> createLoggers(Configuration conf,
       URI uri, NamespaceInfo nsInfo, AsyncLogger.Factory factory)
           throws IOException {

Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/SegmentRecoveryComparator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/SegmentRecoveryComparator.java?rev=1377809&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/SegmentRecoveryComparator.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/SegmentRecoveryComparator.java Mon Aug 27 19:55:01 2012
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import java.util.Comparator;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ComparisonChain;
+import com.google.common.primitives.Booleans;
+
+/**
+ * Compares responses to the prepareRecovery RPC. This is responsible for
+ * determining the correct length to recover.
+ */
+class SegmentRecoveryComparator
+    implements Comparator<Entry<AsyncLogger, PrepareRecoveryResponseProto>> {
+
+  static final SegmentRecoveryComparator INSTANCE = new SegmentRecoveryComparator();
+  
+  @Override
+  public int compare(
+      Entry<AsyncLogger, PrepareRecoveryResponseProto> a,
+      Entry<AsyncLogger, PrepareRecoveryResponseProto> b) {
+    
+    PrepareRecoveryResponseProto r1 = a.getValue();
+    PrepareRecoveryResponseProto r2 = b.getValue();
+    
+    // A response that has data for a segment is always better than one
+    // that doesn't.
+    if (r1.hasSegmentState() != r2.hasSegmentState()) {
+      return Booleans.compare(r1.hasSegmentState(), r2.hasSegmentState());
+    }
+    
+    if (!r1.hasSegmentState()) {
+      // Neither has a segment, so neither can be used for recover.
+      // Call them equal.
+      return 0;
+    }
+    
+    // They both have a segment.
+    SegmentStateProto r1Seg = r1.getSegmentState();
+    SegmentStateProto r2Seg = r2.getSegmentState();
+    
+    Preconditions.checkArgument(r1Seg.getStartTxId() == r2Seg.getStartTxId(),
+        "Should only be called with responses for corresponding segments: " +
+        "%s and %s do not have the same start txid.", r1, r2);
+
+    // If one is in-progress but the other is finalized,
+    // the finalized one is greater.
+    if (r1Seg.getIsInProgress() != r2Seg.getIsInProgress()) {
+      return Booleans.compare(!r1Seg.getIsInProgress(), !r2Seg.getIsInProgress());
+    }
+    
+    if (!r1Seg.getIsInProgress()) {
+      // If both are finalized, they should match lengths, and be considered
+      // equal
+      if (r1Seg.getEndTxId() != r2Seg.getEndTxId()  ||
+          !r1Seg.getMd5Sum().equals(r2Seg.getMd5Sum())) {
+        throw new AssertionError("finalized segs with different lengths: " + 
+            r1 + ", " + r2);
+      }
+      return 0;
+    }
+    
+    // Both are in-progress.
+    long r1SeenEpoch = Math.max(r1.getAcceptedInEpoch(), r1.getLastWriterEpoch());
+    long r2SeenEpoch = Math.max(r2.getAcceptedInEpoch(), r2.getLastWriterEpoch());
+    
+    return ComparisonChain.start()
+        .compare(r1SeenEpoch, r2SeenEpoch)
+        .compare(r1.getSegmentState().getEndTxId(), r2.getSegmentState().getEndTxId())
+        .result();
+  }
+}

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java?rev=1377809&r1=1377808&r2=1377809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java Mon Aug 27 19:55:01 2012
@@ -76,7 +76,17 @@ class Journal implements Closeable {
    * number of that writer is stored persistently on disk.
    */
   private PersistentLongFile lastPromisedEpoch;
+  
+  /**
+   * The epoch number of the last writer to actually write a transaction.
+   * This is used to differentiate log segments after a crash at the very
+   * beginning of a segment. See the the 'testNewerVersionOfSegmentWins'
+   * test case.
+   */
+  private PersistentLongFile lastWriterEpoch;
+  
   private static final String LAST_PROMISED_FILENAME = "last-promised-epoch";
+  private static final String LAST_WRITER_EPOCH = "last-writer-epoch";
 
   private final FileJournalManager fjm;
 
@@ -86,6 +96,8 @@ class Journal implements Closeable {
     File currentDir = storage.getSingularStorageDir().getCurrentDir();
     this.lastPromisedEpoch = new PersistentLongFile(
         new File(currentDir, LAST_PROMISED_FILENAME), 0);
+    this.lastWriterEpoch = new PersistentLongFile(
+        new File(currentDir, LAST_WRITER_EPOCH), 0);
 
     this.fjm = storage.getJournalManager();
   }
@@ -201,7 +213,7 @@ class Journal implements Closeable {
   synchronized void journal(RequestInfo reqInfo,
       long segmentTxId, long firstTxnId,
       int numTxns, byte[] records) throws IOException {
-    checkRequest(reqInfo);
+    checkWriteRequest(reqInfo);
     checkFormatted();
     
     // TODO: if a JN goes down and comes back up, then it will throw
@@ -260,6 +272,16 @@ class Journal implements Closeable {
     // client
   }
   
+  private synchronized void checkWriteRequest(RequestInfo reqInfo) throws IOException {
+    checkRequest(reqInfo);
+    
+    if (reqInfo.getEpoch() != lastWriterEpoch.get()) {
+      throw new IOException("IPC's epoch " + reqInfo.getEpoch() +
+          " is not the current writer epoch  " +
+          lastWriterEpoch.get());
+    }
+  }
+  
   private void checkFormatted() throws JournalNotFormattedException {
     if (!storage.isFormatted()) {
       throw new JournalNotFormattedException("Journal " + storage +
@@ -308,6 +330,18 @@ class Journal implements Closeable {
       }
     }
     
+    long curLastWriterEpoch = lastWriterEpoch.get();
+    if (curLastWriterEpoch != reqInfo.getEpoch()) {
+      LOG.info("Recording lastWriterEpoch = " + reqInfo.getEpoch());
+      lastWriterEpoch.set(reqInfo.getEpoch());
+    }
+
+    // The fact that we are starting a segment at this txid indicates
+    // that any previous recovery for this same segment was aborted.
+    // Otherwise, no writer would have started writing. So, we can
+    // remove the record of the older segment here.
+    purgePaxosDecision(txid);
+    
     curSegment = fjm.startLogSegment(txid);
     curSegmentTxId = txid;
     nextTxId = txid;
@@ -350,6 +384,12 @@ class Journal implements Closeable {
           "Trying to re-finalize already finalized log " +
               elf + " with different endTxId " + endTxId);
     }
+
+    // Once logs are finalized, a different length will never be decided.
+    // During recovery, we treat a finalized segment the same as an accepted
+    // recovery. Thus, we no longer need to keep track of the previously-
+    // accepted decision. The existence of the finalized log segment is enough.
+    purgePaxosDecision(elf.getFirstTxId());
   }
   
   /**
@@ -364,6 +404,21 @@ class Journal implements Closeable {
     purgePaxosDecisionsOlderThan(minTxIdToKeep);
   }
   
+  /**
+   * Remove the previously-recorded 'accepted recovery' information
+   * for a given log segment, once it is no longer necessary. 
+   * @param segmentTxId the transaction ID to purge
+   * @throws IOException if the file could not be deleted
+   */
+  private void purgePaxosDecision(long segmentTxId) throws IOException {
+    File paxosFile = storage.getPaxosFile(segmentTxId);
+    if (paxosFile.exists()) {
+      if (!paxosFile.delete()) {
+        throw new IOException("Unable to delete paxos file " + paxosFile);
+      }
+    }
+  }
+
   private void purgePaxosDecisionsOlderThan(long minTxIdToKeep)
       throws IOException {
     File dir = storage.getPaxosDir();
@@ -442,18 +497,29 @@ class Journal implements Closeable {
     
     PrepareRecoveryResponseProto.Builder builder =
         PrepareRecoveryResponseProto.newBuilder();
+
+    SegmentStateProto segInfo = getSegmentInfo(segmentTxId);
+    boolean hasFinalizedSegment = segInfo != null && !segInfo.getIsInProgress();
     
     PersistedRecoveryPaxosData previouslyAccepted = getPersistedPaxosData(segmentTxId);
-    if (previouslyAccepted != null) {
+
+    if (previouslyAccepted != null && !hasFinalizedSegment) {
+      SegmentStateProto acceptedState = previouslyAccepted.getSegmentState();
+      assert acceptedState.getEndTxId() == segInfo.getEndTxId() &&
+             acceptedState.getMd5Sum().equals(segInfo.getMd5Sum()) :
+            "prev accepted: " + TextFormat.shortDebugString(previouslyAccepted)+ "\n" +
+            "on disk:       " + TextFormat.shortDebugString(segInfo);
+            
       builder.setAcceptedInEpoch(previouslyAccepted.getAcceptedInEpoch())
         .setSegmentState(previouslyAccepted.getSegmentState());
     } else {
-      SegmentStateProto segInfo = getSegmentInfo(segmentTxId);
       if (segInfo != null) {
         builder.setSegmentState(segInfo);
       }
     }
     
+    builder.setLastWriterEpoch(lastWriterEpoch.get());
+    
     PrepareRecoveryResponseProto resp = builder.build();
     LOG.info("Prepared recovery for segment " + segmentTxId + ": " +
         TextFormat.shortDebugString(resp));

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto?rev=1377809&r1=1377808&r2=1377809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto Mon Aug 27 19:55:01 2012
@@ -162,6 +162,7 @@ message PrepareRecoveryRequestProto {
 message PrepareRecoveryResponseProto {
   optional SegmentStateProto segmentState = 1;
   optional uint64 acceptedInEpoch = 2;
+  required uint64 lastWriterEpoch = 3;
 }
 
 /**

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java?rev=1377809&r1=1377808&r2=1377809&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java Mon Aug 27 19:55:01 2012
@@ -30,9 +30,9 @@ import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URISyntaxException;
+import java.net.URL;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -41,6 +41,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
 import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
 import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
 import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
 import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
@@ -533,6 +534,212 @@ public class TestQuorumJournalManager {
     checkRecovery(cluster, 1, 4);
   }
   
+  /**
+   * Set up the following tricky edge case state which is used by
+   * multiple tests:
+   * 
+   * Initial writer:
+   * - Writing to 3 JNs: JN0, JN1, JN2:
+   * - A log segment with txnid 1 through 100 succeeds.
+   * - The first transaction in the next segment only goes to JN0
+   *   before the writer crashes (eg it is partitioned)
+   *   
+   * Recovery by another writer:
+   * - The new NN starts recovery and talks to all three. Thus, it sees
+   *   that the newest log segment which needs recovery is 101.
+   * - It sends the prepareRecovery(101) call, and decides that the
+   *   recovery length for 101 is only the 1 transaction.
+   * - It sends acceptRecovery(101-101) to only JN0, before crashing
+   * 
+   * This yields the following state:
+   * - JN0: 1-100 finalized, 101_inprogress, accepted recovery: 101-101
+   * - JN1: 1-100 finalized, 101_inprogress.empty
+   * - JN2: 1-100 finalized, 101_inprogress.empty
+   *  (the .empty files got moved aside during recovery)
+   * @throws Exception 
+   */
+  private void setupEdgeCaseOneJnHasSegmentWithAcceptedRecovery() throws Exception {
+    // Log segment with txns 1-100 succeeds 
+    writeSegment(cluster, qjm, 1, 100, true);
+
+    // startLogSegment only makes it to one of the three nodes
+    failLoggerAtTxn(spies.get(1), 101);
+    failLoggerAtTxn(spies.get(2), 101);
+    
+    try {
+      writeSegment(cluster, qjm, 101, 1, true);
+      fail("Should have failed");
+    } catch (QuorumException qe) {
+      GenericTestUtils.assertExceptionContains("mock failure", qe);
+    } finally {
+      qjm.close();
+    }
+    
+    // Recovery 1:
+    // make acceptRecovery() only make it to the node which has txid 101
+    // this should fail because only 1/3 accepted the recovery
+    qjm = createSpyingQJM();
+    spies = qjm.getLoggerSetForTests().getLoggersForTests();
+    futureThrows(new IOException("mock failure")).when(spies.get(1))
+      .acceptRecovery(Mockito.<SegmentStateProto>any(), Mockito.<URL>any());
+    futureThrows(new IOException("mock failure")).when(spies.get(2))
+      .acceptRecovery(Mockito.<SegmentStateProto>any(), Mockito.<URL>any());
+    
+    try {
+      qjm.recoverUnfinalizedSegments();
+      fail("Should have failed to recover");
+    } catch (QuorumException qe) {
+      GenericTestUtils.assertExceptionContains("mock failure", qe);
+    } finally {
+      qjm.close();
+    }
+    
+    // Check that we have entered the expected state as described in the
+    // method javadoc.
+    GenericTestUtils.assertGlobEquals(cluster.getCurrentDir(0, JID),
+        "edits_.*",
+        NNStorage.getFinalizedEditsFileName(1, 100),
+        NNStorage.getInProgressEditsFileName(101));
+    GenericTestUtils.assertGlobEquals(cluster.getCurrentDir(1, JID),
+        "edits_.*",
+        NNStorage.getFinalizedEditsFileName(1, 100),
+        NNStorage.getInProgressEditsFileName(101) + ".empty");
+    GenericTestUtils.assertGlobEquals(cluster.getCurrentDir(2, JID),
+        "edits_.*",
+        NNStorage.getFinalizedEditsFileName(1, 100),
+        NNStorage.getInProgressEditsFileName(101) + ".empty");
+
+    File paxos0 = new File(cluster.getCurrentDir(0, JID), "paxos");
+    File paxos1 = new File(cluster.getCurrentDir(1, JID), "paxos");
+    File paxos2 = new File(cluster.getCurrentDir(2, JID), "paxos");
+    
+    GenericTestUtils.assertGlobEquals(paxos0, ".*", "101");
+    GenericTestUtils.assertGlobEquals(paxos1, ".*");
+    GenericTestUtils.assertGlobEquals(paxos2, ".*");
+  }
+  
+  /**
+   * Test an edge case discovered by randomized testing.
+   * 
+   * Starts with the edge case state set up by
+   * {@link #setupEdgeCaseOneJnHasSegmentWithAcceptedRecovery()}
+   * 
+   * Recovery 2:
+   * - New NN starts recovery and only talks to JN1 and JN2. JN0 has
+   *   crashed. Since they have no logs open, they say they don't need
+   *   recovery.
+   * - Starts writing segment 101, and writes 50 transactions before crashing.
+   *
+   * Recovery 3:
+   * - JN0 has come back to life.
+   * - New NN starts recovery and talks to all three. All three have
+   *   segments open from txid 101, so it calls prepareRecovery(101)
+   * - JN0 has an already-accepted value for segment 101, so it replies
+   *   "you should recover 101-101"
+   * - Former incorrect behavior: NN truncates logs to txid 101 even though
+   *   it should have recovered through 150.
+   *   
+   * In this case, even though there is an accepted recovery decision,
+   * the newer log segments should take precedence, since they were written
+   * in a newer epoch than the recorded decision.
+   */
+  @Test
+  public void testNewerVersionOfSegmentWins() throws Exception {
+    setupEdgeCaseOneJnHasSegmentWithAcceptedRecovery();
+    
+    // Now start writing again without JN0 present:
+    cluster.getJournalNode(0).stopAndJoin(0);
+    
+    qjm = createSpyingQJM();
+    try {
+      assertEquals(100, QJMTestUtil.recoverAndReturnLastTxn(qjm));
+      
+      // Write segment but do not finalize
+      writeSegment(cluster, qjm, 101, 50, false);
+    } finally {
+      qjm.close();
+    }
+    
+    // Now try to recover a new writer, with JN0 present,
+    // and ensure that all of the above-written transactions are recovered.
+    cluster.restartJournalNode(0);
+    qjm = createSpyingQJM();
+    try {
+      assertEquals(150, QJMTestUtil.recoverAndReturnLastTxn(qjm));
+    } finally {
+      qjm.close();
+    }
+  }
+  
+  /**
+   * Test another edge case discovered by randomized testing.
+   * 
+   * Starts with the edge case state set up by
+   * {@link #setupEdgeCaseOneJnHasSegmentWithAcceptedRecovery()}
+   * 
+   * Recovery 2:
+   * - New NN starts recovery and only talks to JN1 and JN2. JN0 has
+   *   crashed. Since they have no logs open, they say they don't need
+   *   recovery.
+   * - Before writing any transactions, JN0 comes back to life and
+   *   JN1 crashes.
+   * - Starts writing segment 101, and writes 50 transactions before crashing.
+   *
+   * Recovery 3:
+   * - JN1 has come back to life. JN2 crashes.
+   * - New NN starts recovery and talks to all three. All three have
+   *   segments open from txid 101, so it calls prepareRecovery(101)
+   * - JN0 has an already-accepted value for segment 101, so it replies
+   *   "you should recover 101-101"
+   * - Former incorrect behavior: NN truncates logs to txid 101 even though
+   *   it should have recovered through 150.
+   *   
+   * In this case, even though there is an accepted recovery decision,
+   * the newer log segments should take precedence, since they were written
+   * in a newer epoch than the recorded decision.
+   */
+  @Test
+  public void testNewerVersionOfSegmentWins2() throws Exception {
+    setupEdgeCaseOneJnHasSegmentWithAcceptedRecovery();
+
+    // Recover without JN0 present.
+    cluster.getJournalNode(0).stopAndJoin(0);
+    
+    qjm = createSpyingQJM();
+    try {
+      assertEquals(100, QJMTestUtil.recoverAndReturnLastTxn(qjm));
+
+      // After recovery, JN0 comes back to life and JN1 crashes.
+      cluster.restartJournalNode(0);
+      cluster.getJournalNode(1).stopAndJoin(0);
+      
+      // Write segment but do not finalize
+      writeSegment(cluster, qjm, 101, 50, false);
+    } finally {
+      qjm.close();
+    }
+    
+    // State:
+    // JN0: 1-100 finalized, 101_inprogress (txns up to 150)
+    // Previously, JN0 had an accepted recovery 101-101 from an earlier recovery
+    // attempt.
+    // JN1: 1-100 finalized
+    // JN2: 1-100 finalized, 101_inprogress (txns up to 150)
+    
+    // We need to test that the accepted recovery 101-101 on JN0 doesn't
+    // end up truncating the log back to 101.
+
+    cluster.restartJournalNode(1);
+    cluster.getJournalNode(2).stopAndJoin(0);
+
+    qjm = createSpyingQJM();
+    try {
+      assertEquals(150, QJMTestUtil.recoverAndReturnLastTxn(qjm));
+    } finally {
+      qjm.close();
+    }
+  }
+  
   @Test
   public void testPurgeLogs() throws Exception {
     for (int txid = 1; txid <= 5; txid++) {

Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestSegmentRecoveryComparator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestSegmentRecoveryComparator.java?rev=1377809&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestSegmentRecoveryComparator.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestSegmentRecoveryComparator.java Mon Aug 27 19:55:01 2012
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import static org.junit.Assert.*;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.google.common.collect.Maps;
+import com.google.protobuf.ByteString;
+
+import static org.apache.hadoop.hdfs.qjournal.client.SegmentRecoveryComparator.INSTANCE;
+
+public class TestSegmentRecoveryComparator {
+  
+  private static Map.Entry<AsyncLogger, PrepareRecoveryResponseProto> makeEntry(
+      PrepareRecoveryResponseProto proto) {
+    return Maps.immutableEntry(Mockito.mock(AsyncLogger.class), proto);
+  }
+  
+  @Test
+  public void testComparisons() {
+    Entry<AsyncLogger, PrepareRecoveryResponseProto> INPROGRESS_1_3 =
+        makeEntry(PrepareRecoveryResponseProto.newBuilder()
+          .setSegmentState(SegmentStateProto.newBuilder()
+              .setStartTxId(1L)
+              .setEndTxId(3L)
+              .setMd5Sum(ByteString.EMPTY)
+              .setIsInProgress(true))
+          .setLastWriterEpoch(0L)
+          .build());
+    Entry<AsyncLogger, PrepareRecoveryResponseProto> INPROGRESS_1_4 =
+        makeEntry(PrepareRecoveryResponseProto.newBuilder()
+          .setSegmentState(SegmentStateProto.newBuilder()
+              .setStartTxId(1L)
+              .setEndTxId(4L)
+              .setMd5Sum(ByteString.EMPTY)
+              .setIsInProgress(true))
+          .setLastWriterEpoch(0L)
+          .build());
+    Entry<AsyncLogger, PrepareRecoveryResponseProto> INPROGRESS_1_4_ACCEPTED =
+        makeEntry(PrepareRecoveryResponseProto.newBuilder()
+          .setSegmentState(SegmentStateProto.newBuilder()
+              .setStartTxId(1L)
+              .setEndTxId(4L)
+              .setMd5Sum(ByteString.EMPTY)
+              .setIsInProgress(true))
+          .setLastWriterEpoch(0L)
+          .setAcceptedInEpoch(1L)
+          .build());
+
+    Entry<AsyncLogger, PrepareRecoveryResponseProto> FINALIZED_1_3 =
+        makeEntry(PrepareRecoveryResponseProto.newBuilder()
+          .setSegmentState(SegmentStateProto.newBuilder()
+              .setStartTxId(1L)
+              .setEndTxId(3L)
+              .setMd5Sum(ByteString.EMPTY)
+              .setIsInProgress(false))
+          .setLastWriterEpoch(0L)
+          .build());
+
+    // Should compare equal to itself
+    assertEquals(0, INSTANCE.compare(INPROGRESS_1_3, INPROGRESS_1_3));
+    
+    // Longer log wins.
+    assertEquals(-1, INSTANCE.compare(INPROGRESS_1_3, INPROGRESS_1_4));
+    assertEquals(1, INSTANCE.compare(INPROGRESS_1_4, INPROGRESS_1_3));
+    
+    // Finalized log wins even over a longer in-progress
+    assertEquals(-1, INSTANCE.compare(INPROGRESS_1_4, FINALIZED_1_3));
+    assertEquals(1, INSTANCE.compare(FINALIZED_1_3, INPROGRESS_1_4));
+
+    // Finalized log wins even if the in-progress one has an accepted
+    // recovery proposal.
+    assertEquals(-1, INSTANCE.compare(INPROGRESS_1_4_ACCEPTED, FINALIZED_1_3));
+    assertEquals(1, INSTANCE.compare(FINALIZED_1_3, INPROGRESS_1_4_ACCEPTED));
+  }
+}