You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by zh...@apache.org on 2020/02/19 12:23:25 UTC

[incubator-doris] branch master updated: Fix the flush_status bug in flush-executor (#2933)

This is an automated email from the ASF dual-hosted git repository.

zhaoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c617fc9  Fix the flush_status bug in flush-executor (#2933)
c617fc9 is described below

commit c617fc9064cc8475b7c6a5ecbe540e3a88e05663
Author: LingBin <li...@gmail.com>
AuthorDate: Wed Feb 19 06:23:19 2020 -0600

    Fix the flush_status bug in flush-executor (#2933)
    
    For a tablet, there may be multiple memtables, which will be
    flushed to disk one by one in the order of generation.
    
    If a memtable flush fails, the load job must fail as well. However,
    the previous implementation overwrote `_flush_status` with the result
    of each subsequent flush, so an earlier failure could be masked and a
    failed load job could be reported as successful.
    
    This patch also makes two other changes:
    1. Use `std::bind` instead of `boost::bind`;
    2. Remove some unneeded headers.
---
 be/src/olap/memtable_flush_executor.cpp | 44 ++++++++++++++++++---------------
 be/src/olap/memtable_flush_executor.h   | 23 +++++++++++------
 be/src/olap/tablet_sync_service.cpp     |  2 +-
 be/src/util/threadpool.cpp              | 21 ++++++++--------
 be/src/util/threadpool.h                | 12 ++++-----
 be/test/olap/delta_writer_test.cpp      | 21 ++++++++--------
 be/test/util/threadpool_test.cpp        | 28 ++++++++++-----------
 7 files changed, 81 insertions(+), 70 deletions(-)

diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp
index 00bf34e..e3f6c90 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -19,11 +19,7 @@
 
 #include <functional>
 
-#include "olap/data_dir.h"
-#include "olap/delta_writer.h"
 #include "olap/memtable.h"
-#include "runtime/exec_env.h"
-#include "runtime/mem_tracker.h"
 #include "util/scoped_cleanup.h"
 
 namespace doris {
@@ -34,36 +30,44 @@ std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) {
     return os;
 }
 
-OLAPStatus FlushToken::submit(std::shared_ptr<MemTable> memtable) {
-    _flush_token->submit_func(boost::bind(boost::mem_fn(&FlushToken::_flush_memtable), this, memtable));
+// The type of parameter is safe to be a reference. Because the function object
+// returned by std::bind() will increase the reference count of Memtable. i.e.,
+// after the submit() method returns, even if the caller immediately releases the
+// passed shared_ptr object, the Memtable object will not be destructed because
+// its reference count is not 0.
+OLAPStatus FlushToken::submit(const std::shared_ptr<MemTable>& memtable) {
+    RETURN_NOT_OK(_flush_status.load());
+    _flush_token->submit_func(std::bind(&FlushToken::_flush_memtable, this, memtable));
     return OLAP_SUCCESS;
 }
 
-void FlushToken::cancel() { 
+void FlushToken::cancel() {
     _flush_token->shutdown();
 }
 
-OLAPStatus FlushToken::wait() { 
+OLAPStatus FlushToken::wait() {
     _flush_token->wait();
-    return _flush_status;
+    return _flush_status.load();
 }
 
 void FlushToken::_flush_memtable(std::shared_ptr<MemTable> memtable) {
+    SCOPED_CLEANUP({ memtable.reset(); });
+
+    // If previous flush has failed, return directly
+    if (_flush_status.load() != OLAP_SUCCESS) {
+        return;
+    }
+
     MonotonicStopWatch timer;
     timer.start();
-    _flush_status = memtable->flush();
-    SCOPED_CLEANUP({
-        memtable.reset();
-    });
-    if (_flush_status != OLAP_SUCCESS) {
+    _flush_status.store(memtable->flush());
+    if (_flush_status.load() != OLAP_SUCCESS) {
         return;
     }
 
     _stats.flush_time_ns += timer.elapsed_time();
     _stats.flush_count++;
     _stats.flush_size_bytes += memtable->memory_usage();
-    LOG(INFO) << "flushed " << *(memtable) << " in " << _stats.flush_time_ns / 1000 / 1000
-              << " ms, status=" << _flush_status;
 }
 
 void MemTableFlushExecutor::init(const std::vector<DataDir*>& data_dirs) {
@@ -71,12 +75,12 @@ void MemTableFlushExecutor::init(const std::vector<DataDir*>& data_dirs) {
     size_t min_threads = std::max(1, config::flush_thread_num_per_store);
     size_t max_threads = data_dir_num * min_threads;
     ThreadPoolBuilder("MemTableFlushThreadPool")
-        .set_min_threads(min_threads)
-        .set_max_threads(max_threads)
-        .build(&_flush_pool);
+            .set_min_threads(min_threads)
+            .set_max_threads(max_threads)
+            .build(&_flush_pool);
 }
 
-// create a flush token
+// NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are flushed in order.
 OLAPStatus MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>* flush_token) {
     flush_token->reset(new FlushToken(_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL)));
     return OLAP_SUCCESS;
diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h
index bb58e3f..b3d0cbe 100644
--- a/be/src/olap/memtable_flush_executor.h
+++ b/be/src/olap/memtable_flush_executor.h
@@ -17,10 +17,9 @@
 
 #pragma once
 
-#include <cstdint>
+#include <atomic>
 #include <memory>
 #include <vector>
-#include <utility>
 
 #include "olap/olap_define.h"
 #include "util/threadpool.h"
@@ -43,13 +42,19 @@ struct FlushStatistic {
 std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat);
 
 // A thin wrapper of ThreadPoolToken to submit task.
+// For a tablet, there may be multiple memtables, which will be flushed to disk
+// one by one in the order of generation.
+// If a memtable flush fails, then:
+// 1. Immediately disallow submission of any subsequent memtable
+// 2. For the memtables that have already been submitted, there is no need to flush,
+//    because the entire job will definitely fail;
 class FlushToken {
 public:
-    explicit FlushToken(std::unique_ptr<ThreadPoolToken> flush_pool_token)
-        : _flush_status(OLAP_SUCCESS),
-          _flush_token(std::move(flush_pool_token)) {}
+    explicit FlushToken(std::unique_ptr<ThreadPoolToken> flush_pool_token) :
+            _flush_token(std::move(flush_pool_token)),
+            _flush_status(OLAP_SUCCESS) {  }
 
-    OLAPStatus submit(std::shared_ptr<MemTable> mem_table);
+    OLAPStatus submit(const std::shared_ptr<MemTable>& mem_table);
 
     // error has happpens, so we cancel this token
     // And remove all tasks in the queue.
@@ -64,8 +69,12 @@ public:
 private:
     void _flush_memtable(std::shared_ptr<MemTable> mem_table);
 
-    OLAPStatus _flush_status;
     std::unique_ptr<ThreadPoolToken> _flush_token;
+
+    // Records the current flush status of the tablet.
+    // Note: Once its value is set to Failed, it cannot return to SUCCESS.
+    std::atomic<OLAPStatus> _flush_status;
+
     FlushStatistic _stats;
 };
 
diff --git a/be/src/olap/tablet_sync_service.cpp b/be/src/olap/tablet_sync_service.cpp
index 329a67d..e61c67a 100644
--- a/be/src/olap/tablet_sync_service.cpp
+++ b/be/src/olap/tablet_sync_service.cpp
@@ -152,4 +152,4 @@ void TabletSyncService::_push_tablet_meta_thread(std::vector<PushTabletMetaTask>
     return;
 }
 
-} // doris
\ No newline at end of file
+} // doris
diff --git a/be/src/util/threadpool.cpp b/be/src/util/threadpool.cpp
index cada3c2..582f224 100644
--- a/be/src/util/threadpool.cpp
+++ b/be/src/util/threadpool.cpp
@@ -36,23 +36,22 @@ using strings::Substitute;
 
 class FunctionRunnable : public Runnable {
 public:
-    explicit FunctionRunnable(boost::function<void()> func)
-        : _func(std::move(func)) {}
+    explicit FunctionRunnable(std::function<void()> func) : _func(std::move(func)) {}
 
     void run() OVERRIDE {
         _func();
     }
 
 private:
-    boost::function<void()> _func;
+    std::function<void()> _func;
 };
 
-ThreadPoolBuilder::ThreadPoolBuilder(string name)
-  : _name(std::move(name)),
-    _min_threads(0),
-    _max_threads(base::NumCPUs()),
-    _max_queue_size(std::numeric_limits<int>::max()),
-    _idle_timeout(MonoDelta::FromMilliseconds(500)) {}
+ThreadPoolBuilder::ThreadPoolBuilder(string name) :
+        _name(std::move(name)),
+        _min_threads(0),
+        _max_threads(base::NumCPUs()),
+        _max_queue_size(std::numeric_limits<int>::max()),
+        _idle_timeout(MonoDelta::FromMilliseconds(500)) {}
 
 ThreadPoolBuilder& ThreadPoolBuilder::set_min_threads(int min_threads) {
     CHECK_GE(min_threads, 0);
@@ -99,7 +98,7 @@ Status ThreadPoolToken::submit(std::shared_ptr<Runnable> r) {
     return _pool->do_submit(std::move(r), this);
 }
 
-Status ThreadPoolToken::submit_func(boost::function<void()> f) {
+Status ThreadPoolToken::submit_func(std::function<void()> f) {
     return submit(std::make_shared<FunctionRunnable>(std::move(f)));
 }
 
@@ -353,7 +352,7 @@ Status ThreadPool::submit(std::shared_ptr<Runnable> r) {
     return do_submit(std::move(r), _tokenless.get());
 }
 
-Status ThreadPool::submit_func(boost::function<void()> f) {
+Status ThreadPool::submit_func(std::function<void()> f) {
     return submit(std::make_shared<FunctionRunnable>(std::move(f)));
 }
 
diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h
index b13f396..af60a69 100644
--- a/be/src/util/threadpool.h
+++ b/be/src/util/threadpool.h
@@ -19,12 +19,12 @@
 #define DORIS_BE_SRC_UTIL_THREAD_POOL_H
 
 #include <deque>
+#include <functional>
 #include <memory>
 #include <utility>
 #include <unordered_set>
 #include <string>
 
-#include "boost/function.hpp"
 #include <boost/intrusive/list.hpp>
 #include <boost/intrusive/list_hook.hpp>
 
@@ -150,7 +150,7 @@ private:
 //            .set_idle_timeout(MonoDelta::FromMilliseconds(2000))
 //            .Build(&thread_pool));
 //    thread_pool->Submit(shared_ptr<Runnable>(new Task()));
-//    thread_pool->SubmitFunc(boost::bind(&Func, 10));
+//    thread_pool->SubmitFunc(std::bind(&Func, 10));
 class ThreadPool {
 public:
     ~ThreadPool();
@@ -166,8 +166,8 @@ public:
     // Submits a Runnable class.
     Status submit(std::shared_ptr<Runnable> r);
 
-    // Submits a function bound using boost::bind(&FuncName, args...).
-    Status submit_func(boost::function<void()> f);
+    // Submits a function bound using std::bind(&FuncName, args...).
+    Status submit_func(std::function<void()> f);
 
     // Waits until all the tasks are completed.
     void wait();
@@ -341,8 +341,8 @@ public:
     // Submits a Runnable class.
     Status submit(std::shared_ptr<Runnable> r);
 
-    // Submits a function bound using boost::bind(&FuncName, args...).
-    Status submit_func(boost::function<void()> f);
+    // Submits a function bound using std::bind(&FuncName, args...).
+    Status submit_func(std::function<void()> f);
 
     // Marks the token as unusable for future submissions. Any queued tasks not
     // yet running are destroyed. If tasks are in flight, Shutdown() will wait
diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp
index c4175f1..537433f 100644
--- a/be/test/olap/delta_writer_test.cpp
+++ b/be/test/olap/delta_writer_test.cpp
@@ -310,7 +310,7 @@ TEST_F(TestDeltaWriter, open) {
     WriteRequest write_req = {10003, 270068375, WriteType::LOAD,
                               20001, 30001, load_id, false, tuple_desc};
     DeltaWriter* delta_writer = nullptr;
-    DeltaWriter::open(&write_req, k_mem_tracker, &delta_writer); 
+    DeltaWriter::open(&write_req, k_mem_tracker, &delta_writer);
     ASSERT_NE(delta_writer, nullptr);
     res = delta_writer->close();
     ASSERT_EQ(OLAP_SUCCESS, res);
@@ -345,7 +345,7 @@ TEST_F(TestDeltaWriter, write) {
                               20002, 30002, load_id, false, tuple_desc,
                               &(tuple_desc->slots())};
     DeltaWriter* delta_writer = nullptr;
-    DeltaWriter::open(&write_req, k_mem_tracker, &delta_writer); 
+    DeltaWriter::open(&write_req, k_mem_tracker, &delta_writer);
     ASSERT_NE(delta_writer, nullptr);
 
     MemTracker tracker;
@@ -362,18 +362,18 @@ TEST_F(TestDeltaWriter, write) {
         int128_t large_int_value = -90000;
         memcpy(tuple->get_slot(slots[4]->tuple_offset()), &large_int_value, sizeof(int128_t));
 
-        ((DateTimeValue*)(tuple->get_slot(slots[5]->tuple_offset())))->from_date_str("2048-11-10", 10); 
-        ((DateTimeValue*)(tuple->get_slot(slots[6]->tuple_offset())))->from_date_str("2636-08-16 19:39:43", 19); 
+        ((DateTimeValue*)(tuple->get_slot(slots[5]->tuple_offset())))->from_date_str("2048-11-10", 10);
+        ((DateTimeValue*)(tuple->get_slot(slots[6]->tuple_offset())))->from_date_str("2636-08-16 19:39:43", 19);
 
         StringValue* char_ptr = (StringValue*)(tuple->get_slot(slots[7]->tuple_offset()));
         char_ptr->ptr = (char*)pool.allocate(4);
         memcpy(char_ptr->ptr, "abcd", 4);
-        char_ptr->len = 4; 
+        char_ptr->len = 4;
 
         StringValue* var_ptr = (StringValue*)(tuple->get_slot(slots[8]->tuple_offset()));
         var_ptr->ptr = (char*)pool.allocate(5);
         memcpy(var_ptr->ptr, "abcde", 5);
-        var_ptr->len = 5; 
+        var_ptr->len = 5;
 
         DecimalValue decimal_value(1.1);
         *(DecimalValue*)(tuple->get_slot(slots[9]->tuple_offset())) = decimal_value;
@@ -385,13 +385,13 @@ TEST_F(TestDeltaWriter, write) {
 
         memcpy(tuple->get_slot(slots[14]->tuple_offset()), &large_int_value, sizeof(int128_t));
 
-        ((DateTimeValue*)(tuple->get_slot(slots[15]->tuple_offset())))->from_date_str("2048-11-10", 10); 
-        ((DateTimeValue*)(tuple->get_slot(slots[16]->tuple_offset())))->from_date_str("2636-08-16 19:39:43", 19); 
+        ((DateTimeValue*)(tuple->get_slot(slots[15]->tuple_offset())))->from_date_str("2048-11-10", 10);
+        ((DateTimeValue*)(tuple->get_slot(slots[16]->tuple_offset())))->from_date_str("2636-08-16 19:39:43", 19);
 
         char_ptr = (StringValue*)(tuple->get_slot(slots[17]->tuple_offset()));
         char_ptr->ptr = (char*)pool.allocate(4);
         memcpy(char_ptr->ptr, "abcd", 4);
-        char_ptr->len = 4; 
+        char_ptr->len = 4;
 
         var_ptr = (StringValue*)(tuple->get_slot(slots[18]->tuple_offset()));
         var_ptr->ptr = (char*)pool.allocate(5);
@@ -425,7 +425,7 @@ TEST_F(TestDeltaWriter, write) {
         std::cout << "start to publish txn" << std::endl;
         RowsetSharedPtr rowset = tablet_rs.second;
         res = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id,
-                                   write_req.tablet_id, write_req.schema_hash, tablet_rs.first.tablet_uid, 
+                                   write_req.tablet_id, write_req.schema_hash, tablet_rs.first.tablet_uid,
                                    version, version_hash);
         ASSERT_EQ(OLAP_SUCCESS, res);
         std::cout << "start to add inc rowset:" << rowset->rowset_id() << ", num rows:" << rowset->num_rows()
@@ -441,6 +441,7 @@ TEST_F(TestDeltaWriter, write) {
     auto schema_hash = 270068375;
     res = k_engine->tablet_manager()->drop_tablet(tablet_id, schema_hash);
     ASSERT_EQ(OLAP_SUCCESS, res);
+    delete delta_writer;
 }
 
 } // namespace doris
diff --git a/be/test/util/threadpool_test.cpp b/be/test/util/threadpool_test.cpp
index b752de3..f3ea989 100644
--- a/be/test/util/threadpool_test.cpp
+++ b/be/test/util/threadpool_test.cpp
@@ -33,7 +33,6 @@
 #include <gflags/gflags_declare.h>
 #include <gtest/gtest.h>
 
-#include <boost/bind.hpp>
 #include "common/logging.h"
 #include "common/status.h"
 #include "gutil/atomicops.h"
@@ -119,9 +118,9 @@ TEST_F(ThreadPoolTest, TestSimpleTasks) {
     std::atomic<int32_t> counter(0);
     std::shared_ptr<Runnable> task(new SimpleTask(15, &counter));
 
-    ASSERT_TRUE(_pool->submit_func(boost::bind(&simple_task_method, 10, &counter)).ok());
+    ASSERT_TRUE(_pool->submit_func(std::bind(&simple_task_method, 10, &counter)).ok());
     ASSERT_TRUE(_pool->submit(task).ok());
-    ASSERT_TRUE(_pool->submit_func(boost::bind(&simple_task_method, 20, &counter)).ok());
+    ASSERT_TRUE(_pool->submit_func(std::bind(&simple_task_method, 20, &counter)).ok());
     ASSERT_TRUE(_pool->submit(task).ok());
     _pool->wait();
     ASSERT_EQ(10 + 15 + 20 + 15, counter.load());
@@ -219,16 +218,19 @@ TEST_F(ThreadPoolTest, TestThreadPoolWithNoMaxThreads) {
 TEST_F(ThreadPoolTest, TestRace) {
     alarm(60);
     auto cleanup = MakeScopedCleanup([]() {
-            alarm(0); // Disable alarm on test exit.
-            });
+        alarm(0); // Disable alarm on test exit.
+    });
     ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName)
-                  .set_min_threads(0)
-                  .set_max_threads(1)
-                  .set_idle_timeout(MonoDelta::FromMicroseconds(1))).ok());
+                .set_min_threads(0)
+                .set_max_threads(1)
+                .set_idle_timeout(MonoDelta::FromMicroseconds(1))).ok());
 
     for (int i = 0; i < 500; i++) {
         CountDownLatch l(1);
-        ASSERT_TRUE(_pool->submit_func(boost::bind(&CountDownLatch::count_down, &l)).ok());
+        // CountDownLatch::count_down has multiple overloaded version,
+        // so an cast is needed to use std::bind
+        ASSERT_TRUE(_pool->submit_func(
+                std::bind((void (CountDownLatch::*)())(&CountDownLatch::count_down), &l)).ok());
         l.wait();
         // Sleeping a different amount in each iteration makes it more likely to hit
         // the bug.
@@ -303,7 +305,6 @@ TEST_F(ThreadPoolTest, TestZeroQueueSize) {
     _pool->shutdown();
 }
 
-/*
 // Test that a thread pool will crash if asked to run its own blocking
 // functions in a pool thread.
 //
@@ -319,20 +320,17 @@ TEST_F(ThreadPoolTest, TestDeadlocks) {
     const char* death_msg = "called pool function that would result in deadlock";
     ASSERT_DEATH({
             ASSERT_TRUE(rebuild_pool_with_min_max(1, 1).ok());
-            ASSERT_TRUE(_pool->submit_func(
-                    Bind(&ThreadPool::shutdown, Unretained(_pool.get()))).ok());
+            ASSERT_TRUE(_pool->submit_func(std::bind((&ThreadPool::shutdown), _pool.get())).ok());
             _pool->wait();
             }, death_msg);
 
     ASSERT_DEATH({
             ASSERT_TRUE(rebuild_pool_with_min_max(1, 1).ok());
-            ASSERT_TRUE(_pool->submit_func(
-                    Bind(&ThreadPool::ok(), Unretained(_pool.get()))).ok());
+            ASSERT_TRUE(_pool->submit_func(std::bind(&ThreadPool::wait, _pool.get())).ok());
             _pool->wait();
             }, death_msg);
 }
 #endif
-*/
 
 class SlowDestructorRunnable : public Runnable {
 public:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org