You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by gr...@apache.org on 2020/09/11 01:43:03 UTC

[kudu] branch master updated: KUDU-3012: Add a Log Throttler

This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 8fc3ce7  KUDU-3012: Add a Log Throttler
8fc3ce7 is described below

commit 8fc3ce7040b485424d196e6f9032c7b22b43b122
Author: Mahesh Reddy <mr...@cloudera.com>
AuthorDate: Tue Sep 1 17:32:37 2020 -0700

    KUDU-3012: Add a Log Throttler
    
    LogThrottler implements a simple log throttler that logs at most one
    message per time period, which is set at the call-site. Each instance
    of this class throttles all of its messages, regardless of content.
    
    Change-Id: Ia2089b6fc905a5b54d664b7200060cabb965f40f
    Reviewed-on: http://gerrit.cloudera.org:8080/16400
    Tested-by: Kudu Jenkins
    Reviewed-by: Bankim Bhavsar <ba...@cloudera.com>
    Reviewed-by: Grant Henke <gr...@apache.org>
---
 .../org/apache/kudu/client/AsyncKuduSession.java   |   7 +-
 .../java/org/apache/kudu/util/LogThrottler.java    | 316 +++++++++++++++++++++
 .../org/apache/kudu/util/TestLogThrottler.java     |  62 ++++
 3 files changed, 384 insertions(+), 1 deletion(-)

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
index c7d4e64..90eb526 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
@@ -46,6 +46,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.kudu.client.AsyncKuduClient.LookupType;
 import org.apache.kudu.util.AsyncUtil;
+import org.apache.kudu.util.LogThrottler;
 import org.apache.kudu.util.Slice;
 
 /**
@@ -114,6 +115,10 @@ import org.apache.kudu.util.Slice;
 public class AsyncKuduSession implements SessionConfiguration {
 
   public static final Logger LOG = LoggerFactory.getLogger(AsyncKuduSession.class);
+  /**
+   * Instance of LogThrottler isn't static so we can throttle messages per session
+   */
+  private final LogThrottler throttleClosedLog = new LogThrottler(LOG);
 
   private final AsyncKuduClient client;
   private final Random randomizer = new Random();
@@ -549,7 +554,7 @@ public class AsyncKuduSession implements SessionConfiguration {
     if (closed) {
       // Ideally this would be a precondition, but that may break existing
       // clients who have grown to rely on this unsafe behavior.
-      LOG.warn("Applying an operation in a closed session; this is unsafe");
+      throttleClosedLog.warn(60L, "Applying an operation in a closed session; this is unsafe");
     }
 
     // Freeze the row so that the client cannot concurrently modify it while it is in flight.
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/util/LogThrottler.java b/java/kudu-client/src/main/java/org/apache/kudu/util/LogThrottler.java
new file mode 100644
index 0000000..00dc224
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/util/LogThrottler.java
@@ -0,0 +1,316 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.util;
+
+import java.time.Instant;
+
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+
+/**
+ * This class suppresses messages by allowing at most one message per interval; the interval, in
+ * seconds, is provided at the call-site of the logging functions. Each instance of this class
+ * throttles all messages passed to it, regardless of the message content or log level.
+ * TODO(mreddy): If functionality is ever expanded, use ConcurrentHashMap to store multiple messages
+ * and the last time it was logged, only one instance of LogThrottler will be needed per class
+ * as this would be used at multiple call-sites to throttle different messages
+ * TODO(mreddy): Use integer as hashing key rather than string for performance costs, store integers
+ * on file with call-sites, put onus on devs to provide integers for each unique message to throttle
+ * TODO(mreddy): Add count to keep track of how many messages have been suppressed
+ */
+@InterfaceAudience.Private
+public class LogThrottler {
+
+  private final Logger log;
+  private long lastLoggedTimeSecs = -1;
+
+  public LogThrottler(Logger log) {
+    this.log = log;
+  }
+
+  /**
+   * Throttles the log trace message 'msg' if the last message was logged less than 'seconds' ago
+   * @param seconds number of seconds between each desired log message
+   * @param msg string message to be logged
+   */
+  public void trace(long seconds, String msg) {
+    if (shouldLog(seconds)) {
+      log.trace(msg);
+    }
+  }
+
+  /**
+   * Throttles the log trace message according to specified format and argument if the last
+   * message was logged less than 'seconds' ago
+   * @param seconds number of seconds between each desired log message
+   * @param format format string
+   * @param arg argument for format string
+   */
+  public void trace(long seconds, String format, Object arg) {
+    if (shouldLog(seconds)) {
+      log.trace(format, arg);
+    }
+  }
+
+  /**
+   * Throttles the log trace message according to specified format and arguments if the last
+   * message was logged less than 'seconds' ago
+   * @param seconds number of seconds between each desired log message
+   * @param format format string
+   * @param arg1 first argument for format string
+   * @param arg2 second argument for format string
+   */
+  public void trace(long seconds, String format, Object arg1, Object arg2) {
+    if (shouldLog(seconds)) {
+      log.trace(format, arg1, arg2);
+    }
+  }
+
+  /**
+   * Throttles the log trace message according to specified format and arguments if the last
+   * message was logged less than 'seconds' ago
+   * @param seconds number of seconds between each desired log message
+   * @param format format string
+   * @param arguments list of 3 or more arguments for format string
+   */
+  public void trace(long seconds, String format, Object... arguments) {
+    if (shouldLog(seconds)) {
+      log.trace(format, arguments);
+    }
+  }
+
+  /**
+   * Throttles the log warn message 'msg' if the last message was logged less than 'seconds' ago
+   * @param seconds number of seconds between each desired log message
+   * @param msg string message to be logged
+   */
+  public void warn(long seconds, String msg) {
+    if (shouldLog(seconds)) {
+      log.warn(msg);
+    }
+  }
+
+  /**
+   * Throttles the log warn message according to specified format and argument if the last
+   * message was logged less than 'seconds' ago
+   * @param seconds number of seconds between each desired log message
+   * @param format format string
+   * @param arg argument for format string
+   */
+  public void warn(long seconds, String format, Object arg) {
+    if (shouldLog(seconds)) {
+      log.warn(format, arg);
+    }
+  }
+
+  /**
+   * Throttles the log warn message according to specified format and arguments if the last
+   * message was logged less than 'seconds' ago
+   * @param seconds number of seconds between each desired log message
+   * @param format format string
+   * @param arg1 first argument for format string
+   * @param arg2 second argument for format string
+   */
+  public void warn(long seconds, String format, Object arg1, Object arg2) {
+    if (shouldLog(seconds)) {
+      log.warn(format, arg1, arg2);
+    }
+  }
+
+  /**
+   * Throttles the log warn message according to specified format and arguments if the last
+   * message was logged less than 'seconds' ago
+   * @param seconds number of seconds between each desired log message
+   * @param format format string
+   * @param arguments list of 3 or more arguments for format string
+   */
+  public void warn(long seconds, String format, Object... arguments) {
+    if (shouldLog(seconds)) {
+      log.warn(format, arguments);
+    }
+  }
+
+  /**
+   * Throttles the log error message 'msg' if the last message was logged less than 'seconds' ago
+   * @param seconds number of seconds between each desired log message
+   * @param msg string message to be logged
+   */
+  public void error(long seconds, String msg) {
+    if (shouldLog(seconds)) {
+      log.error(msg);
+    }
+  }
+
+  /**
+   * Throttles the log error message according to specified format and argument if the last
+   * message was logged less than 'seconds' ago
+   * @param seconds number of seconds between each desired log message
+   * @param format format string
+   * @param arg argument for format string
+   */
+  public void error(long seconds, String format, Object arg) {
+    if (shouldLog(seconds)) {
+      log.error(format, arg);
+    }
+  }
+
+  /**
+   * Throttles the log error message according to specified format and arguments if the last
+   * message was logged less than 'seconds' ago
+   * @param seconds number of seconds between each desired log message
+   * @param format format string
+   * @param arg1 first argument for format string
+   * @param arg2 second argument for format string
+   */
+  public void error(long seconds, String format, Object arg1, Object arg2) {
+    if (shouldLog(seconds)) {
+      log.error(format, arg1, arg2);
+    }
+  }
+
+  /**
+   * Throttles the log error message according to specified format and arguments if the last
+   * message was logged less than 'seconds' ago
+   * @param seconds number of seconds between each desired log message
+   * @param format format string
+   * @param arguments list of 3 or more arguments for format string
+   */
+  public void error(long seconds, String format, Object... arguments) {
+    if (shouldLog(seconds)) {
+      log.error(format, arguments);
+    }
+  }
+
+  /**
+   * Throttles the log info message 'msg' if the last message was logged less than 'seconds' ago
+   * @param seconds number of seconds between each desired log message
+   * @param msg string message to be logged
+   */
+  public void info(long seconds, String msg) {
+    if (shouldLog(seconds)) {
+      log.info(msg);
+    }
+  }
+
+  /**
+   * Throttles the log info message according to specified format and argument if the last
+   * message was logged less than 'seconds' ago
+   * @param seconds number of seconds between each desired log message
+   * @param format format string
+   * @param arg argument for format string
+   */
+  public void info(long seconds, String format, Object arg) {
+    if (shouldLog(seconds)) {
+      log.info(format, arg);
+    }
+  }
+
+  /**
+   * Throttles the log info message according to specified format and arguments if the last
+   * message was logged less than 'seconds' ago
+   * @param seconds number of seconds between each desired log message
+   * @param format format string
+   * @param arg1 first argument for format string
+   * @param arg2 second argument for format string
+   */
+  public void info(long seconds, String format, Object arg1, Object arg2) {
+    if (shouldLog(seconds)) {
+      log.info(format, arg1, arg2);
+    }
+  }
+
+  /**
+   * Throttles the log info message according to specified format and arguments if the last
+   * message was logged less than 'seconds' ago
+   * @param seconds number of seconds between each desired log message
+   * @param format format string
+   * @param arguments list of 3 or more arguments for format string
+   */
+  public void info(long seconds, String format, Object... arguments) {
+    if (shouldLog(seconds)) {
+      log.info(format, arguments);
+    }
+  }
+
+  /**
+   * Throttles the log debug message 'msg' if the last message was logged less than 'seconds' ago
+   * @param seconds number of seconds between each desired log message
+   * @param msg string message to be logged
+   */
+  public void debug(long seconds, String msg) {
+    if (shouldLog(seconds)) {
+      log.debug(msg);
+    }
+  }
+
+  /**
+   * Throttles the log debug message according to specified format and argument if the last
+   * message was logged less than 'seconds' ago
+   * @param seconds number of seconds between each desired log message
+   * @param format format string
+   * @param arg argument for format string
+   */
+  public void debug(long seconds, String format, Object arg) {
+    if (shouldLog(seconds)) {
+      log.debug(format, arg);
+    }
+  }
+
+  /**
+   * Throttles the log debug message according to specified format and arguments if the last
+   * message was logged less than 'seconds' ago
+   * @param seconds number of seconds between each desired log message
+   * @param format format string
+   * @param arg1 first argument for format string
+   * @param arg2 second argument for format string
+   */
+  public void debug(long seconds, String format, Object arg1, Object arg2) {
+    if (shouldLog(seconds)) {
+      log.debug(format, arg1, arg2);
+    }
+  }
+
+  /**
+   * Throttles the log debug message according to specified format and arguments if the last
+   * message was logged less than 'seconds' ago
+   * @param seconds number of seconds between each desired log message
+   * @param format format string
+   * @param arguments list of 3 or more arguments for format string
+   */
+  public void debug(long seconds, String format, Object... arguments) {
+    if (shouldLog(seconds)) {
+      log.debug(format, arguments);
+    }
+  }
+
+  /**
+   * Returns true if this is the first message or more than 'throttlingIntervalSecs' seconds have
+   * passed since the last logged one; false tells the call-site to suppress the message. Elapsed
+   * time is compared by subtraction so a very large interval cannot overflow and defeat throttling
+   * @param throttlingIntervalSecs number of seconds between each desired log message
+   * @return boolean indicating whether or not to log
+   */
+  private synchronized boolean shouldLog(long throttlingIntervalSecs) {
+    long nowSecs = Instant.now().getEpochSecond();
+    if (lastLoggedTimeSecs == -1 || nowSecs - lastLoggedTimeSecs > throttlingIntervalSecs) {
+      lastLoggedTimeSecs = nowSecs;
+      return true;
+    }
+    return false;
+  }
+}
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/util/TestLogThrottler.java b/java/kudu-client/src/test/java/org/apache/kudu/util/TestLogThrottler.java
new file mode 100644
index 0000000..9b0a123
--- /dev/null
+++ b/java/kudu-client/src/test/java/org/apache/kudu/util/TestLogThrottler.java
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.util;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.Closeable;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.test.CapturingLogAppender;
+
+/**
+ * Test for {@link LogThrottler}. Tries logging eight similar messages but only one should be logged
+ * every second according to the parameter so only first message will be logged. Rest of messages
+ * until thread is paused should be suppressed. Thread is paused for two seconds after fourth
+ * message, the subsequent message should be logged then the remaining logs should be suppressed.
+ */
+public class TestLogThrottler {
+
+  @Test
+  public void test() throws Exception {
+    Logger log = LoggerFactory.getLogger(TestLogThrottler.class);
+    LogThrottler logThrottler = new LogThrottler(log);
+    CapturingLogAppender messageChecker = new CapturingLogAppender();
+    try (Closeable c = messageChecker.attach()) {
+      for (int i = 0; i < 8; i++) {
+        logThrottler.info(1L,"Logging {}", i);
+        if (i == 3) {
+          Thread.sleep(2000);
+        }
+      }
+    }
+    String output = messageChecker.getAppendedText();
+    assertTrue("Log doesn't contain Logging 0", output.contains("Logging 0"));
+    for (int i = 1; i <= 3; i++) {
+      assertFalse("Log contains Logging " + i, output.contains("Logging " + i));
+    }
+    assertTrue("Log doesn't contain Logging 4", output.contains("Logging 4"));
+    for (int i = 5; i <= 7; i++) {
+      assertFalse("Log contains Logging " + i, output.contains("Logging " + i));
+    }
+  }
+}