You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by zh...@apache.org on 2020/02/19 12:23:25 UTC
[incubator-doris] branch master updated: Fix the flush_status bug
in flush-executor (#2933)
This is an automated email from the ASF dual-hosted git repository.
zhaoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new c617fc9 Fix the flush_status bug in flush-executor (#2933)
c617fc9 is described below
commit c617fc9064cc8475b7c6a5ecbe540e3a88e05663
Author: LingBin <li...@gmail.com>
AuthorDate: Wed Feb 19 06:23:19 2020 -0600
Fix the flush_status bug in flush-executor (#2933)
For a tablet, there may be multiple memtables, which are
flushed to disk one by one in the order they were generated.
If a memtable flush fails, the load job must fail, but the
previous implementation overwrote `_flush_status` with the result
of each subsequent flush, so an earlier error could go undetected
and a failed load job could be reported as successful.
This patch also makes two other changes:
1. Use `std::bind` instead of `boost::bind`;
2. Remove some unneeded headers.
---
be/src/olap/memtable_flush_executor.cpp | 44 ++++++++++++++++++---------------
be/src/olap/memtable_flush_executor.h | 23 +++++++++++------
be/src/olap/tablet_sync_service.cpp | 2 +-
be/src/util/threadpool.cpp | 21 ++++++++--------
be/src/util/threadpool.h | 12 ++++-----
be/test/olap/delta_writer_test.cpp | 21 ++++++++--------
be/test/util/threadpool_test.cpp | 28 ++++++++++-----------
7 files changed, 81 insertions(+), 70 deletions(-)
diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp
index 00bf34e..e3f6c90 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -19,11 +19,7 @@
#include <functional>
-#include "olap/data_dir.h"
-#include "olap/delta_writer.h"
#include "olap/memtable.h"
-#include "runtime/exec_env.h"
-#include "runtime/mem_tracker.h"
#include "util/scoped_cleanup.h"
namespace doris {
@@ -34,36 +30,44 @@ std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) {
return os;
}
-OLAPStatus FlushToken::submit(std::shared_ptr<MemTable> memtable) {
- _flush_token->submit_func(boost::bind(boost::mem_fn(&FlushToken::_flush_memtable), this, memtable));
+// The type of parameter is safe to be a reference. Because the function object
+// returned by std::bind() will increase the reference count of Memtable. i.e.,
+// after the submit() method returns, even if the caller immediately releases the
+// passed shared_ptr object, the Memtable object will not be destructed because
+// its reference count is not 0.
+OLAPStatus FlushToken::submit(const std::shared_ptr<MemTable>& memtable) {
+ RETURN_NOT_OK(_flush_status.load());
+ _flush_token->submit_func(std::bind(&FlushToken::_flush_memtable, this, memtable));
return OLAP_SUCCESS;
}
-void FlushToken::cancel() {
+void FlushToken::cancel() {
_flush_token->shutdown();
}
-OLAPStatus FlushToken::wait() {
+OLAPStatus FlushToken::wait() {
_flush_token->wait();
- return _flush_status;
+ return _flush_status.load();
}
void FlushToken::_flush_memtable(std::shared_ptr<MemTable> memtable) {
+ SCOPED_CLEANUP({ memtable.reset(); });
+
+ // If previous flush has failed, return directly
+ if (_flush_status.load() != OLAP_SUCCESS) {
+ return;
+ }
+
MonotonicStopWatch timer;
timer.start();
- _flush_status = memtable->flush();
- SCOPED_CLEANUP({
- memtable.reset();
- });
- if (_flush_status != OLAP_SUCCESS) {
+ _flush_status.store(memtable->flush());
+ if (_flush_status.load() != OLAP_SUCCESS) {
return;
}
_stats.flush_time_ns += timer.elapsed_time();
_stats.flush_count++;
_stats.flush_size_bytes += memtable->memory_usage();
- LOG(INFO) << "flushed " << *(memtable) << " in " << _stats.flush_time_ns / 1000 / 1000
- << " ms, status=" << _flush_status;
}
void MemTableFlushExecutor::init(const std::vector<DataDir*>& data_dirs) {
@@ -71,12 +75,12 @@ void MemTableFlushExecutor::init(const std::vector<DataDir*>& data_dirs) {
size_t min_threads = std::max(1, config::flush_thread_num_per_store);
size_t max_threads = data_dir_num * min_threads;
ThreadPoolBuilder("MemTableFlushThreadPool")
- .set_min_threads(min_threads)
- .set_max_threads(max_threads)
- .build(&_flush_pool);
+ .set_min_threads(min_threads)
+ .set_max_threads(max_threads)
+ .build(&_flush_pool);
}
-// create a flush token
+// NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are flushed in order.
OLAPStatus MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>* flush_token) {
flush_token->reset(new FlushToken(_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL)));
return OLAP_SUCCESS;
diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h
index bb58e3f..b3d0cbe 100644
--- a/be/src/olap/memtable_flush_executor.h
+++ b/be/src/olap/memtable_flush_executor.h
@@ -17,10 +17,9 @@
#pragma once
-#include <cstdint>
+#include <atomic>
#include <memory>
#include <vector>
-#include <utility>
#include "olap/olap_define.h"
#include "util/threadpool.h"
@@ -43,13 +42,19 @@ struct FlushStatistic {
std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat);
// A thin wrapper of ThreadPoolToken to submit task.
+// For a tablet, there may be multiple memtables, which will be flushed to disk
+// one by one in the order of generation.
+// If a memtable flush fails, then:
+// 1. Immediately disallow submission of any subsequent memtable
+// 2. For the memtables that have already been submitted, there is no need to flush,
+// because the entire job will definitely fail;
class FlushToken {
public:
- explicit FlushToken(std::unique_ptr<ThreadPoolToken> flush_pool_token)
- : _flush_status(OLAP_SUCCESS),
- _flush_token(std::move(flush_pool_token)) {}
+ explicit FlushToken(std::unique_ptr<ThreadPoolToken> flush_pool_token) :
+ _flush_token(std::move(flush_pool_token)),
+ _flush_status(OLAP_SUCCESS) { }
- OLAPStatus submit(std::shared_ptr<MemTable> mem_table);
+ OLAPStatus submit(const std::shared_ptr<MemTable>& mem_table);
// error has happpens, so we cancel this token
// And remove all tasks in the queue.
@@ -64,8 +69,12 @@ public:
private:
void _flush_memtable(std::shared_ptr<MemTable> mem_table);
- OLAPStatus _flush_status;
std::unique_ptr<ThreadPoolToken> _flush_token;
+
+ // Records the current flush status of the tablet.
+ // Note: Once its value is set to Failed, it cannot return to SUCCESS.
+ std::atomic<OLAPStatus> _flush_status;
+
FlushStatistic _stats;
};
diff --git a/be/src/olap/tablet_sync_service.cpp b/be/src/olap/tablet_sync_service.cpp
index 329a67d..e61c67a 100644
--- a/be/src/olap/tablet_sync_service.cpp
+++ b/be/src/olap/tablet_sync_service.cpp
@@ -152,4 +152,4 @@ void TabletSyncService::_push_tablet_meta_thread(std::vector<PushTabletMetaTask>
return;
}
-} // doris
\ No newline at end of file
+} // doris
diff --git a/be/src/util/threadpool.cpp b/be/src/util/threadpool.cpp
index cada3c2..582f224 100644
--- a/be/src/util/threadpool.cpp
+++ b/be/src/util/threadpool.cpp
@@ -36,23 +36,22 @@ using strings::Substitute;
class FunctionRunnable : public Runnable {
public:
- explicit FunctionRunnable(boost::function<void()> func)
- : _func(std::move(func)) {}
+ explicit FunctionRunnable(std::function<void()> func) : _func(std::move(func)) {}
void run() OVERRIDE {
_func();
}
private:
- boost::function<void()> _func;
+ std::function<void()> _func;
};
-ThreadPoolBuilder::ThreadPoolBuilder(string name)
- : _name(std::move(name)),
- _min_threads(0),
- _max_threads(base::NumCPUs()),
- _max_queue_size(std::numeric_limits<int>::max()),
- _idle_timeout(MonoDelta::FromMilliseconds(500)) {}
+ThreadPoolBuilder::ThreadPoolBuilder(string name) :
+ _name(std::move(name)),
+ _min_threads(0),
+ _max_threads(base::NumCPUs()),
+ _max_queue_size(std::numeric_limits<int>::max()),
+ _idle_timeout(MonoDelta::FromMilliseconds(500)) {}
ThreadPoolBuilder& ThreadPoolBuilder::set_min_threads(int min_threads) {
CHECK_GE(min_threads, 0);
@@ -99,7 +98,7 @@ Status ThreadPoolToken::submit(std::shared_ptr<Runnable> r) {
return _pool->do_submit(std::move(r), this);
}
-Status ThreadPoolToken::submit_func(boost::function<void()> f) {
+Status ThreadPoolToken::submit_func(std::function<void()> f) {
return submit(std::make_shared<FunctionRunnable>(std::move(f)));
}
@@ -353,7 +352,7 @@ Status ThreadPool::submit(std::shared_ptr<Runnable> r) {
return do_submit(std::move(r), _tokenless.get());
}
-Status ThreadPool::submit_func(boost::function<void()> f) {
+Status ThreadPool::submit_func(std::function<void()> f) {
return submit(std::make_shared<FunctionRunnable>(std::move(f)));
}
diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h
index b13f396..af60a69 100644
--- a/be/src/util/threadpool.h
+++ b/be/src/util/threadpool.h
@@ -19,12 +19,12 @@
#define DORIS_BE_SRC_UTIL_THREAD_POOL_H
#include <deque>
+#include <functional>
#include <memory>
#include <utility>
#include <unordered_set>
#include <string>
-#include "boost/function.hpp"
#include <boost/intrusive/list.hpp>
#include <boost/intrusive/list_hook.hpp>
@@ -150,7 +150,7 @@ private:
// .set_idle_timeout(MonoDelta::FromMilliseconds(2000))
// .Build(&thread_pool));
// thread_pool->Submit(shared_ptr<Runnable>(new Task()));
-// thread_pool->SubmitFunc(boost::bind(&Func, 10));
+// thread_pool->SubmitFunc(std::bind(&Func, 10));
class ThreadPool {
public:
~ThreadPool();
@@ -166,8 +166,8 @@ public:
// Submits a Runnable class.
Status submit(std::shared_ptr<Runnable> r);
- // Submits a function bound using boost::bind(&FuncName, args...).
- Status submit_func(boost::function<void()> f);
+ // Submits a function bound using std::bind(&FuncName, args...).
+ Status submit_func(std::function<void()> f);
// Waits until all the tasks are completed.
void wait();
@@ -341,8 +341,8 @@ public:
// Submits a Runnable class.
Status submit(std::shared_ptr<Runnable> r);
- // Submits a function bound using boost::bind(&FuncName, args...).
- Status submit_func(boost::function<void()> f);
+ // Submits a function bound using std::bind(&FuncName, args...).
+ Status submit_func(std::function<void()> f);
// Marks the token as unusable for future submissions. Any queued tasks not
// yet running are destroyed. If tasks are in flight, Shutdown() will wait
diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp
index c4175f1..537433f 100644
--- a/be/test/olap/delta_writer_test.cpp
+++ b/be/test/olap/delta_writer_test.cpp
@@ -310,7 +310,7 @@ TEST_F(TestDeltaWriter, open) {
WriteRequest write_req = {10003, 270068375, WriteType::LOAD,
20001, 30001, load_id, false, tuple_desc};
DeltaWriter* delta_writer = nullptr;
- DeltaWriter::open(&write_req, k_mem_tracker, &delta_writer);
+ DeltaWriter::open(&write_req, k_mem_tracker, &delta_writer);
ASSERT_NE(delta_writer, nullptr);
res = delta_writer->close();
ASSERT_EQ(OLAP_SUCCESS, res);
@@ -345,7 +345,7 @@ TEST_F(TestDeltaWriter, write) {
20002, 30002, load_id, false, tuple_desc,
&(tuple_desc->slots())};
DeltaWriter* delta_writer = nullptr;
- DeltaWriter::open(&write_req, k_mem_tracker, &delta_writer);
+ DeltaWriter::open(&write_req, k_mem_tracker, &delta_writer);
ASSERT_NE(delta_writer, nullptr);
MemTracker tracker;
@@ -362,18 +362,18 @@ TEST_F(TestDeltaWriter, write) {
int128_t large_int_value = -90000;
memcpy(tuple->get_slot(slots[4]->tuple_offset()), &large_int_value, sizeof(int128_t));
- ((DateTimeValue*)(tuple->get_slot(slots[5]->tuple_offset())))->from_date_str("2048-11-10", 10);
- ((DateTimeValue*)(tuple->get_slot(slots[6]->tuple_offset())))->from_date_str("2636-08-16 19:39:43", 19);
+ ((DateTimeValue*)(tuple->get_slot(slots[5]->tuple_offset())))->from_date_str("2048-11-10", 10);
+ ((DateTimeValue*)(tuple->get_slot(slots[6]->tuple_offset())))->from_date_str("2636-08-16 19:39:43", 19);
StringValue* char_ptr = (StringValue*)(tuple->get_slot(slots[7]->tuple_offset()));
char_ptr->ptr = (char*)pool.allocate(4);
memcpy(char_ptr->ptr, "abcd", 4);
- char_ptr->len = 4;
+ char_ptr->len = 4;
StringValue* var_ptr = (StringValue*)(tuple->get_slot(slots[8]->tuple_offset()));
var_ptr->ptr = (char*)pool.allocate(5);
memcpy(var_ptr->ptr, "abcde", 5);
- var_ptr->len = 5;
+ var_ptr->len = 5;
DecimalValue decimal_value(1.1);
*(DecimalValue*)(tuple->get_slot(slots[9]->tuple_offset())) = decimal_value;
@@ -385,13 +385,13 @@ TEST_F(TestDeltaWriter, write) {
memcpy(tuple->get_slot(slots[14]->tuple_offset()), &large_int_value, sizeof(int128_t));
- ((DateTimeValue*)(tuple->get_slot(slots[15]->tuple_offset())))->from_date_str("2048-11-10", 10);
- ((DateTimeValue*)(tuple->get_slot(slots[16]->tuple_offset())))->from_date_str("2636-08-16 19:39:43", 19);
+ ((DateTimeValue*)(tuple->get_slot(slots[15]->tuple_offset())))->from_date_str("2048-11-10", 10);
+ ((DateTimeValue*)(tuple->get_slot(slots[16]->tuple_offset())))->from_date_str("2636-08-16 19:39:43", 19);
char_ptr = (StringValue*)(tuple->get_slot(slots[17]->tuple_offset()));
char_ptr->ptr = (char*)pool.allocate(4);
memcpy(char_ptr->ptr, "abcd", 4);
- char_ptr->len = 4;
+ char_ptr->len = 4;
var_ptr = (StringValue*)(tuple->get_slot(slots[18]->tuple_offset()));
var_ptr->ptr = (char*)pool.allocate(5);
@@ -425,7 +425,7 @@ TEST_F(TestDeltaWriter, write) {
std::cout << "start to publish txn" << std::endl;
RowsetSharedPtr rowset = tablet_rs.second;
res = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id,
- write_req.tablet_id, write_req.schema_hash, tablet_rs.first.tablet_uid,
+ write_req.tablet_id, write_req.schema_hash, tablet_rs.first.tablet_uid,
version, version_hash);
ASSERT_EQ(OLAP_SUCCESS, res);
std::cout << "start to add inc rowset:" << rowset->rowset_id() << ", num rows:" << rowset->num_rows()
@@ -441,6 +441,7 @@ TEST_F(TestDeltaWriter, write) {
auto schema_hash = 270068375;
res = k_engine->tablet_manager()->drop_tablet(tablet_id, schema_hash);
ASSERT_EQ(OLAP_SUCCESS, res);
+ delete delta_writer;
}
} // namespace doris
diff --git a/be/test/util/threadpool_test.cpp b/be/test/util/threadpool_test.cpp
index b752de3..f3ea989 100644
--- a/be/test/util/threadpool_test.cpp
+++ b/be/test/util/threadpool_test.cpp
@@ -33,7 +33,6 @@
#include <gflags/gflags_declare.h>
#include <gtest/gtest.h>
-#include <boost/bind.hpp>
#include "common/logging.h"
#include "common/status.h"
#include "gutil/atomicops.h"
@@ -119,9 +118,9 @@ TEST_F(ThreadPoolTest, TestSimpleTasks) {
std::atomic<int32_t> counter(0);
std::shared_ptr<Runnable> task(new SimpleTask(15, &counter));
- ASSERT_TRUE(_pool->submit_func(boost::bind(&simple_task_method, 10, &counter)).ok());
+ ASSERT_TRUE(_pool->submit_func(std::bind(&simple_task_method, 10, &counter)).ok());
ASSERT_TRUE(_pool->submit(task).ok());
- ASSERT_TRUE(_pool->submit_func(boost::bind(&simple_task_method, 20, &counter)).ok());
+ ASSERT_TRUE(_pool->submit_func(std::bind(&simple_task_method, 20, &counter)).ok());
ASSERT_TRUE(_pool->submit(task).ok());
_pool->wait();
ASSERT_EQ(10 + 15 + 20 + 15, counter.load());
@@ -219,16 +218,19 @@ TEST_F(ThreadPoolTest, TestThreadPoolWithNoMaxThreads) {
TEST_F(ThreadPoolTest, TestRace) {
alarm(60);
auto cleanup = MakeScopedCleanup([]() {
- alarm(0); // Disable alarm on test exit.
- });
+ alarm(0); // Disable alarm on test exit.
+ });
ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName)
- .set_min_threads(0)
- .set_max_threads(1)
- .set_idle_timeout(MonoDelta::FromMicroseconds(1))).ok());
+ .set_min_threads(0)
+ .set_max_threads(1)
+ .set_idle_timeout(MonoDelta::FromMicroseconds(1))).ok());
for (int i = 0; i < 500; i++) {
CountDownLatch l(1);
- ASSERT_TRUE(_pool->submit_func(boost::bind(&CountDownLatch::count_down, &l)).ok());
+ // CountDownLatch::count_down has multiple overloaded version,
+ // so an cast is needed to use std::bind
+ ASSERT_TRUE(_pool->submit_func(
+ std::bind((void (CountDownLatch::*)())(&CountDownLatch::count_down), &l)).ok());
l.wait();
// Sleeping a different amount in each iteration makes it more likely to hit
// the bug.
@@ -303,7 +305,6 @@ TEST_F(ThreadPoolTest, TestZeroQueueSize) {
_pool->shutdown();
}
-/*
// Test that a thread pool will crash if asked to run its own blocking
// functions in a pool thread.
//
@@ -319,20 +320,17 @@ TEST_F(ThreadPoolTest, TestDeadlocks) {
const char* death_msg = "called pool function that would result in deadlock";
ASSERT_DEATH({
ASSERT_TRUE(rebuild_pool_with_min_max(1, 1).ok());
- ASSERT_TRUE(_pool->submit_func(
- Bind(&ThreadPool::shutdown, Unretained(_pool.get()))).ok());
+ ASSERT_TRUE(_pool->submit_func(std::bind((&ThreadPool::shutdown), _pool.get())).ok());
_pool->wait();
}, death_msg);
ASSERT_DEATH({
ASSERT_TRUE(rebuild_pool_with_min_max(1, 1).ok());
- ASSERT_TRUE(_pool->submit_func(
- Bind(&ThreadPool::ok(), Unretained(_pool.get()))).ok());
+ ASSERT_TRUE(_pool->submit_func(std::bind(&ThreadPool::wait, _pool.get())).ok());
_pool->wait();
}, death_msg);
}
#endif
-*/
class SlowDestructorRunnable : public Runnable {
public:
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org