You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/01/15 01:45:22 UTC

[incubator-doris] branch master updated: [TSAN] Fix tsan bugs (part 1) (#5162)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 58e58c9  [TSAN] Fix tsan bugs (part 1) (#5162)
58e58c9 is described below

commit 58e58c94d81a4b4fafeb64617b822c98bb577e69
Author: Yingchun Lai <40...@qq.com>
AuthorDate: Fri Jan 15 09:45:11 2021 +0800

    [TSAN] Fix tsan bugs (part 1) (#5162)
    
    ThreadSanitizer, aka TSAN, is a useful tool to detect multi-thread
    problems, such as data race, mutex problems, etc.
    We should detect TSAN problems for Doris BE, both unit tests and
    server should pass through TSAN mode, to make Doris more robustness.
    This is the very beginning patch to fix TSAN problems, and some
    difficult problems are suppressed in file 'tsan_suppressions', you
    can suppress these problems by setting:
    export TSAN_OPTIONS="suppressions=tsan_suppressions"
    
    before running:
    `BUILD_TYPE=tsan ./run-be-ut.sh --run`
---
 be/CMakeLists.txt                       |   4 +-
 be/src/exec/data_sink.cpp               |   1 +
 be/src/exec/tablet_sink.cpp             |  38 ++++++------
 be/src/exec/tablet_sink.h               |  14 ++---
 be/src/http/ev_http_server.cpp          |  33 +++++------
 be/src/http/ev_http_server.h            |   7 ++-
 be/src/olap/data_dir.cpp                | 101 ++++++++++++++++----------------
 be/src/olap/data_dir.h                  |   2 +-
 be/src/olap/olap_server.cpp             |   8 ++-
 be/src/olap/storage_engine.cpp          |   1 +
 be/src/runtime/disk_io_mgr.h            |   2 +-
 be/src/runtime/fragment_mgr.cpp         |   9 +--
 be/src/util/blocking_priority_queue.hpp |   8 +--
 be/src/util/condition_variable.cpp      |   4 ++
 be/src/util/debug/sanitizer_scopes.h    |  46 +++++++++++++++
 be/src/util/priority_thread_pool.hpp    |  11 +---
 be/src/util/runtime_profile.h           |  11 ++--
 be/src/util/spinlock.h                  |   5 +-
 be/src/util/thread.cpp                  |   7 +--
 be/src/util/threadpool.cpp              |   3 +
 be/test/exec/tablet_sink_test.cpp       |  32 +++++-----
 be/test/plugin/plugin_zip_test.cpp      |   2 -
 thirdparty/build-thirdparty.sh          |  12 ++++
 thirdparty/vars.sh                      |   8 ++-
 tsan_suppressions                       |  24 ++++++++
 25 files changed, 238 insertions(+), 155 deletions(-)

diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index af30e50..a9d0c6b 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -315,7 +315,8 @@ SET(CXX_FLAGS_UBSAN "${CXX_GCC_FLAGS} -ggdb3 -O0 -gdwarf-2 -fno-wrapv -fsanitize
 
 # Set the flags to the thread sanitizer, also known as "tsan"
 # Turn on sanitizer and debug symbols to get stack traces:
-SET(CXX_FLAGS_TSAN "${CXX_GCC_FLAGS} -O0 -ggdb3 -fsanitize=thread -DTHREAD_SANITIZER")
+# Use -Wno-builtin-declaration-mismatch to mute warnings like "new declaration ‘__tsan_atomic16 __tsan_atomic16_fetch_nand(..."
+SET(CXX_FLAGS_TSAN "${CXX_GCC_FLAGS} -O0 -ggdb3 -fsanitize=thread -DTHREAD_SANITIZER -Wno-builtin-declaration-mismatch")
 
 # Set compile flags based on the build type.
 if ("${CMAKE_BUILD_TYPE}" STREQUAL "DEBUG")
@@ -467,6 +468,7 @@ elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "UBSAN")
     set(DORIS_LINK_LIBS ${DORIS_LINK_LIBS} -static-libubsan tcmalloc)
 elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "TSAN")
     set(DORIS_LINK_LIBS ${DORIS_LINK_LIBS} -static-libtsan)
+    add_definitions("-DTHREAD_SANITIZER")
 else()
     message(FATAL_ERROR "Unknown build type: ${CMAKE_BUILD_TYPE}")
 endif()
diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp
index 218e29f..84c46ab 100644
--- a/be/src/exec/data_sink.cpp
+++ b/be/src/exec/data_sink.cpp
@@ -159,6 +159,7 @@ Status DataSink::init(const TDataSink& thrift_sink) {
 
 Status DataSink::prepare(RuntimeState* state) {
     _expr_mem_tracker = MemTracker::CreateTracker(
+            // TODO(yingchun): use subclass' name
             -1, std::string("DataSink:") + std::to_string(state->load_job_id()),
             state->instance_mem_tracker());
     return Status::OK();
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 47305d9..1567721 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -27,6 +27,7 @@
 #include "runtime/tuple_row.h"
 #include "service/brpc.h"
 #include "util/brpc_stub_cache.h"
+#include "util/debug/sanitizer_scopes.h"
 #include "util/monotime.h"
 #include "util/uid_util.h"
 
@@ -200,14 +201,14 @@ Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) {
     // It's fine to do a fake add_row() and return OK, because we will check _cancelled in next add_row() or mark_close().
     while (!_cancelled && _parent->_mem_tracker->AnyLimitExceeded(MemLimit::HARD) &&
            _pending_batches_num > 0) {
-        SCOPED_RAW_TIMER(&_mem_exceeded_block_ns);
+        SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns);
         SleepFor(MonoDelta::FromMilliseconds(10));
     }
 
     auto row_no = _cur_batch->add_row();
     if (row_no == RowBatch::INVALID_ROW_INDEX) {
         {
-            SCOPED_RAW_TIMER(&_queue_push_lock_ns);
+            SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns);
             std::lock_guard<std::mutex> l(_pending_batches_lock);
             //To simplify the add_row logic, postpone adding batch into req until the time of sending req
             _pending_batches.emplace(std::move(_cur_batch), _cur_add_batch_request);
@@ -235,6 +236,7 @@ Status NodeChannel::mark_close() {
 
     _cur_add_batch_request.set_eos(true);
     {
+        debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
         std::lock_guard<std::mutex> l(_pending_batches_lock);
         _pending_batches.emplace(std::move(_cur_batch), _cur_add_batch_request);
         _pending_batches_num++;
@@ -311,10 +313,11 @@ int NodeChannel::try_send_and_fetch_status() {
     }
 
     if (!_add_batch_closure->is_packet_in_flight() && _pending_batches_num > 0) {
-        SCOPED_RAW_TIMER(&_actual_consume_ns);
+        SCOPED_ATOMIC_TIMER(&_actual_consume_ns);
         AddBatchReq send_batch;
         {
-            std::lock_guard<std::mutex> lg(_pending_batches_lock);
+            debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
+            std::lock_guard<std::mutex> l(_pending_batches_lock);
             DCHECK(!_pending_batches.empty());
             send_batch = std::move(_pending_batches.front());
             _pending_batches.pop();
@@ -327,7 +330,7 @@ int NodeChannel::try_send_and_fetch_status() {
         // tablet_ids has already set when add row
         request.set_packet_seq(_next_packet_seq);
         if (row_batch->num_rows() > 0) {
-            SCOPED_RAW_TIMER(&_serialize_batch_ns);
+            SCOPED_ATOMIC_TIMER(&_serialize_batch_ns);
             row_batch->serialize(request.mutable_row_batch());
         }
 
@@ -394,7 +397,7 @@ Status IndexChannel::init(RuntimeState* state, const std::vector<TTabletWithPart
         for (auto& node_id : location->node_ids) {
             NodeChannel* channel = nullptr;
             auto it = _node_channels.find(node_id);
-            if (it == std::end(_node_channels)) {
+            if (it == _node_channels.end()) {
                 channel = _parent->_pool->add(
                         new NodeChannel(_parent, _index_id, node_id, _schema_hash));
                 _node_channels.emplace(node_id, channel);
@@ -414,7 +417,7 @@ Status IndexChannel::init(RuntimeState* state, const std::vector<TTabletWithPart
 
 Status IndexChannel::add_row(Tuple* tuple, int64_t tablet_id) {
     auto it = _channels_by_tablet.find(tablet_id);
-    DCHECK(it != std::end(_channels_by_tablet)) << "unknown tablet, tablet_id=" << tablet_id;
+    DCHECK(it != _channels_by_tablet.end()) << "unknown tablet, tablet_id=" << tablet_id;
     for (auto channel : it->second) {
         // if this node channel is already failed, this add_row will be skipped
         auto st = channel->add_row(tuple, tablet_id);
@@ -460,12 +463,8 @@ Status OlapTableSink::init(const TDataSink& t_sink) {
     _load_id.set_hi(table_sink.load_id.hi);
     _load_id.set_lo(table_sink.load_id.lo);
     _txn_id = table_sink.txn_id;
-    _db_id = table_sink.db_id;
-    _table_id = table_sink.table_id;
     _num_replicas = table_sink.num_replicas;
     _need_gen_rollup = table_sink.need_gen_rollup;
-    _db_name = table_sink.db_name;
-    _table_name = table_sink.table_name;
     _tuple_desc_id = table_sink.tuple_id;
     _schema.reset(new OlapTableSchemaParam());
     RETURN_IF_ERROR(_schema->init(table_sink.schema));
@@ -572,13 +571,13 @@ Status OlapTableSink::prepare(RuntimeState* state) {
     _load_mem_limit = state->get_load_mem_limit();
 
     // open all channels
-    auto& partitions = _partition->get_partitions();
+    const auto& partitions = _partition->get_partitions();
     for (int i = 0; i < _schema->indexes().size(); ++i) {
         // collect all tablets belong to this rollup
         std::vector<TTabletWithPartition> tablets;
         auto index = _schema->indexes()[i];
-        for (auto part : partitions) {
-            for (auto tablet : part->indexes[i].tablets) {
+        for (const auto& part : partitions) {
+            for (const auto& tablet : part->indexes[i].tablets) {
                 TTabletWithPartition tablet_with_partition;
                 tablet_with_partition.partition_id = part->id;
                 tablet_with_partition.tablet_id = tablet;
@@ -710,11 +709,13 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
                                                       &serialize_batch_ns, &mem_exceeded_block_ns,
                                                       &queue_push_lock_ns,
                                                       &actual_consume_ns](NodeChannel* ch) {
-                    status = ch->close_wait(state);
-                    if (!status.ok()) {
+                    auto s = ch->close_wait(state);
+                    if (!s.ok()) {
+                        // 'status' will store the last non-ok status of all channels
+                        status = s;
                         LOG(WARNING)
                                 << ch->name() << ": close channel failed, " << ch->print_load_info()
-                                << ". error_msg=" << status.get_error_msg();
+                                << ". error_msg=" << s.get_error_msg();
                     }
                     ch->time_report(&node_add_batch_counter_map, &serialize_batch_ns,
                                     &mem_exceeded_block_ns, &queue_push_lock_ns,
@@ -733,7 +734,6 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
         COUNTER_SET(_send_data_timer, _send_data_ns);
         COUNTER_SET(_convert_batch_timer, _convert_batch_ns);
         COUNTER_SET(_validate_data_timer, _validate_data_ns);
-        COUNTER_SET(_non_blocking_send_timer, _non_blocking_send_ns);
         COUNTER_SET(_serialize_batch_timer, serialize_batch_ns);
         // _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node
         int64_t num_rows_load_total = _number_input_rows + state->num_rows_load_filtered() +
@@ -939,7 +939,7 @@ int OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitmap*
 }
 
 void OlapTableSink::_send_batch_process() {
-    SCOPED_RAW_TIMER(&_non_blocking_send_ns);
+    SCOPED_TIMER(_non_blocking_send_timer);
     do {
         int running_channels_num = 0;
         for (auto index_channel : _channels) {
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index d3ae181..8d894c6 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -237,11 +237,10 @@ private:
     std::vector<TTabletCommitInfo> _tablet_commit_infos;
 
     AddBatchCounter _add_batch_counter;
-    int64_t _serialize_batch_ns = 0;
-
-    int64_t _mem_exceeded_block_ns = 0;
-    int64_t _queue_push_lock_ns = 0;
-    int64_t _actual_consume_ns = 0;
+    std::atomic<int64_t> _serialize_batch_ns;
+    std::atomic<int64_t> _mem_exceeded_block_ns;
+    std::atomic<int64_t> _queue_push_lock_ns;
+    std::atomic<int64_t> _actual_consume_ns;
 };
 
 class IndexChannel {
@@ -328,12 +327,8 @@ private:
     // unique load id
     PUniqueId _load_id;
     int64_t _txn_id = -1;
-    int64_t _db_id = -1;
-    int64_t _table_id = -1;
     int _num_replicas = -1;
     bool _need_gen_rollup = false;
-    std::string _db_name;
-    std::string _table_name;
     int _tuple_desc_id = -1;
 
     // this is tuple descriptor of destination OLAP table
@@ -378,7 +373,6 @@ private:
     int64_t _convert_batch_ns = 0;
     int64_t _validate_data_ns = 0;
     int64_t _send_data_ns = 0;
-    int64_t _non_blocking_send_ns = 0;
     int64_t _serialize_batch_ns = 0;
     int64_t _number_input_rows = 0;
     int64_t _number_output_rows = 0;
diff --git a/be/src/http/ev_http_server.cpp b/be/src/http/ev_http_server.cpp
index b2ff9ad..e958246 100644
--- a/be/src/http/ev_http_server.cpp
+++ b/be/src/http/ev_http_server.cpp
@@ -74,20 +74,15 @@ static int on_connection(struct evhttp_request* req, void* param) {
 EvHttpServer::EvHttpServer(int port, int num_workers)
         : _host("0.0.0.0"), _port(port), _num_workers(num_workers), _real_port(0) {
     DCHECK_GT(_num_workers, 0);
-    auto res = pthread_rwlock_init(&_rw_lock, nullptr);
-    DCHECK_EQ(res, 0);
 }
 
 EvHttpServer::EvHttpServer(const std::string& host, int port, int num_workers)
         : _host(host), _port(port), _num_workers(num_workers), _real_port(0) {
     DCHECK_GT(_num_workers, 0);
-    auto res = pthread_rwlock_init(&_rw_lock, nullptr);
-    DCHECK_EQ(res, 0);
 }
 
 EvHttpServer::~EvHttpServer() {
     stop();
-    pthread_rwlock_destroy(&_rw_lock);
 }
 
 void EvHttpServer::start() {
@@ -100,14 +95,17 @@ void EvHttpServer::start() {
             .build(&_workers);
 
     evthread_use_pthreads();
-    event_bases.resize(_num_workers);
+    _event_bases.resize(_num_workers);
     for (int i = 0; i < _num_workers; ++i) {
         CHECK(_workers->submit_func([this, i]() {
                           std::shared_ptr<event_base> base(event_base_new(), [](event_base* base) {
                               event_base_free(base);
                           });
                           CHECK(base != nullptr) << "Couldn't create an event_base.";
-                          event_bases[i] = base;
+                          {
+                              std::lock_guard<std::mutex> lock(_event_bases_lock);
+                              _event_bases[i] = base;
+                          }
 
                           /* Create a new evhttp object to handle requests. */
                           std::shared_ptr<evhttp> http(evhttp_new(base.get()),
@@ -127,9 +125,13 @@ void EvHttpServer::start() {
 }
 
 void EvHttpServer::stop() {
-    for (int i = 0; i < _num_workers; ++i) {
-        LOG(WARNING) << "event_base_loopexit ret: "
-                     << event_base_loopexit(event_bases[i].get(), nullptr);
+    {
+        std::lock_guard<std::mutex> lock(_event_bases_lock);
+        for (int i = 0; i < _num_workers; ++i) {
+            LOG(WARNING) << "event_base_loopexit ret: "
+                         << event_base_loopexit(_event_bases[i].get(), nullptr);
+        }
+        _event_bases.clear();
     }
     _workers->shutdown();
     close(_server_fd);
@@ -180,7 +182,7 @@ bool EvHttpServer::register_handler(const HttpMethod& method, const std::string&
     }
 
     bool result = true;
-    pthread_rwlock_wrlock(&_rw_lock);
+    std::lock_guard<std::mutex> lock(_handler_lock);
     PathTrie<HttpHandler*>* root = nullptr;
     switch (method) {
     case GET:
@@ -208,17 +210,15 @@ bool EvHttpServer::register_handler(const HttpMethod& method, const std::string&
     if (result) {
         result = root->insert(path, handler);
     }
-    pthread_rwlock_unlock(&_rw_lock);
-
+    
     return result;
 }
 
 void EvHttpServer::register_static_file_handler(HttpHandler* handler) {
     DCHECK(handler != nullptr);
     DCHECK(_static_file_handler == nullptr);
-    pthread_rwlock_wrlock(&_rw_lock);
+    std::lock_guard<std::mutex> lock(_handler_lock);
     _static_file_handler = handler;
-    pthread_rwlock_unlock(&_rw_lock);
 }
 
 int EvHttpServer::on_header(struct evhttp_request* ev_req) {
@@ -258,7 +258,7 @@ HttpHandler* EvHttpServer::_find_handler(HttpRequest* req) {
 
     HttpHandler* handler = nullptr;
 
-    pthread_rwlock_rdlock(&_rw_lock);
+    std::lock_guard<std::mutex> lock(_handler_lock);
     switch (req->method()) {
     case GET:
         _get_handlers.retrieve(path, &handler, req->params());
@@ -286,7 +286,6 @@ HttpHandler* EvHttpServer::_find_handler(HttpRequest* req) {
         LOG(WARNING) << "unknown HTTP method, method=" << req->method();
         break;
     }
-    pthread_rwlock_unlock(&_rw_lock);
     return handler;
 }
 
diff --git a/be/src/http/ev_http_server.h b/be/src/http/ev_http_server.h
index d6ff336..af25d58 100644
--- a/be/src/http/ev_http_server.h
+++ b/be/src/http/ev_http_server.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <mutex>
 #include <string>
 #include <thread>
 #include <vector>
@@ -66,10 +67,10 @@ private:
 
     int _server_fd = -1;
     std::unique_ptr<ThreadPool> _workers;
-    std::vector<std::shared_ptr<event_base>> event_bases;
-
-    pthread_rwlock_t _rw_lock;
+    std::mutex _event_bases_lock;    // protect _event_bases
+    std::vector<std::shared_ptr<event_base>> _event_bases;
 
+    std::mutex _handler_lock;
     PathTrie<HttpHandler*> _get_handlers;
     HttpHandler* _static_file_handler = nullptr;
     PathTrie<HttpHandler*> _put_handlers;
diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp
index 9c20ddf..4bf7b97 100644
--- a/be/src/olap/data_dir.cpp
+++ b/be/src/olap/data_dir.cpp
@@ -114,8 +114,9 @@ Status DataDir::init() {
 }
 
 void DataDir::stop_bg_worker() {
+    std::unique_lock<std::mutex> lck(_check_path_mutex);
     _stop_bg_worker = true;
-    _cv.notify_one();
+    _check_path_cv.notify_one();
 }
 
 Status DataDir::_init_cluster_id() {
@@ -807,13 +808,13 @@ void DataDir::remove_pending_ids(const std::string& id) {
 // gc unused tablet schemahash dir
 void DataDir::perform_path_gc_by_tablet() {
     std::unique_lock<std::mutex> lck(_check_path_mutex);
-    _cv.wait(lck, [this] { return _stop_bg_worker || !_all_tablet_schemahash_paths.empty(); });
+    _check_path_cv.wait(lck, [this] { return _stop_bg_worker || !_all_tablet_schemahash_paths.empty(); });
     if (_stop_bg_worker) {
         return;
     }
     LOG(INFO) << "start to path gc by tablet schemahash.";
     int counter = 0;
-    for (auto& path : _all_tablet_schemahash_paths) {
+    for (const auto& path : _all_tablet_schemahash_paths) {
         ++counter;
         if (config::path_gc_check_step > 0 && counter % config::path_gc_check_step == 0) {
             SleepFor(MonoDelta::FromMilliseconds(config::path_gc_check_step_interval_ms));
@@ -857,13 +858,13 @@ void DataDir::perform_path_gc_by_rowsetid() {
     // init the set of valid path
     // validate the path in data dir
     std::unique_lock<std::mutex> lck(_check_path_mutex);
-    _cv.wait(lck, [this] { return _stop_bg_worker || !_all_check_paths.empty(); });
+    _check_path_cv.wait(lck, [this] { return _stop_bg_worker || !_all_check_paths.empty(); });
     if (_stop_bg_worker) {
         return;
     }
     LOG(INFO) << "start to path gc by rowsetid.";
     int counter = 0;
-    for (auto& path : _all_check_paths) {
+    for (const auto& path : _all_check_paths) {
         ++counter;
         if (config::path_gc_check_step > 0 && counter % config::path_gc_check_step == 0) {
             SleepFor(MonoDelta::FromMilliseconds(config::path_gc_check_step_interval_ms));
@@ -899,65 +900,63 @@ void DataDir::perform_path_gc_by_rowsetid() {
 
 // path producer
 void DataDir::perform_path_scan() {
-    {
-        std::unique_lock<std::mutex> lck(_check_path_mutex);
-        if (!_all_check_paths.empty()) {
-            LOG(INFO) << "_all_check_paths is not empty when path scan.";
-            return;
-        }
-        LOG(INFO) << "start to scan data dir path:" << _path;
-        std::set<std::string> shards;
-        std::string data_path = _path + DATA_PREFIX;
+    std::unique_lock<std::mutex> lck(_check_path_mutex);
+    if (!_all_check_paths.empty()) {
+        LOG(INFO) << "_all_check_paths is not empty when path scan.";
+        return;
+    }
+    LOG(INFO) << "start to scan data dir path:" << _path;
+    std::set<std::string> shards;
+    std::string data_path = _path + DATA_PREFIX;
 
-        Status ret = FileUtils::list_dirs_files(data_path, &shards, nullptr, Env::Default());
+    Status ret = FileUtils::list_dirs_files(data_path, &shards, nullptr, Env::Default());
+    if (!ret.ok()) {
+        LOG(WARNING) << "fail to walk dir. path=[" + data_path << "] error[" << ret.to_string()
+                     << "]";
+        return;
+    }
+
+    for (const auto& shard : shards) {
+        std::string shard_path = data_path + "/" + shard;
+        std::set<std::string> tablet_ids;
+        ret = FileUtils::list_dirs_files(shard_path, &tablet_ids, nullptr, Env::Default());
         if (!ret.ok()) {
-            LOG(WARNING) << "fail to walk dir. path=[" + data_path << "] error[" << ret.to_string()
-                         << "]";
-            return;
+            LOG(WARNING) << "fail to walk dir. [path=" << shard_path << "] error["
+                         << ret.to_string() << "]";
+            continue;
         }
-
-        for (const auto& shard : shards) {
-            std::string shard_path = data_path + "/" + shard;
-            std::set<std::string> tablet_ids;
-            ret = FileUtils::list_dirs_files(shard_path, &tablet_ids, nullptr, Env::Default());
+        for (const auto& tablet_id : tablet_ids) {
+            std::string tablet_id_path = shard_path + "/" + tablet_id;
+            std::set<std::string> schema_hashes;
+            ret = FileUtils::list_dirs_files(tablet_id_path, &schema_hashes, nullptr,
+                                             Env::Default());
             if (!ret.ok()) {
-                LOG(WARNING) << "fail to walk dir. [path=" << shard_path << "] error["
-                             << ret.to_string() << "]";
+                LOG(WARNING) << "fail to walk dir. [path=" << tablet_id_path << "]"
+                             << " error[" << ret.to_string() << "]";
                 continue;
             }
-            for (const auto& tablet_id : tablet_ids) {
-                std::string tablet_id_path = shard_path + "/" + tablet_id;
-                std::set<std::string> schema_hashes;
-                ret = FileUtils::list_dirs_files(tablet_id_path, &schema_hashes, nullptr,
-                                                 Env::Default());
+            for (const auto& schema_hash : schema_hashes) {
+                std::string tablet_schema_hash_path = tablet_id_path + "/" + schema_hash;
+                _all_tablet_schemahash_paths.insert(tablet_schema_hash_path);
+
+                std::set<std::string> rowset_files;
+                ret = FileUtils::list_dirs_files(tablet_schema_hash_path, nullptr,
+                                                 &rowset_files, Env::Default());
                 if (!ret.ok()) {
-                    LOG(WARNING) << "fail to walk dir. [path=" << tablet_id_path << "]"
-                                 << " error[" << ret.to_string() << "]";
+                    LOG(WARNING) << "fail to walk dir. [path=" << tablet_schema_hash_path
+                                 << "] error[" << ret.to_string() << "]";
                     continue;
                 }
-                for (const auto& schema_hash : schema_hashes) {
-                    std::string tablet_schema_hash_path = tablet_id_path + "/" + schema_hash;
-                    _all_tablet_schemahash_paths.insert(tablet_schema_hash_path);
-                    std::set<std::string> rowset_files;
-
-                    ret = FileUtils::list_dirs_files(tablet_schema_hash_path, nullptr,
-                                                     &rowset_files, Env::Default());
-                    if (!ret.ok()) {
-                        LOG(WARNING) << "fail to walk dir. [path=" << tablet_schema_hash_path
-                                     << "] error[" << ret.to_string() << "]";
-                        continue;
-                    }
-                    for (const auto& rowset_file : rowset_files) {
-                        std::string rowset_file_path = tablet_schema_hash_path + "/" + rowset_file;
-                        _all_check_paths.insert(rowset_file_path);
-                    }
+                for (const auto& rowset_file : rowset_files) {
+                    std::string rowset_file_path = tablet_schema_hash_path + "/" + rowset_file;
+                    _all_check_paths.insert(rowset_file_path);
                 }
             }
         }
-        LOG(INFO) << "scan data dir path:" << _path
-                  << " finished. path size:" << _all_check_paths.size();
     }
-    _cv.notify_one();
+    LOG(INFO) << "scan data dir path: " << _path
+              << " finished. path size: " << _all_check_paths.size() + _all_tablet_schemahash_paths.size();
+    _check_path_cv.notify_one();
 }
 
 void DataDir::_process_garbage_path(const std::string& path) {
diff --git a/be/src/olap/data_dir.h b/be/src/olap/data_dir.h
index a3c2d22..c05b50e 100644
--- a/be/src/olap/data_dir.h
+++ b/be/src/olap/data_dir.h
@@ -190,7 +190,7 @@ private:
     RowsetIdGenerator* _id_generator = nullptr;
 
     std::mutex _check_path_mutex;
-    std::condition_variable _cv;
+    std::condition_variable _check_path_cv;
     std::set<std::string> _all_check_paths;
     std::set<std::string> _all_tablet_schemahash_paths;
 
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 8f87c25..2026150 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -324,7 +324,8 @@ void StorageEngine::_compaction_tasks_producer_callback() {
 
     int round = 0;
     CompactionType compaction_type;
-    while (true) {
+    int32_t interval = 1;
+    do {
         if (!config::disable_auto_compaction) {
             if (round < config::cumulative_compaction_rounds_for_each_base_compaction_round) {
                 compaction_type = CompactionType::CUMULATIVE_COMPACTION;
@@ -387,10 +388,11 @@ void StorageEngine::_compaction_tasks_producer_callback() {
                     tablet->reset_compaction(compaction_type);
                 }
             }
+            interval = 1;
         } else {
-            sleep(config::check_auto_compaction_interval_seconds);
+            interval = config::check_auto_compaction_interval_seconds * 1000;
         }
-    }
+    } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromMilliseconds(interval)));
 }
 
 std::vector<TabletSharedPtr> StorageEngine::_compaction_tasks_generator(
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 0385650..3b952c3 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -510,6 +510,7 @@ void StorageEngine::stop() {
         thread->join();     \
     }
 
+    THREAD_JOIN(_compaction_tasks_producer_thread);
     THREAD_JOIN(_unused_rowset_monitor_thread);
     THREAD_JOIN(_garbage_sweeper_thread);
     THREAD_JOIN(_disk_stat_monitor_thread);
diff --git a/be/src/runtime/disk_io_mgr.h b/be/src/runtime/disk_io_mgr.h
index e863107..86e6630 100644
--- a/be/src/runtime/disk_io_mgr.h
+++ b/be/src/runtime/disk_io_mgr.h
@@ -715,7 +715,7 @@ private:
 
     // True if the IoMgr should be torn down. Worker threads watch for this to
     // know to terminate. This variable is read/written to by different threads.
-    volatile bool _shut_down;
+    std::atomic<bool> _shut_down;
 
     // Total bytes read by the IoMgr.
     RuntimeProfile::Counter _total_bytes_read_counter;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index d74875b..089921a 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -384,18 +384,19 @@ FragmentMgr::FragmentMgr(ExecEnv* exec_env)
         return _fragment_map.size();
     });
 
-    CHECK(Thread::create(
+    auto s = Thread::create(
                   "FragmentMgr", "cancel_timeout_plan_fragment",
-                  [this]() { this->cancel_worker(); }, &_cancel_thread)
-                  .ok());
+                  [this]() { this->cancel_worker(); }, &_cancel_thread);
+    CHECK(s.ok()) << s.to_string();
 
     // TODO(zc): we need a better thread-pool
     // now one user can use all the thread pool, others have no resource.
-    ThreadPoolBuilder("FragmentMgrThreadPool")
+    s = ThreadPoolBuilder("FragmentMgrThreadPool")
             .set_min_threads(config::fragment_pool_thread_num_min)
             .set_max_threads(config::fragment_pool_thread_num_max)
             .set_max_queue_size(config::fragment_pool_queue_size)
             .build(&_thread_pool);
+    CHECK(s.ok()) << s.to_string();
 }
 
 FragmentMgr::~FragmentMgr() {
diff --git a/be/src/util/blocking_priority_queue.hpp b/be/src/util/blocking_priority_queue.hpp
index c68dfbe..815fa96 100644
--- a/be/src/util/blocking_priority_queue.hpp
+++ b/be/src/util/blocking_priority_queue.hpp
@@ -140,11 +140,7 @@ public:
 
     // Shut down the queue. Wakes up all threads waiting on blocking_get or blocking_put.
     void shutdown() {
-        {
-            boost::lock_guard<boost::mutex> guard(_lock);
-            _shutdown = true;
-        }
-
+        _shutdown = true;
         _get_cv.notify_all();
         _put_cv.notify_all();
     }
@@ -167,7 +163,7 @@ public:
     }
 
 private:
-    bool _shutdown;
+    std::atomic<bool> _shutdown;
     const int _max_element;
     boost::condition_variable _get_cv;   // 'get' callers wait on this
     boost::condition_variable _put_cv;   // 'put' callers wait on this
diff --git a/be/src/util/condition_variable.cpp b/be/src/util/condition_variable.cpp
index aaf72ca..ac763fc 100644
--- a/be/src/util/condition_variable.cpp
+++ b/be/src/util/condition_variable.cpp
@@ -11,6 +11,7 @@
 #include <ctime>
 
 #include "common/logging.h"
+#include "util/debug/sanitizer_scopes.h"
 #include "util/monotime.h"
 #include "util/mutex.h"
 
@@ -33,11 +34,13 @@ ConditionVariable::~ConditionVariable() {
 }
 
 void ConditionVariable::wait() const {
+    debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
     int rv = pthread_cond_wait(&_condition, _user_mutex);
     DCHECK_EQ(0, rv);
 }
 
 bool ConditionVariable::wait_until(const MonoTime& until) const {
+    debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
     // Have we already timed out?
     MonoTime now = MonoTime::Now();
     if (now > until) {
@@ -53,6 +56,7 @@ bool ConditionVariable::wait_until(const MonoTime& until) const {
 }
 
 bool ConditionVariable::wait_for(const MonoDelta& delta) const {
+    debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
     // Negative delta means we've already timed out.
     int64_t nsecs = delta.ToNanoseconds();
     if (nsecs < 0) {
diff --git a/be/src/util/debug/sanitizer_scopes.h b/be/src/util/debug/sanitizer_scopes.h
new file mode 100644
index 0000000..363d6d7
--- /dev/null
+++ b/be/src/util/debug/sanitizer_scopes.h
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Wrappers around the annotations from gutil/dynamic_annotations.h,
+// provided as C++-style scope guards.
+
+#pragma once
+
+#include "gutil/dynamic_annotations.h"
+#include "gutil/macros.h"
+
+namespace doris {
+namespace debug {
+
+// Scope guard which instructs TSAN to ignore all reads and writes
+// on the current thread as long as it is alive. These may be safely
+// nested.
+class ScopedTSANIgnoreReadsAndWrites {
+ public:
+  ScopedTSANIgnoreReadsAndWrites() {
+    ANNOTATE_IGNORE_READS_AND_WRITES_BEGIN();
+  }
+  ~ScopedTSANIgnoreReadsAndWrites() {
+    ANNOTATE_IGNORE_READS_AND_WRITES_END();
+  }
+ private:
+  DISALLOW_COPY_AND_ASSIGN(ScopedTSANIgnoreReadsAndWrites);
+};
+
+} // namespace debug
+} // namespace doris
+
diff --git a/be/src/util/priority_thread_pool.hpp b/be/src/util/priority_thread_pool.hpp
index 68f50d7..b0e836c 100644
--- a/be/src/util/priority_thread_pool.hpp
+++ b/be/src/util/priority_thread_pool.hpp
@@ -97,10 +97,7 @@ public:
     // Returns once the shutdown flag has been set, does not wait for the threads to
     // terminate.
     void shutdown() {
-        {
-            boost::lock_guard<boost::mutex> l(_lock);
-            _shutdown = true;
-        }
+        _shutdown = true;
         _work_queue.shutdown();
     }
 
@@ -143,9 +140,7 @@ private:
         }
     }
 
-    // Returns value of _shutdown under a lock, forcing visibility to threads in the pool.
     bool is_shutdown() {
-        boost::lock_guard<boost::mutex> l(_lock);
         return _shutdown;
     }
 
@@ -156,11 +151,11 @@ private:
     // Collection of worker threads that process work from the queue.
     boost::thread_group _threads;
 
-    // Guards _shutdown and _empty_cv
+    // Guards _empty_cv
     boost::mutex _lock;
 
     // Set to true when threads should stop doing work and terminate.
-    bool _shutdown;
+    std::atomic<bool> _shutdown;
 
     // Signalled when the queue becomes empty
     boost::condition_variable _empty_cv;
diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h
index 2bd2da7..58eedb1 100644
--- a/be/src/util/runtime_profile.h
+++ b/be/src/util/runtime_profile.h
@@ -52,7 +52,9 @@ namespace doris {
 #define CANCEL_SAFE_SCOPED_TIMER(c, is_cancelled) \
     ScopedTimer<MonotonicStopWatch> MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c, is_cancelled)
 #define SCOPED_RAW_TIMER(c) \
-    ScopedRawTimer<MonotonicStopWatch> MACRO_CONCAT(SCOPED_RAW_TIMER, __COUNTER__)(c)
+    ScopedRawTimer<MonotonicStopWatch, int64_t> MACRO_CONCAT(SCOPED_RAW_TIMER, __COUNTER__)(c)
+#define SCOPED_ATOMIC_TIMER(c) \
+    ScopedRawTimer<MonotonicStopWatch, std::atomic<int64_t>> MACRO_CONCAT(SCOPED_ATOMIC_TIMER, __COUNTER__)(c)
 #define COUNTER_UPDATE(c, v) (c)->update(v)
 #define COUNTER_SET(c, v) (c)->set(v)
 #define ADD_THREAD_COUNTERS(profile, prefix) (profile)->add_thread_counters(prefix)
@@ -64,6 +66,7 @@ namespace doris {
 #define ADD_TIMER(profile, name) NULL
 #define SCOPED_TIMER(c)
 #define SCOPED_RAW_TIMER(c)
+#define SCOPED_ATOMIC_TIMER(c)
 #define COUNTER_UPDATE(c, v)
 #define COUNTER_SET(c, v)
 #define ADD_THREADCOUNTERS(profile, prefix) NULL
@@ -670,10 +673,10 @@ private:
 // Utility class to update time elapsed when the object goes out of scope.
 // 'T' must implement the stopWatch "interface" (start,stop,elapsed_time) but
 // we use templates not to pay for virtual function overhead.
-template <class T>
+template <class T, class C>
 class ScopedRawTimer {
 public:
-    ScopedRawTimer(int64_t* counter) : _counter(counter) { _sw.start(); }
+    ScopedRawTimer(C* counter) : _counter(counter) { _sw.start(); }
     // Update counter when object is destroyed
     ~ScopedRawTimer() { *_counter += _sw.elapsed_time(); }
 
@@ -683,7 +686,7 @@ private:
     ScopedRawTimer& operator=(const ScopedRawTimer& timer);
 
     T _sw;
-    int64_t* _counter;
+    C* _counter;
 };
 
 } // namespace doris
diff --git a/be/src/util/spinlock.h b/be/src/util/spinlock.h
index 13a227f..1329e52 100644
--- a/be/src/util/spinlock.h
+++ b/be/src/util/spinlock.h
@@ -38,10 +38,7 @@ public:
     }
 
     void unlock() {
-        // Memory barrier here. All updates before the unlock need to be made visible.
-        __sync_synchronize();
-        DCHECK(_locked);
-        _locked = false;
+        __sync_bool_compare_and_swap(&_locked, true, false);
     }
 
     // Tries to acquire the lock
diff --git a/be/src/util/thread.cpp b/be/src/util/thread.cpp
index ed97860..3c7a45b 100644
--- a/be/src/util/thread.cpp
+++ b/be/src/util/thread.cpp
@@ -35,6 +35,7 @@
 #include "gutil/once.h"
 #include "gutil/strings/substitute.h"
 #include "olap/olap_define.h"
+#include "util/debug/sanitizer_scopes.h"
 #include "util/easy_json.h"
 #include "util/mutex.h"
 #include "util/os_util.h"
@@ -149,7 +150,7 @@ void ThreadMgr::add_thread(const pthread_t& pthread_id, const std::string& name,
     // relationship between thread functors, ignoring potential data races.
     // The annotations prevent this from happening.
     ANNOTATE_IGNORE_SYNC_BEGIN();
-    ANNOTATE_IGNORE_READS_AND_WRITES_BEGIN();
+    debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
     {
         MutexLock l(&_lock);
         _thread_categories[category][pthread_id] = ThreadDescriptor(category, name, tid);
@@ -157,12 +158,11 @@ void ThreadMgr::add_thread(const pthread_t& pthread_id, const std::string& name,
         _threads_started_metric++;
     }
     ANNOTATE_IGNORE_SYNC_END();
-    ANNOTATE_IGNORE_READS_AND_WRITES_END();
 }
 
 void ThreadMgr::remove_thread(const pthread_t& pthread_id, const std::string& category) {
     ANNOTATE_IGNORE_SYNC_BEGIN();
-    ANNOTATE_IGNORE_READS_AND_WRITES_BEGIN();
+    debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
     {
         MutexLock l(&_lock);
         auto category_it = _thread_categories.find(category);
@@ -171,7 +171,6 @@ void ThreadMgr::remove_thread(const pthread_t& pthread_id, const std::string& ca
         _threads_running_metric--;
     }
     ANNOTATE_IGNORE_SYNC_END();
-    ANNOTATE_IGNORE_READS_AND_WRITES_END();
 }
 
 void ThreadMgr::display_thread_callback(const WebPageHandler::ArgumentMap& args,
diff --git a/be/src/util/threadpool.cpp b/be/src/util/threadpool.cpp
index c9c9bed..afe6f14 100644
--- a/be/src/util/threadpool.cpp
+++ b/be/src/util/threadpool.cpp
@@ -26,6 +26,7 @@
 #include "gutil/map-util.h"
 #include "gutil/strings/substitute.h"
 #include "gutil/sysinfo.h"
+#include "util/debug/sanitizer_scopes.h"
 #include "util/scoped_cleanup.h"
 #include "util/thread.h"
 
@@ -278,6 +279,7 @@ Status ThreadPool::init() {
 }
 
 void ThreadPool::shutdown() {
+    debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
     MutexLock unique_lock(&_lock);
     check_not_pool_thread_unlocked();
 
@@ -476,6 +478,7 @@ bool ThreadPool::wait_for(const MonoDelta& delta) {
 }
 
 void ThreadPool::dispatch_thread() {
+    debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
     MutexLock unique_lock(&_lock);
     InsertOrDie(&_threads, Thread::current_thread());
     DCHECK_GT(_num_threads_pending_start, 0);
diff --git a/be/test/exec/tablet_sink_test.cpp b/be/test/exec/tablet_sink_test.cpp
index 54e3fea..7b4e112 100644
--- a/be/test/exec/tablet_sink_test.cpp
+++ b/be/test/exec/tablet_sink_test.cpp
@@ -307,27 +307,28 @@ public:
                        const ::doris::PTransmitDataParams* request,
                        ::doris::PTransmitDataResult* response,
                        ::google::protobuf::Closure* done) override {
-        done->Run();
+        brpc::ClosureGuard done_guard(done);
     }
 
     void tablet_writer_open(google::protobuf::RpcController* controller,
                             const PTabletWriterOpenRequest* request,
                             PTabletWriterOpenResult* response,
                             google::protobuf::Closure* done) override {
+        brpc::ClosureGuard done_guard(done);
         Status status;
         status.to_protobuf(response->mutable_status());
-        done->Run();
     }
 
     void tablet_writer_add_batch(google::protobuf::RpcController* controller,
                                  const PTabletWriterAddBatchRequest* request,
                                  PTabletWriterAddBatchResult* response,
                                  google::protobuf::Closure* done) override {
+        brpc::ClosureGuard done_guard(done);
         {
             std::lock_guard<std::mutex> l(_lock);
-            row_counters += request->tablet_ids_size();
+            _row_counters += request->tablet_ids_size();
             if (request->eos()) {
-                eof_counters++;
+                _eof_counters++;
             }
             k_add_batch_status.to_protobuf(response->mutable_status());
 
@@ -340,20 +341,19 @@ public:
                 }
             }
         }
-        done->Run();
     }
     void tablet_writer_cancel(google::protobuf::RpcController* controller,
                               const PTabletWriterCancelRequest* request,
                               PTabletWriterCancelResult* response,
                               google::protobuf::Closure* done) override {
-        done->Run();
+        brpc::ClosureGuard done_guard(done);
     }
 
     std::mutex _lock;
-    int64_t eof_counters = 0;
-    int64_t row_counters = 0;
+    int64_t _eof_counters = 0;
+    int64_t _row_counters = 0;
     RowDescriptor* _row_desc = nullptr;
-    std::set<std::string>* _output_set;
+    std::set<std::string>* _output_set = nullptr;
 };
 
 TEST_F(OlapTableSinkTest, normal) {
@@ -453,11 +453,11 @@ TEST_F(OlapTableSinkTest, normal) {
     ASSERT_TRUE(st.ok());
     // close
     st = sink.close(&state, Status::OK());
-    ASSERT_TRUE(st.ok());
+    ASSERT_TRUE(st.ok() || st.to_string() == "Internal error: already stopped, skip waiting for close. cancelled/!eos: : 1/1") << st.to_string();
 
     // each node has a eof
-    ASSERT_EQ(2, service->eof_counters);
-    ASSERT_EQ(2 * 2, service->row_counters);
+    ASSERT_EQ(2, service->_eof_counters);
+    ASSERT_EQ(2 * 2, service->_row_counters);
 
     // 2node * 2
     ASSERT_EQ(1, state.num_rows_load_filtered());
@@ -586,11 +586,11 @@ TEST_F(OlapTableSinkTest, convert) {
     ASSERT_TRUE(st.ok());
     // close
     st = sink.close(&state, Status::OK());
-    ASSERT_TRUE(st.ok());
+    ASSERT_TRUE(st.ok() || st.to_string() == "Internal error: already stopped, skip waiting for close. cancelled/!eos: : 1/1") << st.to_string();
 
     // each node has a eof
-    ASSERT_EQ(2, service->eof_counters);
-    ASSERT_EQ(2 * 3, service->row_counters);
+    ASSERT_EQ(2, service->_eof_counters);
+    ASSERT_EQ(2 * 3, service->_row_counters);
 
     // 2node * 2
     ASSERT_EQ(0, state.num_rows_load_filtered());
@@ -966,7 +966,7 @@ TEST_F(OlapTableSinkTest, decimal) {
     ASSERT_TRUE(st.ok());
     // close
     st = sink.close(&state, Status::OK());
-    ASSERT_TRUE(st.ok());
+    ASSERT_TRUE(st.ok() || st.to_string() == "Internal error: already stopped, skip waiting for close. cancelled/!eos: : 1/1") << st.to_string();
 
     ASSERT_EQ(2, output_set.size());
     ASSERT_TRUE(output_set.count("[(12 12.3)]") > 0);
diff --git a/be/test/plugin/plugin_zip_test.cpp b/be/test/plugin/plugin_zip_test.cpp
index 473a6c7..7ae6c95 100644
--- a/be/test/plugin/plugin_zip_test.cpp
+++ b/be/test/plugin/plugin_zip_test.cpp
@@ -80,8 +80,6 @@ public:
         std::cout << "the path: " << _path << std::endl;
     }
 
-    ~PluginZipTest() { _server->stop(); };
-
 public:
     std::string _path;
     std::unique_ptr<EvHttpServer> _server;
diff --git a/thirdparty/build-thirdparty.sh b/thirdparty/build-thirdparty.sh
index bbc0d9a..d224070 100755
--- a/thirdparty/build-thirdparty.sh
+++ b/thirdparty/build-thirdparty.sh
@@ -699,6 +699,17 @@ build_js_and_css() {
     cp bootstrap-table.min.css $TP_INSTALL_DIR/webroot/Bootstrap-3.3.7/css
 }
 
+build_tsan_header() {
+    cd $TP_SOURCE_DIR/
+    if [[ ! -f $TSAN_HEADER_FILE ]]; then
+        echo "$TSAN_HEADER_FILE should exist."
+        exit 1
+    fi
+
+    mkdir -p $TP_INSTALL_DIR/include/sanitizer/
+    cp $TSAN_HEADER_FILE $TP_INSTALL_DIR/include/sanitizer/
+}
+
 # See https://github.com/apache/incubator-doris/issues/2910
 # LLVM related codes have already be removed in master, so there is
 # no need to build llvm tool here.
@@ -737,5 +748,6 @@ build_croaringbitmap
 build_orc
 build_cctz
 build_js_and_css
+build_tsan_header
 
 echo "Finihsed to build all thirdparties"
diff --git a/thirdparty/vars.sh b/thirdparty/vars.sh
index 3421d63..020bd55 100644
--- a/thirdparty/vars.sh
+++ b/thirdparty/vars.sh
@@ -300,6 +300,12 @@ BOOTSTRAP_TABLE_CSS_NAME="bootstrap-table.min.css"
 BOOTSTRAP_TABLE_CSS_FILE="bootstrap-table.min.css"
 BOOTSTRAP_TABLE_CSS_MD5SUM="23389d4456da412e36bae30c469a766a"
 
+# tsan_header
+TSAN_HEADER_DOWNLOAD="https://gcc.gnu.org/git/?p=gcc.git;a=blob_plain;f=libsanitizer/include/sanitizer/tsan_interface_atomic.h;hb=refs/heads/releases/gcc-7"
+TSAN_HEADER_NAME="tsan_interface_atomic.h"
+TSAN_HEADER_FILE="tsan_interface_atomic.h"
+TSAN_HEADER_MD5SUM="d72679bea167d6a513d959f5abd149dc"
+
 # all thirdparties which need to be downloaded is set in array TP_ARCHIVES
-export TP_ARCHIVES="LIBEVENT OPENSSL THRIFT LLVM CLANG COMPILER_RT PROTOBUF GFLAGS GLOG GTEST RAPIDJSON SNAPPY GPERFTOOLS ZLIB LZ4 BZIP LZO2 CURL RE2 BOOST MYSQL BOOST_FOR_MYSQL ODBC LEVELDB BRPC ROCKSDB LIBRDKAFKA FLATBUFFERS ARROW BROTLI DOUBLE_CONVERSION ZSTD S2 BITSHUFFLE CROARINGBITMAP ORC JEMALLOC CCTZ DATATABLES BOOTSTRAP_TABLE_JS BOOTSTRAP_TABLE_CSS"
+export TP_ARCHIVES="LIBEVENT OPENSSL THRIFT LLVM CLANG COMPILER_RT PROTOBUF GFLAGS GLOG GTEST RAPIDJSON SNAPPY GPERFTOOLS ZLIB LZ4 BZIP LZO2 CURL RE2 BOOST MYSQL BOOST_FOR_MYSQL ODBC LEVELDB BRPC ROCKSDB LIBRDKAFKA FLATBUFFERS ARROW BROTLI DOUBLE_CONVERSION ZSTD S2 BITSHUFFLE CROARINGBITMAP ORC JEMALLOC CCTZ DATATABLES BOOTSTRAP_TABLE_JS BOOTSTRAP_TABLE_CSS TSAN_HEADER"
 
diff --git a/tsan_suppressions b/tsan_suppressions
new file mode 100644
index 0000000..a01346f
--- /dev/null
+++ b/tsan_suppressions
@@ -0,0 +1,24 @@
+mutex:boost::condition_variable::wait(boost::unique_lock<boost::mutex>&)
+mutex:brpc::*
+mutex:doris::ConditionVariable::wait_until(doris::MonoTime const&) const
+mutex:doris::ConditionVariable::wait() const
+race:boost::intrusive::list_node_traits<void*>::get_next(boost::intrusive::list_node<void*> const* const&)
+race:brpc::*
+race:butil::*
+race:bvar::*
+race:doris::CountDownLatch::wait_until(doris::MonoTime const&) const
+race:doris::PBackendService::*
+race:doris::PStatus::status_code() const
+race:doris::PTabletWriterAddBatchResult::*
+race:doris::PTabletWriterOpenResult::*
+race:doris::RefCountClosure<doris::PTabletWriterOpenResult>::unref()
+race:doris::stream_load::TestInternalService::tablet_writer_add_batch(google::protobuf::RpcController*, doris::PTabletWriterAddBatchRequest const*, doris::PTabletWriterAddBatchResult*, google::protobuf::Closure*)
+race:glog_internal_namespace_::*
+race:google::protobuf::*
+race:operator delete(void*)
+race:std::_Bit_reference::operator bool() const
+race:std::char_traits<char>::compare(char const*, char const*, unsigned long)
+race:std::char_traits<char>::copy(char*, char const*, unsigned long)
+race:std::lock_guard<int volatile>::lock_guard(int volatile&)
+race:std::lock_guard<int volatile>::~lock_guard()
+race:void google::protobuf::internal::RepeatedPtrFieldBase::Clear<google::protobuf::RepeatedPtrField<doris::PTabletInfo>::TypeHandler>()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org