You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by we...@apache.org on 2019/08/29 23:55:38 UTC

[hadoop] branch branch-3.2 updated: HDFS-8178. QJM doesn't move aside stale inprogress edits files. Contributed by Istvan Fajth.

This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 7c6fc96  HDFS-8178. QJM doesn't move aside stale inprogress edits files. Contributed by Istvan Fajth.
7c6fc96 is described below

commit 7c6fc964fd30299ee9e818b88a829efe59bab876
Author: Istvan Fajth <pi...@cloudera.com>
AuthorDate: Thu Aug 29 14:55:17 2019 -0700

    HDFS-8178. QJM doesn't move aside stale inprogress edits files. Contributed by Istvan Fajth.
    
    Signed-off-by: Wei-Chiu Chuang <we...@apache.org>
    (cherry picked from commit 03049290fe8ef1459a3c8bd05cf198320e6e4973)
    
    Conflicts:
    	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java
---
 .../hadoop/hdfs/qjournal/server/JNStorage.java     |  11 +-
 .../hdfs/server/namenode/FileJournalManager.java   |  48 +++++-
 .../server/namenode/NNStorageRetentionManager.java |  19 ++-
 .../namenode/TestNNStorageRetentionManager.java    | 169 +++++++++++++++------
 4 files changed, 180 insertions(+), 67 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
index 0ef54a8..b7d6523 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
@@ -52,12 +52,7 @@ class JNStorage extends Storage {
   private final StorageDirectory sd;
   private StorageState state;
 
-  private static final List<Pattern> CURRENT_DIR_PURGE_REGEXES =
-      ImmutableList.of(
-        Pattern.compile("edits_\\d+-(\\d+)"),
-        Pattern.compile("edits_inprogress_(\\d+)(?:\\..*)?"));
-  
-  private static final List<Pattern> PAXOS_DIR_PURGE_REGEXES = 
+  private static final List<Pattern> PAXOS_DIR_PURGE_REGEXES =
       ImmutableList.of(Pattern.compile("(\\d+)"));
 
   private static final String STORAGE_EDITS_SYNC = "edits.sync";
@@ -181,8 +176,8 @@ class JNStorage extends Storage {
    * the given txid.
    */
   void purgeDataOlderThan(long minTxIdToKeep) throws IOException {
-    purgeMatching(sd.getCurrentDir(),
-        CURRENT_DIR_PURGE_REGEXES, minTxIdToKeep);
+    fjm.purgeLogsOlderThan(minTxIdToKeep);
+    // Paxos files are purged separately, by their numeric file names.
     purgeMatching(getOrCreatePaxosDir(),
         PAXOS_DIR_PURGE_REGEXES, minTxIdToKeep);
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
index d08a644..78394ab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
@@ -75,7 +75,8 @@ public class FileJournalManager implements JournalManager {
   private static final Pattern EDITS_INPROGRESS_STALE_REGEX = Pattern.compile(
       NameNodeFile.EDITS_INPROGRESS.getName() + "_(\\d+).*(\\S+)");
 
-  private File currentInProgress = null;
+  @VisibleForTesting
+  File currentInProgress = null;
 
   /**
    * A FileJournalManager should maintain the largest Tx ID that has been
@@ -178,20 +179,50 @@ public class FileJournalManager implements JournalManager {
     this.lastReadableTxId = id;
   }
 
+  /**
+   * Purges edit log files that are no longer needed.
+   *
+   * Finalized edits ending before minTxIdToKeep are purged. In-progress edits
+   * starting before minTxIdToKeep are purged too, including ones already
+   * renamed aside with an .empty, .trash, .corrupted or .stale suffix.
+   * In-progress edits starting at or after minTxIdToKeep that are not the
+   * current in-progress file and still have their plain name are passed to
+   * the purger to be marked as stale.
+   *
+   * Failures to delete or rename are handled by the purger; the
+   * DeletionStoragePurger only logs a warning and does not fail.
+   *
+   * @param minTxIdToKeep the lowest transaction ID that should be retained
+   * @throws IOException if listing the storage directory fails.
+   */
   @Override
   public void purgeLogsOlderThan(long minTxIdToKeep)
       throws IOException {
     LOG.info("Purging logs older than " + minTxIdToKeep);
     File[] files = FileUtil.listFiles(sd.getCurrentDir());
     List<EditLogFile> editLogs = matchEditLogs(files, true);
-    for (EditLogFile log : editLogs) {
-      if (log.getFirstTxId() < minTxIdToKeep &&
-          log.getLastTxId() < minTxIdToKeep) {
-        purger.purgeLog(log);
+    synchronized (this) {
+      for (EditLogFile log : editLogs) {
+        if (log.getFirstTxId() < minTxIdToKeep &&
+            log.getLastTxId() < minTxIdToKeep) {
+          purger.purgeLog(log);
+        } else if (isStaleInProgressLog(minTxIdToKeep, log)) {
+          purger.markStale(log);
+        }
       }
     }
   }
 
+  private boolean isStaleInProgressLog(long minTxIdToKeep, EditLogFile log) {
+    return log.isInProgress() &&
+        !log.getFile().equals(currentInProgress) &&
+        log.getFirstTxId() >= minTxIdToKeep &&
+        // Finally, exclude files already renamed aside (.trash, .empty,
+        // .corrupted, .stale ...): the strict in-progress pattern does not
+        // match their names.
+        EDITS_INPROGRESS_REGEX.matcher(log.getFile().getName()).matches();
+  }
+
   /**
    * Find all editlog segments starting at or above the given txid.
    * @param firstTxId the txnid which to start looking
@@ -596,7 +627,12 @@ public class FileJournalManager implements JournalManager {
       assert lastTxId == HdfsServerConstants.INVALID_TXID;
       renameSelf(".empty");
     }
-      
+
+    public void moveAsideStaleInprogressFile() throws IOException {
+      assert isInProgress;
+      renameSelf(".stale");
+    }
+
     private void renameSelf(String newSuffix) throws IOException {
       File src = file;
       File dst = new File(src.getParent(), src.getName() + newSuffix);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java
index fc54dfc..473080f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java
@@ -208,21 +208,22 @@ public class NNStorageRetentionManager {
   /**
    * Interface responsible for disposing of old checkpoints and edit logs.
    */
-  static interface StoragePurger {
+  interface StoragePurger {
     void purgeLog(EditLogFile log);
     void purgeImage(FSImageFile image);
+    void markStale(EditLogFile log);
   }
   
   static class DeletionStoragePurger implements StoragePurger {
     @Override
     public void purgeLog(EditLogFile log) {
-      LOG.info("Purging old edit log " + log);
+      LOG.info("Purging old edit log {}", log);
       deleteOrWarn(log.getFile());
     }
 
     @Override
     public void purgeImage(FSImageFile image) {
-      LOG.info("Purging old image " + image);
+      LOG.info("Purging old image {}", image);
       deleteOrWarn(image.getFile());
       deleteOrWarn(MD5FileUtils.getDigestFileForFile(image.getFile()));
     }
@@ -231,9 +232,24 @@ public class NNStorageRetentionManager {
       if (!file.delete()) {
         // It's OK if we fail to delete something -- we'll catch it
         // next time we swing through this directory.
-        LOG.warn("Could not delete " + file);
+        LOG.warn("Could not delete {}", file);
       }      
     }
+
+    /**
+     * Renames the in-progress log aside with a stale suffix. A failure is
+     * only logged; the rename is attempted again on the next purge.
+     */
+    @Override
+    public void markStale(EditLogFile log) {
+      try {
+        log.moveAsideStaleInprogressFile();
+      } catch (IOException e) {
+        // It is ok to just log the rename failure and go on, we will try next
+        // time just as with deletions.
+        LOG.warn("Could not mark {} as stale", log, e);
+      }
+    }
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java
index b94e5a3..360f626 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java
@@ -24,12 +24,17 @@ import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getInProgressEdit
 import java.io.File;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.ToLongFunction;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.FSImageFile;
 import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
@@ -214,6 +219,8 @@ public class TestNNStorageRetentionManager {
 
     // Segments containing txns upto txId 250 are extra and should be purged.
     tc.addLog("/foo2/current/" + getFinalizedEditsFileName(1, 100), true);
+    tc.addLog("/foo2/current/" + getInProgressEditsFileName(101) + ".trash",
+        true);
     tc.addLog("/foo2/current/" + getFinalizedEditsFileName(101, 175), true);
     tc.addLog("/foo2/current/" + getInProgressEditsFileName(176) + ".empty",
         true);
@@ -225,6 +232,8 @@ public class TestNNStorageRetentionManager {
     // Only retain 2 extra segments. The 301-350 and 351-400 segments are
     // considered required, not extra.
     tc.addLog("/foo2/current/" + getFinalizedEditsFileName(241, 275), false);
+    tc.addLog("/foo2/current/" + getInProgressEditsFileName(276) + ".trash",
+        false);
     tc.addLog("/foo2/current/" + getFinalizedEditsFileName(276, 300), false);
     tc.addLog("/foo2/current/" + getInProgressEditsFileName(301) + ".empty",
         false);
@@ -235,14 +244,53 @@ public class TestNNStorageRetentionManager {
     tc.addLog("/foo2/current/" + getInProgressEditsFileName(401), false);
     runTest(tc);
   }
+
+  /**
+   * Simulates a JournalNode storage dir (hence the mocked paxos directory).
+   * Purging runs through NNStorageRetentionManager, which needs fsimage files
+   * to compute the minimum txid to keep from the configuration. In-progress
+   * edits that are kept but are not the latest one must be marked stale.
+   */
+  @Test
+  public void testExtraInprogressFilesAreRemovedOrMarkedStale()
+      throws IOException {
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY, 150);
+    TestCaseDescription tc = new TestCaseDescription();
+    tc.addRoot("/foo", NameNodeDirType.IMAGE_AND_EDITS);
+    final String path = "/foo/current/";
+
+    tc.addImage(path + getImageFileName(200), true);
+    tc.addImage(path + getImageFileName(300), false);
+    tc.addImage(path + getImageFileName(400), false);
+
+    File file = Mockito.spy(new File(path + "paxos"));
+    Mockito.when(file.isDirectory()).thenReturn(true);
+    tc.addFile(file);
+
+    tc.addLog(path + getFinalizedEditsFileName(1, 75), true);
+    tc.addLog(path + getInProgressEditsFileName(76), true);
+    tc.addLog(path + getFinalizedEditsFileName(76, 120), true);
+    tc.addLog(path + getInProgressEditsFileName(121) + ".stale", true);
+    tc.addLog(path + getFinalizedEditsFileName(121, 150), true);
+    // Everything below is kept; older in-progress edits get marked stale.
+    tc.addLog(path + getInProgressEditsFileName(151), false, true);
+    tc.addLog(path + getFinalizedEditsFileName(151, 320), false);
+    tc.addLog(path + getInProgressEditsFileName(321), false, true);
+    tc.addLog(path + getFinalizedEditsFileName(321, 430), false);
+    tc.addLog(path + getInProgressEditsFileName(431), false);
+
+    runTest(tc);
+  }
   
   private void runTest(TestCaseDescription tc) throws IOException {
     StoragePurger mockPurger =
       Mockito.mock(NNStorageRetentionManager.StoragePurger.class);
     ArgumentCaptor<FSImageFile> imagesPurgedCaptor =
-      ArgumentCaptor.forClass(FSImageFile.class);    
+      ArgumentCaptor.forClass(FSImageFile.class);
     ArgumentCaptor<EditLogFile> logsPurgedCaptor =
-      ArgumentCaptor.forClass(EditLogFile.class);    
+      ArgumentCaptor.forClass(EditLogFile.class);
+    ArgumentCaptor<EditLogFile> staleLogsCaptor =
+      ArgumentCaptor.forClass(EditLogFile.class);
 
     // Ask the manager to purge files we don't need any more
     new NNStorageRetentionManager(conf,
@@ -254,31 +302,43 @@ public class TestNNStorageRetentionManager {
       .purgeImage(imagesPurgedCaptor.capture());
     Mockito.verify(mockPurger, Mockito.atLeast(0))
       .purgeLog(logsPurgedCaptor.capture());
+    Mockito.verify(mockPurger, Mockito.atLeast(0))
+      .markStale(staleLogsCaptor.capture());
 
+    Set<String> capturedPaths = Sets.newLinkedHashSet();
     // Check images
-    Set<String> purgedPaths = Sets.newLinkedHashSet();
-    for (FSImageFile purged : imagesPurgedCaptor.getAllValues()) {
-      purgedPaths.add(fileToPath(purged.getFile()));
-    }    
-    Assert.assertEquals(
+    for (FSImageFile captured : imagesPurgedCaptor.getAllValues()) {
+      capturedPaths.add(fileToPath(captured.getFile()));
+    }
+    Assert.assertEquals("Image file check.",
       Joiner.on(",").join(filesToPaths(tc.expectedPurgedImages)),
-      Joiner.on(",").join(purgedPaths));
+      Joiner.on(",").join(capturedPaths));
 
-    // Check images
-    purgedPaths.clear();
-    for (EditLogFile purged : logsPurgedCaptor.getAllValues()) {
-      purgedPaths.add(fileToPath(purged.getFile()));
-    }    
-    Assert.assertEquals(
+    capturedPaths.clear();
+    // Check purged edit logs, incl. in-progress edits older than minTxIdToKeep
+    for (EditLogFile captured : logsPurgedCaptor.getAllValues()) {
+      capturedPaths.add(fileToPath(captured.getFile()));
+    }
+    Assert.assertEquals("Check old edits are removed.",
       Joiner.on(",").join(filesToPaths(tc.expectedPurgedLogs)),
-      Joiner.on(",").join(purgedPaths));
+      Joiner.on(",").join(capturedPaths));
+
+    capturedPaths.clear();
+    // Check kept in-progress edits other than the current one are marked stale
+    for (EditLogFile captured : staleLogsCaptor.getAllValues()) {
+      capturedPaths.add(fileToPath(captured.getFile()));
+    }
+    Assert.assertEquals("Check unnecessary but kept edits are marked stale",
+      Joiner.on(",").join(filesToPaths(tc.expectedStaleLogs)),
+      Joiner.on(",").join(capturedPaths));
   }
-  
+
   private class TestCaseDescription {
     private final Map<File, FakeRoot> dirRoots = Maps.newLinkedHashMap();
     private final Set<File> expectedPurgedLogs = Sets.newLinkedHashSet();
     private final Set<File> expectedPurgedImages = Sets.newLinkedHashSet();
-    
+    private final Set<File> expectedStaleLogs = Sets.newLinkedHashSet();
+
     private class FakeRoot {
       final NameNodeDirType type;
       final List<File> files;
@@ -306,13 +366,20 @@ public class TestNNStorageRetentionManager {
         }
       }
     }
-    
+
     void addLog(String path, boolean expectPurge) {
+      addLog(path, expectPurge, false);
+    }
+
+    void addLog(String path, boolean expectPurge, boolean expectStale) {
       File file = new File(path);
       addFile(file);
       if (expectPurge) {
         expectedPurgedLogs.add(file);
       }
+      if (expectStale) {
+        expectedStaleLogs.add(file);
+      }
     }
     
     void addImage(String path, boolean expectPurge) {
@@ -330,7 +397,22 @@ public class TestNNStorageRetentionManager {
       }
       return mockStorageForDirs(sds.toArray(new StorageDirectory[0]));
     }
-    
+
+    private File findLastInProgressEdit(FakeRoot root) {
+      Pattern p = Pattern.compile(
+          NameNodeFile.EDITS_INPROGRESS.getName() + "_(\\d+)");
+      ToLongFunction<File> fileNameToTxId = f -> {
+        Matcher m = p.matcher(f.getName());
+        return m.matches() ?
+            Long.parseLong(m.group(1)) : HdfsServerConstants.INVALID_TXID;
+      };
+      // Only plainly named in-progress edits qualify; null if there is none.
+      return root.files.stream()
+          .filter(f -> fileNameToTxId.applyAsLong(f)
+              != HdfsServerConstants.INVALID_TXID)
+          .max(Comparator.comparingLong(fileNameToTxId)).orElse(null);
+    }
+
     @SuppressWarnings("unchecked")
     public FSEditLog mockEditLog(StoragePurger purger) throws IOException {
       final List<JournalManager> jms = Lists.newArrayList();
@@ -341,36 +423,28 @@ public class TestNNStorageRetentionManager {
         // passing null NNStorage for unit test because it does not use it
         FileJournalManager fjm = new FileJournalManager(conf,
             root.mockStorageDir(), null);
+        fjm.currentInProgress = findLastInProgressEdit(root);
         fjm.purger = purger;
         jms.add(fjm);
         journalSet.add(fjm, false);
       }
 
       FSEditLog mockLog = Mockito.mock(FSEditLog.class);
-      Mockito.doAnswer(new Answer<Void>() {
-
-        @Override
-        public Void answer(InvocationOnMock invocation) throws Throwable {
-          Object[] args = invocation.getArguments();
-          assert args.length == 1;
-          long txId = (Long) args[0];
-          
-          for (JournalManager jm : jms) {
-            jm.purgeLogsOlderThan(txId);
-          }
-          return null;
+      Mockito.doAnswer(invocation -> {
+        Object[] args = invocation.getArguments();
+        assert args.length == 1;
+        long txId = (Long) args[0];
+        for (JournalManager jm : jms) {
+          jm.purgeLogsOlderThan(txId);
         }
+        return null;
       }).when(mockLog).purgeLogsOlderThan(Mockito.anyLong());
       
-      Mockito.doAnswer(new Answer<Void>() {
-        
-        @Override
-        public Void answer(InvocationOnMock invocation) throws Throwable {
-          Object[] args = invocation.getArguments();
-          journalSet.selectInputStreams((Collection<EditLogInputStream>)args[0],
-              (Long)args[1], (Boolean)args[2], (Boolean)args[3]);
-          return null;
-        }
+      Mockito.doAnswer(invocation -> {
+        Object[] args = invocation.getArguments();
+        journalSet.selectInputStreams((Collection<EditLogInputStream>)args[0],
+            (Long)args[1], (Boolean)args[2], (Boolean)args[3]);
+        return null;
       }).when(mockLog).selectInputStreams(Mockito.anyCollection(),
           Mockito.anyLong(), Mockito.anyBoolean(), Mockito.anyBoolean());
       return mockLog;
@@ -401,19 +475,16 @@ public class TestNNStorageRetentionManager {
     return paths;
   }
 
-  private static NNStorage mockStorageForDirs(final StorageDirectory ... mockDirs)
+  private static NNStorage mockStorageForDirs(final StorageDirectory... mockDirs)
       throws IOException {
     NNStorage mockStorage = Mockito.mock(NNStorage.class);
-    Mockito.doAnswer(new Answer<Void>() {
-      @Override
-      public Void answer(InvocationOnMock invocation) throws Throwable {
-        FSImageStorageInspector inspector =
+    Mockito.doAnswer(invocation -> {
+      FSImageStorageInspector inspector =
           (FSImageStorageInspector) invocation.getArguments()[0];
-        for (StorageDirectory sd : mockDirs) {
-          inspector.inspectDirectory(sd);
-        }
-        return null;
+      for (StorageDirectory sd : mockDirs) {
+        inspector.inspectDirectory(sd);
       }
+      return null;
     }).when(mockStorage).inspectStorageDirs(
         Mockito.<FSImageStorageInspector>anyObject());
     return mockStorage;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org