You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by la...@apache.org on 2022/12/01 11:49:36 UTC
[incubator-pegasus] branch master updated: feat(config): Make the configs take effect for the FLAGS_* only used once (#1198)
This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new d4b2ce986 feat(config): Make the configs take effect for the FLAGS_* only used once (#1198)
d4b2ce986 is described below
commit d4b2ce986f7165ccea8279b3d4722664763b7c20
Author: Yingchun Lai <la...@apache.org>
AuthorDate: Thu Dec 1 19:49:31 2022 +0800
feat(config): Make the configs take effect for the FLAGS_* only used once (#1198)
---
src/common/replication_common.cpp | 6 --
src/common/replication_common.h | 1 -
src/http/builtin_http_calls.cpp | 5 -
src/http/builtin_http_calls.h | 2 -
src/http/config_http_service.cpp | 18 ----
src/http/http_call_registry.h | 18 +++-
src/http/http_server.cpp | 49 +++++++--
src/http/http_server.h | 27 ++++-
src/meta/meta_http_service.h | 2 +-
..._test.cpp => dup_replica_http_service_test.cpp} | 11 +-
src/replica/replica_http_service.cpp | 3 +
src/replica/replica_http_service.h | 6 +-
src/replica/replica_stub.cpp | 21 +++-
src/replica/replica_stub.h | 5 +-
src/replica/test/replica_http_service_test.cpp | 115 +++++++++++++++++++++
src/runtime/task/async_calls.h | 24 ++---
src/runtime/task/task.cpp | 23 +++--
src/runtime/task/task.h | 9 +-
src/server/info_collector_app.cpp | 5 +
src/utils/flags.h | 5 +
20 files changed, 278 insertions(+), 77 deletions(-)
diff --git a/src/common/replication_common.cpp b/src/common/replication_common.cpp
index 36665f0e3..67ba1db2b 100644
--- a/src/common/replication_common.cpp
+++ b/src/common/replication_common.cpp
@@ -102,7 +102,6 @@ replication_options::replication_options()
log_shared_pending_size_throttling_delay_ms = 0;
config_sync_disabled = false;
- config_sync_interval_ms = 30000;
mem_release_enabled = true;
mem_release_check_interval_ms = 3600000;
@@ -361,11 +360,6 @@ void replication_options::initialize()
"config_sync_disabled",
config_sync_disabled,
"whether to disable replica configuration periodical sync with the meta server");
- config_sync_interval_ms = (int)dsn_config_get_value_uint64(
- "replication",
- "config_sync_interval_ms",
- config_sync_interval_ms,
- "every this period(ms) the replica syncs replica configuration with the meta server");
mem_release_enabled = dsn_config_get_value_bool("replication",
"mem_release_enabled",
diff --git a/src/common/replication_common.h b/src/common/replication_common.h
index 85b019f32..29c72826a 100644
--- a/src/common/replication_common.h
+++ b/src/common/replication_common.h
@@ -113,7 +113,6 @@ public:
int32_t log_shared_pending_size_throttling_delay_ms;
bool config_sync_disabled;
- int32_t config_sync_interval_ms;
bool mem_release_enabled;
int32_t mem_release_check_interval_ms;
diff --git a/src/http/builtin_http_calls.cpp b/src/http/builtin_http_calls.cpp
index 6089e7071..6fb0ab9e1 100644
--- a/src/http/builtin_http_calls.cpp
+++ b/src/http/builtin_http_calls.cpp
@@ -92,11 +92,6 @@ namespace dsn {
})
.with_help("Gets the value of a perf counter");
- register_http_call("updateConfig")
- .with_callback(
- [](const http_request &req, http_response &resp) { update_config(req, resp); })
- .with_help("Updates the value of a config");
-
register_http_call("config")
.with_callback([](const http_request &req, http_response &resp) { get_config(req, resp); })
.with_help("get the details of a specified config");
diff --git a/src/http/builtin_http_calls.h b/src/http/builtin_http_calls.h
index b8f6f3a81..520759311 100644
--- a/src/http/builtin_http_calls.h
+++ b/src/http/builtin_http_calls.h
@@ -40,8 +40,6 @@ extern void get_version_handler(const http_request &req, http_response &resp);
extern void get_recent_start_time_handler(const http_request &req, http_response &resp);
-extern void update_config(const http_request &req, http_response &resp);
-
extern void list_all_configs(const http_request &req, http_response &resp);
extern void get_config(const http_request &req, http_response &resp);
diff --git a/src/http/config_http_service.cpp b/src/http/config_http_service.cpp
index b32cde3cd..7bbddb568 100644
--- a/src/http/config_http_service.cpp
+++ b/src/http/config_http_service.cpp
@@ -20,24 +20,6 @@
#include "utils/output_utils.h"
namespace dsn {
-void update_config(const http_request &req, http_response &resp)
-{
- if (req.query_args.size() != 1) {
- resp.status_code = http_status_code::bad_request;
- return;
- }
-
- auto iter = req.query_args.begin();
- auto res = update_flag(iter->first, iter->second);
-
- utils::table_printer tp;
- tp.add_row_name_and_data("update_status", res.description());
- std::ostringstream out;
- tp.output(out, dsn::utils::table_printer::output_format::kJsonCompact);
- resp.body = out.str();
- resp.status_code = http_status_code::ok;
-}
-
void list_all_configs(const http_request &req, http_response &resp)
{
if (!req.query_args.empty()) {
diff --git a/src/http/http_call_registry.h b/src/http/http_call_registry.h
index cf5ae11f9..71d798a8c 100644
--- a/src/http/http_call_registry.h
+++ b/src/http/http_call_registry.h
@@ -23,6 +23,10 @@
namespace dsn {
+namespace replication {
+class replica_http_service_test;
+}
+
// A singleton registry for all the HTTP calls
class http_call_registry : public utils::singleton<http_call_registry>
{
@@ -47,7 +51,7 @@ public:
{
auto call = std::shared_ptr<http_call>(call_uptr.release());
std::lock_guard<std::mutex> guard(_mu);
- CHECK_EQ(_call_map.count(call->path), 0);
+ CHECK_EQ_MSG(_call_map.count(call->path), 0, call->path);
_call_map[call->path] = call;
}
@@ -64,6 +68,18 @@ public:
private:
friend class utils::singleton<http_call_registry>;
+ friend class replication::replica_http_service_test;
+
+ // Just for testing.
+ // Paths are registered to a singleton, so creating an http service again in a test would
+ // re-register some paths and crash; this clears all registered paths to avoid that.
+ // TODO: remove this function once http_call_registry is no longer a singleton.
+ void clear_paths()
+ {
+ std::lock_guard<std::mutex> guard(_mu);
+ _call_map.clear();
+ }
+
http_call_registry() = default;
~http_call_registry() = default;
diff --git a/src/http/http_server.cpp b/src/http/http_server.cpp
index fcc6d68d4..47a6f608f 100644
--- a/src/http/http_server.cpp
+++ b/src/http/http_server.cpp
@@ -16,22 +16,38 @@
// under the License.
#include "http_server.h"
-#include "runtime/tool_api.h"
-#include "utils/time_utils.h"
+
#include <boost/algorithm/string.hpp>
#include <fmt/ostream.h>
-#include "http_message_parser.h"
-#include "pprof_http_service.h"
#include "builtin_http_calls.h"
-#include "uri_decoder.h"
#include "http_call_registry.h"
+#include "http_message_parser.h"
#include "http_server_impl.h"
+#include "pprof_http_service.h"
+#include "runtime/tool_api.h"
+#include "uri_decoder.h"
+#include "utils/output_utils.h"
+#include "utils/time_utils.h"
namespace dsn {
DSN_DEFINE_bool("http", enable_http_server, true, "whether to enable the embedded HTTP server");
+namespace {
+error_s update_config(const http_request &req)
+{
+ if (req.query_args.size() != 1) {
+ return error_s::make(ERR_INVALID_PARAMETERS,
+ "there should be exactly one config to be updated once");
+ }
+
+ auto iter = req.query_args.begin();
+ return update_flag(iter->first, iter->second);
+}
+
+} // anonymous namespace
+
/*extern*/ std::string http_status_code_to_string(http_status_code code)
{
switch (code) {
@@ -65,21 +81,38 @@ DSN_DEFINE_bool("http", enable_http_server, true, "whether to enable the embedde
http_call_registry::instance().remove(full_path);
}
-void http_service::register_handler(std::string path, http_callback cb, std::string help)
+void http_service::register_handler(std::string sub_path, http_callback cb, std::string help)
{
+ CHECK(!sub_path.empty(), "");
if (!FLAGS_enable_http_server) {
return;
}
auto call = make_unique<http_call>();
call->path = this->path();
- if (!path.empty()) {
- call->path += "/" + std::move(path);
+ if (!call->path.empty()) {
+ call->path += '/';
}
+ call->path += sub_path;
call->callback = std::move(cb);
call->help = std::move(help);
http_call_registry::instance().add(std::move(call));
}
+void http_server_base::update_config_handler(const http_request &req, http_response &resp)
+{
+ auto res = dsn::update_config(req);
+ if (res.is_ok()) {
+ CHECK_EQ(1, req.query_args.size());
+ update_config(req.query_args.begin()->first);
+ }
+ utils::table_printer tp;
+ tp.add_row_name_and_data("update_status", res.description());
+ std::ostringstream out;
+ tp.output(out, dsn::utils::table_printer::output_format::kJsonCompact);
+ resp.body = out.str();
+ resp.status_code = http_status_code::ok;
+}
+
http_server::http_server() : serverlet<http_server>("http_server")
{
if (!FLAGS_enable_http_server) {
diff --git a/src/http/http_server.h b/src/http/http_server.h
index 3b60829a8..15d312018 100644
--- a/src/http/http_server.h
+++ b/src/http/http_server.h
@@ -100,7 +100,31 @@ public:
virtual std::string path() const = 0;
- void register_handler(std::string path, http_callback cb, std::string help);
+ void register_handler(std::string sub_path, http_callback cb, std::string help);
+};
+
+class http_server_base : public http_service
+{
+public:
+ explicit http_server_base()
+ {
+ static std::once_flag flag;
+ std::call_once(flag, [&]() {
+ register_handler("updateConfig",
+ std::bind(&http_server_base::update_config_handler,
+ this,
+ std::placeholders::_1,
+ std::placeholders::_2),
+ "ip:port/updateConfig?<key>=<value>");
+ });
+ }
+
+ std::string path() const override { return ""; }
+
+protected:
+ void update_config_handler(const http_request &req, http_response &resp);
+
+ virtual void update_config(const std::string &name) {}
};
// Example:
@@ -128,4 +152,5 @@ inline bool is_http_message(dsn::task_code code)
{
return code == RPC_HTTP_SERVICE || code == RPC_HTTP_SERVICE_ACK;
}
+
} // namespace dsn
diff --git a/src/meta/meta_http_service.h b/src/meta/meta_http_service.h
index dbd497880..180bd07ab 100644
--- a/src/meta/meta_http_service.h
+++ b/src/meta/meta_http_service.h
@@ -52,7 +52,7 @@ struct usage_scenario_info
};
class meta_service;
-class meta_http_service : public http_service
+class meta_http_service : public http_server_base
{
public:
explicit meta_http_service(meta_service *s) : _service(s)
diff --git a/src/replica/duplication/test/replica_http_service_test.cpp b/src/replica/duplication/test/dup_replica_http_service_test.cpp
similarity index 86%
rename from src/replica/duplication/test/replica_http_service_test.cpp
rename to src/replica/duplication/test/dup_replica_http_service_test.cpp
index 2e6d1f338..ff5555456 100644
--- a/src/replica/duplication/test/replica_http_service_test.cpp
+++ b/src/replica/duplication/test/dup_replica_http_service_test.cpp
@@ -21,11 +21,11 @@
namespace dsn {
namespace replication {
-class replica_http_service_test : public duplication_test_base
+class dup_replica_http_service_test : public duplication_test_base
{
};
-TEST_F(replica_http_service_test, query_duplication_handler)
+TEST_F(dup_replica_http_service_test, query_duplication_handler)
{
auto pri = stub->add_primary_replica(1, 1);
@@ -58,11 +58,8 @@ TEST_F(replica_http_service_test, query_duplication_handler)
http_svc.query_duplication_handler(req, resp);
ASSERT_EQ(resp.status_code, http_status_code::ok);
ASSERT_EQ(
- resp.body,
- R"({)"
- R"("1583306653":)"
- R"({"1.1":{"duplicating":false,"fail_mode":"FAIL_SLOW","not_confirmed_mutations_num":100,"not_duplicated_mutations_num":50}})"
- R"(})");
+ R"({"1583306653":{"1.1":{"duplicating":false,"fail_mode":"FAIL_SLOW","not_confirmed_mutations_num":100,"not_duplicated_mutations_num":50}}})",
+ resp.body);
}
} // namespace replication
diff --git a/src/replica/replica_http_service.cpp b/src/replica/replica_http_service.cpp
index a9ff53da0..d9685ef32 100644
--- a/src/replica/replica_http_service.cpp
+++ b/src/replica/replica_http_service.cpp
@@ -21,6 +21,7 @@
#include <nlohmann/json.hpp>
#include "duplication/duplication_sync_timer.h"
+#include "http/http_server.h"
#include "utils/output_utils.h"
#include "utils/string_conv.h"
@@ -155,5 +156,7 @@ void replica_http_service::query_manual_compaction_handler(const http_request &r
resp.body = json.dump();
}
+void replica_http_service::update_config(const std::string &name) { _stub->update_config(name); }
+
} // namespace replication
} // namespace dsn
diff --git a/src/replica/replica_http_service.h b/src/replica/replica_http_service.h
index 4202b0865..dab9b6522 100644
--- a/src/replica/replica_http_service.h
+++ b/src/replica/replica_http_service.h
@@ -22,7 +22,7 @@
namespace dsn {
namespace replication {
-class replica_http_service : public http_service
+class replica_http_service : public http_server_base
{
public:
explicit replica_http_service(replica_stub *stub) : _stub(stub)
@@ -71,6 +71,10 @@ public:
}
private:
+ friend class replica_http_service_test;
+
+ void update_config(const std::string &name) override;
+
replica_stub *_stub;
};
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index cd4e64726..db750f290 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -77,6 +77,14 @@ DSN_DEFINE_uint32("replication",
"max concurrent manual emergency checkpoint running count");
DSN_TAG_VARIABLE(max_concurrent_manual_emergency_checkpointing_count, FT_MUTABLE);
+DSN_DEFINE_uint32(
+ "replication",
+ config_sync_interval_ms,
+ 30000,
+ "The interval milliseconds of replica server to syncs replica configuration with meta server");
+DSN_TAG_VARIABLE(config_sync_interval_ms, FT_MUTABLE);
+DSN_DEFINE_validator(config_sync_interval_ms, [](uint32_t value) -> bool { return value > 0; });
+
bool replica_stub::s_not_exit_on_log_failure = false;
replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/,
@@ -787,9 +795,9 @@ void replica_stub::initialize_start()
zauto_lock l(_state_lock);
this->query_configuration_by_node();
},
- std::chrono::milliseconds(_options.config_sync_interval_ms),
+ std::chrono::milliseconds(FLAGS_config_sync_interval_ms),
0,
- std::chrono::milliseconds(_options.config_sync_interval_ms));
+ std::chrono::milliseconds(FLAGS_config_sync_interval_ms));
}
#ifdef DSN_ENABLE_GPERF
@@ -811,7 +819,7 @@ void replica_stub::initialize_start()
// init liveness monitor
CHECK_EQ(NS_Disconnected, _state);
- if (_options.fd_disabled == false) {
+ if (!_options.fd_disabled) {
_failure_detector = std::make_shared<dsn::dist::slave_failure_detector_with_multimaster>(
_options.meta_servers,
[this]() { this->on_meta_server_disconnected(); },
@@ -3002,5 +3010,12 @@ void replica_stub::update_disks_status()
}
}
+void replica_stub::update_config(const std::string &name)
+{
+ // The new value has been validated and FLAGS_* has been updated, so it's safe to use it
+ // directly.
+ UPDATE_CONFIG(_config_sync_timer_task->update_interval, config_sync_interval_ms, name);
+}
+
} // namespace replication
} // namespace dsn
diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h
index d21310ea4..041894edf 100644
--- a/src/replica/replica_stub.h
+++ b/src/replica/replica_stub.h
@@ -230,6 +230,8 @@ public:
// query last checkpoint info for follower in duplication process
void on_query_last_checkpoint(query_last_checkpoint_info_rpc rpc);
+ void update_config(const std::string &name);
+
private:
enum replica_node_state
{
@@ -335,6 +337,7 @@ private:
friend class open_replica_test;
friend class replica_follower;
friend class replica_follower_test;
+ friend class replica_http_service_test;
typedef std::unordered_map<gpid, ::dsn::task_ptr> opening_replicas;
typedef std::unordered_map<gpid, std::tuple<task_ptr, replica_ptr, app_info, replica_info>>
@@ -363,7 +366,7 @@ private:
// temproal states
::dsn::task_ptr _config_query_task;
- ::dsn::task_ptr _config_sync_timer_task;
+ ::dsn::timer_task_ptr _config_sync_timer_task;
::dsn::task_ptr _gc_timer_task;
::dsn::task_ptr _disk_stat_timer_task;
::dsn::task_ptr _mem_release_timer_task;
diff --git a/src/replica/test/replica_http_service_test.cpp b/src/replica/test/replica_http_service_test.cpp
new file mode 100644
index 000000000..15bb165eb
--- /dev/null
+++ b/src/replica/test/replica_http_service_test.cpp
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include "http/builtin_http_calls.h"
+#include "http/http_call_registry.h"
+#include "replica/replica_http_service.h"
+#include "replica/test/replica_test_base.h"
+
+using std::map;
+using std::string;
+
+namespace dsn {
+namespace replication {
+
+DSN_DECLARE_uint32(config_sync_interval_ms);
+
+class replica_http_service_test : public replica_test_base
+{
+public:
+ replica_http_service_test()
+ {
+ // Disable unnecessary works before starting stub.
+ stub->_options.fd_disabled = true;
+ stub->_options.duplication_enabled = false;
+ stub->initialize_start();
+
+ http_call_registry::instance().clear_paths();
+ _http_svc = dsn::make_unique<replica_http_service>(stub.get());
+ }
+
+ void test_update_config(const map<string, string> &configs, const string &expect_resp)
+ {
+ http_request req;
+ for (const auto &config : configs) {
+ req.query_args[config.first] = config.second;
+ }
+
+ http_response resp;
+ _http_svc->update_config_handler(req, resp);
+ ASSERT_EQ(resp.status_code, http_status_code::ok);
+ ASSERT_EQ(expect_resp, resp.body);
+ }
+
+ void test_check_config(const string &config, const string &expect_value)
+ {
+ http_request req;
+ http_response resp;
+ req.query_args["name"] = config;
+ get_config(req, resp);
+ ASSERT_EQ(resp.status_code, http_status_code::ok);
+ const string unfilled_resp =
+ R"({{"name":"config_sync_interval_ms","section":"replication","type":"FV_UINT32","tags":"flag_tag::FT_MUTABLE","description":"The interval milliseconds of replica server to syncs replica configuration with meta server","value":"{}"}})"
+ "\n";
+ ASSERT_EQ(fmt::format(unfilled_resp, expect_value), resp.body);
+ }
+
+private:
+ std::unique_ptr<replica_http_service> _http_svc;
+};
+
+TEST_F(replica_http_service_test, update_config_handler)
+{
+ // Test the default value.
+ test_check_config("config_sync_interval_ms", "30000");
+ ASSERT_EQ(30000, FLAGS_config_sync_interval_ms);
+
+ // Update config failed and value not changed.
+ test_update_config(
+ {},
+ R"({"update_status":"ERR_INVALID_PARAMETERS: there should be exactly one config to be updated once"})"
+ "\n");
+ test_check_config("config_sync_interval_ms", "30000");
+ ASSERT_EQ(30000, FLAGS_config_sync_interval_ms);
+
+ // Update config failed and value not changed.
+ test_update_config(
+ {{"config_sync_interval_ms", "10"}, {"fds_write_limit_rate", "50"}},
+ R"({"update_status":"ERR_INVALID_PARAMETERS: there should be exactly one config to be updated once"})"
+ "\n");
+ test_check_config("config_sync_interval_ms", "30000");
+ ASSERT_EQ(30000, FLAGS_config_sync_interval_ms);
+
+ // Update config failed and value not changed.
+ test_update_config({{"config_sync_interval_ms", "-1"}},
+ R"({"update_status":"ERR_INVALID_PARAMETERS: -1 is invalid"})"
+ "\n");
+ test_check_config("config_sync_interval_ms", "30000");
+ ASSERT_EQ(30000, FLAGS_config_sync_interval_ms);
+
+ // Update config success and value changed.
+ test_update_config({{"config_sync_interval_ms", "10"}},
+ R"({"update_status":"ERR_OK"})"
+ "\n");
+ test_check_config("config_sync_interval_ms", "10");
+ ASSERT_EQ(10, FLAGS_config_sync_interval_ms);
+}
+
+} // namespace replication
+} // namespace dsn
diff --git a/src/runtime/task/async_calls.h b/src/runtime/task/async_calls.h
index 244a53530..e991e3227 100644
--- a/src/runtime/task/async_calls.h
+++ b/src/runtime/task/async_calls.h
@@ -68,13 +68,13 @@ create_task(task_code code, task_tracker *tracker, task_handler &&callback, int
return t;
}
-inline task_ptr create_timer_task(task_code code,
- task_tracker *tracker,
- task_handler &&callback,
- std::chrono::milliseconds interval,
- int hash = 0)
+inline timer_task_ptr create_timer_task(task_code code,
+ task_tracker *tracker,
+ task_handler &&callback,
+ std::chrono::milliseconds interval,
+ int hash = 0)
{
- task_ptr t(new timer_task(code, std::move(callback), interval.count(), hash, nullptr));
+ timer_task_ptr t(new timer_task(code, std::move(callback), interval.count(), hash, nullptr));
t->set_tracker(tracker);
t->spec().on_task_create.execute(task::get_current_task(), t);
return t;
@@ -92,12 +92,12 @@ inline task_ptr enqueue(task_code code,
return tsk;
}
-inline task_ptr enqueue_timer(task_code evt,
- task_tracker *tracker,
- task_handler &&callback,
- std::chrono::milliseconds timer_interval,
- int hash = 0,
- std::chrono::milliseconds delay = std::chrono::milliseconds(0))
+inline timer_task_ptr enqueue_timer(task_code evt,
+ task_tracker *tracker,
+ task_handler &&callback,
+ std::chrono::milliseconds timer_interval,
+ int hash = 0,
+ std::chrono::milliseconds delay = std::chrono::milliseconds(0))
{
auto tsk = create_timer_task(evt, tracker, std::move(callback), timer_interval, hash);
tsk->set_delay(static_cast<int>(delay.count()));
diff --git a/src/runtime/task/task.cpp b/src/runtime/task/task.cpp
index 43e45138e..0ceed65d5 100644
--- a/src/runtime/task/task.cpp
+++ b/src/runtime/task/task.cpp
@@ -433,9 +433,10 @@ const std::vector<task_worker *> &get_threadpool_threads_info(threadpool_code co
}
timer_task::timer_task(
- task_code code, const task_handler &cb, int interval_milliseconds, int hash, service_node *node)
- : task(code, hash, node), _interval_milliseconds(interval_milliseconds), _cb(cb)
+ task_code code, const task_handler &cb, int interval_ms, int hash, service_node *node)
+ : task(code, hash, node), _interval_ms(interval_ms), _cb(cb)
{
+ DCHECK_GE(_interval_ms, 0);
CHECK_EQ_MSG(
TASK_TYPE_COMPUTE,
spec().type,
@@ -444,9 +445,10 @@ timer_task::timer_task(
}
timer_task::timer_task(
- task_code code, task_handler &&cb, int interval_milliseconds, int hash, service_node *node)
- : task(code, hash, node), _interval_milliseconds(interval_milliseconds), _cb(std::move(cb))
+ task_code code, task_handler &&cb, int interval_ms, int hash, service_node *node)
+ : task(code, hash, node), _interval_ms(interval_ms), _cb(std::move(cb))
{
+ DCHECK_GE(_interval_ms, 0);
CHECK_EQ_MSG(
TASK_TYPE_COMPUTE,
spec().type,
@@ -458,7 +460,7 @@ void timer_task::enqueue()
{
// enable timer randomization to avoid lots of timers execution simultaneously
if (delay_milliseconds() == 0 && spec().randomize_timer_delay_if_zero) {
- set_delay(rand::next_u32(0, _interval_milliseconds));
+ set_delay(rand::next_u32(0, _interval_ms));
}
return task::enqueue();
@@ -470,12 +472,19 @@ void timer_task::exec()
_cb();
}
// valid interval, we reset task state to READY
- if (dsn_likely(_interval_milliseconds > 0)) {
+ if (dsn_likely(_interval_ms > 0)) {
CHECK(set_retry(true),
"timer task set retry failed, with state = {}",
enum_to_string(state()));
- set_delay(_interval_milliseconds);
+ set_delay(_interval_ms);
}
}
+void timer_task::update_interval(int interval_ms)
+{
+ // 0 is not allowed: exec() only re-arms the timer when _interval_ms > 0.
+ CHECK_GT(interval_ms, 0);
+ _interval_ms = interval_ms;
+}
+
} // namespace dsn
diff --git a/src/runtime/task/task.h b/src/runtime/task/task.h
index 44260dd6f..4cea16417 100644
--- a/src/runtime/task/task.h
+++ b/src/runtime/task/task.h
@@ -359,15 +359,18 @@ public:
void exec() override;
void enqueue() override;
+ void update_interval(int interval_ms);
+
protected:
void clear_non_trivial_on_task_end() override { _cb = nullptr; }
private:
- // ATTENTION: if _interval_milliseconds <= 0, then timer task will just be executed once;
- // otherwise, timer task will be executed periodically(period = _interval_milliseconds)
- int _interval_milliseconds;
+ // ATTENTION: if _interval_ms == 0, then timer task will just be executed once;
+ // otherwise, timer task will be executed periodically(period = _interval_ms)
+ int _interval_ms;
task_handler _cb;
};
+typedef dsn::ref_ptr<dsn::timer_task> timer_task_ptr;
template <typename First, typename... Remaining>
class future_task : public task
diff --git a/src/server/info_collector_app.cpp b/src/server/info_collector_app.cpp
index 87683b516..e2f2a113d 100644
--- a/src/server/info_collector_app.cpp
+++ b/src/server/info_collector_app.cpp
@@ -44,9 +44,14 @@
namespace pegasus {
namespace server {
+class collector_http_service : public ::dsn::http_server_base
+{
+};
+
info_collector_app::info_collector_app(const dsn::service_app_info *info)
: service_app(info), _updater_started(false)
{
+ register_http_service(new collector_http_service());
dsn::start_http_server();
}
diff --git a/src/utils/flags.h b/src/utils/flags.h
index 1d77e998e..91f0f8815 100644
--- a/src/utils/flags.h
+++ b/src/utils/flags.h
@@ -121,6 +121,11 @@ struct hash<flag_tag>
COMPILE_ASSERT(sizeof(decltype(FLAGS_##name)), exist_##name##_##tag); \
static dsn::flag_tagger FLAGS_TAGGER_##name##_##tag(#name, flag_tag::tag)
+#define UPDATE_CONFIG(fn, flag, name) \
+ do { /* one statement, so it is safe inside an unbraced if/else */ \
+ if ((name) == #flag) { fn(FLAGS_##flag); } \
+ } while (0)
+
namespace dsn {
// An utility class that registers a flag upon initialization.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org