You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2020/02/10 20:54:50 UTC
[impala] branch master updated: IMPALA-5904: Add full_tsan option
and fix several TSAN bugs
This is an automated email from the ASF dual-hosted git repository.
stakiar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new ca6c8d4 IMPALA-5904: Add full_tsan option and fix several TSAN bugs
ca6c8d4 is described below
commit ca6c8d43d78de3c4fa5e4606882b0865e2bce5b4
Author: Sahil Takiar <ta...@gmail.com>
AuthorDate: Mon Jan 20 18:06:32 2020 -0800
IMPALA-5904: Add full_tsan option and fix several TSAN bugs
This patch adds a new build flag, -full_tsan, alongside the existing
-tsan build flag. -full_tsan is equivalent to the current -tsan
behavior, and -tsan is changed to set the ignore_noninstrumented_modules
flag to true. ignore_noninstrumented_modules causes TSAN to ignore any
modules that are not TSAN-instrumented. This is necessary to get TSAN to
play nicely with Java, since Java is not TSAN-instrumented (see
https://wiki.openjdk.java.net/display/tsan/Main and JDK-8208520). While
this might decrease the number of issues surfaced by TSAN, it drastically
decreases the amount of noise produced by TSAN because the JVM is not
running TSAN-instrumented code. Without this flag set to true, almost
every single backend test fails with the error:
WARNING: ThreadSanitizer: data race (pid=12939)
Write of size 1 at 0x7fcbe379c4c6 by thread T31:
#0 strncpy /mnt/source/llvm/llvm-5.0.1.src-p2/projects/compiler-rt/lib/tsan/rtl/tsan_interceptors.cc:650 (unifiedbetests+0x1b2a4ad)
#1 <null> <null> (libjvm.so+0x90e706)
This patch fixes various TSAN bugs (e.g. data races) reported while
running backend tests and E2E against a TSAN build (it does not make
Impala completely TSAN-clean). This patch makes the following changes:
* Fixes several bugs involving issues with updating shared variables
between threads
* Fixes a few race conditions in test classes
* Where possible, existing locks are used to fix any data races; in cases
where the locking logic is non-trivial, atomics are used
* There are a few places where variables are marked as 'volatile'
presumably for synchronization purposes; TSAN flags these 'volatile'
variables as unsafe, and according to
https://github.com/isocpp/CppCoreGuidelines/blob/master/CppCoreGuidelines.md#Rconc-volatile
using 'volatile' for synchronization is dangerous; in these cases, the
'volatile' variables are changed to 'atomic' variables
* This patch adds a suppression file (bin/tsan-suppressions.txt) similar to
the UBSAN suppression file (bin/ubsan-suppressions.txt)
Testing:
* Ran exhaustive tests
* Ran core tests w/ ASAN build
* Manually re-ran backend tests against a TSAN build and made sure the
reported errors are gone
Change-Id: I3d7ef5c228afd5882e145e6f53885b355d6c25a0
Reviewed-on: http://gerrit.cloudera.org:8080/15116
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
CMakeLists.txt | 3 +-
be/CMakeLists.txt | 15 +++-
be/src/common/init.cc | 21 ++++-
be/src/exec/hdfs-scan-node-base.cc | 4 +-
be/src/exec/hdfs-scan-node-base.h | 2 +-
be/src/exec/hdfs-scan-node-mt.cc | 2 +-
be/src/exec/hdfs-scan-node.cc | 6 +-
be/src/rpc/thrift-thread.cc | 2 +-
be/src/rpc/thrift-thread.h | 4 +-
be/src/runtime/bufferpool/reservation-tracker.cc | 102 ++++++++++++-----------
be/src/runtime/bufferpool/reservation-tracker.h | 22 ++---
be/src/runtime/bufferpool/system-allocator.cc | 6 +-
be/src/runtime/coordinator-backend-state.cc | 10 ++-
be/src/runtime/coordinator.cc | 10 +--
be/src/runtime/coordinator.h | 4 +-
be/src/runtime/data-stream-test.cc | 7 +-
be/src/runtime/io/data-cache.cc | 21 ++---
be/src/runtime/io/disk-io-mgr-stress.cc | 6 +-
be/src/runtime/io/disk-io-mgr-stress.h | 2 +-
be/src/runtime/io/scan-range.cc | 11 ++-
be/src/service/session-expiry-test.cc | 4 +
be/src/statestore/statestore-test.cc | 1 +
be/src/util/runtime-profile-test.cc | 16 +++-
be/src/util/stopwatch.h | 7 +-
bin/run-backend-tests.sh | 14 +++-
bin/tsan-suppressions.txt | 32 +++++++
buildall.sh | 23 ++++-
tests/common/environ.py | 4 +-
tests/webserver/test_web_pages.py | 2 +-
29 files changed, 241 insertions(+), 122 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index c355218..52a60ee 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -232,7 +232,8 @@ if ("${CMAKE_BUILD_TYPE}" STREQUAL "DEBUG"
OR "${CMAKE_BUILD_TYPE}" STREQUAL "TIDY"
OR "${CMAKE_BUILD_TYPE}" STREQUAL "UBSAN"
OR "${CMAKE_BUILD_TYPE}" STREQUAL "UBSAN_FULL"
- OR "${CMAKE_BUILD_TYPE}" STREQUAL "TSAN")
+ OR "${CMAKE_BUILD_TYPE}" STREQUAL "TSAN"
+ OR "${CMAKE_BUILD_TYPE}" STREQUAL "TSAN_FULL")
# Use the LLVM libaries with assertions for debug builds.
set(LLVM_ROOT ${LLVM_DEBUG_ROOT})
endif()
diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index 978cae3..c2d3ea6 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -130,7 +130,8 @@ SET(CXX_FLAGS_UBSAN "${CXX_FLAGS_UBSAN} -O0")
# Set the flags to the thread sanitizer, also known as "tsan"
# Turn on sanitizer and debug symbols to get stack traces:
SET(CXX_FLAGS_TSAN "${CXX_CLANG_FLAGS} -Werror -O1 -ggdb3 -fno-omit-frame-pointer")
-SET(CXX_FLAGS_TSAN "${CXX_FLAGS_TSAN} -fsanitize=thread -DTHREAD_SANITIZER")
+SET(CXX_FLAGS_TSAN "${CXX_FLAGS_TSAN} -fsanitize=thread -DTHREAD_SANITIZER -DDYNAMIC_ANNOTATIONS_ENABLED")
+SET(CXX_FLAGS_TSAN "${CXX_FLAGS_TSAN} -DTHREAD_SANITIZER_SUPPRESSIONS=\\\"$ENV{IMPALA_HOME}/bin/tsan-suppressions.txt\\\"")
SET(CXX_FLAGS_TIDY "${CXX_CLANG_FLAGS}")
# Catching unused variables requires an optimization level greater than 0
@@ -163,6 +164,9 @@ elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "UBSAN_FULL")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DUNDEFINED_SANITIZER_FULL")
elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "TSAN")
SET(CMAKE_CXX_FLAGS "${CXX_FLAGS_TSAN}")
+elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "TSAN_FULL")
+ SET(CMAKE_CXX_FLAGS "${CXX_FLAGS_TSAN}")
+ SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DTHREAD_SANITIZER_FULL")
else()
message(FATAL_ERROR "Unknown build type: ${CMAKE_BUILD_TYPE}")
endif()
@@ -188,7 +192,8 @@ if (CCACHE AND NOT DEFINED ENV{DISABLE_CCACHE})
OR "${CMAKE_BUILD_TYPE}" STREQUAL "TIDY"
OR "${CMAKE_BUILD_TYPE}" STREQUAL "UBSAN"
OR "${CMAKE_BUILD_TYPE}" STREQUAL "UBSAN_FULL"
- OR "${CMAKE_BUILD_TYPE}" STREQUAL "TSAN")
+ OR "${CMAKE_BUILD_TYPE}" STREQUAL "TSAN"
+ OR "${CMAKE_BUILD_TYPE}" STREQUAL "TSAN_FULL")
# Need to set CCACHE_CPP so that ccache calls clang with the original source file for
# both preprocessing and compilation. Otherwise, ccache will use clang to preprocess
# the file and then call clang with the preprocessed output if not cached. However,
@@ -337,7 +342,8 @@ if ("${CMAKE_BUILD_TYPE}" STREQUAL "DEBUG" OR
"${CMAKE_BUILD_TYPE}" STREQUAL "ADDRESS_SANITIZER" OR
"${CMAKE_BUILD_TYPE}" STREQUAL "UBSAN" OR
"${CMAKE_BUILD_TYPE}" STREQUAL "UBSAN_FULL" OR
- "${CMAKE_BUILD_TYPE}" STREQUAL "TSAN")
+ "${CMAKE_BUILD_TYPE}" STREQUAL "TSAN" OR
+ "${CMAKE_BUILD_TYPE}" STREQUAL "TSAN_FULL")
set(BUILD_OUTPUT_ROOT_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/build/debug/")
else()
set(BUILD_OUTPUT_ROOT_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/build/release/")
@@ -542,7 +548,8 @@ endif()
# malloc/free)
set (IMPALA_LINK_LIBS_NO_TCMALLOC ${IMPALA_LINK_LIBS})
if (NOT "${CMAKE_BUILD_TYPE}" STREQUAL "ADDRESS_SANITIZER" AND
- NOT "${CMAKE_BUILD_TYPE}" STREQUAL "TSAN")
+ NOT "${CMAKE_BUILD_TYPE}" STREQUAL "TSAN" AND
+ NOT "${CMAKE_BUILD_TYPE}" STREQUAL "TSAN_FULL")
set (IMPALA_LINK_LIBS ${IMPALA_LINK_LIBS} tcmallocstatic)
set (UNIFIED_TEST_LINK_LIBS ${UNIFIED_TEST_LINK_LIBS} tcmallocstatic)
endif()
diff --git a/be/src/common/init.cc b/be/src/common/init.cc
index e395afc..022406b 100644
--- a/be/src/common/init.cc
+++ b/be/src/common/init.cc
@@ -416,10 +416,23 @@ extern "C" const char *__asan_default_options() {
#endif
#if defined(THREAD_SANITIZER)
-// Default TSAN_OPTIONS. Override by setting environment variable $TSAN_OPTIONS.
-extern "C" const char *__tsan_default_options() {
- // Note that backend test should re-configure to halt_on_error=1
- return "halt_on_error=0 history_size=7";
+extern "C" const char* __tsan_default_options() {
+ // Default TSAN_OPTIONS. Override by setting environment variable $TSAN_OPTIONS.
+ // TSAN and Java don't play nicely together because JVM code is not instrumented with
+ // TSAN. TSAN requires all libs to be compiled with '-fsanitize=thread' (see
+ // https://github.com/google/sanitizers/wiki/ThreadSanitizerCppManual#non-instrumented-code),
+ // which is not currently possible for Java code. See
+ // https://wiki.openjdk.java.net/display/tsan/Main and JDK-8208520 for efforts to get
+ // TSAN to run against Java code. The flag ignore_noninstrumented_modules tells TSAN to
+ // ignore all interceptors called from any non-instrumented libraries (e.g. Java).
+ return "ignore_noninstrumented_modules="
+#if defined(THREAD_SANITIZER_FULL)
+ "0 "
+#else
+ "1 "
+#endif
+ "halt_on_error=0 history_size=7 allocator_may_return_null=1 "
+ "suppressions=" THREAD_SANITIZER_SUPPRESSIONS;
}
#endif
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 67fd2fd..49ba920 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -564,8 +564,8 @@ void HdfsScanNodeBase::Close(RuntimeState* state) {
}
Status HdfsScanNodeBase::IssueInitialScanRanges(RuntimeState* state) {
- DCHECK(!initial_ranges_issued_);
- initial_ranges_issued_ = true;
+ DCHECK(!initial_ranges_issued_.Load());
+ initial_ranges_issued_.Store(true);
// We want to decrement this remaining_scan_range_submissions in all cases.
auto remaining_scan_range_submissions_trigger =
MakeScopeExitTrigger([&](){ UpdateRemainingScanRangeSubmissions(-1); });
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 8e118bb..760172b 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -490,7 +490,7 @@ class HdfsScanNodeBase : public ScanNode {
/// Set to true when the initial scan ranges are issued to the IoMgr. This happens on
/// the first call to GetNext(). The token manager, in a different thread, will read
/// this variable.
- bool initial_ranges_issued_ = false;
+ AtomicBool initial_ranges_issued_;
/// When this counter drops to 0, AddDiskIoRanges() will not be called again, and
/// therefore scanner threads that can't get work should exit. For most
diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc
index d8a2b02..2eeb8b8 100644
--- a/be/src/exec/hdfs-scan-node-mt.cc
+++ b/be/src/exec/hdfs-scan-node-mt.cc
@@ -51,7 +51,7 @@ Status HdfsScanNodeMt::Prepare(RuntimeState* state) {
Status HdfsScanNodeMt::Open(RuntimeState* state) {
SCOPED_TIMER(runtime_profile_->total_time_counter());
RETURN_IF_ERROR(HdfsScanNodeBase::Open(state));
- DCHECK(!initial_ranges_issued_);
+ DCHECK(!initial_ranges_issued_.Load());
RETURN_IF_ERROR(IssueInitialScanRanges(state));
return Status::OK();
}
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 63b57ad..cb0eb01 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -81,7 +81,7 @@ Status HdfsScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos
SCOPED_TIMER(runtime_profile_->total_time_counter());
ScopedGetNextEventAdder ea(this, eos);
- if (!initial_ranges_issued_) {
+ if (!initial_ranges_issued_.Load()) {
// We do this in GetNext() to maximise the amount of work we can do while waiting for
// runtime filters to show up. The scanner threads have already started (in Open()),
// so we need to tell them there is work to do.
@@ -181,7 +181,7 @@ void HdfsScanNode::Close(RuntimeState* state) {
// At this point, the other threads have been joined, and
// remaining_scan_range_submissions_ should be 0, if the
// query started and wasn't cancelled or exited early.
- if (ranges_issued_barrier_.pending() == 0 && initial_ranges_issued_
+ if (ranges_issued_barrier_.pending() == 0 && initial_ranges_issued_.Load()
&& progress_.done()) {
DCHECK_EQ(remaining_scan_range_submissions_.Load(), 0);
}
@@ -269,7 +269,7 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
// Case 4. We have not issued the initial ranges so don't start a scanner thread.
// Issuing ranges will call this function and we'll start the scanner threads then.
// TODO: It would be good to have a test case for that.
- if (!initial_ranges_issued_) return;
+ if (!initial_ranges_issued_.Load()) return;
ScannerMemLimiter* scanner_mem_limiter =
runtime_state_->query_state()->scanner_mem_limiter();
diff --git a/be/src/rpc/thrift-thread.cc b/be/src/rpc/thrift-thread.cc
index ef2144d..307605a 100644
--- a/be/src/rpc/thrift-thread.cc
+++ b/be/src/rpc/thrift-thread.cc
@@ -57,7 +57,7 @@ void ThriftThread::join() {
boost::shared_ptr<atc::Thread> ThriftThreadFactory::newThread(
boost::shared_ptr<atc::Runnable> runnable) const {
stringstream name;
- name << prefix_ << "-" << count_++;
+ name << prefix_ << "-" << count_.Add(1);
boost::shared_ptr<ThriftThread> result =
boost::shared_ptr<ThriftThread>(new ThriftThread(group_, name.str(), runnable));
runnable->thread(result);
diff --git a/be/src/rpc/thrift-thread.h b/be/src/rpc/thrift-thread.h
index 468964b..e0aaa08 100644
--- a/be/src/rpc/thrift-thread.h
+++ b/be/src/rpc/thrift-thread.h
@@ -36,7 +36,7 @@ class ThriftThreadFactory : public apache::thrift::concurrency::ThreadFactory {
/// Group is the thread group for new threads to be assigned to, and prefix is the
/// per-thread prefix (threads are named "prefix-<count_>-<tid>").
ThriftThreadFactory(const std::string& group, const std::string& prefix)
- : group_(group), prefix_(prefix), count_(0) { }
+ : group_(group), prefix_(prefix), count_(-1) { }
/// (From ThreadFactory) - creates a new ThriftThread to run the supplied Runnable.
virtual boost::shared_ptr<apache::thrift::concurrency::Thread> newThread(
@@ -55,7 +55,7 @@ class ThriftThreadFactory : public apache::thrift::concurrency::ThreadFactory {
/// Marked mutable because we want to increment it inside newThread, which for some
/// reason is const.
- mutable int64_t count_;
+ mutable AtomicInt64 count_;
};
/// A ThriftThread is a Thrift-compatible wrapper for Impala's Thread class, so that all
diff --git a/be/src/runtime/bufferpool/reservation-tracker.cc b/be/src/runtime/bufferpool/reservation-tracker.cc
index 205c6f6..433ca46 100644
--- a/be/src/runtime/bufferpool/reservation-tracker.cc
+++ b/be/src/runtime/bufferpool/reservation-tracker.cc
@@ -45,14 +45,14 @@ void ReservationTracker::InitRootTracker(
DCHECK(!initialized_);
parent_ = nullptr;
mem_tracker_ = nullptr;
- reservation_limit_ = reservation_limit;
- reservation_ = 0;
- used_reservation_ = 0;
- child_reservations_ = 0;
+ reservation_limit_.Store(reservation_limit);
+ reservation_.Store(0);
+ used_reservation_.Store(0);
+ child_reservations_.Store(0);
initialized_ = true;
- InitCounters(profile, reservation_limit_);
- COUNTER_SET(counters_.peak_reservation, reservation_);
+ InitCounters(profile, reservation_limit);
+ COUNTER_SET(counters_.peak_reservation, 0);
CheckConsistency();
}
@@ -69,10 +69,10 @@ void ReservationTracker::InitChildTracker(RuntimeProfile* profile,
mem_tracker_ = mem_tracker;
mem_limit_mode_ = mem_limit_mode;
- reservation_limit_ = reservation_limit;
- reservation_ = 0;
- used_reservation_ = 0;
- child_reservations_ = 0;
+ reservation_limit_.Store(reservation_limit);
+ reservation_.Store(0);
+ used_reservation_.Store(0);
+ child_reservations_.Store(0);
initialized_ = true;
if (mem_tracker_ != nullptr) {
@@ -95,7 +95,7 @@ void ReservationTracker::InitChildTracker(RuntimeProfile* profile,
}
}
- InitCounters(profile, reservation_limit_);
+ InitCounters(profile, reservation_limit);
CheckConsistency();
}
@@ -126,10 +126,10 @@ void ReservationTracker::Close() {
lock_guard<SpinLock> l(lock_);
if (!initialized_) return;
CheckConsistency();
- DCHECK_EQ(used_reservation_, 0);
- DCHECK_EQ(child_reservations_, 0);
+ DCHECK_EQ(used_reservation_.Load(), 0);
+ DCHECK_EQ(child_reservations_.Load(), 0);
// Release any reservation to parent.
- if (parent_ != nullptr) DecreaseReservationLocked(reservation_, false);
+ if (parent_ != nullptr) DecreaseReservationLocked(reservation_.Load(), false);
mem_tracker_ = nullptr;
parent_ = nullptr;
initialized_ = false;
@@ -175,7 +175,7 @@ bool ReservationTracker::IncreaseReservationInternalLocked(int64_t bytes,
Substitute("Debug random failure mode is turned on: Reservation of $0 denied.",
PrettyPrinter::Print(bytes, TUnit::BYTES)));
}
- } else if (reservation_ + reservation_increase > reservation_limit_) {
+ } else if (reservation_.Load() + reservation_increase > reservation_limit_.Load()) {
granted = false;
if (error_status != nullptr) {
MemTracker* mem_tracker = mem_tracker_;
@@ -190,10 +190,10 @@ bool ReservationTracker::IncreaseReservationInternalLocked(int64_t bytes,
"reservation=$3 used_reservation=$4 child_reservations=$5",
PrettyPrinter::Print(bytes, TUnit::BYTES),
mem_tracker == nullptr ? "Process" : mem_tracker->label(),
- PrettyPrinter::Print(reservation_limit_, TUnit::BYTES),
- PrettyPrinter::Print(reservation_, TUnit::BYTES),
- PrettyPrinter::Print(used_reservation_, TUnit::BYTES),
- PrettyPrinter::Print(child_reservations_, TUnit::BYTES));
+ PrettyPrinter::Print(reservation_limit_.Load(), TUnit::BYTES),
+ PrettyPrinter::Print(reservation_.Load(), TUnit::BYTES),
+ PrettyPrinter::Print(used_reservation_.Load(), TUnit::BYTES),
+ PrettyPrinter::Print(child_reservations_.Load(), TUnit::BYTES));
string top_n_queries = mem_tracker->LogTopNQueries(5);
if (!top_n_queries.empty()) {
error_msg = Substitute(
@@ -228,7 +228,7 @@ bool ReservationTracker::IncreaseReservationInternalLocked(int64_t bytes,
// The reservation was granted and state updated in all ancestors: we can modify
// this tracker's state now.
UpdateReservation(reservation_increase);
- if (is_child_reservation) child_reservations_ += bytes;
+ if (is_child_reservation) child_reservations_.Add(bytes);
}
CheckConsistency();
@@ -270,9 +270,9 @@ void ReservationTracker::DecreaseReservation(int64_t bytes, bool is_child_reserv
void ReservationTracker::DecreaseReservationLocked(
int64_t bytes, bool is_child_reservation) {
DCHECK(initialized_);
- DCHECK_GE(reservation_, bytes);
+ DCHECK_GE(reservation_.Load(), bytes);
if (bytes == 0) return;
- if (is_child_reservation) child_reservations_ -= bytes;
+ if (is_child_reservation) child_reservations_.Add(-bytes);
UpdateReservation(-bytes);
ReleaseToMemTracker(bytes);
// The reservation should be returned up the tree.
@@ -325,7 +325,9 @@ bool ReservationTracker::TransferReservationTo(ReservationTracker* other, int64_
// Check reservation limits will not be violated before applying any updates.
for (ReservationTracker* tracker : other_path_to_common) {
- if (tracker->reservation_ + bytes > tracker->reservation_limit_) return false;
+ if (tracker->reservation_.Load() + bytes > tracker->reservation_limit_.Load()) {
+ return false;
+ }
}
// Do the updates now that we have checked the limits. We're holding all the locks
@@ -337,10 +339,10 @@ bool ReservationTracker::TransferReservationTo(ReservationTracker* other, int64_
DCHECK(tracker->mem_tracker_ == nullptr || !tracker->mem_tracker_->has_limit());
bool success = tracker->TryConsumeFromMemTracker(bytes, MemLimit::HARD);
DCHECK(success);
- if (tracker != other_path_to_common[0]) tracker->child_reservations_ += bytes;
+ if (tracker != other_path_to_common[0]) tracker->child_reservations_.Add(bytes);
}
for (ReservationTracker* tracker : path_to_common) {
- if (tracker != path_to_common[0]) tracker->child_reservations_ -= bytes;
+ if (tracker != path_to_common[0]) tracker->child_reservations_.Add(-bytes);
tracker->UpdateReservation(-bytes);
tracker->ReleaseToMemTracker(bytes);
}
@@ -348,15 +350,19 @@ bool ReservationTracker::TransferReservationTo(ReservationTracker* other, int64_
// Update the 'child_reservations_' on the common ancestor if needed.
// Case 1: reservation was pushed up to 'other'.
if (common_ancestor == other) {
+ other->child_reservations_.Add(-bytes);
+#ifndef NDEBUG
lock_guard<SpinLock> l(other->lock_);
- other->child_reservations_ -= bytes;
other->CheckConsistency();
+#endif
}
// Case 2: reservation was pushed down below 'this'.
if (common_ancestor == this) {
+ child_reservations_.Add(bytes);
+#ifndef NDEBUG
lock_guard<SpinLock> l(lock_);
- child_reservations_ += bytes;
CheckConsistency();
+#endif
}
return true;
}
@@ -388,7 +394,7 @@ void ReservationTracker::ReleaseTo(int64_t bytes) {
lock_guard<SpinLock> l(lock_);
DCHECK(initialized_);
DCHECK_GE(bytes, 0);
- DCHECK_LE(bytes, used_reservation_);
+ DCHECK_LE(bytes, used_reservation_.Load());
UpdateUsedReservation(-bytes);
CheckConsistency();
}
@@ -397,14 +403,14 @@ int64_t ReservationTracker::GetReservation() {
// Don't acquire lock - there is no point in holding it for this function only since
// the value read can change as soon as we release it.
DCHECK(initialized_);
- return base::subtle::Acquire_Load(&reservation_);
+ return reservation_.Load();
}
int64_t ReservationTracker::GetUsedReservation() {
// Don't acquire lock - there is no point in holding it for this function only since
// the value read can change as soon as we release it.
DCHECK(initialized_);
- return base::subtle::Acquire_Load(&used_reservation_);
+ return used_reservation_.Load();
}
int64_t ReservationTracker::GetUnusedReservation() {
@@ -417,35 +423,35 @@ int64_t ReservationTracker::GetChildReservations() {
// Don't acquire lock - there is no point in holding it for this function only since
// the value read can change as soon as we release it.
DCHECK(initialized_);
- return base::subtle::Acquire_Load(&child_reservations_);
+ return child_reservations_.Load();
}
void ReservationTracker::CheckConsistency() const {
// Check internal invariants.
- DCHECK_GE(reservation_, 0);
- DCHECK_LE(reservation_, reservation_limit_);
- DCHECK_GE(child_reservations_, 0);
- DCHECK_GE(used_reservation_, 0);
- DCHECK_LE(used_reservation_ + child_reservations_, reservation_);
-
- DCHECK_EQ(reservation_, counters_.peak_reservation->current_value());
- DCHECK_LE(reservation_, counters_.peak_reservation->value());
- DCHECK_EQ(used_reservation_, counters_.peak_used_reservation->current_value());
- DCHECK_LE(used_reservation_, counters_.peak_used_reservation->value());
+ DCHECK_GE(reservation_.Load(), 0);
+ DCHECK_LE(reservation_.Load(), reservation_limit_.Load());
+ DCHECK_GE(child_reservations_.Load(), 0);
+ DCHECK_GE(used_reservation_.Load(), 0);
+ DCHECK_LE(used_reservation_.Load() + child_reservations_.Load(), reservation_.Load());
+
+ DCHECK_EQ(reservation_.Load(), counters_.peak_reservation->current_value());
+ DCHECK_LE(reservation_.Load(), counters_.peak_reservation->value());
+ DCHECK_EQ(used_reservation_.Load(), counters_.peak_used_reservation->current_value());
+ DCHECK_LE(used_reservation_.Load(), counters_.peak_used_reservation->value());
if (counters_.reservation_limit != nullptr) {
- DCHECK_EQ(reservation_limit_, counters_.reservation_limit->value());
+ DCHECK_EQ(reservation_limit_.Load(), counters_.reservation_limit->value());
}
}
void ReservationTracker::UpdateUsedReservation(int64_t delta) {
- used_reservation_ += delta;
- COUNTER_SET(counters_.peak_used_reservation, used_reservation_);
+ int64_t used_reservation = used_reservation_.Add(delta);
+ COUNTER_SET(counters_.peak_used_reservation, used_reservation);
CheckConsistency();
}
void ReservationTracker::UpdateReservation(int64_t delta) {
- reservation_ += delta;
- COUNTER_SET(counters_.peak_reservation, reservation_);
+ int64_t reservation = reservation_.Add(delta);
+ COUNTER_SET(counters_.peak_reservation, reservation);
CheckConsistency();
}
@@ -457,7 +463,7 @@ string ReservationTracker::DebugString() {
return Substitute(
"<ReservationTracker>: reservation_limit $0 reservation $1 used_reservation $2 "
"child_reservations $3 parent:\n$4",
- reservation_limit_, reservation_, used_reservation_, child_reservations_,
- parent_debug_string);
+ reservation_limit_.Load(), reservation_.Load(), used_reservation_.Load(),
+ child_reservations_.Load(), parent_debug_string);
}
}
diff --git a/be/src/runtime/bufferpool/reservation-tracker.h b/be/src/runtime/bufferpool/reservation-tracker.h
index 604db7f..88bd3e6 100644
--- a/be/src/runtime/bufferpool/reservation-tracker.h
+++ b/be/src/runtime/bufferpool/reservation-tracker.h
@@ -189,7 +189,7 @@ class ReservationTracker {
private:
/// Returns the amount of 'reservation_' that is unused.
inline int64_t unused_reservation() const {
- return reservation_ - used_reservation_ - child_reservations_;
+ return reservation_.Load() - used_reservation_.Load() - child_reservations_.Load();
}
/// Returns the parent's memtracker if 'parent_' is non-NULL, or NULL otherwise.
@@ -308,23 +308,23 @@ class ReservationTracker {
/// reservation increases.
MemLimit mem_limit_mode_;
- /// The maximum reservation in bytes that this tracker can have. Can be read with an
- /// atomic load without holding lock.
- int64_t reservation_limit_;
+ /// The maximum reservation in bytes that this tracker can have. Can be read without
+ /// holding lock.
+ AtomicInt64 reservation_limit_;
/// This tracker's current reservation in bytes. 'reservation_' <= 'reservation_limit_'.
- /// Can be read with an atomic load without holding lock.
- int64_t reservation_;
+ /// Can be read without holding lock.
+ AtomicInt64 reservation_;
/// Total reservation of children in bytes. This is included in 'reservation_'.
- /// 'used_reservation_' + 'child_reservations_' <= 'reservation_'.
- /// Can be read with an atomic load without holding lock.
- int64_t child_reservations_;
+ /// 'used_reservation_' + 'child_reservations_' <= 'reservation_'. Can be read without
+ /// holding lock.
+ AtomicInt64 child_reservations_;
/// The amount of the reservation currently used by this tracker in bytes.
/// 'used_reservation_' + 'child_reservations_' <= 'reservation_'.
- /// Can be read with an atomic load without holding lock.
- int64_t used_reservation_;
+ /// Can be read without holding lock.
+ AtomicInt64 used_reservation_;
};
}
diff --git a/be/src/runtime/bufferpool/system-allocator.cc b/be/src/runtime/bufferpool/system-allocator.cc
index 8dc4625..e6b0376 100644
--- a/be/src/runtime/bufferpool/system-allocator.cc
+++ b/be/src/runtime/bufferpool/system-allocator.cc
@@ -121,9 +121,9 @@ Status SystemAllocator::AllocateViaMalloc(int64_t len, uint8_t** buffer_mem) {
// This ensures that it can be backed by a whole pages, rather than parts of pages.
size_t alignment = use_huge_pages ? HUGE_PAGE_SIZE : SMALL_PAGE_SIZE;
int rc = posix_memalign(reinterpret_cast<void**>(buffer_mem), alignment, len);
-#ifdef ADDRESS_SANITIZER
- // Workaround ASAN bug where posix_memalign returns 0 even when allocation fails.
- // It should instead return ENOMEM. See https://bugs.llvm.org/show_bug.cgi?id=32968.
+#ifdef THREAD_SANITIZER
+ // Workaround TSAN bug where posix_memalign returns 0 even when allocation fails. It
+ // should instead return ENOMEM. See https://reviews.llvm.org/D35690.
if (rc == 0 && *buffer_mem == nullptr && len != 0) rc = ENOMEM;
#endif
if (rc != 0) {
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index f4c5859..4a4243c 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -175,7 +175,15 @@ void Coordinator::BackendState::Exec(const DebugOptions& debug_options,
const kudu::Slice& serialized_query_ctx, CountingBarrier* exec_complete_barrier) {
const auto trigger = MakeScopeExitTrigger([&]() {
// Ensure that 'last_report_time_ms_' is set prior to the barrier being notified.
- last_report_time_ms_ = GenerateReportTimestamp();
+ {
+ // Since last_report_time_ms_ is protected by lock_ it must be acquired before
+ // updating last_report_time_ms_. The lock_ is guaranteed to not be held by this
+ // method when this lambda runs since it has not already been acquired by the
+ // method. The lambda executes in an object destructor and C++ destroys objects in
+ // the reverse order they were created.
+ lock_guard<mutex> lock(lock_);
+ last_report_time_ms_ = GenerateReportTimestamp();
+ }
exec_complete_barrier->Notify();
});
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 486a810..07cfadc 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -677,7 +677,7 @@ Status Coordinator::FinalizeHdfsDml() {
// All instances must have reported their final statuses before finalization, which is a
// post-condition of Wait. If the query was not successful, still try to clean up the
// staging directory.
- DCHECK(has_called_wait_);
+ DCHECK(has_called_wait_.Load());
DCHECK(finalize_params() != nullptr);
bool is_transactional = finalize_params()->__isset.write_id;
@@ -730,8 +730,8 @@ void Coordinator::WaitForBackends() {
Status Coordinator::Wait() {
lock_guard<SpinLock> l(wait_lock_);
SCOPED_TIMER(query_profile_->total_time_counter());
- if (has_called_wait_) return Status::OK();
- has_called_wait_ = true;
+ if (has_called_wait_.Load()) return Status::OK();
+ has_called_wait_.Store(true);
if (stmt_type_ == TStmtType::QUERY) {
DCHECK(coord_instance_ != nullptr);
@@ -756,7 +756,7 @@ Status Coordinator::Wait() {
Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos,
int64_t block_on_wait_time_us) {
VLOG_ROW << "GetNext() query_id=" << PrintId(query_id());
- DCHECK(has_called_wait_);
+ DCHECK(has_called_wait_.Load());
SCOPED_TIMER(query_profile_->total_time_counter());
if (ReturnedAllResults()) {
@@ -978,7 +978,7 @@ void Coordinator::ComputeQuerySummary() {
// In this case, the query did not even get to start all fragment instances.
// Some of the state that is used below might be uninitialized. In this case,
// the query has made so little progress, reporting a summary is not very useful.
- if (!has_called_wait_) return;
+ if (!has_called_wait_.Load()) return;
if (backend_states_.empty()) return;
// make sure fragment_stats_ are up-to-date
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index c295d66..019f3fb 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -290,7 +290,9 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
/// ensures single-threaded execution of Wait(). See lock ordering class comment.
SpinLock wait_lock_;
- bool has_called_wait_ = false; // if true, Wait() was called; protected by wait_lock_
+ /// If true, Wait() was called. When read / written in Wait() the wait_lock_ should
+ /// still be acquired to ensure Wait() is only executed once.
+ AtomicBool has_called_wait_;
BackendResourceState* backend_resource_state_ = nullptr;
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index 4ffdb1d..d8dc832 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -517,7 +517,7 @@ class DataStreamTest : public testing::Test {
sender_info_.emplace_back(make_unique<SenderInfo>());
sender_info_.back()->thread_handle.reset(
new thread(&DataStreamTest::Sender, this, num_senders, channel_buffer_size,
- partition_type));
+ partition_type, sender_info_[num_senders].get()));
}
void JoinSenders() {
@@ -527,8 +527,8 @@ class DataStreamTest : public testing::Test {
}
}
- void Sender(
- int sender_num, int channel_buffer_size, TPartitionType::type partition_type) {
+ void Sender(int sender_num, int channel_buffer_size,
+ TPartitionType::type partition_type, SenderInfo* info) {
RuntimeState state(TQueryCtx(), exec_env_.get(), desc_tbl_);
VLOG_QUERY << "create sender " << sender_num;
const TDataSink& sink = GetSink(partition_type);
@@ -545,7 +545,6 @@ class DataStreamTest : public testing::Test {
EXPECT_OK(sender->Prepare(&state, &tracker_));
EXPECT_OK(sender->Open(&state));
scoped_ptr<RowBatch> batch(CreateRowBatch());
- SenderInfo* info = sender_info_[sender_num].get();
int next_val = 0;
for (int i = 0; i < NUM_BATCHES; ++i) {
GetNextBatch(batch.get(), &next_val);
diff --git a/be/src/runtime/io/data-cache.cc b/be/src/runtime/io/data-cache.cc
index 3e5a8fc..85dd277 100644
--- a/be/src/runtime/io/data-cache.cc
+++ b/be/src/runtime/io/data-cache.cc
@@ -267,17 +267,18 @@ class DataCache::CacheFile {
int64_t Allocate(int64_t len, const std::unique_lock<SpinLock>& partition_lock) {
DCHECK(partition_lock.owns_lock());
DCHECK_EQ(len % PAGE_SIZE, 0);
- DCHECK_EQ(current_offset_ % PAGE_SIZE, 0);
+ int64_t current_offset = current_offset_.Load();
+ DCHECK_EQ(current_offset % PAGE_SIZE, 0);
// Hold the lock in shared mode to check if 'file_' is not closed already.
kudu::shared_lock<rw_spinlock> lock(lock_.get_lock());
- if (!allow_append_ || (current_offset_ + len > FLAGS_data_cache_file_max_size_bytes &&
- current_offset_ > 0)) {
+ if (!allow_append_ || (current_offset + len > FLAGS_data_cache_file_max_size_bytes &&
+ current_offset > 0)) {
allow_append_ = false;
return -1;
}
DCHECK(file_);
- int64_t insertion_offset = current_offset_;
- current_offset_ += len;
+ int64_t insertion_offset = current_offset;
+ current_offset_.Add(len);
return insertion_offset;
}
@@ -289,7 +290,7 @@ class DataCache::CacheFile {
// Hold the lock in shared mode to check if 'file_' is not closed already.
kudu::shared_lock<rw_spinlock> lock(lock_.get_lock());
if (UNLIKELY(!file_)) return false;
- DCHECK_LE(offset + bytes_to_read, current_offset_);
+ DCHECK_LE(offset + bytes_to_read, current_offset_.Load());
kudu::Status status = file_->Read(offset, Slice(buffer, bytes_to_read));
if (UNLIKELY(!status.ok())) {
LOG(ERROR) << Substitute("Failed to read from $0 at offset $1 for $2 bytes: $3",
@@ -304,11 +305,11 @@ class DataCache::CacheFile {
// already closed.
bool Write(int64_t offset, const uint8_t* buffer, int64_t buffer_len) {
DCHECK_EQ(offset % PAGE_SIZE, 0);
- DCHECK_LE(offset, current_offset_);
+ DCHECK_LE(offset, current_offset_.Load());
// Hold the lock in shared mode to check if 'file_' is not closed already.
kudu::shared_lock<rw_spinlock> lock(lock_.get_lock());
if (UNLIKELY(!file_)) return false;
- DCHECK_LE(offset + buffer_len, current_offset_);
+ DCHECK_LE(offset + buffer_len, current_offset_.Load());
kudu::Status status = file_->Write(offset, Slice(buffer, buffer_len));
if (UNLIKELY(!status.ok())) {
LOG(ERROR) << Substitute("Failed to write to $0 at offset $1 for $2 bytes: $3",
@@ -324,7 +325,7 @@ class DataCache::CacheFile {
// Hold the lock in shared mode to check if 'file_' is not closed already.
kudu::shared_lock<rw_spinlock> lock(lock_.get_lock());
if (UNLIKELY(!file_)) return;
- DCHECK_LE(offset + hole_size, current_offset_);
+ DCHECK_LE(offset + hole_size, current_offset_.Load());
kudu::Status status = file_->PunchHole(offset, hole_size);
if (UNLIKELY(!status.ok())) {
LOG(DFATAL) << Substitute("Failed to punch hole in $0 at offset $1 for $2 $3",
@@ -345,7 +346,7 @@ class DataCache::CacheFile {
bool allow_append_ = true;
/// The current offset in the file to append to on next insert.
- int64_t current_offset_ = 0;
+ AtomicInt64 current_offset_;
/// This is a reader-writer lock used for synchronization with the deleter thread.
/// It is taken in write mode in Close() and shared mode everywhere else. It's expected
diff --git a/be/src/runtime/io/disk-io-mgr-stress.cc b/be/src/runtime/io/disk-io-mgr-stress.cc
index 63b1854..cb4ba0b 100644
--- a/be/src/runtime/io/disk-io-mgr-stress.cc
+++ b/be/src/runtime/io/disk-io-mgr-stress.cc
@@ -111,7 +111,7 @@ void DiskIoMgrStress::ClientThread(int client_id) {
Status status;
char read_buffer[MAX_FILE_LEN];
- while (!shutdown_) {
+ while (!shutdown_.Load()) {
bool eos = false;
int bytes_read = 0;
@@ -193,7 +193,7 @@ void DiskIoMgrStress::CancelRandomReader() {
}
void DiskIoMgrStress::Run(int sec) {
- shutdown_ = false;
+ shutdown_.Store(false);
for (int i = 0; i < num_clients_; ++i) {
readers_.add_thread(
new thread(&DiskIoMgrStress::ClientThread, this, i));
@@ -210,7 +210,7 @@ void DiskIoMgrStress::Run(int sec) {
}
// Signal shutdown for the client threads
- shutdown_ = true;
+ shutdown_.Store(true);
for (int i = 0; i < num_clients_; ++i) {
unique_lock<mutex> lock(clients_[i].lock);
diff --git a/be/src/runtime/io/disk-io-mgr-stress.h b/be/src/runtime/io/disk-io-mgr-stress.h
index 1baef24..edeaaf2 100644
--- a/be/src/runtime/io/disk-io-mgr-stress.h
+++ b/be/src/runtime/io/disk-io-mgr-stress.h
@@ -97,7 +97,7 @@ class DiskIoMgrStress {
bool includes_cancellation_;
/// Flag to signal that client reader threads should exit
- volatile bool shutdown_;
+ AtomicBool shutdown_;
/// Helper to initialize a new reader client, registering a new reader with the
/// io mgr and initializing the scan ranges
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index 3c6ff27..cab33d1 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -240,11 +240,14 @@ ReadOutcome ScanRange::DoRead(DiskQueue* queue, int disk_id) {
return ReadOutcome::CANCELLED;
}
- bytes_read_ += buffer_desc->len();
- DCHECK_LE(bytes_read_, bytes_to_read_);
+ {
+ unique_lock<mutex> lock(lock_);
+ bytes_read_ += buffer_desc->len();
+ DCHECK_LE(bytes_read_, bytes_to_read_);
- // It is end of stream if it is end of file, or read all the bytes.
- buffer_desc->eosr_ = eof || bytes_read_ == bytes_to_read_;
+ // It is end of stream if it is end of file, or read all the bytes.
+ buffer_desc->eosr_ = eof || bytes_read_ == bytes_to_read_;
+ }
// After calling EnqueueReadyBuffer(), it is no longer valid to touch 'buffer_desc'.
// Store the state we need before calling EnqueueReadyBuffer().
diff --git a/be/src/service/session-expiry-test.cc b/be/src/service/session-expiry-test.cc
index 6251dd4..7dd68ad 100644
--- a/be/src/service/session-expiry-test.cc
+++ b/be/src/service/session-expiry-test.cc
@@ -51,6 +51,8 @@ DECLARE_int32(beeswax_port);
// instead of destroying it to avoid destroying the contained objects.
static ObjectPool* perm_objects;
+namespace impala {
+
TEST(SessionTest, TestExpiry) {
const int NUM_SESSIONS = 5;
const int MAX_IDLE_TIMEOUT_MS = 4000;
@@ -121,6 +123,8 @@ TEST(SessionTest, TestExpiry) {
statestore->ShutdownForTesting();
}
+} // namespace impala
+
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
diff --git a/be/src/statestore/statestore-test.cc b/be/src/statestore/statestore-test.cc
index f517a56..4d1bac9 100644
--- a/be/src/statestore/statestore-test.cc
+++ b/be/src/statestore/statestore-test.cc
@@ -53,6 +53,7 @@ TEST(StatestoreTest, SmokeTest) {
// Port already in use
Statestore* statestore_wont_start = perm_objects->Add(new Statestore(metrics_2));
ASSERT_FALSE(statestore_wont_start->Init(statestore->port()).ok());
+ statestore_wont_start->ShutdownForTesting();
StatestoreSubscriber* sub_will_start = perm_objects->Add(
new StatestoreSubscriber("sub1", MakeNetworkAddress("localhost", 0),
diff --git a/be/src/util/runtime-profile-test.cc b/be/src/util/runtime-profile-test.cc
index 70d69dc..5a3c7f5 100644
--- a/be/src/util/runtime-profile-test.cc
+++ b/be/src/util/runtime-profile-test.cc
@@ -826,7 +826,7 @@ class TimerCounterTest {
struct DummyWorker {
thread* thread_handle;
- bool done;
+ AtomicBool done;
DummyWorker()
: thread_handle(NULL), done(false) {}
@@ -835,9 +835,12 @@ class TimerCounterTest {
Stop();
}
+ DummyWorker(const DummyWorker& dummy_worker)
+ : thread_handle(dummy_worker.thread_handle), done(dummy_worker.done.Load()) {}
+
void Stop() {
- if (!done && thread_handle != NULL) {
- done = true;
+ if (!done.Load() && thread_handle != NULL) {
+ done.Store(true);
thread_handle->join();
delete thread_handle;
thread_handle = NULL;
@@ -848,7 +851,7 @@ class TimerCounterTest {
void Run(DummyWorker* worker) {
SCOPED_CONCURRENT_STOP_WATCH(&csw_);
SCOPED_CONCURRENT_COUNTER(&timercounter_);
- while (!worker->done) {
+ while (!worker->done.Load()) {
SleepForMs(10);
// Each test case should be no more than one second.
// Consider test failed if timer is more than 3 seconds.
@@ -970,6 +973,9 @@ TEST(TimerCounterTest, CountersTestRandom) {
ValidateLapTime(&tester, MonotonicStopWatch::Now() - lap_time_start);
}
+// Don't run TestAddClearRace against TSAN builds as it is expected to have race
+// conditions.
+#ifndef THREAD_SANITIZER
TEST(TimeSeriesCounterTest, TestAddClearRace) {
ObjectPool pool;
@@ -1010,6 +1016,8 @@ TEST(TimeSeriesCounterTest, TestAddClearRace) {
EXPECT_EQ(num_samples, 0);
}
+#endif
+
/// Stops the periodic counter updater in 'profile' and then clears the samples in
/// 'counter'.
void StopAndClearCounter(RuntimeProfile* profile,
diff --git a/be/src/util/stopwatch.h b/be/src/util/stopwatch.h
index 9f33bd5..60cdf42 100644
--- a/be/src/util/stopwatch.h
+++ b/be/src/util/stopwatch.h
@@ -227,13 +227,16 @@ class ConcurrentStopWatch {
return lap_duration;
}
- uint64_t TotalRunningTime() const { return msw_.ElapsedTime(); }
+ uint64_t TotalRunningTime() const {
+ boost::lock_guard<SpinLock> l(thread_counter_lock_);
+ return msw_.ElapsedTime();
+ }
private:
MonotonicStopWatch msw_;
/// Lock with busy_threads_.
- SpinLock thread_counter_lock_;
+ mutable SpinLock thread_counter_lock_;
/// Track how many threads are currently busy.
int busy_threads_;
diff --git a/bin/run-backend-tests.sh b/bin/run-backend-tests.sh
index 16a432f..5e1d84f 100755
--- a/bin/run-backend-tests.sh
+++ b/bin/run-backend-tests.sh
@@ -38,7 +38,19 @@ cd ${IMPALA_BE_DIR}
cd ..
export CTEST_OUTPUT_ON_FAILURE=1
+
# Override default TSAN_OPTIONS so that halt_on_error is set.
-export TSAN_OPTIONS="halt_on_error=1 history_size=7"
+# See be/src/common/init.cc and
+# https://github.com/google/sanitizers/wiki/ThreadSanitizerCppManual#non-instrumented-code
+# for an explanation of what the ignore_noninstrumented_modules flag does.
+IGNORE_NONINSTRUMENTED_MODULES="ignore_noninstrumented_modules="
+if [ "${TSAN_FULL+x}" ]; then
+ IGNORE_NONINSTRUMENTED_MODULES+="0"
+else
+ IGNORE_NONINSTRUMENTED_MODULES+="1"
+fi
+export TSAN_OPTIONS="${IGNORE_NONINSTRUMENTED_MODULES} halt_on_error=1 history_size=7
+ allocator_may_return_null=1 suppressions=${IMPALA_HOME}/bin/tsan-suppressions.txt"
+
export PATH="${IMPALA_TOOLCHAIN}/llvm-${IMPALA_LLVM_VERSION}/bin:${PATH}"
"${MAKE_CMD:-make}" test ARGS="${BE_TEST_ARGS}"
diff --git a/bin/tsan-suppressions.txt b/bin/tsan-suppressions.txt
new file mode 100644
index 0000000..adf67c0
--- /dev/null
+++ b/bin/tsan-suppressions.txt
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# This file suppresses TSAN errors, following
+# https://github.com/google/sanitizers/wiki/ThreadSanitizerSuppressions
+
+# This method in Boost's UUID library operates on static state with impunity,
+# triggering (harmless) data races in TSAN when boost::uuids::random_generator
+# instances are created across threads (this suppression and the corresponding
+# explanation were copied from Kudu's sanitizer_options.cc).
+race:boost::uuids::detail::seed_rng::sha1_random_digest_
+
+# TODO: IMPALA-9314: The ThriftServer used by the Statestore has various race conditions
+# during the shutdown process (and leaks threads)
+race:impala::StatestoreTest_SmokeTest_Test
+thread:impala::StatestoreTest_SmokeTest_Test
+race:impala::SessionTest_TestExpiry_Test
+race:impala::StatestoreSslTest_ValidCertSmokeTest_Test
diff --git a/buildall.sh b/buildall.sh
index 08ff4bf..a756069 100755
--- a/buildall.sh
+++ b/buildall.sh
@@ -70,6 +70,7 @@ BUILD_TIDY=0
BUILD_UBSAN=0
BUILD_UBSAN_FULL=0
BUILD_TSAN=0
+BUILD_TSAN_FULL=0
BUILD_SHARED_LIBS=0
# Export MAKE_CMD so it is visible in scripts that invoke make, e.g. copy-udfs-udas.sh
export MAKE_CMD=make
@@ -141,6 +142,9 @@ do
-tsan)
BUILD_TSAN=1
;;
+ -full_tsan)
+ BUILD_TSAN_FULL=1
+ ;;
-testpairwise)
EXPLORATION_STRATEGY=pairwise
;;
@@ -206,10 +210,18 @@ do
echo "[-codecoverage] : Build with code coverage [Default: False]"
echo "[-asan] : Address sanitizer build [Default: False]"
echo "[-tidy] : clang-tidy build [Default: False]"
+ echo "[-tsan] : Thread sanitizer build, runs with"\
+ "ignore_noninstrumented_modules=1. When this flag is true, TSAN ignores"\
+ "memory accesses from non-instrumented libraries. This decreases the number"\
+ "of false positives, but might miss real issues. -full_tsan disables this"\
+ "flag [Default: False]"
+ echo "[-full_tsan] : Thread sanitizer build, runs with"\
+ "ignore_noninstrumented_modules=0 (see the -tsan description for an"\
+ "explanation of what this flag does) [Default: False]"
echo "[-ubsan] : Undefined behavior sanitizer build [Default: False]"
echo "[-full_ubsan] : Undefined behavior sanitizer build, including code generated"\
- " by cross-compilation to LLVM IR. Much slower queries than plain -ubsan "\
- " [Default: False]"
+ "by cross-compilation to LLVM IR. Much slower queries than plain -ubsan"\
+ "[Default: False]"
echo "[-skiptests] : Skips execution of all tests"
echo "[-notests] : Skips building and execution of all tests"
echo "[-start_minicluster] : Start test cluster including Impala and all"\
@@ -299,6 +311,10 @@ fi
if [[ ${BUILD_TSAN} -eq 1 ]]; then
CMAKE_BUILD_TYPE_LIST+=(TSAN)
fi
+if [[ ${BUILD_TSAN_FULL} -eq 1 ]]; then
+ CMAKE_BUILD_TYPE_LIST+=(TSAN_FULL)
+ export TSAN_FULL=1
+fi
if [[ -n "${CMAKE_BUILD_TYPE_LIST:+1}" ]]; then
if [[ ${#CMAKE_BUILD_TYPE_LIST[@]} -gt 1 ]]; then
echo "ERROR: more than one CMake build type defined: ${CMAKE_BUILD_TYPE_LIST[@]}"
@@ -433,7 +449,8 @@ generate_cmake_files() {
|| ("$build_type" == "TIDY") \
|| ("$build_type" == "UBSAN") \
|| ("$build_type" == "UBSAN_FULL") \
- || ("$build_type" == "TSAN") ]]; then
+ || ("$build_type" == "TSAN") \
+ || ("$build_type" == "TSAN_FULL") ]]; then
CMAKE_ARGS+=(-DCMAKE_TOOLCHAIN_FILE=$IMPALA_HOME/cmake_modules/clang_toolchain.cmake)
else
CMAKE_ARGS+=(-DCMAKE_TOOLCHAIN_FILE=$IMPALA_HOME/cmake_modules/toolchain.cmake)
diff --git a/tests/common/environ.py b/tests/common/environ.py
index 3fc6230..0987b1b 100644
--- a/tests/common/environ.py
+++ b/tests/common/environ.py
@@ -113,13 +113,15 @@ class ImpalaBuildFlavors:
TIDY = 'tidy'
# ./buildall.sh -tsan
TSAN = 'tsan'
+ # ./buildall.sh -full_tsan
+ TSAN_FULL = 'tsan_full'
# ./buildall.sh -ubsan
UBSAN = 'ubsan'
# ./buildall.sh -full_ubsan
UBSAN_FULL = 'ubsan_full'
VALID_BUILD_TYPES = [ADDRESS_SANITIZER, DEBUG, CODE_COVERAGE_DEBUG, RELEASE,
- CODE_COVERAGE_RELEASE, TIDY, TSAN, UBSAN, UBSAN_FULL]
+ CODE_COVERAGE_RELEASE, TIDY, TSAN, TSAN_FULL, UBSAN, UBSAN_FULL]
class LinkTypes:
diff --git a/tests/webserver/test_web_pages.py b/tests/webserver/test_web_pages.py
index d94fcfd..523ff12 100644
--- a/tests/webserver/test_web_pages.py
+++ b/tests/webserver/test_web_pages.py
@@ -75,7 +75,7 @@ class TestWebPage(ImpalaTestSuite):
assert build_flags["is_ndebug"] in ["true", "false"]
assert "cmake_build_type" in build_flags
assert build_flags["cmake_build_type"] in ["debug", "release", "address_sanitizer",
- "tidy", "ubsan", "ubsan_full", "tsan", "code_coverage_release",
+ "tidy", "ubsan", "ubsan_full", "tsan", "tsan_full", "code_coverage_release",
"code_coverage_debug"]
assert "library_link_type" in build_flags
assert build_flags["library_link_type"] in ["dynamic", "static"]