You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by hu...@apache.org on 2018/04/12 21:57:15 UTC

[incubator-heron] branch master updated: Integrate runtime config and rate limit (#2846)

This is an automated email from the ASF dual-hosted git repository.

huijun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fe99ed  Integrate runtime config and rate limit (#2846)
1fe99ed is described below

commit 1fe99edbe926b1a620ad32f5c5ec30eaa8cd8f7d
Author: Ning Wang <nw...@twitter.com>
AuthorDate: Thu Apr 12 14:57:12 2018 -0700

    Integrate runtime config and rate limit (#2846)
    
    * Integrate runtime config and rate limit
    
    * more clean up
    
    * Fix integration test by avoiding changing hydrated topology in stmgr
---
 .../src/cpp/config/topology-config-helper.cpp      | 43 ++++++++++++++---
 .../common/src/cpp/config/topology-config-helper.h | 15 ++++++
 .../cpp/config/topology-config-helper_unittest.cpp | 42 +++++++++++++++-
 heron/stmgr/src/cpp/manager/instance-server.cpp    | 24 ++++++----
 heron/stmgr/src/cpp/manager/stmgr.cpp              | 42 ++++++++++++++--
 heron/stmgr/src/cpp/manager/stmgr.h                |  6 +++
 heron/stmgr/tests/cpp/server/stmgr_unittest.cpp    | 56 ++++++++++++++++++++++
 heron/tmaster/src/cpp/manager/tcontroller.cpp      |  4 +-
 heron/tmaster/src/cpp/manager/tmaster.cpp          | 23 +++++----
 heron/tmaster/src/cpp/manager/tmaster.h            | 10 ++--
 .../tmaster/tests/cpp/server/tmaster_unittest.cpp  |  3 +-
 11 files changed, 228 insertions(+), 40 deletions(-)

diff --git a/heron/common/src/cpp/config/topology-config-helper.cpp b/heron/common/src/cpp/config/topology-config-helper.cpp
index e84a7aa..3200330 100644
--- a/heron/common/src/cpp/config/topology-config-helper.cpp
+++ b/heron/common/src/cpp/config/topology-config-helper.cpp
@@ -29,6 +29,9 @@
 namespace heron {
 namespace config {
 
+static const char TOPOLOGY_CONFIG_KEY[] = "_topology_";
+static const char RUNTIME_CONFIG_POSTFIX[] = ":runtime";
+
 TopologyConfigVars::TopologyReliabilityMode StringToReliabilityMode(const std::string& _mode) {
   if (_mode == "ATMOST_ONCE") {
     return TopologyConfigVars::TopologyReliabilityMode::ATMOST_ONCE;
@@ -411,6 +414,20 @@ bool TopologyConfigHelper::DropTuplesUponBackpressure(const proto::api::Topology
                                TopologyConfigVars::TOPOLOGY_DROPTUPLES_UPON_BACKPRESSURE, false);
 }
 
+std::string TopologyConfigHelper::GetRuntimeConfigKey(const std::string& _key) {
+  return _key + RUNTIME_CONFIG_POSTFIX;
+}
+
+// Convert configs in map to runtime configs (append runtime postfix)
+void TopologyConfigHelper::ConvertToRuntimeConfigs(
+    const std::map<std::string, std::string>& _origin,
+    std::map<std::string, std::string>& _retval) {
+  std::map<std::string, std::string>::const_iterator it;
+  for (it = _origin.begin(); it != _origin.end(); ++it) {
+    _retval[GetRuntimeConfigKey(it->first)] = it->second;
+  }
+}
+
 // Return topology level config
 void TopologyConfigHelper::GetTopologyConfig(const proto::api::Topology& _topology,
                                              std::map<std::string, std::string>& retval) {
@@ -514,12 +531,9 @@ bool TopologyConfigHelper::GetBooleanConfigValue(const proto::api::Topology& _to
                                                  const std::string& _config_name,
                                                  bool _default_value) {
   static const std::string value_true_ = "true";
-  const proto::api::Config& cfg = _topology.topology_config();
-  const std::string value = GetConfigValue(cfg, _config_name, "");
-  if (!value.empty()) {
-    return value_true_.compare(value.c_str()) == 0;
-  }
-  return _default_value;
+  const std::string value = GetTopologyConfigValue(_topology, _config_name, "");
+  // An unset (empty) config must fall back to _default_value instead of being read as false
+  return value.empty() ? _default_value : value_true_.compare(value.c_str()) == 0;
 }
 
 // Convert topology config to a key value map
@@ -530,6 +544,14 @@ void TopologyConfigHelper::ConvertConfigToKVMap(const proto::api::Config& _confi
   }
 }
 
+const std::string TopologyConfigHelper::GetTopologyConfigValue(
+    const proto::api::Topology& _topology,
+    const std::string& _key,
+    const std::string& _default) {
+  const proto::api::Config& cfg = _topology.topology_config();
+  return GetConfigValue(cfg, _key, _default);
+}
+
 const std::string TopologyConfigHelper::GetComponentConfigValue(
     const proto::api::Topology& _topology,
     const std::string& _component,
@@ -552,13 +574,20 @@ const std::string TopologyConfigHelper::GetComponentConfigValue(
 
 sp_int64 TopologyConfigHelper::GetComponentOutputBPS(const proto::api::Topology& _topology,
                                                      const std::string& _component) {
-  const std::string value = GetComponentConfigValue(_topology, _component,
+  const std::string init_value = GetComponentConfigValue(_topology, _component,
       TopologyConfigVars::TOPOLOGY_COMPONENT_OUTPUT_BPS, "");
+  const std::string value = GetComponentConfigValue(_topology, _component,
+      GetRuntimeConfigKey(TopologyConfigVars::TOPOLOGY_COMPONENT_OUTPUT_BPS), init_value);
 
   if (!value.empty()) {
     return atol(value.c_str());
   }
+
   return -1;  // default to -1 (no rate limit)
 }
+
+const char* TopologyConfigHelper::GetReservedTopologyConfigKey() {
+  return TOPOLOGY_CONFIG_KEY;
+}
 }  // namespace config
 }  // namespace heron
diff --git a/heron/common/src/cpp/config/topology-config-helper.h b/heron/common/src/cpp/config/topology-config-helper.h
index a8dd41b..b4b8651 100644
--- a/heron/common/src/cpp/config/topology-config-helper.h
+++ b/heron/common/src/cpp/config/topology-config-helper.h
@@ -136,6 +136,13 @@ class TopologyConfigHelper {
   // Do we want to drop tuples upon backpressure detection
   static bool DropTuplesUponBackpressure(const proto::api::Topology& _topology);
 
+  // Get runtime config key
+  static std::string GetRuntimeConfigKey(const std::string& key);
+
+  // Convert configs in map to runtime configs (append runtime postfix)
+  static void ConvertToRuntimeConfigs(const std::map<std::string, std::string>& _origin,
+                                      std::map<std::string, std::string>& _retval);
+
   // Return topology level config
   static void GetTopologyConfig(const proto::api::Topology& _topology,
                                 std::map<std::string, std::string>& retval);
@@ -155,6 +162,11 @@ class TopologyConfigHelper {
                                  const std::string& _component_name,
                                  const std::map<std::string, std::string>& config);
 
+  // Get the topology config value given the config key
+  static const std::string GetTopologyConfigValue(const proto::api::Topology& _topology,
+                                                  const std::string& _key,
+                                                  const std::string& _default);
+
   // Get the config value given component name and config key
   static const std::string GetComponentConfigValue(const proto::api::Topology& _topology,
                                                    const std::string& _component,
@@ -170,6 +182,9 @@ class TopologyConfigHelper {
   static sp_int64 GetComponentOutputBPS(const proto::api::Topology& _topology,
                                         const std::string& _component);
 
+  // Get reserved topology config key.
+  static const char* GetReservedTopologyConfigKey();
+
  private:
   static bool GetBooleanConfigValue(const proto::api::Topology& _topology,
                                     const std::string& _config_name,
diff --git a/heron/common/tests/cpp/config/topology-config-helper_unittest.cpp b/heron/common/tests/cpp/config/topology-config-helper_unittest.cpp
index f99a3d5..698c772 100644
--- a/heron/common/tests/cpp/config/topology-config-helper_unittest.cpp
+++ b/heron/common/tests/cpp/config/topology-config-helper_unittest.cpp
@@ -134,13 +134,23 @@ TEST(TopologyConfigHelper, GetAndSetTopologyConfig) {
       "test_topology", "123", 3, NUM_SPOUT_INSTANCES, 3, NUM_BOLT_INSTANCES,
       heron::proto::api::SHUFFLE);
 
-  // Test init config
+  // Test initial config
   std::map<std::string, std::string> old_config;
   heron::config::TopologyConfigHelper::GetTopologyConfig(*test_topology, old_config);
   EXPECT_EQ(old_config[heron::config::TopologyConfigVars::TOPOLOGY_MESSAGE_TIMEOUT_SECS],
             MESSAGE_TIMEOUT);
   EXPECT_EQ(old_config[TOPOLOGY_USER_CONFIG], TOPOLOGY_USER_CONFIG_VALUE);
 
+  // Test GetTopologyConfigValue function
+  EXPECT_EQ(
+      heron::config::TopologyConfigHelper::GetTopologyConfigValue(
+          *test_topology, TOPOLOGY_USER_CONFIG, ""),
+      TOPOLOGY_USER_CONFIG_VALUE);
+  EXPECT_EQ(
+      heron::config::TopologyConfigHelper::GetTopologyConfigValue(
+          *test_topology, TOPOLOGY_USER_CONFIG + ".bad", ""),
+      "");
+
   // Set and then test updated config
   std::string runtime_user_config_key = TOPOLOGY_USER_CONFIG + ":runtime";
   std::map<std::string, std::string> update;
@@ -171,7 +181,7 @@ TEST(TopologyConfigHelper, GetAndSetComponentConfig) {
   std::string non_test_spout = "test_spout2";
   std::string test_bolt = "test_bolt2";
   std::string non_test_bolt = "test_bolt1";
-  // Test init config
+  // Test initial config
   std::map<std::string, std::string> old_config;
   heron::config::TopologyConfigHelper::GetComponentConfig(*test_topology, test_spout, old_config);
   EXPECT_EQ(old_config[heron::config::TopologyConfigVars::TOPOLOGY_COMPONENT_PARALLELISM],
@@ -183,6 +193,16 @@ TEST(TopologyConfigHelper, GetAndSetComponentConfig) {
       std::to_string(NUM_BOLT_INSTANCES));
   EXPECT_EQ(old_config[BOLT_USER_CONFIG], BOLT_USER_CONFIG_VALUE);
 
+  // Test GetComponentConfigValue function
+  EXPECT_EQ(
+      heron::config::TopologyConfigHelper::GetComponentConfigValue(
+          *test_topology, test_spout, SPOUT_USER_CONFIG, ""),
+      SPOUT_USER_CONFIG_VALUE);
+  EXPECT_EQ(
+      heron::config::TopologyConfigHelper::GetComponentConfigValue(
+          *test_topology, test_spout, SPOUT_USER_CONFIG + ".bad", ""),
+      "");
+
   // Set user configs to new values
   std::string runtime_spout_user_config_key = SPOUT_USER_CONFIG + ":runtime";
   std::string runtime_bolt_user_config_key = BOLT_USER_CONFIG + ":runtime";
@@ -235,6 +255,24 @@ TEST(TopologyConfigHelper, GetAndSetComponentConfig) {
   EXPECT_EQ(updated_config[runtime_bolt_user_config_key], NEW_BOLT_USER_CONFIG_VALUE_2);
 }
 
+TEST(TopologyConfigHelper, GetRuntimeConfigKey) {
+  EXPECT_EQ(
+      heron::config::TopologyConfigHelper::GetRuntimeConfigKey("conf.test1"),
+      "conf.test1:runtime");
+}
+
+TEST(TopologyConfigHelper, ConvertToRuntimeConfigs) {
+  std::map<std::string, std::string> original_config;
+  original_config["conf.test1"] = "a";
+  original_config["conf.test2"] = "b";
+
+  std::map<std::string, std::string> runtime_config;
+  heron::config::TopologyConfigHelper::ConvertToRuntimeConfigs(original_config, runtime_config);
+
+  EXPECT_EQ(runtime_config["conf.test1:runtime"], "a");
+  EXPECT_EQ(runtime_config["conf.test2:runtime"], "b");
+}
+
 int main(int argc, char **argv) {
   heron::common::Initialize(argv[0]);
   testing::InitGoogleTest(&argc, argv);
diff --git a/heron/stmgr/src/cpp/manager/instance-server.cpp b/heron/stmgr/src/cpp/manager/instance-server.cpp
index 7288749..30e787e 100644
--- a/heron/stmgr/src/cpp/manager/instance-server.cpp
+++ b/heron/stmgr/src/cpp/manager/instance-server.cpp
@@ -15,9 +15,11 @@
  */
 
 #include "manager/instance-server.h"
+
 #include <iostream>
-#include <unordered_set>
+#include <map>
 #include <string>
+#include <unordered_set>
 #include <vector>
 #include "manager/checkpoint-gateway.h"
 #include "util/neighbour-calculator.h"
@@ -422,6 +424,9 @@ void InstanceServer::DrainCheckpoint(sp_int32 _task_id,
 
 void InstanceServer::BroadcastNewPhysicalPlan(const proto::system::PhysicalPlan& _pplan) {
   // TODO(vikasr) We do not handle any changes to our local assignment
+  LOG(INFO) << "Broadcasting new PhysicalPlan:";
+  config::TopologyConfigHelper::LogTopology(_pplan.topology());
+
   ComputeLocalSpouts(_pplan);
   proto::stmgr::NewInstanceAssignmentMessage new_assignment;
   new_assignment.mutable_pplan()->CopyFrom(_pplan);
@@ -439,17 +444,18 @@ void InstanceServer::BroadcastNewPhysicalPlan(const proto::system::PhysicalPlan&
 void InstanceServer::SetRateLimit(const proto::system::PhysicalPlan& _pplan,
                                   const std::string& _component,
                                   Connection* _conn) const {
-  sp_int64 read_bsp =
+  sp_int64 read_bps =
       config::TopologyConfigHelper::GetComponentOutputBPS(_pplan.topology(), _component);
   sp_int32 parallelism =
       config::TopologyConfigHelper::GetComponentParallelism(_pplan.topology(), _component);
-  sp_int64 burst_read_bsp = read_bsp + read_bsp / 2;
-
-  // There should be parallelism hint and the per instance rate limit should be at least
-  // one byte per second
-  if (parallelism > 0 && read_bsp > parallelism && burst_read_bsp > parallelism) {
-    LOG(INFO) << "Set rate limit in " << _component << " to " << read_bsp << "/" << burst_read_bsp;
-    _conn->setRateLimit(read_bsp / parallelism, burst_read_bsp / parallelism);
+  // burst rate is 1.5 x of regular rate
+  sp_int64 burst_read_bps = read_bps + read_bps / 2;
+
+  LOG(INFO) << "Parallelism of component " << _component << " is " << parallelism;
+  LOG(INFO) << "Read BPS of component " << _component << " is " << read_bps;
+  if (parallelism > 0 && read_bps >= 0 && burst_read_bps >= 0) {
+    LOG(INFO) << "Set rate limit in " << _component << " to " << read_bps << "/" << burst_read_bps;
+    _conn->setRateLimit(read_bps / parallelism, burst_read_bps / parallelism);
   } else {
     LOG(INFO) << "Disable rate limit in " << _component;
     _conn->disableRateLimit();
diff --git a/heron/stmgr/src/cpp/manager/stmgr.cpp b/heron/stmgr/src/cpp/manager/stmgr.cpp
index 3fd805b..738fa03 100644
--- a/heron/stmgr/src/cpp/manager/stmgr.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr.cpp
@@ -538,6 +538,7 @@ void StMgr::StartTMasterClient() {
 
 void StMgr::NewPhysicalPlan(proto::system::PhysicalPlan* _pplan) {
   LOG(INFO) << "Received a new physical plan from tmaster";
+  heron::config::TopologyConfigHelper::LogTopology(_pplan->topology());
   // first make sure that we are part of the plan ;)
   bool found = false;
   for (sp_int32 i = 0; i < _pplan->stmgrs_size(); ++i) {
@@ -561,10 +562,10 @@ void StMgr::NewPhysicalPlan(proto::system::PhysicalPlan* _pplan) {
     LOG(INFO) << "Topology state changed from " << pplan_->topology().state() << " to "
               << _pplan->topology().state();
   }
-  proto::api::TopologyState st = _pplan->topology().state();
-  _pplan->clear_topology();
-  _pplan->mutable_topology()->CopyFrom(*hydrated_topology_);
-  _pplan->mutable_topology()->set_state(st);
+
+  PatchPhysicalPlanWithHydratedTopology(_pplan, hydrated_topology_);
+  LOG(INFO) << "Patched with hydrated topology";
+  heron::config::TopologyConfigHelper::LogTopology(_pplan->topology());
 
   // TODO(vikasr) Currently we dont check if our role has changed
 
@@ -1115,5 +1116,38 @@ void StMgr::HandleStatefulRestoreDone(proto::system::StatusCode _status,
                                       std::string _checkpoint_id, sp_int64 _restore_txid) {
   tmaster_client_->SendRestoreTopologyStateResponse(_status, _checkpoint_id, _restore_txid);
 }
+
+// Replace the topology in _pplan with the hydrated _topology, then re-apply the data carried
+// by the incoming plan: its topology state and its topology-level/per-component configs.
+// _pplan is modified in place; _topology is only read here (copied via CopyFrom).
+void StMgr::PatchPhysicalPlanWithHydratedTopology(proto::system::PhysicalPlan* _pplan,
+                                                  proto::api::Topology* _topology) {
+  // Back up new topology data (state and configs)
+  proto::api::TopologyState st = _pplan->topology().state();
+
+  std::map<std::string, std::string> topology_config;
+  config::TopologyConfigHelper::GetTopologyConfig(_pplan->topology(), topology_config);
+
+  std::unordered_set<std::string> components;
+  std::map<std::string, std::map<std::string, std::string>> component_config;
+  config::TopologyConfigHelper::GetAllComponentNames(_pplan->topology(), components);
+  for (auto iter = components.begin(); iter != components.end(); ++iter) {
+    // Each component fills its own map; writing into topology_config would mix the two levels
+    std::map<std::string, std::string>& config = component_config[*iter];
+    config::TopologyConfigHelper::GetComponentConfig(_pplan->topology(), *iter, config);
+  }
+
+  // Copy hydrated topology into pplan
+  _pplan->clear_topology();
+  _pplan->mutable_topology()->CopyFrom(*_topology);
+
+  // Restore new topology data
+  _pplan->mutable_topology()->set_state(st);
+  config::TopologyConfigHelper::SetTopologyConfig(_pplan->mutable_topology(), topology_config);
+  for (auto iter = components.begin(); iter != components.end(); ++iter) {
+    config::TopologyConfigHelper::SetComponentConfig(_pplan->mutable_topology(), *iter,
+        component_config[*iter]);
+  }
+}
 }  // namespace stmgr
 }  // namespace heron
diff --git a/heron/stmgr/src/cpp/manager/stmgr.h b/heron/stmgr/src/cpp/manager/stmgr.h
index cd0d0da..48e30be 100644
--- a/heron/stmgr/src/cpp/manager/stmgr.h
+++ b/heron/stmgr/src/cpp/manager/stmgr.h
@@ -187,6 +187,12 @@ class StMgr {
   void HandleStatefulRestoreDone(proto::system::StatusCode _status,
                                  std::string _checkpoint_id, sp_int64 _restore_txid);
 
+  // Patch new physical plan with internal hydrated topology but keep new topology data:
+  // - new topology state
+  // - new topology/component config
+  static void PatchPhysicalPlanWithHydratedTopology(proto::system::PhysicalPlan* _pplan,
+                                                    proto::api::Topology* _topology);
+
   heron::common::HeronStateMgr* state_mgr_;
   proto::system::PhysicalPlan* pplan_;
   sp_string topology_name_;
diff --git a/heron/stmgr/tests/cpp/server/stmgr_unittest.cpp b/heron/stmgr/tests/cpp/server/stmgr_unittest.cpp
index 8670304..41363d9 100644
--- a/heron/stmgr/tests/cpp/server/stmgr_unittest.cpp
+++ b/heron/stmgr/tests/cpp/server/stmgr_unittest.cpp
@@ -1867,6 +1867,62 @@ TEST(StMgr, test_metricsmgr_reconnect) {
   TearCommonResources(common);
 }
 
+// Test PatchPhysicalPlanWithHydratedTopology function
+TEST(StMgr, test_PatchPhysicalPlanWithHydratedTopology) {
+  int32_t nSpouts = 2;
+  int32_t nSpoutInstances = 1;
+  int32_t nBolts = 3;
+  int32_t nBoltInstances = 1;
+  heron::proto::api::Topology* topology =
+      GenerateDummyTopology("topology_name",
+                            "topology_id",
+                            nSpouts, nSpoutInstances, nBolts, nBoltInstances,
+                            heron::proto::api::SHUFFLE);
+
+  heron::proto::system::PhysicalPlan* pplan = new heron::proto::system::PhysicalPlan();
+  pplan->mutable_topology()->CopyFrom(*topology);
+
+  // Verify initial values
+  EXPECT_EQ(
+    heron::config::TopologyConfigHelper::GetTopologyConfigValue(
+        *topology,
+        heron::config::TopologyConfigVars::TOPOLOGY_MESSAGE_TIMEOUT_SECS,
+        ""),
+    "30");
+  EXPECT_EQ(
+    heron::config::TopologyConfigHelper::GetTopologyConfigValue(
+        pplan->topology(),
+        heron::config::TopologyConfigVars::TOPOLOGY_MESSAGE_TIMEOUT_SECS,
+        ""),
+    "30");
+  // Change runtime data in PhysicalPlan and patch it
+  std::map<std::string, std::string> update;
+  update["conf.new"] = "test";
+  update[heron::config::TopologyConfigVars::TOPOLOGY_MESSAGE_TIMEOUT_SECS] = "10";
+  heron::config::TopologyConfigHelper::SetTopologyConfig(pplan->mutable_topology(), update);
+
+  // Verify the update is visible in the physical plan only (the original topology is untouched).
+  // NOTE(review): PatchPhysicalPlanWithHydratedTopology is never called in this test - confirm.
+  EXPECT_EQ(
+    heron::config::TopologyConfigHelper::GetTopologyConfigValue(
+        *topology,
+        heron::config::TopologyConfigVars::TOPOLOGY_MESSAGE_TIMEOUT_SECS,
+        ""),
+    "30");  // The internal topology object should still have the initial value
+  EXPECT_EQ(
+    heron::config::TopologyConfigHelper::GetTopologyConfigValue(
+        pplan->topology(),
+        heron::config::TopologyConfigVars::TOPOLOGY_MESSAGE_TIMEOUT_SECS,
+        ""),
+    "10");  // The topology object in the physical plan should have the new value
+  EXPECT_EQ(
+    heron::config::TopologyConfigHelper::GetTopologyConfigValue(
+        pplan->topology(), "conf.new", ""),
+    "test");  // The topology object in the physical plan should have the new config
+
+  delete pplan;
+}
+
 int main(int argc, char** argv) {
   heron::common::Initialize(argv[0]);
   std::cout << "Current working directory (to find stmgr logs) "
diff --git a/heron/tmaster/src/cpp/manager/tcontroller.cpp b/heron/tmaster/src/cpp/manager/tcontroller.cpp
index a13784e..d5f933c 100644
--- a/heron/tmaster/src/cpp/manager/tcontroller.cpp
+++ b/heron/tmaster/src/cpp/manager/tcontroller.cpp
@@ -23,6 +23,7 @@
 #include <vector>
 #include "basics/basics.h"
 #include "basics/strutils.h"
+#include "config/topology-config-helper.h"
 #include "errors/errors.h"
 #include "manager/tmaster.h"
 #include "network/network.h"
@@ -298,7 +299,8 @@ bool TController::ParseRuntimeConfig(const std::vector<std::string>& paramters,
     std::vector<std::string> segments = StrUtils::split(*iter, ":");
     if (segments.size() == 2) {
       // Topology level config
-      retval[TOPOLOGY_CONFIG_KEY][segments[0]] = segments[1];
+      const char* topology_key = config::TopologyConfigHelper::GetReservedTopologyConfigKey();
+      retval[topology_key][segments[0]] = segments[1];
     } else if (segments.size() == 3) {
       // Component level config
       retval[segments[0]][segments[1]] = segments[2];
diff --git a/heron/tmaster/src/cpp/manager/tmaster.cpp b/heron/tmaster/src/cpp/manager/tmaster.cpp
index ad2affe..c27ccfe 100644
--- a/heron/tmaster/src/cpp/manager/tmaster.cpp
+++ b/heron/tmaster/src/cpp/manager/tmaster.cpp
@@ -591,6 +591,9 @@ bool TMaster::UpdateRuntimeConfig(const ComponentConfigMap& _config,
                                   VCallback<proto::system::StatusCode> cb) {
   DCHECK(current_pplan_->topology().IsInitialized());
 
+  LOG(INFO) << "Update runtime config: ";
+  LogConfig(_config);
+
   // Parse and set the new configs
   proto::system::PhysicalPlan* new_pplan = new proto::system::PhysicalPlan();
   new_pplan->CopyFrom(*current_pplan_);
@@ -631,11 +634,12 @@ bool TMaster::UpdateRuntimeConfigInTopology(proto::api::Topology* _topology,
   DCHECK(_topology->IsInitialized());
 
   ComponentConfigMap::const_iterator iter;
+  const char* topology_key = config::TopologyConfigHelper::GetReservedTopologyConfigKey();
   for (iter = _config.begin(); iter != _config.end(); ++iter) {
     // Get config for topology or component.
     std::map<std::string, std::string> runtime_config;
-    AppendPostfix(iter->second, RUNTIME_CONFIG_POSTFIX, runtime_config);
-    if (iter->first == TOPOLOGY_CONFIG_KEY) {
+    config::TopologyConfigHelper::ConvertToRuntimeConfigs(iter->second, runtime_config);
+    if (iter->first == topology_key) {
       config::TopologyConfigHelper::SetTopologyConfig(_topology, runtime_config);
     } else {
       config::TopologyConfigHelper::SetComponentConfig(_topology, iter->first, runtime_config);
@@ -1022,8 +1026,9 @@ bool TMaster::ValidateRuntimeConfigNames(const ComponentConfigMap& _config) cons
   config::TopologyConfigHelper::GetAllComponentNames(topology, components);
 
   ComponentConfigMap::const_iterator iter;
+  const char* topology_key = config::TopologyConfigHelper::GetReservedTopologyConfigKey();
   for (iter = _config.begin(); iter != _config.end(); ++iter) {
-    if (iter->first != TOPOLOGY_CONFIG_KEY) {
+    if (iter->first != topology_key) {
       // It is a component, search for it
       if (components.find(iter->first) == components.end()) {
         return false;
@@ -1034,12 +1039,12 @@ bool TMaster::ValidateRuntimeConfigNames(const ComponentConfigMap& _config) cons
   return true;
 }
 
-void TMaster::AppendPostfix(const ConfigValueMap& _origin,
-                            const std::string& post_fix,
-                            ConfigValueMap& _retval) {
-  ConfigValueMap::const_iterator it;
-  for (it = _origin.begin(); it != _origin.end(); ++it) {
-    _retval[it->first + post_fix] = it->second;
+void TMaster::LogConfig(const ComponentConfigMap& _config) {
+  for (auto iter = _config.begin(); iter != _config.end(); ++iter) {
+    LOG(INFO) << iter->first << " =>";
+    for (auto i = iter->second.begin(); i != iter->second.end(); ++i) {
+      LOG(INFO) << i->first << " : " << i->second;
+    }
   }
 }
 
diff --git a/heron/tmaster/src/cpp/manager/tmaster.h b/heron/tmaster/src/cpp/manager/tmaster.h
index ca2b0d4..defe8fd 100644
--- a/heron/tmaster/src/cpp/manager/tmaster.h
+++ b/heron/tmaster/src/cpp/manager/tmaster.h
@@ -47,9 +47,6 @@ typedef std::map<std::string, std::string> ConfigValueMap;
 // From component name to config/value pairs
 typedef std::map<std::string, std::map<std::string, std::string>> ComponentConfigMap;
 
-const sp_string TOPOLOGY_CONFIG_KEY = "_topology_";
-const sp_string RUNTIME_CONFIG_POSTFIX = ":runtime";
-
 class TMaster {
  public:
   TMaster(const std::string& _zk_hostport, const std::string& _topology_name,
@@ -124,6 +121,9 @@ class TMaster {
   // Function to be called that calls MakePhysicalPlan and sends it to all stmgrs
   void DoPhysicalPlan(EventLoop::Status _code);
 
+  // Log config object
+  void LogConfig(const ComponentConfigMap& _config);
+
   // Big brother function that does the assignment to the workers
   // If _new_stmgr is null, this means that there was a plan
   // existing, but a _new_stmgr joined us. So redo his part
@@ -192,10 +192,6 @@ class TMaster {
                      sp_int32 port,
                      const std::string& stmgr_id);
 
-  void AppendPostfix(const ConfigValueMap& _origin,
-                     const std::string& post_fix,
-                     ConfigValueMap& _update);
-
   // map of active stmgr id to stmgr state
   StMgrMap stmgrs_;
 
diff --git a/heron/tmaster/tests/cpp/server/tmaster_unittest.cpp b/heron/tmaster/tests/cpp/server/tmaster_unittest.cpp
index 6db68df..0f37dc8 100644
--- a/heron/tmaster/tests/cpp/server/tmaster_unittest.cpp
+++ b/heron/tmaster/tests/cpp/server/tmaster_unittest.cpp
@@ -735,7 +735,8 @@ TEST(StMgr, test_runtime_config) {
   std::map<std::string, std::string> validate_good_config;
   validate_good_config[topology_runtime_config_1] = "1";
   validate_good_config[topology_runtime_config_2] = "2";
-  validate_good_config_map[heron::tmaster::TOPOLOGY_CONFIG_KEY] = validate_good_config;
+  const char* topology_key = heron::config::TopologyConfigHelper::GetReservedTopologyConfigKey();
+  validate_good_config_map[topology_key] = validate_good_config;
   validate_good_config_map["spout1"] = validate_good_config;
   EXPECT_EQ(common.tmaster_->ValidateRuntimeConfig(validate_good_config_map), true);
 

-- 
To stop receiving notification emails like this one, please contact
huijun@apache.org.