You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2020/07/23 13:08:52 UTC
[incubator-doris] branch master updated: [Feature][ThreadPool]Add
Web Page to display thread's stats (#4110)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 443b8f1 [Feature][ThreadPool]Add Web Page to display thread's stats (#4110)
443b8f1 is described below
commit 443b8f100bf17b8fd41d3b3a6a7adb2a63016a0f
Author: WangCong <10...@qq.com>
AuthorDate: Thu Jul 23 21:08:36 2020 +0800
[Feature][ThreadPool]Add Web Page to display thread's stats (#4110)
This CL mainly includes:
- add methods to read a thread's stats from Linux system files (/proc) via
env.
- support getting a thread's stats over HTTP.
- register a page handler in BE that shows threads' stats, to help developers
locate thread-related problems.
---
be/src/env/env.h | 63 ++++++-------
be/src/env/env_util.cpp | 61 ++++++++++++-
be/src/env/env_util.h | 20 +++--
be/src/http/default_path_handlers.cpp | 6 +-
be/src/http/web_page_handler.cpp | 5 +-
be/src/util/CMakeLists.txt | 3 +-
be/src/util/os_util.cpp | 165 ++++++++++++++++++++++++++++++++++
be/src/util/os_util.h | 68 ++++++++++++++
be/src/util/thread.cpp | 135 +++++++++++++++++++++++-----
be/src/util/thread.h | 36 ++++----
be/src/util/url_coding.cpp | 75 +++++++---------
be/src/util/url_coding.h | 8 +-
webroot/be/threadz.mustache | 68 ++++++++++++++
13 files changed, 570 insertions(+), 143 deletions(-)
diff --git a/be/src/env/env.h b/be/src/env/env.h
index 5ad360b..8cbaaf7 100644
--- a/be/src/env/env.h
+++ b/be/src/env/env.h
@@ -9,8 +9,8 @@
#pragma once
-#include <string>
#include <memory>
+#include <string>
#include "common/status.h"
#include "util/slice.h"
@@ -35,15 +35,15 @@ public:
// CREATE_OR_OPEN | opens | creates
// MUST_CREATE | fails | creates
// MUST_EXIST | opens | fails
- enum OpenMode {
- CREATE_OR_OPEN_WITH_TRUNCATE,
- CREATE_OR_OPEN,
- MUST_CREATE,
- MUST_EXIST
+ enum OpenMode {
+ CREATE_OR_OPEN_WITH_TRUNCATE,
+ CREATE_OR_OPEN,
+ MUST_CREATE,
+ MUST_EXIST
};
- Env() { }
- virtual ~Env() { }
+ Env() {}
+ virtual ~Env() {}
// Return a default environment suitable for the current operating
// system. Sophisticated users may wish to provide their own Env
@@ -85,8 +85,7 @@ public:
// Like the previous new_writable_file, but allows options to be
// specified.
- virtual Status new_writable_file(const WritableFileOptions& opts,
- const std::string& fname,
+ virtual Status new_writable_file(const WritableFileOptions& opts, const std::string& fname,
std::unique_ptr<WritableFile>* result) = 0;
// Creates a new readable and writable file. If a file with the same name
@@ -98,8 +97,7 @@ public:
std::unique_ptr<RandomRWFile>* result) = 0;
// Like the previous new_random_rw_file, but allows options to be specified.
- virtual Status new_random_rw_file(const RandomRWFileOptions& opts,
- const std::string& fname,
+ virtual Status new_random_rw_file(const RandomRWFileOptions& opts, const std::string& fname,
std::unique_ptr<RandomRWFile>* result) = 0;
// Returns OK if the path exists.
@@ -116,8 +114,7 @@ public:
// NotFound if "dir" does not exist, the calling process does not have
// permission to access "dir", or if "dir" is invalid.
// IOError if an IO Error was encountered
- virtual Status get_children(const std::string& dir,
- std::vector<std::string>* result) = 0;
+ virtual Status get_children(const std::string& dir, std::vector<std::string>* result) = 0;
// Iterate the specified directory and call given callback function with child's
// name. This function continues execution until all children have been iterated
@@ -168,19 +165,16 @@ public:
virtual Status get_file_size(const std::string& fname, uint64_t* size) = 0;
// Store the last modification time of fname in *file_mtime.
- virtual Status get_file_modified_time(const std::string& fname,
- uint64_t* file_mtime) = 0;
+ virtual Status get_file_modified_time(const std::string& fname, uint64_t* file_mtime) = 0;
// Rename file src to target.
- virtual Status rename_file(const std::string& src,
- const std::string& target) = 0;
+ virtual Status rename_file(const std::string& src, const std::string& target) = 0;
// create a hard-link
- virtual Status link_file(const std::string& /*old_path*/,
- const std::string& /*new_path*/) = 0;
+ virtual Status link_file(const std::string& /*old_path*/, const std::string& /*new_path*/) = 0;
};
struct RandomAccessFileOptions {
- RandomAccessFileOptions() { }
+ RandomAccessFileOptions() {}
};
// Creation-time options for WritableFile
@@ -202,8 +196,8 @@ struct RandomRWFileOptions {
// A file abstraction for reading sequentially through a file
class SequentialFile {
public:
- SequentialFile() { }
- virtual ~SequentialFile() { }
+ SequentialFile() {}
+ virtual ~SequentialFile() {}
// Read up to "result.size" bytes from the file.
// Sets "result.data" to the data that was read.
@@ -229,8 +223,8 @@ public:
class RandomAccessFile {
public:
- RandomAccessFile() { }
- virtual ~RandomAccessFile() { }
+ RandomAccessFile() {}
+ virtual ~RandomAccessFile() {}
// Read "result.size" bytes from the file starting at "offset".
// Copies the resulting data into "result.data".
@@ -271,13 +265,13 @@ public:
// one of Append or PositionedAppend. We support only Append here.
class WritableFile {
public:
- enum FlushMode {
- FLUSH_SYNC,
- FLUSH_ASYNC
+ enum FlushMode {
+ FLUSH_SYNC,
+ FLUSH_ASYNC
};
- WritableFile() { }
- virtual ~WritableFile() { }
+ WritableFile() {}
+ virtual ~WritableFile() {}
// Append data to the end of the file
virtual Status append(const Slice& data) = 0;
@@ -325,12 +319,9 @@ private:
// A file abstraction for random reading and writing.
class RandomRWFile {
public:
- enum FlushMode {
- FLUSH_SYNC,
- FLUSH_ASYNC
- };
+ enum FlushMode { FLUSH_SYNC, FLUSH_ASYNC };
RandomRWFile() {}
- virtual ~RandomRWFile() { }
+ virtual ~RandomRWFile() {}
virtual Status read_at(uint64_t offset, const Slice& result) const = 0;
@@ -350,4 +341,4 @@ public:
virtual const std::string& filename() const = 0;
};
-}
+} // namespace doris
diff --git a/be/src/env/env_util.cpp b/be/src/env/env_util.cpp
index 07bf874..b383439 100644
--- a/be/src/env/env_util.cpp
+++ b/be/src/env/env_util.cpp
@@ -18,6 +18,7 @@
#include "env/env_util.h"
#include "env/env.h"
+#include "util/faststring.h"
using std::shared_ptr;
using std::string;
@@ -30,21 +31,73 @@ Status open_file_for_write(Env* env, const string& path, shared_ptr<WritableFile
return open_file_for_write(WritableFileOptions(), env, path, file);
}
-Status open_file_for_write(const WritableFileOptions& opts,
- Env *env, const string &path,
- shared_ptr<WritableFile> *file) {
+Status open_file_for_write(const WritableFileOptions& opts, Env* env, const string& path,
+ shared_ptr<WritableFile>* file) {
unique_ptr<WritableFile> w;
RETURN_IF_ERROR(env->new_writable_file(opts, path, &w));
file->reset(w.release());
return Status::OK();
}
-Status open_file_for_random(Env *env, const string &path, shared_ptr<RandomAccessFile> *file) {
+Status open_file_for_random(Env* env, const string& path, shared_ptr<RandomAccessFile>* file) {
unique_ptr<RandomAccessFile> r;
RETURN_IF_ERROR(env->new_random_access_file(path, &r));
file->reset(r.release());
return Status::OK();
}
+// Writes 'data' to the file 'fname' obtained from 'env', calling sync() on it
+// before close() when 'should_sync' is true.
+//
+// If append/sync/close fails, the partially-written file is deleted on a
+// best-effort basis and the ORIGINAL failure is returned. A failure of the
+// cleanup itself is only logged, so that it can never replace (and hide) the
+// error that actually caused the write to fail.
+static Status do_write_string_to_file(Env* env, const Slice& data, const std::string& fname,
+                                      bool should_sync) {
+    unique_ptr<WritableFile> file;
+    Status s = env->new_writable_file(fname, &file);
+    if (!s.ok()) {
+        return s;
+    }
+    s = file->append(data);
+    if (s.ok() && should_sync) {
+        s = file->sync();
+    }
+    if (s.ok()) {
+        s = file->close();
+    }
+    file.reset(); // Will auto-close if we did not close above
+    if (!s.ok()) {
+        // Do not early-return the delete status here: the caller needs to see
+        // why the write failed, not why the cleanup failed.
+        Status delete_status = env->delete_file(fname);
+        if (!delete_status.ok()) {
+            LOG(WARNING) << "Failed to delete partially-written file " << fname
+                         << ", error: " << delete_status.to_string();
+        }
+    }
+    return s;
+}
+
+// Writes 'data' to 'fname' through 'env' without requesting a sync of the file.
+Status write_string_to_file(Env* env, const Slice& data, const std::string& fname) {
+    const bool should_sync = false;
+    return do_write_string_to_file(env, data, fname, should_sync);
+}
+
+// Writes 'data' to 'fname' through 'env' and syncs the file before closing it.
+Status write_string_to_file_sync(Env* env, const Slice& data, const std::string& fname) {
+    const bool should_sync = true;
+    return do_write_string_to_file(env, data, fname, should_sync);
+}
+
+// Reads the file 'fname' sequentially, in chunks of up to kBufferSize bytes, and
+// accumulates everything read into '*data' (which is cleared first).
+//
+// Reading stops at the first failed read, whose status is returned, or at the
+// first read that yields an empty fragment, in which case the (OK) status of
+// that read is returned.
+Status read_file_to_string(Env* env, const std::string& fname, faststring* data) {
+    data->clear();
+    unique_ptr<SequentialFile> input;
+    Status status = env->new_sequential_file(fname, &input);
+    if (!status.ok()) {
+        return status;
+    }
+    static const int kBufferSize = 8192;
+    unique_ptr<uint8_t[]> buffer(new uint8_t[kBufferSize]);
+    for (;;) {
+        // The slice is re-pointed at the whole buffer before every read.
+        Slice chunk(buffer.get(), kBufferSize);
+        status = input->read(&chunk);
+        if (!status.ok()) {
+            return status;
+        }
+        data->append(chunk.get_data(), chunk.get_size());
+        if (chunk.empty()) {
+            return status;
+        }
+    }
+}
+
} // namespace env_util
} // namespace doris
diff --git a/be/src/env/env_util.h b/be/src/env/env_util.h
index aa2af49..70ea526 100644
--- a/be/src/env/env_util.h
+++ b/be/src/env/env_util.h
@@ -20,6 +20,7 @@
#include <string>
#include "common/status.h"
+#include "env.h"
namespace doris {
@@ -30,14 +31,21 @@ struct WritableFileOptions;
namespace env_util {
-Status open_file_for_write(Env *env, const std::string& path, std::shared_ptr<WritableFile> *file);
+Status open_file_for_write(Env* env, const std::string& path, std::shared_ptr<WritableFile>* file);
-Status open_file_for_write(const WritableFileOptions& opts, Env *env,
- const std::string& path, std::shared_ptr<WritableFile> *file);
+Status open_file_for_write(const WritableFileOptions& opts, Env* env, const std::string& path,
+ std::shared_ptr<WritableFile>* file);
-Status open_file_for_random(Env *env, const std::string& path,
- std::shared_ptr<RandomAccessFile> *file);
+Status open_file_for_random(Env* env, const std::string& path,
+ std::shared_ptr<RandomAccessFile>* file);
+
+// A utility routine: write "data" to the named file.
+Status write_string_to_file(Env* env, const Slice& data, const std::string& fname);
+// Like above but also fsyncs the new file.
+Status write_string_to_file_sync(Env* env, const Slice& data, const std::string& fname);
+
+// A utility routine: read contents of named file into *data
+Status read_file_to_string(Env* env, const std::string& fname, faststring* data);
} // namespace env_util
} // namespace doris
-
diff --git a/be/src/http/default_path_handlers.cpp b/be/src/http/default_path_handlers.cpp
index 0ebbbc0..dd85400 100644
--- a/be/src/http/default_path_handlers.cpp
+++ b/be/src/http/default_path_handlers.cpp
@@ -18,20 +18,17 @@
#include "http/default_path_handlers.h"
#include <gperftools/malloc_extension.h>
-#include <sys/stat.h>
#include <boost/algorithm/string.hpp>
#include <boost/bind.hpp>
-#include <fstream>
#include <sstream>
#include "common/configbase.h"
-#include "common/logging.h"
#include "http/web_page_handler.h"
#include "runtime/mem_tracker.h"
#include "util/debug_util.h"
-#include "util/logging.h"
#include "util/pretty_printer.h"
+#include "util/thread.h"
namespace doris {
@@ -109,6 +106,7 @@ void add_default_path_handlers(WebPageHandler* web_page_handler, MemTracker* pro
web_page_handler->register_page("/varz", "Configs", config_handler, true /* is_on_nav_bar */);
web_page_handler->register_page("/memz", "Memory",
boost::bind<void>(&mem_usage_handler, process_mem_tracker, _1, _2), true /* is_on_nav_bar */);
+ register_thread_display_page(web_page_handler);
}
} // namespace doris
diff --git a/be/src/http/web_page_handler.cpp b/be/src/http/web_page_handler.cpp
index 788b028..f9a4427 100644
--- a/be/src/http/web_page_handler.cpp
+++ b/be/src/http/web_page_handler.cpp
@@ -17,8 +17,7 @@
#include "http/web_page_handler.h"
-#include <boost/bind.hpp>
-#include <boost/mem_fn.hpp>
+#include <functional>
#include "common/config.h"
#include "env/env.h"
@@ -51,7 +50,7 @@ WebPageHandler::WebPageHandler(EvHttpServer* server) : _http_server(server) {
_http_server->register_static_file_handler(this);
TemplatePageHandlerCallback root_callback =
- boost::bind<void>(boost::mem_fn(&WebPageHandler::root_handler), this, _1, _2);
+ std::bind<void>(std::mem_fn(&WebPageHandler::root_handler), this, std::placeholders::_1, std::placeholders::_2);
register_template_page("/", "Home", root_callback, false /* is_on_nav_bar */);
}
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index e9ee89b..5107d7a 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -70,7 +70,8 @@ set(UTIL_FILES
null_load_error_hub.cpp
time.cpp
os_info.cpp
-# coding_util.cpp
+ os_util.cpp
+ # coding_util.cpp
cidr.cpp
core_local.cpp
uid_util.cpp
diff --git a/be/src/util/os_util.cpp b/be/src/util/os_util.cpp
new file mode 100644
index 0000000..15aed51
--- /dev/null
+++ b/be/src/util/os_util.cpp
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/os_util.h"
+
+#include <fcntl.h>
+#include <sys/resource.h>
+#include <unistd.h>
+
+#include <cstddef>
+#include <ostream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "env/env_util.h"
+#include "gutil/macros.h"
+#include "gutil/strings/numbers.h"
+#include "gutil/strings/split.h"
+#include "gutil/strings/stringpiece.h"
+#include "gutil/strings/substitute.h"
+#include "gutil/strings/util.h"
+#include "util/faststring.h"
+
+using std::string;
+using std::vector;
+using strings::Split;
+using strings::Substitute;
+
+namespace doris {
+
+// Ensure that Doris compiles on earlier kernels. If the target kernel does not support
+// _SC_CLK_TCK, sysconf(_SC_CLK_TCK) will return -1.
+#ifndef _SC_CLK_TCK
+#define _SC_CLK_TCK 2
+#endif
+
+static const int64_t kTicksPerSec = sysconf(_SC_CLK_TCK); // get_thread_stats() reports NotSupported when this is <= 0
+
+// Offsets into the ../stat file array of per-thread statistics.
+//
+// They are themselves offset by two because the pid and comm fields of the
+// file are parsed separately.
+static const int64_t kUserTicks = 13 - 2; // index parse_stat() reads for ThreadStats::user_ns
+static const int64_t kKernelTicks = 14 - 2; // index parse_stat() reads for ThreadStats::kernel_ns
+static const int64_t kIoWait = 41 - 2; // index parse_stat() reads for ThreadStats::iowait_ns
+
+// Largest offset we are interested in, to check we get a well formed stat file.
+static const int64_t kMaxOffset = kIoWait;
+
+// Parses 'buffer', which is expected to hold the contents of a
+// /proc/<pid>/task/<tid>/stat file.
+//
+// On success the user, kernel and I/O-wait tick counts are converted to
+// nanoseconds and stored in '*stats'; a field that does not parse as an integer
+// leaves the corresponding member of '*stats' untouched. If 'name' is non-null
+// it receives the text between the first '(' and the last ')'.
+//
+// Returns IOError when the parentheses are missing or misordered, when fewer
+// than two characters follow the closing parenthesis, or when the remainder
+// does not contain enough fields to reach the I/O-wait column.
+Status parse_stat(const std::string& buffer, std::string* name, ThreadStats* stats) {
+    DCHECK(stats != nullptr);
+
+    // The thread name should be the only field with parentheses. But the name
+    // itself may contain parentheses.
+    size_t open_paren = buffer.find('(');
+    size_t close_paren = buffer.rfind(')');
+    // '>=' (not '==') so that a buffer ending right at ')' is rejected too;
+    // otherwise substr(close_paren + 2) below would throw std::out_of_range.
+    if (open_paren == string::npos ||       // '(' must exist
+        close_paren == string::npos ||      // ')' must exist
+        open_paren >= close_paren ||        // '(' must come before ')'
+        close_paren + 2 >= buffer.size()) { // there must be at least two chars after ')'
+        return Status::IOError("Unrecognised /proc format");
+    }
+    string extracted_name = buffer.substr(open_paren + 1, close_paren - (open_paren + 1));
+    string rest = buffer.substr(close_paren + 2);
+    vector<string> splits = Split(rest, " ", strings::SkipEmpty());
+    // Index kMaxOffset itself is read below, so strictly more than kMaxOffset
+    // fields are required.
+    if (splits.size() <= static_cast<size_t>(kMaxOffset)) {
+        return Status::IOError("Unrecognised /proc format");
+    }
+
+    int64_t tmp;
+    if (safe_strto64(splits[kUserTicks], &tmp)) {
+        stats->user_ns = tmp * (1e9 / kTicksPerSec);
+    }
+    if (safe_strto64(splits[kKernelTicks], &tmp)) {
+        stats->kernel_ns = tmp * (1e9 / kTicksPerSec);
+    }
+    if (safe_strto64(splits[kIoWait], &tmp)) {
+        stats->iowait_ns = tmp * (1e9 / kTicksPerSec);
+    }
+    if (name != nullptr) {
+        *name = extracted_name;
+    }
+    return Status::OK();
+}
+
+// Reads /proc/self/task/<tid>/stat for the thread 'tid' and fills '*stats' by
+// handing the file contents to parse_stat().
+//
+// Returns NotSupported when the clock tick rate is not positive, the read error
+// if the file cannot be read, or whatever parse_stat() returns.
+Status get_thread_stats(int64_t tid, ThreadStats* stats) {
+    DCHECK(stats != nullptr);
+    if (kTicksPerSec <= 0) {
+        return Status::NotSupported("ThreadStats not supported");
+    }
+    const string stat_path = Substitute("/proc/self/task/$0/stat", tid);
+    faststring contents;
+    RETURN_IF_ERROR(env_util::read_file_to_string(Env::Default(), stat_path, &contents));
+
+    // The thread name is not needed here, hence the null 'name' argument.
+    return parse_stat(contents.ToString(), nullptr, stats);
+}
+// Sets the soft RLIMIT_CORE limit of this process to zero and then tries to
+// write an all-zero mask to /proc/self/coredump_filter.
+void disable_core_dumps() {
+    struct rlimit core_limit;
+    PCHECK(getrlimit(RLIMIT_CORE, &core_limit) == 0);
+    core_limit.rlim_cur = 0;
+    PCHECK(setrlimit(RLIMIT_CORE, &core_limit) == 0);
+
+    // Set coredump_filter to not dump any parts of the address space.
+    // Although the above disables core dumps to files, if core_pattern
+    // is set to a pipe rather than a file, it's not sufficient. Setting
+    // this pattern results in piping a very minimal dump into the core
+    // processor (eg abrtd), thus speeding up the crash.
+    int filter_fd;
+    RETRY_ON_EINTR(filter_fd, open("/proc/self/coredump_filter", O_WRONLY));
+    if (filter_fd < 0) {
+        // The filter file could not be opened; nothing more to do.
+        return;
+    }
+    ssize_t written;
+    RETRY_ON_EINTR(written, write(filter_fd, "00000000", 8));
+    int closed;
+    RETRY_ON_EINTR(closed, close(filter_fd));
+}
+
+// Reports whether /proc/self/status shows a non-zero TracerPid for this process.
+//
+// Always returns false when not built for Linux, and also returns false (after
+// logging a warning) when the status file cannot be read, when the TracerPid
+// value cannot be parsed, or when no TracerPid line is present.
+bool is_being_debugged() {
+#ifndef __linux__
+    return false;
+#else
+    // Look for the TracerPid line in /proc/self/status.
+    // If this is non-zero, we are being ptraced, which is indicative of gdb or strace
+    // being attached.
+    faststring status_text;
+    Status read_status =
+            env_util::read_file_to_string(Env::Default(), "/proc/self/status", &status_text);
+    if (!read_status.ok()) {
+        LOG(WARNING) << "could not read /proc/self/status: " << read_status.to_string();
+        return false;
+    }
+    StringPiece contents(reinterpret_cast<const char*>(status_text.data()), status_text.size());
+    vector<StringPiece> lines = Split(contents, "\n");
+    for (const auto& line : lines) {
+        if (HasPrefixString(line, "TracerPid:")) {
+            // The key and the value on this line are separated by a tab.
+            std::pair<StringPiece, StringPiece> key_val = Split(line, "\t");
+            int64_t tracer_pid = -1;
+            if (safe_strto64(key_val.second.data(), key_val.second.size(), &tracer_pid)) {
+                return tracer_pid != 0;
+            }
+            LOG(WARNING) << "Invalid line in /proc/self/status: " << line;
+            return false;
+        }
+    }
+    LOG(WARNING) << "Could not find TracerPid line in /proc/self/status";
+    return false;
+#endif // __linux__
+}
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/util/os_util.h b/be/src/util/os_util.h
new file mode 100644
index 0000000..7e0f514
--- /dev/null
+++ b/be/src/util/os_util.h
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_UTIL_OS_UTIL_H
+#define DORIS_BE_UTIL_OS_UTIL_H
+
+#include <cstdint>
+#include <string>
+#include <type_traits>
+
+#include "common/status.h"
+#include "env/env.h"
+
+namespace doris {
+
+// Utility methods to read interesting values from /proc.
+// TODO: Get stats for parent process.
+
+// Container struct for statistics read from the /proc filesystem for a thread.
+struct ThreadStats {
+    // All members start at zero in case the structure can't be filled by
+    // get_thread_stats.
+    int64_t user_ns = 0;   // filled from the user-ticks column by parse_stat
+    int64_t kernel_ns = 0; // filled from the kernel-ticks column by parse_stat
+    int64_t iowait_ns = 0; // filled from the I/O-wait column by parse_stat
+
+    ThreadStats() {}
+};
+
+// Populates ThreadStats object using a given buffer. The buffer is expected to
+// conform to /proc/<pid>/task/<tid>/stat layout; an error will be returned otherwise.
+//
+// If 'name' is supplied, the extracted thread name will be written to it.
+Status parse_stat(const std::string& buffer, std::string* name, ThreadStats* stats);
+
+// Populates ThreadStats object for a given thread by reading from
+// /proc/<pid>/task/<tid>/stat. Returns OK unless the file cannot be read or is in an
+// unrecognised format, or if the kernel version is not modern enough.
+Status get_thread_stats(int64_t tid, ThreadStats* stats);
+
+// Disable core dumps for this process.
+//
+// This is useful particularly in tests where we have injected failures and don't
+// want to generate a core dump from an "expected" crash.
+void disable_core_dumps();
+
+// Return true if this process appears to be running under a debugger or strace.
+//
+// This may return false on unsupported (non-Linux) platforms.
+bool is_being_debugged();
+
+} // namespace doris
+
+#endif
diff --git a/be/src/util/thread.cpp b/be/src/util/thread.cpp
index c4b5493..128896d 100644
--- a/be/src/util/thread.cpp
+++ b/be/src/util/thread.cpp
@@ -17,23 +17,29 @@
#include "thread.h"
+#include <sys/prctl.h>
+#include <sys/types.h>
#include <unistd.h>
+
#include <cstring>
#include <limits>
#include <map>
#include <memory>
#include <string>
-#include <sys/prctl.h>
-#include <sys/types.h>
+#include <functional>
#include "common/logging.h"
#include "gutil/atomicops.h"
-#include "gutil/once.h"
#include "gutil/dynamic_annotations.h"
+#include "gutil/map-util.h"
+#include "gutil/once.h"
#include "gutil/strings/substitute.h"
#include "olap/olap_define.h"
+#include "util/easy_json.h"
#include "util/mutex.h"
+#include "util/os_util.h"
#include "util/scoped_cleanup.h"
+#include "util/url_coding.h"
namespace doris {
@@ -55,9 +61,7 @@ static GoogleOnceType once = GOOGLE_ONCE_INIT;
// auditing. Used only by Thread.
class ThreadMgr {
public:
- ThreadMgr()
- : _threads_started_metric(0),
- _threads_running_metric(0) {}
+ ThreadMgr() : _threads_started_metric(0), _threads_running_metric(0) {}
~ThreadMgr() {
MutexLock lock(&_lock);
@@ -74,18 +78,17 @@ public:
// already been removed, this is a no-op.
void remove_thread(const pthread_t& pthread_id, const std::string& category);
-private:
+ void display_thread_callback(const WebPageHandler::ArgumentMap& args, EasyJson* ej) const;
+private:
// Container class for any details we want to capture about a thread
// TODO: Add start-time.
// TODO: Track fragment ID.
class ThreadDescriptor {
public:
- ThreadDescriptor() { }
+ ThreadDescriptor() {}
ThreadDescriptor(std::string category, std::string name, int64_t thread_id)
- : _name(std::move(name)),
- _category(std::move(category)),
- _thread_id(thread_id) {}
+ : _name(std::move(name)), _category(std::move(category)), _thread_id(thread_id) {}
const std::string& name() const { return _name; }
const std::string& category() const { return _category; }
@@ -97,6 +100,8 @@ private:
int64_t _thread_id;
};
+ void summarize_thread_descriptor(const ThreadDescriptor& desc, EasyJson* ej) const;
+
// A ThreadCategory is a set of threads that are logically related.
// TODO: unordered_map is incompatible with pthread_t, but would be more
// efficient here.
@@ -106,7 +111,7 @@ private:
typedef std::map<std::string, ThreadCategory> ThreadCategoryMap;
// Protects _thread_categories and thread metrics.
- Mutex _lock;
+ mutable Mutex _lock;
// All thread categorys that ever contained a thread, even if empty
ThreadCategoryMap _thread_categories;
@@ -121,7 +126,7 @@ private:
void ThreadMgr::set_thread_name(const std::string& name, int64_t tid) {
if (tid == getpid()) {
- return ;
+ return;
}
int err = prctl(PR_SET_NAME, name.c_str());
if (err < 0 && errno != EPERM) {
@@ -169,6 +174,81 @@ void ThreadMgr::remove_thread(const pthread_t& pthread_id, const std::string& ca
ANNOTATE_IGNORE_READS_AND_WRITES_END();
}
+// Fills '*ej' with the data for the thread display page.
+//
+// When 'args' contains a "group" entry, the output describes the threads of
+// that category (or of every category when the value is "all"): it records the
+// HTML-escaped group name, whether all groups were requested, and one summary
+// per thread under "found"/"threads". If the named category does not exist the
+// function returns before "found" is added.
+//
+// Without a "group" entry, the output lists every known category with its
+// thread count plus the total number of running threads.
+//
+// In both modes '_lock' is held only while the needed data is copied out of the
+// manager; reading per-thread stats and building the JSON happen after the
+// lock has been released.
+void ThreadMgr::display_thread_callback(const WebPageHandler::ArgumentMap& args,
+                                        EasyJson* ej) const {
+    const auto* category_name = FindOrNull(args, "group");
+    if (category_name) {
+        bool requested_all = (*category_name == "all");
+        ej->Set("requested_thread_group", EasyJson::kObject);
+        (*ej)["group_name"] = escape_for_html_to_string(*category_name);
+        (*ej)["requested_all"] = requested_all;
+
+        // The critical section is as short as possible so as to minimize the delay
+        // imposed on new threads that acquire the lock in write mode.
+        std::vector<ThreadDescriptor> descriptors_to_print;
+        {
+            MutexLock l(&_lock);
+            if (requested_all) {
+                for (const auto& category : _thread_categories) {
+                    for (const auto& elem : category.second) {
+                        descriptors_to_print.emplace_back(elem.second);
+                    }
+                }
+            } else {
+                const auto* category = FindOrNull(_thread_categories, *category_name);
+                if (!category) {
+                    return;
+                }
+                for (const auto& elem : *category) {
+                    descriptors_to_print.emplace_back(elem.second);
+                }
+            }
+        }
+
+        EasyJson found = (*ej).Set("found", EasyJson::kObject);
+        EasyJson threads = found.Set("threads", EasyJson::kArray);
+        for (const auto& desc : descriptors_to_print) {
+            summarize_thread_descriptor(desc, &threads);
+        }
+    } else {
+        // List all thread groups and the number of threads running in each.
+        std::vector<std::pair<std::string, uint64_t>> thread_categories_info;
+        uint64_t running;
+        {
+            // Only snapshot the counters under the lock; the JSON is built from
+            // the snapshot after the lock is released.
+            MutexLock l(&_lock);
+            running = _threads_running_metric;
+            thread_categories_info.reserve(_thread_categories.size());
+            for (const auto& category : _thread_categories) {
+                thread_categories_info.emplace_back(category.first, category.second.size());
+            }
+        }
+
+        (*ej)["total_threads_running"] = running;
+        EasyJson groups = ej->Set("groups", EasyJson::kArray);
+        for (const auto& elem : thread_categories_info) {
+            std::string category_arg;
+            url_encode(elem.first, &category_arg);
+            EasyJson group = groups.PushBack(EasyJson::kObject);
+            group["encoded_group_name"] = category_arg;
+            group["group_name"] = elem.first;
+            group["threads_running"] = elem.second;
+        }
+    }
+}
+
+// Appends one object describing 'desc' to the JSON array '*ej': the thread name
+// plus its user, kernel and I/O-wait times in seconds.
+//
+// If the stats cannot be read a warning is logged and the times come from the
+// ThreadStats object as get_thread_stats() left it.
+void ThreadMgr::summarize_thread_descriptor(const ThreadMgr::ThreadDescriptor& desc,
+                                            EasyJson* ej) const {
+    ThreadStats thread_stats;
+    Status stats_status = get_thread_stats(desc.thread_id(), &thread_stats);
+    if (!stats_status.ok()) {
+        LOG(WARNING) << "Could not get per-thread statistics: " << stats_status.to_string();
+    }
+
+    // Convert the nanosecond counters to seconds.
+    const double user_sec = static_cast<double>(thread_stats.user_ns) / 1e9;
+    const double kernel_sec = static_cast<double>(thread_stats.kernel_ns) / 1e9;
+    const double iowait_sec = static_cast<double>(thread_stats.iowait_ns) / 1e9;
+
+    EasyJson entry = ej->PushBack(EasyJson::kObject);
+    entry["thread_name"] = desc.name();
+    entry["user_sec"] = user_sec;
+    entry["kernel_sec"] = kernel_sec;
+    entry["iowait_sec"] = iowait_sec;
+}
+
Thread::~Thread() {
if (_joinable) {
int ret = pthread_detach(_thread);
@@ -201,7 +281,8 @@ const std::string& Thread::category() const {
}
std::string Thread::to_string() const {
- return strings::Substitute("Thread $0 (name: \"$1\", category: \"$2\")", tid(), _name, _category);
+ return strings::Substitute("Thread $0 (name: \"$1\", category: \"$2\")", tid(), _name,
+ _category);
}
Thread* Thread::current_thread() {
@@ -210,7 +291,7 @@ Thread* Thread::current_thread() {
int64_t Thread::unique_thread_id() {
return static_cast<int64_t>(pthread_self());
-}
+}
int64_t Thread::current_thread_id() {
return syscall(SYS_gettid);
@@ -268,7 +349,7 @@ Status Thread::start_thread(const std::string& category, const std::string& name
t->_joinable = true;
cleanup.cancel();
- VLOG(3) << "Started thread " << t->tid()<< " - " << category << ":" << name;
+ VLOG(3) << "Started thread " << t->tid() << " - " << category << ":" << name;
return Status::OK();
}
@@ -331,10 +412,10 @@ void Thread::init_threadmgr() {
}
ThreadJoiner::ThreadJoiner(Thread* thr)
- : _thread(CHECK_NOTNULL(thr)),
- _warn_after_ms(kDefaultWarnAfterMs),
- _warn_every_ms(kDefaultWarnEveryMs),
- _give_up_after_ms(kDefaultGiveUpAfterMs) {}
+ : _thread(CHECK_NOTNULL(thr)),
+ _warn_after_ms(kDefaultWarnAfterMs),
+ _warn_every_ms(kDefaultWarnEveryMs),
+ _give_up_after_ms(kDefaultGiveUpAfterMs) {}
ThreadJoiner& ThreadJoiner::warn_after_ms(int ms) {
_warn_after_ms = ms;
@@ -352,8 +433,7 @@ ThreadJoiner& ThreadJoiner::give_up_after_ms(int ms) {
}
Status ThreadJoiner::join() {
- if (Thread::current_thread() &&
- Thread::current_thread()->tid() == _thread->tid()) {
+ if (Thread::current_thread() && Thread::current_thread()->tid() == _thread->tid()) {
return Status::InvalidArgument("Can't join on own thread", -1, _thread->_name);
}
@@ -397,8 +477,15 @@ Status ThreadJoiner::join() {
}
waited_ms += wait_for;
}
- return Status::Aborted(strings::Substitute("Timed out after $0ms joining on $1",
- waited_ms, _thread->_name));
+ return Status::Aborted(
+ strings::Substitute("Timed out after $0ms joining on $1", waited_ms, _thread->_name));
}
+// Registers the "/threadz" template page (titled "Threads") on 'web_page_handler'.
+// Requests for the page are forwarded to ThreadMgr::display_thread_callback on
+// the manager object that 'thread_manager' points to at registration time.
+void register_thread_display_page(WebPageHandler* web_page_handler) {
+    // Capture the raw pointer once, as of registration.
+    const ThreadMgr* manager = thread_manager.get();
+    web_page_handler->register_template_page(
+            "/threadz", "Threads",
+            [manager](const WebPageHandler::ArgumentMap& args, EasyJson* output) {
+                manager->display_thread_callback(args, output);
+            },
+            true);
+}
} // namespace doris
diff --git a/be/src/util/thread.h b/be/src/util/thread.h
index 1e1b1e9..2f587e6 100644
--- a/be/src/util/thread.h
+++ b/be/src/util/thread.h
@@ -18,27 +18,25 @@
#ifndef DORIS_BE_SRC_UTIL_THREAD_H
#define DORIS_BE_SRC_UTIL_THREAD_H
-#include <atomic>
#include <pthread.h>
#include <syscall.h>
+#include <atomic>
+
#include "common/status.h"
#include "gutil/ref_counted.h"
+#include "http/web_page_handler.h"
#include "util/countdown_latch.h"
namespace doris {
class Thread : public RefCountedThreadSafe<Thread> {
public:
- enum CreateFlags {
- NO_FLAGS = 0,
- NO_STACK_WATCHDOG = 1
- };
+ enum CreateFlags { NO_FLAGS = 0, NO_STACK_WATCHDOG = 1 };
template <class F>
static Status create_with_flags(const std::string& category, const std::string& name,
- const F& f, uint64_t flags,
- scoped_refptr<Thread>* holder) {
+ const F& f, uint64_t flags, scoped_refptr<Thread>* holder) {
return start_thread(category, name, f, flags, holder);
}
@@ -145,17 +143,15 @@ private:
};
// User function to be executed by this thread.
- typedef std::function<void ()> ThreadFunctor;
+ typedef std::function<void()> ThreadFunctor;
Thread(const std::string& category, const std::string& name, ThreadFunctor functor)
- : _thread(0),
- _tid(INVALID_TID),
- _functor(std::move(functor)),
- _category(std::move(category)),
- _name(std::move(name)),
- _done(1),
- _joinable(false)
- {}
-
+ : _thread(0),
+ _tid(INVALID_TID),
+ _functor(std::move(functor)),
+ _category(std::move(category)),
+ _name(std::move(name)),
+ _done(1),
+ _joinable(false) {}
// Library-specific thread ID.
pthread_t _thread;
@@ -172,7 +168,7 @@ private:
int64_t _tid;
const ThreadFunctor _functor;
-
+
const std::string _category;
const std::string _name;
@@ -188,7 +184,7 @@ private:
// Thread local pointer to the current thread of execution. Will be NULL if the current
// thread is not a Thread.
static __thread Thread* _tls;
-
+
// Wait for the running thread to publish its tid.
int64_t wait_for_tid() const;
@@ -280,6 +276,8 @@ private:
DISALLOW_COPY_AND_ASSIGN(ThreadJoiner);
};
+// Registers /threadz with the debug webserver.
+void register_thread_display_page(WebPageHandler* web_page_handler);
} //namespace doris
diff --git a/be/src/util/url_coding.cpp b/be/src/util/url_coding.cpp
index 08a671f..7e2624f 100644
--- a/be/src/util/url_coding.cpp
+++ b/be/src/util/url_coding.cpp
@@ -78,7 +78,7 @@ bool url_decode(const std::string& in, std::string* out) {
} else {
return false;
}
- } else if (in[i] == '+') {
+ } else if (in[i] == '+') {
(*out) += ' ';
} else {
(*out) += in[i];
@@ -122,7 +122,7 @@ static void encode_base64_internal(const std::string& in, std::string* out,
out->assign((char*)buf.get(), d - buf.get());
}
-void base64url_encode(const std::string& in, std::string *out) {
+void base64url_encode(const std::string& in, std::string* out) {
static unsigned char basis64[] =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_";
encode_base64_internal(in, out, basis64, false);
@@ -130,51 +130,39 @@ void base64url_encode(const std::string& in, std::string *out) {
void base64_encode(const std::string& in, std::string* out) {
static unsigned char basis64[] =
- "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
+ "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
encode_base64_internal(in, out, basis64, true);
}
-static char encoding_table[] = {
- 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H',
- 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P',
- 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X',
- 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f',
- 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n',
- 'o', 'p', 'q', 'r', 's', 't', 'u', 'v',
- 'w', 'x', 'y', 'z', '0', '1', '2', '3',
- '4', '5', '6', '7', '8', '9', '+', '/'
-};
+static char encoding_table[] = {'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M',
+ 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z',
+ 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm',
+ 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
+ '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '+', '/'};
static const char base64_pad = '=';
static short decoding_table[256] = {
- -2, -2, -2, -2, -2, -2, -2, -2, -2, -1, -1, -2, -2, -1, -2, -2,
- -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
- -1, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, 62, -2, -2, -2, 63,
- 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, -2, -2, -2, -2, -2, -2,
- -2, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14,
- 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, -2, -2, -2, -2, -2,
- -2, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40,
- 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, -2, -2, -2, -2, -2,
- -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
- -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
- -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
- -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
- -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
- -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
- -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
- -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2
-};
+ -2, -2, -2, -2, -2, -2, -2, -2, -2, -1, -1, -2, -2, -1, -2, -2, -2, -2, -2, -2, -2, -2,
+ -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -1, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, 62,
+ -2, -2, -2, 63, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, -2, -2, -2, -2, -2, -2, -2, 0,
+ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22,
+ 23, 24, 25, -2, -2, -2, -2, -2, -2, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38,
+ 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, -2, -2, -2, -2, -2, -2, -2, -2, -2,
+ -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
+ -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
+ -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
+ -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
+ -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2,
+ -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2, -2};
static int mod_table[] = {0, 2, 1};
-size_t base64_encode(const unsigned char *data,
- size_t length,
- unsigned char *encoded_data) {
- size_t output_length = (size_t) (4.0 * ceil((double) length / 3.0));
+size_t base64_encode(const unsigned char* data, size_t length, unsigned char* encoded_data) {
+ size_t output_length = (size_t)(4.0 * ceil((double)length / 3.0));
if (encoded_data == NULL) {
- return 0;
+ return 0;
}
for (uint32_t i = 0, j = 0; i < length;) {
@@ -196,11 +184,8 @@ size_t base64_encode(const unsigned char *data,
return output_length;
}
-static inline int64_t base64_decode(
- const char *data,
- size_t length,
- char *decoded_data) {
- const char *current = data;
+static inline int64_t base64_decode(const char* data, size_t length, char* decoded_data) {
+ const char* current = data;
int ch = 0;
int i = 0;
int j = 0;
@@ -232,7 +217,7 @@ static inline int64_t base64_decode(
decoded_data[j] = (ch & 0x0f) << 4;
break;
case 2:
- decoded_data[j++] |= ch >>2;
+ decoded_data[j++] |= ch >> 2;
decoded_data[j] = (ch & 0x03) << 6;
break;
case 3:
@@ -279,7 +264,7 @@ bool base64_decode(const std::string& in, std::string* out) {
}
void escape_for_html(const std::string& in, std::stringstream* out) {
- for (auto& c: in) {
+ for (auto& c : in) {
switch (c) {
case '<':
(*out) << "&lt;";
@@ -298,5 +283,9 @@ void escape_for_html(const std::string& in, std::stringstream* out) {
}
}
}
-
+std::string escape_for_html_to_string(const std::string& in) {
+ std::stringstream str;
+ escape_for_html(in, &str);
+ return str.str();
+}
}
diff --git a/be/src/util/url_coding.h b/be/src/util/url_coding.h
index 7a9457e..37ca4a7 100644
--- a/be/src/util/url_coding.h
+++ b/be/src/util/url_coding.h
@@ -18,9 +18,9 @@
#ifndef DORIS_BE_SRC_COMMON_UTIL_URL_CODING_H
#define DORIS_BE_SRC_COMMON_UTIL_URL_CODING_H
+#include <boost/cstdint.hpp>
#include <string>
#include <vector>
-#include <boost/cstdint.hpp>
namespace doris {
@@ -39,8 +39,8 @@ void url_encode(const std::vector<uint8_t>& in, std::string* out);
// certain characters like ' '.
bool url_decode(const std::string& in, std::string* out);
-void base64url_encode(const std::string& in, std::string *out);
-void base64_encode(const std::string& in, std::string *out);
+void base64url_encode(const std::string& in, std::string* out);
+void base64_encode(const std::string& in, std::string* out);
// Utility method to decode base64 encoded strings. Also not extremely
// performant.
@@ -54,6 +54,8 @@ bool base64_decode(const std::string& in, std::string* out);
// judiciously.
void escape_for_html(const std::string& in, std::stringstream* out);
+// Same as above, but returns a string.
+std::string escape_for_html_to_string(const std::string& in);
}
#endif
diff --git a/webroot/be/threadz.mustache b/webroot/be/threadz.mustache
new file mode 100644
index 0000000..77f575f
--- /dev/null
+++ b/webroot/be/threadz.mustache
@@ -0,0 +1,68 @@
+{{!
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+}}
+
+{{#requested_thread_group}}
+<h2>Thread Group: {{group_name}}</h2>
+{{#requested_all}}<h3>All Threads : </h3>{{/requested_all}}
+{{#found}}
+<table class='table table-hover' data-sort-name='name' data-toggle='table'>
+ <thead>
+ <tr>
+ <th data-field='name' data-sortable='true' data-sorter='stringsSorter'>Thread name</th>
+ <th data-sortable='true' data-sorter='floatsSorter'>Cumulative User CPU (s)</th>
+ <th data-sortable='true' data-sorter='floatsSorter'>Cumulative Kernel CPU (s)</th>
+ <th data-sortable='true' data-sorter='floatsSorter'>Cumulative IO-wait (s)</th>
+ </tr>
+ </thead>
+ <tbody>
+ {{#threads}}
+ <tr>
+ <td>{{thread_name}}</td>
+ <td>{{user_sec}}</td>
+ <td>{{kernel_sec}}</td>
+ <td>{{iowait_sec}}</td>
+ </tr>
+ {{/threads}}
+ </tbody>
+</table>
+{{/found}}
+{{^found}}Thread group {{group_name}} not found{{/found}}
+{{/requested_thread_group}}
+
+{{^requested_thread_group}}
+<h2>Thread Groups</h2>
+<h4>{{total_threads_running}} thread(s) running</h4>
+<a href='{{base_url}}/threadz?group=all'><h3>All Threads</h3></a>
+<table class='table table-hover' data-sort-name='group' data-toggle='table'>
+ <thead>
+ <tr>
+ <th data-field='group' data-sortable='true' data-sorter='stringsSorter'>Group</th>
+ <th data-sortable='true' data-sorter='numericStringsSorter'>Threads running</th>
+ </tr>
+ </thead>
+ <tbody>
+ {{#groups}}
+ <tr>
+ <td><a href='{{base_url}}/threadz?group={{encoded_group_name}}'>{{group_name}}</a></td>
+ <td>{{threads_running}}</td>
+ </tr>
+ {{/groups}}
+ </tbody>
+</table>
+{{/requested_thread_group}}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org