You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2021/11/12 19:55:05 UTC

[impala] branch master updated: IMPALA-5256: Force log rotation when max_log_size exceeded

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new b692a92  IMPALA-5256: Force log rotation when max_log_size exceeded
b692a92 is described below

commit b692a92fa2a2277a185fb5823592609b4603c0d8
Author: Riza Suminto <ri...@cloudera.com>
AuthorDate: Tue Nov 2 09:21:33 2021 -0700

    IMPALA-5256: Force log rotation when max_log_size exceeded
    
    Impala daemons allow STDOUT/STDERR redirection into INFO/ERROR log
    respectively through redirect_stdout_stderr startup flag. If
    redirect_stdout_stderr is true, daemons redirect STDOUT/STDERR stream to
    write into the log file symlink created by glog. There are two problems
    with this approach:
    
    1. Glog updates the symlink to point to the new log file when it does
       log rotation. However, Impala is not aware that the symlink now
       points to a different file, so cout/cerr writes still go to the old
       log file.
    
    2. When there is a lot of write activity to cout/cerr, the log file can
       grow big. However, glog is not aware of STDOUT/STDERR activity. It
       only counts the message bytes written to glog (LOG(INFO),
       LOG(ERROR)). Thus, it only uses its internal bytes count when
       deciding to rotate the logs.
    
    This commit addresses the issue by monitoring the log file size every
    second. If Impala sees that the log file has exceeded max_log_size, it
    calls google::FlushLogFiles() without waiting for logbufsecs to elapse.
    If the log file is still too big after the flush, we force glog to
    rotate the log. Since there is no direct way to force glog to rotate,
    we do this by temporarily changing the log filename extension (to
    ".cut") through google::SetLogFilenameExtension(), and then immediately
    resetting it to the empty extension.
    
    We also check periodically whether the log file symlinks now point to
    new files. If they do, we reattach the STDOUT/STDERR streams to the new
    log files.
    
    Testing:
    - Pass the core test.
    - Add new exhaustive test TestLogging::test_excessive_cerr.
    
    Change-Id: I1b94727180354fe69989ebf3cd1a8f8cda1cf0c3
    Reviewed-on: http://gerrit.cloudera.org:8080/17997
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/common/init.cc                 |  50 ++++++++++++-
 be/src/common/logging.cc              | 129 +++++++++++++++++++++++++++-------
 be/src/common/logging.h               |  15 ++++
 be/src/util/filesystem-util.cc        |  20 +++++-
 be/src/util/filesystem-util.h         |   4 ++
 tests/custom_cluster/test_breakpad.py |  65 +++++++++++++++++
 6 files changed, 251 insertions(+), 32 deletions(-)

diff --git a/be/src/common/init.cc b/be/src/common/init.cc
index d6ab972..58cb601 100644
--- a/be/src/common/init.cc
+++ b/be/src/common/init.cc
@@ -19,7 +19,6 @@
 
 #include <csignal>
 #include <gperftools/heap-profiler.h>
-#include <gperftools/malloc_extension.h>
 #include <third_party/lss/linux_syscall_support.h>
 
 #include "common/global-flags.h"
@@ -34,7 +33,6 @@
 #include "rpc/thrift-util.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/datetime-simple-date-format-parser.h"
-#include "runtime/decimal-value.h"
 #include "runtime/exec-env.h"
 #include "runtime/hdfs-fs-cache.h"
 #include "runtime/lib-cache.h"
@@ -76,6 +74,7 @@ DECLARE_string(redaction_rules_file);
 DECLARE_bool(redirect_stdout_stderr);
 DECLARE_string(reserved_words_version);
 DECLARE_bool(symbolize_stacktrace);
+DECLARE_string(debug_actions);
 
 DEFINE_int32(memory_maintenance_sleep_time_ms, 10000, "Sleep time in milliseconds "
     "between memory maintenance iterations");
@@ -123,6 +122,10 @@ static unique_ptr<impala::Thread> pause_monitor;
 // Thread only used in backend tests to implement a test timeout.
 static unique_ptr<impala::Thread> be_timeout_thread;
 
+// Fault injection thread that is spawned if FLAGS_debug_actions has label
+// 'LOG_MAINTENANCE_STDERR'.
+static unique_ptr<impala::Thread> log_fault_inject_thread;
+
 // Timeout after 2 hours - backend tests should generally run in minutes or tens of
 // minutes at worst.
 #if defined(UNDEFINED_SANITIZER_FULL)
@@ -135,20 +138,54 @@ static const int64_t BE_TEST_TIMEOUT_S = 60L * 60L * 2L;
 extern "C" { void __gcov_flush(); }
 #endif
 
+[[noreturn]] static void LogFaultInjectionThread() {
+  const int64_t sleep_duration = 1;  // Seconds to sleep between bursts.
+  while (true) {
+    sleep(sleep_duration);
+
+    const int64_t now = MonotonicMillis();  // Tags every line of this burst.
+    Status status = DebugAction(FLAGS_debug_actions, "LOG_MAINTENANCE_STDERR");
+    if (!status.ok()) {
+      // Fault injection activated: write the action's error message 128 times to cerr.
+      for (int i = 0; i < 128; i++) {
+        std::cerr << now << " " << i << " "
+                  << " LOG_MAINTENANCE_STDERR " << status.msg().msg() << endl;
+      }
+    }
+  }
+}
+
 [[noreturn]] static void LogMaintenanceThread() {
+  int64_t last_flush = MonotonicMillis();
+  const int64_t sleep_duration = 1;  // Fixed 1s tick; FLAGS_logbufsecs may be <= 0.
   while (true) {
-    sleep(FLAGS_logbufsecs);
+    sleep(sleep_duration);
+
+    const int64_t now = MonotonicMillis();
+    bool max_log_file_exceeded = impala::RedirectStdoutStderr() && impala::CheckLogSize();
+    if ((now - last_flush) / 1000 < FLAGS_logbufsecs && !max_log_file_exceeded) {
+      continue;
+    }
 
     google::FlushLogFiles(google::GLOG_INFO);
+
+    // Check log size again and force log rotation this time if the logs are still too
+    // big after FlushLogFiles.
+    if (max_log_file_exceeded && impala::CheckLogSize()) impala::ForceRotateLog();
+
     // No need to rotate log files in tests.
     if (impala::TestInfo::is_test()) continue;
+    // Reattach stdout and stderr if necessary.
+    if (impala::RedirectStdoutStderr()) impala::AttachStdoutStderr();
     // Check for log rotation in every interval of the maintenance thread
     impala::CheckAndRotateLogFiles(FLAGS_max_log_files);
     // Check for minidump rotation in every interval of the maintenance thread. This is
     // necessary since an arbitrary number of minidumps can be written by sending SIGUSR1
     // to the process.
     impala::CheckAndRotateMinidumps(FLAGS_max_minidumps);
+    // Update last_flush. NOTE: tests never reach this line (see 'continue' above), so
+    // there the flush above runs on every wakeup once logbufsecs have first elapsed.
+    last_flush = MonotonicMillis();
   }
 }
 
@@ -327,6 +364,13 @@ void impala::InitCommonRuntime(int argc, char** argv, bool init_jvm,
       Thread::Create("common", "pause-monitor", &PauseMonitorLoop, &pause_monitor);
   if (!thread_spawn_status.ok()) CLEAN_EXIT_WITH_ERROR(thread_spawn_status.GetDetail());
 
+  // Start log fault injection if FLAGS_debug_actions has LOG_MAINTENANCE_STDERR.
+  if (strstr(FLAGS_debug_actions.c_str(), "LOG_MAINTENANCE_STDERR") != NULL) {
+    thread_spawn_status = Thread::Create("common", "log-fault-inject-thread",
+        &LogFaultInjectionThread, &log_fault_inject_thread);
+    if (!thread_spawn_status.ok()) CLEAN_EXIT_WITH_ERROR(thread_spawn_status.GetDetail());
+  }
+
   // Implement timeout for backend tests.
   if (impala::TestInfo::is_be_test()) {
     thread_spawn_status = Thread::Create("common", "be-test-timeout-thread",
diff --git a/be/src/common/logging.cc b/be/src/common/logging.cc
index 21b9ae9..debdd2f 100644
--- a/be/src/common/logging.cc
+++ b/be/src/common/logging.cc
@@ -18,34 +18,31 @@
 #include "common/logging.h"
 
 #include <stdio.h>
-#include <cerrno>
-#include <ctime>
 #include <fstream>
 #include <iomanip>
 #include <iostream>
-#include <map>
 #include <mutex>
-#include <sstream>
 #include <boost/uuid/uuid.hpp>
 #include <boost/uuid/uuid_generators.hpp>
 #include <boost/uuid/uuid_io.hpp>
 #include <gutil/strings/substitute.h>
 
-#include "common/logging.h"
+#include "common/thread-debug-info.h"
 #include "kudu/util/flags.h"
 #include "util/container-util.h"
 #include "util/debug-util.h"
 #include "util/error-util.h"
+#include "util/filesystem-util.h"
 #include "util/logging-support.h"
 #include "util/redactor.h"
 #include "util/test-info.h"
-#include "common/thread-debug-info.h"
 
 #include "common/names.h"
 
 DECLARE_string(redaction_rules_file);
 DECLARE_string(log_filename);
 DECLARE_bool(redirect_stdout_stderr);
+DECLARE_int32(max_log_size);
 
 using boost::uuids::random_generator;
 using impala::TUniqueId;
@@ -55,6 +52,9 @@ bool logging_initialized = false;
 // A 0 unique id, which indicates that one has not been set.
 const TUniqueId ZERO_UNIQUE_ID;
 
+string last_info_log_path = "";  // INFO log file stdout was last attached to.
+string last_error_log_path = "";  // ERROR log file stderr was last attached to.
+
 // Prepends fragment id, when available. If unavailable, looks
 // for query id. If unavailable, prepends nothing.
 void PrependFragment(string* s, bool* changed) {
@@ -83,6 +83,58 @@ void MessageListener(string* s, bool* changed) {
 
 mutex logging_mutex;
 
+// Sets 'canonical_path' to the path IsSymbolicLink() resolves 'symlink_path' to, or to
+// 'symlink_path' itself if it is not a symlink. Errors from IsSymbolicLink() propagate.
+impala::Status ResolveLogSymlink(const string& symlink_path, string& canonical_path) {
+  bool is_symbolic_link;
+  string resolved_path;
+  RETURN_IF_ERROR(impala::FileSystemUtil::IsSymbolicLink(
+      symlink_path, &is_symbolic_link, &resolved_path));
+  canonical_path = is_symbolic_link ? resolved_path : symlink_path;
+  return impala::Status::OK();
+}
+
+// The main implementation of AttachStdoutStderr().
+// Caller must hold lock over logging_mutex.
+impala::Status AttachStdoutStderrLocked() {
+  // Needs to be done after InitGoogleLogging, to get the INFO/ERROR file paths.
+  // Redirect stdout to INFO log and stderr to ERROR log
+  string info_log_symlink_path, error_log_symlink_path;
+  impala::GetFullLogFilename(google::INFO, &info_log_symlink_path);
+  impala::GetFullLogFilename(google::ERROR, &error_log_symlink_path);
+
+  string info_log_path, error_log_path;
+  RETURN_IF_ERROR(ResolveLogSymlink(info_log_symlink_path, info_log_path));
+  RETURN_IF_ERROR(ResolveLogSymlink(error_log_symlink_path, error_log_path));
+
+  if (last_info_log_path != info_log_path || last_error_log_path != error_log_path) {
+    // glog may rotate the INFO and ERROR files independently (each on its own size
+    // check), so only one of the two paths may have changed here. Re-opening an
+    // unchanged file in append mode is harmless, so both streams are re-attached.
+
+    // The log files are created on first use, log something to each before redirecting.
+    LOG(INFO) << "stdout will be logged to this file.";
+    LOG(ERROR) << "stderr will be logged to this file.";
+
+    // Print to stderr/stdout before redirecting so people looking for these logs in
+    // the standard place know where to look.
+    cout << "Redirecting stdout to " << info_log_symlink_path << endl;
+    cerr << "Redirecting stderr to " << error_log_symlink_path << endl;
+
+    // A failed freopen() closes its stream, so report on the other one. TODO: abort?
+    if (freopen(info_log_path.c_str(), "a", stdout) == NULL) {
+      cerr << "Could not redirect stdout: " + impala::GetStrErrMsg() << endl;
+    }
+    if (freopen(error_log_path.c_str(), "a", stderr) == NULL) {
+      cout << "Could not redirect stderr: " + impala::GetStrErrMsg() << endl;
+    }
+
+    last_info_log_path = info_log_path;
+    last_error_log_path = error_log_path;
+  }
+  return impala::Status::OK();
+}
+
 void impala::InitGoogleLoggingSafe(const char* arg) {
   lock_guard<mutex> logging_lock(logging_mutex);
   if (logging_initialized) return;
@@ -103,7 +155,7 @@ void impala::InitGoogleLoggingSafe(const char* arg) {
   // Don't double log to stderr on any threshold.
   FLAGS_stderrthreshold = google::FATAL + 1;
 
-  if (FLAGS_redirect_stdout_stderr && !TestInfo::is_test()) {
+  if (RedirectStdoutStderr()) {
     // We will be redirecting stdout/stderr to INFO/LOG so override any glog settings
     // that log to stdout/stderr...
     FLAGS_logtostderr = false;
@@ -136,32 +188,55 @@ void impala::InitGoogleLoggingSafe(const char* arg) {
     FLAGS_log_filename = google::ProgramInvocationShortName();
   }
 
-  if (FLAGS_redirect_stdout_stderr && !TestInfo::is_test()) {
-    // Needs to be done after InitGoogleLogging, to get the INFO/ERROR file paths.
-    // Redirect stdout to INFO log and stderr to ERROR log
-    string info_log_path, error_log_path;
-    GetFullLogFilename(google::INFO, &info_log_path);
-    GetFullLogFilename(google::ERROR, &error_log_path);
+  if (RedirectStdoutStderr()) {
+    Status status = AttachStdoutStderrLocked();
+    if (!status.ok()) {
+      LOG(ERROR) << "Failed to attach STDOUT/STDERR: " << status.GetDetail();
+    }
+  }
 
-    // The log files are created on first use, log something to each before redirecting.
-    LOG(INFO) << "stdout will be logged to this file.";
-    LOG(ERROR) << "stderr will be logged to this file.";
+  logging_initialized = true;
+}
 
-    // Print to stderr/stdout before redirecting so people looking for these logs in
-    // the standard place know where to look.
-    cout << "Redirecting stdout to " << info_log_path << endl;
-    cerr << "Redirecting stderr to " << error_log_path << endl;
+void impala::AttachStdoutStderr() {
+  lock_guard<mutex> logging_lock(logging_mutex);
+  Status status = AttachStdoutStderrLocked();
+  if (!status.ok()) {
+    LOG(ERROR) << "Failed to attach STDOUT/STDERR: " << status.GetDetail();
+  }
+}
 
-    // TODO: how to handle these errors? Maybe abort the process?
-    if (freopen(info_log_path.c_str(), "a", stdout) == NULL) {
-      cout << "Could not redirect stdout: " << GetStrErrMsg();
-    }
-    if (freopen(error_log_path.c_str(), "a", stderr) == NULL) {
-      cerr << "Could not redirect stderr: " << GetStrErrMsg();
+bool impala::CheckLogSize() {
+  lock_guard<mutex> logging_lock(logging_mutex);
+  int log_to_check[2] = {google::INFO, google::ERROR};
+  bool max_log_file_exceeded = false;
+  for (int log_level : log_to_check) {
+    uintmax_t file_size = 0;
+    string log_path;
+    GetFullLogFilename(log_level, &log_path);
+    Status status = FileSystemUtil::ApproximateFileSize(log_path, file_size);
+    if (status.ok()) {
+      // max_log_size is in megabytes (file_size >> 20); like glog, treat <= 0 as 1.
+      const uintmax_t max_size_mb = FLAGS_max_log_size > 0 ? FLAGS_max_log_size : 1;
+      max_log_file_exceeded |= (file_size >> 20) >= max_size_mb;
+    } else {
+      LOG(ERROR) << "Failed to check log file size: " << status.GetDetail();
+      return false;
     }
   }
 
-  logging_initialized = true;
+  return max_log_file_exceeded;
+}
+
+void impala::ForceRotateLog() {
+  google::SetLogFilenameExtension(".cut");  // A new extension makes glog start new files.
+  google::SetLogFilenameExtension("");
+  LOG(INFO) << "INFO log rotated by Impala due to max_log_size exceeded.";
+  LOG(ERROR) << "ERROR log rotated by Impala due to max_log_size exceeded.";
+}
+
+bool impala::RedirectStdoutStderr() {
+  return FLAGS_redirect_stdout_stderr && !TestInfo::is_test();
 }
 
 void impala::GetFullLogFilename(google::LogSeverity severity, string* filename) {
diff --git a/be/src/common/logging.h b/be/src/common/logging.h
index bcb2389..7486f60 100644
--- a/be/src/common/logging.h
+++ b/be/src/common/logging.h
@@ -120,6 +120,21 @@ void LogCommandLineFlags();
 /// removes the oldest ones given an upper bound of number of logfiles to keep.
 void CheckAndRotateLogFiles(int max_log_files);
 
+/// Redirect stdout to the INFO log and stderr to the ERROR log if either log file has
+/// changed since the last call. Must run after InitGoogleLogging to get the file paths.
+void AttachStdoutStderr();
+
+/// Check whether the INFO or ERROR log file size has reached FLAGS_max_log_size (in
+/// megabytes). If checking either file's size fails, log an error message to the ERROR
+/// log and return false.
+bool CheckLogSize();
+
+/// Force glog to rotate its log files by briefly switching the log filename extension.
+void ForceRotateLog();
+
+/// Return true if FLAGS_redirect_stdout_stderr is true and TestInfo::is_test() is false.
+bool RedirectStdoutStderr();
+
 #endif // IR_COMPILE
 
 /// Prints v in base 10.
diff --git a/be/src/util/filesystem-util.cc b/be/src/util/filesystem-util.cc
index 89aa007..6b1e647 100644
--- a/be/src/util/filesystem-util.cc
+++ b/be/src/util/filesystem-util.cc
@@ -18,7 +18,6 @@
 #include <dirent.h>
 #include <errno.h>
 #include <fcntl.h>
-#include <limits.h>
 #include <stdlib.h>
 #include <string.h>
 #include <unistd.h>
@@ -32,7 +31,6 @@
 
 #include <boost/filesystem.hpp>
 
-#include "common/logging.h"
 #include "common/status.h"
 #include "gen-cpp/ErrorCodes_types.h"
 #include "gutil/macros.h"
@@ -432,4 +430,22 @@ Status FileSystemUtil::CheckHolePunch(const string& path) {
   return Status::OK();
 }
 
+Status FileSystemUtil::ApproximateFileSize(
+    const std::string& path, uintmax_t& file_size) {
+  bool exist = false;
+  RETURN_IF_ERROR(PathExists(path, &exist));
+  if (!exist) {
+    return Status(Substitute("Path does not exist: $0", path));
+  } else {
+    error_code errcode;
+    file_size = filesystem::file_size(path, errcode);  // Non-throwing overload.
+    if (errcode != errc::success) {
+      return Status(ErrorMsg(TErrorCode::RUNTIME_ERROR,
+          Substitute("Encountered error while checking file size of path $0: $1",
+              path, errcode.message())));
+    }
+  }
+  return Status::OK();
+}
+
 } // namespace impala
diff --git a/be/src/util/filesystem-util.h b/be/src/util/filesystem-util.h
index a587712..e37a76f 100644
--- a/be/src/util/filesystem-util.h
+++ b/be/src/util/filesystem-util.h
@@ -108,6 +108,10 @@ class FileSystemUtil {
   /// Returns OK otherwise.
   static Status CheckHolePunch(const std::string& path);
 
+  /// Store the current size in bytes of 'path', as seen by the file system, into the
+  /// output argument 'file_size'. Returns an error if the size cannot be determined.
+  static Status ApproximateFileSize(const std::string& path, uintmax_t& file_size);
+
   class Directory {
    public:
     // Different types of entry in the directory
diff --git a/tests/custom_cluster/test_breakpad.py b/tests/custom_cluster/test_breakpad.py
index 4ed7b04..e567b65 100644
--- a/tests/custom_cluster/test_breakpad.py
+++ b/tests/custom_cluster/test_breakpad.py
@@ -144,6 +144,7 @@ class TestBreakpadBase(CustomClusterTestSuite):
     self.assert_impalad_log_contains('ERROR', 'Wrote minidump to ',
         expected_count=expected_count)
 
+
 class TestBreakpadCore(TestBreakpadBase):
   """Core tests to check that the breakpad integration into the daemons works as
   expected. This includes writing minidump when the daemons call abort(). Add tests here
@@ -379,3 +380,67 @@ class TestBreakpadExhaustive(TestBreakpadBase):
     reduced_minidump_size = self.trigger_single_minidump_and_get_size()
     # Check that the minidump file size has been reduced.
     assert reduced_minidump_size < full_minidump_size
+
+
+class TestLogging(TestBreakpadBase):
+  """Exhaustive tests to check that impala log is rolled periodically, obeying
+  max_log_size and max_log_files, even in the presence of heavy stderr writing.
+  """
+  @classmethod
+  def setup_class(cls):
+    if cls.exploration_strategy() != 'exhaustive':
+      pytest.skip('These logging tests only run in exhaustive')
+    super(TestLogging, cls).setup_class()
+
+  def start_cluster_with_args(self, cluster_size, log_dir, **kwargs):
+    cluster_options = []
+    for daemon_arg in DAEMON_ARGS:
+      daemon_options = " ".join("-{0}={1}".format(k, v) for k, v in kwargs.iteritems())
+      cluster_options.append("--{0}={1}".format(daemon_arg, daemon_options))
+    self._start_impala_cluster(cluster_options, cluster_size=cluster_size,
+                               expected_num_impalads=cluster_size, impala_log_dir=log_dir)
+
+  def assert_logs(self, daemon, max_count, max_bytes, base_dir=None):
+    """Assert that there are at most 'max_count' of INFO + ERROR log files for the
+    specified daemon and the individual file size does not exceed 'max_bytes'."""
+    path = base_dir or self.tmp_dir
+    log_paths = glob.glob("%s/%s*log.ERROR.*" % (path, daemon)) \
+                + glob.glob("%s/%s*log.INFO.*" % (path, daemon))
+    assert len(log_paths) <= max_count
+    for log_path in log_paths:
+      try:
+        log_size = os.path.getsize(log_path)
+        assert log_size <= max_bytes, "{} exceeds {} bytes".format(log_path, max_bytes)
+      except OSError:
+        # The daemon might delete the log in the middle of assertion.
+        # In that case, do nothing and move on.
+        pass
+
+  @pytest.mark.execute_serially
+  def test_excessive_cerr(self):
+    """Check that impalad logs keep being rotated when most writing activity comes
+    from the stderr stream.
+    Along with LogFaultInjectionThread in init.cc, this test fills the impalad error
+    logs with roughly 128 KB of error messages per second."""
+    test_cluster_size = 1
+    test_logbufsecs = 3
+    test_max_log_files = 2
+    test_max_log_size = 1  # 1 MB
+    test_error_msg = ('123456789abcde_' * 64)  # 960-byte (~1 KB) error message
+    test_debug_actions = 'LOG_MAINTENANCE_STDERR:FAIL@1.0@' + test_error_msg
+    os.chmod(self.tmp_dir, 0744)
+
+    expected_log_max_bytes = int(1.2 * 1024**2)  # 1.2 MB
+    self.assert_logs('impalad', 0, expected_log_max_bytes)
+    self.start_cluster_with_args(test_cluster_size, self.tmp_dir,
+                                 logbufsecs=test_logbufsecs,
+                                 max_log_files=test_max_log_files,
+                                 max_log_size=test_max_log_size,
+                                 debug_actions=test_debug_actions)
+    self.wait_for_num_processes('impalad', test_cluster_size, 30)
+    expected_log_max_count = test_max_log_files * 2  # Both INFO and ERROR logs
+    # Wait for log maintenance thread to flush and rotate the logs asynchronously.
+    start = time.time()
+    while (time.time() - start < 40):
+      time.sleep(1)
+      self.assert_logs('impalad', expected_log_max_count, expected_log_max_bytes)