You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/02/21 19:39:50 UTC
[impala] 02/13: IMPALA-2746: part 1: enable LSAN for many backend
tests
This is an automated email from the ASF dual-hosted git repository.
tarmstrong pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 09e84801a76788bb79ac0bc2115c1c3d36185d67
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Fri Jun 8 14:54:45 2018 -0700
IMPALA-2746: part 1: enable LSAN for many backend tests
This turns on leak sanitizer for backend tests that required
relatively small modifications to pass. We suppress a few
leaks, mainly related to the embedded JVM.
Testing:
Ran core tests under ASAN.
Change-Id: Ibdda092a4eb4bc827c75a8c121e5428ec746b7f4
Reviewed-on: http://gerrit.cloudera.org:8080/10668
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/CMakeLists.txt | 11 +-
be/src/catalog/CMakeLists.txt | 2 +-
be/src/codegen/CMakeLists.txt | 4 +-
be/src/codegen/instruction-counter-test.cc | 54 +++++-----
be/src/common/CMakeLists.txt | 4 +-
be/src/common/atomic-test.cc | 8 +-
be/src/common/atomic.h | 11 ++
be/src/exec/CMakeLists.txt | 20 ++--
be/src/experiments/CMakeLists.txt | 2 +-
be/src/exprs/CMakeLists.txt | 4 +-
be/src/exprs/expr-test.cc | 93 +++++++++--------
be/src/rpc/CMakeLists.txt | 12 +--
be/src/runtime/CMakeLists.txt | 38 +++----
be/src/runtime/bufferpool/CMakeLists.txt | 10 +-
be/src/runtime/bufferpool/buffer-pool-test.cc | 4 +-
be/src/runtime/data-stream-test.cc | 111 ++++++++++-----------
be/src/runtime/io/CMakeLists.txt | 2 +-
be/src/runtime/io/disk-io-mgr-stress.cc | 4 +-
be/src/runtime/io/disk-io-mgr-stress.h | 3 +-
be/src/runtime/io/disk-io-mgr-test.cc | 6 +-
be/src/runtime/mem-tracker.cc | 3 +-
be/src/scheduling/CMakeLists.txt | 6 +-
be/src/service/CMakeLists.txt | 8 +-
be/src/statestore/CMakeLists.txt | 2 +-
be/src/udf/udf.cc | 2 +
be/src/util/CMakeLists.txt | 72 ++++++-------
be/src/util/decompress-test.cc | 9 +-
.../CMakeLists.txt => bin/lsan-suppressions.txt | 25 ++---
28 files changed, 276 insertions(+), 254 deletions(-)
diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index 1eed615..d817009 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -510,12 +510,18 @@ FUNCTION(ADD_BE_TEST TEST_NAME)
ADD_DEPENDENCIES(be-test ${TEST_NAME})
ENDFUNCTION()
+FUNCTION(ENABLE_LSAN_FOR_TEST TEST_NAME)
+ SET_TESTS_PROPERTIES(${TEST_NAME} PROPERTIES ENVIRONMENT
+ "ASAN_OPTIONS=handle_segv=0 detect_leaks=1 allocator_may_return_null=1")
+ SET_TESTS_PROPERTIES(${TEST_NAME} PROPERTIES ENVIRONMENT
+ "LSAN_OPTIONS=suppressions=${CMAKE_SOURCE_DIR}/bin/lsan-suppressions.txt")
+ENDFUNCTION()
+
# Same as ADD_BE_TEST, but also enable LeakSanitizer.
# TODO: IMPALA-2746: we should make this the default.
FUNCTION(ADD_BE_LSAN_TEST TEST_NAME)
ADD_BE_TEST(${TEST_NAME})
- SET_TESTS_PROPERTIES(${TEST_NAME} PROPERTIES ENVIRONMENT
- "ASAN_OPTIONS=handle_segv=0 detect_leaks=1 allocator_may_return_null=1")
+ ENABLE_LSAN_FOR_TEST(${TEST_NAME})
ENDFUNCTION()
# Similar utility function for tests that use the UDF SDK
@@ -532,6 +538,7 @@ FUNCTION(ADD_UDF_TEST TEST_NAME)
ADD_TEST(${TEST_NAME} "${BUILD_OUTPUT_ROOT_DIRECTORY}/${DIR_NAME}/${TEST_NAME}"
-log_dir=$ENV{IMPALA_BE_TEST_LOGS_DIR})
ADD_DEPENDENCIES(be-test ${TEST_NAME})
+ ENABLE_LSAN_FOR_TEST(${TEST_NAME})
ENDFUNCTION()
# Function to generate rule to cross compile a source file to an IR module.
diff --git a/be/src/catalog/CMakeLists.txt b/be/src/catalog/CMakeLists.txt
index 35cccea..ba8d97c 100644
--- a/be/src/catalog/CMakeLists.txt
+++ b/be/src/catalog/CMakeLists.txt
@@ -26,4 +26,4 @@ add_library(Catalog
)
add_dependencies(Catalog gen-deps)
-ADD_BE_TEST(catalog-util-test)
+ADD_BE_LSAN_TEST(catalog-util-test)
diff --git a/be/src/codegen/CMakeLists.txt b/be/src/codegen/CMakeLists.txt
index 56228a2..2709fcb 100644
--- a/be/src/codegen/CMakeLists.txt
+++ b/be/src/codegen/CMakeLists.txt
@@ -107,6 +107,6 @@ add_custom_target(test-loop.bc
SOURCES ${CMAKE_SOURCE_DIR}/testdata/llvm/test-loop.cc
)
-ADD_BE_TEST(llvm-codegen-test)
+ADD_BE_LSAN_TEST(llvm-codegen-test)
add_dependencies(llvm-codegen-test test-loop.bc)
-ADD_BE_TEST(instruction-counter-test)
+ADD_BE_LSAN_TEST(instruction-counter-test)
diff --git a/be/src/codegen/instruction-counter-test.cc b/be/src/codegen/instruction-counter-test.cc
index f404243..5dbc605 100644
--- a/be/src/codegen/instruction-counter-test.cc
+++ b/be/src/codegen/instruction-counter-test.cc
@@ -73,19 +73,19 @@ llvm::Module* CodegenMulAdd(llvm::LLVMContext* context) {
TEST_F(InstructionCounterTest, Count) {
llvm::Module* MulAddModule = CodegenMulAdd(&context_);
- InstructionCounter* instruction_counter = new InstructionCounter();
- instruction_counter->visit(*MulAddModule);
- instruction_counter->PrintCounters();
- EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::TOTAL_FUNCTIONS), 1);
- EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::TOTAL_INSTS), 3);
- EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::TERMINATOR_INSTS), 1);
- EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::MEMORY_INSTS), 0);
+ InstructionCounter instruction_counter;
+ instruction_counter.visit(*MulAddModule);
+ instruction_counter.PrintCounters();
+ EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::TOTAL_FUNCTIONS), 1);
+ EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::TOTAL_INSTS), 3);
+ EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::TERMINATOR_INSTS), 1);
+ EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::MEMORY_INSTS), 0);
// Test Reset
- instruction_counter->ResetCount();
- EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::TOTAL_FUNCTIONS), 0);
- EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::TOTAL_INSTS), 0);
- EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::MEMORY_INSTS), 0);
+ instruction_counter.ResetCount();
+ EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::TOTAL_FUNCTIONS), 0);
+ EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::TOTAL_INSTS), 0);
+ EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::MEMORY_INSTS), 0);
}
// IR output from CodegenGcd
@@ -152,25 +152,25 @@ llvm::Module* CodegenGcd(llvm::LLVMContext* context) {
TEST_F(InstructionCounterTest, TestMemInstrCount) {
llvm::Module* GcdModule = CodegenGcd(&context_);
- InstructionCounter* instruction_counter = new InstructionCounter();
- instruction_counter->visit(*GcdModule);
- std::cout << instruction_counter->PrintCounters();
- EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::TOTAL_FUNCTIONS), 1);
- EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::TOTAL_BLOCKS), 5);
- EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::TOTAL_INSTS), 11);
+ InstructionCounter instruction_counter;
+ instruction_counter.visit(*GcdModule);
+ std::cout << instruction_counter.PrintCounters();
+ EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::TOTAL_FUNCTIONS), 1);
+ EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::TOTAL_BLOCKS), 5);
+ EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::TOTAL_INSTS), 11);
// Test Category Totals
- EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::TERMINATOR_INSTS), 5);
- EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::MEMORY_INSTS), 0);
- EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::OTHER_INSTS), 4);
+ EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::TERMINATOR_INSTS), 5);
+ EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::MEMORY_INSTS), 0);
+ EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::OTHER_INSTS), 4);
// Test Reset
- instruction_counter->ResetCount();
- EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::TOTAL_FUNCTIONS), 0);
- EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::TOTAL_BLOCKS), 0);
- EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::TOTAL_INSTS), 0);
- EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::TERMINATOR_INSTS), 0);
- EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::MEMORY_INSTS), 0);
- EXPECT_EQ(instruction_counter->GetCount(InstructionCounter::OTHER_INSTS), 0);
+ instruction_counter.ResetCount();
+ EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::TOTAL_FUNCTIONS), 0);
+ EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::TOTAL_BLOCKS), 0);
+ EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::TOTAL_INSTS), 0);
+ EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::TERMINATOR_INSTS), 0);
+ EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::MEMORY_INSTS), 0);
+ EXPECT_EQ(instruction_counter.GetCount(InstructionCounter::OTHER_INSTS), 0);
}
} // namespace impala
diff --git a/be/src/common/CMakeLists.txt b/be/src/common/CMakeLists.txt
index c01d71c..daa274d 100644
--- a/be/src/common/CMakeLists.txt
+++ b/be/src/common/CMakeLists.txt
@@ -49,8 +49,8 @@ add_library(GlobalFlags
)
add_dependencies(GlobalFlags gen-deps)
-ADD_BE_TEST(atomic-test)
-ADD_BE_TEST(thread-debug-info-test)
+ADD_BE_LSAN_TEST(atomic-test)
+ADD_BE_LSAN_TEST(thread-debug-info-test)
# Generate config.h from config.h.in, filling in variables from CMake
CONFIGURE_FILE(${CMAKE_CURRENT_SOURCE_DIR}/config.h.in
diff --git a/be/src/common/atomic-test.cc b/be/src/common/atomic-test.cc
index ccbaa8e..dc35396 100644
--- a/be/src/common/atomic-test.cc
+++ b/be/src/common/atomic-test.cc
@@ -236,10 +236,10 @@ static void TestAcquireReleaseLoadStore() {
const int ITERS = 1000000;
AtomicInt<T> control(-1);
T payload = -1;
- thread* t_a = new thread(AcquireReleaseThreadA<T>, 0, 1, ITERS, &control, &payload);
- thread* t_b = new thread(AcquireReleaseThreadB<T>, 1, 0, ITERS, &control, &payload);
- t_a->join();
- t_b->join();
+ thread t_a(AcquireReleaseThreadA<T>, 0, 1, ITERS, &control, &payload);
+ thread t_b(AcquireReleaseThreadB<T>, 1, 0, ITERS, &control, &payload);
+ t_a.join();
+ t_b.join();
}
TEST(AtomicTest, MultipleTreadsAcquireReleaseLoadStoreInt) {
diff --git a/be/src/common/atomic.h b/be/src/common/atomic.h
index 0d3e556..4c72826 100644
--- a/be/src/common/atomic.h
+++ b/be/src/common/atomic.h
@@ -105,6 +105,12 @@ class AtomicInt {
return base::subtle::Barrier_CompareAndSwap(&value_, old_val, new_val) == old_val;
}
+ /// Store 'new_val' and return the previous value. Implies a Release memory barrier
+ /// (i.e. the same as Store()).
+ ALWAYS_INLINE T Swap(T new_val) {
+ return base::subtle::Release_AtomicExchange(&value_, new_val);
+ }
+
private:
T value_;
@@ -130,6 +136,11 @@ class AtomicPtr {
/// Atomic store with "release" memory-ordering semantic.
ALWAYS_INLINE void Store(T* val) { ptr_.Store(reinterpret_cast<intptr_t>(val)); }
+ /// Store 'new_val' and return the previous value. Implies a Release memory barrier
+ /// (i.e. the same as Store()).
+ ALWAYS_INLINE T* Swap(T* val) {
+ return reinterpret_cast<T*>(ptr_.Swap(reinterpret_cast<intptr_t>(val)));
+ }
private:
internal::AtomicInt<intptr_t> ptr_;
};
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 2349df4..77c6e15 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -102,13 +102,13 @@ add_library(Exec
add_dependencies(Exec gen-deps)
-ADD_BE_TEST(zigzag-test)
-ADD_BE_TEST(hash-table-test)
-ADD_BE_TEST(delimited-text-parser-test)
-ADD_BE_TEST(read-write-util-test)
-ADD_BE_TEST(parquet-plain-test)
-ADD_BE_TEST(parquet-version-test)
-ADD_BE_TEST(row-batch-list-test)
-ADD_BE_TEST(incr-stats-util-test)
-ADD_BE_TEST(hdfs-avro-scanner-test)
-ADD_BE_TEST(hdfs-parquet-scanner-test)
+ADD_BE_LSAN_TEST(zigzag-test)
+ADD_BE_LSAN_TEST(hash-table-test)
+ADD_BE_LSAN_TEST(delimited-text-parser-test)
+ADD_BE_LSAN_TEST(read-write-util-test)
+ADD_BE_LSAN_TEST(parquet-plain-test)
+ADD_BE_LSAN_TEST(parquet-version-test)
+ADD_BE_LSAN_TEST(row-batch-list-test)
+ADD_BE_LSAN_TEST(incr-stats-util-test)
+ADD_BE_LSAN_TEST(hdfs-avro-scanner-test)
+ADD_BE_LSAN_TEST(hdfs-parquet-scanner-test)
diff --git a/be/src/experiments/CMakeLists.txt b/be/src/experiments/CMakeLists.txt
index b4f0c49..c4b5593 100644
--- a/be/src/experiments/CMakeLists.txt
+++ b/be/src/experiments/CMakeLists.txt
@@ -38,4 +38,4 @@ target_link_libraries(tuple-splitter-test Experiments ${IMPALA_LINK_LIBS})
target_link_libraries(hash-partition-test ${IMPALA_LINK_LIBS})
target_link_libraries(compression-test ${IMPALA_LINK_LIBS})
-ADD_BE_TEST(string-search-sse-test)
+ADD_BE_LSAN_TEST(string-search-sse-test)
diff --git a/be/src/exprs/CMakeLists.txt b/be/src/exprs/CMakeLists.txt
index 755c166..14c65f5 100644
--- a/be/src/exprs/CMakeLists.txt
+++ b/be/src/exprs/CMakeLists.txt
@@ -66,8 +66,8 @@ add_library(Exprs
)
add_dependencies(Exprs gen-deps gen_ir_descriptions)
-ADD_BE_TEST(expr-test)
-ADD_BE_TEST(expr-codegen-test)
+ADD_BE_LSAN_TEST(expr-test)
+ADD_BE_LSAN_TEST(expr-codegen-test)
# expr-codegen-test includes test IR functions
COMPILE_TO_IR(expr-codegen-test.cc)
diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc
index 530fce3..a96b556 100644
--- a/be/src/exprs/expr-test.cc
+++ b/be/src/exprs/expr-test.cc
@@ -168,6 +168,9 @@ class ScopedLocalUnixTimestampConversionOverride {
class ExprTest : public testing::Test {
protected:
+ // Pool for objects to be destroyed during test teardown.
+ ObjectPool pool_;
+
// Maps from enum value of primitive integer type to the minimum value that is
// outside of the next smaller-resolution type. For example the value for type
// TYPE_SMALLINT is numeric_limits<int8_t>::max()+1. There is a GREATEST test in
@@ -238,6 +241,8 @@ class ExprTest : public testing::Test {
default_type_strs_[TYPE_DECIMAL] = default_decimal_str_;
}
+ virtual void TearDown() { pool_.Clear(); }
+
string GetValue(const string& expr, const ColumnType& expr_type,
bool expect_error = false) {
string stmt = "select " + expr;
@@ -1023,7 +1028,7 @@ class ExprTest : public testing::Test {
template<class T>
bool ParseString(const string& str, T* val);
- // Create a Literal expression out of 'str'.
+ // Create a Literal expression out of 'str'. Adds the returned literal to pool_.
Literal* CreateLiteral(const ColumnType& type, const string& str);
// Helper function for LiteralConstruction test. Creates a Literal expression
@@ -1051,6 +1056,15 @@ class ExprTest : public testing::Test {
TestIsNull("cast(" + stmt + " as timestamp)", TYPE_TIMESTAMP);
}
}
+
+ // Wrapper around UdfTestHarness::CreateTestContext() that stores the context in
+ // 'pool_' to be automatically cleaned up.
+ FunctionContext* CreateUdfTestContext(const FunctionContext::TypeDesc& return_type,
+ const std::vector<FunctionContext::TypeDesc>& arg_types,
+ RuntimeState* state = nullptr, MemPool* pool = nullptr) {
+ return pool_.Add(
+ UdfTestHarness::CreateTestContext(return_type, arg_types, state, pool));
+ }
};
template<>
@@ -1182,51 +1196,51 @@ Literal* ExprTest::CreateLiteral(const ColumnType& type, const string& str) {
case TYPE_BOOLEAN: {
bool v = false;
EXPECT_TRUE(ParseString<bool>(str, &v));
- return new Literal(type, v);
+ return pool_.Add(new Literal(type, v));
}
case TYPE_TINYINT: {
int8_t v = 0;
EXPECT_TRUE(ParseString<int8_t>(str, &v));
- return new Literal(type, v);
+ return pool_.Add(new Literal(type, v));
}
case TYPE_SMALLINT: {
int16_t v = 0;
EXPECT_TRUE(ParseString<int16_t>(str, &v));
- return new Literal(type, v);
+ return pool_.Add(new Literal(type, v));
}
case TYPE_INT: {
int32_t v = 0;
EXPECT_TRUE(ParseString<int32_t>(str, &v));
- return new Literal(type, v);
+ return pool_.Add(new Literal(type, v));
}
case TYPE_BIGINT: {
int64_t v = 0;
EXPECT_TRUE(ParseString<int64_t>(str, &v));
- return new Literal(type, v);
+ return pool_.Add(new Literal(type, v));
}
case TYPE_FLOAT: {
float v = 0;
EXPECT_TRUE(ParseString<float>(str, &v));
- return new Literal(type, v);
+ return pool_.Add(new Literal(type, v));
}
case TYPE_DOUBLE: {
double v = 0;
EXPECT_TRUE(ParseString<double>(str, &v));
- return new Literal(type, v);
+ return pool_.Add(new Literal(type, v));
}
case TYPE_STRING:
case TYPE_VARCHAR:
case TYPE_CHAR:
- return new Literal(type, str);
+ return pool_.Add(new Literal(type, str));
case TYPE_TIMESTAMP: {
TimestampValue v;
EXPECT_TRUE(ParseString<TimestampValue>(str, &v));
- return new Literal(type, v);
+ return pool_.Add(new Literal(type, v));
}
case TYPE_DECIMAL: {
double v = 0;
EXPECT_TRUE(ParseString<double>(str, &v));
- return new Literal(type, v);
+ return pool_.Add(new Literal(type, v));
}
default:
DCHECK(false) << "Invalid type: " << type.DebugString();
@@ -1238,6 +1252,7 @@ template <typename T>
void ExprTest::TestSingleLiteralConstruction(
const ColumnType& type, const T& value, const string& string_val) {
ObjectPool pool;
+
RuntimeState state(TQueryCtx(), ExecEnv::GetInstance());
MemTracker tracker;
MemPool mem_pool(&tracker);
@@ -3826,7 +3841,7 @@ TEST_F(ExprTest, StringFunctions) {
FunctionContext::TypeDesc str_desc;
str_desc.type = FunctionContext::Type::TYPE_STRING;
std::vector<FunctionContext::TypeDesc> v(3, str_desc);
- auto context = UdfTestHarness::CreateTestContext(str_desc, v, nullptr, &pool);
+ FunctionContext* context = CreateUdfTestContext(str_desc, v, nullptr, &pool);
StringVal giga(static_cast<uint8_t*>(giga_buf->data()), StringVal::MAX_LENGTH);
StringVal a("A");
@@ -3863,7 +3878,7 @@ TEST_F(ExprTest, StringFunctions) {
EXPECT_TRUE(r4.is_null);
// Re-create context to clear the error from failed allocation.
UdfTestHarness::CloseContext(context);
- context = UdfTestHarness::CreateTestContext(str_desc, v, nullptr, &pool);
+ context = CreateUdfTestContext(str_desc, v, nullptr, &pool);
// Similar test for second overflow. This tests overflowing on re-allocation.
(*short_buf)[4095] = 'Z';
@@ -3872,7 +3887,7 @@ TEST_F(ExprTest, StringFunctions) {
EXPECT_TRUE(r5.is_null);
// Re-create context to clear the error from failed allocation.
UdfTestHarness::CloseContext(context);
- context = UdfTestHarness::CreateTestContext(str_desc, v, nullptr, &pool);
+ context = CreateUdfTestContext(str_desc, v, nullptr, &pool);
// Finally, test expanding to exactly MAX_LENGTH
// There are 4 Zs in giga4 (not including the trailing one, as we truncate that)
@@ -7360,8 +7375,6 @@ void ValidateLayout(const vector<ScalarExpr*>& exprs, int expected_byte_size,
}
TEST_F(ExprTest, ResultsLayoutTest) {
- ObjectPool pool;
-
vector<ScalarExpr*> exprs;
map<int, set<int>> expected_offsets;
@@ -7402,9 +7415,9 @@ TEST_F(ExprTest, ResultsLayoutTest) {
// With one expr, all offsets should be 0.
expected_offsets[t.GetByteSize()] = set<int>({0});
if (t.type != TYPE_TIMESTAMP) {
- exprs.push_back(pool.Add(CreateLiteral(t, "0")));
+ exprs.push_back(CreateLiteral(t, "0"));
} else {
- exprs.push_back(pool.Add(CreateLiteral(t, "2016-11-09")));
+ exprs.push_back(CreateLiteral(t, "2016-11-09"));
}
if (t.IsVarLenStringType()) {
ValidateLayout(exprs, 16, 0, expected_offsets);
@@ -7420,28 +7433,27 @@ TEST_F(ExprTest, ResultsLayoutTest) {
// Test layout adding a bunch of exprs. This is designed to trigger padding.
// The expected result is computed along the way
- exprs.push_back(pool.Add(CreateLiteral(TYPE_BOOLEAN, "0")));
- exprs.push_back(pool.Add(CreateLiteral(TYPE_TINYINT, "0")));
- exprs.push_back(pool.Add(CreateLiteral(ColumnType::CreateCharType(1), "0")));
+ exprs.push_back(CreateLiteral(TYPE_BOOLEAN, "0"));
+ exprs.push_back(CreateLiteral(TYPE_TINYINT, "0"));
+ exprs.push_back(CreateLiteral(ColumnType::CreateCharType(1), "0"));
expected_offsets[1].insert(expected_byte_size);
expected_offsets[1].insert(expected_byte_size + 1);
expected_offsets[1].insert(expected_byte_size + 2);
expected_byte_size += 3 * 1 + 1; // 1 byte of padding
- exprs.push_back(pool.Add(CreateLiteral(TYPE_SMALLINT, "0")));
+ exprs.push_back(CreateLiteral(TYPE_SMALLINT, "0"));
expected_offsets[2].insert(expected_byte_size);
expected_byte_size += 2; // No padding before CHAR
- exprs.push_back(pool.Add(CreateLiteral(ColumnType::CreateCharType(3), "0")));
+ exprs.push_back(CreateLiteral(ColumnType::CreateCharType(3), "0"));
expected_offsets[3].insert(expected_byte_size);
expected_byte_size += 3 + 3; // 3 byte of padding
ASSERT_EQ(expected_byte_size % 4, 0);
- exprs.push_back(pool.Add(CreateLiteral(TYPE_INT, "0")));
- exprs.push_back(pool.Add(CreateLiteral(TYPE_FLOAT, "0")));
- exprs.push_back(pool.Add(CreateLiteral(TYPE_FLOAT, "0")));
- exprs.push_back(pool.Add(
- CreateLiteral(ColumnType::CreateDecimalType(9, 0), "0")));
+ exprs.push_back(CreateLiteral(TYPE_INT, "0"));
+ exprs.push_back(CreateLiteral(TYPE_FLOAT, "0"));
+ exprs.push_back(CreateLiteral(TYPE_FLOAT, "0"));
+ exprs.push_back(CreateLiteral(ColumnType::CreateDecimalType(9, 0), "0"));
expected_offsets[4].insert(expected_byte_size);
expected_offsets[4].insert(expected_byte_size + 4);
expected_offsets[4].insert(expected_byte_size + 8);
@@ -7449,12 +7461,11 @@ TEST_F(ExprTest, ResultsLayoutTest) {
expected_byte_size += 4 * 4 + 4; // 4 bytes of padding
ASSERT_EQ(expected_byte_size % 8, 0);
- exprs.push_back(pool.Add(CreateLiteral(TYPE_BIGINT, "0")));
- exprs.push_back(pool.Add(CreateLiteral(TYPE_BIGINT, "0")));
- exprs.push_back(pool.Add(CreateLiteral(TYPE_BIGINT, "0")));
- exprs.push_back(pool.Add(CreateLiteral(TYPE_DOUBLE, "0")));
- exprs.push_back(pool.Add(
- CreateLiteral(ColumnType::CreateDecimalType(18, 0), "0")));
+ exprs.push_back(CreateLiteral(TYPE_BIGINT, "0"));
+ exprs.push_back(CreateLiteral(TYPE_BIGINT, "0"));
+ exprs.push_back(CreateLiteral(TYPE_BIGINT, "0"));
+ exprs.push_back(CreateLiteral(TYPE_DOUBLE, "0"));
+ exprs.push_back(CreateLiteral(ColumnType::CreateDecimalType(18, 0), "0"));
expected_offsets[8].insert(expected_byte_size);
expected_offsets[8].insert(expected_byte_size + 8);
expected_offsets[8].insert(expected_byte_size + 16);
@@ -7463,20 +7474,18 @@ TEST_F(ExprTest, ResultsLayoutTest) {
expected_byte_size += 5 * 8; // No more padding
ASSERT_EQ(expected_byte_size % 8, 0);
- exprs.push_back(pool.Add(CreateLiteral(TYPE_TIMESTAMP, "2016-11-09")));
- exprs.push_back(pool.Add(CreateLiteral(TYPE_TIMESTAMP, "2016-11-09")));
- exprs.push_back(pool.Add(
- CreateLiteral(ColumnType::CreateDecimalType(20, 0), "0")));
+ exprs.push_back(CreateLiteral(TYPE_TIMESTAMP, "2016-11-09"));
+ exprs.push_back(CreateLiteral(TYPE_TIMESTAMP, "2016-11-09"));
+ exprs.push_back(CreateLiteral(ColumnType::CreateDecimalType(20, 0), "0"));
expected_offsets[16].insert(expected_byte_size);
expected_offsets[16].insert(expected_byte_size + 16);
expected_offsets[16].insert(expected_byte_size + 32);
expected_byte_size += 3 * 16;
ASSERT_EQ(expected_byte_size % 8, 0);
- exprs.push_back(pool.Add(CreateLiteral(TYPE_STRING, "0")));
- exprs.push_back(pool.Add(CreateLiteral(TYPE_STRING, "0")));
- exprs.push_back(pool.Add(
- CreateLiteral(ColumnType::CreateVarcharType(1), "0")));
+ exprs.push_back(CreateLiteral(TYPE_STRING, "0"));
+ exprs.push_back(CreateLiteral(TYPE_STRING, "0"));
+ exprs.push_back(CreateLiteral(ColumnType::CreateVarcharType(1), "0"));
expected_offsets[0].insert(expected_byte_size);
expected_offsets[0].insert(expected_byte_size + 16);
expected_offsets[0].insert(expected_byte_size + 32);
diff --git a/be/src/rpc/CMakeLists.txt b/be/src/rpc/CMakeLists.txt
index 1c7adc6..56a4190 100644
--- a/be/src/rpc/CMakeLists.txt
+++ b/be/src/rpc/CMakeLists.txt
@@ -39,20 +39,20 @@ add_library(Rpc
)
add_dependencies(Rpc gen-deps)
-ADD_BE_TEST(thrift-util-test)
-ADD_BE_TEST(thrift-server-test)
-# The thrift-server-test uses some utilites from the Kudu security test code.
+ADD_BE_LSAN_TEST(thrift-util-test)
+ADD_BE_TEST(thrift-server-test) # TODO: this test leaks servers
+# The thrift-server-test uses some utilities from the Kudu security test code.
target_link_libraries(thrift-server-test security-test-for-impala)
-ADD_BE_TEST(authentication-test)
+ADD_BE_LSAN_TEST(authentication-test)
-ADD_BE_TEST(rpc-mgr-test)
+ADD_BE_TEST(rpc-mgr-test) # TODO: this test leaks various KRPC things
add_dependencies(rpc-mgr-test rpc_test_proto)
target_link_libraries(rpc-mgr-test rpc_test_proto)
target_link_libraries(rpc-mgr-test security-test-for-impala)
target_link_libraries(rpc-mgr-test ${KRB5_REALM_OVERRIDE})
-ADD_BE_TEST(rpc-mgr-kerberized-test)
+ADD_BE_TEST(rpc-mgr-kerberized-test) # TODO: this test leaks various KRPC things
add_dependencies(rpc-mgr-kerberized-test rpc_test_proto)
target_link_libraries(rpc-mgr-kerberized-test rpc_test_proto)
target_link_libraries(rpc-mgr-kerberized-test security-test-for-impala)
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 7dcf45c..a55a394 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -77,22 +77,22 @@ add_library(Runtime
)
add_dependencies(Runtime gen-deps)
-ADD_BE_TEST(mem-pool-test)
-ADD_BE_TEST(free-pool-test)
-ADD_BE_TEST(string-buffer-test)
-ADD_BE_TEST(data-stream-test)
-ADD_BE_TEST(timestamp-test)
-ADD_BE_TEST(raw-value-test)
-ADD_BE_TEST(string-compare-test)
-ADD_BE_TEST(string-search-test)
-ADD_BE_TEST(string-value-test)
-ADD_BE_TEST(thread-resource-mgr-test)
-ADD_BE_TEST(mem-tracker-test)
-ADD_BE_TEST(multi-precision-test)
-ADD_BE_TEST(decimal-test)
-ADD_BE_TEST(buffered-tuple-stream-test)
-ADD_BE_TEST(hdfs-fs-cache-test)
-ADD_BE_TEST(tmp-file-mgr-test)
-ADD_BE_TEST(row-batch-serialize-test)
-ADD_BE_TEST(row-batch-test)
-ADD_BE_TEST(collection-value-builder-test)
+ADD_BE_LSAN_TEST(mem-pool-test)
+ADD_BE_LSAN_TEST(free-pool-test)
+ADD_BE_LSAN_TEST(string-buffer-test)
+ADD_BE_TEST(data-stream-test) # TODO: this test leaks
+ADD_BE_LSAN_TEST(timestamp-test)
+ADD_BE_LSAN_TEST(raw-value-test)
+ADD_BE_LSAN_TEST(string-compare-test)
+ADD_BE_LSAN_TEST(string-search-test)
+ADD_BE_LSAN_TEST(string-value-test)
+ADD_BE_LSAN_TEST(thread-resource-mgr-test)
+ADD_BE_LSAN_TEST(mem-tracker-test)
+ADD_BE_LSAN_TEST(multi-precision-test)
+ADD_BE_LSAN_TEST(decimal-test)
+ADD_BE_LSAN_TEST(buffered-tuple-stream-test)
+ADD_BE_LSAN_TEST(hdfs-fs-cache-test)
+ADD_BE_LSAN_TEST(tmp-file-mgr-test)
+ADD_BE_LSAN_TEST(row-batch-serialize-test)
+ADD_BE_LSAN_TEST(row-batch-test)
+ADD_BE_LSAN_TEST(collection-value-builder-test)
diff --git a/be/src/runtime/bufferpool/CMakeLists.txt b/be/src/runtime/bufferpool/CMakeLists.txt
index ce68b07..d5c95e9 100644
--- a/be/src/runtime/bufferpool/CMakeLists.txt
+++ b/be/src/runtime/bufferpool/CMakeLists.txt
@@ -31,8 +31,8 @@ add_library(BufferPool
)
add_dependencies(BufferPool gen-deps)
-ADD_BE_TEST(buffer-allocator-test)
-ADD_BE_TEST(buffer-pool-test)
-ADD_BE_TEST(free-list-test)
-ADD_BE_TEST(reservation-tracker-test)
-ADD_BE_TEST(suballocator-test)
+ADD_BE_LSAN_TEST(buffer-allocator-test)
+ADD_BE_LSAN_TEST(buffer-pool-test)
+ADD_BE_LSAN_TEST(free-list-test)
+ADD_BE_LSAN_TEST(reservation-tracker-test)
+ADD_BE_LSAN_TEST(suballocator-test)
diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc
index 1cfc819..77b6a70 100644
--- a/be/src/runtime/bufferpool/buffer-pool-test.cc
+++ b/be/src/runtime/bufferpool/buffer-pool-test.cc
@@ -1960,7 +1960,7 @@ void BufferPoolTest::TestRandomInternalMulti(
}
AtomicInt32 stop_maintenance(0);
- thread* maintenance_thread = new thread([&pool, &stop_maintenance]() {
+ thread maintenance_thread([&pool, &stop_maintenance]() {
while (stop_maintenance.Load() == 0) {
pool.Maintenance();
SleepForMs(50);
@@ -1968,7 +1968,7 @@ void BufferPoolTest::TestRandomInternalMulti(
});
workers.join_all();
stop_maintenance.Add(1);
- maintenance_thread->join();
+ maintenance_thread.join();
global_reservations_.Close();
}
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index 0a0d81e..8851d1a 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -84,10 +84,6 @@ DECLARE_string(datastream_service_queue_mem_limit);
DECLARE_bool(use_krpc);
-// We reserve contiguous memory for senders in SetUp. If a test uses more
-// senders, a DCHECK will fail and you should increase this value.
-static const int MAX_SENDERS = 16;
-static const int MAX_RECEIVERS = 16;
static const PlanNodeId DEST_NODE_ID = 1;
static const int BATCH_CAPACITY = 100; // rows
static const int PER_ROW_DATA = 8;
@@ -238,9 +234,6 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
hash_sink_.output_partition.__isset.partition_exprs = true;
hash_sink_.output_partition.partition_exprs.push_back(expr);
- // Ensure that individual sender info addresses don't change
- sender_info_.reserve(MAX_SENDERS);
- receiver_info_.reserve(MAX_RECEIVERS);
if (GetParam() == USE_THRIFT) {
StartThriftBackend();
} else {
@@ -329,38 +322,36 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
vector<TPlanFragmentDestination> dest_;
struct SenderInfo {
- thread* thread_handle;
+ unique_ptr<thread> thread_handle;
Status status;
- int num_bytes_sent;
-
- SenderInfo(): thread_handle(nullptr), num_bytes_sent(0) {}
+ int num_bytes_sent = 0;
};
- vector<SenderInfo> sender_info_;
+ // Allocate each SenderInfo separately so the address doesn't change.
+ vector<unique_ptr<SenderInfo>> sender_info_;
struct ReceiverInfo {
TPartitionType::type stream_type;
int num_senders;
int receiver_num;
- thread* thread_handle;
+ unique_ptr<thread> thread_handle;
shared_ptr<DataStreamRecvrBase> stream_recvr;
Status status;
- int num_rows_received;
+ int num_rows_received = 0;
multiset<int64_t> data_values;
ReceiverInfo(TPartitionType::type stream_type, int num_senders, int receiver_num)
: stream_type(stream_type),
num_senders(num_senders),
- receiver_num(receiver_num),
- thread_handle(nullptr),
- num_rows_received(0) {}
+ receiver_num(receiver_num) {}
~ReceiverInfo() {
- delete thread_handle;
+ thread_handle.reset();
stream_recvr.reset();
}
};
- vector<ReceiverInfo> receiver_info_;
+ // Allocate each ReceiveInfo separately so the address doesn't change.
+ vector<unique_ptr<ReceiverInfo>> receiver_info_;
// Create an instance id and add it to dest_
void GetNextInstanceId(TUniqueId* instance_id) {
@@ -447,15 +438,16 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
RuntimeProfile* profile = RuntimeProfile::Create(&obj_pool_, "TestReceiver");
TUniqueId instance_id;
GetNextInstanceId(&instance_id);
- receiver_info_.push_back(ReceiverInfo(stream_type, num_senders, receiver_num));
- ReceiverInfo& info = receiver_info_.back();
- info.stream_recvr = stream_mgr_->CreateRecvr(row_desc_, instance_id, DEST_NODE_ID,
+ receiver_info_.emplace_back(
+ make_unique<ReceiverInfo>(stream_type, num_senders, receiver_num));
+ ReceiverInfo* info = receiver_info_.back().get();
+ info->stream_recvr = stream_mgr_->CreateRecvr(row_desc_, instance_id, DEST_NODE_ID,
num_senders, buffer_size, is_merging, profile, &tracker_, &buffer_pool_client_);
if (!is_merging) {
- info.thread_handle = new thread(&DataStreamTest::ReadStream, this, &info);
+ info->thread_handle.reset(new thread(&DataStreamTest::ReadStream, this, info));
} else {
- info.thread_handle = new thread(&DataStreamTest::ReadStreamMerging, this, &info,
- profile);
+ info->thread_handle.reset(new thread(&DataStreamTest::ReadStreamMerging, this, info,
+ profile));
}
if (out_id != nullptr) *out_id = instance_id;
}
@@ -463,8 +455,8 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
void JoinReceivers() {
VLOG_QUERY << "join receiver\n";
for (int i = 0; i < receiver_info_.size(); ++i) {
- receiver_info_[i].thread_handle->join();
- receiver_info_[i].stream_recvr->Close();
+ receiver_info_[i]->thread_handle->join();
+ receiver_info_[i]->stream_recvr->Close();
}
}
@@ -510,20 +502,20 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
int64_t total = 0;
multiset<int64_t> all_data_values;
for (int i = 0; i < receiver_info_.size(); ++i) {
- ReceiverInfo& info = receiver_info_[i];
- EXPECT_OK(info.status);
- total += info.data_values.size();
- ASSERT_EQ(info.stream_type, stream_type);
- ASSERT_EQ(info.num_senders, num_senders);
+ ReceiverInfo* info = receiver_info_[i].get();
+ EXPECT_OK(info->status);
+ total += info->data_values.size();
+ ASSERT_EQ(info->stream_type, stream_type);
+ ASSERT_EQ(info->num_senders, num_senders);
if (stream_type == TPartitionType::UNPARTITIONED) {
EXPECT_EQ(
- NUM_BATCHES * BATCH_CAPACITY * num_senders, info.data_values.size());
+ NUM_BATCHES * BATCH_CAPACITY * num_senders, info->data_values.size());
}
- all_data_values.insert(info.data_values.begin(), info.data_values.end());
+ all_data_values.insert(info->data_values.begin(), info->data_values.end());
int k = 0;
- for (multiset<int64_t>::iterator j = info.data_values.begin();
- j != info.data_values.end(); ++j, ++k) {
+ for (multiset<int64_t>::iterator j = info->data_values.begin();
+ j != info->data_values.end(); ++j, ++k) {
if (stream_type == TPartitionType::UNPARTITIONED) {
// unpartitioned streams contain all values as many times as there are
// senders
@@ -533,7 +525,7 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
int64_t value = *j;
uint64_t hash_val = RawValue::GetHashValueFastHash(&value, TYPE_BIGINT,
DataStreamSender::EXCHANGE_HASH_SEED);
- EXPECT_EQ(hash_val % receiver_info_.size(), info.receiver_num);
+ EXPECT_EQ(hash_val % receiver_info_.size(), info->receiver_num);
}
}
}
@@ -553,8 +545,8 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
void CheckSenders() {
for (int i = 0; i < sender_info_.size(); ++i) {
- EXPECT_OK(sender_info_[i].status);
- EXPECT_GT(sender_info_[i].num_bytes_sent, 0);
+ EXPECT_OK(sender_info_[i]->status);
+ EXPECT_GT(sender_info_[i]->num_bytes_sent, 0);
}
}
@@ -595,18 +587,16 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
int channel_buffer_size = 1024) {
VLOG_QUERY << "start sender";
int num_senders = sender_info_.size();
- ASSERT_LT(num_senders, MAX_SENDERS);
- sender_info_.push_back(SenderInfo());
- SenderInfo& info = sender_info_.back();
- info.thread_handle =
+ sender_info_.emplace_back(make_unique<SenderInfo>());
+ sender_info_.back()->thread_handle.reset(
new thread(&DataStreamTest::Sender, this, num_senders, channel_buffer_size,
- partition_type, GetParam() == USE_THRIFT);
+ partition_type, GetParam() == USE_THRIFT));
}
void JoinSenders() {
VLOG_QUERY << "join senders\n";
for (int i = 0; i < sender_info_.size(); ++i) {
- sender_info_[i].thread_handle->join();
+ sender_info_[i]->thread_handle->join();
}
}
@@ -640,22 +630,22 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
EXPECT_OK(sender->Prepare(&state, &tracker_));
EXPECT_OK(sender->Open(&state));
scoped_ptr<RowBatch> batch(CreateRowBatch());
- SenderInfo& info = sender_info_[sender_num];
+ SenderInfo* info = sender_info_[sender_num].get();
int next_val = 0;
for (int i = 0; i < NUM_BATCHES; ++i) {
GetNextBatch(batch.get(), &next_val);
VLOG_QUERY << "sender " << sender_num << ": #rows=" << batch->num_rows();
- info.status = sender->Send(&state, batch.get());
- if (!info.status.ok()) break;
+ info->status = sender->Send(&state, batch.get());
+ if (!info->status.ok()) break;
}
VLOG_QUERY << "closing sender" << sender_num;
- info.status.MergeStatus(sender->FlushFinal(&state));
+ info->status.MergeStatus(sender->FlushFinal(&state));
sender->Close(&state);
if (is_thrift) {
- info.num_bytes_sent = static_cast<DataStreamSender*>(
+ info->num_bytes_sent = static_cast<DataStreamSender*>(
sender.get())->GetNumDataBytesSent();
} else {
- info.num_bytes_sent = static_cast<KrpcDataStreamSender*>(
+ info->num_bytes_sent = static_cast<KrpcDataStreamSender*>(
sender.get())->GetNumDataBytesSent();
}
@@ -745,7 +735,7 @@ TEST_P(DataStreamTest, UnknownSenderSmallResult) {
GetNextInstanceId(&dummy_id);
StartSender(TPartitionType::UNPARTITIONED, TOTAL_DATA_SIZE + 1024);
JoinSenders();
- EXPECT_EQ(sender_info_[0].status.code(), TErrorCode::DATASTREAM_SENDER_TIMEOUT);
+ EXPECT_EQ(sender_info_[0]->status.code(), TErrorCode::DATASTREAM_SENDER_TIMEOUT);
}
TEST_P(DataStreamTest, UnknownSenderLargeResult) {
@@ -754,7 +744,7 @@ TEST_P(DataStreamTest, UnknownSenderLargeResult) {
GetNextInstanceId(&dummy_id);
StartSender();
JoinSenders();
- EXPECT_EQ(sender_info_[0].status.code(), TErrorCode::DATASTREAM_SENDER_TIMEOUT);
+ EXPECT_EQ(sender_info_[0]->status.code(), TErrorCode::DATASTREAM_SENDER_TIMEOUT);
}
TEST_P(DataStreamTest, Cancel) {
@@ -764,8 +754,8 @@ TEST_P(DataStreamTest, Cancel) {
StartReceiver(TPartitionType::UNPARTITIONED, 1, 1, 1024, true, &instance_id);
stream_mgr_->Cancel(instance_id);
JoinReceivers();
- EXPECT_TRUE(receiver_info_[0].status.IsCancelled());
- EXPECT_TRUE(receiver_info_[1].status.IsCancelled());
+ EXPECT_TRUE(receiver_info_[0]->status.IsCancelled());
+ EXPECT_TRUE(receiver_info_[1]->status.IsCancelled());
}
TEST_P(DataStreamTest, BasicTest) {
@@ -867,12 +857,13 @@ TEST_P(DataStreamTestShortDeserQueue, TestNoDeadlock) {
// Setup the receiver.
RuntimeProfile* profile = RuntimeProfile::Create(&obj_pool_, "TestReceiver");
- receiver_info_.push_back(ReceiverInfo(TPartitionType::UNPARTITIONED, 4, 1));
- ReceiverInfo& info = receiver_info_.back();
- info.stream_recvr = stream_mgr_->CreateRecvr(row_desc_, instance_id, DEST_NODE_ID,
+ receiver_info_.emplace_back(
+ make_unique<ReceiverInfo>(TPartitionType::UNPARTITIONED, 4, 1));
+ ReceiverInfo* info = receiver_info_.back().get();
+ info->stream_recvr = stream_mgr_->CreateRecvr(row_desc_, instance_id, DEST_NODE_ID,
4, 1024 * 1024, false, profile, &tracker_, &buffer_pool_client_);
- info.thread_handle = new thread(
- &DataStreamTestShortDeserQueue_TestNoDeadlock_Test::ReadStream, this, &info);
+ info->thread_handle.reset(new thread(
+ &DataStreamTestShortDeserQueue_TestNoDeadlock_Test::ReadStream, this, info));
JoinSenders();
CheckSenders();
diff --git a/be/src/runtime/io/CMakeLists.txt b/be/src/runtime/io/CMakeLists.txt
index 226fdc6..4c8199f 100644
--- a/be/src/runtime/io/CMakeLists.txt
+++ b/be/src/runtime/io/CMakeLists.txt
@@ -36,4 +36,4 @@ add_dependencies(Io gen-deps)
add_executable(disk-io-mgr-stress-test disk-io-mgr-stress-test.cc)
target_link_libraries(disk-io-mgr-stress-test ${IMPALA_TEST_LINK_LIBS})
-ADD_BE_TEST(disk-io-mgr-test)
+ADD_BE_LSAN_TEST(disk-io-mgr-test)
diff --git a/be/src/runtime/io/disk-io-mgr-stress.cc b/be/src/runtime/io/disk-io-mgr-stress.cc
index c9b6870..16cfbf2 100644
--- a/be/src/runtime/io/disk-io-mgr-stress.cc
+++ b/be/src/runtime/io/disk-io-mgr-stress.cc
@@ -96,7 +96,7 @@ DiskIoMgrStress::DiskIoMgrStress(int num_disks, int num_threads_per_disk,
CreateTempFile(files_[i].filename.c_str(), files_[i].data.c_str());
}
- clients_ = new Client[num_clients_];
+ clients_.reset(new Client[num_clients_]);
client_mem_trackers_.resize(num_clients_);
buffer_pool_clients_.reset(new BufferPool::ClientHandle[num_clients_]);
for (int i = 0; i < num_clients_; ++i) {
@@ -104,6 +104,8 @@ DiskIoMgrStress::DiskIoMgrStress(int num_disks, int num_threads_per_disk,
}
}
+DiskIoMgrStress::~DiskIoMgrStress() { }
+
void DiskIoMgrStress::ClientThread(int client_id) {
Client* client = &clients_[client_id];
Status status;
diff --git a/be/src/runtime/io/disk-io-mgr-stress.h b/be/src/runtime/io/disk-io-mgr-stress.h
index 574b58c..1baef24 100644
--- a/be/src/runtime/io/disk-io-mgr-stress.h
+++ b/be/src/runtime/io/disk-io-mgr-stress.h
@@ -42,6 +42,7 @@ class DiskIoMgrStress {
public:
DiskIoMgrStress(int num_disks, int num_threads_per_disk, int num_clients,
bool includes_cancellation);
+ ~DiskIoMgrStress();
/// Run the test for 'sec'. If 0, run forever
void Run(int sec);
@@ -84,7 +85,7 @@ class DiskIoMgrStress {
/// Array of clients
int num_clients_;
- Client* clients_;
+ std::unique_ptr<Client[]> clients_;
/// Client MemTrackers, one per client.
std::vector<std::unique_ptr<MemTracker>> client_mem_trackers_;
diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc
index 9239658..3d89d04 100644
--- a/be/src/runtime/io/disk-io-mgr-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-test.cc
@@ -1041,7 +1041,7 @@ TEST_F(DiskIoMgrTest, MultipleReader) {
unique_ptr<BufferPool::ClientHandle[]> clients(
new BufferPool::ClientHandle[NUM_READERS]);
vector<unique_ptr<RequestContext>> readers(NUM_READERS);
- vector<char*> results(NUM_READERS);
+ vector<unique_ptr<char[]>> results(NUM_READERS);
// Initialize data for each reader. The data will be
// 'abcd...' for reader one, 'bcde...' for reader two (wrapping around at 'z')
@@ -1063,8 +1063,8 @@ TEST_F(DiskIoMgrTest, MultipleReader) {
stat(file_names[i].c_str(), &stat_val);
mtimes[i] = stat_val.st_mtime;
- results[i] = new char[DATA_LEN + 1];
- memset(results[i], 0, DATA_LEN + 1);
+ results[i].reset(new char[DATA_LEN + 1]);
+ memset(results[i].get(), 0, DATA_LEN + 1);
}
// This exercises concurrency, run the test multiple times
diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc
index 4762928..237484f 100644
--- a/be/src/runtime/mem-tracker.cc
+++ b/be/src/runtime/mem-tracker.cc
@@ -130,8 +130,7 @@ void MemTracker::CloseAndUnregisterFromParent() {
}
void MemTracker::EnableReservationReporting(const ReservationTrackerCounters& counters) {
- ReservationTrackerCounters* new_counters = new ReservationTrackerCounters(counters);
- reservation_counters_.Store(new_counters);
+ delete reservation_counters_.Swap(new ReservationTrackerCounters(counters));
}
int64_t MemTracker::GetPoolMemReserved() {
diff --git a/be/src/scheduling/CMakeLists.txt b/be/src/scheduling/CMakeLists.txt
index 024e6e1..47fd438 100644
--- a/be/src/scheduling/CMakeLists.txt
+++ b/be/src/scheduling/CMakeLists.txt
@@ -33,7 +33,5 @@ add_library(Scheduling STATIC
)
add_dependencies(Scheduling gen-deps)
-ADD_BE_TEST(scheduler-test)
-ADD_BE_TEST(backend-config-test)
-# TODO: Add BE test
-# ADD_BE_TEST(admission-controller-test)
+ADD_BE_LSAN_TEST(scheduler-test)
+ADD_BE_LSAN_TEST(backend-config-test)
diff --git a/be/src/service/CMakeLists.txt b/be/src/service/CMakeLists.txt
index 66cf55d..71704c8 100644
--- a/be/src/service/CMakeLists.txt
+++ b/be/src/service/CMakeLists.txt
@@ -74,7 +74,7 @@ target_link_libraries(impalad
${IMPALA_LINK_LIBS}
)
-ADD_BE_TEST(session-expiry-test session-expiry-test.cc)
-ADD_BE_TEST(hs2-util-test hs2-util-test.cc)
-ADD_BE_TEST(query-options-test query-options-test.cc)
-ADD_BE_TEST(impala-server-test impala-server-test.cc)
+ADD_BE_TEST(session-expiry-test session-expiry-test.cc) # TODO: this leaks thrift server
+ADD_BE_LSAN_TEST(hs2-util-test hs2-util-test.cc)
+ADD_BE_LSAN_TEST(query-options-test query-options-test.cc)
+ADD_BE_LSAN_TEST(impala-server-test impala-server-test.cc)
diff --git a/be/src/statestore/CMakeLists.txt b/be/src/statestore/CMakeLists.txt
index 313b2ec..a2bc4c0 100644
--- a/be/src/statestore/CMakeLists.txt
+++ b/be/src/statestore/CMakeLists.txt
@@ -30,4 +30,4 @@ add_library(Statestore
)
add_dependencies(Statestore gen-deps)
-ADD_BE_TEST(statestore-test)
+ADD_BE_LSAN_TEST(statestore-test)
diff --git a/be/src/udf/udf.cc b/be/src/udf/udf.cc
index 89eb8d1..032f036 100644
--- a/be/src/udf/udf.cc
+++ b/be/src/udf/udf.cc
@@ -79,6 +79,8 @@ class FreePool {
class MemPool {
public:
uint8_t* Allocate(int byte_size) {
+ // TODO: this function is called with this == nullptr from UdaTestHarness. This works
+ // for now because MemPool has no members or virtual functions.
return reinterpret_cast<uint8_t*>(malloc(byte_size));
}
};
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index 095193d..99c8e6b 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -107,40 +107,40 @@ target_link_libraries(parquet-reader ${IMPALA_LINK_LIBS})
target_link_libraries(loggingsupport ${IMPALA_LINK_LIBS_DYNAMIC_TARGETS})
-ADD_BE_TEST(benchmark-test)
-ADD_BE_TEST(bitmap-test)
-ADD_BE_TEST(bit-packing-test)
-ADD_BE_TEST(bit-util-test)
-ADD_BE_TEST(blocking-queue-test)
-ADD_BE_TEST(bloom-filter-test)
-ADD_BE_TEST(coding-util-test)
-ADD_BE_TEST(debug-util-test)
-ADD_BE_TEST(decompress-test)
-ADD_BE_TEST(dict-test)
-ADD_BE_TEST(error-util-test)
-ADD_BE_TEST(filesystem-util-test)
-ADD_BE_TEST(fixed-size-hash-table-test)
-ADD_BE_TEST(hdfs-util-test)
-ADD_BE_TEST(internal-queue-test)
-ADD_BE_TEST(logging-support-test)
-ADD_BE_TEST(lru-cache-test)
-ADD_BE_TEST(metrics-test)
-ADD_BE_TEST(min-max-filter-test)
+ADD_BE_LSAN_TEST(benchmark-test)
+ADD_BE_LSAN_TEST(bitmap-test)
+ADD_BE_LSAN_TEST(bit-packing-test)
+ADD_BE_LSAN_TEST(bit-util-test)
+ADD_BE_LSAN_TEST(blocking-queue-test)
+ADD_BE_LSAN_TEST(bloom-filter-test)
+ADD_BE_LSAN_TEST(coding-util-test)
+ADD_BE_LSAN_TEST(debug-util-test)
+ADD_BE_LSAN_TEST(decompress-test)
+ADD_BE_LSAN_TEST(dict-test)
+ADD_BE_LSAN_TEST(error-util-test)
+ADD_BE_LSAN_TEST(filesystem-util-test)
+ADD_BE_LSAN_TEST(fixed-size-hash-table-test)
+ADD_BE_LSAN_TEST(hdfs-util-test)
+ADD_BE_LSAN_TEST(internal-queue-test)
+ADD_BE_LSAN_TEST(logging-support-test)
+ADD_BE_LSAN_TEST(lru-cache-test)
+ADD_BE_LSAN_TEST(metrics-test)
+ADD_BE_LSAN_TEST(min-max-filter-test)
ADD_BE_LSAN_TEST(openssl-util-test)
-ADD_BE_TEST(parse-util-test)
-ADD_BE_TEST(pretty-printer-test)
-ADD_BE_TEST(proc-info-test)
-ADD_BE_TEST(promise-test)
-ADD_BE_TEST(redactor-config-parser-test)
-ADD_BE_TEST(redactor-test)
-ADD_BE_TEST(redactor-unconfigured-test)
-ADD_BE_TEST(rle-test)
-ADD_BE_TEST(runtime-profile-test)
-ADD_BE_TEST(string-parser-test)
-ADD_BE_TEST(string-util-test)
-ADD_BE_TEST(symbols-util-test)
-ADD_BE_TEST(sys-info-test)
-ADD_BE_TEST(thread-pool-test)
-ADD_BE_TEST(time-test)
-ADD_BE_TEST(uid-util-test)
-ADD_BE_TEST(webserver-test)
+ADD_BE_LSAN_TEST(parse-util-test)
+ADD_BE_LSAN_TEST(pretty-printer-test)
+ADD_BE_LSAN_TEST(proc-info-test)
+ADD_BE_LSAN_TEST(promise-test)
+ADD_BE_LSAN_TEST(redactor-config-parser-test)
+ADD_BE_LSAN_TEST(redactor-test)
+ADD_BE_LSAN_TEST(redactor-unconfigured-test)
+ADD_BE_LSAN_TEST(rle-test)
+ADD_BE_LSAN_TEST(runtime-profile-test)
+ADD_BE_LSAN_TEST(string-parser-test)
+ADD_BE_LSAN_TEST(string-util-test)
+ADD_BE_LSAN_TEST(symbols-util-test)
+ADD_BE_LSAN_TEST(sys-info-test)
+ADD_BE_LSAN_TEST(thread-pool-test)
+ADD_BE_LSAN_TEST(time-test)
+ADD_BE_LSAN_TEST(uid-util-test)
+ADD_BE_LSAN_TEST(webserver-test)
diff --git a/be/src/util/decompress-test.cc b/be/src/util/decompress-test.cc
index 2a2299c..4d5f8ef 100644
--- a/be/src/util/decompress-test.cc
+++ b/be/src/util/decompress-test.cc
@@ -449,7 +449,7 @@ TEST_F(DecompressorTest, LZ4Huge) {
// Generate a big random payload.
int payload_len = numeric_limits<int>::max();
- uint8_t* payload = new uint8_t[payload_len];
+ unique_ptr<uint8_t[]> payload(new uint8_t[payload_len]);
for (int i = 0 ; i < payload_len; ++i) payload[i] = rand();
scoped_ptr<Codec> compressor;
@@ -462,9 +462,10 @@ TEST_F(DecompressorTest, LZ4Huge) {
// Trying to compress it should give an error
int64_t compressed_len = max_size;
- uint8_t* compressed = new uint8_t[max_size];
- EXPECT_ERROR(compressor->ProcessBlock(true, payload_len, payload,
- &compressed_len, &compressed), TErrorCode::LZ4_COMPRESSION_INPUT_TOO_LARGE);
+ unique_ptr<uint8_t[]> compressed(new uint8_t[max_size]);
+ uint8_t* compressed_ptr = compressed.get();
+ EXPECT_ERROR(compressor->ProcessBlock(true, payload_len, payload.get(),
+ &compressed_len, &compressed_ptr), TErrorCode::LZ4_COMPRESSION_INPUT_TOO_LARGE);
}
}
diff --git a/be/src/statestore/CMakeLists.txt b/bin/lsan-suppressions.txt
similarity index 55%
copy from be/src/statestore/CMakeLists.txt
copy to bin/lsan-suppressions.txt
index 313b2ec..f59592b 100644
--- a/be/src/statestore/CMakeLists.txt
+++ b/bin/lsan-suppressions.txt
@@ -15,19 +15,20 @@
# specific language governing permissions and limitations
# under the License.
+# This file suppresses Leak Sanitizer errors, following
+# https://clang.llvm.org/docs/LeakSanitizer.html
-# where to put generated libraries
-set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/statestore")
+# The JVM leaks things directly from libjvm.so and indirectly via inflate().
+leak:libjvm.so
+leak:inflate
-# where to put generated binaries
-set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/statestore")
+# Some test utilities have deliberate leaks
+leak:InProcessImpalaServer::StartWithEphemeralPorts
+leak:InProcessStatestore::StartWithEphemeralPorts
-add_library(Statestore
- failure-detector.cc
- statestore.cc
- statestore-subscriber.cc
- statestored-main.cc
-)
-add_dependencies(Statestore gen-deps)
+# MemTestPrepare is deliberately used to simulate a UDF leaking memory.
+leak:MemTestPrepare
-ADD_BE_TEST(statestore-test)
+# TODO: IMPALA-2746: fix these unnecessary leaks.
+# The UDF and UDA test harnesses are sloppy and leak result allocations.
+leak:impala::FunctionContextImpl::AllocateForResults