You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by hu...@apache.org on 2018/04/20 22:47:52 UTC
[incubator-heron] branch master updated: Fix kryo factor config
(#2868)
This is an automated email from the ASF dual-hosted git repository.
huijun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new f5a3193 Fix kryo factor config (#2868)
f5a3193 is described below
commit f5a3193a547dd8a22e45aaf6fdbea66740679d32
Author: Ning Wang <nw...@twitter.com>
AuthorDate: Fri Apr 20 15:47:49 2018 -0700
Fix kryo factor config (#2868)
* Fix runtime config patching in stmgr.
The old logic stores and restores all the configs. However, it doesn't work
for non-string configs because they have an empty value field and the real
data is in the serialized_value field, which is not covered.
* fix unit tests
---
.../src/cpp/config/topology-config-helper.cpp | 64 +++++++++------
.../common/src/cpp/config/topology-config-helper.h | 28 +++----
.../cpp/config/topology-config-helper_unittest.cpp | 92 +++++++++-------------
heron/stmgr/src/cpp/manager/stmgr.cpp | 9 ++-
heron/stmgr/tests/cpp/server/stmgr_unittest.cpp | 2 +-
heron/tmaster/src/cpp/manager/tmaster.cpp | 8 +-
.../tmaster/tests/cpp/server/tmaster_unittest.cpp | 89 +++++++--------------
7 files changed, 130 insertions(+), 162 deletions(-)
diff --git a/heron/common/src/cpp/config/topology-config-helper.cpp b/heron/common/src/cpp/config/topology-config-helper.cpp
index 3200330..caa2882 100644
--- a/heron/common/src/cpp/config/topology-config-helper.cpp
+++ b/heron/common/src/cpp/config/topology-config-helper.cpp
@@ -428,34 +428,41 @@ void TopologyConfigHelper::ConvertToRuntimeConfigs(
}
}
-// Return topology level config
-void TopologyConfigHelper::GetTopologyConfig(const proto::api::Topology& _topology,
- std::map<std::string, std::string>& retval) {
+
+// Return topology level runtime config
+// Note that all runtime configs are pure string so there is no need to worry about serialized_value
+void TopologyConfigHelper::GetTopologyRuntimeConfig(
+ const proto::api::Topology& _topology,
+ std::map<std::string, std::string>& retval) {
if (_topology.has_topology_config()) {
const proto::api::Config& config = _topology.topology_config();
- ConvertConfigToKVMap(config, retval);
+ ConvertRuntimeConfigToKVMap(config, retval);
}
}
-// Update topology level config
-void TopologyConfigHelper::SetTopologyConfig(proto::api::Topology* _topology,
- const std::map<std::string, std::string>& _update) {
+// Update topology level runtime config
+// Note that all runtime configs are pure string so there is no need to worry about serialized_value
+void TopologyConfigHelper::SetTopologyRuntimeConfig(
+ proto::api::Topology* _topology,
+ const std::map<std::string, std::string>& _update) {
if (_topology->has_topology_config()) {
proto::api::Config* config = _topology->mutable_topology_config();
- UpdateConfigFromKVMap(config, _update);
+ UpdateRuntimeConfigFromKVMap(config, _update);
}
}
-// Return component level config
-void TopologyConfigHelper::GetComponentConfig(const proto::api::Topology& _topology,
- const std::string& _component_name,
- std::map<std::string, std::string>& retval) {
+// Return component level runtime config
+// Note that all runtime configs are pure string so there is no need to worry about serialized_value
+void TopologyConfigHelper::GetComponentRuntimeConfig(
+ const proto::api::Topology& _topology,
+ const std::string& _component_name,
+ std::map<std::string, std::string>& retval) {
// We are assuming component names are unique and returning the config
// of the first spout or bolt found with the name.
for (sp_int32 i = 0; i < _topology.spouts_size(); ++i) {
if (_topology.spouts(i).comp().name() == _component_name) {
const proto::api::Config& config = _topology.spouts(i).comp().config();
- ConvertConfigToKVMap(config, retval);
+ ConvertRuntimeConfigToKVMap(config, retval);
return;
}
}
@@ -463,23 +470,25 @@ void TopologyConfigHelper::GetComponentConfig(const proto::api::Topology& _topol
for (sp_int32 i = 0; i < _topology.bolts_size(); ++i) {
if (_topology.bolts(i).comp().name() == _component_name) {
const proto::api::Config& config = _topology.bolts(i).comp().config();
- ConvertConfigToKVMap(config, retval);
+ ConvertRuntimeConfigToKVMap(config, retval);
return;
}
}
}
-// Update component level config
-void TopologyConfigHelper::SetComponentConfig(proto::api::Topology* _topology,
- const std::string& _component_name,
- const std::map<std::string, std::string>& _update) {
+// Update component level runtime config
+// Note that all runtime configs are pure string so there is no need to worry about serialized_value
+void TopologyConfigHelper::SetComponentRuntimeConfig(
+ proto::api::Topology* _topology,
+ const std::string& _component_name,
+ const std::map<std::string, std::string>& _update) {
// We are assuming component names are unique and updating config for all instances
// with the specific component name.
for (sp_int32 i = 0; i < _topology->spouts_size(); ++i) {
proto::api::Component* comp = _topology->mutable_spouts(i)->mutable_comp();
if (comp->name() == _component_name) {
proto::api::Config* config = comp->mutable_config();
- UpdateConfigFromKVMap(config, _update);
+ UpdateRuntimeConfigFromKVMap(config, _update);
}
}
@@ -487,13 +496,13 @@ void TopologyConfigHelper::SetComponentConfig(proto::api::Topology* _topology,
proto::api::Component* comp = _topology->mutable_bolts(i)->mutable_comp();
if (comp->name() == _component_name) {
proto::api::Config* config = comp->mutable_config();
- UpdateConfigFromKVMap(config, _update);
+ UpdateRuntimeConfigFromKVMap(config, _update);
}
}
}
// For every existing config, update the value; for every non-existing config, add it.
-void TopologyConfigHelper::UpdateConfigFromKVMap(proto::api::Config* _config,
+void TopologyConfigHelper::UpdateRuntimeConfigFromKVMap(proto::api::Config* _config,
const std::map<std::string, std::string>& _kv_map) {
std::set<std::string> updated;
for (sp_int32 i = 0; i < _config->kvs_size(); ++i) {
@@ -537,10 +546,17 @@ bool TopologyConfigHelper::GetBooleanConfigValue(const proto::api::Topology& _to
}
// Convert topology config to a key value map
-void TopologyConfigHelper::ConvertConfigToKVMap(const proto::api::Config& _config,
- std::map<std::string, std::string>& retval) {
+void TopologyConfigHelper::ConvertRuntimeConfigToKVMap(
+ const proto::api::Config& _config,
+ std::map<std::string, std::string>& retval) {
for (sp_int32 i = 0; i < _config.kvs_size(); ++i) {
- retval[_config.kvs(i).key()] = _config.kvs(i).value();
+ const std::string& key = _config.kvs(i).key();
+ // Assuming RUNTIME_CONFIG_POSTFIX is unique and doesn't happen in normal cases. When it is
+ // found in a config key, the config is considered a runtime config and extracted into the
+ // result
+ if (key.find(RUNTIME_CONFIG_POSTFIX) != std::string::npos) {
+ retval[_config.kvs(i).key()] = _config.kvs(i).value();
+ }
}
}
diff --git a/heron/common/src/cpp/config/topology-config-helper.h b/heron/common/src/cpp/config/topology-config-helper.h
index b4b8651..c2c5fe3 100644
--- a/heron/common/src/cpp/config/topology-config-helper.h
+++ b/heron/common/src/cpp/config/topology-config-helper.h
@@ -144,23 +144,23 @@ class TopologyConfigHelper {
std::map<std::string, std::string>& _retval);
// Return topology level config
- static void GetTopologyConfig(const proto::api::Topology& _topology,
- std::map<std::string, std::string>& retval);
+ static void GetTopologyRuntimeConfig(const proto::api::Topology& _topology,
+ std::map<std::string, std::string>& retval);
// Update topology level config
- static void SetTopologyConfig(proto::api::Topology* _topology,
- const std::map<std::string, std::string>& retval);
+ static void SetTopologyRuntimeConfig(proto::api::Topology* _topology,
+ const std::map<std::string, std::string>& retval);
// Return component level config
- static void GetComponentConfig(const proto::api::Topology& _topology,
- const std::string& _component_name,
- std::map<std::string, std::string>& config);
+ static void GetComponentRuntimeConfig(const proto::api::Topology& _topology,
+ const std::string& _component_name,
+ std::map<std::string, std::string>& config);
// Update component level config
- static void SetComponentConfig(proto::api::Topology* _topology,
- const std::string& _component_name,
- const std::map<std::string, std::string>& config);
+ static void SetComponentRuntimeConfig(proto::api::Topology* _topology,
+ const std::string& _component_name,
+ const std::map<std::string, std::string>& config);
// Get the topology config value given the config key
static const std::string GetTopologyConfigValue(const proto::api::Topology& _topology,
@@ -190,11 +190,11 @@ class TopologyConfigHelper {
const std::string& _config_name,
bool _default_value);
// Convert topology config to a key value map
- static void ConvertConfigToKVMap(const proto::api::Config& _config,
- std::map<std::string, std::string>& retval);
+ static void ConvertRuntimeConfigToKVMap(const proto::api::Config& _config,
+ std::map<std::string, std::string>& retval);
// Update topology config from a key value map
- static void UpdateConfigFromKVMap(proto::api::Config* _config,
- const std::map<std::string, std::string>& _kv_map);
+ static void UpdateRuntimeConfigFromKVMap(proto::api::Config* _config,
+ const std::map<std::string, std::string>& _kv_map);
};
} // namespace config
} // namespace heron
diff --git a/heron/common/tests/cpp/config/topology-config-helper_unittest.cpp b/heron/common/tests/cpp/config/topology-config-helper_unittest.cpp
index 698c772..f333314 100644
--- a/heron/common/tests/cpp/config/topology-config-helper_unittest.cpp
+++ b/heron/common/tests/cpp/config/topology-config-helper_unittest.cpp
@@ -30,15 +30,15 @@ const sp_string MESSAGE_TIMEOUT = "30"; // seconds
int NUM_SPOUT_INSTANCES = 2;
int NUM_BOLT_INSTANCES = 3;
-const sp_string TOPOLOGY_USER_CONFIG = "topology.user.test_config";
+const sp_string TOPOLOGY_USER_CONFIG = "topology.user.test_config:runtime";
const sp_string TOPOLOGY_USER_CONFIG_VALUE = "-1";
const sp_string NEW_TOPOLOGY_USER_CONFIG_VALUE = "1";
const sp_string NEW_TOPOLOGY_USER_CONFIG_VALUE_2 = "11";
-const sp_string SPOUT_USER_CONFIG = "topology.user.spout.test_config";
+const sp_string SPOUT_USER_CONFIG = "topology.user.spout.test_config:runtime";
const sp_string SPOUT_USER_CONFIG_VALUE = "-2";
const sp_string NEW_SPOUT_USER_CONFIG_VALUE = "2";
const sp_string NEW_SPOUT_USER_CONFIG_VALUE_2 = "22";
-const sp_string BOLT_USER_CONFIG = "topology.user.bolt.test_config";
+const sp_string BOLT_USER_CONFIG = "topology.user.bolt.test_config:runtime";
const sp_string BOLT_USER_CONFIG_VALUE = "-3";
const sp_string NEW_BOLT_USER_CONFIG_VALUE = "3";
const sp_string NEW_BOLT_USER_CONFIG_VALUE_2 = "33";
@@ -136,9 +136,7 @@ TEST(TopologyConfigHelper, GetAndSetTopologyConfig) {
// Test initial config
std::map<std::string, std::string> old_config;
- heron::config::TopologyConfigHelper::GetTopologyConfig(*test_topology, old_config);
- EXPECT_EQ(old_config[heron::config::TopologyConfigVars::TOPOLOGY_MESSAGE_TIMEOUT_SECS],
- MESSAGE_TIMEOUT);
+ heron::config::TopologyConfigHelper::GetTopologyRuntimeConfig(*test_topology, old_config);
EXPECT_EQ(old_config[TOPOLOGY_USER_CONFIG], TOPOLOGY_USER_CONFIG_VALUE);
// Test GetComponentConfigValue function
@@ -152,24 +150,19 @@ TEST(TopologyConfigHelper, GetAndSetTopologyConfig) {
"");
// Set and then test updated config
- std::string runtime_user_config_key = TOPOLOGY_USER_CONFIG + ":runtime";
std::map<std::string, std::string> update;
- update[runtime_user_config_key] = NEW_TOPOLOGY_USER_CONFIG_VALUE;
- heron::config::TopologyConfigHelper::SetTopologyConfig(test_topology, update);
+ update[TOPOLOGY_USER_CONFIG] = NEW_TOPOLOGY_USER_CONFIG_VALUE;
+ heron::config::TopologyConfigHelper::SetTopologyRuntimeConfig(test_topology, update);
std::map<std::string, std::string> updated_config;
- heron::config::TopologyConfigHelper::GetTopologyConfig(*test_topology, updated_config);
- EXPECT_EQ(updated_config[heron::config::TopologyConfigVars::TOPOLOGY_MESSAGE_TIMEOUT_SECS],
- MESSAGE_TIMEOUT);
- EXPECT_EQ(updated_config[TOPOLOGY_USER_CONFIG], TOPOLOGY_USER_CONFIG_VALUE);
- EXPECT_EQ(updated_config[runtime_user_config_key], NEW_TOPOLOGY_USER_CONFIG_VALUE);
+ heron::config::TopologyConfigHelper::GetTopologyRuntimeConfig(*test_topology, updated_config);
+ EXPECT_EQ(updated_config[TOPOLOGY_USER_CONFIG], NEW_TOPOLOGY_USER_CONFIG_VALUE);
- update[runtime_user_config_key] = NEW_TOPOLOGY_USER_CONFIG_VALUE_2;
- heron::config::TopologyConfigHelper::SetTopologyConfig(test_topology, update);
+ update[TOPOLOGY_USER_CONFIG] = NEW_TOPOLOGY_USER_CONFIG_VALUE_2;
+ heron::config::TopologyConfigHelper::SetTopologyRuntimeConfig(test_topology, update);
updated_config.clear();
- heron::config::TopologyConfigHelper::GetTopologyConfig(*test_topology, updated_config);
- EXPECT_EQ(updated_config[TOPOLOGY_USER_CONFIG], TOPOLOGY_USER_CONFIG_VALUE);
- EXPECT_EQ(updated_config[runtime_user_config_key], NEW_TOPOLOGY_USER_CONFIG_VALUE_2);
+ heron::config::TopologyConfigHelper::GetTopologyRuntimeConfig(*test_topology, updated_config);
+ EXPECT_EQ(updated_config[TOPOLOGY_USER_CONFIG], NEW_TOPOLOGY_USER_CONFIG_VALUE_2);
}
TEST(TopologyConfigHelper, GetAndSetComponentConfig) {
@@ -183,15 +176,17 @@ TEST(TopologyConfigHelper, GetAndSetComponentConfig) {
std::string non_test_bolt = "test_bolt1";
// Test initial config
std::map<std::string, std::string> old_config;
- heron::config::TopologyConfigHelper::GetComponentConfig(*test_topology, test_spout, old_config);
- EXPECT_EQ(old_config[heron::config::TopologyConfigVars::TOPOLOGY_COMPONENT_PARALLELISM],
- std::to_string(NUM_SPOUT_INSTANCES));
+ heron::config::TopologyConfigHelper::GetComponentRuntimeConfig(*test_topology, test_spout,
+ old_config);
EXPECT_EQ(old_config[SPOUT_USER_CONFIG], SPOUT_USER_CONFIG_VALUE);
+ // parallelism is not a runtime config, hence it is not extracted
+ EXPECT_EQ(old_config[heron::config::TopologyConfigVars::TOPOLOGY_COMPONENT_PARALLELISM], "");
old_config.clear();
- heron::config::TopologyConfigHelper::GetComponentConfig(*test_topology, test_bolt, old_config);
- EXPECT_EQ(old_config[heron::config::TopologyConfigVars::TOPOLOGY_COMPONENT_PARALLELISM],
- std::to_string(NUM_BOLT_INSTANCES));
+ heron::config::TopologyConfigHelper::GetComponentRuntimeConfig(*test_topology, test_bolt,
+ old_config);
EXPECT_EQ(old_config[BOLT_USER_CONFIG], BOLT_USER_CONFIG_VALUE);
+ // parallelism is not a runtime config, hence it is not extracted
+ EXPECT_EQ(old_config[heron::config::TopologyConfigVars::TOPOLOGY_COMPONENT_PARALLELISM], "");
// Test GetComponentConfigValue function
EXPECT_EQ(
@@ -204,55 +199,40 @@ TEST(TopologyConfigHelper, GetAndSetComponentConfig) {
"");
// Set user configs to new values
- std::string runtime_spout_user_config_key = SPOUT_USER_CONFIG + ":runtime";
- std::string runtime_bolt_user_config_key = BOLT_USER_CONFIG + ":runtime";
-
std::map<std::string, std::string> update;
- update[runtime_spout_user_config_key] = NEW_SPOUT_USER_CONFIG_VALUE;
- heron::config::TopologyConfigHelper::SetComponentConfig(test_topology, test_spout, update);
+ update[SPOUT_USER_CONFIG] = NEW_SPOUT_USER_CONFIG_VALUE;
+ heron::config::TopologyConfigHelper::SetComponentRuntimeConfig(test_topology, test_spout, update);
update.clear();
- update[runtime_bolt_user_config_key] = NEW_BOLT_USER_CONFIG_VALUE;
- heron::config::TopologyConfigHelper::SetComponentConfig(test_topology, test_bolt, update);
+ update[BOLT_USER_CONFIG] = NEW_BOLT_USER_CONFIG_VALUE;
+ heron::config::TopologyConfigHelper::SetComponentRuntimeConfig(test_topology, test_bolt, update);
// Test user configs are updated
std::map<std::string, std::string> updated_config;
- heron::config::TopologyConfigHelper::GetComponentConfig(
+ heron::config::TopologyConfigHelper::GetComponentRuntimeConfig(
*test_topology, test_spout, updated_config);
- EXPECT_EQ(updated_config[heron::config::TopologyConfigVars::TOPOLOGY_COMPONENT_PARALLELISM],
- std::to_string(NUM_SPOUT_INSTANCES));
- EXPECT_EQ(updated_config[SPOUT_USER_CONFIG], SPOUT_USER_CONFIG_VALUE);
- EXPECT_EQ(updated_config[runtime_spout_user_config_key], NEW_SPOUT_USER_CONFIG_VALUE);
+ EXPECT_EQ(updated_config[SPOUT_USER_CONFIG], NEW_SPOUT_USER_CONFIG_VALUE);
updated_config.clear();
- heron::config::TopologyConfigHelper::GetComponentConfig(
+ heron::config::TopologyConfigHelper::GetComponentRuntimeConfig(
*test_topology, test_bolt, updated_config);
- EXPECT_EQ(updated_config[heron::config::TopologyConfigVars::TOPOLOGY_COMPONENT_PARALLELISM],
- std::to_string(NUM_BOLT_INSTANCES));
- EXPECT_EQ(updated_config[BOLT_USER_CONFIG], BOLT_USER_CONFIG_VALUE);
- EXPECT_EQ(updated_config[runtime_bolt_user_config_key], NEW_BOLT_USER_CONFIG_VALUE);
+ EXPECT_EQ(updated_config[BOLT_USER_CONFIG], NEW_BOLT_USER_CONFIG_VALUE);
// Set to new value 2 and verify
update.clear();
- update[runtime_spout_user_config_key] = NEW_SPOUT_USER_CONFIG_VALUE_2;
- heron::config::TopologyConfigHelper::SetComponentConfig(test_topology, test_spout, update);
+ update[SPOUT_USER_CONFIG] = NEW_SPOUT_USER_CONFIG_VALUE_2;
+ heron::config::TopologyConfigHelper::SetComponentRuntimeConfig(test_topology, test_spout, update);
update.clear();
- update[runtime_bolt_user_config_key] = NEW_BOLT_USER_CONFIG_VALUE_2;
- heron::config::TopologyConfigHelper::SetComponentConfig(test_topology, test_bolt, update);
+ update[BOLT_USER_CONFIG] = NEW_BOLT_USER_CONFIG_VALUE_2;
+ heron::config::TopologyConfigHelper::SetComponentRuntimeConfig(test_topology, test_bolt, update);
// Test user configs are updated
updated_config.clear();
- heron::config::TopologyConfigHelper::GetComponentConfig(
+ heron::config::TopologyConfigHelper::GetComponentRuntimeConfig(
*test_topology, test_spout, updated_config);
- EXPECT_EQ(updated_config[heron::config::TopologyConfigVars::TOPOLOGY_COMPONENT_PARALLELISM],
- std::to_string(NUM_SPOUT_INSTANCES));
- EXPECT_EQ(updated_config[SPOUT_USER_CONFIG], SPOUT_USER_CONFIG_VALUE);
- EXPECT_EQ(updated_config[runtime_spout_user_config_key], NEW_SPOUT_USER_CONFIG_VALUE_2);
+ EXPECT_EQ(updated_config[SPOUT_USER_CONFIG], NEW_SPOUT_USER_CONFIG_VALUE_2);
updated_config.clear();
- heron::config::TopologyConfigHelper::GetComponentConfig(
+ heron::config::TopologyConfigHelper::GetComponentRuntimeConfig(
*test_topology, test_bolt, updated_config);
- EXPECT_EQ(updated_config[heron::config::TopologyConfigVars::TOPOLOGY_COMPONENT_PARALLELISM],
- std::to_string(NUM_BOLT_INSTANCES));
- EXPECT_EQ(updated_config[BOLT_USER_CONFIG], BOLT_USER_CONFIG_VALUE);
- EXPECT_EQ(updated_config[runtime_bolt_user_config_key], NEW_BOLT_USER_CONFIG_VALUE_2);
+ EXPECT_EQ(updated_config[BOLT_USER_CONFIG], NEW_BOLT_USER_CONFIG_VALUE_2);
}
TEST(TopologyConfigHelper, GetRuntimeConfigKey) {
diff --git a/heron/stmgr/src/cpp/manager/stmgr.cpp b/heron/stmgr/src/cpp/manager/stmgr.cpp
index 738fa03..7097541 100644
--- a/heron/stmgr/src/cpp/manager/stmgr.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr.cpp
@@ -1126,14 +1126,14 @@ void StMgr::PatchPhysicalPlanWithHydratedTopology(proto::system::PhysicalPlan* _
proto::api::TopologyState st = _pplan->topology().state();
std::map<std::string, std::string> topology_config;
- config::TopologyConfigHelper::GetTopologyConfig(_pplan->topology(), topology_config);
+ config::TopologyConfigHelper::GetTopologyRuntimeConfig(_pplan->topology(), topology_config);
std::unordered_set<std::string> components;
std::map<std::string, std::map<std::string, std::string>> component_config;
config::TopologyConfigHelper::GetAllComponentNames(_pplan->topology(), components);
for (auto iter = components.begin(); iter != components.end(); ++iter) {
std::map<std::string, std::string> config;
- config::TopologyConfigHelper::GetComponentConfig(_pplan->topology(), *iter, topology_config);
+ config::TopologyConfigHelper::GetComponentRuntimeConfig(_pplan->topology(), *iter, config);
component_config[*iter] = config;
}
@@ -1143,9 +1143,10 @@ void StMgr::PatchPhysicalPlanWithHydratedTopology(proto::system::PhysicalPlan* _
// Restore new topology data
_pplan->mutable_topology()->set_state(st);
- config::TopologyConfigHelper::SetTopologyConfig(_pplan->mutable_topology(), topology_config);
+ config::TopologyConfigHelper::SetTopologyRuntimeConfig(_pplan->mutable_topology(),
+ topology_config);
for (auto iter = components.begin(); iter != components.end(); ++iter) {
- config::TopologyConfigHelper::SetComponentConfig(_pplan->mutable_topology(), *iter,
+ config::TopologyConfigHelper::SetComponentRuntimeConfig(_pplan->mutable_topology(), *iter,
component_config[*iter]);
}
}
diff --git a/heron/stmgr/tests/cpp/server/stmgr_unittest.cpp b/heron/stmgr/tests/cpp/server/stmgr_unittest.cpp
index 41363d9..5686f41 100644
--- a/heron/stmgr/tests/cpp/server/stmgr_unittest.cpp
+++ b/heron/stmgr/tests/cpp/server/stmgr_unittest.cpp
@@ -1899,7 +1899,7 @@ TEST(StMgr, test_PatchPhysicalPlanWithHydratedTopology) {
std::map<std::string, std::string> update;
update["conf.new"] = "test";
update[heron::config::TopologyConfigVars::TOPOLOGY_MESSAGE_TIMEOUT_SECS] = "10";
- heron::config::TopologyConfigHelper::SetTopologyConfig(pplan->mutable_topology(), update);
+ heron::config::TopologyConfigHelper::SetTopologyRuntimeConfig(pplan->mutable_topology(), update);
// Verify updated runtime data is still in the patched physical plan
// The topology in the physical plan should have the old name
diff --git a/heron/tmaster/src/cpp/manager/tmaster.cpp b/heron/tmaster/src/cpp/manager/tmaster.cpp
index c27ccfe..235e49c 100644
--- a/heron/tmaster/src/cpp/manager/tmaster.cpp
+++ b/heron/tmaster/src/cpp/manager/tmaster.cpp
@@ -637,12 +637,12 @@ bool TMaster::UpdateRuntimeConfigInTopology(proto::api::Topology* _topology,
const char* topology_key = config::TopologyConfigHelper::GetReservedTopologyConfigKey();
for (iter = _config.begin(); iter != _config.end(); ++iter) {
// Get config for topology or component.
- std::map<std::string, std::string> runtime_config;
- config::TopologyConfigHelper::ConvertToRuntimeConfigs(iter->second, runtime_config);
+ std::map<std::string, std::string> config;
+ config::TopologyConfigHelper::ConvertToRuntimeConfigs(iter->second, config);
if (iter->first == topology_key) {
- config::TopologyConfigHelper::SetTopologyConfig(_topology, runtime_config);
+ config::TopologyConfigHelper::SetTopologyRuntimeConfig(_topology, config);
} else {
- config::TopologyConfigHelper::SetComponentConfig(_topology, iter->first, runtime_config);
+ config::TopologyConfigHelper::SetComponentRuntimeConfig(_topology, iter->first, config);
}
}
diff --git a/heron/tmaster/tests/cpp/server/tmaster_unittest.cpp b/heron/tmaster/tests/cpp/server/tmaster_unittest.cpp
index 0f37dc8..137e7cd 100644
--- a/heron/tmaster/tests/cpp/server/tmaster_unittest.cpp
+++ b/heron/tmaster/tests/cpp/server/tmaster_unittest.cpp
@@ -50,10 +50,10 @@ const sp_string heron_internals_config_filename =
const sp_string metrics_sinks_config_filename =
"../../../../../../../../heron/config/metrics_sinks.yaml";
-const sp_string topology_runtime_config_1 = "topology.runtime.test_config";
-const sp_string topology_runtime_config_2 = "topology.runtime.test_config2";
-const sp_string spout_runtime_config = "topology.runtime.spout.test_config";
-const sp_string bolt_runtime_config = "topology.runtime.bolt.test_config";
+const sp_string topology_init_config_1 = "topology.runtime.test_config";
+const sp_string topology_init_config_2 = "topology.runtime.test_config2";
+const sp_string spout_init_config = "topology.runtime.spout.test_config";
+const sp_string bolt_init_config = "topology.runtime.bolt.test_config";
// Generate a dummy topology
static heron::proto::api::Topology* GenerateDummyTopology(
@@ -93,7 +93,7 @@ static heron::proto::api::Topology* GenerateDummyTopology(
kv->set_value(std::to_string(num_spout_instances));
// Add runtime config
heron::proto::api::Config::KeyValue* kv1 = config->add_kvs();
- kv1->set_key(spout_runtime_config);
+ kv1->set_key(spout_init_config);
kv1->set_value("-1");
}
// Set bolts
@@ -123,7 +123,7 @@ static heron::proto::api::Topology* GenerateDummyTopology(
kv->set_value(std::to_string(num_bolt_instances));
// Add runtime config
heron::proto::api::Config::KeyValue* kv1 = config->add_kvs();
- kv1->set_key(bolt_runtime_config);
+ kv1->set_key(bolt_init_config);
kv1->set_value("-1");
}
// Set message timeout
@@ -133,10 +133,10 @@ static heron::proto::api::Topology* GenerateDummyTopology(
kv->set_value(MESSAGE_TIMEOUT);
// Add runtime config
heron::proto::api::Config::KeyValue* kv1 = topology_config->add_kvs();
- kv1->set_key(topology_runtime_config_1);
+ kv1->set_key(topology_init_config_1);
kv1->set_value("-1");
heron::proto::api::Config::KeyValue* kv2 = topology_config->add_kvs();
- kv2->set_key(topology_runtime_config_2);
+ kv2->set_key(topology_init_config_2);
kv2->set_value("-1");
// Set state
@@ -706,35 +706,13 @@ TEST(StMgr, test_runtime_config) {
// auto c = t.topology_config();
for (size_t i = 0; i < common.stmgrs_list_.size(); ++i) {
while (!common.stmgrs_list_[i]->GetPhysicalPlan()) sleep(1);
- std::map<std::string, std::string> init_config, init_spout_config, init_bolt_config;
- const heron::proto::system::PhysicalPlan* pplan = common.stmgrs_list_[i]->GetPhysicalPlan();
- heron::config::TopologyConfigHelper::GetTopologyConfig(pplan->topology(), init_config);
- EXPECT_EQ(init_config[topology_runtime_config_1], "-1");
- EXPECT_EQ(init_config[topology_runtime_config_2], "-1");
- heron::config::TopologyConfigHelper::GetComponentConfig(pplan->topology(),
- runtime_test_spout, init_spout_config);
- EXPECT_EQ(init_spout_config[spout_runtime_config], "-1");
- heron::config::TopologyConfigHelper::GetComponentConfig(pplan->topology(),
- runtime_test_bolt, init_bolt_config);
- EXPECT_EQ(init_bolt_config[bolt_runtime_config], "-1");
}
- std::map<std::string, std::string> init_config, init_spout_config, init_bolt_config;
- const heron::proto::system::PhysicalPlan* init_pplan = common.tmaster_->getPhysicalPlan();
- heron::config::TopologyConfigHelper::GetTopologyConfig(init_pplan->topology(), init_config);
- EXPECT_EQ(init_config[topology_runtime_config_1], "-1");
- EXPECT_EQ(init_config[topology_runtime_config_2], "-1");
- heron::config::TopologyConfigHelper::GetComponentConfig(init_pplan->topology(),
- runtime_test_spout, init_spout_config);
- EXPECT_EQ(init_spout_config[spout_runtime_config], "-1");
- heron::config::TopologyConfigHelper::GetComponentConfig(init_pplan->topology(),
- runtime_test_bolt, init_bolt_config);
- EXPECT_EQ(init_bolt_config[bolt_runtime_config], "-1");
// Test ValidateRuntimeConfig()
heron::tmaster::ComponentConfigMap validate_good_config_map;
std::map<std::string, std::string> validate_good_config;
- validate_good_config[topology_runtime_config_1] = "1";
- validate_good_config[topology_runtime_config_2] = "2";
+ validate_good_config[topology_init_config_1] = "1";
+ validate_good_config[topology_init_config_2] = "2";
const char* topology_key = heron::config::TopologyConfigHelper::GetReservedTopologyConfigKey();
validate_good_config_map[topology_key] = validate_good_config;
validate_good_config_map["spout1"] = validate_good_config;
@@ -742,7 +720,7 @@ TEST(StMgr, test_runtime_config) {
heron::tmaster::ComponentConfigMap validate_bad_config_map;
std::map<std::string, std::string> validate_bad_config;
- validate_good_config[topology_runtime_config_1] = "1";
+ validate_good_config[topology_init_config_1] = "1";
validate_bad_config_map["unknown_component"] = validate_good_config;
EXPECT_EQ(common.tmaster_->ValidateRuntimeConfig(validate_bad_config_map), false);
@@ -772,10 +750,10 @@ TEST(StMgr, test_runtime_config) {
// Post runtime config request with good configs and expect 200 response.
std::vector<std::string> good_config;
- good_config.push_back(topology_runtime_config_1 + ":1");
- good_config.push_back(topology_runtime_config_2 + ":2");
- good_config.push_back(runtime_test_spout + ":" + spout_runtime_config + ":3");
- good_config.push_back(runtime_test_bolt + ":" + bolt_runtime_config + ":4");
+ good_config.push_back(topology_init_config_1 + ":1");
+ good_config.push_back(topology_init_config_2 + ":2");
+ good_config.push_back(runtime_test_spout + ":" + spout_init_config + ":3");
+ good_config.push_back(runtime_test_bolt + ":" + bolt_init_config + ":4");
std::thread* good_config_update_thread = new std::thread(UpdateRuntimeConfig,
common.topology_id_, common.tmaster_controller_port_, good_config, 200, "good_config");
good_config_update_thread->join();
@@ -787,35 +765,28 @@ TEST(StMgr, test_runtime_config) {
for (size_t i = 0; i < common.stmgrs_list_.size(); ++i) {
std::map<std::string, std::string> updated_config, updated_spout_config, updated_bolt_config;
const heron::proto::system::PhysicalPlan* pplan = common.stmgrs_list_[i]->GetPhysicalPlan();
- heron::config::TopologyConfigHelper::GetTopologyConfig(pplan->topology(), updated_config);
- EXPECT_EQ(updated_config[topology_runtime_config_1], "-1");
- EXPECT_EQ(updated_config[topology_runtime_config_1 + ":runtime"], "1");
- EXPECT_EQ(updated_config[topology_runtime_config_2], "-1");
- EXPECT_EQ(updated_config[topology_runtime_config_2 + ":runtime"], "2");
- heron::config::TopologyConfigHelper::GetComponentConfig(pplan->topology(),
+ heron::config::TopologyConfigHelper::GetTopologyRuntimeConfig(pplan->topology(),
+ updated_config);
+ EXPECT_EQ(updated_config[topology_init_config_1 + ":runtime"], "1");
+ EXPECT_EQ(updated_config[topology_init_config_2 + ":runtime"], "2");
+ heron::config::TopologyConfigHelper::GetComponentRuntimeConfig(pplan->topology(),
runtime_test_spout, updated_spout_config);
- EXPECT_EQ(updated_spout_config[spout_runtime_config], "-1");
- EXPECT_EQ(updated_spout_config[spout_runtime_config + ":runtime"], "3");
- heron::config::TopologyConfigHelper::GetComponentConfig(pplan->topology(),
+ EXPECT_EQ(updated_spout_config[spout_init_config + ":runtime"], "3");
+ heron::config::TopologyConfigHelper::GetComponentRuntimeConfig(pplan->topology(),
runtime_test_bolt, updated_bolt_config);
- EXPECT_EQ(updated_bolt_config[bolt_runtime_config], "-1");
- EXPECT_EQ(updated_bolt_config[bolt_runtime_config + ":runtime"], "4");
+ EXPECT_EQ(updated_bolt_config[bolt_init_config + ":runtime"], "4");
}
std::map<std::string, std::string> updated_config, updated_spout_config, updated_bolt_config;
const heron::proto::system::PhysicalPlan* pplan = common.tmaster_->getPhysicalPlan();
- heron::config::TopologyConfigHelper::GetTopologyConfig(pplan->topology(), updated_config);
- EXPECT_EQ(updated_config[topology_runtime_config_1], "-1");
- EXPECT_EQ(updated_config[topology_runtime_config_1 + ":runtime"], "1");
- EXPECT_EQ(updated_config[topology_runtime_config_2], "-1");
- EXPECT_EQ(updated_config[topology_runtime_config_2 + ":runtime"], "2");
- heron::config::TopologyConfigHelper::GetComponentConfig(pplan->topology(),
+ heron::config::TopologyConfigHelper::GetTopologyRuntimeConfig(pplan->topology(), updated_config);
+ EXPECT_EQ(updated_config[topology_init_config_1 + ":runtime"], "1");
+ EXPECT_EQ(updated_config[topology_init_config_2 + ":runtime"], "2");
+ heron::config::TopologyConfigHelper::GetComponentRuntimeConfig(pplan->topology(),
runtime_test_spout, updated_spout_config);
- EXPECT_EQ(updated_spout_config[spout_runtime_config], "-1");
- EXPECT_EQ(updated_spout_config[spout_runtime_config + ":runtime"], "3");
- heron::config::TopologyConfigHelper::GetComponentConfig(pplan->topology(),
+ EXPECT_EQ(updated_spout_config[spout_init_config + ":runtime"], "3");
+ heron::config::TopologyConfigHelper::GetComponentRuntimeConfig(pplan->topology(),
runtime_test_bolt, updated_bolt_config);
- EXPECT_EQ(updated_bolt_config[bolt_runtime_config], "-1");
- EXPECT_EQ(updated_bolt_config[bolt_runtime_config + ":runtime"], "4");
+ EXPECT_EQ(updated_bolt_config[bolt_init_config + ":runtime"], "4");
// Stop the schedulers
for (size_t i = 0; i < common.ss_list_.size(); ++i) {
--
To stop receiving notification emails like this one, please contact
huijun@apache.org.