You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2017/07/04 08:29:36 UTC

ambari git commit: AMBARI-21394. Create a topic to send hostLevelParams (aonishuk)

Repository: ambari
Updated Branches:
  refs/heads/branch-3.0-perf 51b1a14d7 -> 917898cdb


AMBARI-21394. Create a topic to send hostLevelParams (aonishuk)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/917898cd
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/917898cd
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/917898cd

Branch: refs/heads/branch-3.0-perf
Commit: 917898cdbed3bd37e1bbfa813b115f93dfb6dcaf
Parents: 51b1a14
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Tue Jul 4 11:29:31 2017 +0300
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Tue Jul 4 11:29:31 2017 +0300

----------------------------------------------------------------------
 .../ambari_agent/ClusterHostLevelParamsCache.py | 45 +++++++++++++++
 .../src/main/python/ambari_agent/Constants.py   |  4 +-
 .../ambari_agent/CustomServiceOrchestrator.py   |  4 +-
 .../main/python/ambari_agent/HeartbeatThread.py | 14 +++--
 .../python/ambari_agent/InitializerModule.py    |  2 +
 .../main/python/ambari_agent/RecoveryManager.py | 16 ++---
 .../listeners/HostLevelParamsEventListener.py   | 61 ++++++++++++++++++++
 .../listeners/MetadataEventListener.py          | 11 +---
 .../ambari_agent/TestAgentStompResponses.py     | 20 +++++--
 .../dummy_files/stomp/host_level_params.json    | 37 ++++++++++++
 .../stomp/metadata_after_registration.json      |  9 +--
 .../dummy_files/stomp/topology_add_host.json    |  5 +-
 .../stomp/topology_cache_expected.json          |  6 --
 .../dummy_files/stomp/topology_create.json      | 15 +----
 14 files changed, 187 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/main/python/ambari_agent/ClusterHostLevelParamsCache.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterHostLevelParamsCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterHostLevelParamsCache.py
new file mode 100644
index 0000000..3e490c5
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/ClusterHostLevelParamsCache.py
@@ -0,0 +1,45 @@
+#!/usr/bin/env python
+
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+from ambari_agent.ClusterCache import ClusterCache
+import logging
+
+logger = logging.getLogger(__name__)
+
+class ClusterHostLevelParamsCache(ClusterCache):
+  """
+  Maintains an in-memory cache and disk cache of the host level params sent from the server for
+  every cluster. This is useful for having quick access to any of the
+  host level params.
+
+  Host level params are parameters used by execution and status commands which can be generated
+  differently for every host.
+  """
+
+  def __init__(self, cluster_cache_dir):
+    """
+    Initializes the host level params cache.
+    :param cluster_cache_dir:
+    :return:
+    """
+    super(ClusterHostLevelParamsCache, self).__init__(cluster_cache_dir)
+
+  def get_cache_name(self):
+    return 'host_level_params'

http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/main/python/ambari_agent/Constants.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Constants.py b/ambari-agent/src/main/python/ambari_agent/Constants.py
index 02945ee..17ed2be 100644
--- a/ambari-agent/src/main/python/ambari_agent/Constants.py
+++ b/ambari-agent/src/main/python/ambari_agent/Constants.py
@@ -21,16 +21,18 @@ limitations under the License.
 
 COMMANDS_TOPIC = '/user/commands'
 CONFIGURATIONS_TOPIC = '/user/configs'
+HOST_LEVEL_PARAMS_TOPIC = '/user/host_level_params'
 METADATA_TOPIC = '/events/metadata'
 TOPOLOGIES_TOPIC = '/events/topologies'
 SERVER_RESPONSES_TOPIC = '/user/'
 
 PRE_REGISTRATION_TOPICS_TO_SUBSCRIBE = [SERVER_RESPONSES_TOPIC]
-POST_REGISTRATION_TOPICS_TO_SUBSCRIBE = [COMMANDS_TOPIC, CONFIGURATIONS_TOPIC, METADATA_TOPIC, TOPOLOGIES_TOPIC]
+POST_REGISTRATION_TOPICS_TO_SUBSCRIBE = [COMMANDS_TOPIC, CONFIGURATIONS_TOPIC, METADATA_TOPIC, TOPOLOGIES_TOPIC, HOST_LEVEL_PARAMS_TOPIC]
 
 TOPOLOGY_REQUEST_ENDPOINT = '/agents/topologies'
 METADATA_REQUEST_ENDPOINT = '/agents/metadata'
 CONFIGURATIONS_REQUEST_ENDPOINT = '/agents/configs'
+HOST_LEVEL_PARAMS_TOPIC_ENPOINT = '/agents/host_level_params'
 COMPONENT_STATUS_REPORTS_ENDPOINT = '/reports/component_status'
 COMMANDS_STATUS_REPORTS_ENDPOINT = '/reports/commands_status'
 HOST_STATUS_REPORTS_ENDPOINT = '/reports/host_status'

http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
index 6d1a491..c0b20ed 100644
--- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
+++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
@@ -81,6 +81,7 @@ class CustomServiceOrchestrator():
     self.metadata_cache = initializer_module.metadata_cache
     self.topology_cache = initializer_module.topology_cache
     self.configurations_cache = initializer_module.configurations_cache
+    self.host_level_params_cache = initializer_module.host_level_params_cache
     self.config = initializer_module.config
     self.tmp_dir = self.config.get('agent', 'prefix')
     self.force_https_protocol = self.config.get_force_https_protocol()
@@ -459,13 +460,14 @@ class CustomServiceOrchestrator():
 
     metadata_cache = self.metadata_cache[cluster_id]
     configurations_cache = self.configurations_cache[cluster_id]
+    host_level_params_cache = self.host_level_params_cache[cluster_id]
 
     component_dict = self.topology_cache.get_component_info_by_key(cluster_id, service_name, component_name)
 
     command_dict = {
       'clusterLevelParams': metadata_cache.clusterLevelParams,
       'serviceLevelParams': metadata_cache.serviceLevelParams[service_name],
-      'hostLevelParams': self.topology_cache.get_current_host_info(cluster_id).hostLevelParams,
+      'hostLevelParams': host_level_params_cache,
       'componentLevelParams': component_dict.componentLevelParams,
       'script_type': self.SCRIPT_TYPE_PYTHON
     }

http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
index dbf4006..6ba84e6 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -31,6 +31,7 @@ from ambari_agent.listeners.TopologyEventListener import TopologyEventListener
 from ambari_agent.listeners.ConfigurationEventListener import ConfigurationEventListener
 from ambari_agent.listeners.MetadataEventListener import MetadataEventListener
 from ambari_agent.listeners.CommandsEventListener import CommandsEventListener
+from ambari_agent.listeners.HostLevelParamsEventListener import HostLevelParamsEventListener
 
 HEARTBEAT_INTERVAL = 10
 REQUEST_RESPONSE_TIMEOUT = 10
@@ -49,20 +50,21 @@ class HeartbeatThread(threading.Thread):
     self.registration_builder = Register(initializer_module.config)
 
     self.initializer_module = initializer_module
-    self.caches = [initializer_module.metadata_cache, initializer_module.topology_cache, initializer_module.configurations_cache]
 
     # listeners
     self.server_responses_listener = ServerResponsesListener()
     self.commands_events_listener = CommandsEventListener(initializer_module.action_queue)
-    self.metadata_events_listener = MetadataEventListener(initializer_module.metadata_cache, initializer_module.recovery_manager)
+    self.metadata_events_listener = MetadataEventListener(initializer_module.metadata_cache)
     self.topology_events_listener = TopologyEventListener(initializer_module.topology_cache)
     self.configuration_events_listener = ConfigurationEventListener(initializer_module.configurations_cache)
-    self.listeners = [self.server_responses_listener, self.commands_events_listener, self.metadata_events_listener, self.topology_events_listener, self.configuration_events_listener]
+    self.host_level_params_events_listener = HostLevelParamsEventListener(initializer_module.host_level_params_cache, initializer_module.recovery_manager)
+    self.listeners = [self.server_responses_listener, self.commands_events_listener, self.metadata_events_listener, self.topology_events_listener, self.configuration_events_listener, self.host_level_params_events_listener]
 
     self.post_registration_requests = [
     (Constants.TOPOLOGY_REQUEST_ENDPOINT, initializer_module.topology_cache, self.topology_events_listener),
     (Constants.METADATA_REQUEST_ENDPOINT, initializer_module.metadata_cache, self.metadata_events_listener),
-    (Constants.CONFIGURATIONS_REQUEST_ENDPOINT, initializer_module.configurations_cache, self.configuration_events_listener)
+    (Constants.CONFIGURATIONS_REQUEST_ENDPOINT, initializer_module.configurations_cache, self.configuration_events_listener),
+    (Constants.HOST_LEVEL_PARAMS_TOPIC_ENPOINT, initializer_module.host_level_params_cache, self.host_level_params_events_listener)
     ]
     self.responseId = 0
 
@@ -152,6 +154,10 @@ class HeartbeatThread(threading.Thread):
     else:
       self.responseId = serverId
 
+    if 'restartAgent' in response and response['restartAgent'].lower() == "true":
+      logger.warn("Restarting the agent by the request from server")
+      Utils.restartAgent()
+
   def get_heartbeat_body(self):
     """
     Heartbeat body to be send to server

http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
index 8de1fa5..dbc26e8 100644
--- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
+++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
@@ -26,6 +26,7 @@ from ambari_agent.AmbariConfig import AmbariConfig
 from ambari_agent.ClusterConfigurationCache import ClusterConfigurationCache
 from ambari_agent.ClusterTopologyCache import ClusterTopologyCache
 from ambari_agent.ClusterMetadataCache import ClusterMetadataCache
+from ambari_agent.ClusterHostLevelParamsCache import ClusterHostLevelParamsCache
 from ambari_agent.Utils import lazy_property
 from ambari_agent.security import AmbariStompConnection
 from ambari_agent.ActionQueue import ActionQueue
@@ -73,6 +74,7 @@ class InitializerModule:
     self.metadata_cache = ClusterMetadataCache(self.cluster_cache_dir)
     self.topology_cache = ClusterTopologyCache(self.cluster_cache_dir, self.config)
     self.configurations_cache = ClusterConfigurationCache(self.cluster_cache_dir)
+    self.host_level_params_cache = ClusterHostLevelParamsCache(self.cluster_cache_dir)
     self.customServiceOrchestrator = CustomServiceOrchestrator(self)
 
     self.recovery_manager = RecoveryManager(self.recovery_cache_dir)

http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
index 68dd0be..19f163e 100644
--- a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
+++ b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
@@ -97,7 +97,6 @@ class RecoveryManager:
     self.__cache_lock = threading.RLock()
     self.active_command_count = 0
     self.paused = False
-    self.recovery_timestamp = -1
     self.cluster_id = None
 
     if not os.path.exists(cache_dir):
@@ -110,7 +109,7 @@ class RecoveryManager:
 
     self.actions = {}
 
-    self.update_config(6, 60, 5, 12, recovery_enabled, auto_start_only, auto_install_start, "", -1)
+    self.update_config(6, 60, 5, 12, recovery_enabled, auto_start_only, auto_install_start, "")
 
     pass
 
@@ -560,8 +559,7 @@ class RecoveryManager:
       "maxCount" : 10,
       "windowInMinutes" : 60,
       "retryGap" : 0,
-      "components" : "a,b",
-      "recoveryTimestamp" : 1458150424380
+      "components" : "a,b"
       }
     """
 
@@ -573,7 +571,6 @@ class RecoveryManager:
     retry_gap = 5
     max_lifetime_count = 12
     enabled_components = ""
-    recovery_timestamp = -1 # Default value if recoveryTimestamp is not available.
 
 
     if dictionary and "recoveryConfig" in dictionary:
@@ -599,11 +596,8 @@ class RecoveryManager:
       if 'components' in config:
         enabled_components = config['components']
 
-      if 'recoveryTimestamp' in config:
-        recovery_timestamp = config['recoveryTimestamp']
-
     self.update_config(max_count, window_in_min, retry_gap, max_lifetime_count, recovery_enabled, auto_start_only,
-                       auto_install_start, enabled_components, recovery_timestamp)
+                       auto_install_start, enabled_components)
     pass
 
   """
@@ -617,10 +611,9 @@ class RecoveryManager:
   auto_start_only - True if AUTO_START recovery type was specified. False otherwise.
   auto_install_start - True if AUTO_INSTALL_START recovery type was specified. False otherwise.
   enabled_components - CSV of componenents enabled for auto start.
-  recovery_timestamp - Timestamp when the recovery values were last updated. -1 on start up.
   """
   def update_config(self, max_count, window_in_min, retry_gap, max_lifetime_count, recovery_enabled,
-                    auto_start_only, auto_install_start, enabled_components, recovery_timestamp):
+                    auto_start_only, auto_install_start, enabled_components):
     """
     Update recovery configuration, recovery is disabled if configuration values
     are not correct
@@ -653,7 +646,6 @@ class RecoveryManager:
     self.auto_install_start = auto_install_start
     self.max_lifetime_count = max_lifetime_count
     self.enabled_components = []
-    self.recovery_timestamp = recovery_timestamp
 
     self.allowed_desired_states = [self.STARTED, self.INSTALLED]
     self.allowed_current_states = [self.INIT, self.INSTALL_FAILED, self.INSTALLED, self.STARTED]

http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py
new file mode 100644
index 0000000..5aee634
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py
@@ -0,0 +1,61 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+import ambari_stomp
+
+from ambari_agent.listeners import EventListener
+from ambari_agent import Constants
+
+logger = logging.getLogger(__name__)
+
+class HostLevelParamsEventListener(EventListener):
+  """
+  Listener of Constants.HOST_LEVEL_PARAMS_TOPIC events from server.
+  """
+  def __init__(self, host_level_params_cache, recovery_manager):
+    self.host_level_params_cache = host_level_params_cache
+    self.recovery_manager = recovery_manager
+
+  def on_event(self, headers, message):
+    """
+    Is triggered when an event to Constants.HOST_LEVEL_PARAMS_TOPIC topic is received from server.
+
+    @param headers: headers dictionary
+    @param message: message payload dictionary
+    """
+    # This kind of response is received if the hash was identical, so the server does not need to change anything.
+    if message == {}:
+      return
+
+    self.host_level_params_cache.rewrite_cache(message['clusters'])
+    self.host_level_params_cache.hash = message['hash']
+
+    if message['clusters']:
+      # FIXME: Recovery manager does not support multiple clusters as of now.
+      cluster_id = message['clusters'].keys()[0]
+
+      if 'recoveryConfig' in message['clusters'][cluster_id]:
+        logging.info("Updating recoveryConfig from metadata")
+        self.recovery_manager.update_recovery_config(self.host_level_params_cache[cluster_id])
+        self.recovery_manager.cluster_id = cluster_id
+
+  def get_handled_path(self):
+    return Constants.HOST_LEVEL_PARAMS_TOPIC
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py
index 364d8af..5802ffe 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py
@@ -32,9 +32,8 @@ class MetadataEventListener(EventListener):
   """
   Listener of Constants.METADATA_TOPIC events from server.
   """
-  def __init__(self, metadata_cache, recovery_manager):
+  def __init__(self, metadata_cache):
     self.metadata_cache = metadata_cache
-    self.recovery_manager = recovery_manager
 
   def on_event(self, headers, message):
     """
@@ -50,13 +49,5 @@ class MetadataEventListener(EventListener):
     self.metadata_cache.rewrite_cache(message['clusters'])
     self.metadata_cache.hash = message['hash']
 
-    # FIXME: Recovery manager does not support multiple cluster as of now.
-    cluster_id = message['clusters'].keys()[0]
-
-    if 'recoveryConfig' in message['clusters'][cluster_id]:
-      logging.info("Updating recoveryConfig from metadata")
-      self.recovery_manager.update_recovery_config(self.metadata_cache[cluster_id])
-      self.recovery_manager.cluster_id = cluster_id
-
   def get_handled_path(self):
     return Constants.METADATA_TOPIC
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
index c41f87e..feaf7dd 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
@@ -40,7 +40,7 @@ from mock.mock import MagicMock, patch
 @patch("ambari_agent.hostname.hostname", new=MagicMock(return_value="c6401.ambari.apache.org"))
 class TestAgentStompResponses(BaseStompServerTestCase):
   def setUp(self):
-    self.remove_files(['/tmp/cluster_cache/configurations.json', '/tmp/cluster_cache/metadata.json', '/tmp/cluster_cache/topology.json'])
+    self.remove_files(['/tmp/cluster_cache/configurations.json', '/tmp/cluster_cache/metadata.json', '/tmp/cluster_cache/topology.json', '/tmp/host_level_params.json'])
 
     if not os.path.exists("/tmp/ambari-agent"):
       os.mkdir("/tmp/ambari-agent")
@@ -83,9 +83,13 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '3'}, body=self.get_json("configurations_update.json"))
     self.server.topic_manager.send(f)
 
+    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=self.get_json("host_level_params.json"))
+    self.server.topic_manager.send(f)
+
     initial_topology_request = self.server.frames_queue.get()
     initial_metadata_request = self.server.frames_queue.get()
     initial_configs_request = self.server.frames_queue.get()
+    initial_host_level_params_request = self.server.frames_queue.get()
 
     while not initializer_module.is_registered:
       time.sleep(0.1)
@@ -103,6 +107,7 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     configurations_subscribe_frame = self.server.frames_queue.get()
     metadata_subscribe_frame = self.server.frames_queue.get()
     topologies_subscribe_frame = self.server.frames_queue.get()
+    host_level_params_subscribe_frame = self.server.frames_queue.get()
     heartbeat_frame = self.server.frames_queue.get()
     dn_install_in_progress_frame = json.loads(self.server.frames_queue.get().body)
     dn_install_failed_frame = json.loads(self.server.frames_queue.get().body)
@@ -116,7 +121,7 @@ class TestAgentStompResponses(BaseStompServerTestCase):
 
     initializer_module.stop_event.set()
 
-    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'id':'1'}))
+    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '5'}, body=json.dumps({'id':'1'}))
     self.server.topic_manager.send(f)
 
     command_status_reporter.join()
@@ -179,6 +184,9 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '3'}, body='{}')
     self.server.topic_manager.send(f)
 
+    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body='{}')
+    self.server.topic_manager.send(f)
+
     commands_subscribe_frame = self.server.frames_queue.get()
     configurations_subscribe_frame = self.server.frames_queue.get()
     metadata_subscribe_frame = self.server.frames_queue.get()
@@ -188,7 +196,7 @@ class TestAgentStompResponses(BaseStompServerTestCase):
 
     initializer_module.stop_event.set()
 
-    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'id':'1'}))
+    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '5'}, body=json.dumps({'id':'1'}))
     self.server.topic_manager.send(f)
 
     heartbeat_thread.join()
@@ -222,9 +230,13 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '3'}, body='{}')
     self.server.topic_manager.send(f)
 
+    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body='{}')
+    self.server.topic_manager.send(f)
+
     initial_topology_request = self.server.frames_queue.get()
     initial_metadata_request = self.server.frames_queue.get()
     initial_configs_request = self.server.frames_queue.get()
+    initial_host_level_params_request = self.server.frames_queue.get()
 
     while not initializer_module.is_registered:
       time.sleep(0.1)
@@ -262,7 +274,7 @@ class TestAgentStompResponses(BaseStompServerTestCase):
 
     initializer_module.stop_event.set()
 
-    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'id':'1'}))
+    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '5'}, body=json.dumps({'id':'1'}))
     self.server.topic_manager.send(f)
 
     heartbeat_thread.join()

http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/host_level_params.json
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/host_level_params.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/host_level_params.json
new file mode 100644
index 0000000..7447e93
--- /dev/null
+++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/host_level_params.json
@@ -0,0 +1,37 @@
+{
+  "hash":"aa4cd03688d36f18fc2cbba06614bafe",
+  "clusters":{
+    "0":{
+      "repoInfo": [
+        {
+          "defaultBaseUrl": "http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.6.0.3",
+          "ambariManagedRepositories": true,
+          "baseUrl": "http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129/",
+          "latestBaseUrl": "http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129",
+          "repoSaved": true,
+          "repoName": "HDP",
+          "osType": "redhat6",
+          "unique": true,
+          "repoId": "HDP-2.6"
+        },
+        {
+          "defaultBaseUrl": "http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6",
+          "ambariManagedRepositories": true,
+          "baseUrl": "http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6",
+          "latestBaseUrl": "http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6",
+          "repoSaved": true,
+          "repoName": "HDP-UTILS",
+          "osType": "redhat6",
+          "unique": false,
+          "repoId": "HDP-UTILS-1.1.0.21"
+        }
+      ],
+      "recoveryConfig": {
+        "type" : "AUTO_INSTALL_START",
+        "maxCount" : 10,
+        "windowInMinutes" : 60,
+        "components" : "NAMENODE,DATANODE"
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json
index 6462ccf..c1aba28 100644
--- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json
+++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json
@@ -38,14 +38,7 @@
       },
       "status_commands_to_run": [
         "STATUS"
-      ],
-      "recoveryConfig": {
-        "type" : "AUTO_INSTALL_START",
-        "maxCount" : 10,
-        "windowInMinutes" : 60,
-        "components" : "NAMENODE,DATANODE",
-        "recoveryTimestamp" : 1458150424380
-      }
+      ]
     }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_host.json
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_host.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_host.json
index 2458f08..a9407c3 100644
--- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_host.json
+++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_host.json
@@ -8,10 +8,7 @@
           "hostId":2,
           "hostName":"c6403.ambari.apache.org",
           "rackName":"/default-rack",
-          "ipv4":"192.168.64.103",
-          "hostLevelParams": {
-            "repo_info": "[{\"baseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129/\",\"osType\":\"redhat6\",\"repoId\":\"HDP-2.6\",\"repoName\":\"HDP\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.6.0.3\",\"latestBaseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129\",\"repoSaved\":true,\"unique\":true,\"ambariManagedRepositories\":true},{\"baseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"osType\":\"redhat6\",\"repoId\":\"HDP-UTILS-1.1.0.21\",\"repoName\":\"HDP-UTILS\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"latestBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"repoSaved\":true,\"unique\":false,\"ambariManagedRepositories\":true}]"
-          }
+          "ipv4":"192.168.64.103"
         }
       ]
     }

http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json
index 53d0e0d..ff2b3fd 100644
--- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json
+++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json
@@ -46,18 +46,12 @@
     "hosts": [
       {
         "hostId": 1,
-        "hostLevelParams": {
-          "repo_info": "[{\"baseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129/\",\"osType\":\"redhat6\",\"repoId\":\"HDP-2.6\",\"repoName\":\"HDP\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.6.0.3\",\"latestBaseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129\",\"repoSaved\":true,\"unique\":true,\"ambariManagedRepositories\":true},{\"baseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"osType\":\"redhat6\",\"repoId\":\"HDP-UTILS-1.1.0.21\",\"repoName\":\"HDP-UTILS\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"latestBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"repoSaved\":true,\"unique\":false,\"ambariManagedRepositories\":true}]"
-        },
         "hostName": "c6402.ambari.apache.org",
         "ipv4": "192.168.64.102",
         "rackName": "/default-rack"
       },
       {
         "hostId": 2,
-        "hostLevelParams": {
-          "repo_info": "[{\"baseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129/\",\"osType\":\"redhat6\",\"repoId\":\"HDP-2.6\",\"repoName\":\"HDP\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.6.0.3\",\"latestBaseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129\",\"repoSaved\":true,\"unique\":true,\"ambariManagedRepositories\":true},{\"baseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"osType\":\"redhat6\",\"repoId\":\"HDP-UTILS-1.1.0.21\",\"repoName\":\"HDP-UTILS\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"latestBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"repoSaved\":true,\"unique\":false,\"ambariManagedRepositories\":true}]"
-        },
         "hostName": "c6403.ambari.apache.org",
         "ipv4": "192.168.64.103",
         "rackName": "/default-rack"

http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json
index dfe17b9..6df4bc3 100644
--- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json
+++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json
@@ -50,19 +50,13 @@
           "hostId":0,
           "hostName":"c6401.ambari.apache.org",
           "rackName":"/default-rack",
-          "ipv4":"192.168.64.101",
-          "hostLevelParams": {
-            "repo_info": "[{\"baseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129/\",\"osType\":\"redhat6\",\"repoId\":\"HDP-2.6\",\"repoName\":\"HDP\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.6.0.3\",\"latestBaseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129\",\"repoSaved\":true,\"unique\":true,\"ambariManagedRepositories\":true},{\"baseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"osType\":\"redhat6\",\"repoId\":\"HDP-UTILS-1.1.0.21\",\"repoName\":\"HDP-UTILS\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"latestBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"repoSaved\":true,\"unique\":false,\"ambariManagedRepositories\":true}]"
-          }
+          "ipv4":"192.168.64.101"
         },
         {
           "hostId":1,
           "hostName":"c6402.ambari.apache.org",
           "rackName":"/default-rack",
-          "ipv4":"192.168.64.102",
-          "hostLevelParams": {
-            "repo_info": "[{\"baseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129/\",\"osType\":\"redhat6\",\"repoId\":\"HDP-2.6\",\"repoName\":\"HDP\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.6.0.3\",\"latestBaseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129\",\"repoSaved\":true,\"unique\":true,\"ambariManagedRepositories\":true},{\"baseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"osType\":\"redhat6\",\"repoId\":\"HDP-UTILS-1.1.0.21\",\"repoName\":\"HDP-UTILS\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"latestBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"repoSaved\":true,\"unique\":false,\"ambariManagedRepositories\":true}]"
-          }
+          "ipv4":"192.168.64.102"
         }
       ]
     },
@@ -87,10 +81,7 @@
           "hostId":0,
           "hostName":"c6401.ambari.apache.org",
           "rackName":"/default-rack",
-          "ipv4":"192.168.64.101",
-          "hostLevelParams": {
-            "repo_info": "[{\"baseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129/\",\"osType\":\"redhat6\",\"repoId\":\"HDP-2.6\",\"repoName\":\"HDP\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.6.0.3\",\"latestBaseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129\",\"repoSaved\":true,\"unique\":true,\"ambariManagedRepositories\":true},{\"baseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"osType\":\"redhat6\",\"repoId\":\"HDP-UTILS-1.1.0.21\",\"repoName\":\"HDP-UTILS\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"latestBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"repoSaved\":true,\"unique\":false,\"ambariManagedRepositories\":true}]"
-          }
+          "ipv4":"192.168.64.101"
         }
       ]
     }