You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2020/02/10 20:54:50 UTC

[impala] branch master updated: IMPALA-5904: Add full_tsan option and fix several TSAN bugs

This is an automated email from the ASF dual-hosted git repository.

stakiar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new ca6c8d4  IMPALA-5904: Add full_tsan option and fix several TSAN bugs
ca6c8d4 is described below

commit ca6c8d43d78de3c4fa5e4606882b0865e2bce5b4
Author: Sahil Takiar <ta...@gmail.com>
AuthorDate: Mon Jan 20 18:06:32 2020 -0800

    IMPALA-5904: Add full_tsan option and fix several TSAN bugs
    
    This patch adds a new build flag, -full_tsan, alongside the existing
    -tsan build flag. -full_tsan is equivalent to the current -tsan
    behavior, and -tsan is changed to set the ignore_noninstrumented_modules
    flag to true. ignore_noninstrumented_modules causes TSAN to ignore any
    modules that are not TSAN-instrumented. This is necessary to get TSAN to
    play nicely with Java, since Java is not TSAN-instrumented (see
    https://wiki.openjdk.java.net/display/tsan/Main and JDK-8208520). While
    this might decrease the number of issues surfaced by TSAN, it drastically
    decreases the amount of noise TSAN produces from the JVM, which does not
    run TSAN-instrumented code. Without this flag set to true, almost
    every single backend test fails with the error:
    
    WARNING: ThreadSanitizer: data race (pid=12939)
      Write of size 1 at 0x7fcbe379c4c6 by thread T31:
        #0 strncpy /mnt/source/llvm/llvm-5.0.1.src-p2/projects/compiler-rt/lib/tsan/rtl/tsan_interceptors.cc:650 (unifiedbetests+0x1b2a4ad)
        #1 <null> <null> (libjvm.so+0x90e706)
    
    This patch fixes various TSAN bugs (e.g. data races) reported while
    running backend tests and E2E tests against a TSAN build (it does not make
    Impala completely TSAN-clean). This patch makes the following changes:
    * Fixes several bugs involving issues with updating shared variables
      between threads
    * Fixes a few race conditions in test classes
    * Where possible, existing locks are used to fix any data races; in cases
      where the locking logic is non-trivial, atomics are used
    * There are a few places where variables are marked as 'volatile'
      presumably for synchronization purposes; TSAN flags these 'volatile'
      variables as unsafe, and according to
      https://github.com/isocpp/CppCoreGuidelines/blob/master/CppCoreGuidelines.md#Rconc-volatile
      using 'volatile' for synchronization is dangerous; in these cases, the
      'volatile' variables are changed to 'atomic' variables
    * This patch adds a suppression file (bin/tsan-suppressions.txt) similar to
      the UBSAN suppression file (bin/ubsan-suppresions.txt)
    
    Testing:
    * Ran exhaustive tests
    * Ran core tests w/ ASAN build
    * Manually re-ran backend tests against a TSAN build and made sure the
      reported errors are gone
    
    Change-Id: I3d7ef5c228afd5882e145e6f53885b355d6c25a0
    Reviewed-on: http://gerrit.cloudera.org:8080/15116
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 CMakeLists.txt                                   |   3 +-
 be/CMakeLists.txt                                |  15 +++-
 be/src/common/init.cc                            |  21 ++++-
 be/src/exec/hdfs-scan-node-base.cc               |   4 +-
 be/src/exec/hdfs-scan-node-base.h                |   2 +-
 be/src/exec/hdfs-scan-node-mt.cc                 |   2 +-
 be/src/exec/hdfs-scan-node.cc                    |   6 +-
 be/src/rpc/thrift-thread.cc                      |   2 +-
 be/src/rpc/thrift-thread.h                       |   4 +-
 be/src/runtime/bufferpool/reservation-tracker.cc | 102 ++++++++++++-----------
 be/src/runtime/bufferpool/reservation-tracker.h  |  22 ++---
 be/src/runtime/bufferpool/system-allocator.cc    |   6 +-
 be/src/runtime/coordinator-backend-state.cc      |  10 ++-
 be/src/runtime/coordinator.cc                    |  10 +--
 be/src/runtime/coordinator.h                     |   4 +-
 be/src/runtime/data-stream-test.cc               |   7 +-
 be/src/runtime/io/data-cache.cc                  |  21 ++---
 be/src/runtime/io/disk-io-mgr-stress.cc          |   6 +-
 be/src/runtime/io/disk-io-mgr-stress.h           |   2 +-
 be/src/runtime/io/scan-range.cc                  |  11 ++-
 be/src/service/session-expiry-test.cc            |   4 +
 be/src/statestore/statestore-test.cc             |   1 +
 be/src/util/runtime-profile-test.cc              |  16 +++-
 be/src/util/stopwatch.h                          |   7 +-
 bin/run-backend-tests.sh                         |  14 +++-
 bin/tsan-suppressions.txt                        |  32 +++++++
 buildall.sh                                      |  23 ++++-
 tests/common/environ.py                          |   4 +-
 tests/webserver/test_web_pages.py                |   2 +-
 29 files changed, 241 insertions(+), 122 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index c355218..52a60ee 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -232,7 +232,8 @@ if ("${CMAKE_BUILD_TYPE}" STREQUAL "DEBUG"
     OR "${CMAKE_BUILD_TYPE}" STREQUAL "TIDY"
     OR "${CMAKE_BUILD_TYPE}" STREQUAL "UBSAN"
     OR "${CMAKE_BUILD_TYPE}" STREQUAL "UBSAN_FULL"
-    OR "${CMAKE_BUILD_TYPE}" STREQUAL "TSAN")
+    OR "${CMAKE_BUILD_TYPE}" STREQUAL "TSAN"
+    OR "${CMAKE_BUILD_TYPE}" STREQUAL "TSAN_FULL")
   # Use the LLVM libaries with assertions for debug builds.
   set(LLVM_ROOT ${LLVM_DEBUG_ROOT})
 endif()
diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index 978cae3..c2d3ea6 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -130,7 +130,8 @@ SET(CXX_FLAGS_UBSAN "${CXX_FLAGS_UBSAN} -O0")
 # Set the flags to the thread sanitizer, also known as "tsan"
 # Turn on sanitizer and debug symbols to get stack traces:
 SET(CXX_FLAGS_TSAN "${CXX_CLANG_FLAGS} -Werror -O1 -ggdb3 -fno-omit-frame-pointer")
-SET(CXX_FLAGS_TSAN "${CXX_FLAGS_TSAN} -fsanitize=thread -DTHREAD_SANITIZER")
+SET(CXX_FLAGS_TSAN "${CXX_FLAGS_TSAN} -fsanitize=thread -DTHREAD_SANITIZER -DDYNAMIC_ANNOTATIONS_ENABLED")
+SET(CXX_FLAGS_TSAN "${CXX_FLAGS_TSAN} -DTHREAD_SANITIZER_SUPPRESSIONS=\\\"$ENV{IMPALA_HOME}/bin/tsan-suppressions.txt\\\"")
 
 SET(CXX_FLAGS_TIDY "${CXX_CLANG_FLAGS}")
 # Catching unused variables requires an optimization level greater than 0
@@ -163,6 +164,9 @@ elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "UBSAN_FULL")
   SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DUNDEFINED_SANITIZER_FULL")
 elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "TSAN")
   SET(CMAKE_CXX_FLAGS "${CXX_FLAGS_TSAN}")
+elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "TSAN_FULL")
+  SET(CMAKE_CXX_FLAGS "${CXX_FLAGS_TSAN}")
+  SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DTHREAD_SANITIZER_FULL")
 else()
   message(FATAL_ERROR "Unknown build type: ${CMAKE_BUILD_TYPE}")
 endif()
@@ -188,7 +192,8 @@ if (CCACHE AND NOT DEFINED ENV{DISABLE_CCACHE})
       OR "${CMAKE_BUILD_TYPE}" STREQUAL "TIDY"
       OR "${CMAKE_BUILD_TYPE}" STREQUAL "UBSAN"
       OR "${CMAKE_BUILD_TYPE}" STREQUAL "UBSAN_FULL"
-      OR "${CMAKE_BUILD_TYPE}" STREQUAL "TSAN")
+      OR "${CMAKE_BUILD_TYPE}" STREQUAL "TSAN"
+      OR "${CMAKE_BUILD_TYPE}" STREQUAL "TSAN_FULL")
     # Need to set CCACHE_CPP so that ccache calls clang with the original source file for
     # both preprocessing and compilation. Otherwise, ccache will use clang to preprocess
     # the file and then call clang with the preprocessed output if not cached. However,
@@ -337,7 +342,8 @@ if ("${CMAKE_BUILD_TYPE}" STREQUAL "DEBUG" OR
     "${CMAKE_BUILD_TYPE}" STREQUAL "ADDRESS_SANITIZER" OR
     "${CMAKE_BUILD_TYPE}" STREQUAL "UBSAN" OR
     "${CMAKE_BUILD_TYPE}" STREQUAL "UBSAN_FULL" OR
-    "${CMAKE_BUILD_TYPE}" STREQUAL "TSAN")
+    "${CMAKE_BUILD_TYPE}" STREQUAL "TSAN" OR
+    "${CMAKE_BUILD_TYPE}" STREQUAL "TSAN_FULL")
   set(BUILD_OUTPUT_ROOT_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/build/debug/")
 else()
   set(BUILD_OUTPUT_ROOT_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/build/release/")
@@ -542,7 +548,8 @@ endif()
 # malloc/free)
 set (IMPALA_LINK_LIBS_NO_TCMALLOC ${IMPALA_LINK_LIBS})
 if (NOT "${CMAKE_BUILD_TYPE}" STREQUAL "ADDRESS_SANITIZER" AND
-    NOT "${CMAKE_BUILD_TYPE}" STREQUAL "TSAN")
+    NOT "${CMAKE_BUILD_TYPE}" STREQUAL "TSAN" AND
+    NOT "${CMAKE_BUILD_TYPE}" STREQUAL "TSAN_FULL")
   set (IMPALA_LINK_LIBS ${IMPALA_LINK_LIBS} tcmallocstatic)
   set (UNIFIED_TEST_LINK_LIBS ${UNIFIED_TEST_LINK_LIBS} tcmallocstatic)
 endif()
diff --git a/be/src/common/init.cc b/be/src/common/init.cc
index e395afc..022406b 100644
--- a/be/src/common/init.cc
+++ b/be/src/common/init.cc
@@ -416,10 +416,23 @@ extern "C" const char *__asan_default_options() {
 #endif
 
 #if defined(THREAD_SANITIZER)
-// Default TSAN_OPTIONS. Override by setting environment variable $TSAN_OPTIONS.
-extern "C" const char *__tsan_default_options() {
-  // Note that backend test should re-configure to halt_on_error=1
-  return "halt_on_error=0 history_size=7";
+extern "C" const char* __tsan_default_options() {
+  // Default TSAN_OPTIONS. Override by setting environment variable $TSAN_OPTIONS.
+  // TSAN and Java don't play nicely together because JVM code is not instrumented with
+  // TSAN. TSAN requires all libs to be compiled with '-fsanitize=thread' (see
+  // https://github.com/google/sanitizers/wiki/ThreadSanitizerCppManual#non-instrumented-code),
+  // which is not currently possible for Java code. See
+  // https://wiki.openjdk.java.net/display/tsan/Main and JDK-8208520 for efforts to get
+  // TSAN to run against Java code. The flag ignore_noninstrumented_modules tells TSAN to
+  // ignore all interceptors called from any non-instrumented libraries (e.g. Java).
+  return "ignore_noninstrumented_modules="
+#if defined(THREAD_SANITIZER_FULL)
+         "0 "
+#else
+         "1 "
+#endif
+         "halt_on_error=0 history_size=7 allocator_may_return_null=1 "
+         "suppressions=" THREAD_SANITIZER_SUPPRESSIONS;
 }
 #endif
 
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 67fd2fd..49ba920 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -564,8 +564,8 @@ void HdfsScanNodeBase::Close(RuntimeState* state) {
 }
 
 Status HdfsScanNodeBase::IssueInitialScanRanges(RuntimeState* state) {
-  DCHECK(!initial_ranges_issued_);
-  initial_ranges_issued_ = true;
+  DCHECK(!initial_ranges_issued_.Load());
+  initial_ranges_issued_.Store(true);
   // We want to decrement this remaining_scan_range_submissions in all cases.
   auto remaining_scan_range_submissions_trigger =
     MakeScopeExitTrigger([&](){ UpdateRemainingScanRangeSubmissions(-1); });
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 8e118bb..760172b 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -490,7 +490,7 @@ class HdfsScanNodeBase : public ScanNode {
   /// Set to true when the initial scan ranges are issued to the IoMgr. This happens on
   /// the first call to GetNext(). The token manager, in a different thread, will read
   /// this variable.
-  bool initial_ranges_issued_ = false;
+  AtomicBool initial_ranges_issued_;
 
   /// When this counter drops to 0, AddDiskIoRanges() will not be called again, and
   /// therefore scanner threads that can't get work should exit. For most
diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc
index d8a2b02..2eeb8b8 100644
--- a/be/src/exec/hdfs-scan-node-mt.cc
+++ b/be/src/exec/hdfs-scan-node-mt.cc
@@ -51,7 +51,7 @@ Status HdfsScanNodeMt::Prepare(RuntimeState* state) {
 Status HdfsScanNodeMt::Open(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(HdfsScanNodeBase::Open(state));
-  DCHECK(!initial_ranges_issued_);
+  DCHECK(!initial_ranges_issued_.Load());
   RETURN_IF_ERROR(IssueInitialScanRanges(state));
   return Status::OK();
 }
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 63b57ad..cb0eb01 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -81,7 +81,7 @@ Status HdfsScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   ScopedGetNextEventAdder ea(this, eos);
 
-  if (!initial_ranges_issued_) {
+  if (!initial_ranges_issued_.Load()) {
     // We do this in GetNext() to maximise the amount of work we can do while waiting for
     // runtime filters to show up. The scanner threads have already started (in Open()),
     // so we need to tell them there is work to do.
@@ -181,7 +181,7 @@ void HdfsScanNode::Close(RuntimeState* state) {
   // At this point, the other threads have been joined, and
   // remaining_scan_range_submissions_ should be 0, if the
   // query started and wasn't cancelled or exited early.
-  if (ranges_issued_barrier_.pending() == 0 && initial_ranges_issued_
+  if (ranges_issued_barrier_.pending() == 0 && initial_ranges_issued_.Load()
       && progress_.done()) {
     DCHECK_EQ(remaining_scan_range_submissions_.Load(), 0);
   }
@@ -269,7 +269,7 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
   // Case 4. We have not issued the initial ranges so don't start a scanner thread.
   // Issuing ranges will call this function and we'll start the scanner threads then.
   // TODO: It would be good to have a test case for that.
-  if (!initial_ranges_issued_) return;
+  if (!initial_ranges_issued_.Load()) return;
 
   ScannerMemLimiter* scanner_mem_limiter =
       runtime_state_->query_state()->scanner_mem_limiter();
diff --git a/be/src/rpc/thrift-thread.cc b/be/src/rpc/thrift-thread.cc
index ef2144d..307605a 100644
--- a/be/src/rpc/thrift-thread.cc
+++ b/be/src/rpc/thrift-thread.cc
@@ -57,7 +57,7 @@ void ThriftThread::join() {
 boost::shared_ptr<atc::Thread> ThriftThreadFactory::newThread(
     boost::shared_ptr<atc::Runnable> runnable) const {
   stringstream name;
-  name << prefix_ << "-" << count_++;
+  name << prefix_ << "-" << count_.Add(1);
   boost::shared_ptr<ThriftThread> result =
       boost::shared_ptr<ThriftThread>(new ThriftThread(group_, name.str(), runnable));
   runnable->thread(result);
diff --git a/be/src/rpc/thrift-thread.h b/be/src/rpc/thrift-thread.h
index 468964b..e0aaa08 100644
--- a/be/src/rpc/thrift-thread.h
+++ b/be/src/rpc/thrift-thread.h
@@ -36,7 +36,7 @@ class ThriftThreadFactory : public apache::thrift::concurrency::ThreadFactory {
   /// Group is the thread group for new threads to be assigned to, and prefix is the
   /// per-thread prefix (threads are named "prefix-<count_>-<tid>").
   ThriftThreadFactory(const std::string& group, const std::string& prefix)
-      : group_(group), prefix_(prefix), count_(0) { }
+      : group_(group), prefix_(prefix), count_(-1) { }
 
   /// (From ThreadFactory) - creates a new ThriftThread to run the supplied Runnable.
   virtual boost::shared_ptr<apache::thrift::concurrency::Thread> newThread(
@@ -55,7 +55,7 @@ class ThriftThreadFactory : public apache::thrift::concurrency::ThreadFactory {
 
   /// Marked mutable because we want to increment it inside newThread, which for some
   /// reason is const.
-  mutable int64_t count_;
+  mutable AtomicInt64 count_;
 };
 
 /// A ThriftThread is a Thrift-compatible wrapper for Impala's Thread class, so that all
diff --git a/be/src/runtime/bufferpool/reservation-tracker.cc b/be/src/runtime/bufferpool/reservation-tracker.cc
index 205c6f6..433ca46 100644
--- a/be/src/runtime/bufferpool/reservation-tracker.cc
+++ b/be/src/runtime/bufferpool/reservation-tracker.cc
@@ -45,14 +45,14 @@ void ReservationTracker::InitRootTracker(
   DCHECK(!initialized_);
   parent_ = nullptr;
   mem_tracker_ = nullptr;
-  reservation_limit_ = reservation_limit;
-  reservation_ = 0;
-  used_reservation_ = 0;
-  child_reservations_ = 0;
+  reservation_limit_.Store(reservation_limit);
+  reservation_.Store(0);
+  used_reservation_.Store(0);
+  child_reservations_.Store(0);
   initialized_ = true;
 
-  InitCounters(profile, reservation_limit_);
-  COUNTER_SET(counters_.peak_reservation, reservation_);
+  InitCounters(profile, reservation_limit);
+  COUNTER_SET(counters_.peak_reservation, 0);
 
   CheckConsistency();
 }
@@ -69,10 +69,10 @@ void ReservationTracker::InitChildTracker(RuntimeProfile* profile,
   mem_tracker_ = mem_tracker;
   mem_limit_mode_ = mem_limit_mode;
 
-  reservation_limit_ = reservation_limit;
-  reservation_ = 0;
-  used_reservation_ = 0;
-  child_reservations_ = 0;
+  reservation_limit_.Store(reservation_limit);
+  reservation_.Store(0);
+  used_reservation_.Store(0);
+  child_reservations_.Store(0);
   initialized_ = true;
 
   if (mem_tracker_ != nullptr) {
@@ -95,7 +95,7 @@ void ReservationTracker::InitChildTracker(RuntimeProfile* profile,
     }
   }
 
-  InitCounters(profile, reservation_limit_);
+  InitCounters(profile, reservation_limit);
 
   CheckConsistency();
 }
@@ -126,10 +126,10 @@ void ReservationTracker::Close() {
   lock_guard<SpinLock> l(lock_);
   if (!initialized_) return;
   CheckConsistency();
-  DCHECK_EQ(used_reservation_, 0);
-  DCHECK_EQ(child_reservations_, 0);
+  DCHECK_EQ(used_reservation_.Load(), 0);
+  DCHECK_EQ(child_reservations_.Load(), 0);
   // Release any reservation to parent.
-  if (parent_ != nullptr) DecreaseReservationLocked(reservation_, false);
+  if (parent_ != nullptr) DecreaseReservationLocked(reservation_.Load(), false);
   mem_tracker_ = nullptr;
   parent_ = nullptr;
   initialized_ = false;
@@ -175,7 +175,7 @@ bool ReservationTracker::IncreaseReservationInternalLocked(int64_t bytes,
           Substitute("Debug random failure mode is turned on: Reservation of $0 denied.",
               PrettyPrinter::Print(bytes, TUnit::BYTES)));
     }
-  } else if (reservation_ + reservation_increase > reservation_limit_) {
+  } else if (reservation_.Load() + reservation_increase > reservation_limit_.Load()) {
     granted = false;
     if (error_status != nullptr) {
       MemTracker* mem_tracker = mem_tracker_;
@@ -190,10 +190,10 @@ bool ReservationTracker::IncreaseReservationInternalLocked(int64_t bytes,
           "reservation=$3 used_reservation=$4 child_reservations=$5",
           PrettyPrinter::Print(bytes, TUnit::BYTES),
           mem_tracker == nullptr ? "Process" : mem_tracker->label(),
-          PrettyPrinter::Print(reservation_limit_, TUnit::BYTES),
-          PrettyPrinter::Print(reservation_, TUnit::BYTES),
-          PrettyPrinter::Print(used_reservation_, TUnit::BYTES),
-          PrettyPrinter::Print(child_reservations_, TUnit::BYTES));
+          PrettyPrinter::Print(reservation_limit_.Load(), TUnit::BYTES),
+          PrettyPrinter::Print(reservation_.Load(), TUnit::BYTES),
+          PrettyPrinter::Print(used_reservation_.Load(), TUnit::BYTES),
+          PrettyPrinter::Print(child_reservations_.Load(), TUnit::BYTES));
       string top_n_queries = mem_tracker->LogTopNQueries(5);
       if (!top_n_queries.empty()) {
         error_msg = Substitute(
@@ -228,7 +228,7 @@ bool ReservationTracker::IncreaseReservationInternalLocked(int64_t bytes,
     // The reservation was granted and state updated in all ancestors: we can modify
     // this tracker's state now.
     UpdateReservation(reservation_increase);
-    if (is_child_reservation) child_reservations_ += bytes;
+    if (is_child_reservation) child_reservations_.Add(bytes);
   }
 
   CheckConsistency();
@@ -270,9 +270,9 @@ void ReservationTracker::DecreaseReservation(int64_t bytes, bool is_child_reserv
 void ReservationTracker::DecreaseReservationLocked(
     int64_t bytes, bool is_child_reservation) {
   DCHECK(initialized_);
-  DCHECK_GE(reservation_, bytes);
+  DCHECK_GE(reservation_.Load(), bytes);
   if (bytes == 0) return;
-  if (is_child_reservation) child_reservations_ -= bytes;
+  if (is_child_reservation) child_reservations_.Add(-bytes);
   UpdateReservation(-bytes);
   ReleaseToMemTracker(bytes);
   // The reservation should be returned up the tree.
@@ -325,7 +325,9 @@ bool ReservationTracker::TransferReservationTo(ReservationTracker* other, int64_
 
   // Check reservation limits will not be violated before applying any updates.
   for (ReservationTracker* tracker : other_path_to_common) {
-    if (tracker->reservation_ + bytes > tracker->reservation_limit_) return false;
+    if (tracker->reservation_.Load() + bytes > tracker->reservation_limit_.Load()) {
+      return false;
+    }
   }
 
   // Do the updates now that we have checked the limits. We're holding all the locks
@@ -337,10 +339,10 @@ bool ReservationTracker::TransferReservationTo(ReservationTracker* other, int64_
     DCHECK(tracker->mem_tracker_ == nullptr || !tracker->mem_tracker_->has_limit());
     bool success = tracker->TryConsumeFromMemTracker(bytes, MemLimit::HARD);
     DCHECK(success);
-    if (tracker != other_path_to_common[0]) tracker->child_reservations_ += bytes;
+    if (tracker != other_path_to_common[0]) tracker->child_reservations_.Add(bytes);
   }
   for (ReservationTracker* tracker : path_to_common) {
-    if (tracker != path_to_common[0]) tracker->child_reservations_ -= bytes;
+    if (tracker != path_to_common[0]) tracker->child_reservations_.Add(-bytes);
     tracker->UpdateReservation(-bytes);
     tracker->ReleaseToMemTracker(bytes);
   }
@@ -348,15 +350,19 @@ bool ReservationTracker::TransferReservationTo(ReservationTracker* other, int64_
   // Update the 'child_reservations_' on the common ancestor if needed.
   // Case 1: reservation was pushed up to 'other'.
   if (common_ancestor == other) {
+    other->child_reservations_.Add(-bytes);
+#ifndef NDEBUG
     lock_guard<SpinLock> l(other->lock_);
-    other->child_reservations_ -= bytes;
     other->CheckConsistency();
+#endif
   }
   // Case 2: reservation was pushed down below 'this'.
   if (common_ancestor == this) {
+    child_reservations_.Add(bytes);
+#ifndef NDEBUG
     lock_guard<SpinLock> l(lock_);
-    child_reservations_ += bytes;
     CheckConsistency();
+#endif
   }
   return true;
 }
@@ -388,7 +394,7 @@ void ReservationTracker::ReleaseTo(int64_t bytes) {
   lock_guard<SpinLock> l(lock_);
   DCHECK(initialized_);
   DCHECK_GE(bytes, 0);
-  DCHECK_LE(bytes, used_reservation_);
+  DCHECK_LE(bytes, used_reservation_.Load());
   UpdateUsedReservation(-bytes);
   CheckConsistency();
 }
@@ -397,14 +403,14 @@ int64_t ReservationTracker::GetReservation() {
   // Don't acquire lock - there is no point in holding it for this function only since
   // the value read can change as soon as we release it.
   DCHECK(initialized_);
-  return base::subtle::Acquire_Load(&reservation_);
+  return reservation_.Load();
 }
 
 int64_t ReservationTracker::GetUsedReservation() {
   // Don't acquire lock - there is no point in holding it for this function only since
   // the value read can change as soon as we release it.
   DCHECK(initialized_);
-  return base::subtle::Acquire_Load(&used_reservation_);
+  return used_reservation_.Load();
 }
 
 int64_t ReservationTracker::GetUnusedReservation() {
@@ -417,35 +423,35 @@ int64_t ReservationTracker::GetChildReservations() {
   // Don't acquire lock - there is no point in holding it for this function only since
   // the value read can change as soon as we release it.
   DCHECK(initialized_);
-  return base::subtle::Acquire_Load(&child_reservations_);
+  return child_reservations_.Load();
 }
 
 void ReservationTracker::CheckConsistency() const {
   // Check internal invariants.
-  DCHECK_GE(reservation_, 0);
-  DCHECK_LE(reservation_, reservation_limit_);
-  DCHECK_GE(child_reservations_, 0);
-  DCHECK_GE(used_reservation_, 0);
-  DCHECK_LE(used_reservation_ + child_reservations_, reservation_);
-
-  DCHECK_EQ(reservation_, counters_.peak_reservation->current_value());
-  DCHECK_LE(reservation_, counters_.peak_reservation->value());
-  DCHECK_EQ(used_reservation_, counters_.peak_used_reservation->current_value());
-  DCHECK_LE(used_reservation_, counters_.peak_used_reservation->value());
+  DCHECK_GE(reservation_.Load(), 0);
+  DCHECK_LE(reservation_.Load(), reservation_limit_.Load());
+  DCHECK_GE(child_reservations_.Load(), 0);
+  DCHECK_GE(used_reservation_.Load(), 0);
+  DCHECK_LE(used_reservation_.Load() + child_reservations_.Load(), reservation_.Load());
+
+  DCHECK_EQ(reservation_.Load(), counters_.peak_reservation->current_value());
+  DCHECK_LE(reservation_.Load(), counters_.peak_reservation->value());
+  DCHECK_EQ(used_reservation_.Load(), counters_.peak_used_reservation->current_value());
+  DCHECK_LE(used_reservation_.Load(), counters_.peak_used_reservation->value());
   if (counters_.reservation_limit != nullptr) {
-    DCHECK_EQ(reservation_limit_, counters_.reservation_limit->value());
+    DCHECK_EQ(reservation_limit_.Load(), counters_.reservation_limit->value());
   }
 }
 
 void ReservationTracker::UpdateUsedReservation(int64_t delta) {
-  used_reservation_ += delta;
-  COUNTER_SET(counters_.peak_used_reservation, used_reservation_);
+  int64_t used_reservation = used_reservation_.Add(delta);
+  COUNTER_SET(counters_.peak_used_reservation, used_reservation);
   CheckConsistency();
 }
 
 void ReservationTracker::UpdateReservation(int64_t delta) {
-  reservation_ += delta;
-  COUNTER_SET(counters_.peak_reservation, reservation_);
+  int64_t reservation = reservation_.Add(delta);
+  COUNTER_SET(counters_.peak_reservation, reservation);
   CheckConsistency();
 }
 
@@ -457,7 +463,7 @@ string ReservationTracker::DebugString() {
   return Substitute(
       "<ReservationTracker>: reservation_limit $0 reservation $1 used_reservation $2 "
       "child_reservations $3 parent:\n$4",
-      reservation_limit_, reservation_, used_reservation_, child_reservations_,
-      parent_debug_string);
+      reservation_limit_.Load(), reservation_.Load(), used_reservation_.Load(),
+      child_reservations_.Load(), parent_debug_string);
 }
 }
diff --git a/be/src/runtime/bufferpool/reservation-tracker.h b/be/src/runtime/bufferpool/reservation-tracker.h
index 604db7f..88bd3e6 100644
--- a/be/src/runtime/bufferpool/reservation-tracker.h
+++ b/be/src/runtime/bufferpool/reservation-tracker.h
@@ -189,7 +189,7 @@ class ReservationTracker {
  private:
   /// Returns the amount of 'reservation_' that is unused.
   inline int64_t unused_reservation() const {
-    return reservation_ - used_reservation_ - child_reservations_;
+    return reservation_.Load() - used_reservation_.Load() - child_reservations_.Load();
   }
 
   /// Returns the parent's memtracker if 'parent_' is non-NULL, or NULL otherwise.
@@ -308,23 +308,23 @@ class ReservationTracker {
   /// reservation increases.
   MemLimit mem_limit_mode_;
 
-  /// The maximum reservation in bytes that this tracker can have. Can be read with an
-  /// atomic load without holding lock.
-  int64_t reservation_limit_;
+  /// The maximum reservation in bytes that this tracker can have. Can be read without
+  /// holding lock.
+  AtomicInt64 reservation_limit_;
 
   /// This tracker's current reservation in bytes. 'reservation_' <= 'reservation_limit_'.
-  /// Can be read with an atomic load without holding lock.
-  int64_t reservation_;
+  /// Can be read without holding lock.
+  AtomicInt64 reservation_;
 
   /// Total reservation of children in bytes. This is included in 'reservation_'.
-  /// 'used_reservation_' + 'child_reservations_' <= 'reservation_'.
-  /// Can be read with an atomic load without holding lock.
-  int64_t child_reservations_;
+  /// 'used_reservation_' + 'child_reservations_' <= 'reservation_'. Can be read without
+  /// holding lock.
+  AtomicInt64 child_reservations_;
 
   /// The amount of the reservation currently used by this tracker in bytes.
   /// 'used_reservation_' + 'child_reservations_' <= 'reservation_'.
-  /// Can be read with an atomic load without holding lock.
-  int64_t used_reservation_;
+  /// Can be read without holding lock.
+  AtomicInt64 used_reservation_;
 };
 }
 
diff --git a/be/src/runtime/bufferpool/system-allocator.cc b/be/src/runtime/bufferpool/system-allocator.cc
index 8dc4625..e6b0376 100644
--- a/be/src/runtime/bufferpool/system-allocator.cc
+++ b/be/src/runtime/bufferpool/system-allocator.cc
@@ -121,9 +121,9 @@ Status SystemAllocator::AllocateViaMalloc(int64_t len, uint8_t** buffer_mem) {
   // This ensures that it can be backed by a whole pages, rather than parts of pages.
   size_t alignment = use_huge_pages ? HUGE_PAGE_SIZE : SMALL_PAGE_SIZE;
   int rc = posix_memalign(reinterpret_cast<void**>(buffer_mem), alignment, len);
-#ifdef ADDRESS_SANITIZER
-  // Workaround ASAN bug where posix_memalign returns 0 even when allocation fails.
-  // It should instead return ENOMEM. See https://bugs.llvm.org/show_bug.cgi?id=32968.
+#ifdef THREAD_SANITIZER
+  // Workaround TSAN bug where posix_memalign returns 0 even when allocation fails. It
+  // should instead return ENOMEM. See https://reviews.llvm.org/D35690.
   if (rc == 0 && *buffer_mem == nullptr && len != 0) rc = ENOMEM;
 #endif
   if (rc != 0) {
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index f4c5859..4a4243c 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -175,7 +175,15 @@ void Coordinator::BackendState::Exec(const DebugOptions& debug_options,
     const kudu::Slice& serialized_query_ctx, CountingBarrier* exec_complete_barrier) {
   const auto trigger = MakeScopeExitTrigger([&]() {
     // Ensure that 'last_report_time_ms_' is set prior to the barrier being notified.
-    last_report_time_ms_ = GenerateReportTimestamp();
+    {
+      // Since last_report_time_ms_ is protected by lock_ it must be acquired before
+      // updating last_report_time_ms_. The lock_ is guaranteed to not be held by this
+      // method when this lambda runs since it has not already been acquired by the
+      // method. The lambda executes in an object destructor and C++ destroys objects in
+      // the reverse order they were created.
+      lock_guard<mutex> lock(lock_);
+      last_report_time_ms_ = GenerateReportTimestamp();
+    }
     exec_complete_barrier->Notify();
   });
 
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 486a810..07cfadc 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -677,7 +677,7 @@ Status Coordinator::FinalizeHdfsDml() {
   // All instances must have reported their final statuses before finalization, which is a
   // post-condition of Wait. If the query was not successful, still try to clean up the
   // staging directory.
-  DCHECK(has_called_wait_);
+  DCHECK(has_called_wait_.Load());
   DCHECK(finalize_params() != nullptr);
   bool is_transactional = finalize_params()->__isset.write_id;
 
@@ -730,8 +730,8 @@ void Coordinator::WaitForBackends() {
 Status Coordinator::Wait() {
   lock_guard<SpinLock> l(wait_lock_);
   SCOPED_TIMER(query_profile_->total_time_counter());
-  if (has_called_wait_) return Status::OK();
-  has_called_wait_ = true;
+  if (has_called_wait_.Load()) return Status::OK();
+  has_called_wait_.Store(true);
 
   if (stmt_type_ == TStmtType::QUERY) {
     DCHECK(coord_instance_ != nullptr);
@@ -756,7 +756,7 @@ Status Coordinator::Wait() {
 Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos,
     int64_t block_on_wait_time_us) {
   VLOG_ROW << "GetNext() query_id=" << PrintId(query_id());
-  DCHECK(has_called_wait_);
+  DCHECK(has_called_wait_.Load());
   SCOPED_TIMER(query_profile_->total_time_counter());
 
   if (ReturnedAllResults()) {
@@ -978,7 +978,7 @@ void Coordinator::ComputeQuerySummary() {
   // In this case, the query did not even get to start all fragment instances.
   // Some of the state that is used below might be uninitialized.  In this case,
   // the query has made so little progress, reporting a summary is not very useful.
-  if (!has_called_wait_) return;
+  if (!has_called_wait_.Load()) return;
 
   if (backend_states_.empty()) return;
   // make sure fragment_stats_ are up-to-date
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index c295d66..019f3fb 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -290,7 +290,9 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// ensures single-threaded execution of Wait(). See lock ordering class comment.
   SpinLock wait_lock_;
 
-  bool has_called_wait_ = false;  // if true, Wait() was called; protected by wait_lock_
+  /// If true, Wait() was called. Atomic so it can be read without wait_lock_; Wait()
+  /// still reads and writes it under wait_lock_ so that Wait() only executes once.
+  AtomicBool has_called_wait_;
 
   BackendResourceState* backend_resource_state_ = nullptr;
 
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index 4ffdb1d..d8dc832 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -517,7 +517,7 @@ class DataStreamTest : public testing::Test {
     sender_info_.emplace_back(make_unique<SenderInfo>());
     sender_info_.back()->thread_handle.reset(
         new thread(&DataStreamTest::Sender, this, num_senders, channel_buffer_size,
-            partition_type));
+            partition_type, sender_info_[num_senders].get()));
   }
 
   void JoinSenders() {
@@ -527,8 +527,8 @@ class DataStreamTest : public testing::Test {
     }
   }
 
-  void Sender(
-      int sender_num, int channel_buffer_size, TPartitionType::type partition_type) {
+  void Sender(int sender_num, int channel_buffer_size,
+      TPartitionType::type partition_type, SenderInfo* info) {
     RuntimeState state(TQueryCtx(), exec_env_.get(), desc_tbl_);
     VLOG_QUERY << "create sender " << sender_num;
     const TDataSink& sink = GetSink(partition_type);
@@ -545,7 +545,6 @@ class DataStreamTest : public testing::Test {
     EXPECT_OK(sender->Prepare(&state, &tracker_));
     EXPECT_OK(sender->Open(&state));
     scoped_ptr<RowBatch> batch(CreateRowBatch());
-    SenderInfo* info = sender_info_[sender_num].get();
     int next_val = 0;
     for (int i = 0; i < NUM_BATCHES; ++i) {
       GetNextBatch(batch.get(), &next_val);
diff --git a/be/src/runtime/io/data-cache.cc b/be/src/runtime/io/data-cache.cc
index 3e5a8fc..85dd277 100644
--- a/be/src/runtime/io/data-cache.cc
+++ b/be/src/runtime/io/data-cache.cc
@@ -267,17 +267,18 @@ class DataCache::CacheFile {
   int64_t Allocate(int64_t len, const std::unique_lock<SpinLock>& partition_lock) {
     DCHECK(partition_lock.owns_lock());
     DCHECK_EQ(len % PAGE_SIZE, 0);
-    DCHECK_EQ(current_offset_ % PAGE_SIZE, 0);
+    int64_t current_offset = current_offset_.Load();
+    DCHECK_EQ(current_offset % PAGE_SIZE, 0);
     // Hold the lock in shared mode to check if 'file_' is not closed already.
     kudu::shared_lock<rw_spinlock> lock(lock_.get_lock());
-    if (!allow_append_ || (current_offset_ + len > FLAGS_data_cache_file_max_size_bytes &&
-            current_offset_ > 0)) {
+    if (!allow_append_ || (current_offset + len > FLAGS_data_cache_file_max_size_bytes &&
+            current_offset > 0)) {
       allow_append_ = false;
       return -1;
     }
     DCHECK(file_);
-    int64_t insertion_offset = current_offset_;
-    current_offset_ += len;
+    int64_t insertion_offset = current_offset;
+    current_offset_.Add(len);
     return insertion_offset;
   }
 
@@ -289,7 +290,7 @@ class DataCache::CacheFile {
     // Hold the lock in shared mode to check if 'file_' is not closed already.
     kudu::shared_lock<rw_spinlock> lock(lock_.get_lock());
     if (UNLIKELY(!file_)) return false;
-    DCHECK_LE(offset + bytes_to_read, current_offset_);
+    DCHECK_LE(offset + bytes_to_read, current_offset_.Load());
     kudu::Status status = file_->Read(offset, Slice(buffer, bytes_to_read));
     if (UNLIKELY(!status.ok())) {
       LOG(ERROR) << Substitute("Failed to read from $0 at offset $1 for $2 bytes: $3",
@@ -304,11 +305,11 @@ class DataCache::CacheFile {
   // already closed.
   bool Write(int64_t offset, const uint8_t* buffer, int64_t buffer_len) {
     DCHECK_EQ(offset % PAGE_SIZE, 0);
-    DCHECK_LE(offset, current_offset_);
+    DCHECK_LE(offset, current_offset_.Load());
     // Hold the lock in shared mode to check if 'file_' is not closed already.
     kudu::shared_lock<rw_spinlock> lock(lock_.get_lock());
     if (UNLIKELY(!file_)) return false;
-    DCHECK_LE(offset + buffer_len, current_offset_);
+    DCHECK_LE(offset + buffer_len, current_offset_.Load());
     kudu::Status status = file_->Write(offset, Slice(buffer, buffer_len));
     if (UNLIKELY(!status.ok())) {
       LOG(ERROR) << Substitute("Failed to write to $0 at offset $1 for $2 bytes: $3",
@@ -324,7 +325,7 @@ class DataCache::CacheFile {
     // Hold the lock in shared mode to check if 'file_' is not closed already.
     kudu::shared_lock<rw_spinlock> lock(lock_.get_lock());
     if (UNLIKELY(!file_)) return;
-    DCHECK_LE(offset + hole_size, current_offset_);
+    DCHECK_LE(offset + hole_size, current_offset_.Load());
     kudu::Status status = file_->PunchHole(offset, hole_size);
     if (UNLIKELY(!status.ok())) {
       LOG(DFATAL) << Substitute("Failed to punch hole in $0 at offset $1 for $2 $3",
@@ -345,7 +346,7 @@ class DataCache::CacheFile {
   bool allow_append_ = true;
 
   /// The current offset in the file to append to on next insert.
-  int64_t current_offset_ = 0;
+  AtomicInt64 current_offset_;
 
   /// This is a reader-writer lock used for synchronization with the deleter thread.
   /// It is taken in write mode in Close() and shared mode everywhere else. It's expected
diff --git a/be/src/runtime/io/disk-io-mgr-stress.cc b/be/src/runtime/io/disk-io-mgr-stress.cc
index 63b1854..cb4ba0b 100644
--- a/be/src/runtime/io/disk-io-mgr-stress.cc
+++ b/be/src/runtime/io/disk-io-mgr-stress.cc
@@ -111,7 +111,7 @@ void DiskIoMgrStress::ClientThread(int client_id) {
   Status status;
   char read_buffer[MAX_FILE_LEN];
 
-  while (!shutdown_) {
+  while (!shutdown_.Load()) {
     bool eos = false;
     int bytes_read = 0;
 
@@ -193,7 +193,7 @@ void DiskIoMgrStress::CancelRandomReader() {
 }
 
 void DiskIoMgrStress::Run(int sec) {
-  shutdown_ = false;
+  shutdown_.Store(false);
   for (int i = 0; i < num_clients_; ++i) {
     readers_.add_thread(
         new thread(&DiskIoMgrStress::ClientThread, this, i));
@@ -210,7 +210,7 @@ void DiskIoMgrStress::Run(int sec) {
   }
 
   // Signal shutdown for the client threads
-  shutdown_ = true;
+  shutdown_.Store(true);
 
   for (int i = 0; i < num_clients_; ++i) {
     unique_lock<mutex> lock(clients_[i].lock);
diff --git a/be/src/runtime/io/disk-io-mgr-stress.h b/be/src/runtime/io/disk-io-mgr-stress.h
index 1baef24..edeaaf2 100644
--- a/be/src/runtime/io/disk-io-mgr-stress.h
+++ b/be/src/runtime/io/disk-io-mgr-stress.h
@@ -97,7 +97,7 @@ class DiskIoMgrStress {
   bool includes_cancellation_;
 
   /// Flag to signal that client reader threads should exit
-  volatile bool shutdown_;
+  AtomicBool shutdown_;
 
   /// Helper to initialize a new reader client, registering a new reader with the
   /// io mgr and initializing the scan ranges
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index 3c6ff27..cab33d1 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -240,11 +240,14 @@ ReadOutcome ScanRange::DoRead(DiskQueue* queue, int disk_id) {
     return ReadOutcome::CANCELLED;
   }
 
-  bytes_read_ += buffer_desc->len();
-  DCHECK_LE(bytes_read_, bytes_to_read_);
+  {
+    unique_lock<mutex> lock(lock_);
+    bytes_read_ += buffer_desc->len();
+    DCHECK_LE(bytes_read_, bytes_to_read_);
 
-  // It is end of stream if it is end of file, or read all the bytes.
-  buffer_desc->eosr_ = eof || bytes_read_ == bytes_to_read_;
+    // It is end of stream if it is end of file, or read all the bytes.
+    buffer_desc->eosr_ = eof || bytes_read_ == bytes_to_read_;
+  }
 
   // After calling EnqueueReadyBuffer(), it is no longer valid to touch 'buffer_desc'.
   // Store the state we need before calling EnqueueReadyBuffer().
diff --git a/be/src/service/session-expiry-test.cc b/be/src/service/session-expiry-test.cc
index 6251dd4..7dd68ad 100644
--- a/be/src/service/session-expiry-test.cc
+++ b/be/src/service/session-expiry-test.cc
@@ -51,6 +51,8 @@ DECLARE_int32(beeswax_port);
 // instead of destroying it to avoid destroying the contained objects.
 static ObjectPool* perm_objects;
 
+namespace impala {
+
 TEST(SessionTest, TestExpiry) {
   const int NUM_SESSIONS = 5;
   const int MAX_IDLE_TIMEOUT_MS = 4000;
@@ -121,6 +123,8 @@ TEST(SessionTest, TestExpiry) {
   statestore->ShutdownForTesting();
 }
 
+} // namespace impala
+
 int main(int argc, char** argv) {
   ::testing::InitGoogleTest(&argc, argv);
   impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
diff --git a/be/src/statestore/statestore-test.cc b/be/src/statestore/statestore-test.cc
index f517a56..4d1bac9 100644
--- a/be/src/statestore/statestore-test.cc
+++ b/be/src/statestore/statestore-test.cc
@@ -53,6 +53,7 @@ TEST(StatestoreTest, SmokeTest) {
   // Port already in use
   Statestore* statestore_wont_start = perm_objects->Add(new Statestore(metrics_2));
   ASSERT_FALSE(statestore_wont_start->Init(statestore->port()).ok());
+  statestore_wont_start->ShutdownForTesting();
 
   StatestoreSubscriber* sub_will_start = perm_objects->Add(
       new StatestoreSubscriber("sub1", MakeNetworkAddress("localhost", 0),
diff --git a/be/src/util/runtime-profile-test.cc b/be/src/util/runtime-profile-test.cc
index 70d69dc..5a3c7f5 100644
--- a/be/src/util/runtime-profile-test.cc
+++ b/be/src/util/runtime-profile-test.cc
@@ -826,7 +826,7 @@ class TimerCounterTest {
 
   struct DummyWorker {
     thread* thread_handle;
-    bool done;
+    AtomicBool done;
 
     DummyWorker()
       : thread_handle(NULL), done(false) {}
@@ -835,9 +835,12 @@ class TimerCounterTest {
       Stop();
     }
 
+    DummyWorker(const DummyWorker& dummy_worker)
+      : thread_handle(dummy_worker.thread_handle), done(dummy_worker.done.Load()) {}
+
     void Stop() {
-      if (!done && thread_handle != NULL) {
-        done = true;
+      if (!done.Load() && thread_handle != NULL) {
+        done.Store(true);
         thread_handle->join();
         delete thread_handle;
         thread_handle = NULL;
@@ -848,7 +851,7 @@ class TimerCounterTest {
   void Run(DummyWorker* worker) {
     SCOPED_CONCURRENT_STOP_WATCH(&csw_);
     SCOPED_CONCURRENT_COUNTER(&timercounter_);
-    while (!worker->done) {
+    while (!worker->done.Load()) {
       SleepForMs(10);
       // Each test case should be no more than one second.
       // Consider test failed if timer is more than 3 seconds.
@@ -970,6 +973,9 @@ TEST(TimerCounterTest, CountersTestRandom) {
   ValidateLapTime(&tester, MonotonicStopWatch::Now() - lap_time_start);
 }
 
+// Don't run TestAddClearRace against TSAN builds as it is expected to have race
+// conditions.
+#ifndef THREAD_SANITIZER
 
 TEST(TimeSeriesCounterTest, TestAddClearRace) {
   ObjectPool pool;
@@ -1010,6 +1016,8 @@ TEST(TimeSeriesCounterTest, TestAddClearRace) {
   EXPECT_EQ(num_samples, 0);
 }
 
+#endif
+
 /// Stops the periodic counter updater in 'profile' and then clears the samples in
 /// 'counter'.
 void StopAndClearCounter(RuntimeProfile* profile,
diff --git a/be/src/util/stopwatch.h b/be/src/util/stopwatch.h
index 9f33bd5..60cdf42 100644
--- a/be/src/util/stopwatch.h
+++ b/be/src/util/stopwatch.h
@@ -227,13 +227,16 @@ class ConcurrentStopWatch {
     return lap_duration;
   }
 
-  uint64_t TotalRunningTime() const {  return msw_.ElapsedTime(); }
+  uint64_t TotalRunningTime() const {
+    boost::lock_guard<SpinLock> l(thread_counter_lock_);
+    return msw_.ElapsedTime();
+  }
 
  private:
   MonotonicStopWatch msw_;
 
   /// Lock with busy_threads_.
-  SpinLock thread_counter_lock_;
+  mutable SpinLock thread_counter_lock_;
 
   /// Track how many threads are currently busy.
   int busy_threads_;
diff --git a/bin/run-backend-tests.sh b/bin/run-backend-tests.sh
index 16a432f..5e1d84f 100755
--- a/bin/run-backend-tests.sh
+++ b/bin/run-backend-tests.sh
@@ -38,7 +38,19 @@ cd ${IMPALA_BE_DIR}
 cd ..
 
 export CTEST_OUTPUT_ON_FAILURE=1
+
 # Override default TSAN_OPTIONS so that halt_on_error is set.
-export TSAN_OPTIONS="halt_on_error=1 history_size=7"
+# For what ignore_noninstrumented_modules does, see be/src/common/init.cc and
+# https://github.com/google/sanitizers/wiki/ThreadSanitizerCppManual#non-instrumented-code
+# It is set to 0 when TSAN_FULL is set (i.e. -full_tsan builds) and 1 otherwise.
+IGNORE_NONINSTRUMENTED_MODULES="ignore_noninstrumented_modules="
+if [ "${TSAN_FULL+x}" ]; then
+  IGNORE_NONINSTRUMENTED_MODULES+="0"
+else
+  IGNORE_NONINSTRUMENTED_MODULES+="1"
+fi
+export TSAN_OPTIONS="${IGNORE_NONINSTRUMENTED_MODULES} halt_on_error=1 history_size=7
+  allocator_may_return_null=1 suppressions=${IMPALA_HOME}/bin/tsan-suppressions.txt"
+
 export PATH="${IMPALA_TOOLCHAIN}/llvm-${IMPALA_LLVM_VERSION}/bin:${PATH}"
 "${MAKE_CMD:-make}" test ARGS="${BE_TEST_ARGS}"
diff --git a/bin/tsan-suppressions.txt b/bin/tsan-suppressions.txt
new file mode 100644
index 0000000..adf67c0
--- /dev/null
+++ b/bin/tsan-suppressions.txt
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# This file suppresses TSAN errors, following
+# https://github.com/google/sanitizers/wiki/ThreadSanitizerSuppressions
+
+# This method in Boost's UUID library operates on static state with impunity,
+# triggering (harmless) data races in TSAN when boost::uuids::random_generator
+# instances are created across threads (this suppression and the corresponding
+# explanation were copied from Kudu's sanitizer_options.cc).
+race:boost::uuids::detail::seed_rng::sha1_random_digest_
+
+# TODO: IMPALA-9314: The ThriftServer used by the Statestore has various race conditions
+# during the shutdown process (and leaks threads)
+race:impala::StatestoreTest_SmokeTest_Test
+thread:impala::StatestoreTest_SmokeTest_Test
+race:impala::SessionTest_TestExpiry_Test
+race:impala::StatestoreSslTest_ValidCertSmokeTest_Test
diff --git a/buildall.sh b/buildall.sh
index 08ff4bf..a756069 100755
--- a/buildall.sh
+++ b/buildall.sh
@@ -70,6 +70,7 @@ BUILD_TIDY=0
 BUILD_UBSAN=0
 BUILD_UBSAN_FULL=0
 BUILD_TSAN=0
+BUILD_TSAN_FULL=0
 BUILD_SHARED_LIBS=0
 # Export MAKE_CMD so it is visible in scripts that invoke make, e.g. copy-udfs-udas.sh
 export MAKE_CMD=make
@@ -141,6 +142,9 @@ do
     -tsan)
       BUILD_TSAN=1
       ;;
+     -full_tsan)
+      BUILD_TSAN_FULL=1
+      ;;
     -testpairwise)
       EXPLORATION_STRATEGY=pairwise
       ;;
@@ -206,10 +210,18 @@ do
       echo "[-codecoverage] : Build with code coverage [Default: False]"
       echo "[-asan] : Address sanitizer build [Default: False]"
       echo "[-tidy] : clang-tidy build [Default: False]"
+      echo "[-tsan] : Thread sanitizer build, runs with"\
+           "ignore_noninstrumented_modules=1. When this flag is true, TSAN ignores"\
+           "memory accesses from non-instrumented libraries. This decreases the number"\
+           "of false positives, but might miss real issues. -full_tsan disables this"\
+           "flag [Default: False]"
+      echo "[-full_tsan] : Thread sanitizer build, runs with"\
+           "ignore_noninstrumented_modules=0 (see the -tsan description for an"\
+           "explanation of what this flag does) [Default: False]"
       echo "[-ubsan] : Undefined behavior sanitizer build [Default: False]"
       echo "[-full_ubsan] : Undefined behavior sanitizer build, including code generated"\
-           " by cross-compilation to LLVM IR. Much slower queries than plain -ubsan "\
-           " [Default: False]"
+           "by cross-compilation to LLVM IR. Much slower queries than plain -ubsan"\
+           "[Default: False]"
       echo "[-skiptests] : Skips execution of all tests"
       echo "[-notests] : Skips building and execution of all tests"
       echo "[-start_minicluster] : Start test cluster including Impala and all"\
@@ -299,6 +311,10 @@ fi
 if [[ ${BUILD_TSAN} -eq 1 ]]; then
   CMAKE_BUILD_TYPE_LIST+=(TSAN)
 fi
+if [[ ${BUILD_TSAN_FULL} -eq 1 ]]; then
+  CMAKE_BUILD_TYPE_LIST+=(TSAN_FULL)
+  export TSAN_FULL=1
+fi
 if [[ -n "${CMAKE_BUILD_TYPE_LIST:+1}" ]]; then
   if [[ ${#CMAKE_BUILD_TYPE_LIST[@]} -gt 1 ]]; then
     echo "ERROR: more than one CMake build type defined: ${CMAKE_BUILD_TYPE_LIST[@]}"
@@ -433,7 +449,8 @@ generate_cmake_files() {
             || ("$build_type" == "TIDY") \
             || ("$build_type" == "UBSAN") \
             || ("$build_type" == "UBSAN_FULL") \
-            || ("$build_type" == "TSAN") ]]; then
+            || ("$build_type" == "TSAN") \
+            || ("$build_type" == "TSAN_FULL") ]]; then
     CMAKE_ARGS+=(-DCMAKE_TOOLCHAIN_FILE=$IMPALA_HOME/cmake_modules/clang_toolchain.cmake)
   else
     CMAKE_ARGS+=(-DCMAKE_TOOLCHAIN_FILE=$IMPALA_HOME/cmake_modules/toolchain.cmake)
diff --git a/tests/common/environ.py b/tests/common/environ.py
index 3fc6230..0987b1b 100644
--- a/tests/common/environ.py
+++ b/tests/common/environ.py
@@ -113,13 +113,15 @@ class ImpalaBuildFlavors:
   TIDY = 'tidy'
   # ./buildall.sh -tsan
   TSAN = 'tsan'
+  # ./buildall.sh -full_tsan
+  TSAN_FULL = 'tsan_full'
   # ./buildall.sh -ubsan
   UBSAN = 'ubsan'
   # ./buildall.sh -full_ubsan
   UBSAN_FULL = 'ubsan_full'
 
   VALID_BUILD_TYPES = [ADDRESS_SANITIZER, DEBUG, CODE_COVERAGE_DEBUG, RELEASE,
-      CODE_COVERAGE_RELEASE, TIDY, TSAN, UBSAN, UBSAN_FULL]
+      CODE_COVERAGE_RELEASE, TIDY, TSAN, TSAN_FULL, UBSAN, UBSAN_FULL]
 
 
 class LinkTypes:
diff --git a/tests/webserver/test_web_pages.py b/tests/webserver/test_web_pages.py
index d94fcfd..523ff12 100644
--- a/tests/webserver/test_web_pages.py
+++ b/tests/webserver/test_web_pages.py
@@ -75,7 +75,7 @@ class TestWebPage(ImpalaTestSuite):
       assert build_flags["is_ndebug"] in ["true", "false"]
       assert "cmake_build_type" in build_flags
       assert build_flags["cmake_build_type"] in ["debug", "release", "address_sanitizer",
-          "tidy", "ubsan", "ubsan_full", "tsan", "code_coverage_release",
+          "tidy", "ubsan", "ubsan_full", "tsan", "tsan_full", "code_coverage_release",
           "code_coverage_debug"]
       assert "library_link_type" in build_flags
       assert build_flags["library_link_type"] in ["dynamic", "static"]