You are viewing a plain text version of this content; the canonical (hyperlinked) version is available in the HTML view of the mailing-list archive.
Posted to common-commits@hadoop.apache.org by cl...@apache.org on 2019/06/28 23:06:27 UTC
[hadoop] 24/50: HDFS-13791. Limit logging frequency of edit tail related statements. Contributed by Erik Krogen.
This is an automated email from the ASF dual-hosted git repository.
cliang pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 3998bab3da0f28bc3a76a032bd2ca7db2d7291ba
Author: Chen Liang <cl...@apache.org>
AuthorDate: Thu Sep 27 10:12:37 2018 -0700
HDFS-13791. Limit logging frequency of edit tail related statements. Contributed by Erik Krogen.
---
.../org/apache/hadoop/log/LogThrottlingHelper.java | 34 +++++++++++++++
.../hdfs/qjournal/client/QuorumJournalManager.java | 15 ++++++-
.../hdfs/server/namenode/FSEditLogLoader.java | 47 +++++++++++++++++----
.../hadoop/hdfs/server/namenode/FSImage.java | 19 ++++++++-
.../namenode/RedundantEditLogInputStream.java | 15 ++++++-
.../hdfs/server/namenode/TestFSEditLogLoader.java | 48 ++++++++++++++++++++++
6 files changed, 163 insertions(+), 15 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogThrottlingHelper.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogThrottlingHelper.java
index aa4e61c..591c3fb 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogThrottlingHelper.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogThrottlingHelper.java
@@ -273,6 +273,40 @@ public class LogThrottlingHelper {
}
/**
+ * Return the summary information for given index.
+ *
+ * @param recorderName The name of the recorder.
+ * @param idx The index value.
+ * @return The summary information.
+ */
+ public SummaryStatistics getCurrentStats(String recorderName, int idx) {
+ LoggingAction currentLog = currentLogs.get(recorderName);
+ if (currentLog != null) {
+ return currentLog.getStats(idx);
+ }
+
+ return null;
+ }
+
+ /**
+ * Helper function to create a message about how many log statements were
+ * suppressed in the provided log action. If no statements were suppressed,
+ * this returns an empty string. The message has the format (without quotes):
+ *
+ * <p/>' (suppressed logging <i>{suppression_count}</i> times)'
+ *
+ * @param action The log action to produce a message about.
+ * @return A message about suppression within this action.
+ */
+ public static String getLogSupressionMessage(LogAction action) {
+ if (action.getCount() > 1) {
+ return " (suppressed logging " + (action.getCount() - 1) + " times)";
+ } else {
+ return "";
+ }
+ }
+
+ /**
* A standard log action which keeps track of all of the values which have
* been logged. This is also used for internal bookkeeping via its private
* fields and methods; it will maintain whether or not it is ready to be
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
index a948afb..703443e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
@@ -54,6 +54,8 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
+import org.apache.hadoop.log.LogThrottlingHelper;
+import org.apache.hadoop.log.LogThrottlingHelper.LogAction;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
@@ -105,6 +107,11 @@ public class QuorumJournalManager implements JournalManager {
private int outputBufferCapacity = 512 * 1024;
private final URLConnectionFactory connectionFactory;
+ /** Limit logging about input stream selection to every 5 seconds max. */
+ private static final long SELECT_INPUT_STREAM_LOG_INTERVAL_MS = 5000;
+ private final LogThrottlingHelper selectInputStreamLogHelper =
+ new LogThrottlingHelper(SELECT_INPUT_STREAM_LOG_INTERVAL_MS);
+
@VisibleForTesting
public QuorumJournalManager(Configuration conf,
URI uri,
@@ -568,8 +575,12 @@ public class QuorumJournalManager implements JournalManager {
"ID " + fromTxnId);
return;
}
- LOG.info("Selected loggers with >= " + maxAllowedTxns +
- " transactions starting from " + fromTxnId);
+ LogAction logAction = selectInputStreamLogHelper.record(fromTxnId);
+ if (logAction.shouldLog()) {
+ LOG.info("Selected loggers with >= " + maxAllowedTxns + " transactions " +
+ "starting from lowest txn ID " + logAction.getStats(0).getMin() +
+ LogThrottlingHelper.getLogSupressionMessage(logAction));
+ }
PriorityQueue<EditLogInputStream> allStreams = new PriorityQueue<>(
JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
for (GetJournaledEditsResponseProto resp : responseMap.values()) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index f3b6b84..6755487 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.server.namenode.FSImageFormat.renameReservedPathsOnUpgrade;
-import static org.apache.hadoop.util.Time.monotonicNow;
import java.io.FilterInputStream;
import java.io.IOException;
@@ -113,11 +112,16 @@ import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
import org.apache.hadoop.hdfs.util.Holder;
+import org.apache.hadoop.log.LogThrottlingHelper;
import org.apache.hadoop.util.ChunkedArrayList;
+import org.apache.hadoop.util.Timer;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
+import static org.apache.hadoop.log.LogThrottlingHelper.LogAction;
+
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class FSEditLogLoader {
@@ -125,16 +129,29 @@ public class FSEditLogLoader {
LoggerFactory.getLogger(FSEditLogLoader.class.getName());
static final long REPLAY_TRANSACTION_LOG_INTERVAL = 1000; // 1sec
+ /** Limit logging about edit loading to every 5 seconds max. */
+ @VisibleForTesting
+ static final long LOAD_EDIT_LOG_INTERVAL_MS = 5000;
+ private final LogThrottlingHelper loadEditsLogHelper =
+ new LogThrottlingHelper(LOAD_EDIT_LOG_INTERVAL_MS);
+
private final FSNamesystem fsNamesys;
private final BlockManager blockManager;
+ private final Timer timer;
private long lastAppliedTxId;
/** Total number of end transactions loaded. */
private int totalEdits = 0;
public FSEditLogLoader(FSNamesystem fsNamesys, long lastAppliedTxId) {
+ this(fsNamesys, lastAppliedTxId, new Timer());
+ }
+
+ @VisibleForTesting
+ FSEditLogLoader(FSNamesystem fsNamesys, long lastAppliedTxId, Timer timer) {
this.fsNamesys = fsNamesys;
this.blockManager = fsNamesys.getBlockManager();
this.lastAppliedTxId = lastAppliedTxId;
+ this.timer = timer;
}
long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId)
@@ -155,14 +172,26 @@ public class FSEditLogLoader {
prog.beginStep(Phase.LOADING_EDITS, step);
fsNamesys.writeLock();
try {
- long startTime = monotonicNow();
- FSImage.LOG.info("Start loading edits file " + edits.getName()
- + " maxTxnsToRead = " + maxTxnsToRead);
+ long startTime = timer.monotonicNow();
+ LogAction preLogAction = loadEditsLogHelper.record("pre", startTime);
+ if (preLogAction.shouldLog()) {
+ FSImage.LOG.info("Start loading edits file " + edits.getName()
+ + " maxTxnsToRead = " + maxTxnsToRead +
+ LogThrottlingHelper.getLogSupressionMessage(preLogAction));
+ }
long numEdits = loadEditRecords(edits, false, expectedStartingTxId,
maxTxnsToRead, startOpt, recovery);
- FSImage.LOG.info("Edits file " + edits.getName()
- + " of size " + edits.length() + " edits # " + numEdits
- + " loaded in " + (monotonicNow()-startTime)/1000 + " seconds");
+ long endTime = timer.monotonicNow();
+ LogAction postLogAction = loadEditsLogHelper.record("post", endTime,
+ numEdits, edits.length(), endTime - startTime);
+ if (postLogAction.shouldLog()) {
+ FSImage.LOG.info("Loaded {} edits file(s) (the last named {}) of " +
+ "total size {}, total edits {}, total load time {} ms",
+ postLogAction.getCount(), edits.getName(),
+ postLogAction.getStats(1).getSum(),
+ postLogAction.getStats(0).getSum(),
+ postLogAction.getStats(2).getSum());
+ }
return numEdits;
} finally {
edits.close();
@@ -203,7 +232,7 @@ public class FSEditLogLoader {
Step step = createStartupProgressStep(in);
prog.setTotal(Phase.LOADING_EDITS, step, numTxns);
Counter counter = prog.getCounter(Phase.LOADING_EDITS, step);
- long lastLogTime = monotonicNow();
+ long lastLogTime = timer.monotonicNow();
long lastInodeId = fsNamesys.dir.getLastInodeId();
try {
@@ -283,7 +312,7 @@ public class FSEditLogLoader {
}
// log progress
if (op.hasTransactionId()) {
- long now = monotonicNow();
+ long now = timer.monotonicNow();
if (now - lastLogTime > REPLAY_TRANSACTION_LOG_INTERVAL) {
long deltaTxId = lastAppliedTxId - expectedStartingTxId + 1;
int percent = Math.round((float) deltaTxId / numTxns * 100);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
index 3d347d9..f8dff1a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
@@ -69,6 +69,8 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.log.LogThrottlingHelper;
+import org.apache.hadoop.log.LogThrottlingHelper.LogAction;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Time;
@@ -124,6 +126,11 @@ public class FSImage implements Closeable {
private final Set<Long> currentlyCheckpointing =
Collections.<Long>synchronizedSet(new HashSet<Long>());
+ /** Limit logging about edit loading to every 5 seconds max. */
+ private static final long LOAD_EDIT_LOG_INTERVAL_MS = 5000;
+ private final LogThrottlingHelper loadEditLogHelper =
+ new LogThrottlingHelper(LOAD_EDIT_LOG_INTERVAL_MS);
+
/**
* Construct an FSImage
* @param conf Configuration
@@ -886,8 +893,16 @@ public class FSImage implements Closeable {
// Load latest edits
for (EditLogInputStream editIn : editStreams) {
- LOG.info("Reading " + editIn + " expecting start txid #" +
- (lastAppliedTxId + 1));
+ LogAction logAction = loadEditLogHelper.record();
+ if (logAction.shouldLog()) {
+ String logSuppressed = "";
+ if (logAction.getCount() > 1) {
+ logSuppressed = "; suppressed logging for " +
+ (logAction.getCount() - 1) + " edit reads";
+ }
+ LOG.info("Reading " + editIn + " expecting start txid #" +
+ (lastAppliedTxId + 1) + logSuppressed);
+ }
try {
loader.loadFSEdits(editIn, lastAppliedTxId + 1, maxTxnsToRead,
startOpt, recovery);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java
index 6c42c82..19e046d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.io.IOUtils;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
+import org.apache.hadoop.log.LogThrottlingHelper;
+import org.apache.hadoop.log.LogThrottlingHelper.LogAction;
/**
* A merged input stream that handles failover between different edit logs.
@@ -43,6 +45,11 @@ class RedundantEditLogInputStream extends EditLogInputStream {
private long prevTxId;
private final EditLogInputStream[] streams;
+ /** Limit logging about fast forwarding the stream to every 5 seconds max. */
+ private static final long FAST_FORWARD_LOGGING_INTERVAL_MS = 5000;
+ private final LogThrottlingHelper fastForwardLoggingHelper =
+ new LogThrottlingHelper(FAST_FORWARD_LOGGING_INTERVAL_MS);
+
/**
* States that the RedundantEditLogInputStream can be in.
*
@@ -174,8 +181,12 @@ class RedundantEditLogInputStream extends EditLogInputStream {
case SKIP_UNTIL:
try {
if (prevTxId != HdfsServerConstants.INVALID_TXID) {
- LOG.info("Fast-forwarding stream '" + streams[curIdx].getName() +
- "' to transaction ID " + (prevTxId + 1));
+ LogAction logAction = fastForwardLoggingHelper.record();
+ if (logAction.shouldLog()) {
+ LOG.info("Fast-forwarding stream '" + streams[curIdx].getName() +
+ "' to transaction ID " + (prevTxId + 1) +
+ LogThrottlingHelper.getLogSupressionMessage(logAction));
+ }
streams[curIdx].skipUntil(prevTxId + 1);
}
} catch (IOException e) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
index daeeff2..57c0453 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
@@ -19,10 +19,13 @@ package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
import java.io.BufferedInputStream;
import java.io.File;
@@ -61,7 +64,9 @@ import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.apache.hadoop.test.PathUtils;
+import org.apache.hadoop.util.FakeTimer;
import org.slf4j.event.Level;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -101,6 +106,7 @@ public class TestFSEditLogLoader {
private static final File TEST_DIR = PathUtils.getTestDir(TestFSEditLogLoader.class);
private static final int NUM_DATA_NODES = 0;
+ private static final String FAKE_EDIT_STREAM_NAME = "FAKE_STREAM";
private final ErasureCodingPolicy testECPolicy
= StripedFileTestUtil.getDefaultECPolicy();
@@ -799,4 +805,46 @@ public class TestFSEditLogLoader {
}
}
}
+
+ @Test
+ public void setLoadFSEditLogThrottling() throws Exception {
+ FSNamesystem namesystem = mock(FSNamesystem.class);
+ namesystem.dir = mock(FSDirectory.class);
+
+ FakeTimer timer = new FakeTimer();
+ FSEditLogLoader loader = new FSEditLogLoader(namesystem, 0, timer);
+
+ LogCapturer capture = LogCapturer.captureLogs(FSImage.LOG);
+ loader.loadFSEdits(getFakeEditLogInputStream(1, 10), 1);
+ assertTrue(capture.getOutput().contains("Start loading edits file " +
+ FAKE_EDIT_STREAM_NAME));
+ assertTrue(capture.getOutput().contains("Loaded 1 edits file(s)"));
+ assertFalse(capture.getOutput().contains("suppressed"));
+
+ timer.advance(FSEditLogLoader.LOAD_EDIT_LOG_INTERVAL_MS / 2);
+ capture.clearOutput();
+ loader.loadFSEdits(getFakeEditLogInputStream(11, 20), 11);
+ assertFalse(capture.getOutput().contains("Start loading edits file"));
+ assertFalse(capture.getOutput().contains("edits file(s)"));
+
+ timer.advance(FSEditLogLoader.LOAD_EDIT_LOG_INTERVAL_MS);
+ capture.clearOutput();
+ loader.loadFSEdits(getFakeEditLogInputStream(21, 30), 21);
+ assertTrue(capture.getOutput().contains("Start loading edits file " +
+ FAKE_EDIT_STREAM_NAME));
+ assertTrue(capture.getOutput().contains("suppressed logging 1 times"));
+ assertTrue(capture.getOutput().contains("Loaded 2 edits file(s)"));
+ assertTrue(capture.getOutput().contains("total size 2.0"));
+ }
+
+ private EditLogInputStream getFakeEditLogInputStream(long startTx, long endTx)
+ throws IOException {
+ EditLogInputStream fakeStream = mock(EditLogInputStream.class);
+ when(fakeStream.getName()).thenReturn(FAKE_EDIT_STREAM_NAME);
+ when(fakeStream.getFirstTxId()).thenReturn(startTx);
+ when(fakeStream.getLastTxId()).thenReturn(endTx);
+ when(fakeStream.length()).thenReturn(1L);
+ return fakeStream;
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org