You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/01/04 01:29:00 UTC

[incubator-doris] branch master updated: [Bug] Fix scanner threads heap-use-after-free (#5111)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 17d939b  [Bug] Fix scanner threads heap-use-after-free (#5111)
17d939b is described below

commit 17d939b789eaf7afc84defd2496b5990decf577f
Author: HuangWei <hu...@apache.org>
AuthorDate: Mon Jan 4 09:28:51 2021 +0800

    [Bug] Fix scanner threads heap-use-after-free (#5111)
    
    Scanner threads may still be running and using the member variables of
    OlapScanNode after the OlapScanNode has already been destroyed.
    
    We make `_running_thread` the last member variable that a scanner thread accesses,
    and `transfer_thread` now needs to wait for `_running_thread==0` before it exits.
    Once `transfer_thread` has been joined, `OlapScanNode::close()` can continue.
---
 be/src/exec/olap_scan_node.cpp | 26 +++++++++++++++++++++-----
 be/src/exec/olap_scan_node.h   |  4 ++--
 be/src/exec/olap_scanner.cpp   |  1 -
 3 files changed, 23 insertions(+), 8 deletions(-)

diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index ab09df1..52635d4 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -59,7 +59,6 @@ OlapScanNode::OlapScanNode(ObjectPool* pool, const TPlanNode& tnode, const Descr
           _status(Status::OK()),
           _resource_info(nullptr),
           _buffered_bytes(0),
-          _running_thread(0),
           _eval_conjuncts_fn(nullptr) {}
 
 OlapScanNode::~OlapScanNode() {}
@@ -1251,12 +1250,22 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
 
     state->resource_pool()->release_thread_token(true);
     VLOG(1) << "TransferThread finish.";
-    std::unique_lock<std::mutex> l(_row_batches_lock);
-    _transfer_done = true;
-    _row_batch_added_cv.notify_all();
+    {
+        std::unique_lock<std::mutex> l(_row_batches_lock);
+        _transfer_done = true;
+        _row_batch_added_cv.notify_all();
+    }
+
+    std::unique_lock<std::mutex> l(_scan_batches_lock);
+    _scan_thread_exit_cv.wait(l, [this] { return _running_thread == 0; });
+    VLOG(1) << "Scanner threads have been exited. TransferThread exit.";
 }
 
 void OlapScanNode::scanner_thread(OlapScanner* scanner) {
+    // Do not use ScopedTimer here: there is no guarantee that the counter
+    // (the class member _scan_cpu_timer) still exists after `_running_thread==0`.
+    ThreadCpuStopWatch cpu_watch;
+    cpu_watch.start();
     Status status = Status::OK();
     bool eos = false;
     RuntimeState* state = scanner->runtime_state();
@@ -1345,7 +1354,6 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) {
         if (!eos) {
             _olap_scanners.push_front(scanner);
         }
-        _running_thread--;
     }
     if (eos) {
         // close out of batches lock. we do this before _progress update
@@ -1359,7 +1367,15 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) {
             _scanner_done = true;
         }
     }
+
+    _scan_cpu_timer->update(cpu_watch.elapsed_time());
     _scan_batch_added_cv.notify_one();
+
+    // The transfer thread will wait for `_running_thread==0`, to make sure all scanner threads won't access class members.
+    // Do not access class members after this code.
+    std::unique_lock<std::mutex> l(_scan_batches_lock);
+    _running_thread--;
+    _scan_thread_exit_cv.notify_one();
 }
 
 Status OlapScanNode::add_one_batch(RowBatchInterface* row_batch) {
diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h
index 83ffbd7..b677c86 100644
--- a/be/src/exec/olap_scan_node.h
+++ b/be/src/exec/olap_scan_node.h
@@ -232,7 +232,8 @@ private:
 
     std::mutex _scan_batches_lock;
     std::condition_variable _scan_batch_added_cv;
-    int32_t _scanner_task_finish_count;
+    int64_t _running_thread = 0;
+    std::condition_variable _scan_thread_exit_cv;
 
     std::list<RowBatchInterface*> _scan_row_batches;
 
@@ -260,7 +261,6 @@ private:
     TResourceInfo* _resource_info;
 
     int64_t _buffered_bytes;
-    int64_t _running_thread;
     EvalConjunctsFn _eval_conjuncts_fn;
 
     bool _need_agg_finalize = true;
diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp
index 4e7b63d..96c82b5 100644
--- a/be/src/exec/olap_scanner.cpp
+++ b/be/src/exec/olap_scanner.cpp
@@ -253,7 +253,6 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) {
     int64_t raw_rows_threshold = raw_rows_read() + config::doris_scanner_row_num;
     {
         SCOPED_TIMER(_parent->_scan_timer);
-        SCOPED_CPU_TIMER(_parent->_scan_cpu_timer);
         while (true) {
             // Batch is full, break
             if (batch->is_full()) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org