You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to common-commits@hadoop.apache.org by we...@apache.org on 2019/08/30 20:14:39 UTC
[hadoop] branch branch-3.1 updated: HDFS-8178. QJM doesn't move aside stale inprogress edits files. Contributed by Istvan Fajth.
This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new e3c01e1 HDFS-8178. QJM doesn't move aside stale inprogress edits files. Contributed by Istvan Fajth.
e3c01e1 is described below
commit e3c01e174c8885c6b50d8d505c60b28ac941ffb5
Author: Istvan Fajth <pi...@cloudera.com>
AuthorDate: Fri Aug 30 13:13:01 2019 -0700
HDFS-8178. QJM doesn't move aside stale inprogress edits files. Contributed by Istvan Fajth.
Signed-off-by: Wei-Chiu Chuang <we...@apache.org>
---
.../hadoop/hdfs/qjournal/server/JNStorage.java | 11 +-
.../hdfs/server/namenode/FileJournalManager.java | 48 +++++-
.../server/namenode/NNStorageRetentionManager.java | 13 +-
.../namenode/TestNNStorageRetentionManager.java | 174 ++++++++++++++-------
4 files changed, 178 insertions(+), 68 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
index 3789156..e886432 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
@@ -50,12 +50,7 @@ class JNStorage extends Storage {
private final StorageDirectory sd;
private StorageState state;
- private static final List<Pattern> CURRENT_DIR_PURGE_REGEXES =
- ImmutableList.of(
- Pattern.compile("edits_\\d+-(\\d+)"),
- Pattern.compile("edits_inprogress_(\\d+)(?:\\..*)?"));
-
- private static final List<Pattern> PAXOS_DIR_PURGE_REGEXES =
+ private static final List<Pattern> PAXOS_DIR_PURGE_REGEXES =
ImmutableList.of(Pattern.compile("(\\d+)"));
private static final String STORAGE_EDITS_SYNC = "edits.sync";
@@ -177,8 +172,8 @@ class JNStorage extends Storage {
* the given txid.
*/
void purgeDataOlderThan(long minTxIdToKeep) throws IOException {
- purgeMatching(sd.getCurrentDir(),
- CURRENT_DIR_PURGE_REGEXES, minTxIdToKeep);
+ fjm.purgeLogsOlderThan(minTxIdToKeep);
+
purgeMatching(getOrCreatePaxosDir(),
PAXOS_DIR_PURGE_REGEXES, minTxIdToKeep);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
index c71c09a..83a2d68 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
@@ -74,7 +74,8 @@ public class FileJournalManager implements JournalManager {
private static final Pattern EDITS_INPROGRESS_STALE_REGEX = Pattern.compile(
NameNodeFile.EDITS_INPROGRESS.getName() + "_(\\d+).*(\\S+)");
- private File currentInProgress = null;
+ @VisibleForTesting
+ File currentInProgress = null;
/**
* A FileJournalManager should maintain the largest Tx ID that has been
@@ -177,20 +178,50 @@ public class FileJournalManager implements JournalManager {
this.lastReadableTxId = id;
}
+ /**
+ * Purges the unnecessary edits and edits_inprogress files.
+ *
+ * Edits files that are ending before the minTxIdToKeep are purged.
+ * Edits in progress files that are starting before minTxIdToKeep are purged.
+ * Edits in progress files that are marked as empty, trash, corrupted or
+ * stale by file extension and starting before minTxIdToKeep are purged.
+ * Edits in progress files that are after minTxIdToKeep, but before the
+ * current edits in progress files are marked as stale for clarity.
+ *
+ * In case file removal or rename is failing a warning is logged, but that
+ * does not fail the operation.
+ *
+ * @param minTxIdToKeep the lowest transaction ID that should be retained
+ * @throws IOException if listing the storage directory fails.
+ */
@Override
public void purgeLogsOlderThan(long minTxIdToKeep)
throws IOException {
LOG.info("Purging logs older than " + minTxIdToKeep);
File[] files = FileUtil.listFiles(sd.getCurrentDir());
List<EditLogFile> editLogs = matchEditLogs(files, true);
- for (EditLogFile log : editLogs) {
- if (log.getFirstTxId() < minTxIdToKeep &&
- log.getLastTxId() < minTxIdToKeep) {
- purger.purgeLog(log);
+ synchronized (this) {
+ for (EditLogFile log : editLogs) {
+ if (log.getFirstTxId() < minTxIdToKeep &&
+ log.getLastTxId() < minTxIdToKeep) {
+ purger.purgeLog(log);
+ } else if (isStaleInProgressLog(minTxIdToKeep, log)) {
+ purger.markStale(log);
+ }
}
}
}
+ private boolean isStaleInProgressLog(long minTxIdToKeep, EditLogFile log) {
+ return log.isInProgress() &&
+ !log.getFile().equals(currentInProgress) &&
+ log.getFirstTxId() >= minTxIdToKeep &&
+ // at last we check if this segment is not already marked as .trash,
+ // .empty or .corrupted, in which case it does not match the strict
+ // regex pattern.
+ EDITS_INPROGRESS_REGEX.matcher(log.getFile().getName()).matches();
+ }
+
/**
* Find all editlog segments starting at or above the given txid.
* @param firstTxId the txnid which to start looking
@@ -595,7 +626,12 @@ public class FileJournalManager implements JournalManager {
assert lastTxId == HdfsServerConstants.INVALID_TXID;
renameSelf(".empty");
}
-
+
+ public void moveAsideStaleInprogressFile() throws IOException {
+ assert isInProgress;
+ renameSelf(".stale");
+ }
+
private void renameSelf(String newSuffix) throws IOException {
File src = file;
File dst = new File(src.getParent(), src.getName() + newSuffix);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java
index 2a83541..be8cbfa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java
@@ -208,9 +208,10 @@ public class NNStorageRetentionManager {
/**
* Interface responsible for disposing of old checkpoints and edit logs.
*/
- static interface StoragePurger {
+ interface StoragePurger {
void purgeLog(EditLogFile log);
void purgeImage(FSImageFile image);
+ void markStale(EditLogFile log);
}
static class DeletionStoragePurger implements StoragePurger {
@@ -234,6 +235,16 @@ public class NNStorageRetentionManager {
LOG.warn("Could not delete " + file);
}
}
+
+ public void markStale(EditLogFile log){
+ try {
+ log.moveAsideStaleInprogressFile();
+ } catch (IOException e) {
+ // It is ok to just log the rename failure and go on, we will try next
+ // time just as with deletions.
+ LOG.warn("Could not mark " + log + " as stale", e);
+ }
+ }
}
/**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java
index b94e5a3..323c3f4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java
@@ -24,12 +24,17 @@ import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getInProgressEdit
import java.io.File;
import java.io.IOException;
import java.util.Collection;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.ToLongFunction;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.FSImageFile;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
@@ -41,8 +46,6 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
@@ -214,6 +217,8 @@ public class TestNNStorageRetentionManager {
// Segments containing txns upto txId 250 are extra and should be purged.
tc.addLog("/foo2/current/" + getFinalizedEditsFileName(1, 100), true);
+ tc.addLog("/foo2/current/" + getInProgressEditsFileName(101) + ".trash",
+ true);
tc.addLog("/foo2/current/" + getFinalizedEditsFileName(101, 175), true);
tc.addLog("/foo2/current/" + getInProgressEditsFileName(176) + ".empty",
true);
@@ -225,6 +230,8 @@ public class TestNNStorageRetentionManager {
// Only retain 2 extra segments. The 301-350 and 351-400 segments are
// considered required, not extra.
tc.addLog("/foo2/current/" + getFinalizedEditsFileName(241, 275), false);
+ tc.addLog("/foo2/current/" + getInProgressEditsFileName(276) + ".trash",
+ false);
tc.addLog("/foo2/current/" + getFinalizedEditsFileName(276, 300), false);
tc.addLog("/foo2/current/" + getInProgressEditsFileName(301) + ".empty",
false);
@@ -235,14 +242,53 @@ public class TestNNStorageRetentionManager {
tc.addLog("/foo2/current/" + getInProgressEditsFileName(401), false);
runTest(tc);
}
+
+ /* We are checking here the JournalNode environment hence added the paxos
+ * directory, but as the test here is about the FileJournalManager it happens
+ * via the NNStorageRetentionManager and that needs the fsImage files as well
+ * to be present in the folder to calculate the minimum transaction id we want
+ * to keep based on the config.
+ */
+ @Test
+ public void testExtraInprogressFilesAreRemovedOrMarkedStale()
+ throws IOException {
+ conf.setLong(DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY, 150);
+ TestCaseDescription tc = new TestCaseDescription();
+ tc.addRoot("/foo", NameNodeDirType.IMAGE_AND_EDITS);
+ final String PATH = "/foo/current/";
+
+ tc.addImage(PATH + getImageFileName(200), true);
+ tc.addImage(PATH + getImageFileName(300), false);
+ tc.addImage(PATH + getImageFileName(400), false);
+
+ File file = Mockito.spy(new File(PATH + "paxos"));
+ Mockito.when(file.isDirectory()).thenReturn(true);
+ tc.addFile(file);
+
+ tc.addLog(PATH + getFinalizedEditsFileName(1,75), true);
+ tc.addLog(PATH + getInProgressEditsFileName(76), true);
+ tc.addLog(PATH + getFinalizedEditsFileName(76, 120), true);
+ tc.addLog(PATH + getInProgressEditsFileName(121) + ".stale", true);
+ tc.addLog(PATH + getFinalizedEditsFileName(121, 150), true);
+ // everything down from here should be kept.
+ tc.addLog(PATH + getInProgressEditsFileName(151), false, true);
+ tc.addLog(PATH + getFinalizedEditsFileName(151, 320), false);
+ tc.addLog(PATH + getInProgressEditsFileName(321), false, true);
+ tc.addLog(PATH + getFinalizedEditsFileName(321, 430), false);
+ tc.addLog(PATH + getInProgressEditsFileName(431), false);
+
+ runTest(tc);
+ }
private void runTest(TestCaseDescription tc) throws IOException {
StoragePurger mockPurger =
Mockito.mock(NNStorageRetentionManager.StoragePurger.class);
ArgumentCaptor<FSImageFile> imagesPurgedCaptor =
- ArgumentCaptor.forClass(FSImageFile.class);
+ ArgumentCaptor.forClass(FSImageFile.class);
ArgumentCaptor<EditLogFile> logsPurgedCaptor =
- ArgumentCaptor.forClass(EditLogFile.class);
+ ArgumentCaptor.forClass(EditLogFile.class);
+ ArgumentCaptor<EditLogFile> staleLogsCaptor =
+ ArgumentCaptor.forClass(EditLogFile.class);
// Ask the manager to purge files we don't need any more
new NNStorageRetentionManager(conf,
@@ -254,31 +300,43 @@ public class TestNNStorageRetentionManager {
.purgeImage(imagesPurgedCaptor.capture());
Mockito.verify(mockPurger, Mockito.atLeast(0))
.purgeLog(logsPurgedCaptor.capture());
+ Mockito.verify(mockPurger, Mockito.atLeast(0))
+ .markStale(staleLogsCaptor.capture());
+ Set<String> capturedPaths = Sets.newLinkedHashSet();
// Check images
- Set<String> purgedPaths = Sets.newLinkedHashSet();
- for (FSImageFile purged : imagesPurgedCaptor.getAllValues()) {
- purgedPaths.add(fileToPath(purged.getFile()));
- }
- Assert.assertEquals(
+ for (FSImageFile captured : imagesPurgedCaptor.getAllValues()) {
+ capturedPaths.add(fileToPath(captured.getFile()));
+ }
+ Assert.assertEquals("Image file check.",
Joiner.on(",").join(filesToPaths(tc.expectedPurgedImages)),
- Joiner.on(",").join(purgedPaths));
+ Joiner.on(",").join(capturedPaths));
- // Check images
- purgedPaths.clear();
- for (EditLogFile purged : logsPurgedCaptor.getAllValues()) {
- purgedPaths.add(fileToPath(purged.getFile()));
- }
- Assert.assertEquals(
+ capturedPaths.clear();
+ // Check edit logs, and also in progress edits older than minTxIdToKeep
+ for (EditLogFile captured : logsPurgedCaptor.getAllValues()) {
+ capturedPaths.add(fileToPath(captured.getFile()));
+ }
+ Assert.assertEquals("Check old edits are removed.",
Joiner.on(",").join(filesToPaths(tc.expectedPurgedLogs)),
- Joiner.on(",").join(purgedPaths));
+ Joiner.on(",").join(capturedPaths));
+
+ capturedPaths.clear();
+ // Check in progress edits to keep are marked as stale
+ for (EditLogFile captured : staleLogsCaptor.getAllValues()) {
+ capturedPaths.add(fileToPath(captured.getFile()));
+ }
+ Assert.assertEquals("Check unnecessary but kept edits are marked stale",
+ Joiner.on(",").join(filesToPaths(tc.expectedStaleLogs)),
+ Joiner.on(",").join(capturedPaths));
}
-
+
private class TestCaseDescription {
private final Map<File, FakeRoot> dirRoots = Maps.newLinkedHashMap();
private final Set<File> expectedPurgedLogs = Sets.newLinkedHashSet();
private final Set<File> expectedPurgedImages = Sets.newLinkedHashSet();
-
+ private final Set<File> expectedStaleLogs = Sets.newLinkedHashSet();
+
private class FakeRoot {
final NameNodeDirType type;
final List<File> files;
@@ -306,13 +364,20 @@ public class TestNNStorageRetentionManager {
}
}
}
-
+
void addLog(String path, boolean expectPurge) {
+ addLog(path, expectPurge, false);
+ }
+
+ void addLog(String path, boolean expectPurge, boolean expectStale) {
File file = new File(path);
addFile(file);
if (expectPurge) {
expectedPurgedLogs.add(file);
}
+ if (expectStale) {
+ expectedStaleLogs.add(file);
+ }
}
void addImage(String path, boolean expectPurge) {
@@ -330,7 +395,22 @@ public class TestNNStorageRetentionManager {
}
return mockStorageForDirs(sds.toArray(new StorageDirectory[0]));
}
-
+
+ private File findLastInProgressEdit(FakeRoot root){
+ Pattern p = Pattern.compile(
+ NameNodeFile.EDITS_INPROGRESS.getName() + "_(\\d+)");
+ ToLongFunction<File> fileNameToTxId =
+ f -> {
+ Matcher m = p.matcher(f.getName());
+ return m.matches() ?
+ Long.parseLong(m.group(1)):
+ HdfsServerConstants.INVALID_TXID;
+ };
+ return root.files.stream().
+ sorted(Comparator.comparingLong(fileNameToTxId).reversed()).
+ findFirst().orElse(null);
+ }
+
@SuppressWarnings("unchecked")
public FSEditLog mockEditLog(StoragePurger purger) throws IOException {
final List<JournalManager> jms = Lists.newArrayList();
@@ -341,36 +421,28 @@ public class TestNNStorageRetentionManager {
// passing null NNStorage for unit test because it does not use it
FileJournalManager fjm = new FileJournalManager(conf,
root.mockStorageDir(), null);
+ fjm.currentInProgress = findLastInProgressEdit(root);
fjm.purger = purger;
jms.add(fjm);
journalSet.add(fjm, false);
}
FSEditLog mockLog = Mockito.mock(FSEditLog.class);
- Mockito.doAnswer(new Answer<Void>() {
-
- @Override
- public Void answer(InvocationOnMock invocation) throws Throwable {
- Object[] args = invocation.getArguments();
- assert args.length == 1;
- long txId = (Long) args[0];
-
- for (JournalManager jm : jms) {
- jm.purgeLogsOlderThan(txId);
- }
- return null;
+ Mockito.doAnswer(invocation -> {
+ Object[] args = invocation.getArguments();
+ assert args.length == 1;
+ long txId = (Long) args[0];
+ for (JournalManager jm : jms) {
+ jm.purgeLogsOlderThan(txId);
}
+ return null;
}).when(mockLog).purgeLogsOlderThan(Mockito.anyLong());
- Mockito.doAnswer(new Answer<Void>() {
-
- @Override
- public Void answer(InvocationOnMock invocation) throws Throwable {
- Object[] args = invocation.getArguments();
- journalSet.selectInputStreams((Collection<EditLogInputStream>)args[0],
- (Long)args[1], (Boolean)args[2], (Boolean)args[3]);
- return null;
- }
+ Mockito.doAnswer(invocation -> {
+ Object[] args = invocation.getArguments();
+ journalSet.selectInputStreams((Collection<EditLogInputStream>)args[0],
+ (Long)args[1], (Boolean)args[2], (Boolean)args[3]);
+ return null;
}).when(mockLog).selectInputStreams(Mockito.anyCollection(),
Mockito.anyLong(), Mockito.anyBoolean(), Mockito.anyBoolean());
return mockLog;
@@ -401,21 +473,17 @@ public class TestNNStorageRetentionManager {
return paths;
}
- private static NNStorage mockStorageForDirs(final StorageDirectory ... mockDirs)
+ private static NNStorage mockStorageForDirs(final StorageDirectory... mockDirs)
throws IOException {
NNStorage mockStorage = Mockito.mock(NNStorage.class);
- Mockito.doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) throws Throwable {
- FSImageStorageInspector inspector =
+ Mockito.doAnswer(invocation -> {
+ FSImageStorageInspector inspector =
(FSImageStorageInspector) invocation.getArguments()[0];
- for (StorageDirectory sd : mockDirs) {
- inspector.inspectDirectory(sd);
- }
- return null;
+ for (StorageDirectory sd : mockDirs) {
+ inspector.inspectDirectory(sd);
}
- }).when(mockStorage).inspectStorageDirs(
- Mockito.<FSImageStorageInspector>anyObject());
+ return null;
+ }).when(mockStorage).inspectStorageDirs(Mockito.any());
return mockStorage;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org