You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cl...@apache.org on 2019/06/28 23:06:27 UTC

[hadoop] 24/50: HDFS-13791. Limit logging frequency of edit tail related statements. Contributed by Erik Krogen.

This is an automated email from the ASF dual-hosted git repository.

cliang pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 3998bab3da0f28bc3a76a032bd2ca7db2d7291ba
Author: Chen Liang <cl...@apache.org>
AuthorDate: Thu Sep 27 10:12:37 2018 -0700

    HDFS-13791. Limit logging frequency of edit tail related statements. Contributed by Erik Krogen.
---
 .../org/apache/hadoop/log/LogThrottlingHelper.java | 34 +++++++++++++++
 .../hdfs/qjournal/client/QuorumJournalManager.java | 15 ++++++-
 .../hdfs/server/namenode/FSEditLogLoader.java      | 47 +++++++++++++++++----
 .../hadoop/hdfs/server/namenode/FSImage.java       | 19 ++++++++-
 .../namenode/RedundantEditLogInputStream.java      | 15 ++++++-
 .../hdfs/server/namenode/TestFSEditLogLoader.java  | 48 ++++++++++++++++++++++
 6 files changed, 163 insertions(+), 15 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogThrottlingHelper.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogThrottlingHelper.java
index aa4e61c..591c3fb 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogThrottlingHelper.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogThrottlingHelper.java
@@ -273,6 +273,40 @@ public class LogThrottlingHelper {
   }
 
   /**
+   * Return the statistics for one value of a recorder's current log action.
+   *
+   * @param recorderName The name of the recorder.
+   * @param idx The index of the value, in the order it was passed to record.
+   * @return The statistics, or null if the recorder has no current action.
+   */
+  public SummaryStatistics getCurrentStats(String recorderName, int idx) {
+    LoggingAction currentLog = currentLogs.get(recorderName);
+    if (currentLog != null) {
+      return currentLog.getStats(idx);
+    }
+
+    return null;
+  }
+
+  /**
+   * Helper function to create a message about how many log statements were
+   * suppressed in the provided log action, i.e. {@code action.getCount() - 1}.
+   * If no statements were suppressed, this returns an empty string. Otherwise
+   * the message has the format (without quotes):
+   * <p>' (suppressed logging <i>{suppression_count}</i> times)'
+   *
+   * @param action The log action to produce a message about.
+   * @return A message about suppression within this action.
+   */
+  public static String getLogSupressionMessage(LogAction action) {
+    if (action.getCount() > 1) {
+      return " (suppressed logging " + (action.getCount() - 1) + " times)";
+    } else {
+      return "";
+    }
+  }
+
+  /**
    * A standard log action which keeps track of all of the values which have
    * been logged. This is also used for internal bookkeeping via its private
    * fields and methods; it will maintain whether or not it is ready to be
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
index a948afb..703443e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
@@ -54,6 +54,8 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.hdfs.web.URLConnectionFactory;
+import org.apache.hadoop.log.LogThrottlingHelper;
+import org.apache.hadoop.log.LogThrottlingHelper.LogAction;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
@@ -105,6 +107,11 @@ public class QuorumJournalManager implements JournalManager {
   private int outputBufferCapacity = 512 * 1024;
   private final URLConnectionFactory connectionFactory;
 
+  /** Throttle "Selected loggers" logging to at most once every 5 seconds. */
+  private static final long SELECT_INPUT_STREAM_LOG_INTERVAL_MS = 5000;
+  private final LogThrottlingHelper selectInputStreamLogHelper =
+      new LogThrottlingHelper(SELECT_INPUT_STREAM_LOG_INTERVAL_MS);
+
   @VisibleForTesting
   public QuorumJournalManager(Configuration conf,
                               URI uri,
@@ -568,8 +575,12 @@ public class QuorumJournalManager implements JournalManager {
           "ID " + fromTxnId);
       return;
     }
-    LOG.info("Selected loggers with >= " + maxAllowedTxns +
-        " transactions starting from " + fromTxnId);
+    LogAction logAction = selectInputStreamLogHelper.record(fromTxnId);
+    if (logAction.shouldLog()) {
+      LOG.info("Selected loggers with >= " + maxAllowedTxns + " transactions " +
+          "starting from lowest txn ID " + logAction.getStats(0).getMin() +
+          LogThrottlingHelper.getLogSupressionMessage(logAction));
+    }
     PriorityQueue<EditLogInputStream> allStreams = new PriorityQueue<>(
         JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
     for (GetJournaledEditsResponseProto resp : responseMap.values()) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index f3b6b84..6755487 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.apache.hadoop.hdfs.server.namenode.FSImageFormat.renameReservedPathsOnUpgrade;
-import static org.apache.hadoop.util.Time.monotonicNow;
 
 import java.io.FilterInputStream;
 import java.io.IOException;
@@ -113,11 +112,16 @@ import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
 import org.apache.hadoop.hdfs.util.Holder;
+import org.apache.hadoop.log.LogThrottlingHelper;
 import org.apache.hadoop.util.ChunkedArrayList;
+import org.apache.hadoop.util.Timer;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 
+import static org.apache.hadoop.log.LogThrottlingHelper.LogAction;
+
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class FSEditLogLoader {
@@ -125,16 +129,29 @@ public class FSEditLogLoader {
       LoggerFactory.getLogger(FSEditLogLoader.class.getName());
   static final long REPLAY_TRANSACTION_LOG_INTERVAL = 1000; // 1sec
 
+  /** Log the start/end of each edits file load at most once per 5 seconds. */
+  @VisibleForTesting
+  static final long LOAD_EDIT_LOG_INTERVAL_MS = 5000;
+  private final LogThrottlingHelper loadEditsLogHelper =
+      new LogThrottlingHelper(LOAD_EDIT_LOG_INTERVAL_MS);
+
   private final FSNamesystem fsNamesys;
   private final BlockManager blockManager;
+  private final Timer timer; // clock for load timing and log throttling
   private long lastAppliedTxId;
   /** Total number of end transactions loaded. */
   private int totalEdits = 0;
   
   public FSEditLogLoader(FSNamesystem fsNamesys, long lastAppliedTxId) {
+    this(fsNamesys, lastAppliedTxId, new Timer()); // tests pass a FakeTimer
+  }
+
+  @VisibleForTesting
+  FSEditLogLoader(FSNamesystem fsNamesys, long lastAppliedTxId, Timer timer) {
     this.fsNamesys = fsNamesys;
     this.blockManager = fsNamesys.getBlockManager();
     this.lastAppliedTxId = lastAppliedTxId;
+    this.timer = timer; // replaces the static monotonicNow() calls
   }
   
   long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId)
@@ -155,14 +172,26 @@ public class FSEditLogLoader {
     prog.beginStep(Phase.LOADING_EDITS, step);
     fsNamesys.writeLock();
     try {
-      long startTime = monotonicNow();
-      FSImage.LOG.info("Start loading edits file " + edits.getName()
-          + " maxTxnsToRead = " + maxTxnsToRead);
+      long startTime = timer.monotonicNow();
+      LogAction preLogAction = loadEditsLogHelper.record("pre", startTime);
+      if (preLogAction.shouldLog()) {
+        FSImage.LOG.info("Start loading edits file " + edits.getName()
+            + " maxTxnsToRead = " + maxTxnsToRead +
+            LogThrottlingHelper.getLogSupressionMessage(preLogAction));
+      }
       long numEdits = loadEditRecords(edits, false, expectedStartingTxId,
           maxTxnsToRead, startOpt, recovery);
-      FSImage.LOG.info("Edits file " + edits.getName() 
-          + " of size " + edits.length() + " edits # " + numEdits 
-          + " loaded in " + (monotonicNow()-startTime)/1000 + " seconds");
+      long endTime = timer.monotonicNow();
+      LogAction postLogAction = loadEditsLogHelper.record("post", endTime,
+          numEdits, edits.length(), endTime - startTime);
+      if (postLogAction.shouldLog()) {
+        FSImage.LOG.info("Loaded {} edits file(s) (the last named {}) of " +
+                "total size {}, total edits {}, total load time {} ms",
+            postLogAction.getCount(), edits.getName(),
+            postLogAction.getStats(1).getSum(),
+            postLogAction.getStats(0).getSum(),
+            postLogAction.getStats(2).getSum());
+      }
       return numEdits;
     } finally {
       edits.close();
@@ -203,7 +232,7 @@ public class FSEditLogLoader {
     Step step = createStartupProgressStep(in);
     prog.setTotal(Phase.LOADING_EDITS, step, numTxns);
     Counter counter = prog.getCounter(Phase.LOADING_EDITS, step);
-    long lastLogTime = monotonicNow();
+    long lastLogTime = timer.monotonicNow();
     long lastInodeId = fsNamesys.dir.getLastInodeId();
     
     try {
@@ -283,7 +312,7 @@ public class FSEditLogLoader {
           }
           // log progress
           if (op.hasTransactionId()) {
-            long now = monotonicNow();
+            long now = timer.monotonicNow();
             if (now - lastLogTime > REPLAY_TRANSACTION_LOG_INTERVAL) {
               long deltaTxId = lastAppliedTxId - expectedStartingTxId + 1;
               int percent = Math.round((float) deltaTxId / numTxns * 100);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
index 3d347d9..f8dff1a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
@@ -69,6 +69,8 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.util.Canceler;
 import org.apache.hadoop.hdfs.util.MD5FileUtils;
 import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.log.LogThrottlingHelper;
+import org.apache.hadoop.log.LogThrottlingHelper.LogAction;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.Time;
 
@@ -124,6 +126,11 @@ public class FSImage implements Closeable {
   private final Set<Long> currentlyCheckpointing =
       Collections.<Long>synchronizedSet(new HashSet<Long>());
 
+  /** Throttle the per-stream "Reading ..." log to once every 5 seconds. */
+  private static final long LOAD_EDIT_LOG_INTERVAL_MS = 5000;
+  private final LogThrottlingHelper loadEditLogHelper =
+      new LogThrottlingHelper(LOAD_EDIT_LOG_INTERVAL_MS);
+
   /**
    * Construct an FSImage
    * @param conf Configuration
@@ -886,8 +893,16 @@ public class FSImage implements Closeable {
       
       // Load latest edits
       for (EditLogInputStream editIn : editStreams) {
-        LOG.info("Reading " + editIn + " expecting start txid #" +
-              (lastAppliedTxId + 1));
+        LogAction logAction = loadEditLogHelper.record();
+        if (logAction.shouldLog()) {
+          String logSuppressed = "";
+          if (logAction.getCount() > 1) {
+            logSuppressed = "; suppressed logging for " +
+                (logAction.getCount() - 1) + " edit reads";
+          }
+          LOG.info("Reading " + editIn + " expecting start txid #" +
+              (lastAppliedTxId + 1) + logSuppressed);
+        }
         try {
           loader.loadFSEdits(editIn, lastAppliedTxId + 1, maxTxnsToRead,
               startOpt, recovery);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java
index 6c42c82..19e046d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.io.IOUtils;
 
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.Longs;
+import org.apache.hadoop.log.LogThrottlingHelper;
+import org.apache.hadoop.log.LogThrottlingHelper.LogAction;
 
 /**
  * A merged input stream that handles failover between different edit logs.
@@ -43,6 +45,11 @@ class RedundantEditLogInputStream extends EditLogInputStream {
   private long prevTxId;
   private final EditLogInputStream[] streams;
 
+  /** Throttle "Fast-forwarding stream" logging to once every 5 seconds. */
+  private static final long FAST_FORWARD_LOGGING_INTERVAL_MS = 5000;
+  private final LogThrottlingHelper fastForwardLoggingHelper =
+      new LogThrottlingHelper(FAST_FORWARD_LOGGING_INTERVAL_MS);
+
   /**
    * States that the RedundantEditLogInputStream can be in.
    *
@@ -174,8 +181,12 @@ class RedundantEditLogInputStream extends EditLogInputStream {
       case SKIP_UNTIL:
        try {
           if (prevTxId != HdfsServerConstants.INVALID_TXID) {
-            LOG.info("Fast-forwarding stream '" + streams[curIdx].getName() +
-                "' to transaction ID " + (prevTxId + 1));
+            LogAction logAction = fastForwardLoggingHelper.record();
+            if (logAction.shouldLog()) {
+              LOG.info("Fast-forwarding stream '" + streams[curIdx].getName() +
+                  "' to transaction ID " + (prevTxId + 1) +
+                  LogThrottlingHelper.getLogSupressionMessage(logAction));
+            }
             streams[curIdx].skipUntil(prevTxId + 1);
           }
         } catch (IOException e) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
index daeeff2..57c0453 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
@@ -19,10 +19,13 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 import java.io.BufferedInputStream;
 import java.io.File;
@@ -61,7 +64,9 @@ import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
 import org.apache.hadoop.test.PathUtils;
+import org.apache.hadoop.util.FakeTimer;
 import org.slf4j.event.Level;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -101,6 +106,7 @@ public class TestFSEditLogLoader {
   private static final File TEST_DIR = PathUtils.getTestDir(TestFSEditLogLoader.class);
 
   private static final int NUM_DATA_NODES = 0;
+  private static final String FAKE_EDIT_STREAM_NAME = "FAKE_STREAM";
 
   private final ErasureCodingPolicy testECPolicy
       = StripedFileTestUtil.getDefaultECPolicy();
@@ -799,4 +805,46 @@ public class TestFSEditLogLoader {
       }
     }
   }
+
+  @Test
+  public void testLoadFSEditLogThrottling() throws Exception {
+    FSNamesystem namesystem = mock(FSNamesystem.class);
+    namesystem.dir = mock(FSDirectory.class);
+    FakeTimer timer = new FakeTimer();
+    FSEditLogLoader loader = new FSEditLogLoader(namesystem, 0, timer);
+    String startMsg = "Start loading edits file " + FAKE_EDIT_STREAM_NAME;
+    LogCapturer capture = LogCapturer.captureLogs(FSImage.LOG);
+    try {
+      loader.loadFSEdits(getFakeEditLogInputStream(1, 10), 1);
+      assertTrue(capture.getOutput().contains(startMsg));
+      assertTrue(capture.getOutput().contains("Loaded 1 edits file(s)"));
+      assertFalse(capture.getOutput().contains("suppressed"));
+      // A load within the interval is suppressed; the next one reports it.
+      timer.advance(FSEditLogLoader.LOAD_EDIT_LOG_INTERVAL_MS / 2);
+      capture.clearOutput();
+      loader.loadFSEdits(getFakeEditLogInputStream(11, 20), 11);
+      assertFalse(capture.getOutput().contains("Start loading edits file"));
+      assertFalse(capture.getOutput().contains("edits file(s)"));
+      timer.advance(FSEditLogLoader.LOAD_EDIT_LOG_INTERVAL_MS);
+      capture.clearOutput();
+      loader.loadFSEdits(getFakeEditLogInputStream(21, 30), 21);
+      assertTrue(capture.getOutput().contains(startMsg));
+      assertTrue(capture.getOutput().contains("suppressed logging 1 times"));
+      assertTrue(capture.getOutput().contains("Loaded 2 edits file(s)"));
+      assertTrue(capture.getOutput().contains("total size 2.0"));
+    } finally {
+      capture.stopCapturing();
+    }
+  }
+
+  private EditLogInputStream getFakeEditLogInputStream(long startTx, long endTx)
+      throws IOException {
+    EditLogInputStream fakeStream = mock(EditLogInputStream.class);
+    when(fakeStream.getName()).thenReturn(FAKE_EDIT_STREAM_NAME);
+    when(fakeStream.getFirstTxId()).thenReturn(startTx);
+    when(fakeStream.getLastTxId()).thenReturn(endTx);
+    when(fakeStream.length()).thenReturn(1L); // summed into "total size"
+    return fakeStream;
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org