You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2024/01/09 06:52:58 UTC
(doris) branch master updated: [Enhancement](wal) Add fault injection case for wal back pressure (#29689)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new eb84992bbf2 [Enhancement](wal) Add fault injection case for wal back pressure (#29689)
eb84992bbf2 is described below
commit eb84992bbf2419a649aa385fd83bafcb914b2128
Author: abmdocrt <Yu...@gmail.com>
AuthorDate: Tue Jan 9 14:52:51 2024 +0800
[Enhancement](wal) Add fault injection case for wal back pressure (#29689)
---
be/src/runtime/group_commit_mgr.cpp | 6 ++
...m_back_pressure_time_out_fault_injection.groovy | 110 +++++++++++++++++++++
2 files changed, 116 insertions(+)
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index fc4a2df427f..c7333e21d64 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -20,10 +20,13 @@
#include <gen_cpp/Types_types.h>
#include <glog/logging.h>
+#include <chrono>
+
#include "client_cache.h"
#include "common/config.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
+#include "util/debug_points.h"
#include "util/thrift_rpc_helper.h"
namespace doris {
@@ -33,6 +36,9 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
std::unique_lock l(mutex);
RETURN_IF_ERROR(status);
auto start = std::chrono::steady_clock::now();
+ DBUG_EXECUTE_IF("LoadBlockQueue.add_block.back_pressure_time_out", {
+ start = std::chrono::steady_clock::now() - std::chrono::milliseconds(120000);
+ });
while (!runtime_state->is_cancelled() && status.ok() &&
_all_block_queues_bytes->load(std::memory_order_relaxed) >=
config::group_commit_queue_mem_limit) {
diff --git a/regression-test/suites/fault_injection_p0/test_wal_mem_back_pressure_time_out_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_wal_mem_back_pressure_time_out_fault_injection.groovy
new file mode 100644
index 00000000000..8e9dd22f790
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_wal_mem_back_pressure_time_out_fault_injection.groovy
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_wal_mem_back_pressure_time_out_fault_injection","nonConcurrent") {
+ // Start from a clean slate: drop any leftover table, then create the two-int-column
+ // table (5 hash buckets on `k`, replication_num 1) that the insert further down writes to.
+ def tableName = "wal_test"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k` int ,
+ `v` int ,
+ ) engine=olap
+ DISTRIBUTED BY HASH(`k`)
+ BUCKETS 5
+ properties("replication_num" = "1")
+ """
+
+    // Turns WAL memory back pressure on for the first backend listed by "show backends".
+    // POSTs to the set_config REST endpoint of the first frontend listed by "show frontends"
+    // and sets group_commit_queue_mem_limit and group_commit_memory_rows_for_max_filter_ratio
+    // to "0" (persist=false). Fails the suite if curl itself cannot complete a request.
+    def enable_back_pressure = {
+        def fe = sql_return_maparray("show frontends")[0]
+        def be = sql_return_maparray("show backends")[0]
+        logger.info("frontend: ${fe}, backend: ${be}")
+        def endpoint = "http://${fe.Host}:${fe.HttpPort}/rest/v2/manager/node/set_config/be"
+        def node = "${be.Host}:${be.HttpPort}"
+        ["group_commit_queue_mem_limit", "group_commit_memory_rows_for_max_filter_ratio"].each { name ->
+            def payload = """{"${name}": {"node": ["${node}"],"value": "0","persist": "false"}}"""
+            // Use the List form of execute(): String.execute() tokenizes on whitespace and
+            // does not honor quotes, so the headers and the JSON body would be split apart.
+            def command = ["curl", "-X", "POST", endpoint,
+                           "-H", "Content-Type: application/json",
+                           "-H", "Authorization: Basic cm9vdDo=",
+                           "-d", payload]
+            logger.info(command.join(" "))
+            def process = command.execute()
+            def out = new StringBuilder()
+            def err = new StringBuilder()
+            // Block until curl finishes so the limit is in effect before the caller goes on.
+            process.waitForProcessOutput(out, err)
+            logger.info("set ${name}: exit=${process.exitValue()} out=${out} err=${err}")
+            // The test is meaningless if the limit could not be applied, so fail early.
+            assertTrue(process.exitValue() == 0)
+        }
+    }
+
+    // Turns WAL memory back pressure off again for the first backend listed by "show backends".
+    // POSTs to the set_config REST endpoint of the first frontend listed by "show frontends"
+    // and sets group_commit_queue_mem_limit to "67108864" and
+    // group_commit_memory_rows_for_max_filter_ratio to "10000" (persist=false).
+    // NOTE(review): these are presumably the BE default values - confirm against the BE config.
+    def disable_back_pressure = {
+        def fe = sql_return_maparray("show frontends")[0]
+        def be = sql_return_maparray("show backends")[0]
+        logger.info("frontend: ${fe}, backend: ${be}")
+        def endpoint = "http://${fe.Host}:${fe.HttpPort}/rest/v2/manager/node/set_config/be"
+        def node = "${be.Host}:${be.HttpPort}"
+        def restored = ["group_commit_queue_mem_limit": "67108864",
+                        "group_commit_memory_rows_for_max_filter_ratio": "10000"]
+        restored.each { name, value ->
+            def payload = """{"${name}": {"node": ["${node}"],"value": "${value}","persist": "false"}}"""
+            // Use the List form of execute(): String.execute() tokenizes on whitespace and
+            // does not honor quotes, so the headers and the JSON body would be split apart.
+            def command = ["curl", "-X", "POST", endpoint,
+                           "-H", "Content-Type: application/json",
+                           "-H", "Authorization: Basic cm9vdDo=",
+                           "-d", payload]
+            logger.info(command.join(" "))
+            def process = command.execute()
+            def out = new StringBuilder()
+            def err = new StringBuilder()
+            // Wait for curl so the restore has completed (and is logged) before returning.
+            process.waitForProcessOutput(out, err)
+            logger.info("set ${name}: exit=${process.exitValue()} out=${out} err=${err}")
+        }
+    }
+
+    // Expect the insert to hit the injected back-pressure timeout; always restore the BE after.
+    GetDebugPoint().clearDebugPointsForAllBEs()
+    def timedOut = false  // set only when the expected timeout error was actually observed
+    try {
+        enable_back_pressure()
+        sql """ set group_commit = async_mode; """
+        GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue.add_block.back_pressure_time_out")
+        sql """insert into ${tableName} values(1,1)"""
+    } catch (Exception e) {
+        logger.info(e.getMessage())
+        timedOut = e.getMessage().contains('Wal memory back pressure wait too much time!')
+    } finally {  // runs on success and on failure, so limit 0 never leaks into later suites
+        GetDebugPoint().disableDebugPointForAllBEs("LoadBlockQueue.add_block.back_pressure_time_out")
+        disable_back_pressure()
+    }
+    assertTrue(timedOut)  // a successful insert must fail the case instead of passing silently
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org