You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2024/01/09 06:52:58 UTC

(doris) branch master updated: [Enhancement](wal) Add fault injection case for wal back pressure (#29689)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new eb84992bbf2 [Enhancement](wal) Add fault injection case for wal back pressure (#29689)
eb84992bbf2 is described below

commit eb84992bbf2419a649aa385fd83bafcb914b2128
Author: abmdocrt <Yu...@gmail.com>
AuthorDate: Tue Jan 9 14:52:51 2024 +0800

    [Enhancement](wal) Add fault injection case for wal back pressure (#29689)
---
 be/src/runtime/group_commit_mgr.cpp                |   6 ++
 ...m_back_pressure_time_out_fault_injection.groovy | 110 +++++++++++++++++++++
 2 files changed, 116 insertions(+)

diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index fc4a2df427f..c7333e21d64 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -20,10 +20,13 @@
 #include <gen_cpp/Types_types.h>
 #include <glog/logging.h>
 
+#include <chrono>
+
 #include "client_cache.h"
 #include "common/config.h"
 #include "runtime/exec_env.h"
 #include "runtime/fragment_mgr.h"
+#include "util/debug_points.h"
 #include "util/thrift_rpc_helper.h"
 
 namespace doris {
@@ -33,6 +36,9 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
     std::unique_lock l(mutex);
     RETURN_IF_ERROR(status);
     auto start = std::chrono::steady_clock::now();
+    DBUG_EXECUTE_IF("LoadBlockQueue.add_block.back_pressure_time_out", {
+        start = std::chrono::steady_clock::now() - std::chrono::milliseconds(120000);
+    });
     while (!runtime_state->is_cancelled() && status.ok() &&
            _all_block_queues_bytes->load(std::memory_order_relaxed) >=
                    config::group_commit_queue_mem_limit) {
diff --git a/regression-test/suites/fault_injection_p0/test_wal_mem_back_pressure_time_out_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_wal_mem_back_pressure_time_out_fault_injection.groovy
new file mode 100644
index 00000000000..8e9dd22f790
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_wal_mem_back_pressure_time_out_fault_injection.groovy
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_wal_mem_back_pressure_time_out_fault_injection","nonConcurrent") {
+
+    // Start from a fresh two-column table: drop any leftover, then create it again.
+    def tableName = "wal_test"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `k` int ,
+            `v` int ,
+        ) engine=olap
+        DISTRIBUTED BY HASH(`k`) 
+        BUCKETS 5 
+        properties("replication_num" = "1")
+        """
+
+    // Enable WAL memory back pressure on the first BE via the FE manager REST API;
+    // a group_commit_queue_mem_limit of 0 makes any `bytes >= limit` check true.
+    def enable_back_pressure = {
+        def fe = sql_return_maparray("show frontends")[0]
+        def be = sql_return_maparray("show backends")[0]
+        logger.info("frontend: ${fe}, backend: ${be}")
+        def endpoint = "http://${fe.Host}:${fe.HttpPort}/rest/v2/manager/node/set_config/be"
+        // Sets one BE config key and fails the suite if curl itself reports an error.
+        def set_be_config = { String key, String value ->
+            def payload = """{"${key}": {"node": ["${be.Host}:${be.HttpPort}"],"value": "${value}","persist": "false"}}"""
+            // Pass curl an argument list: String.execute() splits the command on
+            // whitespace and does not honor quotes, which would break up the
+            // header values and the JSON body.
+            def command = ["curl", "-sS", "-X", "POST", endpoint,
+                           "-H", "Content-Type: application/json",
+                           "-H", "Authorization: Basic cm9vdDo=",
+                           "-d", payload]
+            logger.info(command.join(" "))
+            def process = command.execute()
+            def stdout = new StringBuilder()
+            def stderr = new StringBuilder()
+            // Block until curl exits so the request has completed before the test continues.
+            process.waitForProcessOutput(stdout, stderr)
+            logger.info("set ${key}=${value}: exit=${process.exitValue()} out=${stdout} err=${stderr}")
+            assertTrue(process.exitValue() == 0)
+        }
+        set_be_config("group_commit_queue_mem_limit", "0")
+        set_be_config("group_commit_memory_rows_for_max_filter_ratio", "0")
+    }
+
+    // Undo enable_back_pressure on the first BE via the FE manager REST API. The values
+    // written here are presumably the BE defaults -- TODO confirm against the BE config.
+    def disable_back_pressure = {
+        def fe = sql_return_maparray("show frontends")[0]
+        def be = sql_return_maparray("show backends")[0]
+        logger.info("frontend: ${fe}, backend: ${be}")
+        def endpoint = "http://${fe.Host}:${fe.HttpPort}/rest/v2/manager/node/set_config/be"
+        // Sets one BE config key and fails the suite if curl itself reports an error.
+        def set_be_config = { String key, String value ->
+            def payload = """{"${key}": {"node": ["${be.Host}:${be.HttpPort}"],"value": "${value}","persist": "false"}}"""
+            // Pass curl an argument list: String.execute() splits the command on
+            // whitespace and does not honor quotes, which would break up the
+            // header values and the JSON body.
+            def command = ["curl", "-sS", "-X", "POST", endpoint,
+                           "-H", "Content-Type: application/json",
+                           "-H", "Authorization: Basic cm9vdDo=",
+                           "-d", payload]
+            logger.info(command.join(" "))
+            def process = command.execute()
+            def stdout = new StringBuilder()
+            def stderr = new StringBuilder()
+            // Block until curl exits so the request has completed before the suite ends.
+            process.waitForProcessOutput(stdout, stderr)
+            logger.info("set ${key}=${value}: exit=${process.exitValue()} out=${stdout} err=${stderr}")
+            assertTrue(process.exitValue() == 0)
+        }
+        set_be_config("group_commit_queue_mem_limit", "67108864")
+        set_be_config("group_commit_memory_rows_for_max_filter_ratio", "10000")
+    }
+
+    GetDebugPoint().clearDebugPointsForAllBEs()
+    sql """ set group_commit = async_mode; """
+    def timedOut = false  // becomes true only if the insert fails with the expected message
+    try {
+        enable_back_pressure()
+        GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue.add_block.back_pressure_time_out")
+        sql """insert into ${tableName} values(1,1)"""
+    } catch (Exception e) {
+        logger.info(e.getMessage())
+        timedOut = e.getMessage()?.contains('Wal memory back pressure wait too much time!') ?: false
+    } finally {
+        GetDebugPoint().disableDebugPointForAllBEs("LoadBlockQueue.add_block.back_pressure_time_out")
+        disable_back_pressure()  // always restore the BE config, even if the check below fails
+    }
+    assertTrue(timedOut)  // a successful insert means the fault injection had no effect
+
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org