You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by hu...@apache.org on 2018/04/12 21:57:15 UTC
[incubator-heron] branch master updated: Integrate runtime config
and rate limit (#2846)
This is an automated email from the ASF dual-hosted git repository.
huijun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 1fe99ed Integrate runtime config and rate limit (#2846)
1fe99ed is described below
commit 1fe99edbe926b1a620ad32f5c5ec30eaa8cd8f7d
Author: Ning Wang <nw...@twitter.com>
AuthorDate: Thu Apr 12 14:57:12 2018 -0700
Integrate runtime config and rate limit (#2846)
* Integrate runtime config and rate limit
* more clean up
* Fix integration test by avoiding changing hydrated topology in stmgr
---
.../src/cpp/config/topology-config-helper.cpp | 43 ++++++++++++++---
.../common/src/cpp/config/topology-config-helper.h | 15 ++++++
.../cpp/config/topology-config-helper_unittest.cpp | 42 +++++++++++++++-
heron/stmgr/src/cpp/manager/instance-server.cpp | 24 ++++++----
heron/stmgr/src/cpp/manager/stmgr.cpp | 42 ++++++++++++++--
heron/stmgr/src/cpp/manager/stmgr.h | 6 +++
heron/stmgr/tests/cpp/server/stmgr_unittest.cpp | 56 ++++++++++++++++++++++
heron/tmaster/src/cpp/manager/tcontroller.cpp | 4 +-
heron/tmaster/src/cpp/manager/tmaster.cpp | 23 +++++----
heron/tmaster/src/cpp/manager/tmaster.h | 10 ++--
.../tmaster/tests/cpp/server/tmaster_unittest.cpp | 3 +-
11 files changed, 228 insertions(+), 40 deletions(-)
diff --git a/heron/common/src/cpp/config/topology-config-helper.cpp b/heron/common/src/cpp/config/topology-config-helper.cpp
index e84a7aa..3200330 100644
--- a/heron/common/src/cpp/config/topology-config-helper.cpp
+++ b/heron/common/src/cpp/config/topology-config-helper.cpp
@@ -29,6 +29,9 @@
namespace heron {
namespace config {
+static const char TOPOLOGY_CONFIG_KEY[] = "_topology_";
+static const char RUNTIME_CONFIG_POSTFIX[] = ":runtime";
+
TopologyConfigVars::TopologyReliabilityMode StringToReliabilityMode(const std::string& _mode) {
if (_mode == "ATMOST_ONCE") {
return TopologyConfigVars::TopologyReliabilityMode::ATMOST_ONCE;
@@ -411,6 +414,20 @@ bool TopologyConfigHelper::DropTuplesUponBackpressure(const proto::api::Topology
TopologyConfigVars::TOPOLOGY_DROPTUPLES_UPON_BACKPRESSURE, false);
}
+std::string TopologyConfigHelper::GetRuntimeConfigKey(const std::string& _key) {
+ return _key + RUNTIME_CONFIG_POSTFIX;
+}
+
+// Convert configs in map to runtime configs (append runtime postfix)
+void TopologyConfigHelper::ConvertToRuntimeConfigs(
+ const std::map<std::string, std::string>& _origin,
+ std::map<std::string, std::string>& _retval) {
+ std::map<std::string, std::string>::const_iterator it;
+ for (it = _origin.begin(); it != _origin.end(); ++it) {
+ _retval[GetRuntimeConfigKey(it->first)] = it->second;
+ }
+}
+
// Return topology level config
void TopologyConfigHelper::GetTopologyConfig(const proto::api::Topology& _topology,
std::map<std::string, std::string>& retval) {
@@ -514,12 +531,9 @@ bool TopologyConfigHelper::GetBooleanConfigValue(const proto::api::Topology& _to
const std::string& _config_name,
bool _default_value) {
static const std::string value_true_ = "true";
- const proto::api::Config& cfg = _topology.topology_config();
- const std::string value = GetConfigValue(cfg, _config_name, "");
- if (!value.empty()) {
- return value_true_.compare(value.c_str()) == 0;
- }
- return _default_value;
+ const std::string value = GetTopologyConfigValue(_topology, _config_name, "");
+
+ return value_true_.compare(value.c_str()) == 0;
}
// Convert topology config to a key value map
@@ -530,6 +544,14 @@ void TopologyConfigHelper::ConvertConfigToKVMap(const proto::api::Config& _confi
}
}
+const std::string TopologyConfigHelper::GetTopologyConfigValue(
+ const proto::api::Topology& _topology,
+ const std::string& _key,
+ const std::string& _default) {
+ const proto::api::Config& cfg = _topology.topology_config();
+ return GetConfigValue(cfg, _key, _default);
+}
+
const std::string TopologyConfigHelper::GetComponentConfigValue(
const proto::api::Topology& _topology,
const std::string& _component,
@@ -552,13 +574,20 @@ const std::string TopologyConfigHelper::GetComponentConfigValue(
sp_int64 TopologyConfigHelper::GetComponentOutputBPS(const proto::api::Topology& _topology,
const std::string& _component) {
- const std::string value = GetComponentConfigValue(_topology, _component,
+ const std::string init_value = GetComponentConfigValue(_topology, _component,
TopologyConfigVars::TOPOLOGY_COMPONENT_OUTPUT_BPS, "");
+ const std::string value = GetComponentConfigValue(_topology, _component,
+ GetRuntimeConfigKey(TopologyConfigVars::TOPOLOGY_COMPONENT_OUTPUT_BPS), init_value);
if (!value.empty()) {
return atol(value.c_str());
}
+
return -1; // default to -1 (no rate limit)
}
+
+const char* TopologyConfigHelper::GetReservedTopologyConfigKey() {
+ return TOPOLOGY_CONFIG_KEY;
+}
} // namespace config
} // namespace heron
diff --git a/heron/common/src/cpp/config/topology-config-helper.h b/heron/common/src/cpp/config/topology-config-helper.h
index a8dd41b..b4b8651 100644
--- a/heron/common/src/cpp/config/topology-config-helper.h
+++ b/heron/common/src/cpp/config/topology-config-helper.h
@@ -136,6 +136,13 @@ class TopologyConfigHelper {
// Do we want to drop tuples upon backpressure detection
static bool DropTuplesUponBackpressure(const proto::api::Topology& _topology);
+ // Get runtime config key
+ static std::string GetRuntimeConfigKey(const std::string& key);
+
+ // Convert configs in map to runtime configs (append runtime postfix)
+ static void ConvertToRuntimeConfigs(const std::map<std::string, std::string>& _origin,
+ std::map<std::string, std::string>& _retval);
+
// Return topology level config
static void GetTopologyConfig(const proto::api::Topology& _topology,
std::map<std::string, std::string>& retval);
@@ -155,6 +162,11 @@ class TopologyConfigHelper {
const std::string& _component_name,
const std::map<std::string, std::string>& config);
+ // Get the topology config value given the config key
+ static const std::string GetTopologyConfigValue(const proto::api::Topology& _topology,
+ const std::string& _key,
+ const std::string& _default);
+
// Get the config value given component name and config key
static const std::string GetComponentConfigValue(const proto::api::Topology& _topology,
const std::string& _component,
@@ -170,6 +182,9 @@ class TopologyConfigHelper {
static sp_int64 GetComponentOutputBPS(const proto::api::Topology& _topology,
const std::string& _component);
+ // Get reserved topology config key.
+ static const char* GetReservedTopologyConfigKey();
+
private:
static bool GetBooleanConfigValue(const proto::api::Topology& _topology,
const std::string& _config_name,
diff --git a/heron/common/tests/cpp/config/topology-config-helper_unittest.cpp b/heron/common/tests/cpp/config/topology-config-helper_unittest.cpp
index f99a3d5..698c772 100644
--- a/heron/common/tests/cpp/config/topology-config-helper_unittest.cpp
+++ b/heron/common/tests/cpp/config/topology-config-helper_unittest.cpp
@@ -134,13 +134,23 @@ TEST(TopologyConfigHelper, GetAndSetTopologyConfig) {
"test_topology", "123", 3, NUM_SPOUT_INSTANCES, 3, NUM_BOLT_INSTANCES,
heron::proto::api::SHUFFLE);
- // Test init config
+ // Test initial config
std::map<std::string, std::string> old_config;
heron::config::TopologyConfigHelper::GetTopologyConfig(*test_topology, old_config);
EXPECT_EQ(old_config[heron::config::TopologyConfigVars::TOPOLOGY_MESSAGE_TIMEOUT_SECS],
MESSAGE_TIMEOUT);
EXPECT_EQ(old_config[TOPOLOGY_USER_CONFIG], TOPOLOGY_USER_CONFIG_VALUE);
+ // Test GetComponentConfigValue function
+ EXPECT_EQ(
+ heron::config::TopologyConfigHelper::GetTopologyConfigValue(
+ *test_topology, TOPOLOGY_USER_CONFIG, ""),
+ TOPOLOGY_USER_CONFIG_VALUE);
+ EXPECT_EQ(
+ heron::config::TopologyConfigHelper::GetTopologyConfigValue(
+ *test_topology, TOPOLOGY_USER_CONFIG + ".bad", ""),
+ "");
+
// Set and then test updated config
std::string runtime_user_config_key = TOPOLOGY_USER_CONFIG + ":runtime";
std::map<std::string, std::string> update;
@@ -171,7 +181,7 @@ TEST(TopologyConfigHelper, GetAndSetComponentConfig) {
std::string non_test_spout = "test_spout2";
std::string test_bolt = "test_bolt2";
std::string non_test_bolt = "test_bolt1";
- // Test init config
+ // Test initial config
std::map<std::string, std::string> old_config;
heron::config::TopologyConfigHelper::GetComponentConfig(*test_topology, test_spout, old_config);
EXPECT_EQ(old_config[heron::config::TopologyConfigVars::TOPOLOGY_COMPONENT_PARALLELISM],
@@ -183,6 +193,16 @@ TEST(TopologyConfigHelper, GetAndSetComponentConfig) {
std::to_string(NUM_BOLT_INSTANCES));
EXPECT_EQ(old_config[BOLT_USER_CONFIG], BOLT_USER_CONFIG_VALUE);
+ // Test GetComponentConfigValue function
+ EXPECT_EQ(
+ heron::config::TopologyConfigHelper::GetComponentConfigValue(
+ *test_topology, test_spout, SPOUT_USER_CONFIG, ""),
+ SPOUT_USER_CONFIG_VALUE);
+ EXPECT_EQ(
+ heron::config::TopologyConfigHelper::GetComponentConfigValue(
+ *test_topology, test_spout, SPOUT_USER_CONFIG + ".bad", ""),
+ "");
+
// Set user configs to new values
std::string runtime_spout_user_config_key = SPOUT_USER_CONFIG + ":runtime";
std::string runtime_bolt_user_config_key = BOLT_USER_CONFIG + ":runtime";
@@ -235,6 +255,24 @@ TEST(TopologyConfigHelper, GetAndSetComponentConfig) {
EXPECT_EQ(updated_config[runtime_bolt_user_config_key], NEW_BOLT_USER_CONFIG_VALUE_2);
}
+TEST(TopologyConfigHelper, GetRuntimeConfigKey) {
+ EXPECT_EQ(
+ heron::config::TopologyConfigHelper::GetRuntimeConfigKey("conf.test1"),
+ "conf.test1:runtime");
+}
+
+TEST(TopologyConfigHelper, ConvertToRuntimeConfigs) {
+ std::map<std::string, std::string> original_config;
+ original_config["conf.test1"] = "a";
+ original_config["conf.test2"] = "b";
+
+ std::map<std::string, std::string> runtime_config;
+ heron::config::TopologyConfigHelper::ConvertToRuntimeConfigs(original_config, runtime_config);
+
+ EXPECT_EQ(runtime_config["conf.test1:runtime"], "a");
+ EXPECT_EQ(runtime_config["conf.test2:runtime"], "b");
+}
+
int main(int argc, char **argv) {
heron::common::Initialize(argv[0]);
testing::InitGoogleTest(&argc, argv);
diff --git a/heron/stmgr/src/cpp/manager/instance-server.cpp b/heron/stmgr/src/cpp/manager/instance-server.cpp
index 7288749..30e787e 100644
--- a/heron/stmgr/src/cpp/manager/instance-server.cpp
+++ b/heron/stmgr/src/cpp/manager/instance-server.cpp
@@ -15,9 +15,11 @@
*/
#include "manager/instance-server.h"
+
#include <iostream>
-#include <unordered_set>
+#include <map>
#include <string>
+#include <unordered_set>
#include <vector>
#include "manager/checkpoint-gateway.h"
#include "util/neighbour-calculator.h"
@@ -422,6 +424,9 @@ void InstanceServer::DrainCheckpoint(sp_int32 _task_id,
void InstanceServer::BroadcastNewPhysicalPlan(const proto::system::PhysicalPlan& _pplan) {
// TODO(vikasr) We do not handle any changes to our local assignment
+ LOG(INFO) << "Broadcasting new PhysicalPlan:";
+ config::TopologyConfigHelper::LogTopology(_pplan.topology());
+
ComputeLocalSpouts(_pplan);
proto::stmgr::NewInstanceAssignmentMessage new_assignment;
new_assignment.mutable_pplan()->CopyFrom(_pplan);
@@ -439,17 +444,18 @@ void InstanceServer::BroadcastNewPhysicalPlan(const proto::system::PhysicalPlan&
void InstanceServer::SetRateLimit(const proto::system::PhysicalPlan& _pplan,
const std::string& _component,
Connection* _conn) const {
- sp_int64 read_bsp =
+ sp_int64 read_bps =
config::TopologyConfigHelper::GetComponentOutputBPS(_pplan.topology(), _component);
sp_int32 parallelism =
config::TopologyConfigHelper::GetComponentParallelism(_pplan.topology(), _component);
- sp_int64 burst_read_bsp = read_bsp + read_bsp / 2;
-
- // There should be parallelism hint and the per instance rate limit should be at least
- // one byte per second
- if (parallelism > 0 && read_bsp > parallelism && burst_read_bsp > parallelism) {
- LOG(INFO) << "Set rate limit in " << _component << " to " << read_bsp << "/" << burst_read_bsp;
- _conn->setRateLimit(read_bsp / parallelism, burst_read_bsp / parallelism);
+ // burst rate is 1.5 x of regular rate
+ sp_int64 burst_read_bps = read_bps + read_bps / 2;
+
+ LOG(INFO) << "Parallelism of component " << _component << " is " << parallelism;
+ LOG(INFO) << "Read BPS of component " << _component << " is " << read_bps;
+ if (parallelism > 0 && read_bps >= 0 && burst_read_bps >= 0) {
+ LOG(INFO) << "Set rate limit in " << _component << " to " << read_bps << "/" << burst_read_bps;
+ _conn->setRateLimit(read_bps / parallelism, burst_read_bps / parallelism);
} else {
LOG(INFO) << "Disable rate limit in " << _component;
_conn->disableRateLimit();
diff --git a/heron/stmgr/src/cpp/manager/stmgr.cpp b/heron/stmgr/src/cpp/manager/stmgr.cpp
index 3fd805b..738fa03 100644
--- a/heron/stmgr/src/cpp/manager/stmgr.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr.cpp
@@ -538,6 +538,7 @@ void StMgr::StartTMasterClient() {
void StMgr::NewPhysicalPlan(proto::system::PhysicalPlan* _pplan) {
LOG(INFO) << "Received a new physical plan from tmaster";
+ heron::config::TopologyConfigHelper::LogTopology(_pplan->topology());
// first make sure that we are part of the plan ;)
bool found = false;
for (sp_int32 i = 0; i < _pplan->stmgrs_size(); ++i) {
@@ -561,10 +562,10 @@ void StMgr::NewPhysicalPlan(proto::system::PhysicalPlan* _pplan) {
LOG(INFO) << "Topology state changed from " << pplan_->topology().state() << " to "
<< _pplan->topology().state();
}
- proto::api::TopologyState st = _pplan->topology().state();
- _pplan->clear_topology();
- _pplan->mutable_topology()->CopyFrom(*hydrated_topology_);
- _pplan->mutable_topology()->set_state(st);
+
+ PatchPhysicalPlanWithHydratedTopology(_pplan, hydrated_topology_);
+ LOG(INFO) << "Patched with hydrated topology";
+ heron::config::TopologyConfigHelper::LogTopology(_pplan->topology());
// TODO(vikasr) Currently we dont check if our role has changed
@@ -1115,5 +1116,38 @@ void StMgr::HandleStatefulRestoreDone(proto::system::StatusCode _status,
std::string _checkpoint_id, sp_int64 _restore_txid) {
tmaster_client_->SendRestoreTopologyStateResponse(_status, _checkpoint_id, _restore_txid);
}
+
+// Patch new physical plan with internal hydrated topology but keep new topology data:
+// - new topology state
+// - new topology/component config
+void StMgr::PatchPhysicalPlanWithHydratedTopology(proto::system::PhysicalPlan* _pplan,
+ proto::api::Topology* _topology) {
+ // Back up new topology data (state and configs)
+ proto::api::TopologyState st = _pplan->topology().state();
+
+ std::map<std::string, std::string> topology_config;
+ config::TopologyConfigHelper::GetTopologyConfig(_pplan->topology(), topology_config);
+
+ std::unordered_set<std::string> components;
+ std::map<std::string, std::map<std::string, std::string>> component_config;
+ config::TopologyConfigHelper::GetAllComponentNames(_pplan->topology(), components);
+ for (auto iter = components.begin(); iter != components.end(); ++iter) {
+ std::map<std::string, std::string> config;
+ config::TopologyConfigHelper::GetComponentConfig(_pplan->topology(), *iter, topology_config);
+ component_config[*iter] = config;
+ }
+
+ // Copy hydrated topology into pplan
+ _pplan->clear_topology();
+ _pplan->mutable_topology()->CopyFrom(*_topology);
+
+ // Restore new topology data
+ _pplan->mutable_topology()->set_state(st);
+ config::TopologyConfigHelper::SetTopologyConfig(_pplan->mutable_topology(), topology_config);
+ for (auto iter = components.begin(); iter != components.end(); ++iter) {
+ config::TopologyConfigHelper::SetComponentConfig(_pplan->mutable_topology(), *iter,
+ component_config[*iter]);
+ }
+}
} // namespace stmgr
} // namespace heron
diff --git a/heron/stmgr/src/cpp/manager/stmgr.h b/heron/stmgr/src/cpp/manager/stmgr.h
index cd0d0da..48e30be 100644
--- a/heron/stmgr/src/cpp/manager/stmgr.h
+++ b/heron/stmgr/src/cpp/manager/stmgr.h
@@ -187,6 +187,12 @@ class StMgr {
void HandleStatefulRestoreDone(proto::system::StatusCode _status,
std::string _checkpoint_id, sp_int64 _restore_txid);
+ // Patch new physical plan with internal hydrated topology but keep new topology data:
+ // - new topology state
+ // - new topology/component config
+ static void PatchPhysicalPlanWithHydratedTopology(proto::system::PhysicalPlan* _pplan,
+ proto::api::Topology* _topology);
+
heron::common::HeronStateMgr* state_mgr_;
proto::system::PhysicalPlan* pplan_;
sp_string topology_name_;
diff --git a/heron/stmgr/tests/cpp/server/stmgr_unittest.cpp b/heron/stmgr/tests/cpp/server/stmgr_unittest.cpp
index 8670304..41363d9 100644
--- a/heron/stmgr/tests/cpp/server/stmgr_unittest.cpp
+++ b/heron/stmgr/tests/cpp/server/stmgr_unittest.cpp
@@ -1867,6 +1867,62 @@ TEST(StMgr, test_metricsmgr_reconnect) {
TearCommonResources(common);
}
+// Test PatchPhysicalPlanWithHydratedTopology function
+TEST(StMgr, test_PatchPhysicalPlanWithHydratedTopology) {
+ int32_t nSpouts = 2;
+ int32_t nSpoutInstances = 1;
+ int32_t nBolts = 3;
+ int32_t nBoltInstances = 1;
+ heron::proto::api::Topology* topology =
+ GenerateDummyTopology("topology_name",
+ "topology_id",
+ nSpouts, nSpoutInstances, nBolts, nBoltInstances,
+ heron::proto::api::SHUFFLE);
+
+ heron::proto::system::PhysicalPlan* pplan = new heron::proto::system::PhysicalPlan();
+ pplan->mutable_topology()->CopyFrom(*topology);
+
+ // Verify initial values
+ EXPECT_EQ(
+ heron::config::TopologyConfigHelper::GetTopologyConfigValue(
+ *topology,
+ heron::config::TopologyConfigVars::TOPOLOGY_MESSAGE_TIMEOUT_SECS,
+ ""),
+ "30");
+ EXPECT_EQ(
+ heron::config::TopologyConfigHelper::GetTopologyConfigValue(
+ pplan->topology(),
+ heron::config::TopologyConfigVars::TOPOLOGY_MESSAGE_TIMEOUT_SECS,
+ ""),
+ "30");
+ // Change runtime data in PhysicalPlan and patch it
+ std::map<std::string, std::string> update;
+ update["conf.new"] = "test";
+ update[heron::config::TopologyConfigVars::TOPOLOGY_MESSAGE_TIMEOUT_SECS] = "10";
+ heron::config::TopologyConfigHelper::SetTopologyConfig(pplan->mutable_topology(), update);
+
+ // Verify updated runtime data is still in the patched physical plan
+ // The topology in the physical plan should have the old name
+ EXPECT_EQ(
+ heron::config::TopologyConfigHelper::GetTopologyConfigValue(
+ *topology,
+ heron::config::TopologyConfigVars::TOPOLOGY_MESSAGE_TIMEOUT_SECS,
+ ""),
+ "30"); // The internal topology object should still have the initial value
+ EXPECT_EQ(
+ heron::config::TopologyConfigHelper::GetTopologyConfigValue(
+ pplan->topology(),
+ heron::config::TopologyConfigVars::TOPOLOGY_MESSAGE_TIMEOUT_SECS,
+ ""),
+ "10"); // The topology object in the physical plan should have the new value
+ EXPECT_EQ(
+ heron::config::TopologyConfigHelper::GetTopologyConfigValue(
+ pplan->topology(), "conf.new", ""),
+ "test"); // The topology object in the physical plan should have the new config
+
+ delete pplan;
+}
+
int main(int argc, char** argv) {
heron::common::Initialize(argv[0]);
std::cout << "Current working directory (to find stmgr logs) "
diff --git a/heron/tmaster/src/cpp/manager/tcontroller.cpp b/heron/tmaster/src/cpp/manager/tcontroller.cpp
index a13784e..d5f933c 100644
--- a/heron/tmaster/src/cpp/manager/tcontroller.cpp
+++ b/heron/tmaster/src/cpp/manager/tcontroller.cpp
@@ -23,6 +23,7 @@
#include <vector>
#include "basics/basics.h"
#include "basics/strutils.h"
+#include "config/topology-config-helper.h"
#include "errors/errors.h"
#include "manager/tmaster.h"
#include "network/network.h"
@@ -298,7 +299,8 @@ bool TController::ParseRuntimeConfig(const std::vector<std::string>& paramters,
std::vector<std::string> segments = StrUtils::split(*iter, ":");
if (segments.size() == 2) {
// Topology level config
- retval[TOPOLOGY_CONFIG_KEY][segments[0]] = segments[1];
+ const char* topology_key = config::TopologyConfigHelper::GetReservedTopologyConfigKey();
+ retval[topology_key][segments[0]] = segments[1];
} else if (segments.size() == 3) {
// Component level config
retval[segments[0]][segments[1]] = segments[2];
diff --git a/heron/tmaster/src/cpp/manager/tmaster.cpp b/heron/tmaster/src/cpp/manager/tmaster.cpp
index ad2affe..c27ccfe 100644
--- a/heron/tmaster/src/cpp/manager/tmaster.cpp
+++ b/heron/tmaster/src/cpp/manager/tmaster.cpp
@@ -591,6 +591,9 @@ bool TMaster::UpdateRuntimeConfig(const ComponentConfigMap& _config,
VCallback<proto::system::StatusCode> cb) {
DCHECK(current_pplan_->topology().IsInitialized());
+ LOG(INFO) << "Update runtime config: ";
+ LogConfig(_config);
+
// Parse and set the new configs
proto::system::PhysicalPlan* new_pplan = new proto::system::PhysicalPlan();
new_pplan->CopyFrom(*current_pplan_);
@@ -631,11 +634,12 @@ bool TMaster::UpdateRuntimeConfigInTopology(proto::api::Topology* _topology,
DCHECK(_topology->IsInitialized());
ComponentConfigMap::const_iterator iter;
+ const char* topology_key = config::TopologyConfigHelper::GetReservedTopologyConfigKey();
for (iter = _config.begin(); iter != _config.end(); ++iter) {
// Get config for topology or component.
std::map<std::string, std::string> runtime_config;
- AppendPostfix(iter->second, RUNTIME_CONFIG_POSTFIX, runtime_config);
- if (iter->first == TOPOLOGY_CONFIG_KEY) {
+ config::TopologyConfigHelper::ConvertToRuntimeConfigs(iter->second, runtime_config);
+ if (iter->first == topology_key) {
config::TopologyConfigHelper::SetTopologyConfig(_topology, runtime_config);
} else {
config::TopologyConfigHelper::SetComponentConfig(_topology, iter->first, runtime_config);
@@ -1022,8 +1026,9 @@ bool TMaster::ValidateRuntimeConfigNames(const ComponentConfigMap& _config) cons
config::TopologyConfigHelper::GetAllComponentNames(topology, components);
ComponentConfigMap::const_iterator iter;
+ const char* topology_key = config::TopologyConfigHelper::GetReservedTopologyConfigKey();
for (iter = _config.begin(); iter != _config.end(); ++iter) {
- if (iter->first != TOPOLOGY_CONFIG_KEY) {
+ if (iter->first != topology_key) {
// It is a component, search for it
if (components.find(iter->first) == components.end()) {
return false;
@@ -1034,12 +1039,12 @@ bool TMaster::ValidateRuntimeConfigNames(const ComponentConfigMap& _config) cons
return true;
}
-void TMaster::AppendPostfix(const ConfigValueMap& _origin,
- const std::string& post_fix,
- ConfigValueMap& _retval) {
- ConfigValueMap::const_iterator it;
- for (it = _origin.begin(); it != _origin.end(); ++it) {
- _retval[it->first + post_fix] = it->second;
+void TMaster::LogConfig(const ComponentConfigMap& _config) {
+ for (auto iter = _config.begin(); iter != _config.end(); ++iter) {
+ LOG(INFO) << iter->first << " =>";
+ for (auto i = iter->second.begin(); i != iter->second.end(); ++i) {
+ LOG(INFO) << i->first << " : " << i->second;
+ }
}
}
diff --git a/heron/tmaster/src/cpp/manager/tmaster.h b/heron/tmaster/src/cpp/manager/tmaster.h
index ca2b0d4..defe8fd 100644
--- a/heron/tmaster/src/cpp/manager/tmaster.h
+++ b/heron/tmaster/src/cpp/manager/tmaster.h
@@ -47,9 +47,6 @@ typedef std::map<std::string, std::string> ConfigValueMap;
// From component name to config/value pairs
typedef std::map<std::string, std::map<std::string, std::string>> ComponentConfigMap;
-const sp_string TOPOLOGY_CONFIG_KEY = "_topology_";
-const sp_string RUNTIME_CONFIG_POSTFIX = ":runtime";
-
class TMaster {
public:
TMaster(const std::string& _zk_hostport, const std::string& _topology_name,
@@ -124,6 +121,9 @@ class TMaster {
// Function to be called that calls MakePhysicalPlan and sends it to all stmgrs
void DoPhysicalPlan(EventLoop::Status _code);
+ // Log config object
+ void LogConfig(const ComponentConfigMap& _config);
+
// Big brother function that does the assignment to the workers
// If _new_stmgr is null, this means that there was a plan
// existing, but a _new_stmgr joined us. So redo his part
@@ -192,10 +192,6 @@ class TMaster {
sp_int32 port,
const std::string& stmgr_id);
- void AppendPostfix(const ConfigValueMap& _origin,
- const std::string& post_fix,
- ConfigValueMap& _update);
-
// map of active stmgr id to stmgr state
StMgrMap stmgrs_;
diff --git a/heron/tmaster/tests/cpp/server/tmaster_unittest.cpp b/heron/tmaster/tests/cpp/server/tmaster_unittest.cpp
index 6db68df..0f37dc8 100644
--- a/heron/tmaster/tests/cpp/server/tmaster_unittest.cpp
+++ b/heron/tmaster/tests/cpp/server/tmaster_unittest.cpp
@@ -735,7 +735,8 @@ TEST(StMgr, test_runtime_config) {
std::map<std::string, std::string> validate_good_config;
validate_good_config[topology_runtime_config_1] = "1";
validate_good_config[topology_runtime_config_2] = "2";
- validate_good_config_map[heron::tmaster::TOPOLOGY_CONFIG_KEY] = validate_good_config;
+ const char* topology_key = heron::config::TopologyConfigHelper::GetReservedTopologyConfigKey();
+ validate_good_config_map[topology_key] = validate_good_config;
validate_good_config_map["spout1"] = validate_good_config;
EXPECT_EQ(common.tmaster_->ValidateRuntimeConfig(validate_good_config_map), true);
--
To stop receiving notification emails like this one, please contact
huijun@apache.org.