Posted to hdfs-commits@hadoop.apache.org by at...@apache.org on 2012/10/16 03:59:32 UTC

svn commit: r1398608 - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/main/resources/ src/test/java/org/apache/hadoop/hdfs/server/n...

Author: atm
Date: Tue Oct 16 01:59:31 2012
New Revision: 1398608

URL: http://svn.apache.org/viewvc?rev=1398608&view=rev
Log:
HDFS-2946. HA: Put a cap on the number of completed edits files retained by the NN. Contributed by Aaron T. Myers.

Modified:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LogsPurgeable.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1398608&r1=1398607&r2=1398608&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Oct 16 01:59:31 2012
@@ -57,6 +57,9 @@ Release 2.0.3-alpha - Unreleased
     HDFS-4036. Remove "throw UnresolvedLinkException" from
     FSDirectory.unprotectedAddFile(..). (Jing Zhao via szetszwo)
 
+    HDFS-2946. HA: Put a cap on the number of completed edits files retained
+    by the NN. (atm)
+
   OPTIMIZATIONS
 
   BUG FIXES 

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1398608&r1=1398607&r2=1398608&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Tue Oct 16 01:59:31 2012
@@ -162,6 +162,8 @@ public class DFSConfigKeys extends Commo
   public static final int     DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_DEFAULT = 2;
   public static final String  DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY = "dfs.namenode.num.extra.edits.retained";
   public static final int     DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_DEFAULT = 1000000; //1M
+  public static final String  DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_KEY = "dfs.namenode.max.extra.edits.segments.retained";
+  public static final int     DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_DEFAULT = 10000; // 10k
   public static final String  DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY = "dfs.namenode.min.supported.datanode.version";
   public static final String  DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_DEFAULT = "2.0.0-SNAPSHOT";
 

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1398608&r1=1398607&r2=1398608&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Tue Oct 16 01:59:31 2012
@@ -1071,6 +1071,11 @@ public class FSEditLog implements LogsPu
       // All journals have failed, it is handled in logSync.
     }
   }
+  
+  public void selectInputStreams(Collection<EditLogInputStream> streams,
+      long fromTxId, boolean inProgressOk) throws IOException {
+    journalSet.selectInputStreams(streams, fromTxId, inProgressOk);
+  }
 
   public Collection<EditLogInputStream> selectInputStreams(
       long fromTxId, long toAtLeastTxId) throws IOException {
@@ -1088,7 +1093,7 @@ public class FSEditLog implements LogsPu
       long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery,
       boolean inProgressOk) throws IOException {
     List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
-    journalSet.selectInputStreams(streams, fromTxId, inProgressOk);
+    selectInputStreams(streams, fromTxId, inProgressOk);
 
     try {
       checkForGaps(streams, fromTxId, toAtLeastTxId, inProgressOk);
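
[Editor's note: the overload added above delegates straight to the JournalSet, exposing stream selection without a toAtLeastTxId bound or gap checking. A minimal sketch of the calling pattern; the wrapper class, helper name, and the txid 151 are hypothetical, and constructing an FSEditLog is elided since it requires a live NameNode context.]

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
    import org.apache.hadoop.hdfs.server.namenode.FSEditLog;

    public class SelectStreamsSketch {
      static void printFinalizedSegments(FSEditLog editLog) throws IOException {
        List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
        // Gather every finalized segment containing transactions at or after
        // txid 151, across all configured journals; pass true for the last
        // argument to also include in-progress segments.
        editLog.selectInputStreams(streams, 151L, false);
        for (EditLogInputStream elis : streams) {
          System.out.println(elis.getFirstTxId() + "-" + elis.getLastTxId());
        }
      }
    }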

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java?rev=1398608&r1=1398607&r2=1398608&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java Tue Oct 16 01:59:31 2012
@@ -232,6 +232,11 @@ class FileJournalManager implements Jour
     LOG.debug(this + ": selecting input streams starting at " + fromTxId + 
         (inProgressOk ? " (inProgress ok) " : " (excluding inProgress) ") +
         "from among " + elfs.size() + " candidate file(s)");
+    addStreamsToCollectionFromFiles(elfs, streams, fromTxId, inProgressOk);
+  }
+  
+  static void addStreamsToCollectionFromFiles(Collection<EditLogFile> elfs,
+      Collection<EditLogInputStream> streams, long fromTxId, boolean inProgressOk) {
     for (EditLogFile elf : elfs) {
       if (elf.lastTxId < fromTxId) {
         LOG.debug("passing over " + elf + " because it ends at " +
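
[Editor's note: factoring the loop into a static helper lets callers that already hold a list of EditLogFiles (such as the SecondaryNameNode change further down) reuse the same filtering without a FileJournalManager instance. A sketch of direct usage; the class and method names are hypothetical, and it assumes same-package access since FileJournalManager is package-private.]

    package org.apache.hadoop.hdfs.server.namenode;

    import java.io.File;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;

    class MatchAndFilterSketch {
      static List<EditLogInputStream> streamsFrom(File currentDir, long fromTxId)
          throws IOException {
        // matchEditLogs scans a storage directory for edits files (finalized
        // and in-progress) and returns them as EditLogFile records.
        List<EditLogFile> editFiles = FileJournalManager.matchEditLogs(currentDir);
        List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
        // Keep only segments containing txids >= fromTxId; the final 'true'
        // also admits an in-progress segment, if one is present.
        FileJournalManager.addStreamsToCollectionFromFiles(editFiles, streams,
            fromTxId, true);
        return streams;
      }
    }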

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java?rev=1398608&r1=1398607&r2=1398608&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java Tue Oct 16 01:59:31 2012
@@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.na
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.Collection;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -46,19 +45,6 @@ public interface JournalManager extends 
    */
   void finalizeLogSegment(long firstTxId, long lastTxId) throws IOException;
 
-   /**
-   * Get a list of edit log input streams.  The list will start with the
-   * stream that contains fromTxnId, and continue until the end of the journal
-   * being managed.
-   * 
-   * @param fromTxnId the first transaction id we want to read
-   * @param inProgressOk whether or not in-progress streams should be returned
-   *
-   * @return a list of streams
-   */
-  void selectInputStreams(Collection<EditLogInputStream> streams,
-      long fromTxnId, boolean inProgressOk);
-
   /**
    * Set the amount of memory that this stream should use to buffer edits
    */

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java?rev=1398608&r1=1398607&r2=1398608&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java Tue Oct 16 01:59:31 2012
@@ -216,7 +216,7 @@ public class JournalSet implements Journ
    */
   @Override
   public void selectInputStreams(Collection<EditLogInputStream> streams,
-      long fromTxId, boolean inProgressOk) {
+      long fromTxId, boolean inProgressOk) throws IOException {
     final TreeMultiset<EditLogInputStream> allStreams =
         TreeMultiset.create(EDIT_LOG_INPUT_STREAM_COMPARATOR);
     for (JournalAndStream jas : journals) {

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LogsPurgeable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LogsPurgeable.java?rev=1398608&r1=1398607&r2=1398608&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LogsPurgeable.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LogsPurgeable.java Tue Oct 16 01:59:31 2012
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
+import java.util.Collection;
 
 /**
  * Interface used to abstract over classes which manage edit logs that may need
@@ -33,5 +34,20 @@ interface LogsPurgeable {
    * @throws IOException in the event of error
    */
   public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException;
-
+  
+  /**
+   * Get a list of edit log input streams.  The list will start with the
+   * stream that contains fromTxnId, and continue until the end of the journal
+   * being managed.
+   * 
+   * @param fromTxId the first transaction id we want to read
+   * @param inProgressOk whether or not in-progress streams should be returned
+   *
+   * @return a list of streams
+   * @throws IOException if the underlying storage has an error or is otherwise
+   * inaccessible
+   */
+  void selectInputStreams(Collection<EditLogInputStream> streams,
+      long fromTxId, boolean inProgressOk) throws IOException;
+  
 }
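
[Editor's note: with selectInputStreams moved here from JournalManager, anything the retention manager can purge can also report which edit segments it still holds. A minimal sketch of an implementation; the class is hypothetical and not part of this commit, lives in the same package since the interface is package-private, and assumes EditLogInputStream's getLastTxId() and isInProgress() accessors.]

    package org.apache.hadoop.hdfs.server.namenode;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Iterator;
    import java.util.List;

    class SketchLogsPurgeable implements LogsPurgeable {
      private final List<EditLogInputStream> segments =
          new ArrayList<EditLogInputStream>();

      @Override
      public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException {
        Iterator<EditLogInputStream> it = segments.iterator();
        while (it.hasNext()) {
          // Drop segments that end before the oldest txid we must keep.
          if (it.next().getLastTxId() < minTxIdToKeep) {
            it.remove();
          }
        }
      }

      @Override
      public void selectInputStreams(Collection<EditLogInputStream> streams,
          long fromTxId, boolean inProgressOk) throws IOException {
        for (EditLogInputStream elis : segments) {
          // Skip segments ending before fromTxId, and skip in-progress
          // segments unless the caller asked for them.
          if (elis.getLastTxId() >= fromTxId
              && (inProgressOk || !elis.isInProgress())) {
            streams.add(elis);
          }
        }
      }
    }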

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java?rev=1398608&r1=1398607&r2=1398608&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java Tue Oct 16 01:59:31 2012
@@ -19,7 +19,9 @@ package org.apache.hadoop.hdfs.server.na
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.TreeSet;
 
@@ -32,6 +34,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.util.MD5FileUtils;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
@@ -48,6 +51,7 @@ public class NNStorageRetentionManager {
   
   private final int numCheckpointsToRetain;
   private final long numExtraEditsToRetain;
+  private final int maxExtraEditsSegmentsToRetain;
   private static final Log LOG = LogFactory.getLog(
       NNStorageRetentionManager.class);
   private final NNStorage storage;
@@ -65,6 +69,9 @@ public class NNStorageRetentionManager {
     this.numExtraEditsToRetain = conf.getLong(
         DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY,
         DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_DEFAULT);
+    this.maxExtraEditsSegmentsToRetain = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_KEY,
+        DFSConfigKeys.DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_DEFAULT);
     Preconditions.checkArgument(numCheckpointsToRetain > 0,
         "Must retain at least one checkpoint");
     Preconditions.checkArgument(numExtraEditsToRetain >= 0,
@@ -94,7 +101,39 @@ public class NNStorageRetentionManager {
     // provide a "cushion" of older txns that we keep, which is
     // handy for HA, where a remote node may not have as many
     // new images.
-    long purgeLogsFrom = Math.max(0, minImageTxId + 1 - numExtraEditsToRetain);
+    //
+    // First, determine the target number of extra transactions to retain based
+    // on the configured amount.
+    long minimumRequiredTxId = minImageTxId + 1;
+    long purgeLogsFrom = Math.max(0, minimumRequiredTxId - numExtraEditsToRetain);
+    
+    ArrayList<EditLogInputStream> editLogs = new ArrayList<EditLogInputStream>();
+    purgeableLogs.selectInputStreams(editLogs, purgeLogsFrom, false);
+    Collections.sort(editLogs, new Comparator<EditLogInputStream>() {
+      @Override
+      public int compare(EditLogInputStream a, EditLogInputStream b) {
+        return ComparisonChain.start()
+            .compare(a.getFirstTxId(), b.getFirstTxId())
+            .compare(a.getLastTxId(), b.getLastTxId())
+            .result();
+      }
+    });
+    
+    // Next, adjust the number of transactions to retain if doing so would mean
+    // keeping too many segments around.
+    while (editLogs.size() > maxExtraEditsSegmentsToRetain) {
+      purgeLogsFrom = editLogs.get(0).getFirstTxId();
+      editLogs.remove(0);
+    }
+    
+    // Finally, ensure that we're not trying to purge any transactions that we
+    // actually need.
+    if (purgeLogsFrom > minimumRequiredTxId) {
+      throw new AssertionError("Should not purge more edits than required to "
+          + "restore: " + purgeLogsFrom + " should be <= "
+          + minimumRequiredTxId);
+    }
+    
     purgeableLogs.purgeLogsOlderThan(purgeLogsFrom);
   }
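
[Editor's note: concretely, the capping loop discards the oldest selected segments until at most maxExtraEditsSegmentsToRetain remain, moving purgeLogsFrom up to the first txid of each segment it discards. A self-contained walk-through using the txid ranges from the new test case below; Segment is a hypothetical stand-in for EditLogInputStream, and the in-progress segment at 401 is excluded because inProgressOk is false.]

    import java.util.ArrayList;
    import java.util.List;

    public class SegmentCapWalkthrough {
      static class Segment {
        final long firstTxId, lastTxId;
        Segment(long first, long last) { firstTxId = first; lastTxId = last; }
      }

      public static void main(String[] args) {
        long minImageTxId = 300;           // oldest retained checkpoint
        long numExtraEditsToRetain = 150;  // dfs.namenode.num.extra.edits.retained
        int maxExtraEditsSegmentsToRetain = 2;

        long minimumRequiredTxId = minImageTxId + 1;                   // 301
        long purgeLogsFrom =
            Math.max(0, minimumRequiredTxId - numExtraEditsToRetain);  // 151

        // Finalized segments with lastTxId >= 151, sorted ascending, as
        // selectInputStreams returns them.
        List<Segment> editLogs = new ArrayList<Segment>();
        editLogs.add(new Segment(101, 175));
        editLogs.add(new Segment(176, 200));
        editLogs.add(new Segment(201, 225));
        editLogs.add(new Segment(226, 240));
        editLogs.add(new Segment(241, 275));
        editLogs.add(new Segment(276, 300));
        editLogs.add(new Segment(301, 400));

        // The capping loop from above: 7 segments exceed the cap of 2, so
        // discard from the front until only 2 remain in the list.
        while (editLogs.size() > maxExtraEditsSegmentsToRetain) {
          purgeLogsFrom = editLogs.get(0).firstTxId;
          editLogs.remove(0);
        }

        // Prints 241: purgeLogsOlderThan(241) removes every segment that
        // ends before txid 241, leaving 241-275 and 276-300 as the two
        // extra segments plus the required 301-400.
        System.out.println("purgeLogsFrom = " + purgeLogsFrom);
      }
    }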
   

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java?rev=1398608&r1=1398607&r2=1398608&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java Tue Oct 16 01:59:31 2012
@@ -751,6 +751,24 @@ public class SecondaryNameNode implement
           }
         }
       }
+
+      @Override
+      public void selectInputStreams(Collection<EditLogInputStream> streams,
+          long fromTxId, boolean inProgressOk) {
+        Iterator<StorageDirectory> iter = storage.dirIterator();
+        while (iter.hasNext()) {
+          StorageDirectory dir = iter.next();
+          List<EditLogFile> editFiles;
+          try {
+            editFiles = FileJournalManager.matchEditLogs(
+                dir.getCurrentDir());
+          } catch (IOException ioe) {
+            throw new RuntimeException(ioe);
+          }
+          FileJournalManager.addStreamsToCollectionFromFiles(editFiles, streams,
+              fromTxId, inProgressOk);
+        }
+      }
       
     }
     

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1398608&r1=1398607&r2=1398608&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Tue Oct 16 01:59:31 2012
@@ -655,6 +655,20 @@
   edits in order to start again.
   Typically each edit is on the order of a few hundred bytes, so the default
   of 1 million edits should be on the order of hundreds of MBs or low GBs.
+
+  NOTE: Fewer extra edits may be retained than value specified for this setting
+  if doing so would mean that more segments would be retained than the number
+  configured by dfs.namenode.max.extra.edits.segments.retained.
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.max.extra.edits.segments.retained</name>
+  <value>10000</value>
+  <description>The maximum number of extra edit log segments which should be retained
+  beyond what is minimally necessary for a NN restart. When used in conjunction with
+  dfs.namenode.num.extra.edits.retained, this configuration property serves to cap
+  the number of extra edits files to a reasonable value.
   </description>
 </property>
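
[Editor's note: to tighten the cap below the shipped default, an operator would override the property in hdfs-site.xml in the usual way; a minimal example, where the value 500 is arbitrary and chosen only for illustration.]

    <property>
      <name>dfs.namenode.max.extra.edits.segments.retained</name>
      <value>500</value>
    </property>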
 

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java?rev=1398608&r1=1398607&r2=1398608&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java Tue Oct 16 01:59:31 2012
@@ -22,6 +22,7 @@ import static org.apache.hadoop.hdfs.ser
 import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getInProgressEditsFileName;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -196,6 +197,35 @@ public class TestNNStorageRetentionManag
     runTest(tc);
   }
   
+  @Test
+  public void testRetainExtraLogsLimitedSegments() throws IOException {
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY,
+        150);
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_KEY, 2);
+    TestCaseDescription tc = new TestCaseDescription();
+    tc.addRoot("/foo1", NameNodeDirType.IMAGE);
+    tc.addRoot("/foo2", NameNodeDirType.EDITS);
+    tc.addImage("/foo1/current/" + getImageFileName(100), true);
+    tc.addImage("/foo1/current/" + getImageFileName(200), true);
+    tc.addImage("/foo1/current/" + getImageFileName(300), false);
+    tc.addImage("/foo1/current/" + getImageFileName(400), false);
+
+    tc.addLog("/foo2/current/" + getFinalizedEditsFileName(1, 100), true);
+    // Without lowering the max segments to retain, we'd retain all segments
+    // going back to txid 150 (300 - 150).
+    tc.addLog("/foo2/current/" + getFinalizedEditsFileName(101, 175), true);
+    tc.addLog("/foo2/current/" + getFinalizedEditsFileName(176, 200), true);
+    tc.addLog("/foo2/current/" + getFinalizedEditsFileName(201, 225), true);
+    tc.addLog("/foo2/current/" + getFinalizedEditsFileName(226, 240), true);
+    // Only retain 2 extra segments. The 301-400 segment is considered required,
+    // not extra.
+    tc.addLog("/foo2/current/" + getFinalizedEditsFileName(241, 275), false);
+    tc.addLog("/foo2/current/" + getFinalizedEditsFileName(276, 300), false);
+    tc.addLog("/foo2/current/" + getFinalizedEditsFileName(301, 400), false);
+    tc.addLog("/foo2/current/" + getInProgressEditsFileName(401), false);
+    runTest(tc);
+  }
+  
   private void runTest(TestCaseDescription tc) throws IOException {
     StoragePurger mockPurger =
       Mockito.mock(NNStorageRetentionManager.StoragePurger.class);
@@ -287,8 +317,10 @@ public class TestNNStorageRetentionManag
       return mockStorageForDirs(sds.toArray(new StorageDirectory[0]));
     }
     
-    public FSEditLog mockEditLog(StoragePurger purger) {
+    @SuppressWarnings("unchecked")
+    public FSEditLog mockEditLog(StoragePurger purger) throws IOException {
       final List<JournalManager> jms = Lists.newArrayList();
+      final JournalSet journalSet = new JournalSet(0);
       for (FakeRoot root : dirRoots.values()) {
         if (!root.type.isOfType(NameNodeDirType.EDITS)) continue;
         
@@ -297,6 +329,7 @@ public class TestNNStorageRetentionManag
             root.mockStorageDir(), null);
         fjm.purger = purger;
         jms.add(fjm);
+        journalSet.add(fjm, false);
       }
 
       FSEditLog mockLog = Mockito.mock(FSEditLog.class);
@@ -314,6 +347,18 @@ public class TestNNStorageRetentionManag
           return null;
         }
       }).when(mockLog).purgeLogsOlderThan(Mockito.anyLong());
+      
+      Mockito.doAnswer(new Answer<Void>() {
+        
+        @Override
+        public Void answer(InvocationOnMock invocation) throws Throwable {
+          Object[] args = invocation.getArguments();
+          journalSet.selectInputStreams((Collection<EditLogInputStream>)args[0],
+              (long)((Long)args[1]), (boolean)((Boolean)args[2]));
+          return null;
+        }
+      }).when(mockLog).selectInputStreams(Mockito.anyCollection(),
+          Mockito.anyLong(), Mockito.anyBoolean());
       return mockLog;
     }
   }