Posted to commits@impala.apache.org by bo...@apache.org on 2022/03/07 16:15:27 UTC

[impala] branch master updated (7a28295 -> 7450c96)

This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 7a28295  IMPALA-9433: Improved caching of HdfsFileHandles
     new 8eeb000  IMPALA-11152: Remove dependence on symlink when rotating logs
     new 7450c96  IMPALA-11133 (Addendum): Encode a string in utf8 before printing it

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/common/init.cc                 |  4 +-
 be/src/common/logging.cc              | 89 ++++++++++++++++++++++++++---------
 be/src/common/logging.h               |  6 +--
 be/src/util/filesystem-util.cc        |  3 +-
 bin/compare_branches.py               |  3 +-
 tests/custom_cluster/test_breakpad.py | 86 +++++++++++++++++++++++++--------
 6 files changed, 141 insertions(+), 50 deletions(-)

[impala] 01/02: IMPALA-11152: Remove dependence on symlink when rotating logs

Posted by bo...@apache.org.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 8eeb000c3515aa2c7dd671d7abe2269d4deffd08
Author: Riza Suminto <ri...@cloudera.com>
AuthorDate: Tue Mar 1 23:13:05 2022 -0800

    IMPALA-11152: Remove dependence on symlink when rotating logs
    
    IMPALA-5256 implements log rotation by following glog's symlink and
    checking the size of the file it points to. While this has been robust
    most of the time, there can be a rare situation where the symlink is
    missing. Glog itself does not guarantee that symlink creation will
    always succeed, and it won't retry symlink creation until its next
    rotation. The side effect of this issue is that impala::CheckLogSize()
    spams the ERROR log every second because it cannot find the symlink.
    
    This patch removes the dependence on the glog symlink for this log
    rotation. We now directly specify the base file name of the targeted
    log kind and pick the latest log path. This patch also makes
    impala::CheckLogSize() less chatty by printing an error message only
    once every FLAGS_logbufsecs seconds (default is 30s).
    
    Testing:
    - Add test_breakpad.py::TestLogging::test_excessive_cerr_no_symlink.
    - Pass test_breakpad.py in exhaustive exploration.
    
    Change-Id: I30509e98038dbf9ca293144089f6ee92ce186a97
    Reviewed-on: http://gerrit.cloudera.org:8080/18286
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/common/init.cc                 |  4 +-
 be/src/common/logging.cc              | 89 ++++++++++++++++++++++++++---------
 be/src/common/logging.h               |  6 +--
 be/src/util/filesystem-util.cc        |  3 +-
 tests/custom_cluster/test_breakpad.py | 86 +++++++++++++++++++++++++--------
 5 files changed, 139 insertions(+), 49 deletions(-)
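
The lookup described in the commit message (glob for the glog base file name of one severity and one pid, then keep the largest path) can be sketched in Python roughly as follows. This is only an illustration of the idea, not the C++ code in logging.cc; the function name latest_log_path and its parameters are hypothetical.

```python
import glob
import os


def latest_log_path(log_dir, base_name, severity, pid):
    """Return the lexicographically largest matching log path, or None.

    Hypothetical helper. It mirrors the glob pattern used in the patch:
    "<log_dir>/<base_name>*.log.<severity>.*.<pid>".
    """
    pattern = os.path.join(
        glob.escape(log_dir),
        "{0}*.log.{1}.*.{2}".format(base_name, severity, pid))
    matches = glob.glob(pattern)
    # Assumption: the <date>-<time> field is zero-padded, so among files that
    # share the same prefix the largest string is also the newest file.
    return max(matches) if matches else None
```

Returning None for "no match" stands in for the GLOB_NOMATCH error status that the C++ function reports.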

diff --git a/be/src/common/init.cc b/be/src/common/init.cc
index b0470ad..5879f25 100644
--- a/be/src/common/init.cc
+++ b/be/src/common/init.cc
@@ -162,7 +162,7 @@ extern "C" { void __gcov_flush(); }
     sleep(sleep_duration);
 
     const int64_t now = MonotonicMillis();
-    bool max_log_file_exceeded = RedirectStdoutStderr() && impala::CheckLogSize();
+    bool max_log_file_exceeded = RedirectStdoutStderr() && impala::CheckLogSize(false);
     if ((now - last_flush) / 1000 < FLAGS_logbufsecs && !max_log_file_exceeded) {
       continue;
     }
@@ -171,7 +171,7 @@ extern "C" { void __gcov_flush(); }
 
     // Check log size again and force log rotation this time if they still big after
     // FlushLogFiles.
-    if (max_log_file_exceeded && impala::CheckLogSize()) impala::ForceRotateLog();
+    if (max_log_file_exceeded && impala::CheckLogSize(true)) impala::ForceRotateLog();
 
     // No need to rotate log files in tests.
     if (impala::TestInfo::is_test()) continue;
diff --git a/be/src/common/logging.cc b/be/src/common/logging.cc
index 7291858..2f68920 100644
--- a/be/src/common/logging.cc
+++ b/be/src/common/logging.cc
@@ -17,6 +17,7 @@
 
 #include "common/logging.h"
 
+#include <glob.h>
 #include <stdio.h>
 #include <fstream>
 #include <iomanip>
@@ -94,27 +95,60 @@ impala::Status ResolveLogSymlink(const string& symlink_path, string& canonical_p
   return impala::Status::OK();
 }
 
+impala::Status GetLatestCanonicalLogPath(
+    google::LogSeverity severity, std::string& log_path) {
+  log_path = "";
+  // We specifically target the base file name created by glog.
+  // Glog's default base file name follow this pattern:
+  // "<program name>.<hostname>.<user name>.log.<severity level>.<date>-<time>.<pid>"
+  string path_pattern = strings::Substitute("$0/$1*.log.$2.*.$3", FLAGS_log_dir,
+      FLAGS_log_filename, google::GetLogSeverityName(severity), getpid());
+  glob_t result;
+  int glob_ret = glob(path_pattern.c_str(), GLOB_TILDE, NULL, &result);
+  if (glob_ret != 0) {
+    string msg;
+    if (glob_ret == GLOB_NOMATCH) {
+      msg = Substitute("No match found for glob pattern $0", path_pattern);
+    } else if (glob_ret == GLOB_NOSPACE) {
+      msg = Substitute("Running out of memory for glob pattern $0", path_pattern);
+    } else if (glob_ret == GLOB_ABORTED) {
+      msg = Substitute("Read error for glob pattern $0", path_pattern);
+    } else {
+      msg = Substitute(
+          "glob failed in impala::GetLatestCanonicalLogPath on $0  with ret = $1",
+          path_pattern, glob_ret);
+    }
+    globfree(&result);
+    return impala::Status(impala::ErrorMsg(impala::TErrorCode::RUNTIME_ERROR, msg));
+  }
+
+  // Find the largest full file path which is also the latest.
+  for (size_t i = 0; i < result.gl_pathc; ++i) {
+    if (i == 0 || log_path.compare(result.gl_pathv[i]) < 0) {
+      log_path = result.gl_pathv[i];
+    }
+  }
+  globfree(&result);
+  return impala::Status::OK();
+}
+
 // The main implementation of AttachStdoutStderr().
 // Caller must hold lock over logging_mutex.
 impala::Status AttachStdoutStderrLocked() {
   // Redirect stdout to INFO log and stderr to ERROR log.
   // Needs to be done after InitGoogleLogging, to get the INFO/ERROR file paths.
-  string info_log_symlink_path, error_log_symlink_path;
-  impala::GetFullLogFilename(google::INFO, &info_log_symlink_path);
-  impala::GetFullLogFilename(google::ERROR, &error_log_symlink_path);
-
   string info_log_path, error_log_path;
-  RETURN_IF_ERROR(ResolveLogSymlink(info_log_symlink_path, info_log_path));
-  RETURN_IF_ERROR(ResolveLogSymlink(error_log_symlink_path, error_log_path));
+  RETURN_IF_ERROR(GetLatestCanonicalLogPath(google::INFO, info_log_path));
+  RETURN_IF_ERROR(GetLatestCanonicalLogPath(google::ERROR, error_log_path));
 
   bool info_log_rotated = last_info_log_path != info_log_path;
   bool error_log_rotated = last_error_log_path != error_log_path;
 
   if (info_log_rotated != error_log_rotated) {
-    // Since we're not holding glog's log_mutex when resolving the symlinks, it is
+    // Since we're not holding glog's log_mutex during log file lookup, it is
     // possible that glog rotates the INFO and ERROR log independently in between our
-    // symlink resolution. If this happens, log a warning and continue.
-    // Next AttachStdoutStderrLocked call should redirect the stream to the newer file.
+    // lookup. Log a warning and continue if this happens. The next
+    // AttachStdoutStderrLocked call should redirect the stream to the newer file.
     LOG(WARNING) << "INFO and ERROR log is not rotated at the same time "
                  << "(info_log_rotated=" << info_log_rotated << ", "
                  << "error_log_rotated=" << error_log_rotated << "). "
@@ -124,7 +158,7 @@ impala::Status AttachStdoutStderrLocked() {
   if (info_log_rotated) {
     // Print to stdout before redirecting so people looking for these logs in the standard
     // place know where to look.
-    cout << "Redirecting stdout to " << info_log_symlink_path << endl;
+    cout << "Redirecting stdout to " << info_log_path << endl;
     // TODO: how to handle these errors? Maybe abort the process?
     if (freopen(info_log_path.c_str(), "a", stdout) == NULL) {
       cout << "Could not redirect stdout: " << impala::GetStrErrMsg();
@@ -134,7 +168,7 @@ impala::Status AttachStdoutStderrLocked() {
 
   if (error_log_rotated) {
     // Similar to stdout, do the same thing for stderr.
-    cerr << "Redirecting stderr to " << error_log_symlink_path << endl;
+    cerr << "Redirecting stderr to " << error_log_path << endl;
     if (freopen(error_log_path.c_str(), "a", stderr) == NULL) {
       cerr << "Could not redirect stderr: " << impala::GetStrErrMsg();
     }
@@ -219,29 +253,37 @@ void impala::AttachStdoutStderr() {
   }
 }
 
-bool impala::CheckLogSize() {
+bool impala::CheckLogSize(bool log_error) {
   lock_guard<mutex> logging_lock(logging_mutex);
+  // FLAGS_max_log_size is measured in megabytes. Thus, we convert it to bytes.
+  uintmax_t max_log_size = FLAGS_max_log_size * 1024 * 1024;
   int log_to_check[2] = {google::INFO, google::ERROR};
   bool max_log_file_exceeded = false;
-  for (int log_level : log_to_check) {
+  Status status;
+  int log_level = log_to_check[0];
+  for (int i : log_to_check) {
     uintmax_t file_size = 0;
+    log_level = i;
     string log_path;
-    GetFullLogFilename(log_level, &log_path);
-    Status status = FileSystemUtil::ApproximateFileSize(log_path, file_size);
-    if (status.ok()) {
-      // max_log_size is measured in megabytes. Thus, we SHR the file_size 20 bits to
-      // convert it from bytes to megabytes.
-      max_log_file_exceeded |= (file_size >> 20) >= FLAGS_max_log_size;
-    } else {
-      LOG(ERROR) << "Failed to check log file size: " << status.GetDetail();
-      return false;
+    status = GetLatestCanonicalLogPath(log_level, log_path);
+    if (!status.ok()) break;
+    status = FileSystemUtil::ApproximateFileSize(log_path, file_size);
+    if (!status.ok()) break;
+    max_log_file_exceeded |= file_size > max_log_size;
+  }
+  if (!status.ok()) {
+    if (log_error) {
+      LOG(ERROR) << "Failed to check log file size for log level "
+                 << (log_level == google::INFO ? "INFO" : "ERROR") << ": "
+                 << status.GetDetail();
     }
+    return false;
   }
-
   return max_log_file_exceeded;
 }
 
 void impala::ForceRotateLog() {
+  lock_guard<mutex> logging_lock(logging_mutex);
   google::SetLogFilenameExtension(".cut");
   google::SetLogFilenameExtension("");
   LOG(INFO) << "INFO log rotated by Impala due to max_log_size exceeded.";
@@ -288,6 +330,7 @@ void impala::LogCommandLineFlags() {
 }
 
 void impala::CheckAndRotateLogFiles(int max_log_files) {
+  lock_guard<mutex> logging_lock(logging_mutex);
   // Ignore bad input or disable log rotation
   if (max_log_files <= 1) return;
   // Check log files for all severities
diff --git a/be/src/common/logging.h b/be/src/common/logging.h
index 7486f60..88d5505 100644
--- a/be/src/common/logging.h
+++ b/be/src/common/logging.h
@@ -125,9 +125,9 @@ void CheckAndRotateLogFiles(int max_log_files);
 void AttachStdoutStderr();
 
 /// Check whether INFO or ERROR log size has exceed FLAGS_max_log_size.
-/// If error encountered during individual log size check, print error message to ERROR
-/// log and return false.
-bool CheckLogSize();
+/// Return false if error encountered during individual log size check.
+/// 'log_error' controls whether to log any error occurrence to ERROR log or not.
+bool CheckLogSize(bool log_error);
 
 /// Force glog to do the log rotation.
 void ForceRotateLog();
diff --git a/be/src/util/filesystem-util.cc b/be/src/util/filesystem-util.cc
index 6b1e647..c5810ca 100644
--- a/be/src/util/filesystem-util.cc
+++ b/be/src/util/filesystem-util.cc
@@ -435,7 +435,8 @@ Status FileSystemUtil::ApproximateFileSize(
   bool exist = false;
   RETURN_IF_ERROR(PathExists(path, &exist));
   if (!exist) {
-    return Status("Path does not exist!");
+    return Status(
+        ErrorMsg(TErrorCode::RUNTIME_ERROR, Substitute("Path $0 does not exist!", path)));
   } else {
     error_code errcode;
     file_size = filesystem::file_size(path, errcode);
diff --git a/tests/custom_cluster/test_breakpad.py b/tests/custom_cluster/test_breakpad.py
index e567b65..c537c97 100644
--- a/tests/custom_cluster/test_breakpad.py
+++ b/tests/custom_cluster/test_breakpad.py
@@ -400,47 +400,93 @@ class TestLogging(TestBreakpadBase):
     self._start_impala_cluster(cluster_options, cluster_size=cluster_size,
                                expected_num_impalads=cluster_size, impala_log_dir=log_dir)
 
-  def assert_logs(self, daemon, max_count, max_bytes, base_dir=None):
+  def assert_logs(self, daemon, max_count, max_bytes):
     """Assert that there are at most 'max_count' of INFO + ERROR log files for the
-    specified daemon and the individual file size does not exceed 'max_bytes'."""
-    path = base_dir or self.tmp_dir
-    log_paths = glob.glob("%s/%s*log.ERROR.*" % (path, daemon)) \
-                + glob.glob("%s/%s*log.INFO.*" % (path, daemon))
+    specified daemon and the individual file size does not exceed 'max_bytes'.
+    Also assert that stdout/stderr are redirected to correct file on each rotation."""
+    log_dir = self.tmp_dir
+    log_paths = glob.glob("%s/%s*log.ERROR.*" % (log_dir, daemon)) \
+                + glob.glob("%s/%s*log.INFO.*" % (log_dir, daemon))
     assert len(log_paths) <= max_count
-    for path in log_paths:
-      try:
-        log_size = os.path.getsize(path)
-        assert log_size <= max_bytes, "{} exceed {} bytes".format(path, max_bytes)
-      except OSError:
-        # The daemon might delete the log in the middle of assertion.
-        # In that case, do nothing and move on.
-        pass
 
-  @pytest.mark.execute_serially
-  def test_excessive_cerr(self):
+    # group log_paths by pid and kind
+    log_group = {}
+    for path in sorted(log_paths):
+      tok = path.split('.')
+      key = tok[-1] + '.' + tok[-3]  # pid + kind
+      if key in log_group:
+        log_group[key].append(path)
+      else:
+        log_group[key] = [path]
+
+    for key, paths in log_group.items():
+      for i in range(0, len(paths)):
+        try:
+          curr_path = paths[i]
+          # check log size
+          log_size = os.path.getsize(curr_path)
+          assert log_size <= max_bytes, "{} exceed {} bytes".format(curr_path, max_bytes)
+
+          if i < len(paths) - 1:
+            # check that we print the next_path in last line of this log file
+            next_path = paths[i + 1]
+            with open(curr_path, 'rb') as f:
+              f.seek(-2, os.SEEK_END)
+              while f.read(1) != b'\n':
+                f.seek(-2, os.SEEK_CUR)
+              last_line = f.readline().decode()
+              assert next_path in last_line
+        except OSError:
+          # The daemon might delete the log in the middle of assertion.
+          # In that case, do nothing and move on.
+          pass
+
+  def silent_remove(self, filename):
+    try:
+      os.remove(filename)
+    except OSError:
+      pass
+
+  def start_excessive_cerr_cluster(self, test_cluster_size=1, remove_symlink=False):
     """Check that impalad log is kept being rotated when most writing activity is coming
     from stderr stream.
     Along with LogFaultInjectionThread in init.cc, this test will fill impalad error logs
     with approximately 128kb error messages per second."""
-    test_cluster_size = 1
     test_logbufsecs = 3
     test_max_log_files = 2
     test_max_log_size = 1  # 1 MB
     test_error_msg = ('123456789abcde_' * 64)  # 1 KB error message
     test_debug_actions = 'LOG_MAINTENANCE_STDERR:FAIL@1.0@' + test_error_msg
+    daemon = 'impalad'
     os.chmod(self.tmp_dir, 0744)
 
     expected_log_max_bytes = int(1.2 * 1024**2)  # 1.2 MB
-    self.assert_logs('impalad', 0, expected_log_max_bytes)
+    self.assert_logs(daemon, 0, expected_log_max_bytes)
     self.start_cluster_with_args(test_cluster_size, self.tmp_dir,
                                  logbufsecs=test_logbufsecs,
                                  max_log_files=test_max_log_files,
                                  max_log_size=test_max_log_size,
                                  debug_actions=test_debug_actions)
-    self.wait_for_num_processes('impalad', test_cluster_size, 30)
-    expected_log_max_count = test_max_log_files * 2  # Both INFO and ERROR logs
+    self.wait_for_num_processes(daemon, test_cluster_size, 30)
+    # Count both INFO and ERROR logs
+    expected_log_max_count = test_max_log_files * test_cluster_size * 2
     # Wait for log maintenance thread to flush and rotate the logs asynchronously.
     start = time.time()
     while (time.time() - start < 40):
       time.sleep(1)
-      self.assert_logs('impalad', expected_log_max_count, expected_log_max_bytes)
+      self.assert_logs(daemon, expected_log_max_count, expected_log_max_bytes)
+      if (remove_symlink):
+        pattern = self.tmp_dir + '/' + daemon + '*'
+        symlinks = glob.glob(pattern + '.INFO') + glob.glob(pattern + '.ERROR')
+        for symlink in symlinks:
+          self.silent_remove(symlink)
+
+  @pytest.mark.execute_serially
+  def test_excessive_cerr(self):
+    """Test excessive cerr activity with single node cluster."""
+    self.start_excessive_cerr_cluster()
+
+  @pytest.mark.execute_serially
+  def test_excessive_cerr_no_symlink(self):
+    """Test excessive cerr activity with two node cluster and missing log symlinks."""
+    self.start_excessive_cerr_cluster(2, True)
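
The updated assert_logs() checks that the last line of each rotated log names the next log file, and it finds that line by seeking backwards from the end of the file. A self-contained sketch of that technique is shown here. The helper name read_last_line is hypothetical, and unlike the loop in the test it also handles an empty file and a file that contains a single line.

```python
import os


def read_last_line(path):
    """Return the last line of 'path' as text, without its trailing newline."""
    with open(path, "rb") as f:
        f.seek(0, os.SEEK_END)
        end = f.tell()
        # Ignore one trailing newline so an empty last line is not reported.
        if end > 0:
            f.seek(end - 1)
            if f.read(1) == b"\n":
                end -= 1
        # Walk backwards until the previous newline or the start of the file.
        pos = end
        while pos > 0:
            f.seek(pos - 1)
            if f.read(1) == b"\n":
                break
            pos -= 1
        f.seek(pos)
        return f.read(end - pos).decode("utf-8", errors="replace")
```

A check in the spirit of the test would then be `assert next_path in read_last_line(curr_path)`.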

[impala] 02/02: IMPALA-11133 (Addendum): Encode a string in utf8 before printing it

Posted by bo...@apache.org.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 7450c96a76d81e21b2758d0bc5dcb673a6198d65
Author: Fang-Yu Rao <fa...@cloudera.com>
AuthorDate: Wed Feb 23 15:53:39 2022 -0800

    IMPALA-11133 (Addendum): Encode a string in utf8 before printing it
    
    In the first part of this patch, we decoded a string with 'utf8' in
    order to print it (on the command line) since the author field of a
    commit could contain non-ASCII characters.
    
    However, we did not take into consideration that in some scenarios,
    we would like to redirect the output to another file. If this is the
    case, then we may encounter a UnicodeEncodeError due to
    sys.stdout.encoding being None. To resolve the issue, we encode the
    formatted string with 'utf8'.
    
    Testing:
     - Manually verified that we won't get a UnicodeEncodeError if we
       redirect the output to another file.
    
    Change-Id: Iad9b1fb0a523e219bc9f40a57ff7335808be283f
    Reviewed-on: http://gerrit.cloudera.org:8080/18270
    Reviewed-by: Quanlong Huang <hu...@gmail.com>
    Tested-by: Quanlong Huang <hu...@gmail.com>
---
 bin/compare_branches.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
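
compare_branches.py is Python 2 code, where printing a unicode string fails when sys.stdout.encoding is None. The idea of the fix, which is to produce UTF-8 bytes explicitly rather than rely on the encoding of stdout, can be sketched in Python 3 as follows. This is an analogue under stated assumptions, not the script's code: the helper name write_utf8_line and the use of sys.stdout.buffer are illustrative choices.

```python
import sys


def write_utf8_line(text, stream=None):
    """Write 'text' plus a newline as UTF-8 bytes to a binary stream.

    Hypothetical helper. 'stream' defaults to the binary buffer behind
    sys.stdout, so the result does not depend on sys.stdout.encoding.
    """
    if stream is None:
        stream = sys.stdout.buffer
    stream.write((text + "\n").encode("utf-8"))
    stream.flush()
```

Calling `write_utf8_line(u"{0} {1} ({2}) - {3}".format(commit_hash, msg, date, author))` would emit the same bytes whether stdout is a terminal or a file.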

diff --git a/bin/compare_branches.py b/bin/compare_branches.py
index 4027974..da1d5f8 100755
--- a/bin/compare_branches.py
+++ b/bin/compare_branches.py
@@ -268,7 +268,8 @@ def main():
                    .format(commit_hash, options.source_branch, options.target_branch))
     if not change_in_target and not ignore_by_config and not ignore_by_commit_message:
       print u'{0} {1} ({2}) - {3}'\
-          .format(commit_hash, msg.decode('utf8'), date, author.decode('utf8'))
+          .format(commit_hash, msg.decode('utf8'), date, author.decode('utf8'))\
+          .encode('utf8')
       cherry_pick_hashes.append(commit_hash)
       jira_keys += jira_key_pat.findall(msg)