You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/02/21 19:39:50 UTC

[impala] 02/13: IMPALA-2746: part 1: enable LSAN for many backend tests

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 09e84801a76788bb79ac0bc2115c1c3d36185d67
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Fri Jun 8 14:54:45 2018 -0700

    IMPALA-2746: part 1: enable LSAN for many backend tests
    
    This turns on leak sanitizer for backend tests that required
    relatively small modifications to pass. We suppress a few
    leaks, mainly related to the embedded JVM.
    
    Testing:
    Ran core tests under ASAN.
    
    Change-Id: Ibdda092a4eb4bc827c75a8c121e5428ec746b7f4
    Reviewed-on: http://gerrit.cloudera.org:8080/10668
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/CMakeLists.txt                                  |  11 +-
 be/src/catalog/CMakeLists.txt                      |   2 +-
 be/src/codegen/CMakeLists.txt                      |   4 +-
 be/src/codegen/instruction-counter-test.cc         |  54 +++++-----
 be/src/common/CMakeLists.txt                       |   4 +-
 be/src/common/atomic-test.cc                       |   8 +-
 be/src/common/atomic.h                             |  11 ++
 be/src/exec/CMakeLists.txt                         |  20 ++--
 be/src/experiments/CMakeLists.txt                  |   2 +-
 be/src/exprs/CMakeLists.txt                        |   4 +-
 be/src/exprs/expr-test.cc                          |  93 +++++++++--------
 be/src/rpc/CMakeLists.txt                          |  12 +--
 be/src/runtime/CMakeLists.txt                      |  38 +++----
 be/src/runtime/bufferpool/CMakeLists.txt           |  10 +-
 be/src/runtime/bufferpool/buffer-pool-test.cc      |   4 +-
 be/src/runtime/data-stream-test.cc                 | 111 ++++++++++-----------
 be/src/runtime/io/CMakeLists.txt                   |   2 +-
 be/src/runtime/io/disk-io-mgr-stress.cc            |   4 +-
 be/src/runtime/io/disk-io-mgr-stress.h             |   3 +-
 be/src/runtime/io/disk-io-mgr-test.cc              |   6 +-
 be/src/runtime/mem-tracker.cc                      |   3 +-
 be/src/scheduling/CMakeLists.txt                   |   6 +-
 be/src/service/CMakeLists.txt                      |   8 +-
 be/src/statestore/CMakeLists.txt                   |   2 +-
 be/src/udf/udf.cc                                  |   2 +
 be/src/util/CMakeLists.txt                         |  72 ++++++-------
 be/src/util/decompress-test.cc                     |   9 +-
 .../CMakeLists.txt => bin/lsan-suppressions.txt    |  25 ++---
 28 files changed, 276 insertions(+), 254 deletions(-)

diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index 1eed615..d817009 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -510,12 +510,18 @@ FUNCTION(ADD_BE_TEST TEST_NAME)
   ADD_DEPENDENCIES(be-test ${TEST_NAME})
 ENDFUNCTION()
 
+FUNCTION(ENABLE_LSAN_FOR_TEST TEST_NAME)
+  # One call for both variables: a second ENVIRONMENT assignment replaces the first.
+  SET_PROPERTY(TEST ${TEST_NAME} PROPERTY ENVIRONMENT
+      "ASAN_OPTIONS=handle_segv=0 detect_leaks=1 allocator_may_return_null=1"
+      "LSAN_OPTIONS=suppressions=${CMAKE_SOURCE_DIR}/bin/lsan-suppressions.txt")
+ENDFUNCTION()
+
 # Same as ADD_BE_TEST, but also enable LeakSanitizer.
 # TODO: IMPALA-2746: we should make this the default.
 FUNCTION(ADD_BE_LSAN_TEST TEST_NAME)
   ADD_BE_TEST(${TEST_NAME})
-  SET_TESTS_PROPERTIES(${TEST_NAME} PROPERTIES ENVIRONMENT
-      "ASAN_OPTIONS=handle_segv=0 detect_leaks=1 allocator_may_return_null=1")
+  ENABLE_LSAN_FOR_TEST(${TEST_NAME})
 ENDFUNCTION()
 
 # Similar utility function for tests that use the UDF SDK
@@ -532,6 +538,7 @@ FUNCTION(ADD_UDF_TEST TEST_NAME)
   ADD_TEST(${TEST_NAME} "${BUILD_OUTPUT_ROOT_DIRECTORY}/${DIR_NAME}/${TEST_NAME}"
     -log_dir=$ENV{IMPALA_BE_TEST_LOGS_DIR})
   ADD_DEPENDENCIES(be-test ${TEST_NAME})
+  ENABLE_LSAN_FOR_TEST(${TEST_NAME})
 ENDFUNCTION()
 
 # Function to generate rule to cross compile a source file to an IR module.
diff --git a/be/src/catalog/CMakeLists.txt b/be/src/catalog/CMakeLists.txt
index 35cccea..ba8d97c 100644
--- a/be/src/catalog/CMakeLists.txt
+++ b/be/src/catalog/CMakeLists.txt
@@ -26,4 +26,4 @@ add_library(Catalog
 )
 add_dependencies(Catalog gen-deps)
 
-ADD_BE_TEST(catalog-util-test)
+ADD_BE_LSAN_TEST(catalog-util-test)
diff --git a/be/src/codegen/CMakeLists.txt b/be/src/codegen/CMakeLists.txt
index 56228a2..2709fcb 100644
--- a/be/src/codegen/CMakeLists.txt
+++ b/be/src/codegen/CMakeLists.txt
@@ -107,6 +107,6 @@ add_custom_target(test-loop.bc
   SOURCES ${CMAKE_SOURCE_DIR}/testdata/llvm/test-loop.cc
 )
 
-ADD_BE_TEST(llvm-codegen-test)
+ADD_BE_LSAN_TEST(llvm-codegen-test)
 add_dependencies(llvm-codegen-test test-loop.bc)
-ADD_BE_TEST(instruction-counter-test)
+ADD_BE_LSAN_TEST(instruction-counter-test)
diff --git a/be/src/codegen/instruction-counter-test.cc b/be/src/codegen/instruction-counter-test.cc
index f404243..5dbc605 100644
--- a/be/src/codegen/instruction-counter-test.cc
+++ b/be/src/codegen/instruction-counter-test.cc
@@ -73,19 +73,19 @@ llvm::Module* CodegenMulAdd(llvm::LLVMContext* context) {
 
 TEST_F(InstructionCounterTest, Count) {
   llvm::Module* MulAddModule = CodegenMulAdd(&context_);
-  InstructionCounter* instruction_counter = new InstructionCounter();
-  instruction_counter->visit(*MulAddModule);
-  instruction_counter->PrintCounters();
-  EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::TOTAL_FUNCTIONS), 1);
-  EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::TOTAL_INSTS), 3);
-  EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::TERMINATOR_INSTS), 1);
-  EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::MEMORY_INSTS), 0);
+  InstructionCounter instruction_counter;
+  instruction_counter.visit(*MulAddModule);
+  instruction_counter.PrintCounters();
+  EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::TOTAL_FUNCTIONS), 1);
+  EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::TOTAL_INSTS), 3);
+  EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::TERMINATOR_INSTS), 1);
+  EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::MEMORY_INSTS), 0);
 
   // Test Reset
-  instruction_counter->ResetCount();
-  EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::TOTAL_FUNCTIONS), 0);
-  EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::TOTAL_INSTS), 0);
-  EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::MEMORY_INSTS), 0);
+  instruction_counter.ResetCount();
+  EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::TOTAL_FUNCTIONS), 0);
+  EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::TOTAL_INSTS), 0);
+  EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::MEMORY_INSTS), 0);
 }
 
 // IR output from CodegenGcd
@@ -152,25 +152,25 @@ llvm::Module* CodegenGcd(llvm::LLVMContext* context) {
 
 TEST_F(InstructionCounterTest, TestMemInstrCount) {
   llvm::Module* GcdModule = CodegenGcd(&context_);
-  InstructionCounter* instruction_counter = new InstructionCounter();
-  instruction_counter->visit(*GcdModule);
-  std::cout << instruction_counter->PrintCounters();
-  EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::TOTAL_FUNCTIONS), 1);
-  EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::TOTAL_BLOCKS), 5);
-  EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::TOTAL_INSTS), 11);
+  InstructionCounter instruction_counter;
+  instruction_counter.visit(*GcdModule);
+  std::cout << instruction_counter.PrintCounters();
+  EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::TOTAL_FUNCTIONS), 1);
+  EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::TOTAL_BLOCKS), 5);
+  EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::TOTAL_INSTS), 11);
   // Test Category Totals
-  EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::TERMINATOR_INSTS), 5);
-  EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::MEMORY_INSTS), 0);
-  EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::OTHER_INSTS), 4);
+  EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::TERMINATOR_INSTS), 5);
+  EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::MEMORY_INSTS), 0);
+  EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::OTHER_INSTS), 4);
 
   // Test Reset
-  instruction_counter->ResetCount();
-  EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::TOTAL_FUNCTIONS), 0);
-  EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::TOTAL_BLOCKS), 0);
-  EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::TOTAL_INSTS), 0);
-  EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::TERMINATOR_INSTS), 0);
-  EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::MEMORY_INSTS), 0);
-  EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::OTHER_INSTS), 0);
+  instruction_counter.ResetCount();
+  EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::TOTAL_FUNCTIONS), 0);
+  EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::TOTAL_BLOCKS), 0);
+  EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::TOTAL_INSTS), 0);
+  EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::TERMINATOR_INSTS), 0);
+  EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::MEMORY_INSTS), 0);
+  EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::OTHER_INSTS), 0);
 }
 
 }  // namespace impala
diff --git a/be/src/common/CMakeLists.txt b/be/src/common/CMakeLists.txt
index c01d71c..daa274d 100644
--- a/be/src/common/CMakeLists.txt
+++ b/be/src/common/CMakeLists.txt
@@ -49,8 +49,8 @@ add_library(GlobalFlags
 )
 add_dependencies(GlobalFlags gen-deps)
 
-ADD_BE_TEST(atomic-test)
-ADD_BE_TEST(thread-debug-info-test)
+ADD_BE_LSAN_TEST(atomic-test)
+ADD_BE_LSAN_TEST(thread-debug-info-test)
 
 # Generate config.h from config.h.in, filling in variables from CMake
 CONFIGURE_FILE(${CMAKE_CURRENT_SOURCE_DIR}/config.h.in
diff --git a/be/src/common/atomic-test.cc b/be/src/common/atomic-test.cc
index ccbaa8e..dc35396 100644
--- a/be/src/common/atomic-test.cc
+++ b/be/src/common/atomic-test.cc
@@ -236,10 +236,10 @@ static void TestAcquireReleaseLoadStore() {
   const int ITERS = 1000000;
   AtomicInt<T> control(-1);
   T payload = -1;
-  thread* t_a = new thread(AcquireReleaseThreadA<T>, 0, 1, ITERS, &control, &payload);
-  thread* t_b = new thread(AcquireReleaseThreadB<T>, 1, 0, ITERS, &control, &payload);
-  t_a->join();
-  t_b->join();
+  thread t_a(AcquireReleaseThreadA<T>, 0, 1, ITERS, &control, &payload);
+  thread t_b(AcquireReleaseThreadB<T>, 1, 0, ITERS, &control, &payload);
+  t_a.join();
+  t_b.join();
 }
 
 TEST(AtomicTest, MultipleTreadsAcquireReleaseLoadStoreInt) {
diff --git a/be/src/common/atomic.h b/be/src/common/atomic.h
index 0d3e556..4c72826 100644
--- a/be/src/common/atomic.h
+++ b/be/src/common/atomic.h
@@ -105,6 +105,12 @@ class AtomicInt {
     return base::subtle::Barrier_CompareAndSwap(&value_, old_val, new_val) == old_val;
   }
 
+  /// Store 'new_val' and return the previous value. Implies a Release memory barrier
+  /// (i.e. the same as Store()).
+  ALWAYS_INLINE T Swap(T new_val) {
+    return base::subtle::Release_AtomicExchange(&value_, new_val);
+  }
+
  private:
   T value_;
 
@@ -130,6 +136,11 @@ class AtomicPtr {
   /// Atomic store with "release" memory-ordering semantic.
   ALWAYS_INLINE void Store(T* val) { ptr_.Store(reinterpret_cast<intptr_t>(val)); }
 
+  /// Store 'val' and return the previous value. Implies a Release memory barrier
+  /// (i.e. the same as Store()).
+  ALWAYS_INLINE T* Swap(T* val) {
+    return reinterpret_cast<T*>(ptr_.Swap(reinterpret_cast<intptr_t>(val)));
+  }
  private:
   internal::AtomicInt<intptr_t> ptr_;
 };
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 2349df4..77c6e15 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -102,13 +102,13 @@ add_library(Exec
 
 add_dependencies(Exec gen-deps)
 
-ADD_BE_TEST(zigzag-test)
-ADD_BE_TEST(hash-table-test)
-ADD_BE_TEST(delimited-text-parser-test)
-ADD_BE_TEST(read-write-util-test)
-ADD_BE_TEST(parquet-plain-test)
-ADD_BE_TEST(parquet-version-test)
-ADD_BE_TEST(row-batch-list-test)
-ADD_BE_TEST(incr-stats-util-test)
-ADD_BE_TEST(hdfs-avro-scanner-test)
-ADD_BE_TEST(hdfs-parquet-scanner-test)
+ADD_BE_LSAN_TEST(zigzag-test)
+ADD_BE_LSAN_TEST(hash-table-test)
+ADD_BE_LSAN_TEST(delimited-text-parser-test)
+ADD_BE_LSAN_TEST(read-write-util-test)
+ADD_BE_LSAN_TEST(parquet-plain-test)
+ADD_BE_LSAN_TEST(parquet-version-test)
+ADD_BE_LSAN_TEST(row-batch-list-test)
+ADD_BE_LSAN_TEST(incr-stats-util-test)
+ADD_BE_LSAN_TEST(hdfs-avro-scanner-test)
+ADD_BE_LSAN_TEST(hdfs-parquet-scanner-test)
diff --git a/be/src/experiments/CMakeLists.txt b/be/src/experiments/CMakeLists.txt
index b4f0c49..c4b5593 100644
--- a/be/src/experiments/CMakeLists.txt
+++ b/be/src/experiments/CMakeLists.txt
@@ -38,4 +38,4 @@ target_link_libraries(tuple-splitter-test Experiments ${IMPALA_LINK_LIBS})
 target_link_libraries(hash-partition-test ${IMPALA_LINK_LIBS})
 target_link_libraries(compression-test ${IMPALA_LINK_LIBS})
 
-ADD_BE_TEST(string-search-sse-test)
+ADD_BE_LSAN_TEST(string-search-sse-test)
diff --git a/be/src/exprs/CMakeLists.txt b/be/src/exprs/CMakeLists.txt
index 755c166..14c65f5 100644
--- a/be/src/exprs/CMakeLists.txt
+++ b/be/src/exprs/CMakeLists.txt
@@ -66,8 +66,8 @@ add_library(Exprs
 )
 add_dependencies(Exprs gen-deps gen_ir_descriptions)
 
-ADD_BE_TEST(expr-test)
-ADD_BE_TEST(expr-codegen-test)
+ADD_BE_LSAN_TEST(expr-test)
+ADD_BE_LSAN_TEST(expr-codegen-test)
 
 # expr-codegen-test includes test IR functions
 COMPILE_TO_IR(expr-codegen-test.cc)
diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc
index 530fce3..a96b556 100644
--- a/be/src/exprs/expr-test.cc
+++ b/be/src/exprs/expr-test.cc
@@ -168,6 +168,9 @@ class ScopedLocalUnixTimestampConversionOverride {
 
 class ExprTest : public testing::Test {
  protected:
+  // Pool for objects to be destroyed during test teardown.
+  ObjectPool pool_;
+
   // Maps from enum value of primitive integer type to the minimum value that is
   // outside of the next smaller-resolution type. For example the value for type
   // TYPE_SMALLINT is numeric_limits<int8_t>::max()+1. There is a GREATEST test in
@@ -238,6 +241,8 @@ class ExprTest : public testing::Test {
     default_type_strs_[TYPE_DECIMAL] = default_decimal_str_;
   }
 
+  virtual void TearDown() { pool_.Clear(); }
+
   string GetValue(const string& expr, const ColumnType& expr_type,
       bool expect_error = false) {
     string stmt = "select " + expr;
@@ -1023,7 +1028,7 @@ class ExprTest : public testing::Test {
   template<class T>
   bool ParseString(const string& str, T* val);
 
-  // Create a Literal expression out of 'str'.
+  // Create a Literal expression out of 'str'. Adds the returned literal to pool_.
   Literal* CreateLiteral(const ColumnType& type, const string& str);
 
   // Helper function for LiteralConstruction test. Creates a Literal expression
@@ -1051,6 +1056,15 @@ class ExprTest : public testing::Test {
       TestIsNull("cast(" + stmt + " as timestamp)", TYPE_TIMESTAMP);
     }
   }
+
+  // Wrapper around UdfTestHarness::CreateTestContext() that stores the context in
+  // 'pool_' to be automatically cleaned up.
+  FunctionContext* CreateUdfTestContext(const FunctionContext::TypeDesc& return_type,
+      const std::vector<FunctionContext::TypeDesc>& arg_types,
+      RuntimeState* state = nullptr, MemPool* pool = nullptr) {
+    return pool_.Add(
+        UdfTestHarness::CreateTestContext(return_type, arg_types, state, pool));
+  }
 };
 
 template<>
@@ -1182,51 +1196,51 @@ Literal* ExprTest::CreateLiteral(const ColumnType& type, const string& str) {
     case TYPE_BOOLEAN: {
       bool v = false;
       EXPECT_TRUE(ParseString<bool>(str, &v));
-      return new Literal(type, v);
+      return pool_.Add(new Literal(type, v));
     }
     case TYPE_TINYINT: {
       int8_t v = 0;
       EXPECT_TRUE(ParseString<int8_t>(str, &v));
-      return new Literal(type, v);
+      return pool_.Add(new Literal(type, v));
     }
     case TYPE_SMALLINT: {
       int16_t v = 0;
       EXPECT_TRUE(ParseString<int16_t>(str, &v));
-      return new Literal(type, v);
+      return pool_.Add(new Literal(type, v));
     }
     case TYPE_INT: {
       int32_t v = 0;
       EXPECT_TRUE(ParseString<int32_t>(str, &v));
-      return new Literal(type, v);
+      return pool_.Add(new Literal(type, v));
     }
     case TYPE_BIGINT: {
       int64_t v = 0;
       EXPECT_TRUE(ParseString<int64_t>(str, &v));
-      return new Literal(type, v);
+      return pool_.Add(new Literal(type, v));
     }
     case TYPE_FLOAT: {
       float v = 0;
       EXPECT_TRUE(ParseString<float>(str, &v));
-      return new Literal(type, v);
+      return pool_.Add(new Literal(type, v));
     }
     case TYPE_DOUBLE: {
       double v = 0;
       EXPECT_TRUE(ParseString<double>(str, &v));
-      return new Literal(type, v);
+      return pool_.Add(new Literal(type, v));
     }
     case TYPE_STRING:
     case TYPE_VARCHAR:
     case TYPE_CHAR:
-      return new Literal(type, str);
+      return pool_.Add(new Literal(type, str));
     case TYPE_TIMESTAMP: {
       TimestampValue v;
       EXPECT_TRUE(ParseString<TimestampValue>(str, &v));
-      return new Literal(type, v);
+      return pool_.Add(new Literal(type, v));
     }
     case TYPE_DECIMAL: {
       double v = 0;
       EXPECT_TRUE(ParseString<double>(str, &v));
-      return new Literal(type, v);
+      return pool_.Add(new Literal(type, v));
     }
     default:
       DCHECK(false) << "Invalid type: " << type.DebugString();
@@ -1238,6 +1252,7 @@ template <typename T>
 void ExprTest::TestSingleLiteralConstruction(
     const ColumnType& type, const T& value, const string& string_val) {
   ObjectPool pool;
+
   RuntimeState state(TQueryCtx(), ExecEnv::GetInstance());
   MemTracker tracker;
   MemPool mem_pool(&tracker);
@@ -3826,7 +3841,7 @@ TEST_F(ExprTest, StringFunctions) {
   FunctionContext::TypeDesc str_desc;
   str_desc.type = FunctionContext::Type::TYPE_STRING;
   std::vector<FunctionContext::TypeDesc> v(3, str_desc);
-  auto context = UdfTestHarness::CreateTestContext(str_desc, v, nullptr, &pool);
+  FunctionContext* context = CreateUdfTestContext(str_desc, v, nullptr, &pool);
 
   StringVal giga(static_cast<uint8_t*>(giga_buf->data()), StringVal::MAX_LENGTH);
   StringVal a("A");
@@ -3863,7 +3878,7 @@ TEST_F(ExprTest, StringFunctions) {
   EXPECT_TRUE(r4.is_null);
   // Re-create context to clear the error from failed allocation.
   UdfTestHarness::CloseContext(context);
-  context = UdfTestHarness::CreateTestContext(str_desc, v, nullptr, &pool);
+  context = CreateUdfTestContext(str_desc, v, nullptr, &pool);
 
   // Similar test for second overflow.  This tests overflowing on re-allocation.
   (*short_buf)[4095] = 'Z';
@@ -3872,7 +3887,7 @@ TEST_F(ExprTest, StringFunctions) {
   EXPECT_TRUE(r5.is_null);
   // Re-create context to clear the error from failed allocation.
   UdfTestHarness::CloseContext(context);
-  context = UdfTestHarness::CreateTestContext(str_desc, v, nullptr, &pool);
+  context = CreateUdfTestContext(str_desc, v, nullptr, &pool);
 
   // Finally, test expanding to exactly MAX_LENGTH
   // There are 4 Zs in giga4 (not including the trailing one, as we truncate that)
@@ -7360,8 +7375,6 @@ void ValidateLayout(const vector<ScalarExpr*>& exprs, int expected_byte_size,
 }
 
 TEST_F(ExprTest, ResultsLayoutTest) {
-  ObjectPool pool;
-
   vector<ScalarExpr*> exprs;
   map<int, set<int>> expected_offsets;
 
@@ -7402,9 +7415,9 @@ TEST_F(ExprTest, ResultsLayoutTest) {
     // With one expr, all offsets should be 0.
     expected_offsets[t.GetByteSize()] = set<int>({0});
     if (t.type != TYPE_TIMESTAMP) {
-      exprs.push_back(pool.Add(CreateLiteral(t, "0")));
+      exprs.push_back(CreateLiteral(t, "0"));
     } else {
-      exprs.push_back(pool.Add(CreateLiteral(t, "2016-11-09")));
+      exprs.push_back(CreateLiteral(t, "2016-11-09"));
     }
     if (t.IsVarLenStringType()) {
       ValidateLayout(exprs, 16, 0, expected_offsets);
@@ -7420,28 +7433,27 @@ TEST_F(ExprTest, ResultsLayoutTest) {
 
   // Test layout adding a bunch of exprs.  This is designed to trigger padding.
   // The expected result is computed along the way
-  exprs.push_back(pool.Add(CreateLiteral(TYPE_BOOLEAN, "0")));
-  exprs.push_back(pool.Add(CreateLiteral(TYPE_TINYINT, "0")));
-  exprs.push_back(pool.Add(CreateLiteral(ColumnType::CreateCharType(1), "0")));
+  exprs.push_back(CreateLiteral(TYPE_BOOLEAN, "0"));
+  exprs.push_back(CreateLiteral(TYPE_TINYINT, "0"));
+  exprs.push_back(CreateLiteral(ColumnType::CreateCharType(1), "0"));
   expected_offsets[1].insert(expected_byte_size);
   expected_offsets[1].insert(expected_byte_size + 1);
   expected_offsets[1].insert(expected_byte_size + 2);
   expected_byte_size += 3 * 1 + 1;  // 1 byte of padding
 
-  exprs.push_back(pool.Add(CreateLiteral(TYPE_SMALLINT, "0")));
+  exprs.push_back(CreateLiteral(TYPE_SMALLINT, "0"));
   expected_offsets[2].insert(expected_byte_size);
   expected_byte_size += 2; // No padding before CHAR
 
-  exprs.push_back(pool.Add(CreateLiteral(ColumnType::CreateCharType(3), "0")));
+  exprs.push_back(CreateLiteral(ColumnType::CreateCharType(3), "0"));
   expected_offsets[3].insert(expected_byte_size);
   expected_byte_size += 3 + 3; // 3 byte of padding
   ASSERT_EQ(expected_byte_size % 4, 0);
 
-  exprs.push_back(pool.Add(CreateLiteral(TYPE_INT, "0")));
-  exprs.push_back(pool.Add(CreateLiteral(TYPE_FLOAT, "0")));
-  exprs.push_back(pool.Add(CreateLiteral(TYPE_FLOAT, "0")));
-  exprs.push_back(pool.Add(
-      CreateLiteral(ColumnType::CreateDecimalType(9, 0), "0")));
+  exprs.push_back(CreateLiteral(TYPE_INT, "0"));
+  exprs.push_back(CreateLiteral(TYPE_FLOAT, "0"));
+  exprs.push_back(CreateLiteral(TYPE_FLOAT, "0"));
+  exprs.push_back(CreateLiteral(ColumnType::CreateDecimalType(9, 0), "0"));
   expected_offsets[4].insert(expected_byte_size);
   expected_offsets[4].insert(expected_byte_size + 4);
   expected_offsets[4].insert(expected_byte_size + 8);
@@ -7449,12 +7461,11 @@ TEST_F(ExprTest, ResultsLayoutTest) {
   expected_byte_size += 4 * 4 + 4;  // 4 bytes of padding
   ASSERT_EQ(expected_byte_size % 8, 0);
 
-  exprs.push_back(pool.Add(CreateLiteral(TYPE_BIGINT, "0")));
-  exprs.push_back(pool.Add(CreateLiteral(TYPE_BIGINT, "0")));
-  exprs.push_back(pool.Add(CreateLiteral(TYPE_BIGINT, "0")));
-  exprs.push_back(pool.Add(CreateLiteral(TYPE_DOUBLE, "0")));
-  exprs.push_back(pool.Add(
-      CreateLiteral(ColumnType::CreateDecimalType(18, 0), "0")));
+  exprs.push_back(CreateLiteral(TYPE_BIGINT, "0"));
+  exprs.push_back(CreateLiteral(TYPE_BIGINT, "0"));
+  exprs.push_back(CreateLiteral(TYPE_BIGINT, "0"));
+  exprs.push_back(CreateLiteral(TYPE_DOUBLE, "0"));
+  exprs.push_back(CreateLiteral(ColumnType::CreateDecimalType(18, 0), "0"));
   expected_offsets[8].insert(expected_byte_size);
   expected_offsets[8].insert(expected_byte_size + 8);
   expected_offsets[8].insert(expected_byte_size + 16);
@@ -7463,20 +7474,18 @@ TEST_F(ExprTest, ResultsLayoutTest) {
   expected_byte_size += 5 * 8;      // No more padding
   ASSERT_EQ(expected_byte_size % 8, 0);
 
-  exprs.push_back(pool.Add(CreateLiteral(TYPE_TIMESTAMP, "2016-11-09")));
-  exprs.push_back(pool.Add(CreateLiteral(TYPE_TIMESTAMP, "2016-11-09")));
-  exprs.push_back(pool.Add(
-      CreateLiteral(ColumnType::CreateDecimalType(20, 0), "0")));
+  exprs.push_back(CreateLiteral(TYPE_TIMESTAMP, "2016-11-09"));
+  exprs.push_back(CreateLiteral(TYPE_TIMESTAMP, "2016-11-09"));
+  exprs.push_back(CreateLiteral(ColumnType::CreateDecimalType(20, 0), "0"));
   expected_offsets[16].insert(expected_byte_size);
   expected_offsets[16].insert(expected_byte_size + 16);
   expected_offsets[16].insert(expected_byte_size + 32);
   expected_byte_size += 3 * 16;
   ASSERT_EQ(expected_byte_size % 8, 0);
 
-  exprs.push_back(pool.Add(CreateLiteral(TYPE_STRING, "0")));
-  exprs.push_back(pool.Add(CreateLiteral(TYPE_STRING, "0")));
-  exprs.push_back(pool.Add(
-      CreateLiteral(ColumnType::CreateVarcharType(1), "0")));
+  exprs.push_back(CreateLiteral(TYPE_STRING, "0"));
+  exprs.push_back(CreateLiteral(TYPE_STRING, "0"));
+  exprs.push_back(CreateLiteral(ColumnType::CreateVarcharType(1), "0"));
   expected_offsets[0].insert(expected_byte_size);
   expected_offsets[0].insert(expected_byte_size + 16);
   expected_offsets[0].insert(expected_byte_size + 32);
diff --git a/be/src/rpc/CMakeLists.txt b/be/src/rpc/CMakeLists.txt
index 1c7adc6..56a4190 100644
--- a/be/src/rpc/CMakeLists.txt
+++ b/be/src/rpc/CMakeLists.txt
@@ -39,20 +39,20 @@ add_library(Rpc
 )
 add_dependencies(Rpc gen-deps)
 
-ADD_BE_TEST(thrift-util-test)
-ADD_BE_TEST(thrift-server-test)
-# The thrift-server-test uses some utilites from the Kudu security test code.
+ADD_BE_LSAN_TEST(thrift-util-test)
+ADD_BE_TEST(thrift-server-test) # TODO: this test leaks servers
+# The thrift-server-test uses some utilities from the Kudu security test code.
 target_link_libraries(thrift-server-test security-test-for-impala)
 
-ADD_BE_TEST(authentication-test)
+ADD_BE_LSAN_TEST(authentication-test)
 
-ADD_BE_TEST(rpc-mgr-test)
+ADD_BE_TEST(rpc-mgr-test) # TODO: this test leaks various KRPC things
 add_dependencies(rpc-mgr-test rpc_test_proto)
 target_link_libraries(rpc-mgr-test rpc_test_proto)
 target_link_libraries(rpc-mgr-test security-test-for-impala)
 target_link_libraries(rpc-mgr-test ${KRB5_REALM_OVERRIDE})
 
-ADD_BE_TEST(rpc-mgr-kerberized-test)
+ADD_BE_TEST(rpc-mgr-kerberized-test) # TODO: this test leaks various KRPC things
 add_dependencies(rpc-mgr-kerberized-test rpc_test_proto)
 target_link_libraries(rpc-mgr-kerberized-test rpc_test_proto)
 target_link_libraries(rpc-mgr-kerberized-test security-test-for-impala)
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 7dcf45c..a55a394 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -77,22 +77,22 @@ add_library(Runtime
 )
 add_dependencies(Runtime gen-deps)
 
-ADD_BE_TEST(mem-pool-test)
-ADD_BE_TEST(free-pool-test)
-ADD_BE_TEST(string-buffer-test)
-ADD_BE_TEST(data-stream-test)
-ADD_BE_TEST(timestamp-test)
-ADD_BE_TEST(raw-value-test)
-ADD_BE_TEST(string-compare-test)
-ADD_BE_TEST(string-search-test)
-ADD_BE_TEST(string-value-test)
-ADD_BE_TEST(thread-resource-mgr-test)
-ADD_BE_TEST(mem-tracker-test)
-ADD_BE_TEST(multi-precision-test)
-ADD_BE_TEST(decimal-test)
-ADD_BE_TEST(buffered-tuple-stream-test)
-ADD_BE_TEST(hdfs-fs-cache-test)
-ADD_BE_TEST(tmp-file-mgr-test)
-ADD_BE_TEST(row-batch-serialize-test)
-ADD_BE_TEST(row-batch-test)
-ADD_BE_TEST(collection-value-builder-test)
+ADD_BE_LSAN_TEST(mem-pool-test)
+ADD_BE_LSAN_TEST(free-pool-test)
+ADD_BE_LSAN_TEST(string-buffer-test)
+ADD_BE_TEST(data-stream-test) # TODO: this test leaks
+ADD_BE_LSAN_TEST(timestamp-test)
+ADD_BE_LSAN_TEST(raw-value-test)
+ADD_BE_LSAN_TEST(string-compare-test)
+ADD_BE_LSAN_TEST(string-search-test)
+ADD_BE_LSAN_TEST(string-value-test)
+ADD_BE_LSAN_TEST(thread-resource-mgr-test)
+ADD_BE_LSAN_TEST(mem-tracker-test)
+ADD_BE_LSAN_TEST(multi-precision-test)
+ADD_BE_LSAN_TEST(decimal-test)
+ADD_BE_LSAN_TEST(buffered-tuple-stream-test)
+ADD_BE_LSAN_TEST(hdfs-fs-cache-test)
+ADD_BE_LSAN_TEST(tmp-file-mgr-test)
+ADD_BE_LSAN_TEST(row-batch-serialize-test)
+ADD_BE_LSAN_TEST(row-batch-test)
+ADD_BE_LSAN_TEST(collection-value-builder-test)
diff --git a/be/src/runtime/bufferpool/CMakeLists.txt b/be/src/runtime/bufferpool/CMakeLists.txt
index ce68b07..d5c95e9 100644
--- a/be/src/runtime/bufferpool/CMakeLists.txt
+++ b/be/src/runtime/bufferpool/CMakeLists.txt
@@ -31,8 +31,8 @@ add_library(BufferPool
 )
 add_dependencies(BufferPool gen-deps)
 
-ADD_BE_TEST(buffer-allocator-test)
-ADD_BE_TEST(buffer-pool-test)
-ADD_BE_TEST(free-list-test)
-ADD_BE_TEST(reservation-tracker-test)
-ADD_BE_TEST(suballocator-test)
+ADD_BE_LSAN_TEST(buffer-allocator-test)
+ADD_BE_LSAN_TEST(buffer-pool-test)
+ADD_BE_LSAN_TEST(free-list-test)
+ADD_BE_LSAN_TEST(reservation-tracker-test)
+ADD_BE_LSAN_TEST(suballocator-test)
diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc
index 1cfc819..77b6a70 100644
--- a/be/src/runtime/bufferpool/buffer-pool-test.cc
+++ b/be/src/runtime/bufferpool/buffer-pool-test.cc
@@ -1960,7 +1960,7 @@ void BufferPoolTest::TestRandomInternalMulti(
   }
 
   AtomicInt32 stop_maintenance(0);
-  thread* maintenance_thread = new thread([&pool, &stop_maintenance]() {
+  thread maintenance_thread([&pool, &stop_maintenance]() {
     while (stop_maintenance.Load() == 0) {
       pool.Maintenance();
       SleepForMs(50);
@@ -1968,7 +1968,7 @@ void BufferPoolTest::TestRandomInternalMulti(
   });
   workers.join_all();
   stop_maintenance.Add(1);
-  maintenance_thread->join();
+  maintenance_thread.join();
   global_reservations_.Close();
 }
 
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index 0a0d81e..8851d1a 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -84,10 +84,6 @@ DECLARE_string(datastream_service_queue_mem_limit);
 
 DECLARE_bool(use_krpc);
 
-// We reserve contiguous memory for senders in SetUp. If a test uses more
-// senders, a DCHECK will fail and you should increase this value.
-static const int MAX_SENDERS = 16;
-static const int MAX_RECEIVERS = 16;
 static const PlanNodeId DEST_NODE_ID = 1;
 static const int BATCH_CAPACITY = 100;  // rows
 static const int PER_ROW_DATA = 8;
@@ -238,9 +234,6 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
     hash_sink_.output_partition.__isset.partition_exprs = true;
     hash_sink_.output_partition.partition_exprs.push_back(expr);
 
-    // Ensure that individual sender info addresses don't change
-    sender_info_.reserve(MAX_SENDERS);
-    receiver_info_.reserve(MAX_RECEIVERS);
     if (GetParam() == USE_THRIFT) {
       StartThriftBackend();
     } else {
@@ -329,38 +322,36 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
   vector<TPlanFragmentDestination> dest_;
 
   struct SenderInfo {
-    thread* thread_handle;
+    unique_ptr<thread> thread_handle;
     Status status;
-    int num_bytes_sent;
-
-    SenderInfo(): thread_handle(nullptr), num_bytes_sent(0) {}
+    int num_bytes_sent = 0;
   };
-  vector<SenderInfo> sender_info_;
+  // Allocate each SenderInfo separately so the address doesn't change.
+  vector<unique_ptr<SenderInfo>> sender_info_;
 
   struct ReceiverInfo {
     TPartitionType::type stream_type;
     int num_senders;
     int receiver_num;
 
-    thread* thread_handle;
+    unique_ptr<thread> thread_handle;
     shared_ptr<DataStreamRecvrBase> stream_recvr;
     Status status;
-    int num_rows_received;
+    int num_rows_received = 0;
     multiset<int64_t> data_values;
 
     ReceiverInfo(TPartitionType::type stream_type, int num_senders, int receiver_num)
       : stream_type(stream_type),
         num_senders(num_senders),
-        receiver_num(receiver_num),
-        thread_handle(nullptr),
-        num_rows_received(0) {}
+        receiver_num(receiver_num) {}
 
     ~ReceiverInfo() {
-      delete thread_handle;
+      thread_handle.reset();
       stream_recvr.reset();
     }
   };
-  vector<ReceiverInfo> receiver_info_;
+  // Allocate each ReceiverInfo separately so the address doesn't change.
+  vector<unique_ptr<ReceiverInfo>> receiver_info_;
 
   // Create an instance id and add it to dest_
   void GetNextInstanceId(TUniqueId* instance_id) {
@@ -447,15 +438,16 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
     RuntimeProfile* profile = RuntimeProfile::Create(&obj_pool_, "TestReceiver");
     TUniqueId instance_id;
     GetNextInstanceId(&instance_id);
-    receiver_info_.push_back(ReceiverInfo(stream_type, num_senders, receiver_num));
-    ReceiverInfo& info = receiver_info_.back();
-    info.stream_recvr = stream_mgr_->CreateRecvr(row_desc_, instance_id, DEST_NODE_ID,
+    receiver_info_.emplace_back(
+        make_unique<ReceiverInfo>(stream_type, num_senders, receiver_num));
+    ReceiverInfo* info = receiver_info_.back().get();
+    info->stream_recvr = stream_mgr_->CreateRecvr(row_desc_, instance_id, DEST_NODE_ID,
         num_senders, buffer_size, is_merging, profile, &tracker_, &buffer_pool_client_);
     if (!is_merging) {
-      info.thread_handle = new thread(&DataStreamTest::ReadStream, this, &info);
+      info->thread_handle.reset(new thread(&DataStreamTest::ReadStream, this, info));
     } else {
-      info.thread_handle = new thread(&DataStreamTest::ReadStreamMerging, this, &info,
-          profile);
+      info->thread_handle.reset(new thread(&DataStreamTest::ReadStreamMerging, this, info,
+          profile));
     }
     if (out_id != nullptr) *out_id = instance_id;
   }
@@ -463,8 +455,8 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
   void JoinReceivers() {
     VLOG_QUERY << "join receiver\n";
     for (int i = 0; i < receiver_info_.size(); ++i) {
-      receiver_info_[i].thread_handle->join();
-      receiver_info_[i].stream_recvr->Close();
+      receiver_info_[i]->thread_handle->join();
+      receiver_info_[i]->stream_recvr->Close();
     }
   }
 
@@ -510,20 +502,20 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
     int64_t total = 0;
     multiset<int64_t> all_data_values;
     for (int i = 0; i < receiver_info_.size(); ++i) {
-      ReceiverInfo& info = receiver_info_[i];
-      EXPECT_OK(info.status);
-      total += info.data_values.size();
-      ASSERT_EQ(info.stream_type, stream_type);
-      ASSERT_EQ(info.num_senders, num_senders);
+      ReceiverInfo* info = receiver_info_[i].get();
+      EXPECT_OK(info->status);
+      total += info->data_values.size();
+      ASSERT_EQ(info->stream_type, stream_type);
+      ASSERT_EQ(info->num_senders, num_senders);
       if (stream_type == TPartitionType::UNPARTITIONED) {
         EXPECT_EQ(
-            NUM_BATCHES * BATCH_CAPACITY * num_senders, info.data_values.size());
+            NUM_BATCHES * BATCH_CAPACITY * num_senders, info->data_values.size());
       }
-      all_data_values.insert(info.data_values.begin(), info.data_values.end());
+      all_data_values.insert(info->data_values.begin(), info->data_values.end());
 
       int k = 0;
-      for (multiset<int64_t>::iterator j = info.data_values.begin();
-           j != info.data_values.end(); ++j, ++k) {
+      for (multiset<int64_t>::iterator j = info->data_values.begin();
+           j != info->data_values.end(); ++j, ++k) {
         if (stream_type == TPartitionType::UNPARTITIONED) {
           // unpartitioned streams contain all values as many times as there are
           // senders
@@ -533,7 +525,7 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
           int64_t value = *j;
           uint64_t hash_val = RawValue::GetHashValueFastHash(&value, TYPE_BIGINT,
               DataStreamSender::EXCHANGE_HASH_SEED);
-          EXPECT_EQ(hash_val % receiver_info_.size(), info.receiver_num);
+          EXPECT_EQ(hash_val % receiver_info_.size(), info->receiver_num);
         }
       }
     }
@@ -553,8 +545,8 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
 
   void CheckSenders() {
     for (int i = 0; i < sender_info_.size(); ++i) {
-      EXPECT_OK(sender_info_[i].status);
-      EXPECT_GT(sender_info_[i].num_bytes_sent, 0);
+      EXPECT_OK(sender_info_[i]->status);
+      EXPECT_GT(sender_info_[i]->num_bytes_sent, 0);
     }
   }
 
@@ -595,18 +587,16 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
                    int channel_buffer_size = 1024) {
     VLOG_QUERY << "start sender";
     int num_senders = sender_info_.size();
-    ASSERT_LT(num_senders, MAX_SENDERS);
-    sender_info_.push_back(SenderInfo());
-    SenderInfo& info = sender_info_.back();
-    info.thread_handle =
+    sender_info_.emplace_back(make_unique<SenderInfo>());
+    sender_info_.back()->thread_handle.reset(
         new thread(&DataStreamTest::Sender, this, num_senders, channel_buffer_size,
-                   partition_type, GetParam() == USE_THRIFT);
+                   partition_type, GetParam() == USE_THRIFT));
   }
 
   void JoinSenders() {
     VLOG_QUERY << "join senders\n";
     for (int i = 0; i < sender_info_.size(); ++i) {
-      sender_info_[i].thread_handle->join();
+      sender_info_[i]->thread_handle->join();
     }
   }
 
@@ -640,22 +630,22 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
     EXPECT_OK(sender->Prepare(&state, &tracker_));
     EXPECT_OK(sender->Open(&state));
     scoped_ptr<RowBatch> batch(CreateRowBatch());
-    SenderInfo& info = sender_info_[sender_num];
+    SenderInfo* info = sender_info_[sender_num].get();
     int next_val = 0;
     for (int i = 0; i < NUM_BATCHES; ++i) {
       GetNextBatch(batch.get(), &next_val);
       VLOG_QUERY << "sender " << sender_num << ": #rows=" << batch->num_rows();
-      info.status = sender->Send(&state, batch.get());
-      if (!info.status.ok()) break;
+      info->status = sender->Send(&state, batch.get());
+      if (!info->status.ok()) break;
     }
     VLOG_QUERY << "closing sender" << sender_num;
-    info.status.MergeStatus(sender->FlushFinal(&state));
+    info->status.MergeStatus(sender->FlushFinal(&state));
     sender->Close(&state);
     if (is_thrift) {
-      info.num_bytes_sent = static_cast<DataStreamSender*>(
+      info->num_bytes_sent = static_cast<DataStreamSender*>(
           sender.get())->GetNumDataBytesSent();
     } else {
-      info.num_bytes_sent = static_cast<KrpcDataStreamSender*>(
+      info->num_bytes_sent = static_cast<KrpcDataStreamSender*>(
           sender.get())->GetNumDataBytesSent();
     }
 
@@ -745,7 +735,7 @@ TEST_P(DataStreamTest, UnknownSenderSmallResult) {
   GetNextInstanceId(&dummy_id);
   StartSender(TPartitionType::UNPARTITIONED, TOTAL_DATA_SIZE + 1024);
   JoinSenders();
-  EXPECT_EQ(sender_info_[0].status.code(), TErrorCode::DATASTREAM_SENDER_TIMEOUT);
+  EXPECT_EQ(sender_info_[0]->status.code(), TErrorCode::DATASTREAM_SENDER_TIMEOUT);
 }
 
 TEST_P(DataStreamTest, UnknownSenderLargeResult) {
@@ -754,7 +744,7 @@ TEST_P(DataStreamTest, UnknownSenderLargeResult) {
   GetNextInstanceId(&dummy_id);
   StartSender();
   JoinSenders();
-  EXPECT_EQ(sender_info_[0].status.code(), TErrorCode::DATASTREAM_SENDER_TIMEOUT);
+  EXPECT_EQ(sender_info_[0]->status.code(), TErrorCode::DATASTREAM_SENDER_TIMEOUT);
 }
 
 TEST_P(DataStreamTest, Cancel) {
@@ -764,8 +754,8 @@ TEST_P(DataStreamTest, Cancel) {
   StartReceiver(TPartitionType::UNPARTITIONED, 1, 1, 1024, true, &instance_id);
   stream_mgr_->Cancel(instance_id);
   JoinReceivers();
-  EXPECT_TRUE(receiver_info_[0].status.IsCancelled());
-  EXPECT_TRUE(receiver_info_[1].status.IsCancelled());
+  EXPECT_TRUE(receiver_info_[0]->status.IsCancelled());
+  EXPECT_TRUE(receiver_info_[1]->status.IsCancelled());
 }
 
 TEST_P(DataStreamTest, BasicTest) {
@@ -867,12 +857,13 @@ TEST_P(DataStreamTestShortDeserQueue, TestNoDeadlock) {
 
   // Setup the receiver.
   RuntimeProfile* profile = RuntimeProfile::Create(&obj_pool_, "TestReceiver");
-  receiver_info_.push_back(ReceiverInfo(TPartitionType::UNPARTITIONED, 4, 1));
-  ReceiverInfo& info = receiver_info_.back();
-  info.stream_recvr = stream_mgr_->CreateRecvr(row_desc_, instance_id, DEST_NODE_ID,
+  receiver_info_.emplace_back(
+      make_unique<ReceiverInfo>(TPartitionType::UNPARTITIONED, 4, 1));
+  ReceiverInfo* info = receiver_info_.back().get();
+  info->stream_recvr = stream_mgr_->CreateRecvr(row_desc_, instance_id, DEST_NODE_ID,
       4, 1024 * 1024, false, profile, &tracker_, &buffer_pool_client_);
-  info.thread_handle = new thread(
-      &DataStreamTestShortDeserQueue_TestNoDeadlock_Test::ReadStream, this, &info);
+  info->thread_handle.reset(new thread(
+      &DataStreamTestShortDeserQueue_TestNoDeadlock_Test::ReadStream, this, info));
 
   JoinSenders();
   CheckSenders();
diff --git a/be/src/runtime/io/CMakeLists.txt b/be/src/runtime/io/CMakeLists.txt
index 226fdc6..4c8199f 100644
--- a/be/src/runtime/io/CMakeLists.txt
+++ b/be/src/runtime/io/CMakeLists.txt
@@ -36,4 +36,4 @@ add_dependencies(Io gen-deps)
 add_executable(disk-io-mgr-stress-test disk-io-mgr-stress-test.cc)
 target_link_libraries(disk-io-mgr-stress-test ${IMPALA_TEST_LINK_LIBS})
 
-ADD_BE_TEST(disk-io-mgr-test)
+ADD_BE_LSAN_TEST(disk-io-mgr-test)
diff --git a/be/src/runtime/io/disk-io-mgr-stress.cc b/be/src/runtime/io/disk-io-mgr-stress.cc
index c9b6870..16cfbf2 100644
--- a/be/src/runtime/io/disk-io-mgr-stress.cc
+++ b/be/src/runtime/io/disk-io-mgr-stress.cc
@@ -96,7 +96,7 @@ DiskIoMgrStress::DiskIoMgrStress(int num_disks, int num_threads_per_disk,
     CreateTempFile(files_[i].filename.c_str(), files_[i].data.c_str());
   }
 
-  clients_ = new Client[num_clients_];
+  clients_.reset(new Client[num_clients_]);
   client_mem_trackers_.resize(num_clients_);
   buffer_pool_clients_.reset(new BufferPool::ClientHandle[num_clients_]);
   for (int i = 0; i < num_clients_; ++i) {
@@ -104,6 +104,8 @@ DiskIoMgrStress::DiskIoMgrStress(int num_disks, int num_threads_per_disk,
   }
 }
 
+DiskIoMgrStress::~DiskIoMgrStress() { }
+
 void DiskIoMgrStress::ClientThread(int client_id) {
   Client* client = &clients_[client_id];
   Status status;
diff --git a/be/src/runtime/io/disk-io-mgr-stress.h b/be/src/runtime/io/disk-io-mgr-stress.h
index 574b58c..1baef24 100644
--- a/be/src/runtime/io/disk-io-mgr-stress.h
+++ b/be/src/runtime/io/disk-io-mgr-stress.h
@@ -42,6 +42,7 @@ class DiskIoMgrStress {
  public:
   DiskIoMgrStress(int num_disks, int num_threads_per_disk, int num_clients,
       bool includes_cancellation);
+  ~DiskIoMgrStress();
 
   /// Run the test for 'sec'.  If 0, run forever
   void Run(int sec);
@@ -84,7 +85,7 @@ class DiskIoMgrStress {
 
   /// Array of clients
   int num_clients_;
-  Client* clients_;
+  std::unique_ptr<Client[]> clients_;
 
   /// Client MemTrackers, one per client.
   std::vector<std::unique_ptr<MemTracker>> client_mem_trackers_;
diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc
index 9239658..3d89d04 100644
--- a/be/src/runtime/io/disk-io-mgr-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-test.cc
@@ -1041,7 +1041,7 @@ TEST_F(DiskIoMgrTest, MultipleReader) {
   unique_ptr<BufferPool::ClientHandle[]> clients(
       new BufferPool::ClientHandle[NUM_READERS]);
   vector<unique_ptr<RequestContext>> readers(NUM_READERS);
-  vector<char*> results(NUM_READERS);
+  vector<unique_ptr<char[]>> results(NUM_READERS);
 
   // Initialize data for each reader.  The data will be
   // 'abcd...' for reader one, 'bcde...' for reader two (wrapping around at 'z')
@@ -1063,8 +1063,8 @@ TEST_F(DiskIoMgrTest, MultipleReader) {
     stat(file_names[i].c_str(), &stat_val);
     mtimes[i] = stat_val.st_mtime;
 
-    results[i] = new char[DATA_LEN + 1];
-    memset(results[i], 0, DATA_LEN + 1);
+    results[i].reset(new char[DATA_LEN + 1]);
+    memset(results[i].get(), 0, DATA_LEN + 1);
   }
 
   // This exercises concurrency, run the test multiple times
diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc
index 4762928..237484f 100644
--- a/be/src/runtime/mem-tracker.cc
+++ b/be/src/runtime/mem-tracker.cc
@@ -130,8 +130,7 @@ void MemTracker::CloseAndUnregisterFromParent() {
 }
 
 void MemTracker::EnableReservationReporting(const ReservationTrackerCounters& counters) {
-  ReservationTrackerCounters* new_counters = new ReservationTrackerCounters(counters);
-  reservation_counters_.Store(new_counters);
+  delete reservation_counters_.Swap(new ReservationTrackerCounters(counters));
 }
 
 int64_t MemTracker::GetPoolMemReserved() {
diff --git a/be/src/scheduling/CMakeLists.txt b/be/src/scheduling/CMakeLists.txt
index 024e6e1..47fd438 100644
--- a/be/src/scheduling/CMakeLists.txt
+++ b/be/src/scheduling/CMakeLists.txt
@@ -33,7 +33,5 @@ add_library(Scheduling STATIC
 )
 add_dependencies(Scheduling gen-deps)
 
-ADD_BE_TEST(scheduler-test)
-ADD_BE_TEST(backend-config-test)
-# TODO: Add BE test
-# ADD_BE_TEST(admission-controller-test)
+ADD_BE_LSAN_TEST(scheduler-test)
+ADD_BE_LSAN_TEST(backend-config-test)
diff --git a/be/src/service/CMakeLists.txt b/be/src/service/CMakeLists.txt
index 66cf55d..71704c8 100644
--- a/be/src/service/CMakeLists.txt
+++ b/be/src/service/CMakeLists.txt
@@ -74,7 +74,7 @@ target_link_libraries(impalad
   ${IMPALA_LINK_LIBS}
 )
 
-ADD_BE_TEST(session-expiry-test session-expiry-test.cc)
-ADD_BE_TEST(hs2-util-test hs2-util-test.cc)
-ADD_BE_TEST(query-options-test query-options-test.cc)
-ADD_BE_TEST(impala-server-test impala-server-test.cc)
+ADD_BE_TEST(session-expiry-test session-expiry-test.cc) # TODO: this leaks the thrift server
+ADD_BE_LSAN_TEST(hs2-util-test hs2-util-test.cc)
+ADD_BE_LSAN_TEST(query-options-test query-options-test.cc)
+ADD_BE_LSAN_TEST(impala-server-test impala-server-test.cc)
diff --git a/be/src/statestore/CMakeLists.txt b/be/src/statestore/CMakeLists.txt
index 313b2ec..a2bc4c0 100644
--- a/be/src/statestore/CMakeLists.txt
+++ b/be/src/statestore/CMakeLists.txt
@@ -30,4 +30,4 @@ add_library(Statestore
 )
 add_dependencies(Statestore gen-deps)
 
-ADD_BE_TEST(statestore-test)
+ADD_BE_LSAN_TEST(statestore-test)
diff --git a/be/src/udf/udf.cc b/be/src/udf/udf.cc
index 89eb8d1..032f036 100644
--- a/be/src/udf/udf.cc
+++ b/be/src/udf/udf.cc
@@ -79,6 +79,8 @@ class FreePool {
 class MemPool {
  public:
   uint8_t* Allocate(int byte_size) {
+    // TODO: this function is called with this == nullptr from UdaTestHarness. This works
+    // for now because MemPool has no members or virtual functions.
     return reinterpret_cast<uint8_t*>(malloc(byte_size));
   }
 };
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index 095193d..99c8e6b 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -107,40 +107,40 @@ target_link_libraries(parquet-reader ${IMPALA_LINK_LIBS})
 
 target_link_libraries(loggingsupport ${IMPALA_LINK_LIBS_DYNAMIC_TARGETS})
 
-ADD_BE_TEST(benchmark-test)
-ADD_BE_TEST(bitmap-test)
-ADD_BE_TEST(bit-packing-test)
-ADD_BE_TEST(bit-util-test)
-ADD_BE_TEST(blocking-queue-test)
-ADD_BE_TEST(bloom-filter-test)
-ADD_BE_TEST(coding-util-test)
-ADD_BE_TEST(debug-util-test)
-ADD_BE_TEST(decompress-test)
-ADD_BE_TEST(dict-test)
-ADD_BE_TEST(error-util-test)
-ADD_BE_TEST(filesystem-util-test)
-ADD_BE_TEST(fixed-size-hash-table-test)
-ADD_BE_TEST(hdfs-util-test)
-ADD_BE_TEST(internal-queue-test)
-ADD_BE_TEST(logging-support-test)
-ADD_BE_TEST(lru-cache-test)
-ADD_BE_TEST(metrics-test)
-ADD_BE_TEST(min-max-filter-test)
+ADD_BE_LSAN_TEST(benchmark-test)
+ADD_BE_LSAN_TEST(bitmap-test)
+ADD_BE_LSAN_TEST(bit-packing-test)
+ADD_BE_LSAN_TEST(bit-util-test)
+ADD_BE_LSAN_TEST(blocking-queue-test)
+ADD_BE_LSAN_TEST(bloom-filter-test)
+ADD_BE_LSAN_TEST(coding-util-test)
+ADD_BE_LSAN_TEST(debug-util-test)
+ADD_BE_LSAN_TEST(decompress-test)
+ADD_BE_LSAN_TEST(dict-test)
+ADD_BE_LSAN_TEST(error-util-test)
+ADD_BE_LSAN_TEST(filesystem-util-test)
+ADD_BE_LSAN_TEST(fixed-size-hash-table-test)
+ADD_BE_LSAN_TEST(hdfs-util-test)
+ADD_BE_LSAN_TEST(internal-queue-test)
+ADD_BE_LSAN_TEST(logging-support-test)
+ADD_BE_LSAN_TEST(lru-cache-test)
+ADD_BE_LSAN_TEST(metrics-test)
+ADD_BE_LSAN_TEST(min-max-filter-test)
 ADD_BE_LSAN_TEST(openssl-util-test)
-ADD_BE_TEST(parse-util-test)
-ADD_BE_TEST(pretty-printer-test)
-ADD_BE_TEST(proc-info-test)
-ADD_BE_TEST(promise-test)
-ADD_BE_TEST(redactor-config-parser-test)
-ADD_BE_TEST(redactor-test)
-ADD_BE_TEST(redactor-unconfigured-test)
-ADD_BE_TEST(rle-test)
-ADD_BE_TEST(runtime-profile-test)
-ADD_BE_TEST(string-parser-test)
-ADD_BE_TEST(string-util-test)
-ADD_BE_TEST(symbols-util-test)
-ADD_BE_TEST(sys-info-test)
-ADD_BE_TEST(thread-pool-test)
-ADD_BE_TEST(time-test)
-ADD_BE_TEST(uid-util-test)
-ADD_BE_TEST(webserver-test)
+ADD_BE_LSAN_TEST(parse-util-test)
+ADD_BE_LSAN_TEST(pretty-printer-test)
+ADD_BE_LSAN_TEST(proc-info-test)
+ADD_BE_LSAN_TEST(promise-test)
+ADD_BE_LSAN_TEST(redactor-config-parser-test)
+ADD_BE_LSAN_TEST(redactor-test)
+ADD_BE_LSAN_TEST(redactor-unconfigured-test)
+ADD_BE_LSAN_TEST(rle-test)
+ADD_BE_LSAN_TEST(runtime-profile-test)
+ADD_BE_LSAN_TEST(string-parser-test)
+ADD_BE_LSAN_TEST(string-util-test)
+ADD_BE_LSAN_TEST(symbols-util-test)
+ADD_BE_LSAN_TEST(sys-info-test)
+ADD_BE_LSAN_TEST(thread-pool-test)
+ADD_BE_LSAN_TEST(time-test)
+ADD_BE_LSAN_TEST(uid-util-test)
+ADD_BE_LSAN_TEST(webserver-test)
diff --git a/be/src/util/decompress-test.cc b/be/src/util/decompress-test.cc
index 2a2299c..4d5f8ef 100644
--- a/be/src/util/decompress-test.cc
+++ b/be/src/util/decompress-test.cc
@@ -449,7 +449,7 @@ TEST_F(DecompressorTest, LZ4Huge) {
 
   // Generate a big random payload.
   int payload_len = numeric_limits<int>::max();
-  uint8_t* payload = new uint8_t[payload_len];
+  unique_ptr<uint8_t[]> payload(new uint8_t[payload_len]);
   for (int i = 0 ; i < payload_len; ++i) payload[i] = rand();
 
   scoped_ptr<Codec> compressor;
@@ -462,9 +462,10 @@ TEST_F(DecompressorTest, LZ4Huge) {
 
   // Trying to compress it should give an error
   int64_t compressed_len = max_size;
-  uint8_t* compressed = new uint8_t[max_size];
-  EXPECT_ERROR(compressor->ProcessBlock(true, payload_len, payload,
-      &compressed_len, &compressed), TErrorCode::LZ4_COMPRESSION_INPUT_TOO_LARGE);
+  unique_ptr<uint8_t[]> compressed(new uint8_t[max_size]);
+  uint8_t* compressed_ptr = compressed.get();
+  EXPECT_ERROR(compressor->ProcessBlock(true, payload_len, payload.get(),
+      &compressed_len, &compressed_ptr), TErrorCode::LZ4_COMPRESSION_INPUT_TOO_LARGE);
 }
 
 }
diff --git a/be/src/statestore/CMakeLists.txt b/bin/lsan-suppressions.txt
similarity index 55%
copy from be/src/statestore/CMakeLists.txt
copy to bin/lsan-suppressions.txt
index 313b2ec..f59592b 100644
--- a/be/src/statestore/CMakeLists.txt
+++ b/bin/lsan-suppressions.txt
@@ -15,19 +15,20 @@
 # specific language governing permissions and limitations
 # under the License.
 
+# This file suppresses Leak Sanitizer errors, following
+# https://clang.llvm.org/docs/LeakSanitizer.html
 
-# where to put generated libraries
-set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/statestore")
+# The JVM leaks things directly from libjvm.so and indirectly via inflate().
+leak:libjvm.so
+leak:inflate
 
-# where to put generated binaries
-set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/statestore")
+# Some test utilities have deliberate leaks.
+leak:InProcessImpalaServer::StartWithEphemeralPorts
+leak:InProcessStatestore::StartWithEphemeralPorts
 
-add_library(Statestore
-  failure-detector.cc
-  statestore.cc
-  statestore-subscriber.cc
-  statestored-main.cc
-)
-add_dependencies(Statestore gen-deps)
+# MemTestPrepare is deliberately used to simulate a UDF leaking memory.
+leak:MemTestPrepare
 
-ADD_BE_TEST(statestore-test)
+# TODO: IMPALA-2746: fix these unnecessary leaks.
+# The UDF and UDA test harnesses are sloppy and leak result allocations.
+leak:impala::FunctionContextImpl::AllocateForResults