You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2017/05/29 08:29:23 UTC

ambari git commit: AMBARI-21134. Get initial metadata, topology, configs from another endpoint (aonishuk)

Repository: ambari
Updated Branches:
  refs/heads/branch-3.0-perf b094c753e -> 6bad191bb


AMBARI-21134. Get initial metadata, topology, configs from another endpoint (aonishuk)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/6bad191b
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/6bad191b
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/6bad191b

Branch: refs/heads/branch-3.0-perf
Commit: 6bad191bb3cf009cbe7d1d7a92942d47770ab31d
Parents: b094c75
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Mon May 29 11:28:44 2017 +0300
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Mon May 29 11:28:44 2017 +0300

----------------------------------------------------------------------
 .../main/python/ambari_agent/ClusterCache.py    |   3 +-
 .../python/ambari_agent/CommandStatusDict.py    |   3 +-
 .../ambari_agent/CommandStatusReporter.py       |   7 +-
 .../ambari_agent/ComponentStatusExecutor.py     |   7 +-
 .../src/main/python/ambari_agent/Constants.py   |  10 +-
 .../main/python/ambari_agent/HeartbeatThread.py |  48 ++++----
 .../python/ambari_agent/InitializerModule.py    |   2 +
 .../src/main/python/ambari_agent/Utils.py       |   3 +
 .../listeners/ServerResponsesListener.py        |  23 +++-
 .../python/ambari_agent/listeners/__init__.py   |  13 ++-
 .../src/main/python/ambari_agent/security.py    |   8 +-
 .../ambari_agent/BaseStompServerTestCase.py     |  35 +++++-
 .../ambari_agent/TestAgentStompResponses.py     | 115 ++++++++-----------
 13 files changed, 169 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/6bad191b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
index 70b44b4..4b88f71 100644
--- a/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
+++ b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
@@ -97,11 +97,10 @@ class ClusterCache(dict):
       with os.fdopen(os.open(self.__current_cache_json_file, os.O_WRONLY | os.O_CREAT, 0o600), "w") as f:
         json.dump(self, f, indent=2)
 
-  def get_md5_hashsum(self, cluster_id):
+  def get_md5_hashsum(self):
     """
     Thread-safe method for writing out the specified cluster cache
     and updating the in-memory representation.
-    :param cluster_id:
     :param cache:
     :return:
     """

http://git-wip-us.apache.org/repos/asf/ambari/blob/6bad191b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
index bb0cea3..afaf77d 100644
--- a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
+++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
@@ -21,7 +21,6 @@ limitations under the License.
 import logging
 import threading
 import copy
-import json
 from Grep import Grep
 
 from ambari_agent import Constants
@@ -57,7 +56,7 @@ class CommandStatusDict():
     self.force_update_to_server([new_report])
 
   def force_update_to_server(self, reports):
-    self.initializer_module.connection.send(body=json.dumps(reports), destination=Constants.COMMANDS_STATUS_REPORTS_ENDPOINT)
+    self.initializer_module.connection.send(message=reports, destination=Constants.COMMANDS_STATUS_REPORTS_ENDPOINT)
 
   def get_command_status(self, taskId):
     with self.lock:

http://git-wip-us.apache.org/repos/asf/ambari/blob/6bad191b/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py
index acee3b1..216f20b 100644
--- a/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py
+++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py
@@ -18,7 +18,6 @@ See the License for the specific language governing permissions and
 limitations under the License.
 '''
 
-import json
 import logging
 import threading
 
@@ -43,10 +42,10 @@ class CommandStatusReporter(threading.Thread):
 
     while not self.stop_event.is_set():
       try:
-        # TODO STOMP: what if not registered?
+        # TODO STOMP: if not registered, reports should not be on agent until next registration
         report = self.commandStatuses.generate_report()
-        if report:
-          self.initializer_module.connection.send(body=json.dumps(report), destination=Constants.COMMANDS_STATUS_REPORTS_ENDPOINT)
+        if report and self.initializer_module.is_registered:
+          self.initializer_module.connection.send(message=report, destination=Constants.COMMANDS_STATUS_REPORTS_ENDPOINT)
         self.stop_event.wait(self.command_reports_interval)
       except:
         logger.exception("Exception in CommandStatusReporter. Re-running it")

http://git-wip-us.apache.org/repos/asf/ambari/blob/6bad191b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
index 1f6a7dc..6783138 100644
--- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
@@ -18,7 +18,6 @@ See the License for the specific language governing permissions and
 limitations under the License.
 '''
 
-import json
 import random
 import logging
 import threading
@@ -98,8 +97,10 @@ class ComponentStatusExecutor(threading.Thread):
 
   def send_updates_to_server(self, cluster_reports):
     # TODO STOMP: override send to send dicts and lists? and not use json.dump
-    # TODO STOMP: skip this if server is down?
-    self.initializer_module.connection.send(body=json.dumps(cluster_reports), destination=Constants.COMPONENT_STATUS_REPORTS_ENDPOINT)
+    if not cluster_reports or not self.initializer_module.is_registered:
+      return
+
+    self.initializer_module.connection.send(message=cluster_reports, destination=Constants.COMPONENT_STATUS_REPORTS_ENDPOINT)
 
     for cluster_id, reports in cluster_reports.iteritems():
       for report in reports:

http://git-wip-us.apache.org/repos/asf/ambari/blob/6bad191b/ambari-agent/src/main/python/ambari_agent/Constants.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Constants.py b/ambari-agent/src/main/python/ambari_agent/Constants.py
index 6a054cc..e15d1d8 100644
--- a/ambari-agent/src/main/python/ambari_agent/Constants.py
+++ b/ambari-agent/src/main/python/ambari_agent/Constants.py
@@ -21,12 +21,16 @@ limitations under the License.
 
 COMMANDS_TOPIC = '/user/commands'
 CONFIGURATIONS_TOPIC = '/user/configs'
-METADATA_TOPIC = '/user/metadata'
-TOPOLOGIES_TOPIC = '/user/topologies'
+METADATA_TOPIC = '/events/metadata'
+TOPOLOGIES_TOPIC = '/events/topology'
 SERVER_RESPONSES_TOPIC = '/user/'
 
-TOPICS_TO_SUBSCRIBE = [SERVER_RESPONSES_TOPIC, COMMANDS_TOPIC, CONFIGURATIONS_TOPIC, METADATA_TOPIC, TOPOLOGIES_TOPIC]
+PRE_REGISTRATION_TOPICS_TO_SUBSCRIBE = [SERVER_RESPONSES_TOPIC]
+POST_REGISTRATION_TOPICS_TO_SUBSCRIBE = [COMMANDS_TOPIC, CONFIGURATIONS_TOPIC, METADATA_TOPIC, TOPOLOGIES_TOPIC]
 
+TOPOLOGY_REQUEST_ENDPOINT = '/agents/topology'
+METADATA_REQUEST_ENDPOINT = '/agents/metadata'
+CONFIGURATIONS_REQUEST_ENDPOINT = '/agents/configs'
 COMPONENT_STATUS_REPORTS_ENDPOINT = '/reports/component_status'
 COMMANDS_STATUS_REPORTS_ENDPOINT = '/reports/commands_status'
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/6bad191b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
index 70fe7e7..e88fee7 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -18,11 +18,9 @@ See the License for the specific language governing permissions and
 limitations under the License.
 '''
 
-import json
 import logging
 import ambari_stomp
 import threading
-from collections import defaultdict
 
 from ambari_agent import Constants
 from ambari_agent.listeners.ServerResponsesListener import ServerResponsesListener
@@ -41,7 +39,6 @@ class HeartbeatThread(threading.Thread):
   """
   def __init__(self, initializer_module):
     threading.Thread.__init__(self)
-    self.is_registered = False
     self.heartbeat_interval = HEARTBEAT_INTERVAL
     self.stop_event = initializer_module.stop_event
 
@@ -55,6 +52,12 @@ class HeartbeatThread(threading.Thread):
     self.topology_events_listener = TopologyEventListener(initializer_module.topology_cache)
     self.configuration_events_listener = ConfigurationEventListener(initializer_module.configurations_cache)
     self.listeners = [self.server_responses_listener, self.commands_events_listener, self.metadata_events_listener, self.topology_events_listener, self.configuration_events_listener]
+    self.post_registration_requests = [
+    (Constants.TOPOLOGY_REQUEST_ENDPOINT, initializer_module.topology_cache, self.topology_events_listener),
+    (Constants.METADATA_REQUEST_ENDPOINT, initializer_module.metadata_cache, self.metadata_events_listener),
+    (Constants.CONFIGURATIONS_REQUEST_ENDPOINT, initializer_module.configurations_cache, self.configuration_events_listener)
+    ]
+
 
   def run(self):
     """
@@ -63,7 +66,7 @@ class HeartbeatThread(threading.Thread):
     # TODO STOMP: stop the thread on SIGTERM
     while not self.stop_event.is_set():
       try:
-        if not self.is_registered:
+        if not self.initializer_module.is_registered:
           self.register()
 
         heartbeat_body = self.get_heartbeat_body()
@@ -75,16 +78,19 @@ class HeartbeatThread(threading.Thread):
         # TODO STOMP: handle heartbeat reponse
       except:
         logger.exception("Exception in HeartbeatThread. Re-running the registration")
-        # TODO STOMP: re-connect here
-        self.is_registered = False
+        self.initializer_module.is_registered = False
+        self.initializer_module.connection.disconnect()
         pass
+
+    self.initializer_module.connection.disconnect()
     logger.info("HeartbeatThread has successfully finished")
 
   def register(self):
     """
     Subscribe to topics, register with server, wait for server's response.
     """
-    self.subscribe_and_listen()
+    self.add_listeners()
+    self.subscribe_to_topics(Constants.PRE_REGISTRATION_TOPICS_TO_SUBSCRIBE)
 
     registration_request = self.get_registration_request()
     logger.info("Sending registration request")
@@ -96,20 +102,19 @@ class HeartbeatThread(threading.Thread):
     logger.debug("Registration response is {0}".format(response))
 
     self.registration_response = response
-    self.registered = True
+
+    for endpoint, cache, listener in self.post_registration_requests:
+      response = self.blocking_request({'hash': cache.get_md5_hashsum()}, endpoint)
+      listener.on_event({}, response)
+
+    self.subscribe_to_topics(Constants.POST_REGISTRATION_TOPICS_TO_SUBSCRIBE)
+    self.initializer_module.is_registered = True
 
   def get_registration_request(self):
     """
     Get registration request body to send it to server
     """
-    request = {'clusters':defaultdict(lambda:{})}
-
-    for cache in self.caches:
-      cache_key_name = cache.get_cache_name() + '_hash'
-      for cluster_id in cache.get_cluster_ids():
-        request['clusters'][cluster_id][cache_key_name] = cache.get_md5_hashsum(cluster_id)
-
-    return request
+    return {'registration-response':'true'}
 
   def get_heartbeat_body(self):
     """
@@ -117,19 +122,20 @@ class HeartbeatThread(threading.Thread):
     """
     return {'hostname':'true'}
 
-  def subscribe_and_listen(self):
+  def add_listeners(self):
     """
     Subscribe to topics and set listener classes.
     """
     for listener in self.listeners:
       self.initializer_module.connection.add_listener(listener)
 
-    for topic_name in Constants.TOPICS_TO_SUBSCRIBE:
+  def subscribe_to_topics(self, topics_list):
+    for topic_name in topics_list:
       self.initializer_module.connection.subscribe(destination=topic_name, id='sub', ack='client-individual')
 
-  def blocking_request(self, body, destination):
+  def blocking_request(self, message, destination):
     """
     Send a request to server and waits for the response from it. The response it detected by the correspondence of correlation_id.
     """
-    self.initializer_module.connection.send(body=json.dumps(body), destination=destination)
-    return self.server_responses_listener.responses.blocking_pop(str(self.initializer_module.connection.correlation_id))
\ No newline at end of file
+    correlation_id = self.initializer_module.connection.send(message=message, destination=destination)
+    return self.server_responses_listener.responses.blocking_pop(str(correlation_id))
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/6bad191b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
index c36bd68..4d0ac9b 100644
--- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
+++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
@@ -63,6 +63,8 @@ class InitializerModule:
     """
     self.stop_event = threading.Event()
 
+    self.is_registered = False
+
     self.metadata_cache = ClusterMetadataCache(self.cluster_cache_dir)
     self.topology_cache = ClusterTopologyCache(self.cluster_cache_dir)
     self.configurations_cache = ClusterConfigurationCache(self.cluster_cache_dir)

http://git-wip-us.apache.org/repos/asf/ambari/blob/6bad191b/ambari-agent/src/main/python/ambari_agent/Utils.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Utils.py b/ambari-agent/src/main/python/ambari_agent/Utils.py
index d6f0294..e48aa5f 100644
--- a/ambari-agent/src/main/python/ambari_agent/Utils.py
+++ b/ambari-agent/src/main/python/ambari_agent/Utils.py
@@ -43,6 +43,9 @@ class BlockingDictionary():
     """
     Block until a key in dictionary is available and than pop it.
     """
+    if key in self.dict:
+      return self.dict.pop(key)
+
     while True:
       self.put_event.wait()
       self.put_event.clear()

http://git-wip-us.apache.org/repos/asf/ambari/blob/6bad191b/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py
index 8502507..6d23c37 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py
@@ -33,6 +33,7 @@ class ServerResponsesListener(EventListener):
   """
   def __init__(self):
     self.responses = Utils.BlockingDictionary()
+    self.listener_functions = {}
 
   def on_event(self, headers, message):
     """
@@ -44,9 +45,25 @@ class ServerResponsesListener(EventListener):
     @param message: message payload dictionary
     """
     if Constants.CORRELATION_ID_STRING in headers:
-      self.responses.put(headers[Constants.CORRELATION_ID_STRING], message)
+      correlation_id = headers[Constants.CORRELATION_ID_STRING]
+      self.responses.put(correlation_id, message)
+
+      if correlation_id in self.listener_functions:
+        self.listener_functions[correlation_id](headers, message)
+        del self.listener_functions[correlation_id]
     else:
-      logger.warn("Received a message from server without a '{0}' header. Ignoring the message".format(Constants.CORRELATION_ID_STRING))\
+      logger.warn("Received a message from server without a '{0}' header. Ignoring the message".format(Constants.CORRELATION_ID_STRING))
 
   def get_handled_path(self):
-    return Constants.SERVER_RESPONSES_TOPIC
\ No newline at end of file
+    return Constants.SERVER_RESPONSES_TOPIC
+
+  def get_log_message(self, headers, message_json):
+    """
+    This string will be used to log received messsage of this type
+    """
+    if Constants.CORRELATION_ID_STRING in headers:
+      correlation_id = headers[Constants.CORRELATION_ID_STRING]
+      return " (correlation_id={0}): {1}".format(correlation_id, message_json)
+    return str(message_json)
+
+

http://git-wip-us.apache.org/repos/asf/ambari/blob/6bad191b/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py b/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
index 45b38ed..f05f8da 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
@@ -46,12 +46,11 @@ class EventListener(ambari_stomp.ConnectionListener):
         logger.exception("Received event from server does not  a valid json as a message. Message is:\n{0}".format(message))
         return
 
-      logger.info("Received event from {0}".format(destination))
-      logger.debug("Received event from {0}: headers={1} ; message={2}".format(destination, headers, message))
+      logger.info("Event from server at {0}{1}".format(destination, self.get_log_message(headers, message_json)))
       try:
         self.on_event(headers, message_json)
       except:
-        logger.exception("Exception while handing event from {0}: headers={1} ; message={2}".format(destination, headers, message))
+        logger.exception("Exception while handing event from {0} {1}".format(destination, headers, message))
 
   def on_event(self, headers, message):
     """
@@ -60,4 +59,10 @@ class EventListener(ambari_stomp.ConnectionListener):
     @param headers: headers dictionary
     @param message: message payload dictionary
     """
-    raise NotImplementedError()
\ No newline at end of file
+    raise NotImplementedError()
+
+  def get_log_message(self, headers, message_json):
+    """
+    This string will be used to log received messsage of this type
+    """
+    return ": " + str(message_json)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/6bad191b/ambari-agent/src/main/python/ambari_agent/security.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/security.py b/ambari-agent/src/main/python/ambari_agent/security.py
index b3cb16e..df45699 100644
--- a/ambari-agent/src/main/python/ambari_agent/security.py
+++ b/ambari-agent/src/main/python/ambari_agent/security.py
@@ -106,10 +106,16 @@ class AmbariStompConnection(WsConnection):
     self.correlation_id = -1
     WsConnection.__init__(self, url)
 
-  def send(self, destination, body, content_type=None, headers=None, **keyword_headers):
+  def send(self, destination, message, content_type=None, headers=None, **keyword_headers):
     self.correlation_id += 1
+
+    logger.info("Event to server at {0} (correlation_id={1}): {2}".format(destination, self.correlation_id, message))
+
+    body = json.dumps(message)
     WsConnection.send(self, destination, body, content_type=content_type, headers=headers, correlationId=self.correlation_id, **keyword_headers)
 
+    return self.correlation_id
+
   def add_listener(self, listener):
     self.set_listener(listener.__class__.__name__, listener)
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/6bad191b/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py b/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py
index 671387a..7380727 100644
--- a/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py
+++ b/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py
@@ -18,6 +18,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 '''
 
+import json
 import ambari_stomp
 import os
 import sys
@@ -42,6 +43,8 @@ from coilmq.store.memory import MemoryQueue
 from coilmq.scheduler import FavorReliableSubscriberScheduler, RandomQueueScheduler
 from coilmq.protocol import STOMP10
 
+logger = logging.getLogger(__name__)
+
 class BaseStompServerTestCase(unittest.TestCase):
   """
   Base class for test cases provides the fixtures for setting up the multi-threaded
@@ -58,6 +61,7 @@ class BaseStompServerTestCase(unittest.TestCase):
     self.ready_event = threading.Event()
 
     addr_bound = threading.Event()
+    self.init_stdout_logger()
 
     def start_server():
       self.server = TestStompServer(('127.0.0.1', 21613),
@@ -123,6 +127,30 @@ class BaseStompServerTestCase(unittest.TestCase):
     with open(filepath) as f:
       return f.read()
 
+  def init_stdout_logger(self):
+    format='%(levelname)s %(asctime)s - %(message)s'
+
+    logger = logging.getLogger()
+    logger.setLevel(logging.INFO)
+    formatter = logging.Formatter(format)
+    chout = logging.StreamHandler(sys.stdout)
+    chout.setLevel(logging.INFO)
+    chout.setFormatter(formatter)
+    cherr = logging.StreamHandler(sys.stderr)
+    cherr.setLevel(logging.ERROR)
+    cherr.setFormatter(formatter)
+    logger.handlers = []
+    logger.addHandler(cherr)
+    logger.addHandler(chout)
+
+    logging.getLogger('stomp.py').setLevel(logging.WARN)
+    logging.getLogger('coilmq').setLevel(logging.INFO)
+
+  def remove_files(self, filepathes):
+    for filepath in filepathes:
+      if os.path.isfile(filepath):
+        os.remove(filepath)
+
 
 class TestStompServer(ThreadedStompServer):
     """
@@ -235,9 +263,14 @@ class TestCaseTcpConnection(ambari_stomp.Connection):
     self.correlation_id = -1
     ambari_stomp.Connection.__init__(self, host_and_ports=[('127.0.0.1', 21613)])
 
-  def send(self, destination, body, content_type=None, headers=None, **keyword_headers):
+  def send(self, destination, message, content_type=None, headers=None, **keyword_headers):
     self.correlation_id += 1
+
+    logger.info("Event to server at {0} (correlation_id={1}): {2}".format(destination, self.correlation_id, message))
+
+    body = json.dumps(message)
     ambari_stomp.Connection.send(self, destination, body, content_type=content_type, headers=headers, correlationId=self.correlation_id, **keyword_headers)
+    return self.correlation_id
 
   def add_listener(self, listener):
     self.set_listener(listener.__class__.__name__, listener)

http://git-wip-us.apache.org/repos/asf/ambari/blob/6bad191b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
index 9d59222..1f3a6e7 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
@@ -21,6 +21,7 @@ import os
 import sys
 import logging
 import json
+import time
 from coilmq.util import frames
 from coilmq.util.frames import Frame
 
@@ -38,9 +39,8 @@ class TestAgentStompResponses(BaseStompServerTestCase):
   @patch.object(CustomServiceOrchestrator, "runCommand")
   def test_mock_server_can_start(self, runCommand_mock):
     runCommand_mock.return_value = {'stdout':'...', 'stderr':'...', 'structuredOut' : '{}', 'exitcode':1}
-    self.init_stdout_logger()
 
-    self.remove(['/tmp/cluster_cache/configurations.json', '/tmp/cluster_cache/metadata.json', '/tmp/cluster_cache/topology.json'])
+    self.remove_files(['/tmp/cluster_cache/configurations.json', '/tmp/cluster_cache/metadata.json', '/tmp/cluster_cache/topology.json'])
 
     if not os.path.exists("/tmp/ambari-agent"):
       os.mkdir("/tmp/ambari-agent")
@@ -52,48 +52,55 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     action_queue = initializer_module.action_queue
     action_queue.start()
 
-    connect_frame = self.server.frames_queue.get()
-    users_subscribe_frame = self.server.frames_queue.get()
-    commands_subscribe_frame = self.server.frames_queue.get()
-    configurations_subscribe_frame = self.server.frames_queue.get()
-    metadata_subscribe_frame = self.server.frames_queue.get()
-    topologies_subscribe_frame = self.server.frames_queue.get()
-    registration_frame = self.server.frames_queue.get()
-
     component_status_executor = ComponentStatusExecutor(initializer_module)
     component_status_executor.start()
 
     command_status_reporter = CommandStatusReporter(initializer_module)
     command_status_reporter.start()
 
-    status_reports_frame = self.server.frames_queue.get()
+    connect_frame = self.server.frames_queue.get()
+    users_subscribe_frame = self.server.frames_queue.get()
+    registration_frame = self.server.frames_queue.get()
 
     # server sends registration response
     f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '0'}, body=self.get_json("registration_response.json"))
     self.server.topic_manager.send(f)
 
-    f = Frame(frames.MESSAGE, headers={'destination': '/user/configs'}, body=self.get_json("configurations_update.json"))
+
+    # response to /initial_topology
+    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '1'}, body=self.get_json("topology_update.json"))
     self.server.topic_manager.send(f)
 
-    f = Frame(frames.MESSAGE, headers={'destination': '/user/commands'}, body=self.get_json("execution_commands.json"))
+    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '2'}, body=self.get_json("metadata_after_registration.json"))
     self.server.topic_manager.send(f)
 
-    f = Frame(frames.MESSAGE, headers={'destination': '/user/metadata'}, body=self.get_json("metadata_after_registration.json"))
+    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '3'}, body=self.get_json("configurations_update.json"))
     self.server.topic_manager.send(f)
 
-    f = Frame(frames.MESSAGE, headers={'destination': '/user/topologies'}, body=self.get_json("topology_update.json"))
+    initial_topology_request = self.server.frames_queue.get()
+    initial_metadata_request = self.server.frames_queue.get()
+    initial_configs_request = self.server.frames_queue.get()
+
+    while not initializer_module.is_registered:
+      time.sleep(0.1)
+
+    f = Frame(frames.MESSAGE, headers={'destination': '/user/commands'}, body=self.get_json("execution_commands.json"))
     self.server.topic_manager.send(f)
 
+    commands_subscribe_frame = self.server.frames_queue.get()
+    configurations_subscribe_frame = self.server.frames_queue.get()
+    metadata_subscribe_frame = self.server.frames_queue.get()
+    topologies_subscribe_frame = self.server.frames_queue.get()
     heartbeat_frame = self.server.frames_queue.get()
-    dn_status_in_progress_frame = json.loads(self.server.frames_queue.get().body)
-    dn_status_failed_frame = json.loads(self.server.frames_queue.get().body)
-    zk_status_in_progress_frame = json.loads(self.server.frames_queue.get().body)
-    zk_status_failed_frame = json.loads(self.server.frames_queue.get().body)
+    dn_start_in_progress_frame = json.loads(self.server.frames_queue.get().body)
+    dn_start_failed_frame = json.loads(self.server.frames_queue.get().body)
+    zk_start_in_progress_frame = json.loads(self.server.frames_queue.get().body)
+    zk_start_failed_frame = json.loads(self.server.frames_queue.get().body)
     action_status_in_progress_frame = json.loads(self.server.frames_queue.get().body)
     action_status_failed_frame = json.loads(self.server.frames_queue.get().body)
     initializer_module.stop_event.set()
 
-    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '2'}, body=json.dumps({'heartbeat-response':'true'}))
+    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'heartbeat-response':'true'}))
     self.server.topic_manager.send(f)
 
     heartbeat_thread.join()
@@ -104,10 +111,10 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     self.assertEquals(initializer_module.topology_cache['0']['hosts'][0]['hostname'], 'c6401.ambari.apache.org')
     self.assertEquals(initializer_module.metadata_cache['0']['status_commands_to_run'], ('STATUS',))
     self.assertEquals(initializer_module.configurations_cache['0']['configurations']['zoo.cfg']['clientPort'], '2181')
-    self.assertEquals(dn_status_in_progress_frame[0]['roleCommand'], 'START')
-    self.assertEquals(dn_status_in_progress_frame[0]['role'], 'DATANODE')
-    self.assertEquals(dn_status_in_progress_frame[0]['status'], 'IN_PROGRESS')
-    self.assertEquals(dn_status_failed_frame[0]['status'], 'FAILED')
+    self.assertEquals(dn_start_in_progress_frame[0]['roleCommand'], 'START')
+    self.assertEquals(dn_start_in_progress_frame[0]['role'], 'DATANODE')
+    self.assertEquals(dn_start_in_progress_frame[0]['status'], 'IN_PROGRESS')
+    self.assertEquals(dn_start_failed_frame[0]['status'], 'FAILED')
 
     """
     ============================================================================================
@@ -123,62 +130,42 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     action_queue = initializer_module.action_queue
     action_queue.start()
 
-    connect_frame = self.server.frames_queue.get()
-    users_subscribe_frame = self.server.frames_queue.get()
-    commands_subscribe_frame = self.server.frames_queue.get()
-    configurations_subscribe_frame = self.server.frames_queue.get()
-    metadata_subscribe_frame = self.server.frames_queue.get()
-    topologies_subscribe_frame = self.server.frames_queue.get()
-    registration_frame_json = json.loads(self.server.frames_queue.get().body)
-    clusters_hashes = registration_frame_json['clusters']['0']
-
     component_status_executor = ComponentStatusExecutor(initializer_module)
     component_status_executor.start()
 
     command_status_reporter = CommandStatusReporter(initializer_module)
     command_status_reporter.start()
 
-    status_reports_frame = self.server.frames_queue.get()
-
-    self.assertEquals(clusters_hashes['metadata_hash'], '21724f6ffa7aff0fe91a0c0c5b765dba')
-    self.assertEquals(clusters_hashes['configurations_hash'], '04c968412ded7c8ffe7858036bae03ce')
-    self.assertEquals(clusters_hashes['topology_hash'], '0de1df56fd594873fe594cf02ea61f4b')
+    connect_frame = self.server.frames_queue.get()
+    users_subscribe_frame = self.server.frames_queue.get()
+    registration_frame = self.server.frames_queue.get()
 
     # server sends registration response
     f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '0'}, body=self.get_json("registration_response.json"))
     self.server.topic_manager.send(f)
 
+    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '1'}, body='{}')
+    self.server.topic_manager.send(f)
+
+    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '2'}, body='{}')
+    self.server.topic_manager.send(f)
+
+    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '3'}, body='{}')
+    self.server.topic_manager.send(f)
+
+    commands_subscribe_frame = self.server.frames_queue.get()
+    configurations_subscribe_frame = self.server.frames_queue.get()
+    metadata_subscribe_frame = self.server.frames_queue.get()
+    topologies_subscribe_frame = self.server.frames_queue.get()
     heartbeat_frame = self.server.frames_queue.get()
+    status_reports_frame = self.server.frames_queue.get()
+
     initializer_module.stop_event.set()
 
-    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '2'}, body=json.dumps({'heartbeat-response':'true'}))
+    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'heartbeat-response':'true'}))
     self.server.topic_manager.send(f)
 
     heartbeat_thread.join()
     component_status_executor.join()
     command_status_reporter.join()
-    action_queue.join()
-
-  def remove(self, filepathes):
-    for filepath in filepathes:
-      if os.path.isfile(filepath):
-        os.remove(filepath)
-
-  def init_stdout_logger(self):
-    format='%(levelname)s %(asctime)s - %(message)s'
-
-    logger = logging.getLogger()
-    logger.setLevel(logging.INFO)
-    formatter = logging.Formatter(format)
-    chout = logging.StreamHandler(sys.stdout)
-    chout.setLevel(logging.INFO)
-    chout.setFormatter(formatter)
-    cherr = logging.StreamHandler(sys.stderr)
-    cherr.setLevel(logging.ERROR)
-    cherr.setFormatter(formatter)
-    logger.handlers = []
-    logger.addHandler(cherr)
-    logger.addHandler(chout)
-
-    logging.getLogger('stomp.py').setLevel(logging.WARN)
-    logging.getLogger('coilmq').setLevel(logging.INFO)
\ No newline at end of file
+    action_queue.join()
\ No newline at end of file