Posted to commits@doris.apache.org by mo...@apache.org on 2022/10/08 08:02:41 UTC

[doris] branch branch-1.1-lts updated (ed831aec90 -> 844cfceb76)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a change to branch branch-1.1-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


    from ed831aec90 Revert "[improvement](memory) set TCMALLOC_HEAP_LIMIT_MB to control memory consumption of tcmalloc (#12981)" (#13152)
     new 65247d03b0 (runtimefilter) shorter time prepare consumes (#13127)
     new ea7d9ec34f [fix](string) allocate memory according to actual size instead of max size (#13112)
     new 844cfceb76 [improvement](load) config flush_thread_num_per_store to be 6 by default (#13076)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/common/config.h                |  2 +-
 be/src/olap/wrapper_field.cpp         |  6 +-----
 be/src/runtime/runtime_filter_mgr.cpp | 32 ++++++++++++++++++++------------
 be/src/runtime/runtime_filter_mgr.h   | 15 +++++++++++----
 4 files changed, 33 insertions(+), 22 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[doris] 03/03: [improvement](load) config flush_thread_num_per_store to be 6 by default (#13076)

Posted by mo...@apache.org.

morningman pushed a commit to branch branch-1.1-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 844cfceb76ecd865454ba1d1600d55e69c22b316
Author: Yongqiang YANG <98...@users.noreply.github.com>
AuthorDate: Sat Oct 8 09:16:22 2022 +0800

    [improvement](load) config flush_thread_num_per_store to be 6 by default (#13076)
    
    Flushing a memtable is CPU bound, so 2 threads per disk is too small.
---
 be/src/common/config.h | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 8e91db663a..dfba339af5 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -540,7 +540,7 @@ CONF_mInt32(storage_flood_stage_usage_percent, "90"); // 90%
 // The min bytes that should be left of a data dir
 CONF_mInt64(storage_flood_stage_left_capacity_bytes, "1073741824"); // 1GB
 // number of thread for flushing memtable per store
-CONF_Int32(flush_thread_num_per_store, "2");
+CONF_Int32(flush_thread_num_per_store, "6");
 // number of thread for flushing memtable per store, for high priority load task
 CONF_Int32(high_priority_flush_thread_num_per_store, "1");
 




[doris] 02/03: [fix](string) allocate memory according to actual size instead of max size (#13112)

Posted by mo...@apache.org.

morningman pushed a commit to branch branch-1.1-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit ea7d9ec34f9ff611e0e0c0019bf7ac250316bbc6
Author: Yongqiang YANG <98...@users.noreply.github.com>
AuthorDate: Thu Oct 6 09:56:22 2022 +0800

    [fix](string) allocate memory according to actual size instead of max size (#13112)
    
    The maximum length of a string column is 2GB. If we allocate memory according
    to the column length, a string consumes a lot of memory. It also misleads the
    memory tracker.
---
 be/src/olap/wrapper_field.cpp | 6 +-----
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/be/src/olap/wrapper_field.cpp b/be/src/olap/wrapper_field.cpp
index 887b93b259..ffb4516193 100644
--- a/be/src/olap/wrapper_field.cpp
+++ b/be/src/olap/wrapper_field.cpp
@@ -52,11 +52,7 @@ WrapperField* WrapperField::create(const TabletColumn& column, uint32_t len) {
         variable_len =
                 std::max(len, static_cast<uint32_t>(column.length() - sizeof(VarcharLengthType)));
     } else if (column.type() == OLAP_FIELD_TYPE_STRING) {
-        // column.length is the serialized varchar length
-        // the first sizeof(StringLengthType) bytes is the length of varchar
-        // variable_len is the real length of varchar
-        variable_len =
-                std::max(len, static_cast<uint32_t>(column.length() - sizeof(StringLengthType)));
+        variable_len = len;
     } else {
         variable_len = column.length();
     }
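
The sizing change in this hunk can be illustrated with a minimal standalone sketch. It is not the Doris code: FieldKind, kLengthPrefixBytes, variable_len_before and variable_len_after are hypothetical names introduced here, and the 4-byte prefix and the roughly 2GB column length are assumed values chosen for illustration.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical field kinds; the real code inspects column.type().
enum class FieldKind { kVarchar, kString, kOther };

// Assumed size of the serialized length prefix, for illustration only.
constexpr uint32_t kLengthPrefixBytes = 4;

// Sizing before the patch: STRING was sized like the varchar branch, so the
// buffer grew to the declared column length even for a short value.
uint32_t variable_len_before(FieldKind kind, uint32_t column_length, uint32_t len) {
    if (kind == FieldKind::kVarchar || kind == FieldKind::kString) {
        return std::max(len, column_length - kLengthPrefixBytes);
    }
    return column_length;
}

// Sizing after the patch: STRING uses the actual length passed by the caller.
uint32_t variable_len_after(FieldKind kind, uint32_t column_length, uint32_t len) {
    if (kind == FieldKind::kVarchar) {
        return std::max(len, column_length - kLengthPrefixBytes);
    }
    if (kind == FieldKind::kString) {
        return len;
    }
    return column_length;
}
```

With an assumed column length of 2147483647 and an actual value of 16 bytes, the first function asks for about 2GB while the second asks for 16 bytes, which is the difference the commit message describes.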




[doris] 01/03: (runtimefilter) shorter time prepare consumes (#13127)

Posted by mo...@apache.org.

morningman pushed a commit to branch branch-1.1-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 65247d03b081f1b2618efab18c6b262fc28850fe
Author: Yongqiang YANG <98...@users.noreply.github.com>
AuthorDate: Thu Oct 6 10:12:29 2022 +0800

    (runtimefilter) shorter time prepare consumes (#13127)
    
    Currently, every prepare adds a runtime filter controller, so it takes the
    mutex lock on the controller map. Initializing a bloom filter takes some
    time to allocate and memset memory. If we run p1 tests with -parallel=20
    -suiteParallel=20 -actionParallel=20, we get error messages like
    'send fragment timeout 5s'.
    
    The patch fixes the problem in the following 2 ways:
    1. Replace the single mutex lock with 128 sharded locks.
    2. If a plan fragment does not have a runtime filter, it does not need to take
    the locks.
---
 be/src/runtime/runtime_filter_mgr.cpp | 32 ++++++++++++++++++++------------
 be/src/runtime/runtime_filter_mgr.h   | 15 +++++++++++----
 2 files changed, 31 insertions(+), 16 deletions(-)

diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp
index b5302aeace..ffea62a155 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -272,48 +272,56 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
 Status RuntimeFilterMergeController::add_entity(
         const TExecPlanFragmentParams& params,
         std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle) {
+    if (!params.params.__isset.runtime_filter_params ||
+        params.params.runtime_filter_params.rid_to_runtime_filter.size() == 0) {
+        return Status::OK();
+    }
+
     runtime_filter_merge_entity_closer entity_closer =
             std::bind(runtime_filter_merge_entity_close, this, std::placeholders::_1);
 
-    std::lock_guard<std::mutex> guard(_controller_mutex);
     UniqueId query_id(params.params.query_id);
     std::string query_id_str = query_id.to_string();
-    auto iter = _filter_controller_map.find(query_id_str);
     UniqueId fragment_instance_id = UniqueId(params.params.fragment_instance_id);
+    uint32_t shard = _get_controller_shard_idx(query_id);
+    std::lock_guard<std::mutex> guard(_controller_mutex[shard]);
+    auto iter = _filter_controller_map[shard].find(query_id_str);
 
-    if (iter == _filter_controller_map.end()) {
+    if (iter == _filter_controller_map[shard].end()) {
         *handle = std::shared_ptr<RuntimeFilterMergeControllerEntity>(
                 new RuntimeFilterMergeControllerEntity(), entity_closer);
-        _filter_controller_map[query_id_str] = *handle;
+        _filter_controller_map[shard][query_id_str] = *handle;
         const TRuntimeFilterParams& filter_params = params.params.runtime_filter_params;
         if (params.params.__isset.runtime_filter_params) {
             RETURN_IF_ERROR(handle->get()->init(query_id, fragment_instance_id, filter_params, params.query_options));
         }
     } else {
-        *handle = _filter_controller_map[query_id_str].lock();
+        *handle = _filter_controller_map[shard][query_id_str].lock();
     }
     return Status::OK();
 }
 
 Status RuntimeFilterMergeController::acquire(
         UniqueId query_id, std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle) {
-    std::lock_guard<std::mutex> guard(_controller_mutex);
+    uint32_t shard = _get_controller_shard_idx(query_id);
+    std::lock_guard<std::mutex> guard(_controller_mutex[shard]);
     std::string query_id_str = query_id.to_string();
-    auto iter = _filter_controller_map.find(query_id_str);
-    if (iter == _filter_controller_map.end()) {
+    auto iter = _filter_controller_map[shard].find(query_id_str);
+    if (iter == _filter_controller_map[shard].end()) {
         LOG(WARNING) << "not found entity, query-id:" << query_id_str;
         return Status::InvalidArgument("not found entity");
     }
-    *handle = _filter_controller_map[query_id_str].lock();
+    *handle = _filter_controller_map[shard][query_id_str].lock();
     if (*handle == nullptr) {
         return Status::InvalidArgument("entity is closed");
     }
     return Status::OK();
 }
 
-Status RuntimeFilterMergeController::remove_entity(UniqueId queryId) {
-    std::lock_guard<std::mutex> guard(_controller_mutex);
-    _filter_controller_map.erase(queryId.to_string());
+Status RuntimeFilterMergeController::remove_entity(UniqueId query_id) {
+    uint32_t shard = _get_controller_shard_idx(query_id);
+    std::lock_guard<std::mutex> guard(_controller_mutex[shard]);
+    _filter_controller_map[shard].erase(query_id.to_string());
     return Status::OK();
 }
 
diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h
index 653ce675b2..346a3cbab8 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -22,6 +22,7 @@
 #include <memory>
 #include <mutex>
 #include <thread>
+#include <unordered_map>
 
 #include "common/object_pool.h"
 #include "common/status.h"
@@ -163,16 +164,22 @@ public:
     // thread safe
     // remove a entity by query-id
     // remove_entity will be called automatically by entity when entity is destroyed
-    Status remove_entity(UniqueId queryId);
+    Status remove_entity(UniqueId query_id);
+
+    static const int kShardNum = 128;
 
 private:
-    std::mutex _controller_mutex;
+    uint32_t _get_controller_shard_idx(UniqueId& query_id) {
+        return (uint32_t)query_id.hi % kShardNum;
+    }
+
+    std::mutex _controller_mutex[kShardNum];
     // We store the weak pointer here.
     // When the external object is destroyed, we need to clear this record
     using FilterControllerMap =
-            std::map<std::string, std::weak_ptr<RuntimeFilterMergeControllerEntity>>;
+            std::unordered_map<std::string, std::weak_ptr<RuntimeFilterMergeControllerEntity>>;
     // str(query-id) -> entity
-    FilterControllerMap _filter_controller_map;
+    FilterControllerMap _filter_controller_map[kShardNum];
 };
 
 using runtime_filter_merge_entity_closer = std::function<void(RuntimeFilterMergeControllerEntity*)>;
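
The lock sharding introduced by this patch can be sketched in isolation as follows. This is a simplified illustration, not the Doris implementation: Entity, ShardedRegistry and the plain uint64_t key are hypothetical stand-ins for RuntimeFilterMergeControllerEntity, RuntimeFilterMergeController and UniqueId::hi, and unlike the patch the sketch recreates an entry whose weak pointer has expired.

```cpp
#include <array>
#include <cassert>
#include <cstdint>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

// Hypothetical stand-in for the per-query entity.
struct Entity {
    std::string query_id;
};

class ShardedRegistry {
public:
    static constexpr uint32_t kShardNum = 128;

    // Map the high part of the id to one of kShardNum shards.
    static uint32_t shard_idx(uint64_t hi) { return static_cast<uint32_t>(hi % kShardNum); }

    // Return the live entity for query_id or create one; only one shard is locked.
    std::shared_ptr<Entity> add(uint64_t hi, const std::string& query_id) {
        Shard& shard = _shards[shard_idx(hi)];
        std::lock_guard<std::mutex> guard(shard.mutex);
        auto iter = shard.map.find(query_id);
        if (iter != shard.map.end()) {
            if (auto existing = iter->second.lock()) {
                return existing;
            }
        }
        auto created = std::make_shared<Entity>(Entity{query_id});
        shard.map[query_id] = created;
        return created;
    }

    // Return the entity if it is registered and still alive, else nullptr.
    std::shared_ptr<Entity> acquire(uint64_t hi, const std::string& query_id) {
        Shard& shard = _shards[shard_idx(hi)];
        std::lock_guard<std::mutex> guard(shard.mutex);
        auto iter = shard.map.find(query_id);
        return iter == shard.map.end() ? nullptr : iter->second.lock();
    }

    // Drop the record for query_id from its shard.
    void remove(uint64_t hi, const std::string& query_id) {
        Shard& shard = _shards[shard_idx(hi)];
        std::lock_guard<std::mutex> guard(shard.mutex);
        shard.map.erase(query_id);
    }

private:
    // Each shard pairs its own mutex with its own map of weak pointers.
    struct Shard {
        std::mutex mutex;
        std::unordered_map<std::string, std::weak_ptr<Entity>> map;
    };
    std::array<Shard, kShardNum> _shards;
};
```

Two queries contend for the same mutex only when their ids fall into the same shard, so concurrent prepares for different queries mostly proceed in parallel. The sketch groups each mutex with its map in one struct, whereas the patch keeps two parallel arrays; either layout gives the same locking behavior.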

