You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2017/06/01 12:05:27 UTC

ambari git commit: AMBARI-21165. Register with server and changes to events format and handle graceful stop of threads (aonishuk)

Repository: ambari
Updated Branches:
  refs/heads/branch-3.0-perf 6303e9358 -> 5d0962421


AMBARI-21165. Register with server and changes to events format and handle graceful stop of threads (aonishuk)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/5d096242
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/5d096242
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/5d096242

Branch: refs/heads/branch-3.0-perf
Commit: 5d0962421397d64b5e7a383eb23fb6ae22028b0b
Parents: 6303e93
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Thu Jun 1 15:05:03 2017 +0300
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Thu Jun 1 15:05:03 2017 +0300

----------------------------------------------------------------------
 .../src/main/python/ambari_agent/ActionQueue.py |   9 +-
 .../python/ambari_agent/ClusterTopologyCache.py |  36 +++-
 .../python/ambari_agent/CommandStatusDict.py    |  12 +-
 .../ambari_agent/CommandStatusReporter.py       |  14 +-
 .../ambari_agent/ComponentStatusExecutor.py     |   8 +-
 .../src/main/python/ambari_agent/Constants.py   |   4 +-
 .../python/ambari_agent/HeartbeatHandlers.py    |  12 +-
 .../main/python/ambari_agent/HeartbeatThread.py |  71 +++++--
 .../src/main/python/ambari_agent/NetUtil.py     |  23 +--
 .../src/main/python/ambari_agent/Register.py    |  20 +-
 .../src/main/python/ambari_agent/Utils.py       |  28 ++-
 .../listeners/CommandsEventListener.py          |   7 +-
 .../python/ambari_agent/listeners/__init__.py   |   4 +-
 .../src/main/python/ambari_agent/main.py        |  30 +--
 .../ambari_agent/BaseStompServerTestCase.py     |  14 +-
 .../ambari_agent/TestAgentStompResponses.py     |  62 ++++---
 .../dummy_files/stomp/execution_commands.json   | 186 ++++++++-----------
 .../stomp/registration_response.json            |   2 +-
 18 files changed, 293 insertions(+), 249 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/5d096242/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index e9a3045..dbd9f4c 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -364,9 +364,10 @@ class ActionQueue(threading.Thread):
       roleResult['stderr'] = 'None'
 
     # let ambari know name of custom command
+    """
     if command['hostLevelParams'].has_key('custom_command'):
       roleResult['customCommand'] = command['hostLevelParams']['custom_command']
-
+    """
     if 'structuredOut' in commandresult:
       roleResult['structuredOut'] = str(json.dumps(commandresult['structuredOut']))
     else:
@@ -566,12 +567,6 @@ class ActionQueue(threading.Thread):
       logger.warn(err)
     pass
 
-
-  # Store action result to agent response queue
-  def result(self):
-    return self.commandStatuses.generate_report()
-
-
   def status_update_callback(self):
     """
     Actions that are executed every time when command status changes

http://git-wip-us.apache.org/repos/asf/ambari/blob/5d096242/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py
index 1102cd1..5810e67 100644
--- a/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py
+++ b/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py
@@ -42,20 +42,35 @@ class ClusterTopologyCache(ClusterCache):
     return 'topology'
 
   @staticmethod
-  def find_host_by_id(host_dicts, cluster_id, host_id):
+  def find_host_by_id(host_dicts, host_id):
+    """
+    Find host by id in list of host dictionaries.
+    """
     for host_dict in host_dicts:
       if host_dict['hostId'] == host_id:
         return host_dict
     return None
 
   @staticmethod
-  def find_component(component_dicts, cluster_id, service_name, component_name):
+  def find_component(component_dicts, service_name, component_name):
+    """
+    Find component by service_name and component_name in list of component dictionaries.
+    """
     for component_dict in component_dicts:
       if component_dict['serviceName'] == service_name and component_dict['componentName'] == component_name:
         return component_dict
     return None
 
   def cache_update(self, cache_update):
+    """
+    Handle event of update of topology.
+
+    Possible scenarios are:
+    - add new host
+    - update existing host information by hostId (e.g. rack name)
+    - add new component
+    - update component information by service_name and component_name
+    """
     mutable_dict = self._get_mutable_copy()
 
     for cluster_id, cluster_updates_dict in cache_update.iteritems():
@@ -66,7 +81,7 @@ class ClusterTopologyCache(ClusterCache):
       if 'hosts' in cluster_updates_dict:
         hosts_mutable_list = mutable_dict[cluster_id]['hosts']
         for host_updates_dict in cluster_updates_dict['hosts']:
-          host_mutable_dict = ClusterTopologyCache.find_host_by_id(hosts_mutable_list, cluster_id, host_updates_dict['hostId'])
+          host_mutable_dict = ClusterTopologyCache.find_host_by_id(hosts_mutable_list, host_updates_dict['hostId'])
           if host_mutable_dict is not None:
             host_mutable_dict.update(host_updates_dict)
           else:
@@ -75,7 +90,7 @@ class ClusterTopologyCache(ClusterCache):
       if 'components' in cluster_updates_dict:
         components_mutable_list = mutable_dict[cluster_id]['components']
         for component_updates_dict in cluster_updates_dict['components']:
-          component_mutable_dict = ClusterTopologyCache.find_component(components_mutable_list, cluster_id, component_updates_dict['serviceName'], component_updates_dict['componentName'])
+          component_mutable_dict = ClusterTopologyCache.find_component(components_mutable_list, component_updates_dict['serviceName'], component_updates_dict['componentName'])
           if component_mutable_dict is not None:
             component_updates_dict['hostIds'] += component_mutable_dict['hostIds']
             component_updates_dict['hostIds'] = list(set(component_updates_dict['hostIds']))
@@ -86,6 +101,15 @@ class ClusterTopologyCache(ClusterCache):
     self.rewrite_cache(mutable_dict)
 
   def cache_delete(self, cache_update):
+    """
+    Handle event of delete on topology.
+
+    Possible scenarios are:
+    - delete host
+    - delete component
+    - delete component host
+    - delete cluster
+    """
     mutable_dict = self._get_mutable_copy()
     clusters_ids_to_delete = []
 
@@ -97,7 +121,7 @@ class ClusterTopologyCache(ClusterCache):
       if 'hosts' in cluster_updates_dict:
         hosts_mutable_list = mutable_dict[cluster_id]['hosts']
         for host_updates_dict in cluster_updates_dict['hosts']:
-          host_to_delete = ClusterTopologyCache.find_host_by_id(hosts_mutable_list, cluster_id, host_updates_dict['hostId'])
+          host_to_delete = ClusterTopologyCache.find_host_by_id(hosts_mutable_list, host_updates_dict['hostId'])
           if host_to_delete is not None:
             mutable_dict[cluster_id]['hosts'] = [host_dict for host_dict in hosts_mutable_list if host_dict != host_to_delete]
           else:
@@ -106,7 +130,7 @@ class ClusterTopologyCache(ClusterCache):
       if 'components' in cluster_updates_dict:
         components_mutable_list = mutable_dict[cluster_id]['components']
         for component_updates_dict in cluster_updates_dict['components']:
-          component_mutable_dict = ClusterTopologyCache.find_component(components_mutable_list, cluster_id, component_updates_dict['serviceName'], component_updates_dict['componentName'])
+          component_mutable_dict = ClusterTopologyCache.find_component(components_mutable_list, component_updates_dict['serviceName'], component_updates_dict['componentName'])
           if 'hostIds' in component_mutable_dict:
             exclude_host_ids = component_updates_dict['hostIds']
             component_mutable_dict['hostIds'] = [host_id for host_id in component_mutable_dict['hostIds'] if host_id not in exclude_host_ids]

http://git-wip-us.apache.org/repos/asf/ambari/blob/5d096242/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
index afaf77d..d0f6801 100644
--- a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
+++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
@@ -43,6 +43,7 @@ class CommandStatusDict():
     self.current_state = {} # Contains all statuses
     self.lock = threading.RLock()
     self.initializer_module = initializer_module
+    self.reported_reports = set()
 
 
   def put_command_status(self, command, new_report):
@@ -52,6 +53,7 @@ class CommandStatusDict():
     key = command['taskId']
     with self.lock: # Synchronized
       self.current_state[key] = (command, new_report)
+      self.reported_reports.discard(key)
 
     self.force_update_to_server([new_report])
 
@@ -69,6 +71,7 @@ class CommandStatusDict():
     FAILED. Statuses for COMPLETE or FAILED commands are forgotten after
     generation
     """
+    self.generated_reports = []
     from ActionQueue import ActionQueue
     with self.lock: # Synchronized
       resultReports = []
@@ -78,17 +81,20 @@ class CommandStatusDict():
         if command ['commandType'] in [ActionQueue.EXECUTION_COMMAND, ActionQueue.BACKGROUND_EXECUTION_COMMAND]:
           if (report['status']) != ActionQueue.IN_PROGRESS_STATUS:
             resultReports.append(report)
-            # Removing complete/failed command status from dict
-            del self.current_state[key]
+            self.reported_reports.append(key)
           else:
             in_progress_report = self.generate_in_progress_report(command, report)
             resultReports.append(in_progress_report)
         elif command ['commandType'] in [ActionQueue.AUTO_EXECUTION_COMMAND]:
           logger.debug("AUTO_EXECUTION_COMMAND task deleted " + str(command['commandId']))
-          del self.current_state[key]
+          self.reported_reports.append(key)
           pass
       return resultReports
 
+  def clear_reported_reports(self):
+    with self.lock:
+      for key in self.reported_reports:
+        del self.current_state[key]
 
   def generate_in_progress_report(self, command, report):
     """

http://git-wip-us.apache.org/repos/asf/ambari/blob/5d096242/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py
index 216f20b..6ee4474 100644
--- a/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py
+++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py
@@ -42,12 +42,14 @@ class CommandStatusReporter(threading.Thread):
 
     while not self.stop_event.is_set():
       try:
-        # TODO STOMP: if not registered, reports should not be on agent until next registration
-        report = self.commandStatuses.generate_report()
-        if report and self.initializer_module.is_registered:
-          self.initializer_module.connection.send(message=report, destination=Constants.COMMANDS_STATUS_REPORTS_ENDPOINT)
-        self.stop_event.wait(self.command_reports_interval)
+        if self.initializer_module.is_registered:
+          report = self.commandStatuses.generate_report()
+          if report:
+            self.initializer_module.connection.send(message=report, destination=Constants.COMMANDS_STATUS_REPORTS_ENDPOINT)
+          self.commandStatuses.clear_reported_reports()
       except:
         logger.exception("Exception in CommandStatusReporter. Re-running it")
-        pass
+
+      self.stop_event.wait(self.command_reports_interval)
+
     logger.info("CommandStatusReporter has successfully finished")

http://git-wip-us.apache.org/repos/asf/ambari/blob/5d096242/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
index 41f0df4..3a2e105 100644
--- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
@@ -72,7 +72,7 @@ class ComponentStatusExecutor(threading.Thread):
               component_name = component_dict.componentName
 
               # TODO STOMP: run real command
-              logger.info("Running {0}/{1}".format(component_dict.statusCommandParams.servicePackageFolder, component_dict.statusCommandParams.script))
+              logger.info("Running {0}/{1}".format(component_dict.statusCommandParams.service_package_folder, component_dict.statusCommandParams.script))
               #self.customServiceOrchestrator.requestComponentStatus(command)
               status = random.choice(["INSTALLED","STARTED"])
               result = {
@@ -87,16 +87,14 @@ class ComponentStatusExecutor(threading.Thread):
                 logging.info("Status for {0} has changed to {1}".format(component_name, status))
                 cluster_reports[cluster_id].append(result)
 
-        # TODO STOMP: what if not registered?
         self.send_updates_to_server(cluster_reports)
-        self.stop_event.wait(Constants.STATUS_COMMANDS_PACK_INTERVAL_SECONDS)
       except:
         logger.exception("Exception in ComponentStatusExecutor. Re-running it")
-        pass
+
+      self.stop_event.wait(Constants.STATUS_COMMANDS_PACK_INTERVAL_SECONDS)
     logger.info("ComponentStatusExecutor has successfully finished")
 
   def send_updates_to_server(self, cluster_reports):
-    # TODO STOMP: override send to send dicts and lists? and not use json.dump
     if not cluster_reports or not self.initializer_module.is_registered:
       return
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/5d096242/ambari-agent/src/main/python/ambari_agent/Constants.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Constants.py b/ambari-agent/src/main/python/ambari_agent/Constants.py
index e15d1d8..8fafca1 100644
--- a/ambari-agent/src/main/python/ambari_agent/Constants.py
+++ b/ambari-agent/src/main/python/ambari_agent/Constants.py
@@ -22,13 +22,13 @@ limitations under the License.
 COMMANDS_TOPIC = '/user/commands'
 CONFIGURATIONS_TOPIC = '/user/configs'
 METADATA_TOPIC = '/events/metadata'
-TOPOLOGIES_TOPIC = '/events/topology'
+TOPOLOGIES_TOPIC = '/events/topologies'
 SERVER_RESPONSES_TOPIC = '/user/'
 
 PRE_REGISTRATION_TOPICS_TO_SUBSCRIBE = [SERVER_RESPONSES_TOPIC]
 POST_REGISTRATION_TOPICS_TO_SUBSCRIBE = [COMMANDS_TOPIC, CONFIGURATIONS_TOPIC, METADATA_TOPIC, TOPOLOGIES_TOPIC]
 
-TOPOLOGY_REQUEST_ENDPOINT = '/agents/topology'
+TOPOLOGY_REQUEST_ENDPOINT = '/agents/topologies'
 METADATA_REQUEST_ENDPOINT = '/agents/metadata'
 CONFIGURATIONS_REQUEST_ENDPOINT = '/agents/configs'
 COMPONENT_STATUS_REPORTS_ENDPOINT = '/reports/component_status'

http://git-wip-us.apache.org/repos/asf/ambari/blob/5d096242/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py
index 836ab07..fefe32a 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py
@@ -26,7 +26,6 @@ import signal
 import threading
 import traceback
 from ambari_commons.os_family_impl import OsFamilyImpl
-import sys
 
 from ambari_agent.RemoteDebugUtils import bind_debug_signal_handlers
 
@@ -80,15 +79,14 @@ class HeartbeatStopHandlersWindows(HeartbeatStopHandlers):
 # linux impl
 
 def signal_handler(signum, frame):
-  global _handler
   logger.info("Ambari-agent received {0} signal, stopping...".format(signum))
-  _handler.set_stop()
+  _handler.set()
 
 
 def debug(sig, frame):
   """Interrupt running process, and provide a stacktrace of threads """
   d = {'_frame': frame}  # Allow access to frame object.
-  d.update(frame.f_globals)  # Unless shadowed by global
+  d.update(frame.f_globals)  # Unless shadowed by global
   d.update(frame.f_locals)
 
   message = "Signal received.\nTraceback:\n"
@@ -123,7 +121,7 @@ class HeartbeatStopHandlersLinux(HeartbeatStopHandlers):
 
 
 
-def bind_signal_handlers(agentPid):
+def bind_signal_handlers(agentPid, stop_event):
   global _handler
   if OSCheck.get_os_family() != OSConst.WINSRV_FAMILY:
     if os.getpid() == agentPid:
@@ -132,7 +130,7 @@ def bind_signal_handlers(agentPid):
 
       bind_debug_signal_handlers()
 
-    _handler = HeartbeatStopHandlersLinux()
+    _handler = stop_event
   else:
-    _handler = HeartbeatStopHandlersWindows()
+    _handler = stop_event
   return _handler

http://git-wip-us.apache.org/repos/asf/ambari/blob/5d096242/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
index b7bb5ed..e54c0c1 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -23,6 +23,9 @@ import ambari_stomp
 import threading
 
 from ambari_agent import Constants
+from ambari_agent.Register import Register
+from ambari_agent.Utils import BlockingDictionary
+from ambari_agent.Utils import Utils
 from ambari_agent.listeners.ServerResponsesListener import ServerResponsesListener
 from ambari_agent.listeners.TopologyEventListener import TopologyEventListener
 from ambari_agent.listeners.ConfigurationEventListener import ConfigurationEventListener
@@ -30,6 +33,7 @@ from ambari_agent.listeners.MetadataEventListener import MetadataEventListener
 from ambari_agent.listeners.CommandsEventListener import CommandsEventListener
 
 HEARTBEAT_INTERVAL = 10
+REQUEST_RESPONSE_TIMEOUT = 10
 
 logger = logging.getLogger(__name__)
 
@@ -42,6 +46,8 @@ class HeartbeatThread(threading.Thread):
     self.heartbeat_interval = HEARTBEAT_INTERVAL
     self.stop_event = initializer_module.stop_event
 
+    self.registration_builder = Register(initializer_module.ambariConfig)
+
     self.initializer_module = initializer_module
     self.caches = [initializer_module.metadata_cache, initializer_module.topology_cache, initializer_module.configurations_cache]
 
@@ -52,18 +58,19 @@ class HeartbeatThread(threading.Thread):
     self.topology_events_listener = TopologyEventListener(initializer_module.topology_cache)
     self.configuration_events_listener = ConfigurationEventListener(initializer_module.configurations_cache)
     self.listeners = [self.server_responses_listener, self.commands_events_listener, self.metadata_events_listener, self.topology_events_listener, self.configuration_events_listener]
+
     self.post_registration_requests = [
     (Constants.TOPOLOGY_REQUEST_ENDPOINT, initializer_module.topology_cache, self.topology_events_listener),
     (Constants.METADATA_REQUEST_ENDPOINT, initializer_module.metadata_cache, self.metadata_events_listener),
     (Constants.CONFIGURATIONS_REQUEST_ENDPOINT, initializer_module.configurations_cache, self.configuration_events_listener)
     ]
+    self.responseId = 0
 
 
   def run(self):
     """
     Run an endless loop of hearbeat with registration upon init or exception in heartbeating.
     """
-    # TODO STOMP: stop the thread on SIGTERM
     while not self.stop_event.is_set():
       try:
         if not self.initializer_module.is_registered:
@@ -73,17 +80,17 @@ class HeartbeatThread(threading.Thread):
         logger.debug("Heartbeat body is {0}".format(heartbeat_body))
         response = self.blocking_request(heartbeat_body, Constants.HEARTBEAT_ENDPOINT)
         logger.debug("Heartbeat response is {0}".format(response))
-
-        self.stop_event.wait(self.heartbeat_interval)
-        # TODO STOMP: handle heartbeat reponse
+        self.handle_heartbeat_reponse(response)
       except:
         logger.exception("Exception in HeartbeatThread. Re-running the registration")
-        self.stop_event.wait(self.heartbeat_interval)
         self.initializer_module.is_registered = False
         self.initializer_module.connection.disconnect()
-        pass
+        delattr(self.initializer_module, '_connection')
+
+      self.stop_event.wait(self.heartbeat_interval)
 
     self.initializer_module.connection.disconnect()
+    delattr(self.initializer_module, '_connection')
     logger.info("HeartbeatThread has successfully finished")
 
   def register(self):
@@ -93,7 +100,7 @@ class HeartbeatThread(threading.Thread):
     self.add_listeners()
     self.subscribe_to_topics(Constants.PRE_REGISTRATION_TOPICS_TO_SUBSCRIBE)
 
-    registration_request = self.get_registration_request()
+    registration_request = self.registration_builder.build()
     logger.info("Sending registration request")
     logger.debug("Registration request is {0}".format(registration_request))
 
@@ -102,26 +109,53 @@ class HeartbeatThread(threading.Thread):
     logger.info("Registration response received")
     logger.debug("Registration response is {0}".format(response))
 
-    self.registration_response = response
+    self.handle_registration_response(response)
 
     for endpoint, cache, listener in self.post_registration_requests:
+      # should not hang forever on these requests
       response = self.blocking_request({'hash': cache.hash}, endpoint)
-      listener.on_event({}, response)
+      try:
+        listener.on_event({}, response)
+      except:
+        logger.exception("Exception while handing response to request at {0}. {1}".format(endpoint, response))
+        raise
 
     self.subscribe_to_topics(Constants.POST_REGISTRATION_TOPICS_TO_SUBSCRIBE)
     self.initializer_module.is_registered = True
 
-  def get_registration_request(self):
-    """
-    Get registration request body to send it to server
-    """
-    return {'registration-response':'true'}
+  def handle_registration_response(self, response):
+    # exitstatus is a code of error which was raised on server side.
+    # exitstatus = 0 (OK - Default)
+    # exitstatus = 1 (Registration failed because different version of agent and server)
+    exitstatus = 0
+    if 'exitstatus' in response.keys():
+      exitstatus = int(response['exitstatus'])
+
+    if exitstatus != 0:
+      # log - message, which will be printed to agents log
+      if 'log' in response.keys():
+        error_message = "Registration failed due to: {0}".format(response['log'])
+      else:
+        error_message = "Registration failed"
+
+      raise Exception(error_message)
+
+    self.responseId = int(response['id'])
+
+  def handle_heartbeat_reponse(self, response):
+    serverId = int(response['id'])
+
+    if serverId != self.responseId + 1:
+      logger.error("Error in responseId sequence - restarting")
+      Utils.restartAgent()
+    else:
+      self.responseId = serverId
 
   def get_heartbeat_body(self):
     """
     Heartbeat body to be send to server
     """
-    return {'hostname':'true'}
+    return {'id':self.responseId}
 
   def add_listeners(self):
     """
@@ -134,9 +168,12 @@ class HeartbeatThread(threading.Thread):
     for topic_name in topics_list:
       self.initializer_module.connection.subscribe(destination=topic_name, id='sub', ack='client-individual')
 
-  def blocking_request(self, message, destination):
+  def blocking_request(self, message, destination, timeout=REQUEST_RESPONSE_TIMEOUT):
     """
     Send a request to server and waits for the response from it. The response it detected by the correspondence of correlation_id.
     """
     correlation_id = self.initializer_module.connection.send(message=message, destination=destination)
-    return self.server_responses_listener.responses.blocking_pop(str(correlation_id))
\ No newline at end of file
+    try:
+      return self.server_responses_listener.responses.blocking_pop(str(correlation_id), timeout=timeout)
+    except BlockingDictionary.DictionaryPopTimeout:
+      raise Exception("{0} seconds timeout expired waiting for response from server at {1} to message from {2}".format(timeout, Constants.SERVER_RESPONSES_TOPIC, destination))

http://git-wip-us.apache.org/repos/asf/ambari/blob/5d096242/ambari-agent/src/main/python/ambari_agent/NetUtil.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/NetUtil.py b/ambari-agent/src/main/python/ambari_agent/NetUtil.py
index 9b29633..3f4895d 100644
--- a/ambari-agent/src/main/python/ambari_agent/NetUtil.py
+++ b/ambari-agent/src/main/python/ambari_agent/NetUtil.py
@@ -19,7 +19,6 @@ import logging
 import httplib
 import sys
 from ssl import SSLError
-from HeartbeatHandlers import HeartbeatStopHandlers
 from ambari_agent.AmbariConfig import AmbariConfig
 from ambari_commons.inet_utils import ensure_ssl_using_protocol
 
@@ -44,16 +43,8 @@ class NetUtil:
   # For testing purposes
   DEBUG_STOP_RETRIES_FLAG = False
 
-  # Stop implementation
-  # Typically, it waits for a certain time for the daemon/service to receive the stop signal.
-  # Received the number of seconds to wait as an argument
-  # Returns true if the application is stopping, false if continuing execution
-  stopCallback = None
-
-  def __init__(self, config, stop_callback=None):
-    if stop_callback is None:
-      stop_callback = HeartbeatStopHandlers()
-    self.stopCallback = stop_callback
+  def __init__(self, config, stop_event=None):
+    self.stop_event = stop_event
     self.config = config
     self.connect_retry_delay = int(config.get('server','connect_retry_delay',
                                               default=self.DEFAULT_CONNECT_RETRY_DELAY_SEC))
@@ -71,13 +62,13 @@ class NetUtil:
 
     try:
       parsedurl = urlparse(url)
-      
+
       if sys.version_info >= (2,7,9) and not ssl_verify_cert:
           import ssl
           ca_connection = httplib.HTTPSConnection(parsedurl[1], context=ssl._create_unverified_context())
       else:
           ca_connection = httplib.HTTPSConnection(parsedurl[1])
-          
+
       ca_connection.request("GET", parsedurl[2])
       response = ca_connection.getresponse()
       status = response.status
@@ -121,11 +112,13 @@ class NetUtil:
             self.connect_retry_delay))
         retries += 1
 
-      if 0 == self.stopCallback.wait(self.connect_retry_delay):
+      self.stop_event.wait(self.connect_retry_delay)
+
+      if self.stop_event.is_set():
         #stop waiting
         if logger is not None:
           logger.info("Stop event received")
-        self.DEBUG_STOP_RETRIES_FLAG = True
+        break
 
     return retries, connected, self.DEBUG_STOP_RETRIES_FLAG
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/5d096242/ambari-agent/src/main/python/ambari_agent/Register.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Register.py b/ambari-agent/src/main/python/ambari_agent/Register.py
index 0c811c6..4c4bf32 100644
--- a/ambari-agent/src/main/python/ambari_agent/Register.py
+++ b/ambari-agent/src/main/python/ambari_agent/Register.py
@@ -18,15 +18,12 @@ See the License for the specific language governing permissions and
 limitations under the License.
 '''
 
-import os
 import time
-import subprocess
-from Hardware import Hardware
-import hostname
-from HostInfo import HostInfo
+from ambari_agent import hostname
+from ambari_agent.Hardware import Hardware
+from ambari_agent.HostInfo import HostInfo
+from ambari_agent.Utils import Utils
 
-
-firstContact = True
 class Register:
   """ Registering with the server. Get the hardware profile and
   declare success for now """
@@ -34,24 +31,23 @@ class Register:
     self.config = config
     self.hardware = Hardware(self.config)
 
-  def build(self, version, id='-1'):
-    global clusterId, clusterDefinitionRevision, firstContact
+  def build(self, response_id='-1'):
     timestamp = int(time.time()*1000)
 
     hostInfo = HostInfo(self.config)
     agentEnv = { }
     hostInfo.register(agentEnv, False, False)
 
-    current_ping_port = self.config.get('agent','current_ping_port')
+    current_ping_port = self.config.get('agent','ping_port')
 
-    register = { 'responseId'        : int(id),
+    register = { 'id'                : int(response_id),
                  'timestamp'         : timestamp,
                  'hostname'          : hostname.hostname(self.config),
                  'currentPingPort'   : int(current_ping_port),
                  'publicHostname'    : hostname.public_hostname(self.config),
                  'hardwareProfile'   : self.hardware.get(),
                  'agentEnv'          : agentEnv,
-                 'agentVersion'      : version,
+                 'agentVersion'      : Utils.read_agent_version(self.config),
                  'prefix'            : self.config.get('agent', 'prefix')
                }
     return register

http://git-wip-us.apache.org/repos/asf/ambari/blob/5d096242/ambari-agent/src/main/python/ambari_agent/Utils.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Utils.py b/ambari-agent/src/main/python/ambari_agent/Utils.py
index f94eba0..845eb30 100644
--- a/ambari-agent/src/main/python/ambari_agent/Utils.py
+++ b/ambari-agent/src/main/python/ambari_agent/Utils.py
@@ -17,8 +17,12 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 """
+import os
 import threading
 from functools import wraps
+from ambari_agent.ExitHelper import ExitHelper
+
+AGENT_AUTO_RESTART_EXIT_CODE = 77
 
 class BlockingDictionary():
   """
@@ -39,16 +43,20 @@ class BlockingDictionary():
       self.dict[key] = value
     self.put_event.set()
 
-  def blocking_pop(self, key):
+  def blocking_pop(self, key, timeout=None):
     """
     Block until a key in dictionary is available and than pop it.
+    If the timeout expires before the key appears, BlockingDictionary.DictionaryPopTimeout is raised.
     """
     with self.dict_lock:
       if key in self.dict:
         return self.dict.pop(key)
 
     while True:
-      self.put_event.wait()
+      self.put_event.wait(timeout)
+      if not self.put_event.is_set():
+        raise BlockingDictionary.DictionaryPopTimeout()
+
       self.put_event.clear()
       with self.dict_lock:
         if key in self.dict:
@@ -60,6 +68,9 @@ class BlockingDictionary():
   def __str__(self):
     return self.dict.__str__()
 
+  class DictionaryPopTimeout(Exception):
+    pass
+
 class Utils(object):
   @staticmethod
   def make_immutable(value):
@@ -84,6 +95,19 @@ class Utils(object):
 
     return param
 
+  @staticmethod
+  def read_agent_version(config):
+    data_dir = config.get('agent', 'prefix')
+    ver_file = os.path.join(data_dir, 'version')
+    with open(ver_file, "r") as f:
+      version = f.read().strip()
+    return version
+
+  @staticmethod
+  def restartAgent():
+    # TODO STOMP: set stop event?
+    ExitHelper().exit(AGENT_AUTO_RESTART_EXIT_CODE)
+
 class ImmutableDictionary(dict):
   def __init__(self, dictionary):
     """

http://git-wip-us.apache.org/repos/asf/ambari/blob/5d096242/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py
index b851443..c3839cb 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py
@@ -40,12 +40,11 @@ class CommandsEventListener(EventListener):
     @param headers: headers dictionary
     @param message: message payload dictionary
     """
+    ""
     commands = []
-    for cluster_id in message.keys():
-      cluster_dict = message[cluster_id]
-      host_level_params = cluster_dict['hostLevelParams']
+    for cluster_id in message['clusters'].keys():
+      cluster_dict = message['clusters'][cluster_id]
       for command in cluster_dict['commands']:
-        command['hostLevelParams'] = host_level_params
         commands.append(command)
 
     self.action_queue.put(commands)

http://git-wip-us.apache.org/repos/asf/ambari/blob/5d096242/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py b/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
index ddc5900..3dead4b 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
@@ -17,7 +17,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 """
 
-import json
+import ambari_simplejson as json
 import ambari_stomp
 import logging
 
@@ -38,14 +38,12 @@ class EventListener(ambari_stomp.ConnectionListener):
       return
 
     destination = headers['destination']
-
     if destination.rstrip('/') == self.get_handled_path().rstrip('/'):
       try:
         message_json = json.loads(message)
       except ValueError:
         logger.exception("Received from server event is not a valid message json. Message is:\n{0}".format(message))
         return
-
       logger.info("Event from server at {0}{1}".format(destination, self.get_log_message(headers, message_json)))
       try:
         self.on_event(headers, message_json)

http://git-wip-us.apache.org/repos/asf/ambari/blob/5d096242/ambari-agent/src/main/python/ambari_agent/main.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py
index 72d6c70..a1a98e8 100644
--- a/ambari-agent/src/main/python/ambari_agent/main.py
+++ b/ambari-agent/src/main/python/ambari_agent/main.py
@@ -79,11 +79,8 @@ fix_encoding_reimport_bug()
 
 import logging.handlers
 import logging.config
-import signal
 from optparse import OptionParser
 import sys
-import traceback
-import getpass
 import os
 import time
 import locale
@@ -92,7 +89,6 @@ import ConfigParser
 import ProcessHelper
 import resource
 from logging.handlers import SysLogHandler
-from Controller import Controller
 import AmbariConfig
 from NetUtil import NetUtil
 from PingPortListener import PingPortListener
@@ -102,9 +98,7 @@ from ambari_agent.ExitHelper import ExitHelper
 import socket
 from ambari_commons import OSConst, OSCheck
 from ambari_commons.shell import shellRunner
-from ambari_commons.network import reconfigure_urllib2_opener
-from ambari_commons import shell
-import HeartbeatHandlers
+#from ambari_commons.network import reconfigure_urllib2_opener
 from HeartbeatHandlers import bind_signal_handlers
 from ambari_commons.constants import AMBARI_SUDO_BINARY
 from resource_management.core.logger import Logger
@@ -349,13 +343,7 @@ def reset_agent(options):
 
 MAX_RETRIES = 10
 
-# TODO STOMP: remove from globals
-initializer_module = None
-
-def run_threads():
-  global initializer_module
-  initializer_module = InitializerModule()
-
+def run_threads(initializer_module):
   heartbeat_thread = HeartbeatThread.HeartbeatThread(initializer_module)
   heartbeat_thread.start()
 
@@ -370,13 +358,12 @@ def run_threads():
   while not initializer_module.stop_event.is_set():
     time.sleep(0.1)
 
-  # TODO STOMP: if thread cannot stop by itself kill it hard after some timeout.
   heartbeat_thread.join()
   component_status_executor.join()
 
 # event - event, that will be passed to Controller and NetUtil to make able to interrupt loops form outside process
 # we need this for windows os, where no sigterm available
-def main(heartbeat_stop_callback=None):
+def main(initializer_module, heartbeat_stop_callback=None):
   global config
   global home_dir
 
@@ -449,7 +436,7 @@ def main(heartbeat_stop_callback=None):
 
   if not config.use_system_proxy_setting():
     logger.info('Agent is configured to ignore system proxy settings')
-    reconfigure_urllib2_opener(ignore_system_proxy=True)
+    #reconfigure_urllib2_opener(ignore_system_proxy=True)
 
   if not OSCheck.get_os_family() == OSConst.WINSRV_FAMILY:
     daemonize()
@@ -475,7 +462,7 @@ def main(heartbeat_stop_callback=None):
         logger.warn("Unable to determine the IP address of the Ambari server '%s'", server_hostname)
 
       # Wait until MAX_RETRIES to see if server is reachable
-      netutil = NetUtil(config, heartbeat_stop_callback)
+      netutil = NetUtil(config, initializer_module.stop_event)
       (retries, connected, stopped) = netutil.try_to_connect(server_url, MAX_RETRIES, logger)
 
       # if connected, launch controller
@@ -484,7 +471,7 @@ def main(heartbeat_stop_callback=None):
         # Set the active server
         active_server = server_hostname
         # Launch Controller communication
-        run_threads()
+        run_threads(initializer_module)
 
       #
       # If Ambari Agent connected to the server or
@@ -503,9 +490,10 @@ def main(heartbeat_stop_callback=None):
 if __name__ == "__main__":
   is_logger_setup = False
   try:
-    heartbeat_stop_callback = bind_signal_handlers(agentPid)
+    initializer_module = InitializerModule()
+    heartbeat_stop_callback = bind_signal_handlers(agentPid, initializer_module.stop_event)
 
-    main(heartbeat_stop_callback)
+    main(initializer_module, heartbeat_stop_callback)
   except SystemExit:
     raise
   except BaseException:

http://git-wip-us.apache.org/repos/asf/ambari/blob/5d096242/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py b/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py
index 87417fc..5db53c8 100644
--- a/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py
+++ b/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py
@@ -54,7 +54,6 @@ class BaseStompServerTestCase(unittest.TestCase):
   """
 
   def setUp(self):
-
     self.clients = []
     self.server = None  # This gets set in the server thread.
     self.server_address = None  # This gets set in the server thread.
@@ -100,7 +99,7 @@ class BaseStompServerTestCase(unittest.TestCase):
   def tearDown(self):
     for c in self.clients:
       c.close()
-    self.server.shutdown() # server_close takes too much time
+    self.server.server_close()
     self.server_thread.join()
     self.ready_event.clear()
     del self.server_thread
@@ -157,6 +156,17 @@ class BaseStompServerTestCase(unittest.TestCase):
       if os.path.isfile(filepath):
         os.remove(filepath)
 
+  def assert_with_retries(self, func, tries, try_sleep):
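+    # call func up to `tries` times, sleeping `try_sleep` seconds after each AssertionError; if every try fails, call it once more so the failure propagates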
+    for i in range(tries):
+      try:
+        func()
+        break
+      except AssertionError:
+        time.sleep(try_sleep)
+    else:
+      func()
+
 
 class TestStompServer(ThreadedStompServer):
     """

http://git-wip-us.apache.org/repos/asf/ambari/blob/5d096242/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
index 87c7f57..c969c75 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
@@ -36,16 +36,21 @@ from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator
 from mock.mock import MagicMock, patch
 
 class TestAgentStompResponses(BaseStompServerTestCase):
-  """
-  @patch.object(CustomServiceOrchestrator, "runCommand")
-  def test_mock_server_can_start(self, runCommand_mock):
-    runCommand_mock.return_value = {'stdout':'...', 'stderr':'...', 'structuredOut' : '{}', 'exitcode':1}
-
+  def setUp(self):
     self.remove_files(['/tmp/cluster_cache/configurations.json', '/tmp/cluster_cache/metadata.json', '/tmp/cluster_cache/topology.json'])
 
     if not os.path.exists("/tmp/ambari-agent"):
       os.mkdir("/tmp/ambari-agent")
 
+    with open("/tmp/ambari-agent/version", "w") as fp:
+      fp.write("2.5.0.0")
+
+    return super(TestAgentStompResponses, self).setUp()
+
+  @patch.object(CustomServiceOrchestrator, "runCommand")
+  def test_mock_server_can_start(self, runCommand_mock):
+    runCommand_mock.return_value = {'stdout':'...', 'stderr':'...', 'structuredOut' : '{}', 'exitcode':1}
+
     initializer_module = InitializerModule()
     heartbeat_thread = HeartbeatThread.HeartbeatThread(initializer_module)
     heartbeat_thread.start()
@@ -101,7 +106,7 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     action_status_failed_frame = json.loads(self.server.frames_queue.get().body)
     initializer_module.stop_event.set()
 
-    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'heartbeat-response':'true'}))
+    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'id':'0'}))
     self.server.topic_manager.send(f)
 
     heartbeat_thread.join()
@@ -118,8 +123,8 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     self.assertEquals(dn_start_failed_frame[0]['status'], 'FAILED')
 
 
-    ============================================================================================
-    ============================================================================================
+    #============================================================================================
+    #============================================================================================
 
 
     initializer_module = InitializerModule()
@@ -163,24 +168,15 @@ class TestAgentStompResponses(BaseStompServerTestCase):
 
     initializer_module.stop_event.set()
 
-    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'heartbeat-response':'true'}))
+    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'id':'0'}))
     self.server.topic_manager.send(f)
 
     heartbeat_thread.join()
     component_status_executor.join()
     command_status_reporter.join()
     action_queue.join()
-    """
-
-  @patch.object(CustomServiceOrchestrator, "runCommand")
-  def test_topology_update_and_delete(self, runCommand_mock):
-    runCommand_mock.return_value = {'stdout':'...', 'stderr':'...', 'structuredOut' : '{}', 'exitcode':1}
-
-    self.remove_files(['/tmp/cluster_cache/configurations.json', '/tmp/cluster_cache/metadata.json', '/tmp/cluster_cache/topology.json'])
-
-    if not os.path.exists("/tmp/ambari-agent"):
-      os.mkdir("/tmp/ambari-agent")
 
+  def test_topology_update_and_delete(self):
     initializer_module = InitializerModule()
     heartbeat_thread = HeartbeatThread.HeartbeatThread(initializer_module)
     heartbeat_thread.start()
@@ -211,34 +207,40 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     while not initializer_module.is_registered:
       time.sleep(0.1)
 
-    f = Frame(frames.MESSAGE, headers={'destination': '/events/topology'}, body=self.get_json("topology_add_component.json"))
+    f = Frame(frames.MESSAGE, headers={'destination': '/events/topologies'}, body=self.get_json("topology_add_component.json"))
     self.server.topic_manager.send(f)
 
-    f = Frame(frames.MESSAGE, headers={'destination': '/events/topology'}, body=self.get_json("topology_add_component_host.json"))
+    f = Frame(frames.MESSAGE, headers={'destination': '/events/topologies'}, body=self.get_json("topology_add_component_host.json"))
     self.server.topic_manager.send(f)
 
-    f = Frame(frames.MESSAGE, headers={'destination': '/events/topology'}, body=self.get_json("topology_add_host.json"))
+    f = Frame(frames.MESSAGE, headers={'destination': '/events/topologies'}, body=self.get_json("topology_add_host.json"))
     self.server.topic_manager.send(f)
 
-    f = Frame(frames.MESSAGE, headers={'destination': '/events/topology'}, body=self.get_json("topology_delete_host.json"))
+    f = Frame(frames.MESSAGE, headers={'destination': '/events/topologies'}, body=self.get_json("topology_delete_host.json"))
     self.server.topic_manager.send(f)
 
-    f = Frame(frames.MESSAGE, headers={'destination': '/events/topology'}, body=self.get_json("topology_delete_component.json"))
+    f = Frame(frames.MESSAGE, headers={'destination': '/events/topologies'}, body=self.get_json("topology_delete_component.json"))
     self.server.topic_manager.send(f)
 
-    f = Frame(frames.MESSAGE, headers={'destination': '/events/topology'}, body=self.get_json("topology_delete_component_host.json"))
+    f = Frame(frames.MESSAGE, headers={'destination': '/events/topologies'}, body=self.get_json("topology_delete_component_host.json"))
     self.server.topic_manager.send(f)
 
-    f = Frame(frames.MESSAGE, headers={'destination': '/events/topology'}, body=self.get_json("topology_delete_cluster.json"))
+    f = Frame(frames.MESSAGE, headers={'destination': '/events/topologies'}, body=self.get_json("topology_delete_cluster.json"))
     self.server.topic_manager.send(f)
 
-    time.sleep(0.1)
-    self.assertEquals(json.dumps(initializer_module.topology_cache, indent=2, sort_keys=True), json.dumps(self.get_dict_from_file("topology_cache_expected.json"), indent=2, sort_keys=True))
-    #self.assertEquals(initializer_module.topology_cache, self.get_dict_from_file("topology_cache_expected.json"))
+    def is_json_equal():
+      json_topology = json.dumps(initializer_module.topology_cache, indent=2, sort_keys=True)
+      json_excepted_lopology = json.dumps(self.get_dict_from_file("topology_cache_expected.json"), indent=2, sort_keys=True)
+      #print json_topology
+      #print json_excepted_lopology
+      self.assertEquals(json_topology, json_excepted_lopology)
+      #self.assertEquals(initializer_module.topology_cache, self.get_dict_from_file("topology_cache_expected.json"))
+
+    self.assert_with_retries(is_json_equal, tries=40, try_sleep=0.1)
 
     initializer_module.stop_event.set()
 
-    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'heartbeat-response':'true'}))
+    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'id':'0'}))
     self.server.topic_manager.send(f)
 
     heartbeat_thread.join()
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/5d096242/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
index 525af5c..536233d 100644
--- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
+++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
@@ -1,115 +1,89 @@
 {
-  "0":{
-    "hostLevelParams":{
-      "agent_stack_retry_on_unavailability":"false",
-      "unlimited_key_jce_required":"false",
-      "group_list":"[\"hadoop\",\"users\"]",
-      "host_sys_prepped":"false",
-      "ambari_db_rca_username":"mapred",
-      "jdk_name":"jdk-8u112-linux-x64.tar.gz",
-      "mysql_jdbc_url":"http://gc6401:8080/resources//mysql-connector-java.jar",
-      "agent_stack_retry_count":"5",
-      "user_groups":"{}",
-      "stack_version":"2.5",
-      "stack_name":"HDP",
-      "ambari_db_rca_driver":"org.postgresql.Driver",
-      "java_home":"/usr/jdk64/jdk1.8.0_112",
-      "jdk_location":"http://gc6401:8080/resources/",
-      "not_managed_hdfs_path_list":"[\"/tmp\"]",
-      "ambari_db_rca_url":"jdbc:postgresql://gc6401/ambarirca",
-      "java_version":"8",
-      "repo_info":"...",
-      "db_name":"ambari",
-      "agentCacheDir":"/var/lib/ambari-agent/cache",
-      "ambari_db_rca_password":"mapred",
-      "jce_name":"jce_policy-8.zip",
-      "oracle_jdbc_url":"http://gc6401:8080/resources//ojdbc6.jar",
-      "db_driver_filename":"mysql-connector-java.jar",
-      "user_list":"[\"zookeeper\",\"ambari-qa\",\"hdfs\"]",
-      "clientsToUpdateConfigs":"[\"*\"]"
-    },
-    "commands":[
-      {
-        "requestId":5,
-        "taskId":9,
-        "commandId":1,
-        "serviceName":"HDFS",
-        "role":"DATANODE",
-        "commandType":"EXECUTION_COMMAND",
-        "roleCommand":"START",
-        "clusterName": "c1",
-        "clusterId": 0,
-        "configuration_credentials":{
+  "clusters": {
+    "0":{
+      "commands":[
+        {
+          "requestId":5,
+          "taskId":9,
+          "commandId":1,
+          "serviceName":"HDFS",
+          "role":"DATANODE",
+          "commandType":"EXECUTION_COMMAND",
+          "roleCommand":"START",
+          "clusterName": "c1",
+          "clusterId": 0,
+          "configuration_credentials":{
 
+          },
+          "commandParams":{
+            "service_package_folder":"common-services/HDFS/2.1.0.2.0/package",
+            "hooks_folder":"HDP/2.0.6/hooks",
+            "script":"scripts/datanode.py",
+            "phase":"INITIAL_START",
+            "max_duration_for_retries":"600",
+            "command_retry_enabled":"false",
+            "command_timeout":"1200",
+            "refresh_topology":"True",
+            "script_type":"PYTHON"
+          }
         },
-        "commandParams":{
-          "service_package_folder":"common-services/HDFS/2.1.0.2.0/package",
-          "hooks_folder":"HDP/2.0.6/hooks",
-          "script":"scripts/datanode.py",
-          "phase":"INITIAL_START",
-          "max_duration_for_retries":"600",
-          "command_retry_enabled":"false",
-          "command_timeout":"1200",
-          "refresh_topology":"True",
-          "script_type":"PYTHON"
-        }
-      },
-      {
-        "requestId":6,
-        "taskId":9,
-        "commandId":0,
-        "clusterId": "null",
-        "serviceName":"ZOOKEEPER",
-        "role":"ZOOKEEPER_SERVER",
-        "commandType":"EXECUTION_COMMAND",
-        "roleCommand":"START",
-        "clusterName": "c1",
-        "configuration_credentials":{
+        {
+          "requestId":6,
+          "taskId":9,
+          "commandId":0,
+          "clusterId": "null",
+          "serviceName":"ZOOKEEPER",
+          "role":"ZOOKEEPER_SERVER",
+          "commandType":"EXECUTION_COMMAND",
+          "roleCommand":"START",
+          "clusterName": "c1",
+          "configuration_credentials":{
 
-        },
-        "commandParams":{
-          "service_package_folder":"common-services/ZOOKEEPER/3.4.5/package",
-          "hooks_folder":"HDP/2.0.6/hooks",
-          "script":"scripts/datanode.py",
-          "phase":"INITIAL_START",
-          "max_duration_for_retries":"600",
-          "command_retry_enabled":"false",
-          "command_timeout":"1200",
-          "refresh_topology":"True",
-          "script_type":"PYTHON"
+          },
+          "commandParams":{
+            "service_package_folder":"common-services/ZOOKEEPER/3.4.5/package",
+            "hooks_folder":"HDP/2.0.6/hooks",
+            "script":"scripts/datanode.py",
+            "phase":"INITIAL_START",
+            "max_duration_for_retries":"600",
+            "command_retry_enabled":"false",
+            "command_timeout":"1200",
+            "refresh_topology":"True",
+            "script_type":"PYTHON"
+          }
         }
-      }
-    ]
-  },
-  "-1":{
-    "hostLevelParams":{
-      "agent_stack_retry_count":"5",
-      "agent_stack_retry_on_unavailability":"false",
-      "agentCacheDir":"/var/lib/ambari-agent/cache"
+      ]
     },
-    "commands":[
-      {
-        "roleCommand":"ACTIONEXECUTE",
-        "serviceName":"null",
-        "role":"check_host",
-        "commandType":"EXECUTION_COMMAND",
-        "taskId":2,
-        "commandId":1,
-        "clusterId": "null",
-        "commandParams":{
-          "script":"check_host.py",
-          "check_execute_list":"host_resolution_check",
-          "threshold":"20",
-          "hosts":"gc6401",
-          "command_timeout":"900",
-          "script_type":"PYTHON"
-        },
-        "roleParams":{
-          "threshold":"20",
-          "check_execute_list":"host_resolution_check",
-          "hosts":"gc6401"
+    "-1":{
+      "commands":[
+        {
+          "roleCommand":"ACTIONEXECUTE",
+          "serviceName":"null",
+          "role":"check_host",
+          "commandType":"EXECUTION_COMMAND",
+          "taskId":2,
+          "commandId":1,
+          "clusterId": "null",
+          "commandParams":{
+            "script":"check_host.py",
+            "check_execute_list":"host_resolution_check",
+            "threshold":"20",
+            "hosts":"gc6401",
+            "command_timeout":"900",
+            "script_type":"PYTHON"
+          },
+          "roleParams":{
+            "threshold":"20",
+            "check_execute_list":"host_resolution_check",
+            "hosts":"gc6401"
+          },
+          "hostLevelParams":{
+            "agent_stack_retry_count":"5",
+            "agent_stack_retry_on_unavailability":"false",
+            "agentCacheDir":"/var/lib/ambari-agent/cache"
+          }
         }
-      }
-    ]
+      ]
+    }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/5d096242/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/registration_response.json
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/registration_response.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/registration_response.json
index 6873d7b..5564794 100644
--- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/registration_response.json
+++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/registration_response.json
@@ -1,5 +1,5 @@
 {
-  "responseId":0,
+  "id":0,
   "exitstatus":0,
   "errorMessage":"",
   "clustersList":[